feat: reduce duplicate code in aio MessageBus (#272)

This commit is contained in:
J. Nick Koston 2023-12-03 16:24:02 -10:00 committed by GitHub
parent e30c477eb0
commit 502ab0d47f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,6 +4,7 @@ import logging
import socket
from collections import deque
from copy import copy
from functools import partial
from typing import Any, Callable, List, Optional, Set, Tuple
from .. import introspection as intr
@ -288,13 +289,7 @@ class MessageBus(BaseMessageBus):
"""
future = self._loop.create_future()
def reply_handler(reply: Any, err: Exception) -> None:
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)
super().introspect(bus_name, path, reply_handler)
super().introspect(bus_name, path, partial(self._reply_handler, future))
timer_handle = self._loop.call_later(
timeout, _future_set_exception, future, asyncio.TimeoutError
@ -326,13 +321,7 @@ class MessageBus(BaseMessageBus):
"""
future = self._loop.create_future()
def reply_handler(reply, err):
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)
super().request_name(name, flags, reply_handler)
super().request_name(name, flags, partial(self._reply_handler, future))
return await future
@ -354,13 +343,7 @@ class MessageBus(BaseMessageBus):
"""
future = self._loop.create_future()
def reply_handler(reply, err):
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)
super().release_name(name, reply_handler)
super().release_name(name, partial(self._reply_handler, future))
return await future
@ -387,14 +370,7 @@ class MessageBus(BaseMessageBus):
future = self._loop.create_future()
def reply_handler(reply, err):
if not future.done():
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)
self._call(msg, reply_handler)
self._call(msg, partial(self._reply_handler, future))
await future
@ -546,3 +522,12 @@ class MessageBus(BaseMessageBus):
_future_set_exception(self._disconnect_future, err)
else:
_future_set_result(self._disconnect_future, None)
def _reply_handler(
self, future: asyncio.Future, reply: Optional[Any], err: Optional[Exception]
) -> None:
"""The reply handler for method calls."""
if err:
_future_set_exception(future, err)
else:
_future_set_result(future, reply)