feat: refactor message_reader to avoid python wrappers (#147)

This commit is contained in:
J. Nick Koston
2022-11-04 09:12:50 +01:00
committed by GitHub
parent c65bcd7b87
commit b81de4553f
5 changed files with 31 additions and 13 deletions

View File

@@ -106,6 +106,8 @@ cdef class Unmarshaller:
cdef object _int16_unpack cdef object _int16_unpack
cdef object _uint16_unpack cdef object _uint16_unpack
cdef _reset(self)
cpdef reset(self) cpdef reset(self)
@cython.locals( @cython.locals(
@@ -173,6 +175,8 @@ cdef class Unmarshaller:
) )
cdef _read_body(self) cdef _read_body(self)
cdef _unmarshall(self)
cpdef unmarshall(self) cpdef unmarshall(self)
@cython.locals( @cython.locals(

View File

@@ -202,6 +202,13 @@ class Unmarshaller:
def reset(self) -> None: def reset(self) -> None:
"""Reset the unmarshaller to its initial state. """Reset the unmarshaller to its initial state.
Call this before processing a new message.
"""
self._reset()
def _reset(self) -> None:
"""Reset the unmarshaller to its initial state.
Call this before processing a new message. Call this before processing a new message.
""" """
self._unix_fds = [] self._unix_fds = []
@@ -596,6 +603,15 @@ class Unmarshaller:
def unmarshall(self) -> Optional[Message]: def unmarshall(self) -> Optional[Message]:
"""Unmarshall the message. """Unmarshall the message.
The underlying read function will raise BlockingIOError if the
if there are not enough bytes in the buffer. This allows unmarshall
to be resumed when more data comes in over the wire.
"""
return self._unmarshall()
def _unmarshall(self) -> Optional[Message]:
"""Unmarshall the message.
The underlying read function will raise BlockingIOError if the The underlying read function will raise BlockingIOError if the
if there are not enough bytes in the buffer. This allows unmarshall if there are not enough bytes in the buffer. This allows unmarshall
to be resumed when more data comes in over the wire. to be resumed when more data comes in over the wire.

View File

@@ -170,7 +170,6 @@ class MessageBus(BaseMessageBus):
super().__init__(bus_address, bus_type, ProxyObject) super().__init__(bus_address, bus_type, ProxyObject)
self._negotiate_unix_fd = negotiate_unix_fd self._negotiate_unix_fd = negotiate_unix_fd
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
self._unmarshaller = self._create_unmarshaller()
self._writer = _MessageWriter(self) self._writer = _MessageWriter(self)
@@ -201,7 +200,8 @@ class MessageBus(BaseMessageBus):
self._loop.add_reader( self._loop.add_reader(
self._fd, self._fd,
build_message_reader( build_message_reader(
self._unmarshaller, self._stream,
self._sock if self._negotiate_unix_fd else None,
self._process_message, self._process_message,
self._finalize, self._finalize,
), ),
@@ -477,12 +477,6 @@ class MessageBus(BaseMessageBus):
except Exception: except Exception:
logging.warning("could not close socket", exc_info=True) logging.warning("could not close socket", exc_info=True)
def _create_unmarshaller(self) -> Unmarshaller:
sock = None
if self._negotiate_unix_fd:
sock = self._sock
return Unmarshaller(self._stream, sock)
def _finalize(self, err: Optional[Exception] = None) -> None: def _finalize(self, err: Optional[Exception] = None) -> None:
try: try:
self._loop.remove_reader(self._fd) self._loop.remove_reader(self._fd)

View File

@@ -1,3 +1,5 @@
"""cdefs for message_reader.py""" """cdefs for message_reader.py"""
import cython import cython
from .._private.unmarshaller cimport Unmarshaller

View File

@@ -1,4 +1,6 @@
import io
import logging import logging
import socket
import traceback import traceback
from typing import Callable, Optional from typing import Callable, Optional
@@ -7,19 +9,19 @@ from ..message import Message
def build_message_reader( def build_message_reader(
unmarshaller: Unmarshaller, stream: io.BufferedRWPair,
sock: Optional[socket.socket],
process: Callable[[Message], None], process: Callable[[Message], None],
finalize: Callable[[Optional[Exception]], None], finalize: Callable[[Optional[Exception]], None],
) -> None: ) -> None:
"""Build a callable that reads messages from the unmarshaller and passes them to the process function.""" """Build a callable that reads messages from the unmarshaller and passes them to the process function."""
unmarshall = unmarshaller.unmarshall unmarshaller = Unmarshaller(stream, sock)
reset = unmarshaller.reset
def _message_reader() -> None: def _message_reader() -> None:
"""Reads messages from the unmarshaller and passes them to the process function.""" """Reads messages from the unmarshaller and passes them to the process function."""
try: try:
while True: while True:
message = unmarshall() message = unmarshaller._unmarshall()
if not message: if not message:
return return
try: try:
@@ -28,7 +30,7 @@ def build_message_reader(
logging.error( logging.error(
f"got unexpected error processing a message: {e}.\n{traceback.format_exc()}" f"got unexpected error processing a message: {e}.\n{traceback.format_exc()}"
) )
reset() unmarshaller._reset()
except Exception as e: except Exception as e:
finalize(e) finalize(e)