379 lines
15 KiB
Python

"""Connection backend implementation file."""
from vpn_manager.service.connections.base import ConnectionBase
from typing import TypedDict, Required
from enum import IntEnum, StrEnum
from asyncio import (
create_subprocess_exec,
get_running_loop,
sleep,
timeout,
CancelledError,
)
from asyncio.subprocess import Process, PIPE, DEVNULL
from dbus_fast import Variant
from sys import platform
import requests
from vpn_manager.utils import unwrap_variant
from .insecure_tls import TLSAdapter
import logging
import ssl
from . import errors
from .auth_flows import AuthFlows
import base64
import re
import psutil
class LoginTarget(StrEnum):
"""Login target enum."""
GATEWAY = "gateway"
PORTAL = "portal"
class Options(TypedDict, total=False):
"""Options type definition for GlobalProtect VPNs."""
hostname: Required[str]
verify_certificate: bool
login_target: Required[LoginTarget]
allow_insecure_crypto: bool
spoof_clientos: str
use_default_browser: bool
# If set, will attempt to open the in-browser request using
# a Firefox "Open external links in a container" plugin URI,
# using the container name set here.
firefox_browser_container: str
class Auth(TypedDict):
"""Auth type definition for GlobalProtect VPNs."""
username: str
cookie: str
class GlobalProtectConnection(
ConnectionBase,
connection_type="dev.ezri.vpn1.Connection.GlobalProtect",
):
"""GlobalProtect VPN connection backend."""
_proc: Process | None = None
_pid: int | None = None
_disconnecting: bool = False
PLATFORM_TO_CLIENTOS = {
"linux": "Linux",
"darwin": "Mac",
"win32": "Windows",
"cygwin": "Windows",
}
logger = logging.getLogger(f"{__name__}.GlobalProtectConnection")
async def _prelogin(self, options: dict[str, Variant]):
"""Perform prelogin request."""
## Extract options from vardict
if "spoof_clientos" in options:
clientos = options["spoof_clientos"].value
else:
clientos = self.PLATFORM_TO_CLIENTOS.get(platform)
if clientos is None:
raise errors.PlatformError(
f"Platform {platform} is not supported. Specify a client OS to spoof in the connection options."
)
verify = unwrap_variant(options.get("verify_certificate", True))
if not verify:
# Disabling certificate verification is extremely dangerous and should only be done if you KNOW WHAT YOU ARE DOING
self.logger.warn("Certificate verification disabled! This is dangerous!")
if options.get("login_target").value == LoginTarget.GATEWAY:
url = f"{options.get('hostname').value}/ssl-vpn/prelogin.esp"
else:
url = f"{options.get('hostname').value}/global-protect/prelogin.esp"
allow_insecure_crypto = unwrap_variant(
options.get("allow_insecure_crypto", False)
)
use_default_browser = unwrap_variant(options.get("use_default_browser", True))
## Configure session
session = requests.Session()
# User-agent must be set to 'PAN GlobalProtect' or the subsequent SAML request will fail.
session.headers["User-Agent"] = "PAN GlobalProtect"
if allow_insecure_crypto:
# Insecure crypto is dangerous, log a warning
self.logger.warn(
"Using insecure crypto for prelogin request! This is dangerous!"
)
session.mount("https://", TLSAdapter(verify=verify))
data = {
"ipv6-support": "yes",
"clientos": clientos,
"clientVer": 4100, # This needs to be hardcoded apparently.
"tmp": "tmp", # No idea what this is for, but other GP OpenConnect wrappers set it so we do too
"cas-support": "yes",
"default-browser": (
1 if use_default_browser else 0
), # This will influence which auth flow we ultimately use, but the server may reject our request to use the default browser.
}
try:
res = session.post(url, verify=verify, data=data)
except Exception as ex:
# Get the root exception
rootex = ex
while True:
if isinstance(rootex, ssl.SSLError):
break
elif not rootex.__cause__ and not rootex.__context__:
break
rootex = rootex.__cause__ or rootex.__context__
if isinstance(rootex, ssl.CertificateError):
# Raise a D-Bus certificate error
self.logger.error(f"SSL certificate error: {rootex}")
raise errors.CertificateError(str(rootex))
elif isinstance(rootex, ssl.SSLError):
# Raise a D-Bus SSL error
self.logger.error(f"Generic SSL error: {rootex}")
raise errors.SSLError(str(rootex))
else:
# Raise a D-Bus generic prelogin failure
raise errors.PreloginFailure(f"An unexpected error occurred: {rootex}")
# Pass XML parsing off to agent for security reasons
result = await self._manager.request_credentials(
AuthFlows.GLOBALPROTECT_PARSE_PRELOGIN, Variant("s", res.text)
)
if result.signature != "a{ss}":
# Make sure that the agent is well-behaved and is returning a valid response for this request.
raise errors.PreloginFailure(
"Received unexpected response from agent. Expected strdict (a{ss}), got "
f"{result.signature}"
)
return result.value
async def authenticate(self, options: dict[str, Variant]) -> Variant:
f"""{super().authenticate.__doc__}"""
preauth: dict[str, str] = await self._prelogin(options)
if "saml-auth-method" not in preauth or "saml-request" not in preauth:
raise errors.AuthFlowUnsupported(
"Server requested unsupported authentication flow. We currently only support SAML auth for GlobalProtect."
)
auth_flow = (
AuthFlows.GLOBALPROTECT_SAML_DEFAULT_BROWSER
if preauth.get("saml-default-browser") == "yes"
and preauth.get("saml-auth-method") == "REDIRECT"
else AuthFlows.GLOBALPROTECT_SAML_INTEGRATED
)
if (
auth_flow == AuthFlows.GLOBALPROTECT_SAML_DEFAULT_BROWSER
and "firefox_browser_container" in options
):
auth_result = await self._manager.request_credentials(
auth_flow,
Variant(
"(ss)",
(
base64.b64decode(preauth.get("saml-request")).decode("utf-8"),
options["firefox_browser_container"].value,
),
),
)
else:
auth_result = await self._manager.request_credentials(
auth_flow,
Variant(
"s", base64.b64decode(preauth.get("saml-request")).decode("utf-8")
),
)
if auth_result.signature != "a{ss}":
# Make sure the agent is well-behaved and is returning a valid response for this request.
raise errors.AuthenticationFailure(
"Received unexpected auth data. Expected strdict (a{ss}), got "
f"{auth_result.signature}"
)
if auth_result.value.get("saml-auth-status") != "1":
raise errors.AuthenticationFailure(
"Server indicates authentication failure"
)
# Guard to make sure the attributes we use to authenticate to the VPN are actually available.
if "prelogin-cookie" not in auth_result.value:
raise errors.InvalidResponse("Did not receive cookie from server")
if "saml-username" not in auth_result.value:
raise errors.InvalidResponse("Did not receive SAML username from server")
return auth_result
async def connect(self, options: dict[str, Variant], auth: Variant):
f"""{super().connect.__doc__}"""
if self._proc and self._proc.is_running():
self.logger.error(
"Cowardly refusing to connect to VPN while already connected."
)
raise errors.ConnectionFailure(
"Cowardly refusing to connect to VPN while already connected."
)
proc = await create_subprocess_exec(
"/usr/bin/openconnect",
"--protocol=gp",
f"--user={auth.value.get('saml-username')}",
"--useragent=PAN GlobalProtect",
f"--usergroup={options.get('login_target', Variant('s', LoginTarget.PORTAL)).value == LoginTarget.PORTAL and 'portal' or 'gateway'}:prelogin-cookie",
"--passwd-on-stdin",
options.get("hostname").value,
"-b",
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
)
self.logger.debug(f"Initial process PID: {proc.pid}")
# Send prelogin cookie
proc.stdin.write(auth.value.get("prelogin-cookie").encode())
proc.stdin.close()
# Create logger for openconnect
openconnect_logger = logging.getLogger(
f"{__name__}.GlobalProtectConnection.openconnect"
)
# Spawn task to bridge stderr to log
self._stderr_task = get_running_loop().create_task(
self._bridge_output(proc.stderr, openconnect_logger.warning)
)
# Bridge stdout to log and also search for PID of forked process, with a timeout of 10 seconds after which we assume a connection failure and terminate the process.
self._pid = None
try:
async with timeout(10):
async for line in proc.stdout:
decoded = line.decode("utf-8").strip()
if decoded == "":
continue
openconnect_logger.debug(decoded)
if match_ := re.match(
r"Continuing in background; pid ([0-9]+)", decoded
):
self._pid = int(match_.group(1))
break
except TimeoutError:
self.logger.error("Connection timed out")
proc.kill()
await proc.wait()
raise errors.ConnectionFailure("Connection timed out")
if self._pid is None:
# Never received PID of child, if we get here then openconnect exited without reporting a PID, indicating a failure to connect.
self.logger.error("VPN failed to connect")
raise errors.ConnectionFailure()
self.logger.debug(f"Got PID for openconnect: {self._pid}")
# Spawn task to bridge remainder of stdout to log
self._stdout_task = get_running_loop().create_task(
self._bridge_output(proc.stdout, openconnect_logger.debug)
)
self.logger.info("Connection established")
self.logger.debug(f"openconnect PID: {self._pid}")
self._proc = psutil.Process(self._pid)
self._wait_task = get_running_loop().create_task(self._wait_for_exit())
# Create a task that will wait for the process to exit so we don't leave zombies around
# (for some reason, proc.wait() waits until the streams are all closed as well :/)
self._proc_wait_task = get_running_loop().create_task(proc.wait())
async def _bridge_output(self, stream, processor):
async for line in stream:
decoded = line.decode("utf-8").strip()
if decoded == "":
# Don't log a blank line
continue
processor(line.decode("utf-8").strip())
self.logger.debug(f"Stream {stream} closed")
async def _wait_for_exit(self):
"""Wait for self._proc to exit."""
try:
while self._proc.is_running():
# Check process every 5 seconds
# I'd prefer a proper-async way to do this, but i guess this will work.
self.logger.debug("openconnect process is running")
await sleep(5)
self.logger.debug("openconnect has exited")
if not self._disconnecting:
# Only call on_disconnected if the disconnect was asynchronous
self._connection.on_disconnected()
self._disconnecting = False
except CancelledError:
self.logger.debug("exit wait canceled.")
async def disconnect(self):
f"""{super().disconnect.__doc__}"""
if self._proc is None:
self.logger.warn(
"Cowardly refusing to disconnect from a VPN that is not connected"
)
return
if self._wait_task is not None:
self._wait_task.cancel()
self._disconnecting = True
self._proc.terminate()
await self._proc_wait_task
@classmethod
def validate_options(cls, options: dict[str, Variant]):
# options must contain hostname and login target.
if "hostname" not in options:
raise errors.InvalidOptions.missing("hostname")
if "login_target" not in options:
raise errors.InvalidOptions.missing(
"login_target", details="What login endpoint to use."
)
# validate types
if sig := options["hostname"].signature != "s":
raise errors.InvalidOptions.invalid_type("hostname", "s", sig)
if sig := options["login_target"].signature != "s":
raise errors.InvalidOptions.invalid_type("login_target", "s", sig)
if (
sig := options.get("verify_certificate", Variant("b", True)).signature
!= "b"
):
raise errors.InvalidOptions.invalid_type("verify_certifcate", "b", sig)
if (
sig := options.get("allow_insecure_crypto", Variant("b", True)).signature
!= "b"
):
raise errors.InvalidOptions.invalid_type("verify_certificate", "b", sig)
if sig := options.get("spoof_clientos", Variant("s", "")).signature != "s":
raise errors.InvalidOptions.invalid_type("spoof_clientos", "s", sig)
if (
sig := options.get("use_default_browser", Variant("b", True)).signature
!= "b"
):
raise errors.InvalidOptions.invalid_type("use_default_browser", "b", sig)
@classmethod
def restore_options(cls, options: dict) -> dict[str, Variant]:
result = {}
cls.put_value(result, "s", options, "hostname")
cls.put_value(result, "s", options, "login_target")
cls.put_value(result, "s", options, "spoof_clientos")
cls.put_value(result, "b", options, "verify_certificate")
cls.put_value(result, "b", options, "allow_insecure_crypto")
cls.put_value(result, "b", options, "use_default_browser")