Start work on generic workspaces service

This commit is contained in:
Ezri Brimhall 2024-11-17 14:15:31 -07:00
parent 8e9abdfd7f
commit c8d260892c
Signed by: ezri
GPG Key ID: 058A78E5680C6F24
4 changed files with 729 additions and 0 deletions

View File

@ -0,0 +1,140 @@
from abc import ABC, abstractmethod
from .config import OutputConfig
class AbstractWorkspaceBackend(ABC):
"""
Abstract class for workspace backends (i.e. an IPC interface to a compositor / window manager)
"""
_instance: "AbstractWorkspaceBackend" = None
def __init__(self):
super().__init__()
if self._instance is not None:
raise RuntimeError(
"Cannot instantiate more than one WorkspaceBackend at a time. Use getBackend() to get the current backend."
)
self._instance = self
def _scaleToString(self, scale: int) -> str:
"""Convert the fixed-point scale to a string"""
# We do it this way to not lose precision when converting to a float
return f"{scale // 100}.{scale % 100}"
@abstractmethod
async def focusWorkspace(self, index: int):
"""Focus a workspace in the compositor based off of its compositor index"""
pass
@abstractmethod
async def moveWorkspace(self, index: int, output: str):
"""Move a workspace to a given output"""
pass
@abstractmethod
async def focusOutput(self, output: str):
"""Focus an output in the compositor"""
pass
@abstractmethod
async def moveContainer(self, workspace: int):
"""Move the focused container to a given workspace compositor index"""
pass
@abstractmethod
async def configureOutput(self, output: str, config: OutputConfig):
"""Configure an output"""
pass
@abstractmethod
def onWorkspaceChange(self, callback):
"""Register a callback to be called when the workspace changes"""
pass
@abstractmethod
def onOutputChange(self, callback):
"""Register a callback to be called when the output configuration changes"""
pass
@abstractmethod
def onModeChange(self, callback):
"""Register a callback to be called when the binding mode changes, in compositors that support it"""
pass
@abstractmethod
@property
def name(self) -> str:
"""The name of the backend"""
pass
def getBackend() -> AbstractWorkspaceBackend:
"""
Get the currently-active workspace backend
"""
if AbstractWorkspaceBackend._instance is None:
raise RuntimeError(
"No workspace backend is currently active. Please instantiate one."
)
return AbstractWorkspaceBackend._instance
class WorkspaceAdapter:
"""
Common interface for workspace backends that represents the result of a
workspace query. Only contains information that the workspace manager needs to know.
"""
def __init__(
self, index: int, output: str, focused: bool, visible: bool, urgent: bool
):
self.index = index
self.output = output
self.focused = focused
self.visible = visible
self.urgent = urgent
class RectAdapter:
"""
Common interface for workspace backends that represents a rectangle.
"""
def __init__(self, x: int, y: int, width: int, height: int):
self.x = x
self.y = y
self.width = width
self.height = height
class OutputAdapter:
"""
Common interface for workspace backends that represents the result of an
output query. Only contains information that the workspace manager needs to know.
"""
def __init__(
self,
name: str,
active: bool,
rect: RectAdapter,
scale: int,
transform: str,
focused: bool,
current_workspace: int,
make: str,
model: str,
serial: str,
):
self.name = name
self.active = active
self.rect = rect
self.scale = scale
self.transform = transform
self.focused = focused
self.current_workspace = current_workspace
self.make = make
self.model = model
self.serial = serial

View File

@ -0,0 +1,108 @@
from typing import TypedDict, NotRequired, Annotated
class OutputConfig(TypedDict):
x: Annotated[int, "The x position of the output in the global coordinate space"]
y: Annotated[int, "The y position of the output in the global coordinate space"]
mode: Annotated[
str,
"The mode to set the output to (e.g. 1920x1080). Includes the refresh rate if applicable.",
]
scale: Annotated[
NotRequired[int],
"The scale of the output as a 10^2 fixed-point number (e.g. 1.0 = 100). If not provided, a value of 100 (no scaling) is assumed.",
]
transform: Annotated[
NotRequired[str],
"The transform to apply to the output (e.g. 'normal', '90', '180', '270', 'flipped', 'flipped-90', 'flipped-180', 'flipped-270'). If not provided, a value of 'normal' is assumed.",
]
color_profile: Annotated[
NotRequired[str],
"The color profile to apply to the output. If not provided, no color profile is applied. Support for this is compositor-dependent.",
]
class GroupConfig(OutputConfig):
"""A group of workspaces displayed on a single output.
At least one of either `output_names` or `make`, `model`, and `serial` must be provided.
"""
name: Annotated[str, "The name of the workspace group"]
workspaces: Annotated[
list[int],
"The indices of the workspaces in the group. The first workspace in the list is the default workspace for the group.",
]
output_names: Annotated[
NotRequired[list[str]],
"The names of the outputs to use for the group (e.g 'eDP-1').",
]
make: Annotated[NotRequired[str], "The make of the output (e.g. 'Dell')."]
model: Annotated[NotRequired[str], "The model of the output (e.g. 'U2412M')."]
serial: Annotated[NotRequired[str], "The serial number of the output."]
reverse_order: Annotated[
NotRequired[bool],
"Whether to reverse the order of the workspaces in the group when rendering workspace indicators. If not provided, a value of False is assumed.",
]
bars: Annotated[
NotRequired[list[str]],
"The names of the bars to display on the output. If not provided, no bars are displayed.",
]
class MemoryProfile(TypedDict):
"""A memory profile for an application"""
high: Annotated[int, "The high memory threshold"]
max: Annotated[int, "The maximum memory the application can use"]
class WorkspaceConfig(TypedDict):
"""A workspace configuration."""
index: Annotated[int, "The index of the workspace in the compositor"]
name: Annotated[str, "The name of the workspace, as displayed to the user"]
exec: Annotated[str, "The default application command to run in the workspace"]
program_name: Annotated[
NotRequired[str],
"The name of the program run in this workspace. If not provided, the name of the workspace is used.",
]
memory_profile: Annotated[
NotRequired[MemoryProfile],
"The memory profile for the application. If not provided, no memory profile is applied.",
]
args: Annotated[
NotRequired[list[str]],
"The arguments to pass to the default application. If not provided, no arguments are passed.",
]
environ: Annotated[
NotRequired[dict[str, str]],
"The environment variables to set when running the default application. If not provided, no additional environment variables are set.",
]
systemd: Annotated[
NotRequired[bool],
"Whether to run the default application as a systemd unit. If not provided, a value of True is assumed.",
]
void_output: Annotated[
NotRequired[bool],
"Whether to void the output (not capture it to the system journal) of the default application. If not provided, a value of False is assumed.",
]
class Context(TypedDict):
"""A context configuration."""
name: Annotated[str, "The name of the context"]
groups: Annotated[
list[GroupConfig],
"The groups of workspaces to display on the outputs",
]
priority: Annotated[
NotRequired[int],
"The priority of the context. If not provided, a value of 0 is assumed.",
]
class Config(TypedDict):
workspaces: list[WorkspaceConfig]
contexts: list[Context]

View File

@ -0,0 +1,94 @@
from .backend import OutputAdapter
from .config import GroupConfig, ContextConfig
from typing import Tuple
def getPerfectMatch(
config: GroupConfig, outputs: list[OutputAdapter]
) -> OutputAdapter | None:
"""Returns the output adapter of a perfectly-matching output for the given config. or None if no such output exists."""
for output in outputs:
if (
output.make == config.get("make")
and output.model == config.get("model")
and output.serial == config.get("serial")
):
return output
return None
def getNamePreferences(
config: GroupConfig, outputs: list[OutputAdapter]
) -> list[OutputAdapter]:
"""
Returns the list of outputs that match the names in the config, in preference order.
"""
result = [None] * len(config.get("output_names", []))
for output in outputs:
for i, name in enumerate(config.get("output_names", [])):
if output.name == name:
result[i] = output
def computePerfectScore(
config: ContextConfig, outputs: list[OutputAdapter]
) -> Tuple[int, dict[str, OutputAdapter]]:
"""
Returns the computed score of the context for the given outputs, along with
the outputs that would be matched to each group.
Does not include context priority.
"""
score = 0
map = {}
groups = config.get("groups", [])
outputSet = set(outputs)
usedGroups = set()
# First, compute the perfect score, and remove any outputs that match
for group in groups:
match = getPerfectMatch(group, list(outputSet))
if match is None:
continue
score += 3
outputSet.remove(match)
usedGroups.add(group)
map[group.get("name")] = match
groups = [group for group in groups if group not in usedGroups]
if len(groups) == 0:
# Early return if all groups have been matched
return score, map
# Next, compute the preferences based on name
outputList = list(outputSet)
preferences = [getNamePreferences(group, outputList) for group in groups]
maxPreferences = max(len(preference) for preference in preferences)
# Iterate through the preferences, and remove any outputs that match.
# This is essentially a form of ranked-choice voting system.
for i in range(maxPreferences):
for j, preference in enumerate(preferences):
if (
i < len(preference)
and preference[i] is not None
and preference[i] in outputSet
):
score += 2
outputSet.remove(preference[i])
usedGroups.add(groups[j])
map[groups[j].get("name")] = preference[i]
groups = [group for group in groups if group not in usedGroups]
# Now, if there is more than one group left, we have failed, as only one group can match on wildcard.
# If there are no groups left, return the score.
if len(groups) > 1:
return 0, {}
elif len(groups) == 0:
return score, map
# Finally, if there is one group left, we can match on wildcard. Add one point, and give it the first
# remaining output.
score += 1
map[groups[0].get("name")] = list(outputSet)[0]
return score, map

View File

@ -0,0 +1,387 @@
from gi.repository import GObject
from sdbus import (
DbusInterfaceCommonAsync,
dbus_method_async,
dbus_property_async,
dbus_signal_async,
)
from .config import WorkspaceConfig, GroupConfig, ContextConfig, Config
from .backend import getBackend, WorkspaceAdapter
import logging
logger = logging.getLogger(__name__)
class Workspace(
GObject.Object,
):
class DBusInterface(
DbusInterfaceCommonAsync,
interface_name="dev.ezri.voidshell.workspaces.Workspace",
):
def __init__(self, workspace: "Workspace" = None):
super().__init__()
self.workspace = workspace
@dbus_signal_async()
def Focused(self):
raise NotImplementedError
@dbus_signal_async()
def LostFocus(self):
raise NotImplementedError
@dbus_signal_async()
def Created(self):
raise NotImplementedError
@dbus_signal_async()
def Destroyed(self):
raise NotImplementedError
@dbus_signal_async("oo")
def Moved(self, oldGroupPath: str, newGroupPath: str):
raise NotImplementedError
@dbus_signal_async()
def Alert(self):
raise NotImplementedError
@dbus_method_async(input_signature="", result_signature="s")
async def Focus(self):
await self.workspace.focus()
@dbus_method_async(input_signature="o", result_signature="s")
async def Reassign(self, groupPath: str):
await self.workspace.reassign(Manager.get_manager().pathToObject(groupPath))
@dbus_property_async("s")
def Name(self):
return self.workspace.definition["name"]
@dbus_property_async("s")
def Output(self):
return self.workspace.current_output
@dbus_property_async("o")
def CurrentGroup(self):
return self.workspace.current_group.bus_name
@dbus_property_async("ao")
def Groups(self):
return [group.bus_name for group in self.workspace.groups]
@dbus_property_async("b")
def IsFocused(self):
return self.workspace.focused
@dbus_property_async("b")
def IsVisible(self):
return self.workspace.visible
@dbus_property_async("b")
def IsActive(self):
return self.workspace.active
@dbus_property_async("b")
def IsAlerted(self):
return self.workspace.alerted
class DBusObject(DBusInterface):
def __init__(self, workspace: "Workspace"):
super().__init__(workspace)
self.export_to_dbus(workspace.bus_name)
class DBusProxy(DBusInterface):
def __init__(self, workspaceIndex: str | int):
super().__init__()
if workspaceIndex is int:
self._proxify(
"dev.ezri.voidshell",
f"/dev/ezri/voidshell/workspaces/workspace/{workspaceIndex}",
)
else:
self._proxify(
"dev.ezri.voidshell",
workspaceIndex,
)
def __init__(
self,
config: WorkspaceConfig,
):
super().__init__()
self.definition = config
self.groups = []
self._current_group = None
self._current_output = None
self._focused = False
self._visible = False
self._active = False
self._alerted = False
self._dbusObject = Workspace.DBusObject(self)
@GObject.Property
def current_group(self):
return self._current_group
@GObject.Property
def current_output(self):
return self._current_output
@GObject.Property
def focused(self):
return self._focused
@GObject.Property
def visible(self):
return self._visible
@GObject.Property
def active(self):
return self._active
@GObject.Property
def alerted(self):
return self._alerted
@GObject.Property
def bus_name(self):
return f"/dev/ezri/voidshell/workspaces/workspace/{self.definition['index']}"
async def focus(self):
"""Focus the workspace"""
adapter = getBackend()
await adapter.focusWorkspace(self.definition["index"])
async def reassign(self, group):
"""Reassign the workspace to a different group"""
adapter = getBackend()
await adapter.moveWorkspace(self.definition["index"], group.output_name)
def sync(self, data: WorkspaceAdapter):
"""Synchronize the state of the workspace object with data from the compositor."""
if not self._active:
self._active = True
self._dbusObject.Created.emit(None)
manager = Manager.get_manager()
newOutput = data.output_name
if newOutput != self._current_output:
self._current_output = newOutput
self.notify("current-output")
newGroup = manager.getGroupByOutput(newOutput)
if newGroup != self._current_group:
oldGroup = self._current_group
self._current_group = newGroup
self.notify("current-group")
if newGroup not in self.groups:
logger.warn(
f"Group {newGroup} not in workspace {self.definition['name']}"
)
self._dbusObject.Moved.emit(
oldGroup.bus_name,
newGroup.bus_name,
)
newGroup.notify("focused-workspace")
if data.focused != self._focused:
self._focused = data.focused
self.notify("focused")
if data.focused:
self._dbusObject.Focused.emit(None)
else:
self._dbusObject.LostFocus.emit(None)
if data.visible != self._visible:
self._visible = data.visible
self.notify("visible")
if data.urgent != self._alerted:
self._alerted = data.urgent
self.notify("alerted")
def deactivate(self):
"""Deactivate the workspace."""
if self._focused:
self._focused = False
self.notify("focused")
self._dbusObject.LostFocus.emit(None)
if self._active:
self._active = False
self.notify("active")
self._dbusObject.Destroyed.emit(None)
if self._visible:
self._visible = False
self.notify("visible")
if self._alerted:
self._alerted = False
self.notify("alerted")
def addGroup(self, group: "Group"):
self.groups.append(group)
class Group(GObject.Object):
class DBusInterface(
DbusInterfaceCommonAsync,
interface_name="dev.ezri.voidshell.workspaces.Group",
):
def __init__(self, group: "Group" = None):
super().__init__()
self.group = group
@dbus_signal_async("")
def Focused(self):
raise NotImplementedError
@dbus_signal_async("")
def LostFocus(self):
raise NotImplementedError
@dbus_signal_async("o")
def VisibleWorkspaceChanged(self, workspacePath: str):
raise NotImplementedError
@dbus_method_async(input_signature="", result_signature="s")
async def Focus(self):
return await self.group.focus()
@dbus_method_async(input_signature="", result_signature="s")
async def Reconfigure(self):
return await self.group.configure()
@dbus_property_async("s")
def Name(self):
return self.group.definition["name"]
@dbus_property_async("s")
def Output(self):
return self.group.output_name
@dbus_property_async("b")
def IsFocused(self):
return self.group.focused
@dbus_property_async("o")
def Context(self):
return self.group.context.bus_name
@dbus_property_async("o")
def FocusedWorkspace(self):
return self.group.focused_workspace.bus_name
@dbus_property_async("ao")
def Workspaces(self):
return [w.bus_name for w in self.group.workspaces]
class DBusObject(DBusInterface):
def __init__(self, group: "Group"):
super().__init__(group)
self.export_to_dbus(group.bus_name)
class DBusProxy(DBusInterface):
def __init__(self, name: str):
super().__init__()
if "/" in name:
self._proxify(name)
else:
self._proxify(f"/dev/ezri/voidshell/workspaces/group/{name}")
def __init__(self, definition: GroupConfig):
super().__init__()
manager = Manager.get_manager()
self.definition = definition
self.workspaces = [
manager._getWorkspaceByCompositorIndex(ws)
for ws in definition.get("workspaces")
]
self.output_name = None
self.focused = False
self._focused_workspace = None
self.context = None
self._dbusObject = Group.DBusObject(self)
class Manager(
GObject.Object,
):
class DBusInterface(
DbusInterfaceCommonAsync,
interface_name="dev.ezri.voidshell.workspaces.Tree",
):
def __init__(self, tree: "Manager" = None):
super().__init__()
self.tree = tree
@dbus_signal_async()
def Updated(self):
raise NotImplementedError
@dbus_method_async(input_signature="s", result_signature="s")
async def GetWorkspace(self, workspacePath: str):
return self.tree.getWorkspace(workspacePath)
class DBusObject(DBusInterface):
def __init__(self, tree: "Manager"):
super().__init__(tree)
self.export_to_dbus("/dev/ezri/voidshell/workspaces/tree")
class DBusProxy(DBusInterface):
def __init__(self):
super().__init__()
self._proxify("dev.ezri.voidshell", "/dev/ezri/voidshell/workspaces/tree")
_instance = None
@staticmethod
def get_manager() -> "Manager":
return Manager._instance
def __init__(
self,
config: Config,
):
super().__init__()
Manager._instance = self
self.config = config
self.workspaces = [
Workspace(workspaceConfig) for workspaceConfig in self.config.workspaces
]
def pathToObject(self, path: str):
# Path structure: /dev/ezri/voidshell/workspaces/{context,group,workspace}/{index for workspace, name for others}
path = path.split("/")
if (
path[0] != ""
or path[1] != "dev"
or path[2] != "ezri"
or path[3] != "voidshell"
or path[4] != "workspaces"
):
raise ValueError("Invalid path")
if path[5] == "context":
return self.findContextByName(path[6])
if path[5] == "group":
return self.findGroupByName(path[6])
if path[5] == "workspace":
return self.findWorkspaceByIndex(int(path[6]))
raise ValueError("Invalid path")
def getGroupByOutput(self, outputName: str):
"""Get the group that is currently assigned to the given output"""
pass