feat: add typing to auth module (#110)
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import enum
|
import enum
|
||||||
import os
|
import os
|
||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
from .errors import AuthError
|
from .errors import AuthError
|
||||||
|
|
||||||
@@ -11,7 +12,7 @@ from .errors import AuthError
|
|||||||
# doing that. I might just end up giving the raw socket and leaving it all up
|
# doing that. I might just end up giving the raw socket and leaving it all up
|
||||||
# to the user, but it would be nice to have a little guidance in the interface
|
# to the user, but it would be nice to have a little guidance in the interface
|
||||||
# since a lot of it is strongly specified. If you have a need for this, contact
|
# since a lot of it is strongly specified. If you have a need for this, contact
|
||||||
# the project maintainer to help stabalize this interface.
|
# the project maintainer to help stabilize this interface.
|
||||||
|
|
||||||
|
|
||||||
class _AuthResponse(enum.Enum):
|
class _AuthResponse(enum.Enum):
|
||||||
@@ -22,7 +23,7 @@ class _AuthResponse(enum.Enum):
|
|||||||
AGREE_UNIX_FD = "AGREE_UNIX_FD"
|
AGREE_UNIX_FD = "AGREE_UNIX_FD"
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def parse(klass, line):
|
def parse(klass, line: str) -> Tuple["_AuthResponse", List[str]]:
|
||||||
args = line.split(" ")
|
args = line.split(" ")
|
||||||
response = klass(args[0])
|
response = klass(args[0])
|
||||||
return response, args[1:]
|
return response, args[1:]
|
||||||
@@ -37,18 +38,18 @@ class Authenticator:
|
|||||||
:seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
|
:seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _authentication_start(self, negotiate_unix_fd=False):
|
def _authentication_start(self, negotiate_unix_fd: bool = False) -> str:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
"authentication_start() must be implemented in the inheriting class"
|
"authentication_start() must be implemented in the inheriting class"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _receive_line(self, line):
|
def _receive_line(self, line: str) -> str:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
"receive_line() must be implemented in the inheriting class"
|
"receive_line() must be implemented in the inheriting class"
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_line(line):
|
def _format_line(line: str) -> bytes:
|
||||||
return f"{line}\r\n".encode()
|
return f"{line}\r\n".encode()
|
||||||
|
|
||||||
|
|
||||||
@@ -59,16 +60,16 @@ class AuthExternal(Authenticator):
|
|||||||
:sealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
|
:sealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self) -> None:
|
||||||
self.negotiate_unix_fd = False
|
self.negotiate_unix_fd: bool = False
|
||||||
self.negotiating_fds = False
|
self.negotiating_fds: bool = False
|
||||||
|
|
||||||
def _authentication_start(self, negotiate_unix_fd=False) -> str:
|
def _authentication_start(self, negotiate_unix_fd: bool = False) -> str:
|
||||||
self.negotiate_unix_fd = negotiate_unix_fd
|
self.negotiate_unix_fd = negotiate_unix_fd
|
||||||
hex_uid = str(os.getuid()).encode().hex()
|
hex_uid = str(os.getuid()).encode().hex()
|
||||||
return f"AUTH EXTERNAL {hex_uid}"
|
return f"AUTH EXTERNAL {hex_uid}"
|
||||||
|
|
||||||
def _receive_line(self, line: str):
|
def _receive_line(self, line: str) -> str:
|
||||||
response, args = _AuthResponse.parse(line)
|
response, args = _AuthResponse.parse(line)
|
||||||
|
|
||||||
if response is _AuthResponse.OK:
|
if response is _AuthResponse.OK:
|
||||||
@@ -91,7 +92,7 @@ class AuthAnnonymous(Authenticator):
|
|||||||
:sealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
|
:sealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _authentication_start(self, negotiate_unix_fd=False) -> str:
|
def _authentication_start(self, negotiate_unix_fd: bool = False) -> str:
|
||||||
if negotiate_unix_fd:
|
if negotiate_unix_fd:
|
||||||
raise AuthError(
|
raise AuthError(
|
||||||
"annonymous authentication does not support negotiating unix fds right now"
|
"annonymous authentication does not support negotiating unix fds right now"
|
||||||
|
|||||||
Reference in New Issue
Block a user