453 lines
16 KiB
Python

from typing import Any
import json
from i3ipc.replies import OutputReply
from i3ipc.aio import Connection
import asyncio
import subprocess
from .utils import OutputMatch
DEFAULT_TERMINAL = "alacritty"
class Workspace:
"""A class representing a Sway workspace in the format we use.
Attributes:
index: The index of the workspace, as a string. This is the Sway workspace name, and is never displayed to the user.
name: The name of the workspace, as a string. This is the name that is displayed to the user, as defined in `workspaces.json`.
"""
index: int = None
name: str = None
active: bool = False
visible: bool = False
focused: bool = False
alerted: bool = False
def __init__(self, dictionary: dict[str, str | int | dict[str, str]]):
self.index = dictionary.get("index")
self.name = dictionary.get("name")
self.definition = dictionary
def update_state(self, state_update: Any):
self.active = True
self.visible = state_update.visible
self.focused = state_update.focused
self.alerted = state_update.urgent
def deactivate(self):
self.active = False
self.visible = False
self.focused = False
self.alerted = False
def __dict__(self):
return {
"index": str(self.index),
"name": self.name,
"active": self.active,
"visible": self.visible,
"focused": self.focused,
"alerted": self.alerted,
}
def __json__(self):
return self.__dict__()
def __repr__(self):
return f"Workspace({self.index}, '{self.name}')"
async def focus(self, i3: Connection):
"""Focus the workspace in Sway."""
await i3.command(f"workspace {self.index}")
async def move_container(self, i3: Connection):
"""Move the focused container to the workspace."""
await i3.command(f"move container to workspace {self.index}")
class WorkspaceGroup:
"""A class representing a group of workspaces. Generally maps to a monitor."""
name: str = None
workspaces: list[Workspace] = None
reverse: bool = False
def __init__(self, output_data: dict[str, str]):
self.name = output_data["group"]
self.output_names = output_data.get("names", [])
self.make = output_data.get("make", None)
self.model = output_data.get("model", None)
self.serial = output_data.get("serial", None)
self.position = output_data.get("position", [0, 0])
self.mode = output_data.get("mode", None)
self.transform = output_data.get("transform", None)
self.eww_windows = output_data.get("eww_windows", [])
self.workspaces = []
def add_workspace(self, workspace: Workspace):
"""Add a workspace to the group."""
self.workspaces.append(workspace)
def __iter__(self):
return iter(self.workspaces)
def __json__(self):
if self.reverse:
return [workspace.__json__() for workspace in reversed(self.workspaces)]
return [workspace.__json__() for workspace in self.workspaces]
def __repr__(self):
return f"WorkspaceGroup({self.name}, {repr(self.workspaces)})"
@property
def active_workspace(self) -> Workspace:
"""Returns the active workspace in the group."""
return next(
(workspace for workspace in self.workspaces if workspace.visible),
None,
)
@property
def active(self) -> bool:
"""Returns whether the group is active."""
return any(workspace.focused for workspace in self.workspaces)
async def focus(self, i3: Connection):
"""Focus the group in Sway."""
if self.make and self.model and self.serial:
await i3.command(f"focus output {self.make} {self.model} {self.serial}")
elif len(self.output_names) > 0:
for name in self.output_names:
await i3.command(f"focus output {name}")
else:
raise ValueError(
"No output name or make/model/serial provided, cannot focus group"
)
async def configure(self, i3: Connection, outputs: list[OutputReply]):
"""Configure the group output in Sway."""
transform = ""
mode = ""
selector = ""
if self.transform:
transform = f"transform {self.transform}"
if self.mode:
mode = f"mode {self.mode}"
if self.make and self.model and self.serial:
selector = f'"{self.make} {self.model} {self.serial}"'
elif len(self.output_names) > 0:
for name in self.output_names:
if name in [output.name for output in outputs]:
selector = name
break
# First, assign workspaces to the output
for workspace in self.workspaces:
await i3.command(f"workspace {workspace.index} output {selector}")
# Then, configure the output
await i3.command(
f"output {selector} position {self.position[0]} {self.position[1]} {mode} {transform} enable"
)
async def get_output_name(self, i3: Connection) -> str:
"""Get the name of the output in Sway."""
outputs = await i3.get_outputs()
# If we have make, model, and serial, search by those first
if self.make and self.model and self.serial:
for output in outputs:
if (
output.make == self.make
and output.model == self.model
and output.serial == self.serial
):
print(
f"Found output {output.name} by make, model, and serial for group {self.name}",
flush=True,
)
return output.name
# If we don't find an exact match for the output, search by name if we have any
if len(self.output_names) > 0:
for output in outputs:
if output.name in self.output_names:
print(
f"Found output {output.name} by name for group {self.name}",
flush=True,
)
return output.name
return None
def get_match_level(self, output: OutputReply) -> OutputMatch:
"""Get the match level score for the output."""
if self.make and self.model and self.serial:
if (
output.make == self.make
and output.model == self.model
and output.serial == self.serial
):
print(
f"Match level: ID_MATCH for {output.name} on group {self.name}",
flush=True,
)
return OutputMatch.ID_MATCH
if output.name in self.output_names:
print(
f"Match level: NAME_MATCH for {output.name} on group {self.name}",
flush=True,
)
return OutputMatch.NAME_MATCH
print(
f"Match level: NO_MATCH for {output.name} on group {self.name}", flush=True
)
return OutputMatch.NO_MATCH
class WorkspaceContext:
"""A class representing a context, containing all workspaces and groups within the context."""
name: str = None
groups: list[WorkspaceGroup] = None
def __init__(
self,
name: str,
data: dict[str, list[dict[str, list[int]]] | list[dict[str, str]]],
workspaces: list[Workspace],
):
self.groups = [WorkspaceGroup(output) for output in data["outputs"]]
self.name = name
for group in self.groups:
group_object = data["groups"][group.name]
if group_object.get("reverse", False):
group.reverse = True
for workspace in group_object["workspaces"]:
workspace_obj = next(
(w for w in workspaces if w.index == workspace),
None,
)
if workspace_obj:
group.add_workspace(workspace_obj)
else:
raise Exception(
f"Error: undefined workspace {workspace} referenced in context {name} group {group.name}"
)
primary_group_name = data.get("primary", self.groups[0].name)
self.primary_group = next(
(group for group in self.groups if group.name == primary_group_name),
None,
)
def add_group(self, group: WorkspaceGroup):
"""Add a group to the context."""
self.groups.append(group)
def compatability_rank(self, outputs: list[OutputReply]) -> int:
"""Get the compatability rank of the context with the given outputs."""
result = 0
for group in self.groups:
group_result = max(
[group.get_match_level(output).value for output in outputs]
)
if group_result == 0:
return 0
result += group_result
return result
async def activate(self, i3: Connection):
"""Activate the context in Sway."""
defined_displays = [
f"{group.make} {group.model} {group.serial}" for group in self.groups
]
outputs = await i3.get_outputs()
# First, close all EWW windows
proc = await asyncio.create_subprocess_exec("eww", "close-all")
await proc.wait()
# Second, if the focused workspace is not in the context, focus the primary output
if not self.active_group:
return
# Then, disable all displays, so we can assign workspaces to the correct ones before enabling them
for output in outputs:
await i3.command(f"output {output.name} disable")
# Next, configure all displays defined in the context
for group in self.groups:
await group.configure(i3, outputs)
# Finally, open all EWW windows defined in the context on the appropriate windows
for group in self.groups:
for window in group.eww_windows:
proc = await asyncio.create_subprocess_exec(
"eww",
"open",
window,
"--screen",
await group.get_output_name(i3),
)
await proc.wait()
@property
def active_group(self) -> WorkspaceGroup:
"""Returns the active group in the context."""
return next(
(group for group in self.groups if group.active),
None,
)
@property
def active_workspace(self) -> Workspace:
"""Returns the active workspace in the context."""
try:
return self.active_group.active_workspace
except AttributeError:
# A context may not have any active workspaces, in which case we return None
return None
@property
def visible_workspaces(self) -> dict[str, Workspace]:
"""Returns a dictionary of all visible workspaces in the context."""
return {group.name: group.active_workspace for group in self.groups}
def __iter__(self):
return iter(self.groups)
def __repr__(self):
return f"WorkspaceContext({self.name}, {repr(self.groups)})"
def __json__(self):
return {group.name: group.__json__() for group in self.groups}
class WorkspaceTree:
current_context: WorkspaceContext = None
workspaces: list[Workspace] = None
def __init__(self, filename: str):
with open(filename, "r") as file:
initial: dict = json.load(file)
self.contexts: list[WorkspaceContext] = []
self.workspaces = [Workspace(workspace) for workspace in initial["workspaces"]]
for context, context_dict in initial["contexts"].items():
context_obj = WorkspaceContext(context, context_dict, self.workspaces)
self.contexts.append(context_obj)
if initial.get("default_context", "personal") == context:
self.current_context = context_obj
def __dict__(self):
return {
"current_context": self.current_context,
"contexts": self.contexts,
}
def __repr__(self):
return f"WorkspaceTree({self.current_context}, {repr(self.contexts)})"
def __json__(self):
return {self.current_context.name: self.current_context.__json__()}
def get_workspace(self, user_index: int) -> Workspace:
"""Returns a workspace object based on the user index."""
if user_index < 1:
raise IndexError("Workspace index must be greater than or equal to 1.")
# First, find the active workspace
active_workspace = self.current_context.active_workspace
# Next, get its index within its group, and the active group index
active_index = (
self.current_context.active_group.workspaces.index(active_workspace) + 1
)
group_index = self.current_context.groups.index(
self.current_context.active_group
)
# If the user index is the same as the active index, or is outside the range for this group, start searching other groups
if user_index == active_index or user_index > len(
self.current_context.active_group.workspaces
):
counter = 0
while counter < len(self.current_context.groups):
group = self.current_context.groups[
(group_index + counter) % len(self.current_context.groups)
]
if (
user_index > len(group.workspaces)
or group == self.current_context.active_group
):
# Group doesn't have enough workspaces, or is the active group
counter += 1
continue
return group.workspaces[user_index - 1]
# If we didn't find a workspace, return None
return None
else:
# If the user index is different from the active index, return the workspace in the active group
return self.current_context.active_group.workspaces[user_index - 1]
def update_workspaces(self, ws_sway_dict: dict):
"""Updates the workspaces in the tree based on the return from sway IPC."""
touched = set()
for index, ws_data in ws_sway_dict.items():
# Udpate data for all active workspaces
ws_state: Workspace = next(
(ws for ws in self.workspaces if str(ws.index) == index),
None,
)
if ws_state:
touched.add(ws_state)
ws_state.update_state(ws_data)
else:
print(f"Warning: workspace {index} not found in tree.")
# Deactivate any workspaces that weren't touched
for ws in self.workspaces:
if ws not in touched:
ws.deactivate()
async def update_context(self, i3: Connection):
"""Activates a new context in Sway based on the current display configuration."""
# First, get the current display configuration
outputs = await i3.get_outputs()
print(outputs, flush=True)
# Next, calculate match scores for each context
scores = [
(context, context.compatability_rank(outputs)) for context in self.contexts
]
print([f"{context.name}: {score}" for context, score in scores], flush=True)
# Sort the scores by rank
scores.sort(key=lambda x: x[1], reverse=True)
# If the top context is the current context, or the rank is 0, do nothing
if scores[0][0] == self.current_context or scores[0][1] == 0:
return
self.current_context = scores[0][0]
await self.current_context.activate(i3)
async def activate_context(self, i3: Connection, name: str):
"""Activates a context by name. This will fail if the current display configuration is incompatible."""
context = next(
(context for context in self.contexts if context.name == name),
None,
)
outputs = await i3.get_outputs()
score = context.compatability_rank(outputs)
if score == 0:
raise ValueError(
"Context is incompatible with current display configuration."
)
print(f"Activating context {context.name} with score {score}.", flush=True)
self.current_context = context
await context.activate(i3)