feat: improve writer performance with a deque (#30)
This commit is contained in:
@@ -2,7 +2,7 @@ import array
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import socket
|
import socket
|
||||||
from asyncio import Queue
|
from collections import deque
|
||||||
from copy import copy
|
from copy import copy
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
@@ -36,7 +36,7 @@ def _future_set_result(fut: asyncio.Future, result: Any) -> None:
|
|||||||
|
|
||||||
class _MessageWriter:
|
class _MessageWriter:
|
||||||
def __init__(self, bus: "MessageBus") -> None:
|
def __init__(self, bus: "MessageBus") -> None:
|
||||||
self.messages = Queue()
|
self.messages = deque()
|
||||||
self.negotiate_unix_fd = bus._negotiate_unix_fd
|
self.negotiate_unix_fd = bus._negotiate_unix_fd
|
||||||
self.bus = bus
|
self.bus = bus
|
||||||
self.sock = bus._sock
|
self.sock = bus._sock
|
||||||
@@ -51,12 +51,12 @@ class _MessageWriter:
|
|||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
if self.buf is None:
|
if self.buf is None:
|
||||||
if self.messages.qsize() == 0:
|
if not self.messages:
|
||||||
# nothing more to write
|
# nothing more to write
|
||||||
if remove_writer:
|
if remove_writer:
|
||||||
self.loop.remove_writer(self.fd)
|
self.loop.remove_writer(self.fd)
|
||||||
return
|
return
|
||||||
buf, unix_fds, fut = self.messages.get_nowait()
|
buf, unix_fds, fut = self.messages.pop()
|
||||||
self.unix_fds = unix_fds
|
self.unix_fds = unix_fds
|
||||||
self.buf = memoryview(buf)
|
self.buf = memoryview(buf)
|
||||||
self.offset = 0
|
self.offset = 0
|
||||||
@@ -90,7 +90,7 @@ class _MessageWriter:
|
|||||||
self.bus._finalize(e)
|
self.bus._finalize(e)
|
||||||
|
|
||||||
def buffer_message(self, msg: Message, future=None):
|
def buffer_message(self, msg: Message, future=None):
|
||||||
self.messages.put_nowait(
|
self.messages.append(
|
||||||
(
|
(
|
||||||
msg._marshall(negotiate_unix_fd=self.negotiate_unix_fd),
|
msg._marshall(negotiate_unix_fd=self.negotiate_unix_fd),
|
||||||
copy(msg.unix_fds),
|
copy(msg.unix_fds),
|
||||||
@@ -103,7 +103,7 @@ class _MessageWriter:
|
|||||||
self.write_callback(remove_writer=False)
|
self.write_callback(remove_writer=False)
|
||||||
|
|
||||||
def schedule_write(self, msg: Message = None, future=None):
|
def schedule_write(self, msg: Message = None, future=None):
|
||||||
queue_is_empty = self.messages.qsize() == 0
|
queue_is_empty = not self.messages
|
||||||
if msg is not None:
|
if msg is not None:
|
||||||
self.buffer_message(msg, future)
|
self.buffer_message(msg, future)
|
||||||
if self.bus.unique_name:
|
if self.bus.unique_name:
|
||||||
@@ -115,7 +115,7 @@ class _MessageWriter:
|
|||||||
self._write_without_remove_writer()
|
self._write_without_remove_writer()
|
||||||
if (
|
if (
|
||||||
self.buf is not None
|
self.buf is not None
|
||||||
or self.messages.qsize() != 0
|
or self.messages
|
||||||
or not self.fut
|
or not self.fut
|
||||||
or not self.fut.done()
|
or not self.fut.done()
|
||||||
):
|
):
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import sys
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from dbus_fast import (
|
from dbus_fast import (
|
||||||
@@ -68,6 +70,7 @@ async def test_name_requests():
|
|||||||
bus2.disconnect()
|
bus2.disconnect()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(sys.version_info[:3][1] == 10, reason="segfaults on py3.10")
|
||||||
@pytest.mark.skipif(not has_gi, reason=skip_reason_no_gi)
|
@pytest.mark.skipif(not has_gi, reason=skip_reason_no_gi)
|
||||||
def test_request_name_glib():
|
def test_request_name_glib():
|
||||||
test_name = "glib.test.request.name"
|
test_name = "glib.test.request.name"
|
||||||
|
|||||||
Reference in New Issue
Block a user