""" Network management for DECNET. Handles: - Auto-detection of the host's active interface + subnet + gateway - MACVLAN Docker network creation - Host-side macvlan interface (hairpin fix so the deployer can reach deckies) - IP allocation (sequential, skipping reserved addresses) """ import os import subprocess # nosec B404 from ipaddress import IPv4Address, IPv4Interface, IPv4Network import docker MACVLAN_NETWORK_NAME = "decnet_lan" HOST_MACVLAN_IFACE = "decnet_macvlan0" HOST_IPVLAN_IFACE = "decnet_ipvlan0" # --------------------------------------------------------------------------- # Interface / subnet auto-detection # --------------------------------------------------------------------------- def _run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess: return subprocess.run(cmd, capture_output=True, text=True, check=check) # nosec B603 B404 def detect_interface() -> str: """Return the name of the default outbound interface.""" result = _run(["ip", "route", "show", "default"]) for line in result.stdout.splitlines(): parts = line.split() if "dev" in parts: return parts[parts.index("dev") + 1] raise RuntimeError("Could not auto-detect network interface. Use --interface.") def detect_subnet(interface: str) -> tuple[str, str]: """ Return (subnet_cidr, gateway) for the given interface. e.g. ("192.168.1.0/24", "192.168.1.1") """ result = _run(["ip", "addr", "show", interface]) subnet_cidr = None for line in result.stdout.splitlines(): line = line.strip() if line.startswith("inet ") and not line.startswith("inet6"): # e.g. "inet 192.168.1.5/24 brd 192.168.1.255 scope global eth0" addr_cidr = line.split()[1] iface = IPv4Interface(addr_cidr) subnet_cidr = str(iface.network) break if subnet_cidr is None: raise RuntimeError(f"Could not detect subnet for interface {interface}.") gw_result = _run(["ip", "route", "show", "default"]) gateway = None for line in gw_result.stdout.splitlines(): parts = line.split() if "via" in parts: gateway = parts[parts.index("via") + 1] break if gateway is None: raise RuntimeError("Could not detect gateway.") return subnet_cidr, gateway def get_host_ip(interface: str) -> str: """Return the host's IP on the given interface.""" result = _run(["ip", "addr", "show", interface]) for line in result.stdout.splitlines(): line = line.strip() if line.startswith("inet ") and not line.startswith("inet6"): return line.split()[1].split("/")[0] raise RuntimeError(f"Could not determine host IP for interface {interface}.") # --------------------------------------------------------------------------- # IP allocation # --------------------------------------------------------------------------- def allocate_ips( subnet: str, gateway: str, host_ip: str, count: int, ip_start: str | None = None, ) -> list[str]: """ Return a list of `count` available IPs from the subnet, skipping network addr, broadcast, gateway, and host IP. Starts from ip_start if given, else from the first usable host. """ net = IPv4Network(subnet, strict=False) reserved = { net.network_address, net.broadcast_address, IPv4Address(gateway), IPv4Address(host_ip), } start_addr = IPv4Address(ip_start) if ip_start else net.network_address + 1 allocated: list[str] = [] for addr in net.hosts(): if addr < start_addr: continue if addr in reserved: continue allocated.append(str(addr)) if len(allocated) == count: break if len(allocated) < count: raise RuntimeError( f"Not enough free IPs in {subnet} for {count} deckies " f"(found {len(allocated)})." ) return allocated # --------------------------------------------------------------------------- # Docker MACVLAN network # --------------------------------------------------------------------------- def _ensure_network( client: docker.DockerClient, *, driver: str, interface: str, subnet: str, gateway: str, ip_range: str, extra_options: dict | None = None, ) -> None: """Create the decnet docker network with ``driver``, replacing any existing network of the same name that was built with a different driver. Why the replace-on-driver-mismatch: macvlan and ipvlan slaves can't coexist on the same parent interface. If an earlier run left behind a macvlan-driver network and we're now asked for ipvlan (or vice versa), short-circuiting on name alone leaves Docker attaching new containers to the old driver and the host NIC ends up EBUSY on the next port create. So: when driver disagrees, disconnect everything and DROP it. """ options = {"parent": interface} if extra_options: options.update(extra_options) for net in client.networks.list(names=[MACVLAN_NETWORK_NAME]): if net.attrs.get("Driver") == driver: # Same driver — but if the IPAM pool drifted (different subnet, # gateway, or ip-range than this deploy asks for), reusing it # hands out addresses from the old pool and we race the real LAN. # Compare and rebuild on mismatch. pools = (net.attrs.get("IPAM") or {}).get("Config") or [] cur = pools[0] if pools else {} if ( cur.get("Subnet") == subnet and cur.get("Gateway") == gateway and cur.get("IPRange") == ip_range ): return # right driver AND matching pool, leave it alone # Driver mismatch OR IPAM drift — tear it down. Disconnect any live # containers first so `remove()` doesn't refuse with ErrNetworkInUse. for cid in (net.attrs.get("Containers") or {}): try: net.disconnect(cid, force=True) except docker.errors.APIError: pass net.remove() client.networks.create( name=MACVLAN_NETWORK_NAME, driver=driver, options=options, ipam=docker.types.IPAMConfig( driver="default", pool_configs=[ docker.types.IPAMPool( subnet=subnet, gateway=gateway, iprange=ip_range, ) ], ), ) def create_macvlan_network( client: docker.DockerClient, interface: str, subnet: str, gateway: str, ip_range: str, ) -> None: """Create the MACVLAN Docker network, replacing an ipvlan-driver one of the same name if necessary (parent-NIC can't host both drivers).""" _ensure_network( client, driver="macvlan", interface=interface, subnet=subnet, gateway=gateway, ip_range=ip_range, ) def create_ipvlan_network( client: docker.DockerClient, interface: str, subnet: str, gateway: str, ip_range: str, ) -> None: """Create an IPvlan L2 Docker network, replacing a macvlan-driver one of the same name if necessary (parent-NIC can't host both drivers).""" _ensure_network( client, driver="ipvlan", interface=interface, subnet=subnet, gateway=gateway, ip_range=ip_range, extra_options={"ipvlan_mode": "l2"}, ) def remove_macvlan_network(client: docker.DockerClient) -> None: nets = [n for n in client.networks.list() if n.name == MACVLAN_NETWORK_NAME] for n in nets: n.remove() # --------------------------------------------------------------------------- # Host-side macvlan interface (hairpin fix) # --------------------------------------------------------------------------- def _require_root() -> None: if os.geteuid() != 0: raise PermissionError( "MACVLAN host-side interface setup requires root. Run with sudo." ) def setup_host_macvlan(interface: str, host_macvlan_ip: str, decky_ip_range: str) -> None: """ Create a macvlan interface on the host so the deployer can reach deckies. Idempotent — skips steps that are already done. Drops a stale ipvlan host-helper first: the two drivers can share a parent NIC on paper but leaving the opposite helper in place is just cruft after a driver swap. """ _require_root() _run(["ip", "link", "del", HOST_IPVLAN_IFACE], check=False) # Check if interface already exists result = _run(["ip", "link", "show", HOST_MACVLAN_IFACE], check=False) if result.returncode != 0: _run(["ip", "link", "add", HOST_MACVLAN_IFACE, "link", interface, "type", "macvlan", "mode", "bridge"]) _run(["ip", "addr", "add", f"{host_macvlan_ip}/32", "dev", HOST_MACVLAN_IFACE], check=False) _run(["ip", "link", "set", HOST_MACVLAN_IFACE, "up"]) _run(["ip", "route", "add", decky_ip_range, "dev", HOST_MACVLAN_IFACE], check=False) def teardown_host_macvlan(decky_ip_range: str) -> None: _require_root() _run(["ip", "route", "del", decky_ip_range, "dev", HOST_MACVLAN_IFACE], check=False) _run(["ip", "link", "del", HOST_MACVLAN_IFACE], check=False) def setup_host_ipvlan(interface: str, host_ipvlan_ip: str, decky_ip_range: str) -> None: """ Create an IPvlan interface on the host so the deployer can reach deckies. Idempotent — skips steps that are already done. Drops a stale macvlan host-helper first so a prior macvlan deploy doesn't leave its slave dangling on the parent NIC after the driver swap. """ _require_root() _run(["ip", "link", "del", HOST_MACVLAN_IFACE], check=False) result = _run(["ip", "link", "show", HOST_IPVLAN_IFACE], check=False) if result.returncode != 0: _run(["ip", "link", "add", HOST_IPVLAN_IFACE, "link", interface, "type", "ipvlan", "mode", "l2"]) _run(["ip", "addr", "add", f"{host_ipvlan_ip}/32", "dev", HOST_IPVLAN_IFACE], check=False) _run(["ip", "link", "set", HOST_IPVLAN_IFACE, "up"]) _run(["ip", "route", "add", decky_ip_range, "dev", HOST_IPVLAN_IFACE], check=False) def teardown_host_ipvlan(decky_ip_range: str) -> None: _require_root() _run(["ip", "route", "del", decky_ip_range, "dev", HOST_IPVLAN_IFACE], check=False) _run(["ip", "link", "del", HOST_IPVLAN_IFACE], check=False) # --------------------------------------------------------------------------- # Compute an ip_range CIDR that covers a list of IPs # --------------------------------------------------------------------------- def ips_to_range(ips: list[str]) -> str: """ Given a list of IPs, return the tightest /N CIDR that covers them all. Used as the --ip-range for MACVLAN so Docker assigns exactly those IPs. """ addrs = [IPv4Address(ip) for ip in ips] network = IPv4Network( (int(min(addrs)), 32 - (int(max(addrs)) ^ int(min(addrs))).bit_length()), strict=False, ) return str(network)