dbus-fast/examples/aio-tcp-notification.py
David Lechner d71f0e1fbd
use asyncio.run() in examples
Calling `asyncio.get_running_loop()` fails if there is no running event loop, so the examples should use `asyncio.run()` instead to create a new loop.

Also, use events for infinite waiting instead of futures since there is no return value.
2022-09-26 12:50:22 -05:00


#!/usr/bin/env python3
# For this example to work, a local TCP port must be opened that
# forwards to the DBus socket file. The easiest way to achieve this
# is with "socat":
# socat TCP-LISTEN:55556,reuseaddr,fork,range=127.0.0.1/32 UNIX-CONNECT:$(echo $DBUS_SESSION_BUS_ADDRESS | sed 's/unix:path=//g')
# For actual DBus transport over a network, authentication may be a
# further obstacle. More information here:
# https://dbus.freedesktop.org/doc/dbus-specification.html#auth-mechanisms
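#
# A minimal sketch, not part of the original example: the sed expression
# above assumes the address is exactly "unix:path=<socket>". The
# hypothetical helper below (the name unix_socket_path is an assumption)
# picks the path out of an address that also carries extra keys such as
# "guid", and returns None when the address has no unix path.
# e.g. unix_socket_path("unix:path=/run/user/1000/bus,guid=abc")
# returns "/run/user/1000/bus".
from urllib.parse import unquote


def unix_socket_path(address):
    # A DBus address is a ";"-separated list of "transport:key=value,..." entries.
    for entry in address.split(";"):
        transport, _, params = entry.partition(":")
        if transport != "unix":
            continue
        for param in params.split(","):
            key, _, value = param.partition("=")
            if key == "path":
                # Values may contain %XX escapes, so decode them.
                return unquote(value)
    return None
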
import os
import sys

# Make the in-repo dbus_fast package importable when running from a checkout.
sys.path.append(os.path.abspath(os.path.dirname(__file__) + "/.."))

import asyncio

from dbus_fast.aio import MessageBus


async def main():
    # Connect to the bus through the TCP port forwarded by socat.
    bus = await MessageBus(bus_address="tcp:host=127.0.0.1,port=55556").connect()
    introspection = await bus.introspect(
        "org.freedesktop.Notifications", "/org/freedesktop/Notifications"
    )
    obj = bus.get_proxy_object(
        "org.freedesktop.Notifications", "/org/freedesktop/Notifications", introspection
    )
    notification = obj.get_interface("org.freedesktop.Notifications")
    # Notify arguments: app_name, replaces_id, app_icon, summary, body,
    # actions, hints, expire_timeout (milliseconds).
    await notification.call_notify(
        "test.py", 0, "", "DBus Test", "Test notification", [""], dict(), 5000
    )


asyncio.run(main())