"""Workspace, group, and context model for driving Sway through i3ipc."""

import asyncio
import itertools
import json
from typing import Any

from i3ipc.aio import Connection
from i3ipc.replies import OutputReply, WorkspaceReply

from .utils import OutputMatch

DEFAULT_TERMINAL = "alacritty"


class Workspace:
    """A class representing a Sway workspace in the format we use.

    Attributes:
        index: The index of the workspace, taken from the "index" key of its
            definition. Its string form is the Sway workspace name, and is
            never displayed to the user.
        name: The name of the workspace. This is the name that is displayed
            to the user, as defined in `workspaces.json`.
        active: Whether Sway reported the workspace in the last update.
        visible: Whether the workspace was reported as visible.
        focused: Whether the workspace was reported as focused.
        alerted: Whether the workspace was reported as urgent.
        definition: The raw dictionary the workspace was built from.
    """

    index: int | str | None = None
    name: str | None = None
    active: bool = False
    visible: bool = False
    focused: bool = False
    alerted: bool = False

    def __init__(self, dictionary: dict[str, str | int | dict[str, str]]):
        self.index = dictionary.get("index")
        self.name = dictionary.get("name")
        self.definition = dictionary

    def update_state(self, state_update: Any):
        """Mark the workspace active and copy its flags from a Sway reply.

        Args:
            state_update: An object exposing `visible`, `focused` and
                `urgent` attributes.
        """
        self.active = True
        self.visible = state_update.visible
        self.focused = state_update.focused
        self.alerted = state_update.urgent

    def deactivate(self):
        """Reset every state flag to False."""
        self.active = False
        self.visible = False
        self.focused = False
        self.alerted = False

    def __dict__(self):
        # NOTE(review): kept as a method for backward compatibility; a class
        # attribute named `__dict__` shadows the usual instance-dict accessor.
        return self.__json__()

    def __json__(self):
        """Return the JSON-serialisable state of the workspace."""
        return {
            "index": str(self.index),
            "name": self.name,
            "active": self.active,
            "visible": self.visible,
            "focused": self.focused,
            "alerted": self.alerted,
        }

    def __repr__(self):
        return f"Workspace({self.index}, '{self.name}')"

    async def focus(self, i3: Connection):
        """Focus the workspace in Sway."""
        await i3.command(f"workspace {self.index}")

    async def move_container(self, i3: Connection):
        """Move the focused container to the workspace."""
        await i3.command(f"move container to workspace {self.index}")

    async def reassign(self, i3: Connection, output: str):
        """Reassign the workspace to a different output."""
        await i3.command(f"workspace {self.index} output {output}")

    async def relocate(self, i3: Connection, output: str):
        """Move the workspace to a different output.

        The workspace is focused first because the move command acts on the
        current workspace.
        """
        await i3.command(f"workspace {self.index}")
        await i3.command(f"move workspace to output {output}")


class WorkspaceGroup:
    """A class representing a group of workspaces. Generally maps to a monitor.

    The group is built from one entry of a context's "outputs" list. The
    output can be identified either by make/model/serial or by a list of
    candidate output names.
    """

    name: str | None = None
    workspaces: list[Workspace] | None = None
    reverse: bool = False
    # Resolved Sway output name, cached by `get_output_name` / `configure`.
    output_name: str | None = None

    def __init__(self, output_data: dict[str, str]):
        self.name = output_data["group"]
        self.output_names = output_data.get("names", [])
        self.make = output_data.get("make", None)
        self.model = output_data.get("model", None)
        self.serial = output_data.get("serial", None)
        self.position = output_data.get("position", [0, 0])
        self.mode = output_data.get("mode", None)
        self.transform = output_data.get("transform", None)
        self.eww_windows = output_data.get("eww_windows", [])
        # Normalise the list form to the dict form (window -> extra args).
        if isinstance(self.eww_windows, list):
            self.eww_windows = {window: {} for window in self.eww_windows}
        self.workspaces = []

    def add_workspace(self, workspace: Workspace):
        """Add a workspace to the group."""
        self.workspaces.append(workspace)

    def __iter__(self):
        return iter(self.workspaces)

    def __json__(self):
        """Return the workspaces as JSON data, reversed if `reverse` is set."""
        ordered = reversed(self.workspaces) if self.reverse else self.workspaces
        return [workspace.__json__() for workspace in ordered]

    def __repr__(self):
        return f"WorkspaceGroup({self.name}, {repr(self.workspaces)})"

    @property
    def active_workspace(self) -> Workspace | None:
        """Returns the first visible workspace in the group, or None."""
        return next(
            (workspace for workspace in self.workspaces if workspace.visible),
            None,
        )

    @property
    def active(self) -> bool:
        """Returns whether any workspace in the group is focused."""
        return any(workspace.focused for workspace in self.workspaces)

    async def focus(self, i3: Connection):
        """Focus the group in Sway.

        Raises:
            ValueError: If the group has neither make/model/serial nor any
                output names.
        """
        if self.make and self.model and self.serial:
            await i3.command(f"focus output {self.make} {self.model} {self.serial}")
        elif self.output_names:
            # Every candidate name is tried in turn.
            for name in self.output_names:
                await i3.command(f"focus output {name}")
        else:
            raise ValueError(
                "No output name or make/model/serial provided, cannot focus group"
            )

    async def configure(self, i3: Connection, outputs: list[OutputReply]):
        """Configure the group output in Sway.

        Args:
            i3: The IPC connection.
            outputs: The outputs currently reported by Sway, used to pick the
                first configured name that is actually present.
        """
        transform = f"transform {self.transform}" if self.transform else ""
        mode = f"mode {self.mode}" if self.mode else ""
        selector = ""

        if self.make and self.model and self.serial:
            selector = f'"{self.make} {self.model} {self.serial}"'
            # Refresh the cached output name for later lookups.
            await self.get_output_name(i3, force=True)
        elif self.output_names:
            connected = {output.name for output in outputs}
            for name in self.output_names:
                if name in connected:
                    selector = name
                    self.output_name = selector
                    break

        if not selector:
            # Without a selector the command below would be malformed.
            print(
                f"No connected output found for group {self.name}, skipping configuration",
                flush=True,
            )
            return

        # Configure the output
        await i3.command(
            f"output {selector} position {self.position[0]} {self.position[1]} {mode} {transform} enable"
        )

    async def validate(self, i3: Connection, workspaces: list[WorkspaceReply]):
        """Validate that each workspace in the group is assigned to and present on the correct output.

        Workspaces found on a different output are relocated to the group's
        output. Workspaces Sway does not report are skipped.
        """
        output_name = await self.get_output_name(i3)
        if output_name is None:
            print(
                f"No output found for group {self.name}, skipping validation",
                flush=True,
            )
            return

        for workspace in self.workspaces:
            # Sway reports workspace names as strings; the index may be an int.
            sway_name = str(workspace.index)
            reply = next(
                (candidate for candidate in workspaces if candidate.name == sway_name),
                None,
            )
            if reply is None:
                print(
                    f"Workspace {workspace.index} not found in workspaces, skipping validation",
                    flush=True,
                )
                continue
            if reply.output != output_name:
                print(
                    f"Workspace {workspace.index} is assigned to {reply.output}, not {output_name}, reassigning",
                    flush=True,
                )
                await workspace.relocate(i3, output_name)

    async def get_output_name(self, i3: Connection, force: bool = False) -> str | None:
        """Get the name of the output in Sway.

        Args:
            i3: The IPC connection.
            force: When True, ignore the cached name and query Sway again.

        Returns:
            The output name, or None if no connected output matches.
        """
        if not force and self.output_name is not None:
            return self.output_name

        outputs = await i3.get_outputs()

        # If we have make, model, and serial, search by those first
        if self.make and self.model and self.serial:
            for output in outputs:
                if (
                    output.make == self.make
                    and output.model == self.model
                    and output.serial == self.serial
                ):
                    print(
                        f"Found output {output.name} by make, model, and serial for group {self.name}",
                        flush=True,
                    )
                    self.output_name = output.name
                    return output.name

        # If we don't find an exact match for the output, search by name if we have any
        if self.output_names:
            for output in outputs:
                if output.name in self.output_names:
                    print(
                        f"Found output {output.name} by name for group {self.name}",
                        flush=True,
                    )
                    self.output_name = output.name
                    return output.name

        return None

    def get_match_level(self, output: OutputReply, context: str = "") -> OutputMatch:
        """Get the match level score for the output.

        Args:
            output: The Sway output to compare against this group.
            context: Context name, used only in the log messages.
        """
        if self.make and self.model and self.serial:
            if (
                output.make == self.make
                and output.model == self.model
                and output.serial == self.serial
            ):
                print(
                    f"Context {context}: ID_MATCH for {output.name} on group {self.name}",
                    flush=True,
                )
                return OutputMatch.ID_MATCH
        if output.name in self.output_names:
            print(
                f"Context {context}: NAME_MATCH for {output.name} on group {self.name}",
                flush=True,
            )
            return OutputMatch.NAME_MATCH
        return OutputMatch.NO_MATCH


class WorkspaceContext:
    """A class representing a context, containing all workspaces and groups within the context."""

    name: str | None = None
    groups: list[WorkspaceGroup] | None = None

    def __init__(
        self,
        name: str,
        data: dict[str, list[dict[str, list[int]]] | list[dict[str, str]]],
        workspaces: list[Workspace],
    ):
        """Build the context's groups and attach the referenced workspaces.

        Args:
            name: The context name.
            data: The context definition; reads the "outputs", "groups" and
                optional "primary" keys.
            workspaces: All known workspaces, looked up by index.

        Raises:
            ValueError: If a group references an undefined workspace.
        """
        self.name = name
        self.groups = [WorkspaceGroup(output) for output in data["outputs"]]

        for group in self.groups:
            group_object = data["groups"][group.name]
            if group_object.get("reverse", False):
                group.reverse = True
            for workspace in group_object["workspaces"]:
                workspace_obj = next(
                    (w for w in workspaces if w.index == workspace),
                    None,
                )
                if workspace_obj is None:
                    raise ValueError(
                        f"Error: undefined workspace {workspace} referenced in context {name} group {group.name}"
                    )
                group.add_workspace(workspace_obj)

        # Default to the first group, without failing when there are none.
        default_primary = self.groups[0].name if self.groups else None
        primary_group_name = data.get("primary", default_primary)
        self.primary_group = next(
            (group for group in self.groups if group.name == primary_group_name),
            None,
        )

    def add_group(self, group: WorkspaceGroup):
        """Add a group to the context."""
        self.groups.append(group)

    def compatability_rank(self, outputs: list[OutputReply]) -> int:
        """Get the compatability rank of the context with the given outputs.

        Returns:
            The sum of each group's best match value, or 0 as soon as any
            group matches no output.
        """
        result = 0
        for group in self.groups:
            # default=0 keeps an empty output list from raising in max().
            group_result = max(
                (
                    group.get_match_level(output, context=self.name).value
                    for output in outputs
                ),
                default=0,
            )
            if group_result == 0:
                print(
                    f"Context {self.name}: failed to match group {group.name}",
                    flush=True,
                )
                return 0
            result += group_result
        return result

    async def deactivate(self, i3: Connection):
        """Deactivate the context in Sway."""
        # First, close all EWW windows
        proc = await asyncio.create_subprocess_exec("eww", "close-all")
        await proc.wait()
        # Then, disable all displays
        await i3.command("output * disable")

    async def activate(self, i3: Connection):
        """Activate the context in Sway.

        Configures every group's output, reloads EWW and opens the group's
        EWW windows, then validates workspace placement.
        """
        outputs = await i3.get_outputs()

        # Configure all displays defined in the context
        for group in self.groups:
            await group.configure(i3, outputs)

        # Then, open all EWW windows defined in the context on the appropriate windows
        proc = await asyncio.create_subprocess_exec("eww", "reload")
        await proc.wait()
        for group in self.groups:
            if not group.eww_windows:
                continue
            screen = await group.get_output_name(i3)
            if screen is None:
                # A None argument would make the subprocess call fail.
                print(
                    f"No output found for group {group.name}, skipping EWW windows",
                    flush=True,
                )
                continue
            for window, window_args in group.eww_windows.items():
                # Flatten {key: value} into "--arg", "key=value" pairs.
                extra_args = itertools.chain.from_iterable(
                    ("--arg", f"{key}={value}") for key, value in window_args.items()
                )
                proc = await asyncio.create_subprocess_exec(
                    "eww",
                    "open",
                    window,
                    "--id",
                    f"{group.name}-{window}",
                    "--screen",
                    screen,
                    "--arg",
                    f"group={group.name}",
                    *extra_args,
                )
                await proc.wait()

        # Finally, validate workspaces and move them if necessary
        for group in self.groups:
            # Re-fetched per group because validation may move workspaces.
            workspaces = await i3.get_workspaces()
            await group.validate(i3, workspaces)

    @property
    def active_group(self) -> WorkspaceGroup | None:
        """Returns the active group in the context."""
        return next(
            (group for group in self.groups if group.active),
            None,
        )

    @property
    def active_workspace(self) -> Workspace | None:
        """Returns the active workspace in the context."""
        group = self.active_group
        # A context may not have any active group, in which case we return None
        if group is None:
            return None
        return group.active_workspace

    @property
    def visible_workspaces(self) -> dict[str, Workspace | None]:
        """Returns a dictionary of all visible workspaces in the context."""
        return {group.name: group.active_workspace for group in self.groups}

    def __iter__(self):
        return iter(self.groups)

    def __repr__(self):
        return f"WorkspaceContext({self.name}, {repr(self.groups)})"

    def __json__(self):
        return {group.name: group.__json__() for group in self.groups}


class WorkspaceTree:
    """All workspaces and contexts loaded from a JSON configuration file."""

    current_context: WorkspaceContext | None = None
    workspaces: list[Workspace] | None = None

    def __init__(self, filename: str):
        self.load_file(filename)

    def load_file(self, filename: str):
        """Load workspaces and contexts from the JSON file at `filename`.

        Reads the "workspaces", "contexts" and optional "default_context"
        keys. An already-set current context is left untouched.
        """
        with open(filename, "r", encoding="utf-8") as file:
            initial: dict = json.load(file)

        self.contexts: list[WorkspaceContext] = []
        self.workspaces = [Workspace(workspace) for workspace in initial["workspaces"]]
        default_context = initial.get("default_context", "default")
        for context, context_dict in initial["contexts"].items():
            context_obj = WorkspaceContext(context, context_dict, self.workspaces)
            self.contexts.append(context_obj)
            # Don't override the current context if it's already set
            if default_context == context and self.current_context is None:
                self.current_context = context_obj

    def __dict__(self):
        # NOTE(review): kept as a method for backward compatibility; a class
        # attribute named `__dict__` shadows the usual instance-dict accessor.
        return {
            "current_context": self.current_context,
            "contexts": self.contexts,
        }

    def __repr__(self):
        return f"WorkspaceTree({self.current_context}, {repr(self.contexts)})"

    def __json__(self):
        return {self.current_context.name: self.current_context.__json__()}

    def get_workspace(self, user_index: int) -> tuple[Workspace, WorkspaceGroup] | None:
        """Returns a workspace object based on the user index.

        The index is 1-based within a group. If it names a different
        workspace inside the active group, that workspace is returned.
        Otherwise (same as the active workspace, or out of range for the
        active group) the other groups are searched in order, starting after
        the active one.

        Returns:
            A `(workspace, group)` tuple, or None if nothing matches or there
            is no active group/workspace to resolve against.

        Raises:
            IndexError: If `user_index` is less than 1.
        """
        if user_index < 1:
            raise IndexError("Workspace index must be greater than or equal to 1.")

        context = self.current_context
        if context is None:
            return None
        active_group = context.active_group
        if active_group is None:
            return None
        active_workspace = active_group.active_workspace
        if active_workspace is None:
            return None

        active_index = active_group.workspaces.index(active_workspace) + 1

        # A different, in-range index resolves within the active group.
        if user_index != active_index and user_index <= len(active_group.workspaces):
            return active_group.workspaces[user_index - 1], active_group

        # Otherwise, search the other groups, wrapping around the group list.
        groups = context.groups
        start = groups.index(active_group)
        for offset in range(len(groups)):
            group = groups[(start + offset) % len(groups)]
            if group is active_group or user_index > len(group.workspaces):
                # Group is the active group, or doesn't have enough workspaces
                continue
            return group.workspaces[user_index - 1], group

        # If we didn't find a workspace, return None
        return None

    def update_workspaces(self, ws_sway_dict: dict):
        """Updates the workspaces in the tree based on the return from sway IPC.

        Args:
            ws_sway_dict: Mapping of Sway workspace name to its state object.
                Workspaces absent from the mapping are deactivated.
        """
        touched = set()
        for index, ws_data in ws_sway_dict.items():
            # Update data for all active workspaces
            ws_state = next(
                (ws for ws in self.workspaces if str(ws.index) == index),
                None,
            )
            if ws_state is None:
                print(f"Warning: workspace {index} not found in tree.", flush=True)
                continue
            touched.add(ws_state)
            ws_state.update_state(ws_data)

        # Deactivate any workspaces that weren't touched
        for ws in self.workspaces:
            if ws not in touched:
                ws.deactivate()

    async def score_contexts(self, i3: Connection):
        """Return `(context, rank)` pairs sorted by rank, best first."""
        outputs = await i3.get_outputs()
        scores = [
            (context, context.compatability_rank(outputs)) for context in self.contexts
        ]
        scores.sort(key=lambda pair: pair[1], reverse=True)
        return scores

    async def update_context(self, i3: Connection, match_context_on_name: bool = False):
        """Activates a new context in Sway based on the current display configuration.

        Args:
            i3: The IPC connection.
            match_context_on_name: When True, a best context that is a
                different object but has the current context's name is
                re-activated without deactivating the current one first
                (presumably for use after the configuration was reloaded).
        """
        scores = await self.score_contexts(i3)

        # If there are no contexts, or the best rank is 0, do nothing
        if not scores or scores[0][1] == 0:
            print(
                "No context is compatible with the current display configuration, doing nothing.",
                flush=True,
            )
            return

        best_context = scores[0][0]

        if self.current_context is None:
            await best_context.activate(i3)
            self.current_context = best_context
            return

        if best_context == self.current_context:
            print("Context is already active.", flush=True)
            return

        if match_context_on_name and best_context.name == self.current_context.name:
            print("Context is already active.", flush=True)
            await best_context.activate(i3)
            self.current_context = best_context
            return

        await self.current_context.deactivate(i3)
        await best_context.activate(i3)
        self.current_context = best_context

    async def activate_context(self, i3: Connection, name: str):
        """Activates a context by name. This will fail if the current display configuration is incompatible.

        Raises:
            ValueError: If no context has the given name, or the context is
                incompatible with the current display configuration.
        """
        context = next(
            (context for context in self.contexts if context.name == name),
            None,
        )
        if context is None:
            raise ValueError(f"No context named {name} is defined.")

        outputs = await i3.get_outputs()
        score = context.compatability_rank(outputs)
        if score == 0:
            raise ValueError(
                "Context is incompatible with current display configuration."
            )

        print(f"Activating context {context.name} with score {score}.", flush=True)
        if self.current_context is not None and self.current_context != context:
            await self.current_context.deactivate(i3)
        self.current_context = context
        await context.activate(i3)