dbus-fast/tests/test_fd_passing.py
pre-commit-ci[bot] bfc8151005
chore(pre-commit.ci): pre-commit autoupdate (#427)
* chore(pre-commit.ci): pre-commit autoupdate

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.1.0 → v0.11.0](https://github.com/astral-sh/ruff-pre-commit/compare/v0.1.0...v0.11.0)

* chore(pre-commit.ci): auto fixes

* chore: fix violations

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
2025-03-17 11:01:24 -10:00

376 lines
11 KiB
Python

"""This tests the ability to send and receive file descriptors in dbus messages"""
import os
import pytest
from dbus_fast import Message, MessageType
from dbus_fast.aio import MessageBus
from dbus_fast.service import ServiceInterface, dbus_property, method, signal
from dbus_fast.signature import SignatureTree, Variant
def open_file():
return os.open(os.devnull, os.O_RDONLY)
class ExampleInterface(ServiceInterface):
def __init__(self, name):
super().__init__(name)
self.fds = []
@method()
def ReturnsFd(self) -> "h":
fd = open_file()
self.fds.append(fd)
return fd
@method()
def AcceptsFd(self, fd: "h"):
assert fd != 0
self.fds.append(fd)
def get_last_fd(self):
return self.fds[-1]
def cleanup(self):
for fd in self.fds:
os.close(fd)
self.fds.clear()
@signal()
def SignalFd(self) -> "h":
fd = open_file()
self.fds.append(fd)
return fd
@dbus_property()
def PropFd(self) -> "h":
if not self.fds:
fd = open_file()
self.fds.append(fd)
return self.fds[-1]
@PropFd.setter
def PropFd(self, fd: "h"):
assert fd
self.fds.append(fd)
def assert_fds_equal(fd1, fd2):
assert fd1
assert fd2
stat1 = os.fstat(fd1)
stat2 = os.fstat(fd2)
assert stat1.st_dev == stat2.st_dev
assert stat1.st_ino == stat2.st_ino
assert stat1.st_rdev == stat2.st_rdev
@pytest.mark.asyncio
async def test_sending_file_descriptor_low_level():
bus1 = await MessageBus(negotiate_unix_fd=True).connect()
bus2 = await MessageBus(negotiate_unix_fd=True).connect()
fd_before = open_file()
fd_after = None
msg = Message(
destination=bus1.unique_name,
path="/org/test/path",
interface="org.test.iface",
member="SomeMember",
body=[0],
signature="h",
unix_fds=[fd_before],
)
def message_handler(sent):
nonlocal fd_after
if sent.sender == bus2.unique_name and sent.serial == msg.serial:
assert sent.path == msg.path
assert sent.serial == msg.serial
assert sent.interface == msg.interface
assert sent.member == msg.member
assert sent.body == [0]
assert len(sent.unix_fds) == 1
fd_after = sent.unix_fds[0]
bus1.send(Message.new_method_return(sent, "s", ["got it"]))
bus1.remove_message_handler(message_handler)
return True
bus1.add_message_handler(message_handler)
reply = await bus2.call(msg)
assert reply.body == ["got it"]
assert fd_after is not None
assert_fds_equal(fd_before, fd_after)
for fd in [fd_before, fd_after]:
os.close(fd)
for bus in [bus1, bus2]:
bus.disconnect()
@pytest.mark.asyncio
async def test_high_level_service_fd_passing(event_loop):
bus1 = await MessageBus(negotiate_unix_fd=True).connect()
bus2 = await MessageBus(negotiate_unix_fd=True).connect()
interface_name = "test.interface"
interface = ExampleInterface(interface_name)
export_path = "/test/path"
async def call(member, signature="", body=[], unix_fds=[], iface=interface.name):
return await bus2.call(
Message(
destination=bus1.unique_name,
path=export_path,
interface=iface,
member=member,
signature=signature,
body=body,
unix_fds=unix_fds,
)
)
bus1.export(export_path, interface)
# test that an fd can be returned by the service
reply = await call("ReturnsFd")
assert reply.message_type == MessageType.METHOD_RETURN, reply.body
assert reply.signature == "h"
assert len(reply.unix_fds) == 1
assert_fds_equal(interface.get_last_fd(), reply.unix_fds[0])
interface.cleanup()
os.close(reply.unix_fds[0])
# test that an fd can be sent to the service
fd = open_file()
reply = await call("AcceptsFd", signature="h", body=[0], unix_fds=[fd])
assert reply.message_type == MessageType.METHOD_RETURN, reply.body
assert_fds_equal(interface.get_last_fd(), fd)
interface.cleanup()
os.close(fd)
# signals
fut = event_loop.create_future()
def fd_listener(msg):
if msg.sender == bus1.unique_name and msg.message_type == MessageType.SIGNAL:
fut.set_result(msg)
reply = await bus2.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
member="AddMatch",
signature="s",
body=[f"sender='{bus1.unique_name}'"],
)
)
assert reply.message_type == MessageType.METHOD_RETURN
bus2.add_message_handler(fd_listener)
interface.SignalFd()
reply = await fut
assert len(reply.unix_fds) == 1
assert reply.body == [0]
assert_fds_equal(reply.unix_fds[0], interface.get_last_fd())
interface.cleanup()
os.close(reply.unix_fds[0])
# properties
reply = await call(
"Get", "ss", [interface_name, "PropFd"], iface="org.freedesktop.DBus.Properties"
)
assert reply.message_type == MessageType.METHOD_RETURN, reply.body
assert reply.body[0].signature == "h"
assert reply.body[0].value == 0
assert len(reply.unix_fds) == 1
assert_fds_equal(interface.get_last_fd(), reply.unix_fds[0])
interface.cleanup()
os.close(reply.unix_fds[0])
fd = open_file()
reply = await call(
"Set",
"ssv",
[interface_name, "PropFd", Variant("h", 0)],
iface="org.freedesktop.DBus.Properties",
unix_fds=[fd],
)
assert reply.message_type == MessageType.METHOD_RETURN, reply.body
assert_fds_equal(interface.get_last_fd(), fd)
interface.cleanup()
os.close(fd)
reply = await call(
"GetAll", "s", [interface_name], iface="org.freedesktop.DBus.Properties"
)
assert reply.message_type == MessageType.METHOD_RETURN, reply.body
assert reply.body[0]["PropFd"].signature == "h"
assert reply.body[0]["PropFd"].value == 0
assert len(reply.unix_fds) == 1
assert_fds_equal(interface.get_last_fd(), reply.unix_fds[0])
interface.cleanup()
os.close(reply.unix_fds[0])
for bus in [bus1, bus2]:
bus.disconnect()
@pytest.mark.asyncio
async def test_sending_file_descriptor_with_proxy(event_loop):
name = "dbus.next.test.service"
path = "/test/path"
interface_name = "test.interface"
bus = await MessageBus(negotiate_unix_fd=True).connect()
interface = ExampleInterface(interface_name)
bus.export(path, interface)
await bus.request_name(name)
intr = await bus.introspect(name, path)
proxy = bus.get_proxy_object(name, path, intr)
proxy_interface = proxy.get_interface(interface_name)
# test fds are replaced correctly in all high level interfaces
fd = await proxy_interface.call_returns_fd()
assert_fds_equal(interface.get_last_fd(), fd)
interface.cleanup()
os.close(fd)
fd = open_file()
await proxy_interface.call_accepts_fd(fd)
assert_fds_equal(interface.get_last_fd(), fd)
interface.cleanup()
os.close(fd)
fd = await proxy_interface.get_prop_fd()
assert_fds_equal(interface.get_last_fd(), fd)
interface.cleanup()
os.close(fd)
fd = open_file()
await proxy_interface.set_prop_fd(fd)
assert_fds_equal(interface.get_last_fd(), fd)
interface.cleanup()
os.close(fd)
fut = event_loop.create_future()
def on_signal_fd(fd):
fut.set_result(fd)
proxy_interface.off_signal_fd(on_signal_fd)
proxy_interface.on_signal_fd(on_signal_fd)
interface.SignalFd()
fd = await fut
assert_fds_equal(interface.get_last_fd(), fd)
interface.cleanup()
os.close(fd)
bus.disconnect()
@pytest.mark.asyncio
@pytest.mark.parametrize(
"result, out_signature, expected",
[
pytest.param(5, "h", ([0], [5]), id='Signature: "h"'),
pytest.param([5, "foo"], "hs", ([0, "foo"], [5]), id='Signature: "hs"'),
pytest.param([5, 7], "hh", ([0, 1], [5, 7]), id='Signature: "hh"'),
pytest.param([5, 7], "ah", ([[0, 1]], [5, 7]), id='Signature: "ah"'),
pytest.param([9], "ah", ([[0]], [9]), id='Signature: "ah"'),
pytest.param([3], "(h)", ([[0]], [3]), id='Signature: "(h)"'),
pytest.param([3, "foo"], "(hs)", ([[0, "foo"]], [3]), id='Signature: "(hs)"'),
pytest.param(
[[7, "foo"], [8, "bar"]],
"a(hs)",
([[[0, "foo"], [1, "bar"]]], [7, 8]),
id='Signature: "a(hs)"',
),
pytest.param({"foo": 3}, "a{sh}", ([{"foo": 0}], [3]), id='Signature: "a{sh}"'),
pytest.param(
{"foo": 3, "bar": 6},
"a{sh}",
([{"foo": 0, "bar": 1}], [3, 6]),
id='Signature: "a{sh}"',
),
pytest.param(
{"foo": [3, 8]},
"a{sah}",
([{"foo": [0, 1]}], [3, 8]),
id='Signature: "a{sah}"',
),
pytest.param(
{"foo": Variant("t", 100)},
"a{sv}",
([{"foo": Variant("t", 100)}], []),
id='Signature: "a{sv}"',
),
pytest.param(
["one", ["two", [Variant("s", "three")]]],
"(s(s(v)))",
([["one", ["two", [Variant("s", "three")]]]], []),
id='Signature: "(s(s(v)))"',
),
pytest.param(
Variant("h", 2), "v", ([Variant("h", 0)], [2]), id='Variant with: "h"'
),
pytest.param(
Variant("(hh)", [2, 8]),
"v",
([Variant("(hh)", [0, 1])], [2, 8]),
id='Variant with: "(hh)"',
),
pytest.param(
Variant("ah", [2, 4]),
"v",
([Variant("ah", [0, 1])], [2, 4]),
id='Variant with: "ah"',
),
pytest.param(
Variant("(ss)", ["hello", "world"]),
"v",
([Variant("(ss)", ["hello", "world"])], []),
id='Variant with: "(ss)"',
),
pytest.param(
Variant("v", Variant("t", 100)),
"v",
([Variant("v", Variant("t", 100))], []),
id='Variant with: "v"',
),
pytest.param(
[
Variant("v", Variant("(ss)", ["hello", "world"])),
{"foo": Variant("t", 100)},
["one", ["two", [Variant("s", "three")]]],
],
"va{sv}(s(s(v)))",
(
[
Variant("v", Variant("(ss)", ["hello", "world"])),
{"foo": Variant("t", 100)},
["one", ["two", [Variant("s", "three")]]],
],
[],
),
id='Variant with: "va{sv}(s(s(v)))"',
),
],
)
async def test_fn_result_to_body(result, out_signature, expected):
out_signature_tree = SignatureTree(out_signature)
assert ServiceInterface._fn_result_to_body(result, out_signature_tree) == expected