2025-02-14 17:42:48 -07:00

540 lines
19 KiB
Python

from typing import Any
import json
from i3ipc.replies import OutputReply, WorkspaceReply
import asyncio
from .utils import OutputMatch
from .sway import SwayIPC
from gi.repository import GObject
from . import config
class Workspace(GObject.Object):
"""A class representing a Sway workspace in the format we use."""
_index: int = None
_name: str = None
_definition: config.Workspace = None
_active: bool = False
_visible: bool = False
_focused: bool = False
_alerted: bool = False
# This is not a GObject property because it won't ever change, and we don't
# need GObject to be aware of it.
@property
def definition(self) -> config.Workspace:
return self._definition
@GObject.Property
def index(self) -> int:
return self._index
@GObject.Property
def name(self) -> str:
return self._name
@GObject.Property
def active(self) -> bool:
return self._active
@GObject.Property
def visible(self) -> bool:
return self._visible
@GObject.Property
def focused(self) -> bool:
return self._focused
@GObject.Property
def alerted(self) -> bool:
return self._alerted
def __init__(self, dictionary: config.Workspace):
super().__init__()
self._index = dictionary.get("index")
self._name = dictionary.get("name")
self._definition = dictionary
def update_state(self, state_update: Any):
self._active = True
self._visible = state_update.visible
self._focused = state_update.focused
self._alerted = state_update.urgent
self.notify("active")
self.notify("visible")
self.notify("focused")
self.notify("alerted")
def deactivate(self):
self.active = False
self.visible = False
self.focused = False
self.alerted = False
self.notify("active")
self.notify("visible")
self.notify("focused")
self.notify("alerted")
def __repr__(self):
return f"Workspace({self.index}, '{self.name}')"
async def focus(self):
"""Focus the workspace in Sway."""
sway = SwayIPC.get_instance()
await sway.command(f"workspace {self.index}")
async def move_container(self):
"""Move the focused container to the workspace."""
sway = SwayIPC.get_instance()
await sway.command(f"move container to workspace {self.index}")
async def reassign(self, output: str):
"""Reassign the workspace to a different output."""
sway = SwayIPC.get_instance()
await sway.command(f"workspace {self.index} output {output}")
async def relocate(self, output: str):
"""Move the workspace to a different output."""
sway = SwayIPC.get_instance()
await sway.command(f"workspace {self.index}")
await sway.command(f"move workspace to output {output}")
class WorkspaceGroup(GObject.Object):
"""
A class representing a group of workspaces.
Generally maps to a monitor.
"""
name: str = None
workspaces: list[Workspace] = None
reverse: bool = False
def __init__(self, output_data: config.Output):
super().__init__()
self.name = output_data["group"]
self.output_names = output_data.get("names", [])
self.make = output_data.get("make", None)
self.model = output_data.get("model", None)
self.serial = output_data.get("serial", None)
self.position = output_data.get("position", [0, 0])
self.mode = output_data.get("mode", None)
self.transform = output_data.get("transform", None)
self.eww_windows = output_data.get("bars", output_data.get("eww_windows", []))
self.workspaces = []
def add_workspace(self, workspace: Workspace):
"""Add a workspace to the group."""
self.workspaces.append(workspace)
workspace.connect("notify::active", self.on_workspace_active)
def on_workspace_active(self, workspace: Workspace, _):
"""Handle the active state of a workspace changing."""
self.notify("active")
def __iter__(self):
return iter(self.workspaces)
def __repr__(self):
return f"WorkspaceGroup({self.name}, {repr(self.workspaces)})"
@GObject.Property
def active_workspace(self) -> Workspace:
"""Returns the active workspace in the group."""
return next(
(workspace for workspace in self.workspaces if workspace.visible),
None,
)
@GObject.Property
def active(self) -> bool:
"""Returns whether the group is active."""
return any(workspace.focused for workspace in self.workspaces)
async def focus(self):
"""Focus the group in Sway."""
sway = SwayIPC.get_instance()
if self.make and self.model and self.serial:
await sway.command(f"focus output {self.make} {self.model} {self.serial}")
elif len(self.output_names) > 0:
for name in self.output_names:
await sway.command(f"focus output {name}")
else:
raise ValueError(
"No output name or make/model/serial provided, cannot focus group"
)
async def configure(self, output: OutputReply):
"""Configure the group output in Sway."""
sway = SwayIPC.get_instance()
self.output_name = output.name
# Configure the output
await sway.command(
f"output {self.output_name} position {self.position[0]} {self.position[1]} {self.mode} enable"
)
async def validate(self, workspaces: list[WorkspaceReply]):
"""Validate that each workspace in the group is assigned to and present on the correct output."""
sway = SwayIPC.get_instance()
ouput_name = self.get_output_name(sway)
for workspace in self.workspaces:
# Get output name for workspace
workspace_output = next(
(
workspace_output
for workspace_output in workspaces
if workspace_output.name == workspace.index
),
None,
)
if workspace_output is None:
print(
f"Workspace {workspace.index} not found in workspaces, skipping validation",
flush=True,
)
continue
if workspace_output.output != ouput_name:
print(
f"Workspace {workspace.index} is assigned to {workspace_output.output}, not {ouput_name}, reassigning",
flush=True,
)
await workspace.relocate(sway, ouput_name)
async def get_output_name(self) -> str:
"""Get the name of the output in Sway."""
sway = SwayIPC.get_instance()
outputs = await sway.get_outputs()
# If we have make, model, and serial, search by those first
if self.make and self.model and self.serial:
for output in outputs:
if (
output.make == self.make
and output.model == self.model
and output.serial == self.serial
):
print(
f"Found output {output.name} by make, model, and serial for group {self.name}",
flush=True,
)
return output.name
# If we don't find an exact match for the output, search by name if we have any
if len(self.output_names) > 0:
for output in outputs:
if output.name in self.output_names:
print(
f"Found output {output.name} by name for group {self.name}",
flush=True,
)
return output.name
return None
def get_match_level(self, output: OutputReply) -> OutputMatch:
"""Get the match level score for the output."""
if self.make and self.model and self.serial:
if (
output.make == self.make
and output.model == self.model
and output.serial == self.serial
):
print(
f"Match level: ID_MATCH for {output.name} on group {self.name}",
flush=True,
)
return OutputMatch.ID_MATCH
if output.name in self.output_names:
print(
f"Match level: NAME_MATCH for {output.name} on group {self.name}",
flush=True,
)
return OutputMatch.NAME_MATCH
print(
f"Match level: NO_MATCH for {output.name} on group {self.name}", flush=True
)
return OutputMatch.NO_MATCH
class WorkspaceContext:
"""A class representing a context, containing all workspaces and groups within the context."""
name: str = None
groups: list[WorkspaceGroup] = None
def __init__(
self,
name: str,
data: dict[str, list[dict[str, list[int]]] | list[dict[str, str]]],
workspaces: list[Workspace],
):
self.groups = [WorkspaceGroup(output) for output in data["outputs"]]
self.name = name
for group in self.groups:
group_object = data["groups"][group.name]
if group_object.get("reverse", False):
group.reverse = True
for workspace in group_object["workspaces"]:
workspace_obj = next(
(w for w in workspaces if w.index == workspace),
None,
)
if workspace_obj:
group.add_workspace(workspace_obj)
else:
raise Exception(
f"Error: undefined workspace {workspace} referenced in context {name} group {group.name}"
)
primary_group_name = data.get("primary", self.groups[0].name)
self.primary_group = next(
(group for group in self.groups if group.name == primary_group_name),
None,
)
def add_group(self, group: WorkspaceGroup):
"""Add a group to the context."""
self.groups.append(group)
def compatability_rank(self, outputs: list[OutputReply]) -> int:
"""Get the compatability rank of the context with the given outputs."""
result = 0
for group in self.groups:
group_result = max(
[group.get_match_level(output).value for output in outputs]
)
if group_result == 0:
return 0
result += group_result
return result
async def deactivate(self):
"""Deactivate the context in Sway."""
# First, close all EWW windows
sway = SwayIPC.get_instance()
proc = await asyncio.create_subprocess_exec("eww", "close-all")
await proc.wait()
# Then, disable all displays
await sway.command("output * disable")
async def activate(self):
"""Activate the context in Sway."""
sway = SwayIPC.get_instance()
outputs = await sway.get_outputs()
# Configure all displays defined in the context
for group in self.groups:
await group.configure(sway, outputs)
# Then, open all EWW windows defined in the context on the appropriate windows
proc = await asyncio.create_subprocess_exec("eww", "reload")
await proc.wait()
for group in self.groups:
for window in group.eww_windows:
proc = await asyncio.create_subprocess_exec(
"eww",
"open",
window,
"--screen",
await group.get_output_name(sway),
)
await proc.wait()
# Finally, validate workspaces and move them if necessary
for group in self.groups:
workspaces = await sway.get_workspaces()
await group.validate(sway, workspaces)
@property
def active_group(self) -> WorkspaceGroup:
"""Returns the active group in the context."""
return next(
(group for group in self.groups if group.active),
None,
)
@property
def active_workspace(self) -> Workspace:
"""Returns the active workspace in the context."""
try:
return self.active_group.active_workspace
except AttributeError:
# A context may not have any active workspaces, in which case we return None
return None
@property
def visible_workspaces(self) -> dict[str, Workspace]:
"""Returns a dictionary of all visible workspaces in the context."""
return {group.name: group.active_workspace for group in self.groups}
def __iter__(self):
return iter(self.groups)
def __repr__(self):
return f"WorkspaceContext({self.name}, {repr(self.groups)})"
def __json__(self):
return {group.name: group.__json__() for group in self.groups}
class WorkspaceTree:
current_context: WorkspaceContext = None
workspaces: list[Workspace] = None
def __init__(self, filename: str):
self.load_file(filename)
def load_file(self, filename: str):
with open(filename, "r") as file:
initial: dict = json.load(file)
self.contexts: list[WorkspaceContext] = []
self.workspaces = [Workspace(workspace) for workspace in initial["workspaces"]]
for context, context_dict in initial["contexts"].items():
context_obj = WorkspaceContext(context, context_dict, self.workspaces)
self.contexts.append(context_obj)
# Don't override the current context if it's already set
if (
initial.get("default_context", "default") == context
and self.current_context is None
):
self.current_context = context_obj
def __dict__(self):
return {
"current_context": self.current_context,
"contexts": self.contexts,
}
def __repr__(self):
return f"WorkspaceTree({self.current_context}, {repr(self.contexts)})"
def __json__(self):
return {self.current_context.name: self.current_context.__json__()}
def get_workspace(self, user_index: int) -> tuple[Workspace, WorkspaceGroup] | None:
"""Returns a workspace object based on the user index."""
if user_index < 1:
raise IndexError("Workspace index must be greater than or equal to 1.")
# First, find the active workspace
active_workspace = self.current_context.active_workspace
# Next, get its index within its group, and the active group index
active_index = (
self.current_context.active_group.workspaces.index(active_workspace) + 1
)
group_index = self.current_context.groups.index(
self.current_context.active_group
)
# If the user index is the same as the active index, or is outside the range for this group, start searching other groups
if user_index == active_index or user_index > len(
self.current_context.active_group.workspaces
):
counter = 0
while counter < len(self.current_context.groups):
group = self.current_context.groups[
(group_index + counter) % len(self.current_context.groups)
]
if (
user_index > len(group.workspaces)
or group == self.current_context.active_group
):
# Group doesn't have enough workspaces, or is the active group
counter += 1
continue
return group.workspaces[user_index - 1], group
# If we didn't find a workspace, return None
return None
else:
# If the user index is different from the active index, return the workspace in the active group
return (
self.current_context.active_group.workspaces[user_index - 1],
self.current_context.active_group,
)
def update_workspaces(self, ws_sway_dict: dict):
"""Updates the workspaces in the tree based on the return from sway IPC."""
touched = set()
for index, ws_data in ws_sway_dict.items():
# Udpate data for all active workspaces
ws_state: Workspace = next(
(ws for ws in self.workspaces if str(ws.index) == index),
None,
)
if ws_state:
touched.add(ws_state)
ws_state.update_state(ws_data)
else:
print(f"Warning: workspace {index} not found in tree.")
# Deactivate any workspaces that weren't touched
for ws in self.workspaces:
if ws not in touched:
ws.deactivate()
async def update_context(self, match_context_on_name: bool = False):
"""Activates a new context in Sway based on the current display configuration."""
# First, get the current display configuration
sway = SwayIPC.get_instance()
outputs = await sway.get_outputs()
print(outputs, flush=True)
# Next, calculate match scores for each context
scores = [
(context, context.compatability_rank(outputs)) for context in self.contexts
]
print([f"{context.name}: {score}" for context, score in scores], flush=True)
# Sort the scores by rank
scores.sort(key=lambda x: x[1], reverse=True)
# If the top context is the current context, or the rank is 0, do nothing
if scores[0][1] == 0:
print(
"No context is compatible with the current display configuration, doing nothing.",
flush=True,
)
return
if self.current_context is None:
await scores[0][0].activate(sway)
self.current_context = scores[0][0]
return
elif scores[0][0] == self.current_context:
print("Context is already active.", flush=True)
elif match_context_on_name and scores[0][0].name == self.current_context.name:
print("Context is already active.", flush=True)
await scores[0][0].activate(sway)
self.current_context = scores[0][0]
else:
await self.current_context.deactivate(sway)
await scores[0][0].activate(sway)
self.current_context = scores[0][0]
async def activate_context(self, name: str):
"""Activates a context by name. This will fail if the current display configuration is incompatible."""
sway = SwayIPC.get_instance()
context = next(
(context for context in self.contexts if context.name == name),
None,
)
outputs = await sway.get_outputs()
score = context.compatability_rank(outputs)
if score == 0:
raise ValueError(
"Context is incompatible with current display configuration."
)
print(f"Activating context {context.name} with score {score}.", flush=True)
if self.current_context is not None and self.current_context != context:
await self.current_context.deactivate()
self.current_context = context
await context.activate(sway)