fix: correctly handle big endian data (#405)
This commit is contained in:
parent
81ce03058a
commit
0adab935e3
@ -72,6 +72,9 @@ plugins = ["covdefaults"]
|
||||
|
||||
[tool.coverage.report]
|
||||
fail_under = 30 # cython version will have low cover because we do not have Cython tracing
|
||||
exclude_also = [
|
||||
"if cython.compiled:",
|
||||
]
|
||||
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
|
||||
@ -115,24 +115,19 @@ cdef cython.uint EWOULDBLOCK
|
||||
cdef get_signature_tree
|
||||
|
||||
|
||||
cdef inline unsigned long _cast_uint32_native(const char * payload, unsigned int offset):
|
||||
cdef unsigned long *u32p = <unsigned long *> &payload[offset]
|
||||
return u32p[0]
|
||||
cdef unsigned long _ustr_uint32(const unsigned char * buf, unsigned int offset, unsigned int endian) noexcept
|
||||
|
||||
cdef inline short _cast_int16_native(const char * payload, unsigned int offset):
|
||||
cdef short *s16p = <short *> &payload[offset]
|
||||
return s16p[0]
|
||||
|
||||
cdef inline unsigned short _cast_uint16_native(const char * payload, unsigned int offset):
|
||||
cdef unsigned short *u16p = <unsigned short *> &payload[offset]
|
||||
return u16p[0]
|
||||
cdef short _ustr_int16(const unsigned char * buf, unsigned int offset, unsigned int endian) noexcept
|
||||
|
||||
cdef unsigned short _ustr_uint16(const unsigned char * buf, unsigned int offset, unsigned int endian) noexcept
|
||||
|
||||
|
||||
cdef class Unmarshaller:
|
||||
|
||||
cdef object _unix_fds
|
||||
cdef bytearray _buf
|
||||
cdef Py_ssize_t _buf_len
|
||||
cdef const unsigned char * _buf_ustr
|
||||
cdef unsigned int _pos
|
||||
cdef object _stream
|
||||
cdef object _sock
|
||||
@ -144,16 +139,17 @@ cdef class Unmarshaller:
|
||||
cdef object _message_type
|
||||
cdef object _flag
|
||||
cdef unsigned int _msg_len
|
||||
cdef unsigned int _is_native
|
||||
cdef object _uint32_unpack
|
||||
cdef object _int16_unpack
|
||||
cdef object _uint16_unpack
|
||||
cdef object _stream_reader
|
||||
cdef object _sock_reader
|
||||
cdef object _sock_with_fds_reader
|
||||
cdef object _sock_without_fds_reader
|
||||
cdef bint _negotiate_unix_fd
|
||||
cdef bint _read_complete
|
||||
cdef unsigned int _endian
|
||||
|
||||
@cython.locals(to_clear=Py_ssize_t)
|
||||
cdef _next_message(self)
|
||||
|
||||
cdef bint _has_another_message_in_buffer(self)
|
||||
@ -232,6 +228,7 @@ cdef class Unmarshaller:
|
||||
buffer=cython.bytearray,
|
||||
protocol_version=cython.uint,
|
||||
key=cython.str,
|
||||
ustring="const unsigned char *",
|
||||
)
|
||||
cdef _read_header(self)
|
||||
|
||||
|
||||
@ -186,9 +186,52 @@ try:
|
||||
import cython
|
||||
except ImportError:
|
||||
from ._cython_compat import FAKE_CYTHON as cython
|
||||
int_ = int
|
||||
bytearray_ = bytearray
|
||||
|
||||
|
||||
def is_compiled() -> bool:
|
||||
return cython.compiled
|
||||
|
||||
|
||||
def _ustr_uint32(buf: bytearray_, pos: int_, endian: int_) -> int_:
|
||||
if endian == LITTLE_ENDIAN:
|
||||
return (
|
||||
buf[pos] | (buf[pos + 1] << 8) | (buf[pos + 2] << 16) | (buf[pos + 3] << 24)
|
||||
)
|
||||
return buf[pos + 3] | (buf[pos + 2] << 8) | (buf[pos + 1] << 16) | (buf[pos] << 24)
|
||||
|
||||
|
||||
def buffer_to_uint32(buf: bytearray, pos: int, endian: int) -> int:
|
||||
return _ustr_uint32(buf, pos, endian)
|
||||
|
||||
|
||||
def _ustr_int16(buf: bytearray_, pos: int_, endian: int_) -> int_:
|
||||
# Caution: this function will only work with Cython
|
||||
# because it relies on casting the result to a signed int
|
||||
# and will return an unsigned int if not compiled.
|
||||
if endian == LITTLE_ENDIAN:
|
||||
return buf[pos] | (buf[pos + 1] << 8) # pragma: no cover
|
||||
return buf[pos + 1] | (buf[pos] << 8) # pragma: no cover
|
||||
|
||||
|
||||
def buffer_to_int16(buf: bytearray | bytes, pos: int, endian: int) -> int:
|
||||
# Caution: this function will only work with Cython
|
||||
# because it relies on casting the result to a signed int
|
||||
# and will return an unsigned int if not compiled.
|
||||
return _ustr_int16(buf, pos, endian)
|
||||
|
||||
|
||||
def _ustr_uint16(buf: bytearray_, pos: int_, endian: int_) -> int_:
|
||||
if endian == LITTLE_ENDIAN:
|
||||
return buf[pos] | (buf[pos + 1] << 8)
|
||||
return buf[pos + 1] | (buf[pos] << 8)
|
||||
|
||||
|
||||
def buffer_to_uint16(buf: bytearray, pos: int, endian: int) -> int:
|
||||
return _ustr_uint16(buf, pos, endian)
|
||||
|
||||
|
||||
#
|
||||
# Alignment padding is handled with the following formula below
|
||||
#
|
||||
# For any align value, the correct padding formula is:
|
||||
@ -216,6 +259,8 @@ class Unmarshaller:
|
||||
__slots__ = (
|
||||
"_unix_fds",
|
||||
"_buf",
|
||||
"_buf_ustr",
|
||||
"_buf_len",
|
||||
"_pos",
|
||||
"_stream",
|
||||
"_sock",
|
||||
@ -230,14 +275,16 @@ class Unmarshaller:
|
||||
"_uint32_unpack",
|
||||
"_int16_unpack",
|
||||
"_uint16_unpack",
|
||||
"_is_native",
|
||||
"_stream_reader",
|
||||
"_sock_reader",
|
||||
"_sock_with_fds_reader",
|
||||
"_sock_without_fds_reader",
|
||||
"_negotiate_unix_fd",
|
||||
"_read_complete",
|
||||
"_endian",
|
||||
)
|
||||
|
||||
_stream_reader: Callable[[int], bytes]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
stream: io.BufferedRWPair | None = None,
|
||||
@ -246,6 +293,8 @@ class Unmarshaller:
|
||||
) -> None:
|
||||
self._unix_fds: list[int] = []
|
||||
self._buf: bytearray = bytearray.__new__(bytearray) # Actual buffer
|
||||
self._buf_ustr = self._buf # Used to avoid type checks
|
||||
self._buf_len = 0
|
||||
self._stream = stream
|
||||
self._sock = sock
|
||||
self._message: Message | None = None
|
||||
@ -257,25 +306,24 @@ class Unmarshaller:
|
||||
self._message_type = 0
|
||||
self._flag = 0
|
||||
self._msg_len = 0
|
||||
self._is_native = 0
|
||||
self._uint32_unpack: Callable | None = None
|
||||
self._int16_unpack: Callable | None = None
|
||||
self._uint16_unpack: Callable | None = None
|
||||
self._stream_reader: Callable | None = None
|
||||
self._uint32_unpack: Callable[[bytearray, int], tuple[int]] | None = None
|
||||
self._int16_unpack: Callable[[bytearray, int], tuple[int]] | None = None
|
||||
self._uint16_unpack: Callable[[bytearray, int], tuple[int]] | None = None
|
||||
self._negotiate_unix_fd = negotiate_unix_fd
|
||||
self._read_complete = False
|
||||
if stream:
|
||||
if isinstance(stream, io.BufferedRWPair) and hasattr(stream, "reader"):
|
||||
self._stream_reader = stream.reader.read
|
||||
self._stream_reader = stream.read
|
||||
else:
|
||||
self._stream_reader = stream.read
|
||||
elif self._negotiate_unix_fd:
|
||||
if TYPE_CHECKING:
|
||||
assert self._sock is not None
|
||||
self._sock_reader = self._sock.recvmsg
|
||||
self._sock_with_fds_reader = self._sock.recvmsg
|
||||
else:
|
||||
if TYPE_CHECKING:
|
||||
assert self._sock is not None
|
||||
self._sock_reader = self._sock.recv # type: ignore[assignment]
|
||||
self._sock_without_fds_reader = self._sock.recv
|
||||
self._endian = 0
|
||||
|
||||
def _next_message(self) -> None:
|
||||
@ -283,12 +331,16 @@ class Unmarshaller:
|
||||
|
||||
Call this before processing a new message.
|
||||
"""
|
||||
self._unix_fds = []
|
||||
if self._unix_fds:
|
||||
self._unix_fds = []
|
||||
to_clear = HEADER_SIGNATURE_SIZE + self._msg_len
|
||||
if len(self._buf) == to_clear:
|
||||
if self._buf_len == to_clear:
|
||||
self._buf = bytearray.__new__(bytearray)
|
||||
self._buf_len = 0
|
||||
else:
|
||||
del self._buf[:to_clear]
|
||||
self._buf_len -= to_clear
|
||||
self._buf_ustr = self._buf
|
||||
self._msg_len = 0 # used to check if we have ready the header
|
||||
self._read_complete = False # used to check if we have ready the message
|
||||
# No need to reset the unpack functions, they are set in _read_header
|
||||
@ -303,7 +355,7 @@ class Unmarshaller:
|
||||
|
||||
def _has_another_message_in_buffer(self) -> bool:
|
||||
"""Check if there is another message in the buffer."""
|
||||
return len(self._buf) > HEADER_SIGNATURE_SIZE + self._msg_len
|
||||
return self._buf_len > HEADER_SIGNATURE_SIZE + self._msg_len
|
||||
|
||||
def _read_sock_with_fds(self, pos: _int, missing_bytes: _int) -> None:
|
||||
"""reads from the socket, storing any fds sent and handling errors
|
||||
@ -315,7 +367,7 @@ class Unmarshaller:
|
||||
# This will raise BlockingIOError if there is no data to read
|
||||
# which we store in the MARSHALL_STREAM_END_ERROR object
|
||||
try:
|
||||
recv = self._sock_reader(missing_bytes, UNIX_FDS_CMSG_LENGTH)
|
||||
recv = self._sock_with_fds_reader(missing_bytes, UNIX_FDS_CMSG_LENGTH)
|
||||
except OSError as e:
|
||||
errno = e.errno
|
||||
if errno == EAGAIN or errno == EWOULDBLOCK:
|
||||
@ -333,7 +385,8 @@ class Unmarshaller:
|
||||
if not msg:
|
||||
raise EOFError()
|
||||
self._buf += msg
|
||||
if len(self._buf) < pos:
|
||||
self._buf_len = len(self._buf)
|
||||
if self._buf_len < pos:
|
||||
raise MARSHALL_STREAM_END_ERROR
|
||||
|
||||
def _read_sock_without_fds(self, pos: _int) -> None:
|
||||
@ -346,7 +399,7 @@ class Unmarshaller:
|
||||
# which we store in the MARSHALL_STREAM_END_ERROR object
|
||||
while True:
|
||||
try:
|
||||
data = self._sock_reader(DEFAULT_BUFFER_SIZE)
|
||||
data = self._sock_without_fds_reader(DEFAULT_BUFFER_SIZE)
|
||||
except OSError as e:
|
||||
errno = e.errno
|
||||
if errno == EAGAIN or errno == EWOULDBLOCK:
|
||||
@ -354,19 +407,21 @@ class Unmarshaller:
|
||||
raise
|
||||
if not data:
|
||||
raise EOFError()
|
||||
self._buf += data # type: ignore[arg-type]
|
||||
if len(self._buf) >= pos:
|
||||
self._buf += data
|
||||
self._buf_len = len(self._buf)
|
||||
if self._buf_len >= pos:
|
||||
return
|
||||
|
||||
def _read_stream(self, pos: _int, missing_bytes: _int) -> None:
|
||||
"""Read from the stream."""
|
||||
data = self._stream_reader(missing_bytes) # type: ignore[misc]
|
||||
data = self._stream_reader(missing_bytes)
|
||||
if data is None:
|
||||
raise MARSHALL_STREAM_END_ERROR
|
||||
if not data:
|
||||
raise EOFError()
|
||||
self._buf += data
|
||||
if len(self._buf) < pos:
|
||||
self._buf_len = len(self._buf)
|
||||
if self._buf_len < pos:
|
||||
raise MARSHALL_STREAM_END_ERROR
|
||||
|
||||
def _read_to_pos(self, pos: _int) -> None:
|
||||
@ -382,7 +437,7 @@ class Unmarshaller:
|
||||
:returns:
|
||||
None
|
||||
"""
|
||||
missing_bytes = pos - len(self._buf)
|
||||
missing_bytes = pos - self._buf_len
|
||||
if missing_bytes <= 0:
|
||||
return
|
||||
if self._sock is None:
|
||||
@ -391,39 +446,40 @@ class Unmarshaller:
|
||||
self._read_sock_with_fds(pos, missing_bytes)
|
||||
else:
|
||||
self._read_sock_without_fds(pos)
|
||||
self._buf_ustr = self._buf
|
||||
|
||||
def read_uint32_unpack(self, type_: _SignatureType) -> int:
|
||||
return self._read_uint32_unpack()
|
||||
|
||||
def _read_uint32_unpack(self) -> int:
|
||||
self._pos += UINT32_SIZE + (-self._pos & (UINT32_SIZE - 1)) # align
|
||||
if self._is_native and cython.compiled:
|
||||
return _cast_uint32_native( # type: ignore[name-defined] # pragma: no cover
|
||||
self._buf, self._pos - UINT32_SIZE
|
||||
)
|
||||
return self._uint32_unpack(self._buf, self._pos - UINT32_SIZE)[0] # type: ignore[misc]
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read uint32")
|
||||
return _ustr_uint32(self._buf_ustr, self._pos - UINT32_SIZE, self._endian)
|
||||
return self._uint32_unpack(self._buf, self._pos - UINT32_SIZE)[0]
|
||||
|
||||
def read_uint16_unpack(self, type_: _SignatureType) -> int:
|
||||
return self._read_uint16_unpack()
|
||||
|
||||
def _read_uint16_unpack(self) -> int:
|
||||
self._pos += UINT16_SIZE + (-self._pos & (UINT16_SIZE - 1)) # align
|
||||
if self._is_native and cython.compiled:
|
||||
return _cast_uint16_native( # type: ignore[name-defined] # pragma: no cover
|
||||
self._buf, self._pos - UINT16_SIZE
|
||||
)
|
||||
return self._uint16_unpack(self._buf, self._pos - UINT16_SIZE)[0] # type: ignore[misc]
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read uint16")
|
||||
return _ustr_uint16(self._buf_ustr, self._pos - UINT16_SIZE, self._endian)
|
||||
return self._uint16_unpack(self._buf, self._pos - UINT16_SIZE)[0]
|
||||
|
||||
def read_int16_unpack(self, type_: _SignatureType) -> int:
|
||||
return self._read_int16_unpack()
|
||||
|
||||
def _read_int16_unpack(self) -> int:
|
||||
self._pos += INT16_SIZE + (-self._pos & (INT16_SIZE - 1)) # align
|
||||
if self._is_native and cython.compiled:
|
||||
return _cast_int16_native( # type: ignore[name-defined] # pragma: no cover
|
||||
self._buf, self._pos - INT16_SIZE
|
||||
)
|
||||
return self._int16_unpack(self._buf, self._pos - INT16_SIZE)[0] # type: ignore[misc]
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read int16")
|
||||
return _ustr_int16(self._buf_ustr, self._pos - INT16_SIZE, self._endian)
|
||||
return self._int16_unpack(self._buf, self._pos - INT16_SIZE)[0]
|
||||
|
||||
def read_boolean(self, type_: _SignatureType) -> bool:
|
||||
return self._read_boolean()
|
||||
@ -439,23 +495,33 @@ class Unmarshaller:
|
||||
self._pos += UINT32_SIZE + (-self._pos & (UINT32_SIZE - 1)) # align
|
||||
str_start = self._pos
|
||||
# read terminating '\0' byte as well (str_length + 1)
|
||||
if self._is_native and cython.compiled:
|
||||
self._pos += ( # pragma: no cover
|
||||
_cast_uint32_native(self._buf, str_start - UINT32_SIZE) + 1 # type: ignore[name-defined]
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read uint32")
|
||||
self._pos += (
|
||||
_ustr_uint32(self._buf_ustr, str_start - UINT32_SIZE, self._endian) + 1
|
||||
)
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read string")
|
||||
else:
|
||||
self._pos += self._uint32_unpack(self._buf, str_start - UINT32_SIZE)[0] + 1 # type: ignore[misc]
|
||||
return self._buf[str_start : self._pos - 1].decode()
|
||||
self._pos += self._uint32_unpack(self._buf, str_start - UINT32_SIZE)[0] + 1
|
||||
return self._buf_ustr[str_start : self._pos - 1].decode()
|
||||
|
||||
def read_signature(self, type_: _SignatureType) -> str:
|
||||
return self._read_signature()
|
||||
|
||||
def _read_signature(self) -> str:
|
||||
signature_len = self._buf[self._pos] # byte
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read signature")
|
||||
signature_len = self._buf_ustr[self._pos] # byte
|
||||
o = self._pos + 1
|
||||
# read terminating '\0' byte as well (str_length + 1)
|
||||
self._pos = o + signature_len + 1
|
||||
return self._buf[o : o + signature_len].decode()
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read signature")
|
||||
return self._buf_ustr[o : o + signature_len].decode()
|
||||
|
||||
def read_variant(self, type_: _SignatureType) -> Variant:
|
||||
return self._read_variant()
|
||||
@ -542,13 +608,14 @@ class Unmarshaller:
|
||||
self._pos += (
|
||||
-self._pos & (UINT32_SIZE - 1)
|
||||
) + UINT32_SIZE # align for the uint32
|
||||
if self._is_native and cython.compiled:
|
||||
array_length = _cast_uint32_native( # type: ignore[name-defined] # pragma: no cover
|
||||
self._buf, self._pos - UINT32_SIZE
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos:
|
||||
raise IndexError("Not enough data to read uint32")
|
||||
array_length = _ustr_uint32(
|
||||
self._buf_ustr, self._pos - UINT32_SIZE, self._endian
|
||||
)
|
||||
else:
|
||||
array_length = self._uint32_unpack(self._buf, self._pos - UINT32_SIZE)[0] # type: ignore[misc]
|
||||
|
||||
array_length = self._uint32_unpack(self._buf, self._pos - UINT32_SIZE)[0]
|
||||
child_type: SignatureType = type_.children[0]
|
||||
token_as_int = ord(child_type.token[0])
|
||||
|
||||
@ -628,22 +695,23 @@ class Unmarshaller:
|
||||
def _header_fields(self, header_length: _int) -> list[Any]:
|
||||
"""Header fields are always a(yv)."""
|
||||
beginning_pos = self._pos
|
||||
buf = self._buf
|
||||
readers = self._readers
|
||||
headers = _EMPTY_HEADERS.copy()
|
||||
if cython.compiled:
|
||||
if self._buf_len < self._pos + header_length:
|
||||
raise IndexError("Not enough data to read header")
|
||||
while self._pos - beginning_pos < header_length:
|
||||
# Now read the y (byte) of struct (yv)
|
||||
self._pos += (-self._pos & 7) + 1 # align 8 + 1 for 'y' byte
|
||||
field_0 = buf[self._pos - 1]
|
||||
field_0 = self._buf_ustr[self._pos - 1]
|
||||
|
||||
# Now read the v (variant) of struct (yv)
|
||||
# first we read the signature
|
||||
signature_len = buf[self._pos] # byte
|
||||
signature_len = self._buf_ustr[self._pos] # byte
|
||||
o = self._pos + 1
|
||||
self._pos += signature_len + 2 # one for the byte, one for the '\0'
|
||||
if field_0 == HEADER_UNIX_FDS_IDX: # defined by self._unix_fds
|
||||
continue
|
||||
token_as_int = buf[o]
|
||||
token_as_int = self._buf_ustr[o]
|
||||
# Now that we have the token we can read the variant value
|
||||
# Strings and signatures are the most common types
|
||||
# so we inline them for performance
|
||||
@ -652,10 +720,10 @@ class Unmarshaller:
|
||||
elif token_as_int == TOKEN_G_AS_INT:
|
||||
headers[field_0] = self._read_signature()
|
||||
else:
|
||||
token = buf[o : o + signature_len].decode()
|
||||
token = self._buf[o : o + signature_len].decode()
|
||||
# There shouldn't be any other types in the header
|
||||
# but just in case, we'll read it using the slow path
|
||||
headers[field_0] = readers[token](
|
||||
headers[field_0] = self._readers[token](
|
||||
self, get_signature_tree(token).types[0]
|
||||
)
|
||||
return headers
|
||||
@ -665,55 +733,42 @@ class Unmarshaller:
|
||||
# Signature is of the header is
|
||||
# BYTE, BYTE, BYTE, BYTE, UINT32, UINT32, ARRAY of STRUCT of (BYTE,VARIANT)
|
||||
self._read_to_pos(HEADER_SIGNATURE_SIZE)
|
||||
buffer = self._buf
|
||||
endian = buffer[0]
|
||||
self._message_type = buffer[1]
|
||||
self._flag = buffer[2]
|
||||
protocol_version = buffer[3]
|
||||
endian = self._buf_ustr[0]
|
||||
self._message_type = self._buf_ustr[1]
|
||||
self._flag = self._buf_ustr[2]
|
||||
protocol_version = self._buf_ustr[3]
|
||||
|
||||
if protocol_version != PROTOCOL_VERSION:
|
||||
raise InvalidMessageError(
|
||||
f"got unknown protocol version: {protocol_version}"
|
||||
)
|
||||
|
||||
if cython.compiled and (
|
||||
(endian == LITTLE_ENDIAN and SYS_IS_LITTLE_ENDIAN)
|
||||
or (endian == BIG_ENDIAN and SYS_IS_BIG_ENDIAN)
|
||||
):
|
||||
self._is_native = 1 # pragma: no cover
|
||||
self._body_len = _cast_uint32_native( # type: ignore[name-defined] # pragma: no cover
|
||||
buffer, 4
|
||||
)
|
||||
self._serial = _cast_uint32_native( # type: ignore[name-defined] # pragma: no cover
|
||||
buffer, 8
|
||||
)
|
||||
self._header_len = _cast_uint32_native( # type: ignore[name-defined] # pragma: no cover
|
||||
buffer, 12
|
||||
if endian != LITTLE_ENDIAN and endian != BIG_ENDIAN:
|
||||
raise InvalidMessageError(
|
||||
f"Expecting endianness as the first byte, got {endian} from {self._buf}"
|
||||
)
|
||||
|
||||
if cython.compiled:
|
||||
self._body_len = _ustr_uint32(self._buf_ustr, 4, endian)
|
||||
self._serial = _ustr_uint32(self._buf_ustr, 8, endian)
|
||||
self._header_len = _ustr_uint32(self._buf_ustr, 12, endian)
|
||||
elif endian == LITTLE_ENDIAN:
|
||||
(
|
||||
self._body_len,
|
||||
self._serial,
|
||||
self._header_len,
|
||||
) = UNPACK_HEADER_LITTLE_ENDIAN(buffer, 4)
|
||||
self._body_len, self._serial, self._header_len = (
|
||||
UNPACK_HEADER_LITTLE_ENDIAN(self._buf, 4)
|
||||
)
|
||||
self._uint32_unpack = UINT32_UNPACK_LITTLE_ENDIAN
|
||||
self._int16_unpack = INT16_UNPACK_LITTLE_ENDIAN
|
||||
self._uint16_unpack = UINT16_UNPACK_LITTLE_ENDIAN
|
||||
elif endian == BIG_ENDIAN:
|
||||
else: # BIG_ENDIAN
|
||||
self._body_len, self._serial, self._header_len = UNPACK_HEADER_BIG_ENDIAN(
|
||||
buffer, 4
|
||||
self._buf, 4
|
||||
)
|
||||
self._uint32_unpack = UINT32_UNPACK_BIG_ENDIAN
|
||||
self._int16_unpack = INT16_UNPACK_BIG_ENDIAN
|
||||
self._uint16_unpack = UINT16_UNPACK_BIG_ENDIAN
|
||||
else:
|
||||
raise InvalidMessageError(
|
||||
f"Expecting endianness as the first byte, got {endian} from {buffer}"
|
||||
)
|
||||
|
||||
self._msg_len = (
|
||||
self._header_len + (-self._header_len & 7) + self._body_len
|
||||
) # align 8
|
||||
# align 8
|
||||
self._msg_len = self._header_len + (-self._header_len & 7) + self._body_len
|
||||
if self._endian != endian:
|
||||
self._readers = self._readers_by_type[endian]
|
||||
self._endian = endian
|
||||
|
||||
@ -8,10 +8,54 @@ import pytest
|
||||
|
||||
from dbus_fast import Message, MessageFlag, MessageType, SignatureTree, Variant
|
||||
from dbus_fast._private._cython_compat import FakeCython
|
||||
from dbus_fast._private.unmarshaller import Unmarshaller
|
||||
from dbus_fast._private.unmarshaller import (
|
||||
Unmarshaller,
|
||||
is_compiled,
|
||||
buffer_to_int16,
|
||||
buffer_to_uint32,
|
||||
buffer_to_uint16,
|
||||
)
|
||||
from dbus_fast._private.constants import BIG_ENDIAN, LITTLE_ENDIAN
|
||||
|
||||
from dbus_fast.unpack import unpack_variants
|
||||
|
||||
|
||||
def test_bytearray_to_uint32_big_end():
|
||||
assert buffer_to_uint32(bytearray(b"\x01\x02\x03\x04"), 0, BIG_ENDIAN) == 16909060
|
||||
|
||||
|
||||
def test_bytearray_to_uint16_big_end():
|
||||
assert buffer_to_uint16(bytearray(b"\x01\x02"), 0, BIG_ENDIAN) == 258
|
||||
|
||||
|
||||
def test_bytearray_to_int16_big_end():
|
||||
assert buffer_to_int16(bytearray(b"\x01\x02"), 0, BIG_ENDIAN) == 258
|
||||
|
||||
|
||||
@pytest.mark.skipif(not is_compiled(), reason="requires cython")
|
||||
def test_bytearray_to_int16_big_end_signed():
|
||||
assert buffer_to_int16(bytearray(b"\xff\xff"), 0, BIG_ENDIAN) == -1
|
||||
|
||||
|
||||
def test_bytearray_to_uint32_little_end():
|
||||
assert (
|
||||
buffer_to_uint32(bytearray(b"\x01\x02\x03\x04"), 0, LITTLE_ENDIAN) == 67305985
|
||||
)
|
||||
|
||||
|
||||
def test_bytearray_to_uint16_little_end():
|
||||
assert buffer_to_uint16(bytearray(b"\x01\x02"), 0, LITTLE_ENDIAN) == 513
|
||||
|
||||
|
||||
def test_bytearray_to_int16_little_end():
|
||||
assert buffer_to_int16(bytearray(b"\x01\x02"), 0, LITTLE_ENDIAN) == 513
|
||||
|
||||
|
||||
@pytest.mark.skipif(not is_compiled(), reason="requires cython")
|
||||
def test_bytearray_to_int16_little_end_signed():
|
||||
assert buffer_to_int16(bytearray(b"\xff\xff"), 0, LITTLE_ENDIAN) == -1
|
||||
|
||||
|
||||
def print_buf(buf):
|
||||
i = 0
|
||||
while True:
|
||||
@ -536,7 +580,7 @@ def test_unmarshall_large_message():
|
||||
}
|
||||
|
||||
|
||||
def test_unmarshall_big_endian_message():
|
||||
def test_unmarshall_big_end_message():
|
||||
"""Test we can unmarshall a big endian message."""
|
||||
msg = (
|
||||
b"B\x01\x00\x01\x00\x00\x00 \x00\x00\x00\x00\x00\x00\x00\x82"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user