dotfiles/.config/eww/scripts/network.py##hostname.gathering-storm

307 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
import trparse
from subprocess import run, PIPE, DEVNULL
import json
import sys
import os
import re
from time import sleep, time as now
from ipaddress import (
IPv4Address as IPAddress,
IPv4Network as IPNetwork,
IPv4Interface as IPInterface,
AddressValueError,
)
from netifaces import interfaces, ifaddresses, AF_INET, AF_LINK, gateways
import dbus
from datetime import datetime
# Networks that are considered secure. Traffic whose first hop lands in
# one of these networks is being encrypted and routed through an
# anonymizing service.
SECURE_FIRSTHOP_NETWORKS = {
    "home": IPNetwork("10.242.0.0/16"),  # Personal network
    "mullvad": IPNetwork("10.64.0.1/32"),  # Mullvad VPN
    "usu": IPNetwork("129.123.8.126/32"),  # USU VPN
}

# Interfaces to report on when NETWORK_INTERFACES_TO_WATCH is not set.
DEFAULT_INTERFACES_TO_WATCH = ["lan", "ezrinet"]

# Interfaces used when determining whether we are online, when
# NETWORK_INTERFACES_TO_COUNT is not set.
DEFAULT_INTERFACES_TO_COUNT = ["lan"]

# Per-interface address to ping instead of the gateway when deciding
# whether the interface is online. This is useful for interfaces that
# don't have or report a gateway but should still be checked, such as
# the ezrinet interface.
INTERFACE_PING_OVERRIDES = {"ezrinet": IPAddress("10.242.3.1")}

# Both lists may be overridden with a comma-separated environment
# variable.
INTERFACES_TO_WATCH = os.getenv(
    "NETWORK_INTERFACES_TO_WATCH",
    ",".join(DEFAULT_INTERFACES_TO_WATCH),
).split(",")
INTERFACES_TO_COUNT = os.getenv(
    "NETWORK_INTERFACES_TO_COUNT",
    ",".join(DEFAULT_INTERFACES_TO_COUNT),
).split(",")

# NETWORK_PING_OVERRIDE_<interface>=<address> adds or replaces a ping
# override for <interface>.
for key, value in os.environ.items():
    if key.startswith("NETWORK_PING_OVERRIDE_"):
        INTERFACE_PING_OVERRIDES[key.removeprefix("NETWORK_PING_OVERRIDE_")] = (
            IPAddress(value)
        )
class Interface:
name: str
mac: str
addresses: list[IPInterface]
gateway: IPAddress | None
@property
def online(self):
# If we have no addresses, we're not online
if len(self.addresses) == 0:
return False
# If we have an override, ping that
if self.name in INTERFACE_PING_OVERRIDES:
return ping(INTERFACE_PING_OVERRIDES[self.name])
# If we have no gateway, this interface is likely not intended
# to route beyond the local network, so we'll assume it's
# online, at least for whatever it's used for.
if self.gateway is None:
return True
return ping(self.gateway)
def asdict(self):
return {
"name": self.name,
"mac": self.mac,
"addresses": [
{
"address": str(addr.ip),
"netmask": str(addr.netmask),
"network": str(addr.network),
"prefixlen": addr._prefixlen,
}
for addr in self.addresses
],
"online": self.online,
}
def get_first_hop() -> IPAddress | None:
    """Return the address of the first network hop, or None.

    Sends a single ping to 1.1.1.1 with ``-t1`` and takes the second
    whitespace-separated field of the second output line as the hop
    address. None is returned when that field is missing or is not a
    valid IPv4 address.
    """
    probe = run(
        ["/usr/bin/ping", "-c1", "-W0.3", "-t1", "1.1.1.1"],
        stdout=PIPE,
        stderr=PIPE,
    )
    output_lines = probe.stdout.decode("utf-8").split("\n")
    try:
        return IPAddress(output_lines[1].split()[1])
    except (IndexError, AddressValueError):
        # The output did not have the expected shape.
        return None
def validate_first_hop_is_secure(first_hop: IPAddress):
    """Return the name of the secure network containing *first_hop*.

    Networks are checked in the order they appear in
    ``SECURE_FIRSTHOP_NETWORKS``; the first match wins. Returns None when
    the address is in none of them.
    """
    matching_names = (
        label
        for label, secure_net in SECURE_FIRSTHOP_NETWORKS.items()
        if first_hop in secure_net
    )
    return next(matching_names, None)
def get_gateways():
    """Map each watched interface to its IPv4 gateway address.

    Interfaces from ``INTERFACES_TO_WATCH`` without an ``AF_INET`` gateway
    entry are left out of the result. If an interface has several
    entries, the last one listed is used.
    """
    routes = gateways().get(AF_INET, [])
    found = {}
    for iface in INTERFACES_TO_WATCH:
        # Each entry is indexed as (gateway address, interface name, ...).
        candidates = [entry[0] for entry in routes if entry[1] == iface]
        if candidates:
            found[iface] = candidates[-1]
    return found
def interface_status(interface: str, gw):
    """Build an ``Interface`` describing *interface*.

    Args:
        interface: Name of the interface to inspect. The main loop also
            passes the default-route device here, which may be None.
        gw: Mapping of interface name to gateway address, as produced by
            ``get_gateways``.

    Returns:
        An ``Interface`` with ``name``, ``gateway``, ``mac`` and
        ``addresses`` filled in. When the interface cannot be queried it
        is reported with no MAC and no addresses.
    """
    try:
        addrs = ifaddresses(interface)
    except Exception:
        # Best effort: an unknown or invalid interface is reported as
        # having no addresses. ``Exception`` (rather than a bare
        # ``except``) lets KeyboardInterrupt and SystemExit propagate.
        addrs = {}

    result = Interface()
    result.name = interface
    result.gateway = gw.get(interface)
    result.mac = addrs[AF_LINK][0]["addr"] if AF_LINK in addrs else None
    result.addresses = [
        IPInterface(f'{addr["addr"]}/{addr["netmask"]}')
        for addr in addrs.get(AF_INET, [])
    ]
    return result
def ping(host: IPAddress) -> bool:
    """Return True if one ping to *host* (``-c1 -w1``) exits with status 0."""
    completed = run(
        ["/usr/bin/ping", "-c1", "-w1", str(host)],
        stdout=DEVNULL,
        stderr=DEVNULL,
    )
    return not completed.returncode
def get_public_ip():
    """Fetch public IP information from https://ipinfo.io using curl.

    Returns:
        The decoded JSON object when it contains a valid IPv4 ``ip``
        field; ``{"rate_limited": True}`` when the response carries
        ``"status": 429``; None when the response cannot be decoded, is
        not a JSON object, or has no valid ``ip``.
    """
    cmd = ["/usr/bin/curl", "-s", "https://ipinfo.io"]
    result = run(cmd, stdout=PIPE, stderr=PIPE)
    try:
        data = json.loads(result.stdout.decode("utf-8"))
    except ValueError:
        # Covers both undecodable bytes and malformed or empty JSON.
        return None
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, string, number) would
        # otherwise fail with TypeError/AttributeError below.
        return None
    try:
        # If the IP address is invalid, don't return anything
        IPAddress(data["ip"])
    except (AddressValueError, KeyError):
        if data.get("status") == 429:
            # We're rate limited, so return something indicating that
            return {"rate_limited": True}
        return None
    return data
def get_default_route():
    """Return the device name of the default route, or None.

    Only the first line of ``ip route show default`` is examined (there
    might be multiple gateway routes); the device is the token following
    ``dev``. The gateway IP is not parsed here: the first-hop check
    already provides it, and that is more reliable since it respects any
    routing rules.
    """
    listing = run(
        ["/usr/bin/ip", "route", "show", "default"],
        stdout=PIPE,
        stderr=PIPE,
    )
    first_line = listing.stdout.decode("utf-8").split("\n")[0]
    device = re.search(r"dev\s+(\S+)", first_line)
    if device is None:
        return None
    return device.group(1)
def format_time(time: datetime) -> dict[str, str | int]:
"""Format a datetime object for display."""
return {
"hour": f"{time.hour:02}",
"minute": f"{time.minute:02}",
"second": f"{time.second:02}",
"year": f"{time.year:04}",
"month": f"{time.month:02}",
"day": f"{time.day:02}",
"unix": int(time.timestamp()),
}
# system_bus = dbus.SystemBus()
# networkd = system_bus.get_object(
# "org.freedesktop.network1", "/org/freedesktop/network1"
# )
# manager = dbus.Interface(networkd, "org.freedesktop.network1.Manager")
# def get_dbus_interfaces():
# for iface in INTERFACES_TO_WATCH:
# dbus_object = system_bus.get_object(
# "org.freedesktop.network1", manager.GetLinkByName(iface)
# )
# dbus_interface = dbus.Interface(dbus_object, "org.freedesktop.network1.Link")
# yield dbus_interface
# Directory holding the small JSON cache files that persist state between
# runs of this script.
runtime_dir = os.environ.get("XDG_RUNTIME_DIR", "/tmp")


def load_last_data(var_name: str):
    """Load the cached value stored under *var_name*.

    Args:
        var_name: Cache entry name; the file read is
            ``<runtime_dir>/<var_name>.json``.

    Returns:
        ``(data, mtime)`` where *data* is the decoded JSON value and
        *mtime* is the file's modification time, or ``(None, None)`` when
        the file is missing, unreadable, or not valid JSON.
    """
    path = f"{runtime_dir}/{var_name}.json"
    try:
        # Open directly instead of checking for existence first, so the
        # file vanishing in between is handled the same as it never
        # existing.
        with open(path, "r") as f:
            return json.load(f), os.path.getmtime(path)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: malformed JSON
        # or undecodable bytes. The cache is best effort, so fall back
        # to "no data" rather than failing.
        return None, None
def store_last_data(var_name: str, data):
    """Write *data* as JSON to ``<runtime_dir>/<var_name>.json``.

    Any existing file for *var_name* is overwritten.
    """
    target = f"{runtime_dir}/{var_name}.json"
    with open(target, "w") as handle:
        json.dump(data, handle)
# Restore the state cached by a previous run so that a restart does not
# immediately trigger a new public IP lookup.
last_default_route = load_last_data("default_route")[0]
# last_request is the modification time of the cached ip_data file.
last_ip_data, last_request = load_last_data("ip_data")
try:
    last_first_hop = IPAddress(load_last_data("first_hop")[0])
except (ValueError, TypeError):
    # No cached hop, or the cached value is not an address (the string
    # "None" is stored when no hop was found).
    last_first_hop = None

# Poll forever, printing one JSON status object per iteration on stdout.
# Diagnostics go to stderr so they don't mix with the JSON stream.
while True:
    try:
        online = ping("1.1.1.1")
    except Exception:
        # A failure to run ping counts as offline. ``Exception`` rather
        # than a bare ``except`` so Ctrl-C still stops the script.
        online = False
    hop = get_first_hop()
    gw = get_gateways()
    default_route = get_default_route()

    # public IP shouldn't change often, so only check every 2 hours or
    # if the default route or first hop changes
    if (
        default_route != last_default_route
        or last_ip_data is None
        or last_request is None
        or now() - last_request > (2 * 60 * 60)
        or (hop != last_first_hop and hop is not None)
    ):
        print("refreshing public IP", file=sys.stderr, flush=True)
        public_ip_data = get_public_ip()
        if public_ip_data is None:
            print("failed to get public IP", file=sys.stderr, flush=True)
        elif public_ip_data.get("rate_limited", False):
            # Remember the rate-limit marker so the lookup is not retried
            # on every iteration, but report no public IP for this one.
            last_ip_data = public_ip_data
            public_ip_data = None
        else:
            last_ip_data = public_ip_data
            # Write the public IP data to a file
            store_last_data("ip_data", public_ip_data)
        # Record what this lookup was based on, whether or not it
        # succeeded.
        last_default_route = default_route
        store_last_data("default_route", default_route)
        last_request = now()
        last_first_hop = hop
        store_last_data("first_hop", str(hop))
    else:
        public_ip_data = last_ip_data

    # If we can't reach the router (no first hop), assume insecure
    secure = hop is not None and validate_first_hop_is_secure(hop) is not None

    iface_data = [interface_status(iface, gw) for iface in INTERFACES_TO_WATCH]
    iface_dict = {iface.name: iface.asdict() for iface in iface_data}
    default_route_iface_data = interface_status(default_route, gw)

    status = {
        "online": online,
        "secure": secure,
        "interfaces": iface_dict,
        "public_ip": public_ip_data or {},
        "have_public_ip": public_ip_data is not None and "ip" in public_ip_data,
        "default_route": default_route,
        "default_interface": default_route_iface_data.asdict(),
        "have_default_route": default_route is not None,
        # Rendered as the string "None" when there is no hop; consumers
        # should check "have_gateway".
        "gateway": str(hop),
        "have_gateway": hop is not None,
        "last_update": format_time(datetime.now()),
    }
    print(json.dumps(status), flush=True)
    sleep(5)