diff --git a/build.py b/build.py index c9f9131..cc8bc17 100644 --- a/build.py +++ b/build.py @@ -22,6 +22,7 @@ def build(setup_kwargs): dict( ext_modules=cythonize( [ + "src/dbus_fast/aio/message_reader.py", "src/dbus_fast/message.py", "src/dbus_fast/signature.py", "src/dbus_fast/unpack.py", diff --git a/src/dbus_fast/aio/message_bus.py b/src/dbus_fast/aio/message_bus.py index 54f1c2c..b0941f6 100644 --- a/src/dbus_fast/aio/message_bus.py +++ b/src/dbus_fast/aio/message_bus.py @@ -24,6 +24,7 @@ from ..errors import AuthError from ..message import Message from ..message_bus import BaseMessageBus from ..service import ServiceInterface +from .message_reader import build_message_reader from .proxy_object import ProxyObject @@ -193,7 +194,14 @@ class MessageBus(BaseMessageBus): future = self._loop.create_future() - self._loop.add_reader(self._fd, self._message_reader) + self._loop.add_reader( + self._fd, + build_message_reader( + self._unmarshaller, + self._process_message, + self._finalize, + ), + ) def on_hello(reply, err): try: @@ -419,24 +427,6 @@ class MessageBus(BaseMessageBus): return handler - def _message_reader(self) -> None: - unmarshaller = self._unmarshaller - try: - while True: - message = unmarshaller.unmarshall() - if message: - try: - self._process_message(message) - except Exception as e: - logging.error( - f"got unexpected error processing a message: {e}.\n{traceback.format_exc()}" - ) - unmarshaller.reset() - else: - break - except Exception as e: - self._finalize(e) - async def _auth_readline(self) -> str: buf = b"" while buf[-2:] != b"\r\n": diff --git a/src/dbus_fast/aio/message_reader.pxd b/src/dbus_fast/aio/message_reader.pxd new file mode 100644 index 0000000..a53a8e3 --- /dev/null +++ b/src/dbus_fast/aio/message_reader.pxd @@ -0,0 +1,3 @@ +"""cdefs for message_reader.py""" + +import cython diff --git a/src/dbus_fast/aio/message_reader.py b/src/dbus_fast/aio/message_reader.py new file mode 100644 index 0000000..4da1f08 --- /dev/null +++ b/src/dbus_fast/aio/message_reader.py @@ -0,0 +1,35 @@ +import logging +import traceback +from typing import Callable, Optional + +from .._private.unmarshaller import Unmarshaller +from ..message import Message + + +def build_message_reader( + unmarshaller: Unmarshaller, + process: Callable[[Message], None], + finalize: Callable[[Optional[Exception]], None], +) -> None: + """Build a callable that reads messages from the unmarshaller and passes them to the process function.""" + unmarshall = unmarshaller.unmarshall + reset = unmarshaller.reset + + def _message_reader() -> None: + """Reads messages from the unmarshaller and passes them to the process function.""" + try: + while True: + message = unmarshall() + if not message: + return + try: + process(message) + except Exception as e: + logging.error( + f"got unexpected error processing a message: {e}.\n{traceback.format_exc()}" + ) + reset() + except Exception as e: + finalize(e) + + return _message_reader