214 lines
6.4 KiB
Python
Executable File
214 lines
6.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
WorkspaceTree = dict[str, dict[str, list['Workspace']]]
|
|
WorkspaceList = dict[str, 'Workspace']
|
|
|
|
socketdir = f"/tmp/hypr/{os.environ['HYPRLAND_INSTANCE_SIGNATURE']}"
|
|
socket1 = f"{socketdir}/.socket.sock"
|
|
socket2 = f"{socketdir}/.socket2.sock"
|
|
|
|
listened_events = ['workspace', 'movewindow', 'createworkspace', 'destroyworkspace']
|
|
|
|
class Workspace:
|
|
|
|
index: str = None
|
|
name: str = None
|
|
exec_cmd: str = None
|
|
group: str = None
|
|
|
|
active: bool = False
|
|
visible: bool = False
|
|
focused: bool = False
|
|
alerted: bool = False
|
|
|
|
def __init__(self, dictionary: dict[str, str | int], group: str):
|
|
self.index = str(dictionary.get('index'))
|
|
self.name = dictionary.get('name')
|
|
self.exec_cmd = dictionary.get('exec')
|
|
self.group = group
|
|
|
|
self.active = dictionary.get('active', False)
|
|
self.visible = dictionary.get('visible', False)
|
|
self.focused = dictionary.get('focused', False)
|
|
self.alerted = dictionary.get('alerted', False)
|
|
|
|
def update_state(self, state_update, monitor_dict):
|
|
self.active = True
|
|
self.visible = monitor_dict[state_update['monitor']]['activeWorkspace']['name'] == state_update['name']
|
|
self.focused = self.visible and monitor_dict[state_update['monitor']]['focused']
|
|
self.alerted = False
|
|
|
|
def deactivate(self):
|
|
self.active = False
|
|
self.visible = False
|
|
self.focused = False
|
|
self.alerted = False
|
|
|
|
@classmethod
|
|
def parse_file(cls: type, filename: str) -> tuple[WorkspaceTree, WorkspaceList]:
|
|
|
|
result = {}
|
|
|
|
initial: dict = None
|
|
with open(filename, 'r') as file:
|
|
initial = json.load(file)
|
|
|
|
workspaces: WorkspaceList = {}
|
|
|
|
def find_workspace(workspace: dict[str, str|int], group: str) -> 'Workspace':
|
|
index = str(workspace.get('index'))
|
|
if index in workspaces:
|
|
return workspaces[index]
|
|
else:
|
|
workspaces[index] = cls(workspace, group)
|
|
return workspaces[index]
|
|
|
|
result = {
|
|
context: {
|
|
group: [
|
|
find_workspace(workspace, group) for workspace in workspaces
|
|
] for group, workspaces in groups.items()
|
|
} for context, groups in initial.items()
|
|
}
|
|
|
|
return result, workspaces
|
|
|
|
@staticmethod
|
|
def full_dictify(tree: WorkspaceTree):
|
|
return {
|
|
context: {
|
|
group: [
|
|
workspace.dictify() for workspace in workspaces
|
|
] for group, workspaces in groups.items()
|
|
} for context, groups in tree.items()
|
|
}
|
|
|
|
def dictify(self):
|
|
|
|
return {
|
|
'index': self.index,
|
|
'name': self.name,
|
|
'exec': self.exec_cmd,
|
|
'active': self.active,
|
|
'visible': self.visible,
|
|
'focused': self.focused,
|
|
'alerted': self.alerted
|
|
}
|
|
|
|
workspace_tree, workspace_list = Workspace.parse_file(f"{os.environ['HOME']}/.config/hypr/workspaces.json")
|
|
|
|
data = {
|
|
"ws": {},
|
|
"current": {},
|
|
"context": "personal",
|
|
"visible": {},
|
|
}
|
|
|
|
def write_data():
|
|
global data
|
|
print(json.dumps(data), flush=True)
|
|
|
|
async def get_workspace_data():
|
|
request = b"[[BATCH]] j/workspaces; j/monitors"
|
|
reader = None
|
|
writer = None
|
|
data = None
|
|
try:
|
|
reader, writer = await asyncio.open_unix_connection(socket1)
|
|
writer.write(request)
|
|
await writer.drain()
|
|
data = await reader.read()
|
|
finally:
|
|
if writer is not None:
|
|
writer.close()
|
|
if data is None:
|
|
return;
|
|
|
|
workspace_data = None
|
|
monitor_data = None
|
|
try:
|
|
workspace_data, monitor_data = data.decode().split("}][{")
|
|
workspace_data += "}]"
|
|
monitor_data = "[{" + monitor_data
|
|
except ValueError:
|
|
print("Error unpacking response", file=sys.stderr)
|
|
return
|
|
|
|
return [json.loads(workspace_data), json.loads(monitor_data)]
|
|
|
|
async def update_workspaces():
|
|
global workspace_tree, workspace_list, data
|
|
|
|
workspaces, monitors = await get_workspace_data()
|
|
|
|
ws_hypr_dict = { ws["name"]: ws for ws in workspaces }
|
|
monitor_dict = { monitor["name"]: monitor for monitor in monitors }
|
|
|
|
touched = []
|
|
|
|
for index, ws_data in ws_hypr_dict.items():
|
|
|
|
# Update data for all active workspaces
|
|
ws_state = workspace_list.get(index)
|
|
if ws_state is None:
|
|
# If the workspace isn't configured, note it and handle gracefully
|
|
print(f"Warning: found unconfigured workspace {index}", file=sys.stderr, flush=True)
|
|
if monitor_dict[ws_data['monitor']]['activeWorkspace']['name'] == ws_data['name']:
|
|
data['current'] = {
|
|
'index': index,
|
|
'name': 'undefined',
|
|
'exec': 'alacritty',
|
|
'active': True,
|
|
'visible': True,
|
|
'focused': True,
|
|
'alerted': False
|
|
}
|
|
else:
|
|
# Otherwise, 'touch' it and update status data
|
|
touched.append(index)
|
|
ws_state.update_state(ws_data, monitor_dict)
|
|
if ws_state.focused:
|
|
data['current'] = ws_state.dictify()
|
|
if ws_state.visible:
|
|
data['visible'][ws_state.group] = ws_state.dictify()
|
|
|
|
inactives = [
|
|
v for k, v in workspace_list.items() if v.active and k not in touched
|
|
]
|
|
|
|
for workspace in inactives:
|
|
workspace.deactivate()
|
|
|
|
# Create an 'unknown' workspace object for each group that doesn't yet have a listed visible workspace
|
|
for group in workspace_tree[data['context'] or 'personal'].keys():
|
|
if group not in data['visible']:
|
|
data['visible'][group] = {
|
|
'index': 'U',
|
|
'name': 'undefined',
|
|
'exec': 'alacritty',
|
|
'active': True,
|
|
'visible': True,
|
|
'focused': False,
|
|
'alerted': False
|
|
}
|
|
data['ws'] = Workspace.full_dictify(workspace_tree)
|
|
write_data()
|
|
|
|
async def main():
|
|
reader, writer = await asyncio.open_unix_connection(socket2)
|
|
|
|
await update_workspaces()
|
|
|
|
while True:
|
|
event = (await reader.readline()).decode()
|
|
eventType, eventData = event.split(">>")
|
|
if eventType in listened_events:
|
|
await update_workspaces()
|
|
|
|
asyncio.run(main())
|