#!/usr/bin/env python3 import asyncio import signal from contextlib import suppress import pulsectl_asyncio import json import sys import os async def get_values(pulse): sink = None try: sink = await pulse.get_sink_by_name("@DEFAULT_SINK@") source = await pulse.get_source_by_name("@DEFAULT_SOURCE@") except: # reload the script in the event of an exception os.execv(sys.argv[0], sys.argv) sink_result = { "mute": sink.mute == 1, "volume": int(sink.volume.value_flat * 100 + 0.5), } source_result = { "mute": source.mute == 1, "volume": int(source.volume.value_flat * 100 + 0.5), } result = {"output": sink_result, "input": source_result} print(json.dumps(result), flush=True) return result async def listen(): async with pulsectl_asyncio.PulseAsync("volume-monitor") as pulse: await get_values(pulse) async for _ in pulse.subscribe_events("all"): await get_values(pulse) async def main(): listen_task = asyncio.create_task(listen()) loop = asyncio.get_running_loop() for sig in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT): loop.add_signal_handler(sig, listen_task.cancel) with suppress(asyncio.CancelledError): await listen_task asyncio.run(main())