#!/usr/bin/env python3
"""Poll the local network state and print it as a stream of JSON documents.

Every five seconds one JSON object is written to stdout describing whether
the machine is online, whether the first network hop lies inside a network
considered secure, the state of the watched interfaces, the default route
and the (cached) public IP information. Diagnostics go to stderr.

Configuration via environment variables:

* ``NETWORK_INTERFACES_TO_WATCH``: comma-separated interface names.
* ``NETWORK_INTERFACES_TO_COUNT``: comma-separated interface names.
* ``NETWORK_PING_OVERRIDE_<iface>``: IPv4 address to ping for ``<iface>``.
* ``XDG_RUNTIME_DIR``: directory for the small JSON cache files
  (falls back to ``/tmp``).
"""
import trparse
from subprocess import run, PIPE, DEVNULL
import json
import sys
import os
import re
from dataclasses import dataclass, field
from time import sleep, time as now
from ipaddress import (
    IPv4Address as IPAddress,
    IPv4Network as IPNetwork,
    IPv4Interface as IPInterface,
    AddressValueError,
)
from netifaces import interfaces, ifaddresses, AF_INET, AF_LINK, gateways
import dbus
from datetime import datetime

# These are networks that are considered secure. All traffic being routed
# through one of these networks is being encrypted and routed through an
# anonymizing service.
SECURE_FIRSTHOP_NETWORKS = {
    "home": IPNetwork("10.242.0.0/16"),  # Personal network
    "mullvad": IPNetwork("10.64.0.1/32"),  # Mullvad VPN
    "usu": IPNetwork("129.123.8.126/32"),  # USU VPN
}

# Interfaces that we should check
DEFAULT_INTERFACES_TO_WATCH = ["lan", "ezrinet"]

# Interfaces that we should use when determining if we are online
DEFAULT_INTERFACES_TO_COUNT = ["lan"]

# Overrides for IP addresses to ping to determine if an interface is
# online. This is useful for interfaces that don't have or report a
# gateway, but we still want to check if they're online, such as the
# ezrinet interface.
INTERFACE_PING_OVERRIDES = {"ezrinet": IPAddress("10.242.3.1")}

# Upper bound (seconds) for the public IP lookup so that a stalled HTTP
# request cannot block the polling loop indefinitely.
PUBLIC_IP_TIMEOUT_SECONDS = 10

# How long (seconds) a cached public IP lookup is considered fresh.
PUBLIC_IP_MAX_AGE_SECONDS = 2 * 60 * 60

# Delay (seconds) between two status reports.
POLL_INTERVAL_SECONDS = 5

_PING_OVERRIDE_PREFIX = "NETWORK_PING_OVERRIDE_"

INTERFACES_TO_WATCH = os.environ.get(
    "NETWORK_INTERFACES_TO_WATCH", ",".join(DEFAULT_INTERFACES_TO_WATCH)
).split(",")

INTERFACES_TO_COUNT = os.environ.get(
    "NETWORK_INTERFACES_TO_COUNT", ",".join(DEFAULT_INTERFACES_TO_COUNT)
).split(",")

# Every NETWORK_PING_OVERRIDE_<iface>=<ipv4> variable adds or replaces the
# ping target of <iface>. An invalid address raises at start-up.
for key, value in os.environ.items():
    if key.startswith(_PING_OVERRIDE_PREFIX):
        INTERFACE_PING_OVERRIDES[key.removeprefix(_PING_OVERRIDE_PREFIX)] = IPAddress(
            value
        )


@dataclass
class Interface:
    """Snapshot of one network interface as built by ``interface_status``."""

    # Interface name; None when no default route interface could be found.
    name: str | None = None
    # Link-layer address, or None when the interface reports none.
    mac: str | None = None
    # IPv4 addresses (with netmask) currently assigned to the interface.
    addresses: list[IPInterface] = field(default_factory=list)
    # Gateway for this interface as returned by get_gateways(), if any.
    gateway: IPAddress | str | None = None

    @property
    def online(self) -> bool:
        """Return whether the interface appears usable.

        Note that reading this property may run ``ping``.
        """
        # If we have no addresses, we're not online
        if not self.addresses:
            return False

        # If we have an override, ping that
        if self.name in INTERFACE_PING_OVERRIDES:
            return ping(INTERFACE_PING_OVERRIDES[self.name])

        # If we have no gateway, this interface is likely not intended
        # to route beyond the local network, so we'll assume it's
        # online, at least for whatever it's used for.
        if self.gateway is None:
            return True

        return ping(self.gateway)

    def asdict(self) -> dict:
        """Return a JSON-serialisable description of the interface.

        The ``online`` entry is computed on the fly and may therefore ping.
        """
        return {
            "name": self.name,
            "mac": self.mac,
            "addresses": [
                {
                    "address": str(addr.ip),
                    "netmask": str(addr.netmask),
                    "network": str(addr.network),
                    "prefixlen": addr.network.prefixlen,
                }
                for addr in self.addresses
            ],
            "online": self.online,
        }


def get_first_hop() -> IPAddress | None:
    """Get the first network hop.

    Sends a single ping with a TTL of 1 towards 1.1.1.1 and reads the
    address of the responder from the second line of ping's output.

    Returns:
        The address of the first hop, or None when no address could be
        parsed from the output.
    """
    # Use ping to get the first hop
    cmd = ["ping", "-c1", "-W0.3", "-t1", "1.1.1.1"]
    result = run(cmd, stdout=PIPE, stderr=PIPE)

    lines = result.stdout.decode("utf-8", errors="replace").split("\n")
    if len(lines) < 2:
        return None

    # The responder is normally the second field ("From <ip> ..."). When
    # ping resolves a host name the line reads "From <name> (<ip>) ...",
    # so the third field is tried as well, minus surrounding punctuation.
    for token in lines[1].split()[1:3]:
        try:
            return IPAddress(token.strip("():"))
        except AddressValueError:
            continue

    # If we can't parse the output, return None
    return None


def validate_first_hop_is_secure(first_hop: IPAddress) -> str | None:
    """Check if the first hop is in a secure network.

    Returns:
        The name of the first entry of SECURE_FIRSTHOP_NETWORKS that
        contains ``first_hop``, or None when there is no such entry.
    """
    for name, network in SECURE_FIRSTHOP_NETWORKS.items():
        if first_hop in network:
            return name
    return None


def get_gateways() -> dict:
    """Get gateways on each interface.

    Returns:
        A mapping of watched interface name to the gateway address that
        netifaces reports for it. If several gateways are listed for one
        interface, the last one listed wins.
    """
    result = {}
    for gateway in gateways().get(AF_INET, []):
        # Each entry is indexed as (address, interface name, ...).
        address, iface = gateway[0], gateway[1]
        if iface in INTERFACES_TO_WATCH:
            result[iface] = address
    return result


def interface_status(interface: str | None, gw) -> Interface:
    """Get the status of an interface.

    Args:
        interface: Interface name. Names that cannot be looked up
            (including None) yield an Interface without addresses.
        gw: Mapping of interface name to gateway, see ``get_gateways``.
    """
    try:
        addrs = ifaddresses(interface)
    except Exception:
        # Best effort: an unknown or missing interface is reported as
        # having no addresses. KeyboardInterrupt/SystemExit still propagate.
        addrs = {}

    link_addrs = addrs.get(AF_LINK)
    mac = link_addrs[0]["addr"] if link_addrs else None

    addresses = [
        IPInterface(f'{addr["addr"]}/{addr["netmask"]}')
        for addr in addrs.get(AF_INET, [])
    ]

    return Interface(
        name=interface,
        mac=mac,
        addresses=addresses,
        gateway=gw.get(interface, None),
    )


def ping(host: IPAddress | str) -> bool:
    """Return True when a single ping to ``host`` exits successfully."""
    cmd = ["ping", "-c1", "-w1", str(host)]
    result = run(cmd, stdout=DEVNULL, stderr=DEVNULL)
    return result.returncode == 0


def get_public_ip() -> dict | None:
    """Get the public IP address.

    Returns:
        The decoded JSON object from ipinfo.io when it contains a valid
        IPv4 address under ``"ip"``; ``{"rate_limited": True}`` when the
        response carries ``"status": 429``; None in every other case.
    """
    cmd = [
        "curl",
        "-s",
        "--max-time",
        str(PUBLIC_IP_TIMEOUT_SECONDS),
        "https://ipinfo.io",
    ]
    result = run(cmd, stdout=PIPE, stderr=PIPE)

    try:
        data = json.loads(result.stdout.decode("utf-8"))
    except ValueError:
        # Covers empty output, invalid JSON and undecodable bytes.
        return None

    if not isinstance(data, dict):
        return None

    try:
        # If the IP address is invalid, don't return anything
        IPAddress(data["ip"])
    except (AddressValueError, KeyError):
        if data.get("status", None) == 429:
            # We're rate limited, so return something indicating that
            return {"rate_limited": True}
        return None

    return data


def get_default_route() -> str | None:
    """Get the default route.

    Returns:
        The interface name following ``dev`` on the first line printed by
        ``ip route show default``, or None when there is none.
    """
    cmd = ["ip", "route", "show", "default"]
    result = run(cmd, stdout=PIPE, stderr=PIPE)

    # Get first line (might have multiple gateway routes)
    line = result.stdout.decode("utf-8").split("\n")[0]

    # Get the gateway link (following "dev").
    # We already know the gateway IP based on our firsthop check earlier,
    # and that's more reliable since it will respect any routing rules
    match = re.search(r"dev\s+(\S+)", line)
    if match is None:
        return None
    return match.group(1)


def format_time(time: datetime) -> dict[str, str | int]:
    """Format a datetime object for display.

    Date and time components are zero-padded strings; ``"unix"`` is the
    integer POSIX timestamp.
    """
    return {
        "hour": f"{time.hour:02}",
        "minute": f"{time.minute:02}",
        "second": f"{time.second:02}",
        "year": f"{time.year:04}",
        "month": f"{time.month:02}",
        "day": f"{time.day:02}",
        "unix": int(time.timestamp()),
    }


# NOTE: disabled D-Bus lookup of the watched links, kept for reference.
# system_bus = dbus.SystemBus()
# networkd = system_bus.get_object(
#     "org.freedesktop.network1", "/org/freedesktop/network1"
# )
# manager = dbus.Interface(networkd, "org.freedesktop.network1.Manager")


# def get_dbus_interfaces():
#     for iface in INTERFACES_TO_WATCH:
#         dbus_object = system_bus.get_object(
#             "org.freedesktop.network1", manager.GetLinkByName(iface)
#         )
#         dbus_interface = dbus.Interface(dbus_object, "org.freedesktop.network1.Link")
#         yield dbus_interface


runtime_dir = os.environ.get("XDG_RUNTIME_DIR", "/tmp")


def load_last_data(var_name: str):
    """Load a cached value from ``<runtime_dir>/<var_name>.json``.

    Returns:
        A ``(value, mtime)`` tuple, or ``(None, None)`` when the file is
        missing, unreadable or does not contain valid JSON.
    """
    path = f"{runtime_dir}/{var_name}.json"
    try:
        with open(path, "r") as f:
            return json.load(f), os.path.getmtime(path)
    except (OSError, ValueError):
        # The cache is best effort; treat any unusable file as absent.
        return None, None


def store_last_data(var_name: str, data):
    """Write ``data`` as JSON to ``<runtime_dir>/<var_name>.json``."""
    with open(f"{runtime_dir}/{var_name}.json", "w") as f:
        json.dump(data, f)


def _load_last_first_hop() -> IPAddress | None:
    """Return the cached first hop, or None when absent or not an address."""
    stored = load_last_data("first_hop")[0]
    if stored is None:
        return None
    try:
        return IPAddress(stored)
    except (ValueError, TypeError):
        # E.g. the string "None", which is stored when no hop was found.
        return None


def main() -> None:
    """Run the polling loop forever, printing one JSON report per cycle."""
    last_default_route = load_last_data("default_route")[0]
    # The modification time of the cache file doubles as the time of the
    # last public IP request.
    last_ip_data, last_request = load_last_data("ip_data")
    last_first_hop = _load_last_first_hop()

    while True:
        try:
            online = ping("1.1.1.1")
        except Exception:
            # Not being able to run ping counts as offline, but an
            # interrupt must still be able to stop the script.
            online = False

        hop = get_first_hop()
        gw = get_gateways()
        default_route = get_default_route()

        # public IP shouldn't change often, so only check every 2 hours or
        # if the default route or first hop changes
        if (
            default_route != last_default_route
            or last_ip_data is None
            or last_request is None
            or now() - last_request > PUBLIC_IP_MAX_AGE_SECONDS
            or (hop != last_first_hop and hop is not None)
        ):
            print("refreshing public IP", file=sys.stderr, flush=True)
            public_ip_data = get_public_ip()
            if public_ip_data is None:
                print("failed to get public IP", file=sys.stderr, flush=True)
            elif public_ip_data.get("rate_limited", False):
                # Remember the marker in memory, but neither report nor
                # persist it for this cycle.
                last_ip_data = public_ip_data
                public_ip_data = None
            else:
                last_ip_data = public_ip_data

            # Write the public IP data to a file
            store_last_data("ip_data", public_ip_data)
            last_default_route = default_route
            store_last_data("default_route", default_route)
            last_request = now()
            last_first_hop = hop
            store_last_data("first_hop", str(hop))
        else:
            public_ip_data = last_ip_data

        # If we can't reach the router, assume insecure
        secure = hop is not None and validate_first_hop_is_secure(hop) is not None

        iface_dict = {
            iface: interface_status(iface, gw).asdict()
            for iface in INTERFACES_TO_WATCH
        }
        default_route_iface_data = interface_status(default_route, gw)

        report = {
            "online": online,
            "secure": secure,
            "interfaces": iface_dict,
            "public_ip": public_ip_data or {},
            "have_public_ip": public_ip_data is not None and "ip" in public_ip_data,
            "default_route": default_route,
            "default_interface": default_route_iface_data.asdict(),
            "have_default_route": default_route is not None,
            "gateway": str(hop),
            "have_gateway": hop is not None,
            "last_update": format_time(datetime.now()),
        }
        print(json.dumps(report), flush=True)

        sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()