From c8d260892c418b00ff70e6bf51317f8ed20953f0 Mon Sep 17 00:00:00 2001 From: Ezri Brimhall Date: Sun, 17 Nov 2024 14:15:31 -0700 Subject: [PATCH] Start work on generic workspaces service --- VoidShell/services/workspaces/backend.py | 140 ++++++++ VoidShell/services/workspaces/config.py | 108 +++++++ VoidShell/services/workspaces/scoring.py | 94 ++++++ VoidShell/services/workspaces/tree.py | 387 +++++++++++++++++++++++ 4 files changed, 729 insertions(+) create mode 100644 VoidShell/services/workspaces/backend.py create mode 100644 VoidShell/services/workspaces/config.py create mode 100644 VoidShell/services/workspaces/scoring.py create mode 100644 VoidShell/services/workspaces/tree.py diff --git a/VoidShell/services/workspaces/backend.py b/VoidShell/services/workspaces/backend.py new file mode 100644 index 0000000..13c80cc --- /dev/null +++ b/VoidShell/services/workspaces/backend.py @@ -0,0 +1,140 @@ +from abc import ABC, abstractmethod +from .config import OutputConfig + + +class AbstractWorkspaceBackend(ABC): + """ + Abstract class for workspace backends (i.e. an IPC interface to a compositor / window manager) + """ + + _instance: "AbstractWorkspaceBackend" = None + + def __init__(self): + super().__init__() + if self._instance is not None: + raise RuntimeError( + "Cannot instantiate more than one WorkspaceBackend at a time. Use getBackend() to get the current backend." + ) + self._instance = self + + def _scaleToString(self, scale: int) -> str: + """Convert the fixed-point scale to a string""" + # We do it this way to not lose precision when converting to a float + return f"{scale // 100}.{scale % 100}" + + @abstractmethod + async def focusWorkspace(self, index: int): + """Focus a workspace in the compositor based off of its compositor index""" + pass + + @abstractmethod + async def moveWorkspace(self, index: int, output: str): + """Move a workspace to a given output""" + pass + + @abstractmethod + async def focusOutput(self, output: str): + """Focus an output in the compositor""" + pass + + @abstractmethod + async def moveContainer(self, workspace: int): + """Move the focused container to a given workspace compositor index""" + pass + + @abstractmethod + async def configureOutput(self, output: str, config: OutputConfig): + """Configure an output""" + pass + + @abstractmethod + def onWorkspaceChange(self, callback): + """Register a callback to be called when the workspace changes""" + pass + + @abstractmethod + def onOutputChange(self, callback): + """Register a callback to be called when the output configuration changes""" + pass + + @abstractmethod + def onModeChange(self, callback): + """Register a callback to be called when the binding mode changes, in compositors that support it""" + pass + + @abstractmethod + @property + def name(self) -> str: + """The name of the backend""" + pass + + +def getBackend() -> AbstractWorkspaceBackend: + """ + Get the currently-active workspace backend + """ + + if AbstractWorkspaceBackend._instance is None: + raise RuntimeError( + "No workspace backend is currently active. Please instantiate one." + ) + return AbstractWorkspaceBackend._instance + + +class WorkspaceAdapter: + """ + Common interface for workspace backends that represents the result of a + workspace query. Only contains information that the workspace manager needs to know. + """ + + def __init__( + self, index: int, output: str, focused: bool, visible: bool, urgent: bool + ): + self.index = index + self.output = output + self.focused = focused + self.visible = visible + self.urgent = urgent + + +class RectAdapter: + """ + Common interface for workspace backends that represents a rectangle. + """ + + def __init__(self, x: int, y: int, width: int, height: int): + self.x = x + self.y = y + self.width = width + self.height = height + + +class OutputAdapter: + """ + Common interface for workspace backends that represents the result of an + output query. Only contains information that the workspace manager needs to know. + """ + + def __init__( + self, + name: str, + active: bool, + rect: RectAdapter, + scale: int, + transform: str, + focused: bool, + current_workspace: int, + make: str, + model: str, + serial: str, + ): + self.name = name + self.active = active + self.rect = rect + self.scale = scale + self.transform = transform + self.focused = focused + self.current_workspace = current_workspace + self.make = make + self.model = model + self.serial = serial diff --git a/VoidShell/services/workspaces/config.py b/VoidShell/services/workspaces/config.py new file mode 100644 index 0000000..a642fed --- /dev/null +++ b/VoidShell/services/workspaces/config.py @@ -0,0 +1,108 @@ +from typing import TypedDict, NotRequired, Annotated + + +class OutputConfig(TypedDict): + x: Annotated[int, "The x position of the output in the global coordinate space"] + y: Annotated[int, "The y position of the output in the global coordinate space"] + mode: Annotated[ + str, + "The mode to set the output to (e.g. 1920x1080). Includes the refresh rate if applicable.", + ] + scale: Annotated[ + NotRequired[int], + "The scale of the output as a 10^2 fixed-point number (e.g. 1.0 = 100). If not provided, a value of 100 (no scaling) is assumed.", + ] + transform: Annotated[ + NotRequired[str], + "The transform to apply to the output (e.g. 'normal', '90', '180', '270', 'flipped', 'flipped-90', 'flipped-180', 'flipped-270'). If not provided, a value of 'normal' is assumed.", + ] + color_profile: Annotated[ + NotRequired[str], + "The color profile to apply to the output. If not provided, no color profile is applied. Support for this is compositor-dependent.", + ] + + +class GroupConfig(OutputConfig): + """A group of workspaces displayed on a single output. + + At least one of either `output_names` or `make`, `model`, and `serial` must be provided. + """ + + name: Annotated[str, "The name of the workspace group"] + workspaces: Annotated[ + list[int], + "The indices of the workspaces in the group. The first workspace in the list is the default workspace for the group.", + ] + output_names: Annotated[ + NotRequired[list[str]], + "The names of the outputs to use for the group (e.g 'eDP-1').", + ] + make: Annotated[NotRequired[str], "The make of the output (e.g. 'Dell')."] + model: Annotated[NotRequired[str], "The model of the output (e.g. 'U2412M')."] + serial: Annotated[NotRequired[str], "The serial number of the output."] + reverse_order: Annotated[ + NotRequired[bool], + "Whether to reverse the order of the workspaces in the group when rendering workspace indicators. If not provided, a value of False is assumed.", + ] + bars: Annotated[ + NotRequired[list[str]], + "The names of the bars to display on the output. If not provided, no bars are displayed.", + ] + + +class MemoryProfile(TypedDict): + """A memory profile for an application""" + + high: Annotated[int, "The high memory threshold"] + max: Annotated[int, "The maximum memory the application can use"] + + +class WorkspaceConfig(TypedDict): + """A workspace configuration.""" + + index: Annotated[int, "The index of the workspace in the compositor"] + name: Annotated[str, "The name of the workspace, as displayed to the user"] + exec: Annotated[str, "The default application command to run in the workspace"] + program_name: Annotated[ + NotRequired[str], + "The name of the program run in this workspace. If not provided, the name of the workspace is used.", + ] + memory_profile: Annotated[ + NotRequired[MemoryProfile], + "The memory profile for the application. If not provided, no memory profile is applied.", + ] + args: Annotated[ + NotRequired[list[str]], + "The arguments to pass to the default application. If not provided, no arguments are passed.", + ] + environ: Annotated[ + NotRequired[dict[str, str]], + "The environment variables to set when running the default application. If not provided, no additional environment variables are set.", + ] + systemd: Annotated[ + NotRequired[bool], + "Whether to run the default application as a systemd unit. If not provided, a value of True is assumed.", + ] + void_output: Annotated[ + NotRequired[bool], + "Whether to void the output (not capture it to the system journal) of the default application. If not provided, a value of False is assumed.", + ] + + +class Context(TypedDict): + """A context configuration.""" + + name: Annotated[str, "The name of the context"] + groups: Annotated[ + list[GroupConfig], + "The groups of workspaces to display on the outputs", + ] + priority: Annotated[ + NotRequired[int], + "The priority of the context. If not provided, a value of 0 is assumed.", + ] + + +class Config(TypedDict): + workspaces: list[WorkspaceConfig] + contexts: list[Context] diff --git a/VoidShell/services/workspaces/scoring.py b/VoidShell/services/workspaces/scoring.py new file mode 100644 index 0000000..3927aa0 --- /dev/null +++ b/VoidShell/services/workspaces/scoring.py @@ -0,0 +1,94 @@ +from .backend import OutputAdapter +from .config import GroupConfig, ContextConfig +from typing import Tuple + + +def getPerfectMatch( + config: GroupConfig, outputs: list[OutputAdapter] +) -> OutputAdapter | None: + """Returns the output adapter of a perfectly-matching output for the given config. or None if no such output exists.""" + for output in outputs: + if ( + output.make == config.get("make") + and output.model == config.get("model") + and output.serial == config.get("serial") + ): + return output + return None + + +def getNamePreferences( + config: GroupConfig, outputs: list[OutputAdapter] +) -> list[OutputAdapter]: + """ + Returns the list of outputs that match the names in the config, in preference order. + """ + result = [None] * len(config.get("output_names", [])) + for output in outputs: + for i, name in enumerate(config.get("output_names", [])): + if output.name == name: + result[i] = output + + +def computePerfectScore( + config: ContextConfig, outputs: list[OutputAdapter] +) -> Tuple[int, dict[str, OutputAdapter]]: + """ + Returns the computed score of the context for the given outputs, along with + the outputs that would be matched to each group. + + Does not include context priority. + """ + score = 0 + map = {} + groups = config.get("groups", []) + outputSet = set(outputs) + usedGroups = set() + # First, compute the perfect score, and remove any outputs that match + for group in groups: + match = getPerfectMatch(group, list(outputSet)) + if match is None: + continue + score += 3 + outputSet.remove(match) + usedGroups.add(group) + map[group.get("name")] = match + + groups = [group for group in groups if group not in usedGroups] + + if len(groups) == 0: + # Early return if all groups have been matched + return score, map + + # Next, compute the preferences based on name + outputList = list(outputSet) + preferences = [getNamePreferences(group, outputList) for group in groups] + maxPreferences = max(len(preference) for preference in preferences) + # Iterate through the preferences, and remove any outputs that match. + # This is essentially a form of ranked-choice voting system. + for i in range(maxPreferences): + for j, preference in enumerate(preferences): + if ( + i < len(preference) + and preference[i] is not None + and preference[i] in outputSet + ): + score += 2 + outputSet.remove(preference[i]) + usedGroups.add(groups[j]) + map[groups[j].get("name")] = preference[i] + + groups = [group for group in groups if group not in usedGroups] + + # Now, if there is more than one group left, we have failed, as only one group can match on wildcard. + # If there are no groups left, return the score. + if len(groups) > 1: + return 0, {} + elif len(groups) == 0: + return score, map + + # Finally, if there is one group left, we can match on wildcard. Add one point, and give it the first + # remaining output. + score += 1 + map[groups[0].get("name")] = list(outputSet)[0] + return score, map diff --git a/VoidShell/services/workspaces/tree.py b/VoidShell/services/workspaces/tree.py new file mode 100644 index 0000000..7410bfe --- /dev/null +++ b/VoidShell/services/workspaces/tree.py @@ -0,0 +1,387 @@ +from gi.repository import GObject +from sdbus import ( + DbusInterfaceCommonAsync, + dbus_method_async, + dbus_property_async, + dbus_signal_async, +) +from .config import WorkspaceConfig, GroupConfig, ContextConfig, Config +from .backend import getBackend, WorkspaceAdapter +import logging + +logger = logging.getLogger(__name__) + + +class Workspace( + GObject.Object, +): + + class DBusInterface( + DbusInterfaceCommonAsync, + interface_name="dev.ezri.voidshell.workspaces.Workspace", + ): + + def __init__(self, workspace: "Workspace" = None): + super().__init__() + self.workspace = workspace + + @dbus_signal_async() + def Focused(self): + raise NotImplementedError + + @dbus_signal_async() + def LostFocus(self): + raise NotImplementedError + + @dbus_signal_async() + def Created(self): + raise NotImplementedError + + @dbus_signal_async() + def Destroyed(self): + raise NotImplementedError + + @dbus_signal_async("oo") + def Moved(self, oldGroupPath: str, newGroupPath: str): + raise NotImplementedError + + @dbus_signal_async() + def Alert(self): + raise NotImplementedError + + @dbus_method_async(input_signature="", result_signature="s") + async def Focus(self): + await self.workspace.focus() + + @dbus_method_async(input_signature="o", result_signature="s") + async def Reassign(self, groupPath: str): + await self.workspace.reassign(Manager.get_manager().pathToObject(groupPath)) + + @dbus_property_async("s") + def Name(self): + return self.workspace.definition["name"] + + @dbus_property_async("s") + def Output(self): + return self.workspace.current_output + + @dbus_property_async("o") + def CurrentGroup(self): + return self.workspace.current_group.bus_name + + @dbus_property_async("ao") + def Groups(self): + return [group.bus_name for group in self.workspace.groups] + + @dbus_property_async("b") + def IsFocused(self): + return self.workspace.focused + + @dbus_property_async("b") + def IsVisible(self): + return self.workspace.visible + + @dbus_property_async("b") + def IsActive(self): + return self.workspace.active + + @dbus_property_async("b") + def IsAlerted(self): + return self.workspace.alerted + + class DBusObject(DBusInterface): + + def __init__(self, workspace: "Workspace"): + super().__init__(workspace) + self.export_to_dbus(workspace.bus_name) + + class DBusProxy(DBusInterface): + + def __init__(self, workspaceIndex: str | int): + super().__init__() + if workspaceIndex is int: + self._proxify( + "dev.ezri.voidshell", + f"/dev/ezri/voidshell/workspaces/workspace/{workspaceIndex}", + ) + else: + self._proxify( + "dev.ezri.voidshell", + workspaceIndex, + ) + + def __init__( + self, + config: WorkspaceConfig, + ): + super().__init__() + self.definition = config + self.groups = [] + self._current_group = None + self._current_output = None + self._focused = False + self._visible = False + self._active = False + self._alerted = False + + self._dbusObject = Workspace.DBusObject(self) + + @GObject.Property + def current_group(self): + return self._current_group + + @GObject.Property + def current_output(self): + return self._current_output + + @GObject.Property + def focused(self): + return self._focused + + @GObject.Property + def visible(self): + return self._visible + + @GObject.Property + def active(self): + return self._active + + @GObject.Property + def alerted(self): + return self._alerted + + @GObject.Property + def bus_name(self): + return f"/dev/ezri/voidshell/workspaces/workspace/{self.definition['index']}" + + async def focus(self): + """Focus the workspace""" + adapter = getBackend() + await adapter.focusWorkspace(self.definition["index"]) + + async def reassign(self, group): + """Reassign the workspace to a different group""" + adapter = getBackend() + await adapter.moveWorkspace(self.definition["index"], group.output_name) + + def sync(self, data: WorkspaceAdapter): + """Synchronize the state of the workspace object with data from the compositor.""" + if not self._active: + self._active = True + self._dbusObject.Created.emit(None) + manager = Manager.get_manager() + newOutput = data.output_name + if newOutput != self._current_output: + self._current_output = newOutput + self.notify("current-output") + newGroup = manager.getGroupByOutput(newOutput) + if newGroup != self._current_group: + oldGroup = self._current_group + self._current_group = newGroup + self.notify("current-group") + if newGroup not in self.groups: + logger.warn( + f"Group {newGroup} not in workspace {self.definition['name']}" + ) + self._dbusObject.Moved.emit( + oldGroup.bus_name, + newGroup.bus_name, + ) + newGroup.notify("focused-workspace") + if data.focused != self._focused: + self._focused = data.focused + self.notify("focused") + if data.focused: + self._dbusObject.Focused.emit(None) + else: + self._dbusObject.LostFocus.emit(None) + if data.visible != self._visible: + self._visible = data.visible + self.notify("visible") + if data.urgent != self._alerted: + self._alerted = data.urgent + self.notify("alerted") + + def deactivate(self): + """Deactivate the workspace.""" + if self._focused: + self._focused = False + self.notify("focused") + self._dbusObject.LostFocus.emit(None) + + if self._active: + self._active = False + self.notify("active") + self._dbusObject.Destroyed.emit(None) + + if self._visible: + self._visible = False + self.notify("visible") + + if self._alerted: + self._alerted = False + self.notify("alerted") + + def addGroup(self, group: "Group"): + self.groups.append(group) + + +class Group(GObject.Object): + + class DBusInterface( + DbusInterfaceCommonAsync, + interface_name="dev.ezri.voidshell.workspaces.Group", + ): + + def __init__(self, group: "Group" = None): + super().__init__() + self.group = group + + @dbus_signal_async("") + def Focused(self): + raise NotImplementedError + + @dbus_signal_async("") + def LostFocus(self): + raise NotImplementedError + + @dbus_signal_async("o") + def VisibleWorkspaceChanged(self, workspacePath: str): + raise NotImplementedError + + @dbus_method_async(input_signature="", result_signature="s") + async def Focus(self): + return await self.group.focus() + + @dbus_method_async(input_signature="", result_signature="s") + async def Reconfigure(self): + return await self.group.configure() + + @dbus_property_async("s") + def Name(self): + return self.group.definition["name"] + + @dbus_property_async("s") + def Output(self): + return self.group.output_name + + @dbus_property_async("b") + def IsFocused(self): + return self.group.focused + + @dbus_property_async("o") + def Context(self): + return self.group.context.bus_name + + @dbus_property_async("o") + def FocusedWorkspace(self): + return self.group.focused_workspace.bus_name + + @dbus_property_async("ao") + def Workspaces(self): + return [w.bus_name for w in self.group.workspaces] + + class DBusObject(DBusInterface): + + def __init__(self, group: "Group"): + super().__init__(group) + self.export_to_dbus(group.bus_name) + + class DBusProxy(DBusInterface): + + def __init__(self, name: str): + super().__init__() + if "/" in name: + self._proxify(name) + else: + self._proxify(f"/dev/ezri/voidshell/workspaces/group/{name}") + + def __init__(self, definition: GroupConfig): + super().__init__() + + manager = Manager.get_manager() + self.definition = definition + self.workspaces = [ + manager._getWorkspaceByCompositorIndex(ws) + for ws in definition.get("workspaces") + ] + self.output_name = None + self.focused = False + self._focused_workspace = None + self.context = None + self._dbusObject = Group.DBusObject(self) + + +class Manager( + GObject.Object, +): + + class DBusInterface( + DbusInterfaceCommonAsync, + interface_name="dev.ezri.voidshell.workspaces.Tree", + ): + + def __init__(self, tree: "Manager" = None): + super().__init__() + self.tree = tree + + @dbus_signal_async() + def Updated(self): + raise NotImplementedError + + @dbus_method_async(input_signature="s", result_signature="s") + async def GetWorkspace(self, workspacePath: str): + return self.tree.getWorkspace(workspacePath) + + class DBusObject(DBusInterface): + + def __init__(self, tree: "Manager"): + super().__init__(tree) + self.export_to_dbus("/dev/ezri/voidshell/workspaces/tree") + + class DBusProxy(DBusInterface): + + def __init__(self): + super().__init__() + self._proxify("dev.ezri.voidshell", "/dev/ezri/voidshell/workspaces/tree") + + _instance = None + + @staticmethod + def get_manager() -> "Manager": + return Manager._instance + + def __init__( + self, + config: Config, + ): + super().__init__() + Manager._instance = self + self.config = config + self.workspaces = [ + Workspace(workspaceConfig) for workspaceConfig in self.config.workspaces + ] + + def pathToObject(self, path: str): + + # Path structure: /dev/ezri/voidshell/workspaces/{context,group,workspace}/{index for workspace, name for others} + path = path.split("/") + if ( + path[0] != "" + or path[1] != "dev" + or path[2] != "ezri" + or path[3] != "voidshell" + or path[4] != "workspaces" + ): + raise ValueError("Invalid path") + if path[5] == "context": + return self.findContextByName(path[6]) + if path[5] == "group": + return self.findGroupByName(path[6]) + if path[5] == "workspace": + return self.findWorkspaceByIndex(int(path[6])) + raise ValueError("Invalid path") + + def getGroupByOutput(self, outputName: str): + """Get the group that is currently assigned to the given output""" + pass