Added custom tz support to eww clocks

~/.config/eww/displaytime, if it exists, should be a symlink to a
timezone file (like /etc/localtime). When set, clocks will use this
timezone instead of system time. The large sidebar clock will show
the timezone name when the timezone used was not read from /etc/localtime.
This commit is contained in:
2026-01-23 10:29:24 -07:00
parent d55a8a4d0d
commit e44618391b
8 changed files with 424 additions and 35 deletions

View File

@@ -1 +1,2 @@
__pycache__/
/displaytime

View File

@@ -88,6 +88,11 @@
color: $red;
}
.timezone {
color: $red;
padding-top: 5px;
}
.offline {
color: $bg1;
}

View File

@@ -9,16 +9,16 @@
:spacing 0
:orientation "v"
(box :orientation "h"
:spacing -20
:spacing -5
:space-evenly true
:class "special"
"${formattime(clock--data, '%H')}"
"${formattime(clock--data.stamp, '%H', clock--data.tz)}"
":"
"${formattime(clock--data, '%M')}"
"${formattime(clock--data.stamp, '%M', clock--data.tz)}"
":"
"${formattime(clock--data, '%S')}"
"${formattime(clock--data.stamp, '%S', clock--data.tz)}"
)
(label :text {formattime(clock--data, "%Y-%m-%d")})))
(label :text {formattime(clock--data.stamp, "%Y-%m-%d", clock--data.tz)})))
(defwidget big-clock []
@@ -28,9 +28,9 @@
:valign "center"
:width 100
:orientation "h"
(box :halign "start" {formattime(clock--data, "%H")})
(box :halign "start" {formattime(clock--data.stamp, "%H", clock--data.tz)})
(box :halign "center" ":")
(box :halign "end" {formattime(clock--data, "%M")})))
(box :halign "end" {formattime(clock--data.stamp, "%M", clock--data.tz)})))
(defwidget date []
@@ -40,10 +40,10 @@
:halign "start"
:valign "center"
:orientation "h"
(label :text {formattime(clock--data, "%A")}
(label :text {formattime(clock--data.stamp, "%A", clock--data.tz)}
:class "medium"
:valign "center")
(label :text {formattime(clock--data, "%B %d")}
(label :text {formattime(clock--data.stamp, "%B %d", clock--data.tz)}
:class "special"
:valign "center")))
@@ -53,7 +53,7 @@
(button :onclick "echo -n $(date +%Y-%d-%m) | wl-copy && eww update clock--show=date && sleep 2 && eww update clock--show=clock"
:onrightclick "echo -n $(date +%s) | wl-copy && eww update clock--show=unixtime && sleep 2 && eww update clock--show=clock"
(overlay
(box :class "module text nebula"
(box :class "module text"
:spacing 0
:space-evenly false
:orientation "v"
@@ -62,12 +62,17 @@
:width 150
:halign "center"
:orientation "h"
(box :halign "start" {formattime(clock--data, "%H")})
(box :halign "start" {formattime(clock--data.stamp, "%H", clock--data.tz)})
(box :halign "center" ":")
(box :halign "end" {formattime(clock--data, "%M")}))
(label :text {formattime(clock--data, "%A")}
:class "special")
(label :text {formattime(clock--data, "%B %d")}))
(box :halign "end" {formattime(clock--data.stamp, "%M", clock--data.tz)}))
(label :text {formattime(clock--data.stamp, "%A", clock--data.tz)}
:class "special nebula")
(label :text {formattime(clock--data.stamp, "%B %d", clock--data.tz)}
:class "nebula")
(label :text {clock--data.tz}
:class "highlight timezone"
:visible {clock--data.tz-source != "system"})
)
(box
:height 100
(revealer :transition "crossfade"

View File

@@ -21,10 +21,13 @@
{mpris--data.position_minutes}
":"
{mpris--data.position_seconds}
"/"
{mpris--data.length_minutes}
":"
{mpris--data.length_seconds})
(box :space-evenly false
:orientation "h"
:visible {mpris--data.length > 0}
"/"
{mpris--data.length_minutes}
":"
{mpris--data.length_seconds}))
)
(label :class "offline"
:visible {!mpris--data.running}

View File

@@ -67,7 +67,7 @@
:visible {!device.online}
:text "offline")
(label :visible {device.online}
:class {network--data.last_update.unix < clock--data - 30 ? "highlight" : "special"}
:class {network--data.last_update.unix < clock--data.stamp - 30 ? "highlight" : "special"}
:text "${device.addresses[0].address}/${device.addresses[0].prefixlen}")))
(defwidget network--secure
@@ -237,7 +237,7 @@
(box :halign "end"
:spacing 0
:space-evenly false
(label :class {network--data.last_update.unix < clock--data - 30 ? "highlight" : "special"}
(label :class {network--data.last_update.unix < clock--data.stamp - 30 ? "highlight" : "special"}
:text "${network--data.last_update.month}-${network--data.last_update.day} ${network--data.last_update.hour}:${network--data.last_update.minute}:${network--data.last_update.second}")))
))

View File

@@ -8,15 +8,59 @@ Used instead of the `date` command in order to avoid spawning a new process ever
import datetime
import sys
import time
import asyncio
from asyncinotify import Inotify, Mask
from pathlib import Path
from json import dump
def get_date():
"""Return the current date as a JSON object string."""
return int(datetime.datetime.now().timestamp())
def refresh_tz():
"""Reload the timezone."""
tz_file = Path("~/.config/eww/displaytime").expanduser()
source = "user"
if not tz_file.exists():
tz_file = Path("/etc/localtime")
source = "system"
if tz_file.exists():
tz = str(tz_file.resolve().relative_to("/usr/share/zoneinfo"))
else:
tz = "Etc/UTC"
source = "static"
return tz, source
timezone, source = refresh_tz()
async def main():
"""Print loop."""
asyncio.ensure_future(asyncio.create_task(monitor()))
while True:
dump(
{
"stamp": int(datetime.datetime.now().timestamp()),
"tz": timezone,
"tz-source": source,
},
sys.stdout,
)
sys.stdout.write("\n")
sys.stdout.flush()
await asyncio.sleep(0.5)
async def monitor():
"""Timezone monitor loop."""
global timezone, source
with Inotify() as inotify:
inotify.add_watch(
Path("~/.config/eww").expanduser(),
Mask.CREATE | Mask.DELETE,
)
inotify.add_watch(Path("/etc"), Mask.CREATE | Mask.DELETE)
async for event in inotify:
timezone, source = refresh_tz()
if __name__ == "__main__":
while True:
print(get_date())
sys.stdout.flush()
time.sleep(0.5)
asyncio.run(main())

View File

@@ -9,6 +9,9 @@ import sys
PRIORITY_PLAYERS = {"Feishin"}
# Override the reported album if the URL is in this dict
URL_ALBUM_OVERRIDES = {"https://distantworlds3.space/radio/": "Distant Radio"}
class MprisMonitor(Gio.Application):
"""MPRIS monitor application."""
@@ -98,7 +101,10 @@ class MprisMonitor(Gio.Application):
== Mpris.PlaybackStatus.PLAYING,
"title": player.props.title,
"artist": player.props.artist,
"album": player.props.album,
"album": URL_ALBUM_OVERRIDES.get(
player.props.metadata.unpack().get("xesam:url", ""),
player.props.album,
),
"album_artist": player.props.album_artist,
"position": player.props.position,
"position_minutes": int((player.props.position) // 60),

337
.config/eww/scripts/network2.py Normal file → Executable file
View File

@@ -7,17 +7,75 @@ from ipaddress import (
IPv4Interface as IPInterface,
AddressValueError,
)
from dbus_fast.aio import MessageBus
from dbus_fast import Variant, BusType
import re
import asyncio
from subprocess import PIPE
import functools
import sys
from typing import Any, Self, Callable, Awaitable
import aiohttp
import datetime
import json
import socket
from vpn_manager.api.manager import VPNManager, VPNConnection
MONITOR_IPS = {"ezrinet": IPAddress("10.242.3.1"), "internet": IPAddress("1.1.1.1")}
INTERFACE_IGNORE_PATTERNS = [
class PingableIPAddress(IPAddress):
"""IPv4 address with a ping function."""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.reachable: bool | None = None
async def ping(self) -> tuple[bool, float | None]:
"""
Ping this IP address.
Returns a tuple of [success, ping time in ms]
"""
proc = await asyncio.create_subprocess_exec(
"ping", "-c1", "-w1", "-n", str(self), stdout=PIPE
)
stdout, stderr = await proc.communicate()
_match = re.search(
r"icmp_seq=1 ttl=[0-9]+ time=([0-9]+\.?[0-9]*) ms", stdout.decode("utf-8")
)
if _match is None:
self.reachable = False
return (False, None)
else:
self.reachable = True
return (True, float(_match.group(1)))
class Netdata:
"""Container for netdata."""
monitor_ips = {
"ezrinet": PingableIPAddress("10.242.3.1"),
"internet": PingableIPAddress("1.1.1.1"),
}
ssids: list[str] | None = None
public_ip: IPAddress | None = None
public_ip_meta: dict[str, str] | None = None
last_ip_pull = datetime.datetime.fromtimestamp(0)
gateway: PingableIPAddress | None = None
route_on: str | None = None
net_state: str | None = None
vpns: dict[str, bool] = {}
interfaces: dict[str, Any] = {}
INTERFACE_IGNORE_PATTERN = re.compile(r"^vb-|^vz-|^virbr[0-9]+|^lo$")
[
re.compile(r"^vb-"),
re.compile(r"^vz-"),
re.compile(r"^virbr[0-9]+"),
@@ -34,7 +92,12 @@ def timer(interval: float, initial_delay: float = 0):
try:
await asyncio.sleep(initial_delay)
while True:
await func()
try:
await func()
except asyncio.CancelledError:
return
except:
pass
await asyncio.sleep(interval)
except asyncio.CancelledError:
return
@@ -67,6 +130,268 @@ async def ping(address: IPAddress) -> tuple[bool, float | None]:
return (True, float(_match.group(1)))
@timer(5)
async def ping_checks():
class Poller:
"""Poller."""
def __init__(self, freq: int, startup_delay: int = 0):
self._freq = freq
self._startup_delay = startup_delay
self._coros = []
self._task: asyncio.Task | None = None
self._after_poll = lambda context: None
self._setup = lambda context: None
self._context: dict[str, Any] = {}
def poll(
self, func: Callable[[asyncio.TaskGroup, dict[str, Any]], Awaitable[None]]
) -> Callable[[asyncio.TaskGroup], Awaitable[None]]:
"""Decorate a coroutine function to be polled."""
# suppress the exceptions, logging them but otherwise ignoring them.
async def suppressor(*args):
try:
await func(*args)
except (KeyboardInterrupt, SystemExit, asyncio.CancelledError):
print(
f"Poller {func.__name__} cancelled or aborted.",
file=sys.stderr,
flush=True,
)
raise
except BaseException as e:
print(
f"Poller {func.__name__} raised an exception:",
e,
file=sys.stderr,
flush=True,
)
self._coros.append(suppressor)
def after_poll(
self, func: Callable[[dict[str, Any]], None]
) -> Callable[[dict[str, Any]], None]:
"""
Decorate a function to run after each poll cycle (after all pollers are done).
Should not be a coroutine function, and should not execute any blocking I/O or long-running
tasks.
"""
self._after_poll = func
return func
def setup(
self, func: Callable[[dict[str, Any]], None]
) -> Callable[[dict[str, Any]], None]:
"""
Decorate a function to run before the poll cycle starts to set up the context.
This can be a coroutine.
"""
self._setup = func
return func
async def start(self):
"""Execute polling in a loop."""
try:
await asyncio.sleep(self._startup_delay)
if asyncio.iscoroutinefunction(self._setup):
await self._setup(self._context)
else:
self._setup(self._context)
while True:
start = datetime.datetime.now()
try:
async with asyncio.timeout(self._freq):
async with asyncio.TaskGroup() as tg:
for coro in self._coros:
tg.create_task(coro(tg, self._context))
except asyncio.TimeoutError:
print(
"Warning: poll took too long! Not all data will be updated!",
file=sys.stderr,
flush=True,
)
self._after_poll(self._context)
end = datetime.datetime.now()
await asyncio.sleep(self._freq - (start - end).total_seconds())
except asyncio.CancelledError:
return
mainloop = Poller(5)
@mainloop.poll
async def ping_checks(tg: asyncio.TaskGroup, context: dict[str, Any]):
"""Recurring ping checks for configured IP addresses."""
for ip in Netdata.monitor_ips.values():
tg.create_task(ip.ping())
@mainloop.poll
async def firsthop_check(tg: asyncio.TaskGroup, context: dict[str, Any]):
"""Recurring firsthop check."""
# Targeted IP doesn't matter, since TTL is 1
proc = await asyncio.create_subprocess_exec(
"ping", "-c1", "-W1", "-t1", "1.1.1.1", stdout=PIPE
)
stdout, stderr = await proc.communicate()
try:
ip = IPAddress(stdout.decode("utf-8").split("\n")[1].split()[1])
Netdata.gateway = ip
except (IndexError, AddressValueError):
# No firsthop
Netdata.gateway = None
@mainloop.poll
async def route_check(tg: asyncio.TaskGroup, context: dict[str, Any]):
"""Recurring route check."""
proc = await asyncio.create_subprocess_exec(
"ip", "route", "show", "default", stdout=PIPE
)
stdout, stderr = await proc.communicate()
try:
# First line
line = stdout.decode("utf-8").splitlines()[0]
# Get the link name
link = re.search(r"dev\s+(\S+)", line).group(1)
Netdata.route_on = link
except:
Netdata.route_on = None
@mainloop.poll
async def public_ip(tg: asyncio.TaskGroup, context: dict[str, Any]):
"""Recurring public IP retriever."""
# Only query for public IP if it's been at least 1 hour since the last query or our gateway has changed.
refresh = False
if Netdata.public_ip is None:
refresh = True
print("no public IP data, fetching", file=sys.stderr, flush=True)
if context.get("last_gateway") != Netdata.gateway:
refresh = True
print("gateway changed, refreshing public IP", file=sys.stderr, flush=True)
if Netdata.last_ip_pull < datetime.datetime.now() - datetime.timedelta(hours=1):
refresh = True
print(
f"last public IP pull was at {Netdata.last_ip_pull.isoformat()}, refreshing",
file=sys.stderr,
flush=True,
)
if refresh:
async with aiohttp.ClientSession() as session:
async with session.get("https://ipinfo.io/json") as response:
data: dict[str, str] = await response.json()
Netdata.public_ip_meta = data
Netdata.public_ip = IPAddress(data["ip"])
Netdata.last_ip_pull = datetime.datetime.now()
context["last_gateway"] = Netdata.gateway
@mainloop.setup
async def setup_dbus(context: dict[str, Any]) -> MessageBus:
"""Set up the D-Bus connections."""
bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
context["system_bus"] = bus
intro = await bus.introspect(
"org.freedesktop.network1", "/org/freedesktop/network1"
)
context["network_manager"] = bus.get_proxy_object(
"org.freedesktop.network1", "/org/freedesktop/network1", intro
)
context["network_manager_iface"] = context["network_manager"].get_interface(
"org.freedesktop.network1.Manager"
)
context["network_manager_props"] = context["network_manager"].get_interface(
"org.freedesktop.DBus.Properties"
)
context["vpn_manager"] = VPNManager(allow_interactive_authorization=True, bus=bus)
@mainloop.poll
async def global_netstate(tg: asyncio.TaskGroup, context: dict[str, Any]):
"""Recurring global netstate retriever."""
manager = context["network_manager_iface"]
Netdata.net_state = await manager.get_operational_state()
@mainloop.poll
async def interface_status(tg: asyncio.TaskGroup, context: dict[str, Any]):
"""Recurring interface stats generator."""
# Set up D-Bus connection if it's not there already
bus: MessageBus = context["system_bus"]
manager = context["network_manager_iface"]
descr = json.loads(await manager.call_describe())
ifaces = [
{
**iface,
"Addresses": [
IPInterface(
f"{'.'.join(str(octet) for octet in addr['Address'])}/{addr['PrefixLength']}"
)
for addr in iface.get("Addresses", [])
if addr["Family"] == socket.AF_INET
],
}
for iface in descr["Interfaces"]
if not INTERFACE_IGNORE_PATTERN.match(iface["Name"])
]
wlans = [iface for iface in ifaces if iface["Type"] == "wlan"]
Netdata.ssids = [wlan["SSID"] for wlan in wlans if "SSID" in wlan]
Netdata.interfaces = {
iface["Name"]: {
"type": iface["Type"],
"state": iface["OnlineState"],
"addresses": [str(addr) for addr in iface["Addresses"]],
}
for iface in ifaces
}
async def get_vpn_conn(conn: VPNConnection):
"""Populate data for a VPN connection."""
Netdata.vpns[conn._name] = await conn.is_connected()
@mainloop.poll
async def vpn_status(tg: asyncio.TaskGroup, context: dict[str, Any]):
"""Recurring VPN stats generator."""
manager: VPNManager = context["vpn_manager"]
connections = await manager.get_connections()
Netdata.vpns = {}
for connection in connections:
tg.create_task(get_vpn_conn(connection))
@mainloop.after_poll
def output_data(context: dict[str, Any]):
"""Print the data to stdout."""
routable = False
for iface in Netdata.interfaces.values():
if iface["type"] in {"wlan", "ether"} and iface["state"] == "online":
routable = True
break
json.dump(
{
"routable": routable,
"ssids": Netdata.ssids,
"ping_status": {
key: value.reachable for key, value in Netdata.monitor_ips.items()
},
"gateway": str(Netdata.gateway) if Netdata.gateway else None,
"route_on": Netdata.route_on,
"public_ip": Netdata.public_ip_meta,
"operational_state": Netdata.net_state,
"vpn_state": Netdata.vpns,
"interfaces": Netdata.interfaces,
},
sys.stdout,
)
sys.stdout.write("\n")
sys.stdout.flush()
asyncio.run(mainloop.start())