"""Connection backend implementation file.""" from vpn_manager.service.connections.base import ConnectionBase, ConfigSpec from typing import TypedDict, Required from enum import IntEnum, StrEnum from asyncio import ( create_subprocess_exec, get_running_loop, sleep, timeout, CancelledError, ) from asyncio.subprocess import Process, PIPE, DEVNULL from dbus_fast import Variant from sys import platform import requests from vpn_manager.utils import unwrap_variant from .insecure_tls import TLSAdapter import logging import ssl from . import errors from .auth_flows import AuthFlows import base64 import re import psutil import signal from datetime import timedelta from vpn_manager.service.inhibitor import Inhibitor class LoginTarget(StrEnum): """ Login target enum. Has two methods for selecting the target, using the GlobalProtect client's names, and the more technical names that specify """ GATEWAY = "gateway" PORTAL = "portal" SSL = "ssl" IPSEC = "ipsec" class Options(TypedDict, total=False): """Options type definition for GlobalProtect VPNs.""" hostname: Required[str] verify_certificate: bool login_target: Required[LoginTarget] allow_insecure_crypto: bool spoof_clientos: str use_default_browser: bool auth_cache_timeout: int # If set, will attempt to open the in-browser request using # a Firefox "Open external links in a container" plugin URI, # using the container name set here. firefox_browser_container: str class Auth(TypedDict): """Auth type definition for GlobalProtect VPNs.""" username: str cookie: str class GlobalProtectConnection( ConnectionBase, connection_type="com.paloaltonetworks.GlobalProtect", ): """GlobalProtect VPN connection backend.""" _proc: Process | None = None _pid: int | None = None _disconnecting: bool = False PLATFORM_TO_CLIENTOS = { "linux": "Linux", "darwin": "Mac", "win32": "Windows", "cygwin": "Windows", } logger = logging.getLogger(f"{__name__}.GlobalProtectConnection") def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._inhibitor = Inhibitor( Inhibitor.What.sleep, "vpn-manager", "Disconnect from VPN before sleep", "delay", ) async def inhibitor_disconnect(): await self.disconnect(timeout_sec=2) self._inhibitor.on_sleep_or_shutdown(inhibitor_disconnect) def get_auth_cache_timeout(self, options: dict[str, Variant]) -> timedelta | None: """Get the timeout for the authdata cache.""" timeout = options.get("auth_cache_timeout") if timeout is None: return None return timedelta(seconds=timeout.value) async def _prelogin(self, options: dict[str, Variant]): """Perform prelogin request.""" self.logger.debug("Begin preauth with options:", options) ## Extract options from vardict if "spoof_clientos" in options: clientos = options["spoof_clientos"].value else: clientos = self.PLATFORM_TO_CLIENTOS.get(platform) if clientos is None: raise errors.PlatformError( f"Platform {platform} is not supported. Specify a client OS to spoof in the connection options." ) verify = unwrap_variant(options.get("verify_certificate", True)) if not verify: # Disabling certificate verification is extremely dangerous and should only be done if you KNOW WHAT YOU ARE DOING self.logger.warn("Certificate verification disabled! This is dangerous!") if options.get("login_target").value in {LoginTarget.GATEWAY, LoginTarget.SSL}: url = f"{options.get('hostname').value}/ssl-vpn/prelogin.esp" else: url = f"{options.get('hostname').value}/global-protect/prelogin.esp" allow_insecure_crypto = unwrap_variant( options.get("allow_insecure_crypto", False) ) use_default_browser = unwrap_variant(options.get("use_default_browser", True)) ## Configure session session = requests.Session() # User-agent must be set to 'PAN GlobalProtect' or the subsequent SAML request will fail. session.headers["User-Agent"] = "PAN GlobalProtect" if allow_insecure_crypto: # Insecure crypto is dangerous, log a warning self.logger.warn( "Using insecure crypto for prelogin request! This is dangerous!" ) session.mount("https://", TLSAdapter(verify=verify)) data = { "ipv6-support": "yes", "clientos": clientos, "clientVer": 4100, # This needs to be hardcoded apparently. "tmp": "tmp", # No idea what this is for, but other GP OpenConnect wrappers set it so we do too "cas-support": "yes", "default-browser": ( 1 if use_default_browser else 0 ), # This will influence which auth flow we ultimately use, but the server may reject our request to use the default browser. } try: res = session.post(url, verify=verify, data=data) except Exception as ex: # Get the root exception rootex = ex while True: if isinstance(rootex, ssl.SSLError): break elif not rootex.__cause__ and not rootex.__context__: break rootex = rootex.__cause__ or rootex.__context__ if isinstance(rootex, ssl.CertificateError): # Raise a D-Bus certificate error self.logger.error(f"SSL certificate error: {rootex}") raise errors.CertificateError(str(rootex)) elif isinstance(rootex, ssl.SSLError): # Raise a D-Bus SSL error self.logger.error(f"Generic SSL error: {rootex}") raise errors.SSLError(str(rootex)) else: # Raise a D-Bus generic prelogin failure raise errors.PreloginFailure(f"An unexpected error occurred: {rootex}") self.logger.debug("Got XML response:", res.text) # Pass XML parsing off to agent for security reasons result = await self._manager.request_credentials( AuthFlows.GLOBALPROTECT_PARSE_PRELOGIN, Variant("s", res.text) ) if result.signature != "a{ss}": # Make sure that the agent is well-behaved and is returning a valid response for this request. raise errors.PreloginFailure( "Received unexpected response from agent. Expected strdict (a{ss}), got " f"{result.signature}" ) self.logger.debug("Got parsed result", result.value) return result.value async def authenticate(self, options: dict[str, Variant]) -> Variant: f"""{super().authenticate.__doc__}""" self.logger.info("Starting GlobalProtect authentication.") preauth: dict[str, str] = await self._prelogin(options) if "saml-auth-method" not in preauth or "saml-request" not in preauth: raise errors.AuthFlowUnsupported( "Server requested unsupported authentication flow. We currently only support SAML auth for GlobalProtect." ) auth_flow = ( AuthFlows.GLOBALPROTECT_SAML_DEFAULT_BROWSER if preauth.get("saml-default-browser") == "yes" and preauth.get("saml-auth-method") == "REDIRECT" else AuthFlows.GLOBALPROTECT_SAML_INTEGRATED ) if ( auth_flow == AuthFlows.GLOBALPROTECT_SAML_DEFAULT_BROWSER and "firefox_browser_container" in options ): auth_result = await self._manager.request_credentials( auth_flow, Variant( "(ss)", ( base64.b64decode(preauth.get("saml-request")).decode("utf-8"), options["firefox_browser_container"].value, ), ), ) else: auth_result = await self._manager.request_credentials( auth_flow, Variant( "s", base64.b64decode(preauth.get("saml-request")).decode("utf-8") ), ) if auth_result.signature != "a{ss}": # Make sure the agent is well-behaved and is returning a valid response for this request. raise errors.AuthenticationFailure( "Received unexpected auth data. Expected strdict (a{ss}), got " f"{auth_result.signature}" ) if auth_result.value.get("saml-auth-status") != "1": raise errors.AuthenticationFailure( "Server indicates authentication failure" ) # Guard to make sure the attributes we use to authenticate to the VPN are actually available. if "prelogin-cookie" not in auth_result.value: raise errors.InvalidResponse("Did not receive cookie from server") if "saml-username" not in auth_result.value: raise errors.InvalidResponse("Did not receive SAML username from server") return auth_result async def connect(self, options: dict[str, Variant], auth: Variant): f"""{super().connect.__doc__}""" if self._proc and self._proc.is_running(): self.logger.error( "Cowardly refusing to connect to VPN while already connected." ) raise errors.ConnectionFailure( "Cowardly refusing to connect to VPN while already connected." ) # Take logind inhibitor try: await self._inhibitor.acquire() except: self.logger.warning( f"Unable to acquire sleep delay inhibitor. Networking will be broken on resume from suspend." ) # Create subprocess proc = await create_subprocess_exec( "/usr/bin/openconnect", "--protocol=gp", f"--user={auth.value.get('saml-username')}", "--useragent=PAN GlobalProtect", f"--usergroup={options.get('login_target', Variant('s', LoginTarget.PORTAL)).value in {LoginTarget.PORTAL, LoginTarget.IPSEC} and 'portal' or 'gateway'}:prelogin-cookie", "--passwd-on-stdin", options.get("hostname").value, "-b", stdin=PIPE, stdout=PIPE, stderr=PIPE, ) self.logger.debug(f"Initial process PID: {proc.pid}") # Send prelogin cookie proc.stdin.write(auth.value.get("prelogin-cookie").encode()) proc.stdin.close() # Create logger for openconnect openconnect_logger = logging.getLogger( f"{__name__}.GlobalProtectConnection.openconnect" ) # Spawn task to bridge stderr to log self._stderr_task = get_running_loop().create_task( self._bridge_output(proc.stderr, openconnect_logger.warning) ) # Bridge stdout to log and also search for PID of forked process, with a timeout of 10 seconds after which we assume a connection failure and terminate the process. self._pid = None try: async with timeout(10): async for line in proc.stdout: decoded = line.decode("utf-8").strip() if decoded == "": continue openconnect_logger.debug(decoded) if match_ := re.match( r"Continuing in background; pid ([0-9]+)", decoded ): self._pid = int(match_.group(1)) break except TimeoutError: self.logger.error("Connection timed out") proc.kill() await proc.wait() raise errors.ConnectionFailure("Connection timed out") if self._pid is None: # Never received PID of child, if we get here then openconnect exited without reporting a PID, indicating a failure to connect. self.logger.error("VPN failed to connect") raise errors.ConnectionFailure() self.logger.debug(f"Got PID for openconnect: {self._pid}") # Spawn task to bridge remainder of stdout to log self._stdout_task = get_running_loop().create_task( self._bridge_output(proc.stdout, openconnect_logger.debug) ) self.logger.info("Connection established") self.logger.debug(f"openconnect PID: {self._pid}") self._proc = psutil.Process(self._pid) self._wait_task = get_running_loop().create_task(self._wait_for_exit()) # Create a task that will wait for the process to exit so we don't leave zombies around # (for some reason, proc.wait() waits until the streams are all closed as well :/) self._proc_wait_task = get_running_loop().create_task(proc.wait()) async def _bridge_output(self, stream, processor): async for line in stream: decoded = line.decode("utf-8").strip() if decoded == "": # Don't log a blank line continue processor(line.decode("utf-8").strip()) self.logger.debug(f"Stream {stream} closed") async def _wait_for_exit(self): """Wait for self._proc to exit.""" try: while self._proc.is_running(): # Check process every 5 seconds # I'd prefer a proper-async way to do this, but i guess this will work. self.logger.debug("openconnect process is running") await sleep(5) self.logger.debug("openconnect has exited") if not self._disconnecting: # Only call on_disconnected if the disconnect was asynchronous self._connection.on_disconnected() self._disconnecting = False except CancelledError: self.logger.debug("exit wait canceled.") finally: self._inhibitor.release() async def disconnect(self, *, timeout_sec: int = 10): f"""{super().disconnect.__doc__}""" if self._proc is None: self.logger.warn( "Cowardly refusing to disconnect from a VPN that is not connected" ) return if self._wait_task is not None: self._wait_task.cancel() self._disconnecting = True try: self._proc.terminate() except: # Process doesn't exist, we are in the state we want so... :shrug: this is fine return # If the process takes longer than 10 seconds to exit, kill it sith WIGKILL try: async with timeout(timeout_sec): await self._proc_wait_task except TimeoutError: self._proc.kill() async def temporary_disconnect(self): f"""{super().temporary_disconnect.__doc__}""" if self._proc is None: self.logger.warn( "Cowardly refusing to disconnect from a VPN that is not connected" ) return if self._wait_task is not None: self._wait_task.cancel() self._disconnecting = True self._proc.send_signal(signal.SIGHUP) await self._proc_wait_task async def reconnect(self): f"""{super().reconnect.__doc__}""" if self._proc is None: self.logger.warn( "Cowardly refusing to atomically reconnect to a VPN that is not connected" ) return self._proc.send_signal(signal.SIGUSR2) @classmethod def validate_options(cls, options: dict[str, Variant]): # options must contain hostname and login target. if "hostname" not in options: raise errors.InvalidOptions.missing("hostname") if "login_target" not in options: raise errors.InvalidOptions.missing( "login_target", details="What login endpoint to use." ) # validate types if sig := options["hostname"].signature != "s": raise errors.InvalidOptions.invalid_type("hostname", "s", sig) if sig := options["login_target"].signature != "s": raise errors.InvalidOptions.invalid_type("login_target", "s", sig) if ( sig := options.get("verify_certificate", Variant("b", True)).signature != "b" ): raise errors.InvalidOptions.invalid_type("verify_certifcate", "b", sig) if ( sig := options.get("allow_insecure_crypto", Variant("b", True)).signature != "b" ): raise errors.InvalidOptions.invalid_type("verify_certificate", "b", sig) if sig := options.get("spoof_clientos", Variant("s", "")).signature != "s": raise errors.InvalidOptions.invalid_type("spoof_clientos", "s", sig) if ( sig := options.get("use_default_browser", Variant("b", True)).signature != "b" ): raise errors.InvalidOptions.invalid_type("use_default_browser", "b", sig) if ( sig := options.get("auth_cache_timeout", Variant("u", True)).signature != "u" ): raise errors.InvalidOptions.invalid_type("auth_cache_timeout", "u", sig) if ( sig := options.get("firefox_browser_container", Variant("s", "")).signature != "s" ): raise errors.InvalidOptions.invalid_type("use_default_browser", "s", sig) @classmethod def restore_options(cls, options: dict) -> dict[str, Variant]: result = {} cls.put_value(result, "s", options, "hostname") cls.put_value(result, "s", options, "login_target") cls.put_value(result, "s", options, "spoof_clientos") cls.put_value(result, "b", options, "verify_certificate") cls.put_value(result, "b", options, "allow_insecure_crypto") cls.put_value(result, "b", options, "use_default_browser") cls.put_value(result, "u", options, "auth_cache_timeout") cls.put_value(result, "s", options, "firefox_browser_container") return result @classmethod def get_config_spec(cls): """See parent.""" return cls._build_config_spec( ConfigSpec( name="hostname", signature="s", description="Hostname of the VPN server", required=True, ), ConfigSpec( name="login_target", signature="s", description="What kind of connection to create (IPSec or SSL)", choices=[ LoginTarget.PORTAL, LoginTarget.GATEWAY, LoginTarget.SSL, LoginTarget.IPSEC, ], default=LoginTarget.IPSEC, ), ConfigSpec( name="spoof_clientos", signature="s", description="Custom OS identifier to use instead of autodetected OS", ), ConfigSpec( name="verify_certificate", signature="b", description="Whether to verify the certificate provided by the server. DISABLING THIS IS DANGEROUS!!!", default=True, ), ConfigSpec( name="allow_insecure_crypto", signature="b", description="Whether to allow older, insecure TLS versions when connecting to the server. ENABLING THIS IS DANGEROUS!!!", default=False, ), ConfigSpec( name="use_default_browser", signature="b", description="Whether to request use of the default browser when authenticating. The server may not allow this.", default=True, ), ConfigSpec( name="auth_cache_timeout", signature="u", description="The timeout on caching of authentication results for reconnecting after unexpected or temporary disconnects. This should never be longer than the session timeout. Leaving empty disables caching entirely.", ), ConfigSpec( name="firefox_browser_container", signature="s", description="Specify a browser container to open the authentication page in. Requires Firefox and the 'Open external links in container' plugin", ), )