feat: improve performance of processing incoming messages (#228)
This commit is contained in:
@@ -28,7 +28,7 @@ cdef class BaseMessageBus:
|
|||||||
cdef public object unique_name
|
cdef public object unique_name
|
||||||
cdef public object _disconnected
|
cdef public object _disconnected
|
||||||
cdef public object _user_disconnect
|
cdef public object _user_disconnect
|
||||||
cdef public object _method_return_handlers
|
cdef public cython.dict _method_return_handlers
|
||||||
cdef public object _serial
|
cdef public object _serial
|
||||||
cdef public cython.dict _path_exports
|
cdef public cython.dict _path_exports
|
||||||
cdef public cython.list _user_message_handlers
|
cdef public cython.list _user_message_handlers
|
||||||
|
|||||||
@@ -770,7 +770,7 @@ class BaseMessageBus:
|
|||||||
if not msg.serial:
|
if not msg.serial:
|
||||||
msg.serial = self.next_serial()
|
msg.serial = self.next_serial()
|
||||||
|
|
||||||
no_reply_expected = not _expects_reply(msg)
|
no_reply_expected = _expects_reply(msg) is False
|
||||||
# Make sure the return reply handler is installed
|
# Make sure the return reply handler is installed
|
||||||
# before sending the message to avoid a race condition
|
# before sending the message to avoid a race condition
|
||||||
# where the reply is lost in case the backend can
|
# where the reply is lost in case the backend can
|
||||||
@@ -827,6 +827,7 @@ class BaseMessageBus:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _process_message(self, msg: _Message) -> None:
|
def _process_message(self, msg: _Message) -> None:
|
||||||
|
"""Process a message received from the message bus."""
|
||||||
handled = False
|
handled = False
|
||||||
for user_handler in self._user_message_handlers:
|
for user_handler in self._user_message_handlers:
|
||||||
try:
|
try:
|
||||||
@@ -842,13 +843,9 @@ class BaseMessageBus:
|
|||||||
handled = True
|
handled = True
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
logging.error(
|
logging.exception("A message handler raised an exception: %s", e)
|
||||||
f"A message handler raised an exception: {e}.\n{traceback.format_exc()}"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(
|
logging.exception("A message handler raised an exception: %s", e)
|
||||||
f"A message handler raised an exception: {e}.\n{traceback.format_exc()}"
|
|
||||||
)
|
|
||||||
if msg.message_type is MESSAGE_TYPE_CALL:
|
if msg.message_type is MESSAGE_TYPE_CALL:
|
||||||
self.send(
|
self.send(
|
||||||
Message.new_error(
|
Message.new_error(
|
||||||
@@ -877,7 +874,7 @@ class BaseMessageBus:
|
|||||||
if msg.message_type is MESSAGE_TYPE_CALL:
|
if msg.message_type is MESSAGE_TYPE_CALL:
|
||||||
if not handled:
|
if not handled:
|
||||||
handler = self._find_message_handler(msg)
|
handler = self._find_message_handler(msg)
|
||||||
if not _expects_reply(msg):
|
if _expects_reply(msg) is False:
|
||||||
if handler:
|
if handler:
|
||||||
handler(msg, BLOCK_UNEXPECTED_REPLY)
|
handler(msg, BLOCK_UNEXPECTED_REPLY)
|
||||||
else:
|
else:
|
||||||
@@ -898,15 +895,16 @@ class BaseMessageBus:
|
|||||||
Message.new_error(
|
Message.new_error(
|
||||||
msg,
|
msg,
|
||||||
ErrorType.UNKNOWN_METHOD,
|
ErrorType.UNKNOWN_METHOD,
|
||||||
f'{msg.interface}.{msg.member} with signature "{msg.signature}" could not be found',
|
f"{msg.interface}.{msg.member} with signature "
|
||||||
|
f'"{msg.signature}" could not be found',
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# An ERROR or a METHOD_RETURN
|
# An ERROR or a METHOD_RETURN
|
||||||
if msg.reply_serial in self._method_return_handlers:
|
return_handler = self._method_return_handlers.get(msg.reply_serial)
|
||||||
|
if return_handler is not None:
|
||||||
if not handled:
|
if not handled:
|
||||||
return_handler = self._method_return_handlers[msg.reply_serial]
|
|
||||||
return_handler(msg, None)
|
return_handler(msg, None)
|
||||||
del self._method_return_handlers[msg.reply_serial]
|
del self._method_return_handlers[msg.reply_serial]
|
||||||
|
|
||||||
@@ -927,7 +925,7 @@ class BaseMessageBus:
|
|||||||
"""This is the callback that will be called when a method call is."""
|
"""This is the callback that will be called when a method call is."""
|
||||||
args = msg_body_to_args(msg) if msg.unix_fds else msg.body
|
args = msg_body_to_args(msg) if msg.unix_fds else msg.body
|
||||||
result = method_fn(interface, *args)
|
result = method_fn(interface, *args)
|
||||||
if send_reply is BLOCK_UNEXPECTED_REPLY or not _expects_reply(msg):
|
if send_reply is BLOCK_UNEXPECTED_REPLY or _expects_reply(msg) is False:
|
||||||
return
|
return
|
||||||
body, fds = fn_result_to_body(
|
body, fds = fn_result_to_body(
|
||||||
result,
|
result,
|
||||||
|
|||||||
Reference in New Issue
Block a user