feat: improve aio message reader performance (#104)

This commit is contained in:
J. Nick Koston 2022-10-11 17:59:37 -10:00 committed by GitHub
parent c3fcc9eda1
commit 9fa697da65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 19 deletions

View File

@ -22,6 +22,7 @@ def build(setup_kwargs):
dict( dict(
ext_modules=cythonize( ext_modules=cythonize(
[ [
"src/dbus_fast/aio/message_reader.py",
"src/dbus_fast/message.py", "src/dbus_fast/message.py",
"src/dbus_fast/signature.py", "src/dbus_fast/signature.py",
"src/dbus_fast/unpack.py", "src/dbus_fast/unpack.py",

View File

@ -24,6 +24,7 @@ from ..errors import AuthError
from ..message import Message from ..message import Message
from ..message_bus import BaseMessageBus from ..message_bus import BaseMessageBus
from ..service import ServiceInterface from ..service import ServiceInterface
from .message_reader import build_message_reader
from .proxy_object import ProxyObject from .proxy_object import ProxyObject
@ -193,7 +194,14 @@ class MessageBus(BaseMessageBus):
future = self._loop.create_future() future = self._loop.create_future()
self._loop.add_reader(self._fd, self._message_reader) self._loop.add_reader(
self._fd,
build_message_reader(
self._unmarshaller,
self._process_message,
self._finalize,
),
)
def on_hello(reply, err): def on_hello(reply, err):
try: try:
@ -419,24 +427,6 @@ class MessageBus(BaseMessageBus):
return handler return handler
def _message_reader(self) -> None:
unmarshaller = self._unmarshaller
try:
while True:
message = unmarshaller.unmarshall()
if message:
try:
self._process_message(message)
except Exception as e:
logging.error(
f"got unexpected error processing a message: {e}.\n{traceback.format_exc()}"
)
unmarshaller.reset()
else:
break
except Exception as e:
self._finalize(e)
async def _auth_readline(self) -> str: async def _auth_readline(self) -> str:
buf = b"" buf = b""
while buf[-2:] != b"\r\n": while buf[-2:] != b"\r\n":

View File

@ -0,0 +1,3 @@
"""cdefs for message_reader.py"""
import cython

View File

@ -0,0 +1,35 @@
import logging
import traceback
from typing import Callable, Optional
from .._private.unmarshaller import Unmarshaller
from ..message import Message
def build_message_reader(
unmarshaller: Unmarshaller,
process: Callable[[Message], None],
finalize: Callable[[Optional[Exception]], None],
) -> None:
"""Build a callable that reads messages from the unmarshaller and passes them to the process function."""
unmarshall = unmarshaller.unmarshall
reset = unmarshaller.reset
def _message_reader() -> None:
"""Reads messages from the unmarshaller and passes them to the process function."""
try:
while True:
message = unmarshall()
if not message:
return
try:
process(message)
except Exception as e:
logging.error(
f"got unexpected error processing a message: {e}.\n{traceback.format_exc()}"
)
reset()
except Exception as e:
finalize(e)
return _message_reader