Added config watching and improved workspace output assignment

This commit is contained in:
Ezri Brimhall 2024-10-23 15:21:01 -06:00
parent b44110ca19
commit ded2c8676e
Signed by: ezri
GPG Key ID: 058A78E5680C6F24
7 changed files with 190 additions and 33 deletions

42
poetry.lock generated
View File

@ -18,6 +18,17 @@ six = ">=1.12.0"
astroid = ["astroid (>=1,<2)", "astroid (>=2,<4)"]
test = ["astroid (>=1,<2)", "astroid (>=2,<4)", "pytest"]
[[package]]
name = "asyncinotify"
version = "4.2.0"
description = "A simple optionally-async python inotify library, focused on simplicity of use and operation, and leveraging modern Python features"
optional = false
python-versions = "<4,>=3.6"
files = [
{file = "asyncinotify-4.2.0-py3-none-any.whl", hash = "sha256:23cbcb0704cc65a2009d5ddc5a70dc5be6560708d8a684bba82e03e384c6295f"},
{file = "asyncinotify-4.2.0.tar.gz", hash = "sha256:dac1d75e16a4919c6eab84a90ff51218db622c5524a84a5c501a0b62ea7ec7ea"},
]
[[package]]
name = "colorama"
version = "0.4.6"
@ -40,6 +51,20 @@ files = [
{file = "decorator-5.1.1.tar.gz", hash = "sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330"},
]
[[package]]
name = "exceptiongroup"
version = "1.2.2"
description = "Backport of PEP 654 (exception groups)"
optional = false
python-versions = ">=3.7"
files = [
{file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"},
{file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"},
]
[package.extras]
test = ["pytest (>=6)"]
[[package]]
name = "executing"
version = "2.1.0"
@ -82,6 +107,7 @@ files = [
[package.dependencies]
colorama = {version = "*", markers = "sys_platform == \"win32\""}
decorator = "*"
exceptiongroup = {version = "*", markers = "python_version < \"3.11\""}
jedi = ">=0.16"
matplotlib-inline = "*"
pexpect = {version = ">4.3", markers = "sys_platform != \"win32\" and sys_platform != \"emscripten\""}
@ -89,6 +115,7 @@ prompt-toolkit = ">=3.0.41,<3.1.0"
pygments = ">=2.4.0"
stack-data = "*"
traitlets = ">=5.13.0"
typing-extensions = {version = ">=4.6", markers = "python_version < \"3.12\""}
[package.extras]
all = ["ipython[black,doc,kernel,matplotlib,nbconvert,nbformat,notebook,parallel,qtconsole]", "ipython[test,test-extra]"]
@ -290,6 +317,17 @@ files = [
docs = ["myst-parser", "pydata-sphinx-theme", "sphinx"]
test = ["argcomplete (>=3.0.3)", "mypy (>=1.7.0)", "pre-commit", "pytest (>=7.0,<8.2)", "pytest-mock", "pytest-mypy-testing"]
[[package]]
name = "typing-extensions"
version = "4.12.2"
description = "Backported and Experimental Type Hints for Python 3.8+"
optional = false
python-versions = ">=3.8"
files = [
{file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
{file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
]
[[package]]
name = "wcwidth"
version = "0.2.13"
@ -303,5 +341,5 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "8a614d458afd106177bcc01d92e0fa81508bbde310b172ecd879451341907021"
python-versions = "^3.10"
content-hash = "9124e77caf9be4c685a59b0aea187f0f0bb60f2639b74ed8aa6f47ea97aba32e"

View File

@ -14,6 +14,7 @@ sway_get_focused_workspace = 'sway_context_manager.client:get_workspace_definiti
python = "^3.10"
i3ipc = "^2.2.1"
sdbus = "^0.12.0"
asyncinotify = "^4.2.0"
[tool.poetry.group.dev.dependencies]

View File

@ -44,10 +44,12 @@ class ContextMngrInterface(
async def focus_workspace(self, user_index: int) -> bool:
"""Focus a workspace by its 1-indexed ID in its group."""
print("focusing workspace", user_index)
workspace = self.workspace_tree.get_workspace(user_index)
if workspace is None:
workspace, group = self.workspace_tree.get_workspace(user_index)
if workspace is None or group is None:
return False
await workspace.focus(self.connection)
await workspace.relocate(
self.connection, await group.get_output_name(self.connection)
)
return True
@dbus_method_async(input_signature="y", result_signature="b")

View File

@ -4,6 +4,8 @@ from .interface import ContextMngrInterface
from .swayipc import SwayConnection
import os
from sdbus import request_default_bus_name_async, sd_bus_open_user, set_default_bus
from .workspace_config_watcher import Watcher
from pathlib import Path
WORKSPACES_FILE = os.environ.get(
"WORKSPACES_FILE", os.path.expanduser("~/.config/sway/workspaces.json")
@ -11,6 +13,7 @@ WORKSPACES_FILE = os.environ.get(
def main():
set_default_bus(sd_bus_open_user())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@ -18,11 +21,13 @@ def main():
tree = WorkspaceTree(WORKSPACES_FILE)
sway = SwayConnection(tree)
interface = ContextMngrInterface(tree, sway.connection)
watcher = Watcher(Path(WORKSPACES_FILE), sway, tree, interface)
sway.on_change(interface.on_tree_changed)
loop.run_until_complete(request_default_bus_name_async("dev.ezri.sway"))
interface.export_to_dbus("/ContextManager")
loop.create_task(sway.run())
loop.create_task(watcher.run())
loop.run_forever()

View File

@ -2,6 +2,7 @@ from i3ipc.aio import Connection
from i3ipc import Event
from .workspace_tree import WorkspaceTree
from .utils import async_debounce
import asyncio
class SwayConnection:
@ -35,6 +36,11 @@ class SwayConnection:
"""On mode change event, do something. Not sure what yet."""
pass
async def on_shutdown(self, event):
"""On shutdown event, close the connection and exit"""
await self.connection.close()
asyncio.get_event_loop().stop()
async def run(self):
# Connect to sway
await self.connection.connect()
@ -43,6 +49,7 @@ class SwayConnection:
self.connection.on(Event.WORKSPACE, self.on_workspace)
self.connection.on(Event.OUTPUT, self.on_output)
self.connection.on(Event.MODE, self.on_mode)
self.connection.on(Event.SHUTDOWN, self.on_shutdown)
# Get initial output set
self.last_output_set = await self.connection.get_outputs()

View File

@ -0,0 +1,40 @@
from asyncinotify import Inotify, Mask
from .swayipc import SwayConnection
from .interface import ContextMngrInterface
from .workspace_tree import WorkspaceTree
from pathlib import Path
class Watcher:
def __init__(
self,
filepath: Path,
sway: SwayConnection,
tree: WorkspaceTree,
interface: ContextMngrInterface,
):
self.sway = sway
self.tree = tree
self.interface = interface
self.filepath = filepath
if self.filepath.is_symlink():
# Resolve the symlink so we're tracking the actual file
self.filepath = self.filepath.resolve()
file_dir = self.filepath.parent
self.inotify = Inotify()
# Watch for changes in the file
self.watch = self.inotify.add_watch(
file_dir,
Mask.CREATE | Mask.DELETE | Mask.MOVED_TO | Mask.MOVED_FROM | Mask.MODIFY,
)
async def run(self):
async for event in self.inotify:
if event.path == self.filepath:
print(f"Config file changed, reloading", flush=True)
self.tree.load_file(str(self.filepath))
await self.tree.update_context(self.sway.connection, True)
await self.interface.on_tree_changed(self.tree)

View File

@ -1,6 +1,6 @@
from typing import Any
import json
from i3ipc.replies import OutputReply
from i3ipc.replies import OutputReply, WorkspaceReply
from i3ipc.aio import Connection
import asyncio
import subprocess
@ -67,6 +67,15 @@ class Workspace:
"""Move the focused container to the workspace."""
await i3.command(f"move container to workspace {self.index}")
async def reassign(self, i3: Connection, output: str):
"""Reassign the workspace to a different output."""
await i3.command(f"workspace {self.index} output {output}")
async def relocate(self, i3: Connection, output: str):
"""Move the workspace to a different output."""
await i3.command(f"workspace {self.index}")
await i3.command(f"move workspace to output {output}")
class WorkspaceGroup:
"""A class representing a group of workspaces. Generally maps to a monitor."""
@ -143,14 +152,38 @@ class WorkspaceGroup:
if name in [output.name for output in outputs]:
selector = name
break
# First, assign workspaces to the output
for workspace in self.workspaces:
await i3.command(f"workspace {workspace.index} output {selector}")
# Then, configure the output
# Configure the output
await i3.command(
f"output {selector} position {self.position[0]} {self.position[1]} {mode} {transform} enable"
)
async def validate(self, i3: Connection, workspaces: list[WorkspaceReply]):
"""Validate that each workspace in the group is assigned to and present on the correct output."""
ouput_name = self.get_output_name(i3)
for workspace in self.workspaces:
# Get output name for workspace
workspace_output = next(
(
workspace_output
for workspace_output in workspaces
if workspace_output.name == workspace.index
),
None,
)
if workspace_output is None:
print(
f"Workspace {workspace.index} not found in workspaces, skipping validation",
flush=True,
)
continue
if workspace_output.output != ouput_name:
print(
f"Workspace {workspace.index} is assigned to {workspace_output.output}, not {ouput_name}, reassigning",
flush=True,
)
await workspace.relocate(i3, ouput_name)
async def get_output_name(self, i3: Connection) -> str:
"""Get the name of the output in Sway."""
outputs = await i3.get_outputs()
@ -255,30 +288,29 @@ class WorkspaceContext:
result += group_result
return result
async def deactivate(self, i3: Connection):
"""Deactivate the context in Sway."""
# First, close all EWW windows
proc = await asyncio.create_subprocess_exec("eww", "close-all")
await proc.wait()
# Then, disable all displays
await i3.command("output * disable")
async def activate(self, i3: Connection):
"""Activate the context in Sway."""
defined_displays = [
f"{group.make} {group.model} {group.serial}" for group in self.groups
]
outputs = await i3.get_outputs()
# First, close all EWW windows
proc = await asyncio.create_subprocess_exec("eww", "close-all")
await proc.wait()
# Second, if the focused workspace is not in the context, focus the primary output
if not self.active_group:
return
# Then, disable all displays, so we can assign workspaces to the correct ones before enabling them
for output in outputs:
await i3.command(f"output {output.name} disable")
# Next, configure all displays defined in the context
# Configure all displays defined in the context
for group in self.groups:
await group.configure(i3, outputs)
# Finally, open all EWW windows defined in the context on the appropriate windows
# Then, open all EWW windows defined in the context on the appropriate windows
proc = await asyncio.create_subprocess_exec("eww", "reload")
await proc.wait()
for group in self.groups:
for window in group.eww_windows:
proc = await asyncio.create_subprocess_exec(
@ -290,6 +322,11 @@ class WorkspaceContext:
)
await proc.wait()
# Finally, validate workspaces and move them if necessary
for group in self.groups:
workspaces = await i3.get_workspaces()
await group.validate(i3, workspaces)
@property
def active_group(self) -> WorkspaceGroup:
"""Returns the active group in the context."""
@ -328,7 +365,9 @@ class WorkspaceTree:
workspaces: list[Workspace] = None
def __init__(self, filename: str):
self.load_file(filename)
def load_file(self, filename: str):
with open(filename, "r") as file:
initial: dict = json.load(file)
@ -338,7 +377,11 @@ class WorkspaceTree:
for context, context_dict in initial["contexts"].items():
context_obj = WorkspaceContext(context, context_dict, self.workspaces)
self.contexts.append(context_obj)
if initial.get("default_context", "personal") == context:
# Don't override the current context if it's already set
if (
initial.get("default_context", "default") == context
and self.current_context is None
):
self.current_context = context_obj
def __dict__(self):
@ -353,7 +396,7 @@ class WorkspaceTree:
def __json__(self):
return {self.current_context.name: self.current_context.__json__()}
def get_workspace(self, user_index: int) -> Workspace:
def get_workspace(self, user_index: int) -> tuple[Workspace, WorkspaceGroup] | None:
"""Returns a workspace object based on the user index."""
if user_index < 1:
raise IndexError("Workspace index must be greater than or equal to 1.")
@ -384,12 +427,15 @@ class WorkspaceTree:
counter += 1
continue
return group.workspaces[user_index - 1]
return group.workspaces[user_index - 1], group
# If we didn't find a workspace, return None
return None
else:
# If the user index is different from the active index, return the workspace in the active group
return self.current_context.active_group.workspaces[user_index - 1]
return (
self.current_context.active_group.workspaces[user_index - 1],
self.current_context.active_group,
)
def update_workspaces(self, ws_sway_dict: dict):
"""Updates the workspaces in the tree based on the return from sway IPC."""
@ -412,7 +458,7 @@ class WorkspaceTree:
if ws not in touched:
ws.deactivate()
async def update_context(self, i3: Connection):
async def update_context(self, i3: Connection, match_context_on_name: bool = False):
"""Activates a new context in Sway based on the current display configuration."""
# First, get the current display configuration
outputs = await i3.get_outputs()
@ -427,11 +473,26 @@ class WorkspaceTree:
scores.sort(key=lambda x: x[1], reverse=True)
# If the top context is the current context, or the rank is 0, do nothing
if scores[0][0] == self.current_context or scores[0][1] == 0:
if scores[0][1] == 0:
print(
"No context is compatible with the current display configuration, doing nothing.",
flush=True,
)
return
if self.current_context is None:
await scores[0][0].activate(i3)
self.current_context = scores[0][0]
return
elif scores[0][0] == self.current_context:
print("Context is already active.", flush=True)
elif match_context_on_name and scores[0][0].name == self.current_context.name:
print("Context is already active.", flush=True)
await scores[0][0].activate(i3)
self.current_context = scores[0][0]
else:
await self.current_context.deactivate(i3)
await scores[0][0].activate(i3)
self.current_context = scores[0][0]
await self.current_context.activate(i3)
async def activate_context(self, i3: Connection, name: str):
"""Activates a context by name. This will fail if the current display configuration is incompatible."""
@ -448,5 +509,8 @@ class WorkspaceTree:
"Context is incompatible with current display configuration."
)
print(f"Activating context {context.name} with score {score}.", flush=True)
if self.current_context is not None and self.current_context != context:
await self.current_context.deactivate(i3)
self.current_context = context
await context.activate(i3)