fix: messages could be sent out of order if they had to queue (#225)

This commit is contained in:
J. Nick Koston 2023-08-17 14:58:49 -05:00 committed by GitHub
parent 8cb0a62826
commit 4051cf283f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,7 +4,7 @@ import logging
import socket
from collections import deque
from copy import copy
from typing import Any, Optional
from typing import Any, List, Optional, Tuple
from .. import introspection as intr
from ..auth import Authenticator, AuthExternal
@ -37,28 +37,38 @@ def _future_set_result(fut: asyncio.Future, result: Any) -> None:
class _MessageWriter:
"""A class to handle writing messages to the message bus."""
def __init__(self, bus: "MessageBus") -> None:
self.messages = deque()
"""A class to handle writing messages to the message bus."""
self.messages: deque[
Tuple[bytearray, Optional[List[int]], Optional[asyncio.Future]]
] = deque()
self.negotiate_unix_fd = bus._negotiate_unix_fd
self.bus = bus
self.sock = bus._sock
self.loop = bus._loop
self.buf = None
self.buf: Optional[memoryview] = None
self.fd = bus._fd
self.offset = 0
self.unix_fds = None
self.unix_fds: Optional[List[int]] = None
self.fut: Optional[asyncio.Future] = None
def write_callback(self, remove_writer: bool = True) -> None:
"""The callback to write messages to the message bus."""
sock = self.sock
try:
while True:
if self.buf is None:
# If there is no buffer, get the next message
if not self.messages:
# nothing more to write
if remove_writer:
self.loop.remove_writer(self.fd)
return
buf, unix_fds, fut = self.messages.pop()
# Get the next message
buf, unix_fds, fut = self.messages.popleft()
self.unix_fds = unix_fds
self.buf = memoryview(buf)
self.offset = 0
@ -72,18 +82,18 @@ class _MessageWriter:
array.array("i", self.unix_fds),
)
]
self.offset += self.sock.sendmsg([self.buf[self.offset :]], ancdata)
self.offset += sock.sendmsg([self.buf[self.offset :]], ancdata)
self.unix_fds = None
else:
self.offset += self.sock.send(self.buf[self.offset :])
self.offset += sock.send(self.buf[self.offset :])
if self.offset >= len(self.buf):
# finished writing
self.buf = None
_future_set_result(self.fut, None)
else:
if self.offset < len(self.buf):
# wait for writable
return
# finished writing
self.buf = None
_future_set_result(self.fut, None)
except Exception as e:
if self.bus._user_disconnect:
_future_set_result(self.fut, None)
@ -91,11 +101,15 @@ class _MessageWriter:
_future_set_exception(self.fut, e)
self.bus._finalize(e)
def buffer_message(self, msg: Message, future=None) -> None:
def buffer_message(
self, msg: Message, future: Optional[asyncio.Future] = None
) -> None:
"""Buffer a message to be sent later."""
unix_fds = msg.unix_fds
self.messages.append(
(
msg._marshall(self.negotiate_unix_fd),
copy(msg.unix_fds),
copy(unix_fds) if unix_fds else None,
future,
)
)
@ -104,10 +118,14 @@ class _MessageWriter:
"""Call the write callback without removing the writer."""
self.write_callback(remove_writer=False)
def schedule_write(self, msg: Optional[Message] = None, future=None) -> None:
def schedule_write(
self, msg: Optional[Message] = None, future: Optional[asyncio.Future] = None
) -> None:
"""Schedule a message to be written."""
queue_is_empty = not self.messages
if msg is not None:
self.buffer_message(msg, future)
if self.bus.unique_name:
# Optimization: try to send now if the queue
# is empty. With bleak this usually means we
@ -115,6 +133,7 @@ class _MessageWriter:
# is a huge improvement in latency.
if queue_is_empty:
self._write_without_remove_writer()
if (
self.buf is not None
or self.messages