288 lines
9.1 KiB
Python
Executable File
288 lines
9.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import trparse
|
|
from subprocess import run, PIPE, DEVNULL
|
|
import json
|
|
import sys
|
|
import os
|
|
import re
|
|
from time import sleep, time as now
|
|
from ipaddress import (
|
|
IPv4Address as IPAddress,
|
|
IPv4Network as IPNetwork,
|
|
IPv4Interface as IPInterface,
|
|
AddressValueError,
|
|
)
|
|
from netifaces import interfaces, ifaddresses, AF_INET, AF_LINK, gateways
|
|
import dbus
|
|
from datetime import datetime
|
|
|
|
|
|
# These are networks that are considered secure. All traffic being routed
# through one of these networks is being encrypted and routed through an
# anonymizing service.
SECURE_FIRSTHOP_NETWORKS = {
    "home": IPNetwork("10.242.0.0/16"),  # Personal network
    "mullvad": IPNetwork("10.64.0.1/32"),  # Mullvad VPN
    "usu": IPNetwork("129.123.8.126/32"),  # USU VPN
}

# Interfaces that we should check
DEFAULT_INTERFACES_TO_WATCH = ["lan", "ezrinet"]
# Interfaces that we should use when determining if we are online
DEFAULT_INTERFACES_TO_COUNT = ["lan"]
# Overrides for IP addresses to ping to determine if an interface is
# online. This is useful for interfaces that don't have or report a
# gateway, but we still want to check if they're online, such as the
# ezrinet interface.
INTERFACE_PING_OVERRIDES = {"ezrinet": IPAddress("10.242.3.1")}

# Environment variables named NETWORK_PING_OVERRIDE_<interface> add to (or
# replace entries in) INTERFACE_PING_OVERRIDES.
_PING_OVERRIDE_ENV_PREFIX = "NETWORK_PING_OVERRIDE_"


def _interfaces_from_env(variable, default):
    """Return the comma-separated interface names stored in *variable*.

    Falls back to a copy of *default* when the variable is unset.
    Surrounding whitespace is stripped and empty entries are dropped, so
    ``"lan, ezrinet,"`` yields ``["lan", "ezrinet"]`` and an empty value
    yields ``[]`` rather than a list holding one empty name.
    """
    raw = os.environ.get(variable)
    if raw is None:
        return list(default)
    return [name.strip() for name in raw.split(",") if name.strip()]


INTERFACES_TO_WATCH = _interfaces_from_env(
    "NETWORK_INTERFACES_TO_WATCH", DEFAULT_INTERFACES_TO_WATCH
)
INTERFACES_TO_COUNT = _interfaces_from_env(
    "NETWORK_INTERFACES_TO_COUNT", DEFAULT_INTERFACES_TO_COUNT
)

# An invalid address in an override variable still raises at start-up, as
# before; a comprehension is used so no loop variables leak into the module.
INTERFACE_PING_OVERRIDES.update(
    {
        env_name.removeprefix(_PING_OVERRIDE_ENV_PREFIX): IPAddress(env_value.strip())
        for env_name, env_value in os.environ.items()
        if env_name.startswith(_PING_OVERRIDE_ENV_PREFIX)
    }
)
|
|
|
|
|
|
class Interface:
|
|
name: str
|
|
mac: str
|
|
addresses: list[IPInterface]
|
|
gateway: IPAddress | None
|
|
|
|
@property
|
|
def online(self):
|
|
# If we have no addresses, we're not online
|
|
if len(self.addresses) == 0:
|
|
return False
|
|
# If we have an override, ping that
|
|
if self.name in INTERFACE_PING_OVERRIDES:
|
|
return ping(INTERFACE_PING_OVERRIDES[self.name])
|
|
# If we have no gateway, this interface is likely not intended
|
|
# to route beyond the local network, so we'll assume it's
|
|
# online, at least for whatever it's used for.
|
|
if self.gateway is None:
|
|
return True
|
|
return ping(self.gateway)
|
|
|
|
def asdict(self):
|
|
return {
|
|
"name": self.name,
|
|
"mac": self.mac,
|
|
"addresses": [
|
|
{
|
|
"address": str(addr.ip),
|
|
"netmask": str(addr.netmask),
|
|
"network": str(addr.network),
|
|
"prefixlen": addr._prefixlen,
|
|
}
|
|
for addr in self.addresses
|
|
],
|
|
"online": self.online,
|
|
}
|
|
|
|
|
|
def get_first_hop() -> IPAddress | None:
|
|
"""Get the first network hop."""
|
|
# Use ping to get the first hop
|
|
cmd = ["/usr/bin/ping", "-c1", "-W0.3", "-t1", "1.1.1.1"]
|
|
result = run(cmd, stdout=PIPE, stderr=PIPE)
|
|
try:
|
|
ip = IPAddress(result.stdout.decode("utf-8").split("\n")[1].split()[1])
|
|
except (IndexError, AddressValueError):
|
|
# If we can't parse the output, return None
|
|
return None
|
|
return ip
|
|
|
|
|
|
def validate_first_hop_is_secure(first_hop: IPAddress):
|
|
"""Check if the first hop is in a secure network."""
|
|
for name, network in SECURE_FIRSTHOP_NETWORKS.items():
|
|
if first_hop in network:
|
|
return name
|
|
|
|
|
|
def get_gateways():
    """Map each watched interface to its IPv4 gateway.

    Each AF_INET entry reported by ``gateways()`` is indexed by its second
    element (compared against the watched interface names); its first
    element is stored as the gateway. Watched interfaces without an entry
    are left out of the result.
    """
    # When one interface has several entries, the last one listed wins.
    gateway_by_iface = {entry[1]: entry[0] for entry in gateways().get(AF_INET, [])}
    return {
        watched: gateway_by_iface[watched]
        for watched in INTERFACES_TO_WATCH
        if watched in gateway_by_iface
    }
|
|
|
|
|
|
def interface_status(interface: str, gw) -> "Interface":
    """Get the status of an interface.

    Args:
        interface: Name of the interface to inspect.
        gw: Mapping of interface name to gateway, as built by
            ``get_gateways``.

    Returns:
        An ``Interface`` populated with the name, gateway, MAC address
        (None when no link-layer entry is reported) and IPv4 addresses
        (empty when none are reported or the lookup fails).
    """
    try:
        addrs = ifaddresses(interface)
    except Exception:
        # Best effort: an unknown interface (or a caller passing None for
        # "no default route") is reported as having no addresses. Unlike a
        # bare ``except``, this lets KeyboardInterrupt/SystemExit through.
        addrs = {}
    status = Interface()
    status.name = interface
    status.gateway = gw.get(interface)
    link_entries = addrs.get(AF_LINK)
    status.mac = link_entries[0]["addr"] if link_entries else None
    status.addresses = [
        IPInterface(f'{addr["addr"]}/{addr["netmask"]}')
        for addr in addrs.get(AF_INET, [])
    ]
    return status
|
|
|
|
|
|
def ping(host: IPAddress) -> bool:
|
|
cmd = ["/usr/bin/ping", "-c1", "-w1", str(host)]
|
|
result = run(cmd, stdout=DEVNULL, stderr=DEVNULL)
|
|
return result.returncode == 0
|
|
|
|
|
|
def get_public_ip():
|
|
"""Get the public IP address."""
|
|
cmd = ["/usr/bin/curl", "-s", "https://ipinfo.io"]
|
|
result = run(cmd, stdout=PIPE, stderr=PIPE)
|
|
try:
|
|
data = json.loads(result.stdout.decode("utf-8"))
|
|
except (IndexError, ValueError):
|
|
return None
|
|
try:
|
|
# If the IP address is invalid, don't return anything
|
|
IPAddress(data["ip"])
|
|
except (AddressValueError, KeyError):
|
|
if data.get("status", None) == 429:
|
|
# We're rate limited, so return something indicating that
|
|
return {"rate_limited": True}
|
|
return None
|
|
return data
|
|
|
|
|
|
def get_default_route():
|
|
"""Get the default route."""
|
|
cmd = ["/usr/bin/ip", "route", "show", "default"]
|
|
result = run(cmd, stdout=PIPE, stderr=PIPE)
|
|
try:
|
|
# Get first line (might have multiple gateway routes)
|
|
line = result.stdout.decode("utf-8").split("\n")[0]
|
|
# Get the gateway link (following "dev")
|
|
link = re.search(r"dev\s+(\S+)", line).group(1)
|
|
# We already know the gateway IP based on our firsthop check earlier, and that's more reliable since it will respect any routing rules
|
|
except (IndexError, AttributeError):
|
|
return None
|
|
return link
|
|
|
|
|
|
def format_time(time: datetime) -> dict[str, str | int]:
|
|
"""Format a datetime object for display."""
|
|
return {
|
|
"hour": f"{time.hour:02}",
|
|
"minute": f"{time.minute:02}",
|
|
"second": f"{time.second:02}",
|
|
"year": f"{time.year:04}",
|
|
"month": f"{time.month:02}",
|
|
"day": f"{time.day:02}",
|
|
"unix": int(time.timestamp()),
|
|
}
|
|
|
|
|
|
# system_bus = dbus.SystemBus()
|
|
# networkd = system_bus.get_object(
|
|
# "org.freedesktop.network1", "/org/freedesktop/network1"
|
|
# )
|
|
# manager = dbus.Interface(networkd, "org.freedesktop.network1.Manager")
|
|
|
|
|
|
# def get_dbus_interfaces():
|
|
# for iface in INTERFACES_TO_WATCH:
|
|
# dbus_object = system_bus.get_object(
|
|
# "org.freedesktop.network1", manager.GetLinkByName(iface)
|
|
# )
|
|
# dbus_interface = dbus.Interface(dbus_object, "org.freedesktop.network1.Link")
|
|
# yield dbus_interface
|
|
|
|
# Seconds between two status lines
POLL_INTERVAL_SECONDS = 5
# Maximum age, in seconds, of cached public IP data before it is refreshed
PUBLIC_IP_MAX_AGE_SECONDS = 1800

# State carried between iterations to decide when the public IP lookup
# has to be repeated
last_default_route = None
last_ip_data = None
last_request = None
last_first_hop = None

# Emit one JSON status object per line on stdout, forever; diagnostics go
# to stderr so they never mix with the JSON stream.
while True:
    start = now()
    try:
        online = ping("1.1.1.1")
    except Exception:
        # Best effort, but let KeyboardInterrupt/SystemExit stop the loop
        online = False
    hop = get_first_hop()
    gw = get_gateways()
    default_route = get_default_route()
    # public IP shouldn't change often, so only check every 30 minutes or
    # if the default route or first hop changes
    if (
        default_route != last_default_route
        or last_ip_data is None
        or last_request is None
        or now() - last_request > PUBLIC_IP_MAX_AGE_SECONDS
        or (hop != last_first_hop and hop is not None)
    ):
        print("refreshing public IP", file=sys.stderr, flush=True)
        public_ip_data = get_public_ip()
        if public_ip_data is None:
            print("failed to get public IP", file=sys.stderr, flush=True)
        elif public_ip_data.get("rate_limited", False):
            # Cache the marker so the lookup is not retried on every
            # iteration while rate limited; report no data this round.
            last_ip_data = public_ip_data
            public_ip_data = None
        else:
            last_ip_data = public_ip_data
        # NOTE(review): the trackers are refreshed after every attempt,
        # successful or not; a failed first lookup is still retried next
        # round because last_ip_data stays None. Confirm this nesting
        # against the original file's indentation.
        last_default_route = default_route
        last_request = now()
        last_first_hop = hop
    else:
        public_ip_data = last_ip_data
    if hop is not None:
        secure = validate_first_hop_is_secure(hop) is not None
    else:
        # If we can't reach the router, assume insecure
        secure = False
    iface_data = [interface_status(iface, gw) for iface in INTERFACES_TO_WATCH]
    iface_dict = {iface.name: iface.asdict() for iface in iface_data}

    # default_route may be None here; interface_status tolerates the failed
    # lookup and returns a record without addresses.
    default_route_iface_data = interface_status(default_route, gw)

    print(
        json.dumps(
            {
                "online": online,
                "secure": secure,
                "interfaces": iface_dict,
                "public_ip": public_ip_data or {},
                "have_public_ip": public_ip_data is not None and "ip" in public_ip_data,
                "default_route": default_route,
                "default_interface": default_route_iface_data.asdict(),
                "have_default_route": default_route is not None,
                # Rendered as the string "None" when there is no hop;
                # consumers should check "have_gateway" first.
                "gateway": str(hop),
                "have_gateway": hop is not None,
                "last_update": format_time(datetime.now()),
            }
        ),
        flush=True,
    )

    end = now()
    delta = end - start
    if delta > POLL_INTERVAL_SECONDS:
        print(
            f"Warning: took longer than {POLL_INTERVAL_SECONDS} seconds to run",
            file=sys.stderr,
            flush=True,
        )
    else:
        # Sleep only for the remainder so iterations start on a fixed cadence
        sleep(POLL_INTERVAL_SECONDS - delta)
|