524 lines
19 KiB
Python
524 lines
19 KiB
Python
from typing import Any
|
|
import json
|
|
from i3ipc.replies import OutputReply, WorkspaceReply
|
|
from i3ipc.aio import Connection
|
|
import asyncio
|
|
import subprocess
|
|
from .utils import OutputMatch
|
|
|
|
DEFAULT_TERMINAL = "alacritty"
|
|
|
|
|
|
class Workspace:
|
|
"""A class representing a Sway workspace in the format we use.
|
|
|
|
Attributes:
|
|
index: The index of the workspace, as a string. This is the Sway workspace name, and is never displayed to the user.
|
|
name: The name of the workspace, as a string. This is the name that is displayed to the user, as defined in `workspaces.json`.
|
|
"""
|
|
|
|
index: int = None
|
|
name: str = None
|
|
|
|
active: bool = False
|
|
visible: bool = False
|
|
focused: bool = False
|
|
alerted: bool = False
|
|
|
|
def __init__(self, dictionary: dict[str, str | int | dict[str, str]]):
|
|
self.index = dictionary.get("index")
|
|
self.name = dictionary.get("name")
|
|
|
|
self.definition = dictionary
|
|
|
|
def update_state(self, state_update: Any):
|
|
self.active = True
|
|
self.visible = state_update.visible
|
|
self.focused = state_update.focused
|
|
self.alerted = state_update.urgent
|
|
|
|
def deactivate(self):
|
|
self.active = False
|
|
self.visible = False
|
|
self.focused = False
|
|
self.alerted = False
|
|
|
|
def __dict__(self):
|
|
return {
|
|
"index": str(self.index),
|
|
"name": self.name,
|
|
"active": self.active,
|
|
"visible": self.visible,
|
|
"focused": self.focused,
|
|
"alerted": self.alerted,
|
|
}
|
|
|
|
def __json__(self):
|
|
return {
|
|
"index": str(self.index),
|
|
"name": self.name,
|
|
"active": self.active,
|
|
"visible": self.visible,
|
|
"focused": self.focused,
|
|
"alerted": self.alerted,
|
|
}
|
|
|
|
def __repr__(self):
|
|
return f"Workspace({self.index}, '{self.name}')"
|
|
|
|
async def focus(self, i3: Connection):
|
|
"""Focus the workspace in Sway."""
|
|
await i3.command(f"workspace {self.index}")
|
|
|
|
async def move_container(self, i3: Connection):
|
|
"""Move the focused container to the workspace."""
|
|
await i3.command(f"move container to workspace {self.index}")
|
|
|
|
async def reassign(self, i3: Connection, output: str):
|
|
"""Reassign the workspace to a different output."""
|
|
await i3.command(f"workspace {self.index} output {output}")
|
|
|
|
async def relocate(self, i3: Connection, output: str):
|
|
"""Move the workspace to a different output."""
|
|
await i3.command(f"workspace {self.index}")
|
|
await i3.command(f"move workspace to output {output}")
|
|
|
|
|
|
class WorkspaceGroup:
|
|
"""A class representing a group of workspaces. Generally maps to a monitor."""
|
|
|
|
name: str = None
|
|
workspaces: list[Workspace] = None
|
|
reverse: bool = False
|
|
|
|
def __init__(self, output_data: dict[str, str]):
|
|
self.name = output_data["group"]
|
|
self.output_names = output_data.get("names", [])
|
|
self.make = output_data.get("make", None)
|
|
self.model = output_data.get("model", None)
|
|
self.serial = output_data.get("serial", None)
|
|
self.position = output_data.get("position", [0, 0])
|
|
self.mode = output_data.get("mode", None)
|
|
self.transform = output_data.get("transform", None)
|
|
self.eww_windows = output_data.get("eww_windows", [])
|
|
self.workspaces = []
|
|
|
|
def add_workspace(self, workspace: Workspace):
|
|
"""Add a workspace to the group."""
|
|
self.workspaces.append(workspace)
|
|
|
|
def __iter__(self):
|
|
return iter(self.workspaces)
|
|
|
|
def __json__(self):
|
|
if self.reverse:
|
|
return [workspace.__json__() for workspace in reversed(self.workspaces)]
|
|
return [workspace.__json__() for workspace in self.workspaces]
|
|
|
|
def __repr__(self):
|
|
return f"WorkspaceGroup({self.name}, {repr(self.workspaces)})"
|
|
|
|
@property
|
|
def active_workspace(self) -> Workspace:
|
|
"""Returns the active workspace in the group."""
|
|
return next(
|
|
(workspace for workspace in self.workspaces if workspace.visible),
|
|
None,
|
|
)
|
|
|
|
@property
|
|
def active(self) -> bool:
|
|
"""Returns whether the group is active."""
|
|
return any(workspace.focused for workspace in self.workspaces)
|
|
|
|
async def focus(self, i3: Connection):
|
|
"""Focus the group in Sway."""
|
|
if self.make and self.model and self.serial:
|
|
await i3.command(f"focus output {self.make} {self.model} {self.serial}")
|
|
elif len(self.output_names) > 0:
|
|
for name in self.output_names:
|
|
await i3.command(f"focus output {name}")
|
|
else:
|
|
raise ValueError(
|
|
"No output name or make/model/serial provided, cannot focus group"
|
|
)
|
|
|
|
async def configure(self, i3: Connection, outputs: list[OutputReply]):
|
|
"""Configure the group output in Sway."""
|
|
transform = ""
|
|
mode = ""
|
|
selector = ""
|
|
if self.transform:
|
|
transform = f"transform {self.transform}"
|
|
if self.mode:
|
|
mode = f"mode {self.mode}"
|
|
if self.make and self.model and self.serial:
|
|
selector = f'"{self.make} {self.model} {self.serial}"'
|
|
elif len(self.output_names) > 0:
|
|
for name in self.output_names:
|
|
if name in [output.name for output in outputs]:
|
|
selector = name
|
|
break
|
|
# Configure the output
|
|
await i3.command(
|
|
f"output {selector} position {self.position[0]} {self.position[1]} {mode} {transform} enable"
|
|
)
|
|
|
|
async def validate(self, i3: Connection, workspaces: list[WorkspaceReply]):
|
|
"""Validate that each workspace in the group is assigned to and present on the correct output."""
|
|
ouput_name = self.get_output_name(i3)
|
|
|
|
for workspace in self.workspaces:
|
|
# Get output name for workspace
|
|
workspace_output = next(
|
|
(
|
|
workspace_output
|
|
for workspace_output in workspaces
|
|
if workspace_output.name == workspace.index
|
|
),
|
|
None,
|
|
)
|
|
if workspace_output is None:
|
|
print(
|
|
f"Workspace {workspace.index} not found in workspaces, skipping validation",
|
|
flush=True,
|
|
)
|
|
continue
|
|
if workspace_output.output != ouput_name:
|
|
print(
|
|
f"Workspace {workspace.index} is assigned to {workspace_output.output}, not {ouput_name}, reassigning",
|
|
flush=True,
|
|
)
|
|
await workspace.relocate(i3, ouput_name)
|
|
|
|
async def get_output_name(self, i3: Connection) -> str:
|
|
"""Get the name of the output in Sway."""
|
|
outputs = await i3.get_outputs()
|
|
# If we have make, model, and serial, search by those first
|
|
if self.make and self.model and self.serial:
|
|
for output in outputs:
|
|
if (
|
|
output.make == self.make
|
|
and output.model == self.model
|
|
and output.serial == self.serial
|
|
):
|
|
print(
|
|
f"Found output {output.name} by make, model, and serial for group {self.name}",
|
|
flush=True,
|
|
)
|
|
return output.name
|
|
# If we don't find an exact match for the output, search by name if we have any
|
|
if len(self.output_names) > 0:
|
|
for output in outputs:
|
|
if output.name in self.output_names:
|
|
print(
|
|
f"Found output {output.name} by name for group {self.name}",
|
|
flush=True,
|
|
)
|
|
return output.name
|
|
return None
|
|
|
|
def get_match_level(self, output: OutputReply) -> OutputMatch:
|
|
"""Get the match level score for the output."""
|
|
if self.make and self.model and self.serial:
|
|
if (
|
|
output.make == self.make
|
|
and output.model == self.model
|
|
and output.serial == self.serial
|
|
):
|
|
print(
|
|
f"Match level: ID_MATCH for {output.name} on group {self.name}",
|
|
flush=True,
|
|
)
|
|
return OutputMatch.ID_MATCH
|
|
if output.name in self.output_names:
|
|
print(
|
|
f"Match level: NAME_MATCH for {output.name} on group {self.name}",
|
|
flush=True,
|
|
)
|
|
return OutputMatch.NAME_MATCH
|
|
print(
|
|
f"Match level: NO_MATCH for {output.name} on group {self.name}", flush=True
|
|
)
|
|
return OutputMatch.NO_MATCH
|
|
|
|
|
|
class WorkspaceContext:
|
|
"""A class representing a context, containing all workspaces and groups within the context."""
|
|
|
|
name: str = None
|
|
groups: list[WorkspaceGroup] = None
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
data: dict[str, list[dict[str, list[int]]] | list[dict[str, str]]],
|
|
workspaces: list[Workspace],
|
|
):
|
|
self.groups = [WorkspaceGroup(output) for output in data["outputs"]]
|
|
self.name = name
|
|
for group in self.groups:
|
|
group_object = data["groups"][group.name]
|
|
if group_object.get("reverse", False):
|
|
group.reverse = True
|
|
for workspace in group_object["workspaces"]:
|
|
workspace_obj = next(
|
|
(w for w in workspaces if w.index == workspace),
|
|
None,
|
|
)
|
|
if workspace_obj:
|
|
group.add_workspace(workspace_obj)
|
|
else:
|
|
raise Exception(
|
|
f"Error: undefined workspace {workspace} referenced in context {name} group {group.name}"
|
|
)
|
|
primary_group_name = data.get("primary", self.groups[0].name)
|
|
self.primary_group = next(
|
|
(group for group in self.groups if group.name == primary_group_name),
|
|
None,
|
|
)
|
|
|
|
def add_group(self, group: WorkspaceGroup):
|
|
"""Add a group to the context."""
|
|
self.groups.append(group)
|
|
|
|
def compatability_rank(self, outputs: list[OutputReply]) -> int:
|
|
"""Get the compatability rank of the context with the given outputs."""
|
|
|
|
result = 0
|
|
for group in self.groups:
|
|
group_result = max(
|
|
[group.get_match_level(output).value for output in outputs]
|
|
)
|
|
if group_result == 0:
|
|
return 0
|
|
result += group_result
|
|
return result
|
|
|
|
async def deactivate(self, i3: Connection):
|
|
"""Deactivate the context in Sway."""
|
|
# First, close all EWW windows
|
|
proc = await asyncio.create_subprocess_exec("eww", "close-all")
|
|
await proc.wait()
|
|
|
|
# Then, disable all displays
|
|
await i3.command("output * disable")
|
|
|
|
async def activate(self, i3: Connection):
|
|
"""Activate the context in Sway."""
|
|
defined_displays = [
|
|
f"{group.make} {group.model} {group.serial}" for group in self.groups
|
|
]
|
|
outputs = await i3.get_outputs()
|
|
|
|
# Configure all displays defined in the context
|
|
for group in self.groups:
|
|
await group.configure(i3, outputs)
|
|
|
|
# Then, open all EWW windows defined in the context on the appropriate windows
|
|
proc = await asyncio.create_subprocess_exec("eww", "reload")
|
|
await proc.wait()
|
|
for group in self.groups:
|
|
for window in group.eww_windows:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"eww",
|
|
"open",
|
|
window,
|
|
"--screen",
|
|
await group.get_output_name(i3),
|
|
)
|
|
await proc.wait()
|
|
|
|
# Finally, validate workspaces and move them if necessary
|
|
for group in self.groups:
|
|
workspaces = await i3.get_workspaces()
|
|
await group.validate(i3, workspaces)
|
|
|
|
@property
|
|
def active_group(self) -> WorkspaceGroup:
|
|
"""Returns the active group in the context."""
|
|
return next(
|
|
(group for group in self.groups if group.active),
|
|
None,
|
|
)
|
|
|
|
@property
|
|
def active_workspace(self) -> Workspace:
|
|
"""Returns the active workspace in the context."""
|
|
try:
|
|
return self.active_group.active_workspace
|
|
except AttributeError:
|
|
# A context may not have any active workspaces, in which case we return None
|
|
return None
|
|
|
|
@property
|
|
def visible_workspaces(self) -> dict[str, Workspace]:
|
|
"""Returns a dictionary of all visible workspaces in the context."""
|
|
return {group.name: group.active_workspace for group in self.groups}
|
|
|
|
def __iter__(self):
|
|
return iter(self.groups)
|
|
|
|
def __repr__(self):
|
|
return f"WorkspaceContext({self.name}, {repr(self.groups)})"
|
|
|
|
def __json__(self):
|
|
return {group.name: group.__json__() for group in self.groups}
|
|
|
|
|
|
class WorkspaceTree:
|
|
|
|
current_context: WorkspaceContext = None
|
|
workspaces: list[Workspace] = None
|
|
|
|
def __init__(self, filename: str):
|
|
self.load_file(filename)
|
|
|
|
def load_file(self, filename: str):
|
|
with open(filename, "r") as file:
|
|
initial: dict = json.load(file)
|
|
|
|
self.contexts: list[WorkspaceContext] = []
|
|
self.workspaces = [Workspace(workspace) for workspace in initial["workspaces"]]
|
|
|
|
for context, context_dict in initial["contexts"].items():
|
|
context_obj = WorkspaceContext(context, context_dict, self.workspaces)
|
|
self.contexts.append(context_obj)
|
|
# Don't override the current context if it's already set
|
|
if (
|
|
initial.get("default_context", "default") == context
|
|
and self.current_context is None
|
|
):
|
|
self.current_context = context_obj
|
|
|
|
def __dict__(self):
|
|
return {
|
|
"current_context": self.current_context,
|
|
"contexts": self.contexts,
|
|
}
|
|
|
|
def __repr__(self):
|
|
return f"WorkspaceTree({self.current_context}, {repr(self.contexts)})"
|
|
|
|
def __json__(self):
|
|
return {self.current_context.name: self.current_context.__json__()}
|
|
|
|
def get_workspace(self, user_index: int) -> tuple[Workspace, WorkspaceGroup] | None:
|
|
"""Returns a workspace object based on the user index."""
|
|
if user_index < 1:
|
|
raise IndexError("Workspace index must be greater than or equal to 1.")
|
|
# First, find the active workspace
|
|
active_workspace = self.current_context.active_workspace
|
|
# Next, get its index within its group, and the active group index
|
|
active_index = (
|
|
self.current_context.active_group.workspaces.index(active_workspace) + 1
|
|
)
|
|
group_index = self.current_context.groups.index(
|
|
self.current_context.active_group
|
|
)
|
|
|
|
# If the user index is the same as the active index, or is outside the range for this group, start searching other groups
|
|
if user_index == active_index or user_index > len(
|
|
self.current_context.active_group.workspaces
|
|
):
|
|
counter = 0
|
|
while counter < len(self.current_context.groups):
|
|
group = self.current_context.groups[
|
|
(group_index + counter) % len(self.current_context.groups)
|
|
]
|
|
if (
|
|
user_index > len(group.workspaces)
|
|
or group == self.current_context.active_group
|
|
):
|
|
# Group doesn't have enough workspaces, or is the active group
|
|
counter += 1
|
|
|
|
continue
|
|
return group.workspaces[user_index - 1], group
|
|
# If we didn't find a workspace, return None
|
|
return None
|
|
else:
|
|
# If the user index is different from the active index, return the workspace in the active group
|
|
return (
|
|
self.current_context.active_group.workspaces[user_index - 1],
|
|
self.current_context.active_group,
|
|
)
|
|
|
|
def update_workspaces(self, ws_sway_dict: dict):
|
|
"""Updates the workspaces in the tree based on the return from sway IPC."""
|
|
touched = set()
|
|
|
|
for index, ws_data in ws_sway_dict.items():
|
|
# Udpate data for all active workspaces
|
|
ws_state: Workspace = next(
|
|
(ws for ws in self.workspaces if str(ws.index) == index),
|
|
None,
|
|
)
|
|
if ws_state:
|
|
touched.add(ws_state)
|
|
ws_state.update_state(ws_data)
|
|
else:
|
|
print(f"Warning: workspace {index} not found in tree.")
|
|
|
|
# Deactivate any workspaces that weren't touched
|
|
for ws in self.workspaces:
|
|
if ws not in touched:
|
|
ws.deactivate()
|
|
|
|
async def update_context(self, i3: Connection, match_context_on_name: bool = False):
|
|
"""Activates a new context in Sway based on the current display configuration."""
|
|
# First, get the current display configuration
|
|
outputs = await i3.get_outputs()
|
|
print(outputs, flush=True)
|
|
|
|
# Next, calculate match scores for each context
|
|
scores = [
|
|
(context, context.compatability_rank(outputs)) for context in self.contexts
|
|
]
|
|
print([f"{context.name}: {score}" for context, score in scores], flush=True)
|
|
# Sort the scores by rank
|
|
scores.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
# If the top context is the current context, or the rank is 0, do nothing
|
|
if scores[0][1] == 0:
|
|
print(
|
|
"No context is compatible with the current display configuration, doing nothing.",
|
|
flush=True,
|
|
)
|
|
return
|
|
if self.current_context is None:
|
|
await scores[0][0].activate(i3)
|
|
self.current_context = scores[0][0]
|
|
return
|
|
elif scores[0][0] == self.current_context:
|
|
print("Context is already active.", flush=True)
|
|
elif match_context_on_name and scores[0][0].name == self.current_context.name:
|
|
print("Context is already active.", flush=True)
|
|
await scores[0][0].activate(i3)
|
|
self.current_context = scores[0][0]
|
|
else:
|
|
await self.current_context.deactivate(i3)
|
|
await scores[0][0].activate(i3)
|
|
self.current_context = scores[0][0]
|
|
|
|
async def activate_context(self, i3: Connection, name: str):
|
|
"""Activates a context by name. This will fail if the current display configuration is incompatible."""
|
|
context = next(
|
|
(context for context in self.contexts if context.name == name),
|
|
None,
|
|
)
|
|
|
|
outputs = await i3.get_outputs()
|
|
|
|
score = context.compatability_rank(outputs)
|
|
if score == 0:
|
|
raise ValueError(
|
|
"Context is incompatible with current display configuration."
|
|
)
|
|
print(f"Activating context {context.name} with score {score}.", flush=True)
|
|
if self.current_context is not None and self.current_context != context:
|
|
await self.current_context.deactivate(i3)
|
|
self.current_context = context
|
|
|
|
await context.activate(i3)
|