388 lines
14 KiB
Python
388 lines
14 KiB
Python
from typing import Any
|
|
import json
|
|
from i3ipc.replies import OutputReply
|
|
from i3ipc.aio import Connection
|
|
import asyncio
|
|
import subprocess
|
|
|
|
# Command name of the terminal emulator used by default.
# NOTE(review): not referenced anywhere in the visible part of this module —
# presumably read by code that imports it; confirm before removing.
DEFAULT_TERMINAL = "alacritty"
|
|
|
|
|
|
class Workspace:
    """A single Sway workspace plus the display state tracked for it.

    Attributes:
        index: The Sway workspace name, read from the ``"index"`` key of the
            definition. It is the value sent in Sway commands, is never
            displayed to the user, and is rendered with ``str()`` whenever
            the workspace is serialised.
        name: The name that is displayed to the user, as defined in
            `workspaces.json` (read from the ``"name"`` key).
        definition: The raw dictionary the workspace was built from.
        active: Set by ``update_state`` and cleared by ``deactivate``.
        visible: Copied from the ``visible`` attribute of the last update.
        focused: Copied from the ``focused`` attribute of the last update.
        alerted: Copied from the ``urgent`` attribute of the last update.
    """

    # Names of the boolean state flags, in the order they are serialised.
    _STATE_FLAGS = ("active", "visible", "focused", "alerted")

    index: int = None
    name: str = None

    active: bool = False
    visible: bool = False
    focused: bool = False
    alerted: bool = False

    def __init__(self, dictionary: dict[str, str | int | dict[str, str]]):
        """Build a workspace from its definition dictionary.

        Missing ``"index"`` / ``"name"`` keys leave the attribute as None.
        """
        self.index = dictionary.get("index")
        self.name = dictionary.get("name")

        # The untouched definition is kept alongside the parsed fields.
        self.definition = dictionary

    def update_state(self, state_update: Any):
        """Mark the workspace active and copy its flags from ``state_update``.

        ``state_update`` must expose ``visible``, ``focused`` and ``urgent``
        attributes; ``urgent`` is stored as ``alerted``.
        """
        self.active = True
        for target, source in (
            ("visible", "visible"),
            ("focused", "focused"),
            ("alerted", "urgent"),
        ):
            setattr(self, target, getattr(state_update, source))

    def deactivate(self):
        """Reset every state flag to False."""
        for flag in self._STATE_FLAGS:
            setattr(self, flag, False)

    # NOTE: naming this method ``__dict__`` shadows the built-in instance
    # attribute of the same name, so ``ws.__dict__`` is this method rather
    # than the attribute mapping. Kept because callers invoke it as a method.
    def __dict__(self):
        """Return the serialisable view: index (as str), name and the flags."""
        serialised = {"index": str(self.index), "name": self.name}
        for flag in self._STATE_FLAGS:
            serialised[flag] = getattr(self, flag)
        return serialised

    def __json__(self):
        """Return the same mapping as ``__dict__()``."""
        return self.__dict__()

    def __repr__(self):
        return f"Workspace({self.index}, '{self.name}')"

    async def focus(self, i3: Connection):
        """Focus the workspace in Sway."""
        command = f"workspace {self.index}"
        await i3.command(command)

    async def move_container(self, i3: Connection):
        """Move the focused container to the workspace."""
        command = f"move container to workspace {self.index}"
        await i3.command(command)
|
|
|
|
|
|
class WorkspaceGroup:
    """A class representing a group of workspaces. Generally maps to a monitor.

    Attributes:
        name: Group name, read from the required ``"group"`` key.
        make, model, serial: Output identity; used to build the Sway output
            selector and to match against the outputs Sway reports.
        position: Two-element position sent to Sway (defaults to ``[0, 0]``).
        mode, transform: Optional values appended to the output command.
        eww_windows: Window names stored for this group (defaults to ``[]``).
        workspaces: Workspaces added to the group, in insertion order.
        reverse: When true, ``__json__`` lists the workspaces back to front.
    """

    name: str = None
    workspaces: list[Workspace] = None
    reverse: bool = False

    def __init__(self, output_data: dict[str, str]):
        """Read the group settings from ``output_data``.

        Raises:
            KeyError: if ``output_data`` has no ``"group"`` key.
        """
        self.name = output_data["group"]
        lookup = output_data.get
        for field in ("make", "model", "serial"):
            setattr(self, field, lookup(field, None))
        self.position = lookup("position", [0, 0])
        self.mode = lookup("mode", None)
        self.transform = lookup("transform", None)
        self.eww_windows = lookup("eww_windows", [])
        self.workspaces = []

    def add_workspace(self, workspace: Workspace):
        """Add a workspace to the group."""
        self.workspaces.append(workspace)

    def __iter__(self):
        return iter(self.workspaces)

    def __json__(self):
        """Serialise every workspace, reversed when ``self.reverse`` is set."""
        ordered = reversed(self.workspaces) if self.reverse else self.workspaces
        return [member.__json__() for member in ordered]

    def __repr__(self):
        return f"WorkspaceGroup({self.name}, {repr(self.workspaces)})"

    @property
    def active_workspace(self) -> Workspace:
        """Returns the first visible workspace in the group, or None."""
        for candidate in self.workspaces:
            if candidate.visible:
                return candidate
        return None

    @property
    def active(self) -> bool:
        """Returns whether any workspace in the group is focused."""
        for candidate in self.workspaces:
            if candidate.focused:
                return True
        return False

    async def configure(self, i3: Connection):
        """Configure the group output in Sway.

        Sends one ``output ... position ...`` command; the ``mode`` and
        ``transform`` clauses are only filled in when the group defines them.
        """
        transform = f"transform {self.transform}" if self.transform else ""
        mode = f"mode {self.mode}" if self.mode else ""
        # The selector is only built when the full identity is known.
        # NOTE(review): otherwise it stays empty and the command carries no
        # output name — confirm that is intended.
        has_identity = self.make and self.model and self.serial
        selector = f'"{self.make} {self.model} {self.serial}"' if has_identity else ""
        await i3.command(
            f"output {selector} position {self.position[0]} {self.position[1]} {mode} {transform}"
        )

    async def get_output_name(self, i3: Connection) -> str:
        """Get the name of the output in Sway.

        Returns the name of the first reported output whose make, model and
        serial all equal this group's, or None when nothing matches.
        """
        for output in await i3.get_outputs():
            if all(
                getattr(output, field) == getattr(self, field)
                for field in ("make", "model", "serial")
            ):
                return output.name
        return None
|
|
|
|
|
|
class WorkspaceContext:
    """A class representing a context, containing all workspaces and groups within the context.

    Attributes:
        name: The context name.
        groups: One ``WorkspaceGroup`` per entry of ``data["outputs"]``, each
            filled with the workspaces listed for it in ``data["groups"]``.
    """

    name: str = None
    groups: list[WorkspaceGroup] = None

    def __init__(
        self,
        name: str,
        data: dict[str, Any],
        workspaces: list[Workspace],
    ):
        """Build the context's groups and attach their workspaces.

        Args:
            name: Name of the context.
            data: Context definition. ``data["outputs"]`` is a list of group
                definitions; ``data["groups"]`` maps a group name to a
                dictionary whose ``"workspaces"`` entry lists workspace
                indexes.
            workspaces: All known workspaces; indexes are resolved against
                their ``index`` attribute (first match wins).

        Raises:
            ValueError: if a group references an index that no workspace has.
            KeyError: if ``data`` lacks an entry the lookup needs.
        """
        self.groups = [WorkspaceGroup(output) for output in data["outputs"]]
        self.name = name
        for group in self.groups:
            for wanted in data["groups"][group.name]["workspaces"]:
                found = next((w for w in workspaces if w.index == wanted), None)
                if found is None:
                    # ValueError still satisfies callers catching Exception.
                    raise ValueError(
                        f"Error: undefined workspace {wanted} referenced in context {name} group {group.name}"
                    )
                group.add_workspace(found)

    def add_group(self, group: WorkspaceGroup):
        """Add a group to the context."""
        self.groups.append(group)

    async def activate(self, i3: Connection):
        """Activate the context in Sway.

        Disables every reported output that no group of this context
        identifies, configures each group, closes all EWW windows and then
        opens each group's EWW windows on that group's output. Groups whose
        output cannot be found are skipped with a warning.
        """
        defined_displays = {
            f"{group.make} {group.model} {group.serial}" for group in self.groups
        }
        # First, disable all displays not defined in the context
        for output in await i3.get_outputs():
            if f"{output.make} {output.model} {output.serial}" not in defined_displays:
                print("Disabling", output.name)
                await i3.command(f"output {output.name} disable")

        # Next, configure all displays defined in the context
        for group in self.groups:
            await group.configure(i3)

        # Then, close all EWW windows
        proc = await asyncio.create_subprocess_exec("eww", "close-all")
        await proc.wait()

        # Then, open all EWW windows defined in the context on the appropriate outputs
        for group in self.groups:
            if not group.eww_windows:
                continue
            # One lookup per group instead of one IPC round trip per window.
            output_name = await group.get_output_name(i3)
            if output_name is None:
                # A None argument would make create_subprocess_exec raise.
                print(
                    f"Warning: no output found for group {group.name}, skipping its EWW windows."
                )
                continue
            for window in group.eww_windows:
                proc = await asyncio.create_subprocess_exec(
                    "eww",
                    "open",
                    window,
                    "--screen",
                    output_name,
                )
                await proc.wait()

    @property
    def active_group(self) -> WorkspaceGroup | None:
        """Returns the first active group in the context, or None."""
        return next(
            (group for group in self.groups if group.active),
            None,
        )

    @property
    def active_workspace(self) -> Workspace | None:
        """Returns the active workspace in the context.

        A context may not have any active group, in which case None is
        returned.
        """
        group = self.active_group
        if group is None:
            return None
        return group.active_workspace

    @property
    def visible_workspaces(self) -> dict[str, Workspace | None]:
        """Returns each group's active workspace (possibly None), keyed by group name."""
        return {group.name: group.active_workspace for group in self.groups}

    def __iter__(self):
        return iter(self.groups)

    def __repr__(self):
        return f"WorkspaceContext({self.name}, {repr(self.groups)})"

    def __json__(self):
        """Serialise every group, keyed by group name."""
        return {group.name: group.__json__() for group in self.groups}
|
|
|
|
|
|
class WorkspaceTree:
    """All workspaces and contexts loaded from a JSON definition file.

    Attributes:
        workspaces: Every workspace defined in the file.
        contexts: Every context defined in the file, in file order.
        current_context: The context currently in effect. Initially the one
            named by ``"default_context"`` (``"personal"`` when that key is
            absent); stays None if no context has that name.
    """

    current_context: WorkspaceContext = None
    workspaces: list[Workspace] = None

    def __init__(self, filename: str):
        """Load the tree from ``filename``.

        The file must be a JSON object with a ``"workspaces"`` list and a
        ``"contexts"`` mapping of context name to context definition.
        """
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with open(filename, "r", encoding="utf-8") as file:
            initial: dict = json.load(file)

        self.contexts: list[WorkspaceContext] = []
        self.workspaces = [Workspace(workspace) for workspace in initial["workspaces"]]

        default_name = initial.get("default_context", "personal")
        for context, context_dict in initial["contexts"].items():
            context_obj = WorkspaceContext(context, context_dict, self.workspaces)
            self.contexts.append(context_obj)
            if context == default_name:
                self.current_context = context_obj

    # NOTE: naming this method ``__dict__`` shadows the built-in instance
    # attribute of the same name. Kept for callers that invoke it as a method.
    def __dict__(self):
        """Return the current context and the list of contexts as a mapping."""
        return {
            "current_context": self.current_context,
            "contexts": self.contexts,
        }

    def __repr__(self):
        return f"WorkspaceTree({self.current_context}, {repr(self.contexts)})"

    def __json__(self):
        """Serialise every context, keyed by context name."""
        return {context.name: context.__json__() for context in self.contexts}

    @staticmethod
    def _display_ids(items) -> set[str]:
        """Return the "make model serial" identity of each output or group."""
        return {f"{item.make} {item.model} {item.serial}" for item in items}

    def get_workspace(self, user_index: int) -> Workspace | None:
        """Returns a workspace object based on the user index.

        ``user_index`` is 1-based. It is resolved inside the active group
        unless it is the position of the currently active workspace or lies
        beyond the end of that group; then the following groups are searched
        in order (wrapping around) for the first one long enough.

        Returns:
            The resolved workspace, or None when no other group is long
            enough or when there is no current context / active group to
            resolve against.

        Raises:
            IndexError: if ``user_index`` is smaller than 1.
        """
        if user_index < 1:
            raise IndexError("Workspace index must be greater than or equal to 1.")

        context = self.current_context
        active_group = context.active_group if context is not None else None
        if active_group is None:
            # Nothing to resolve against; report "not found", not a crash.
            return None

        members = active_group.workspaces
        try:
            active_index = members.index(context.active_workspace) + 1
        except ValueError:
            # No visible workspace in the active group: nothing to cycle away from.
            active_index = None

        if user_index != active_index and user_index <= len(members):
            return members[user_index - 1]

        # Same position as the active workspace, or out of range for this
        # group: look through the other groups, starting after the active one.
        groups = context.groups
        start = groups.index(active_group)
        for offset in range(1, len(groups)):
            candidate = groups[(start + offset) % len(groups)]
            if candidate is active_group:
                continue
            if user_index <= len(candidate.workspaces):
                return candidate.workspaces[user_index - 1]
        return None

    def update_workspaces(self, ws_sway_dict: dict):
        """Updates the workspaces in the tree based on the return from sway IPC.

        Keys of ``ws_sway_dict`` are compared with ``str(workspace.index)``.
        Matching workspaces receive ``update_state``; every other workspace
        in the tree is deactivated. Unknown keys only produce a warning.
        """
        # First workspace wins when several share an index.
        first_by_index: dict[str, Workspace] = {}
        for ws in self.workspaces:
            first_by_index.setdefault(str(ws.index), ws)

        # Tracked by identity so workspace objects need not be hashable.
        touched: set[int] = set()
        for index, ws_data in ws_sway_dict.items():
            ws_state = first_by_index.get(index)
            if ws_state is None:
                print(f"Warning: workspace {index} not found in tree.")
                continue
            ws_state.update_state(ws_data)
            touched.add(id(ws_state))

        # Deactivate any workspaces that weren't touched
        for ws in self.workspaces:
            if id(ws) not in touched:
                ws.deactivate()

    async def update_context(self, i3: Connection):
        """Activates a new context in Sway based on the current display configuration.

        The first context whose outputs are exactly the connected outputs is
        preferred. Failing that, the first context whose outputs are all
        connected is used (extra outputs are fine, they are disabled on
        activation). If neither exists nothing changes.
        """
        connected = self._display_ids(await i3.get_outputs())
        required = [
            (context, self._display_ids(context.groups)) for context in self.contexts
        ]

        chosen = next((ctx for ctx, ids in required if ids == connected), None)
        if chosen is None:
            chosen = next((ctx for ctx, ids in required if ids <= connected), None)
        if chosen is None:
            return

        self.current_context = chosen
        await chosen.activate(i3)

    async def activate_context(self, i3: Connection, name: str):
        """Activates a context by name.

        Raises:
            ValueError: if no context has that name, or if not all of the
                context's outputs are currently connected.
        """
        context = next(
            (context for context in self.contexts if context.name == name),
            None,
        )
        if context is None:
            raise ValueError(f"Context {name} is not defined.")

        connected = self._display_ids(await i3.get_outputs())
        if not self._display_ids(context.groups) <= connected:
            raise ValueError(
                f"Context {name} is incompatible with the current display configuration."
            )

        self.current_context = context
        await context.activate(i3)
|