Files
DECNET/decnet/network.py
anti 91549e6936 fix(deploy): prevent 'Address already in use' from stale IPAM and half-torn-down containers
Two compounding root causes produced the recurring 'Address already in use'
error on redeploy:

1. _ensure_network only compared driver+name; if a prior deploy's IPAM
   pool drifted (different subnet/gateway/range), Docker kept handing out
   addresses from the old pool and raced the real LAN. Now also compares
   Subnet/Gateway/IPRange and rebuilds on drift.

2. A prior half-failed 'up' could leave containers still holding the IPs
   and ports the new run wants. Run 'compose down --remove-orphans' as a
   best-effort pre-up cleanup so IPAM starts from a clean state.

Also surface docker compose stderr to the structured log on failure so
the agent's journal captures Docker's actual message (which IP, which
port) instead of just the exit code.
2026-04-19 19:59:06 -04:00

309 lines
11 KiB
Python

"""
Network management for DECNET.
Handles:
- Auto-detection of the host's active interface + subnet + gateway
- MACVLAN / IPvlan Docker network creation
- Host-side macvlan/ipvlan interface (hairpin fix so the deployer can reach deckies)
- IP allocation (sequential, skipping reserved addresses)
"""
import os
import subprocess # nosec B404
from ipaddress import IPv4Address, IPv4Interface, IPv4Network
import docker
MACVLAN_NETWORK_NAME = "decnet_lan"
HOST_MACVLAN_IFACE = "decnet_macvlan0"
HOST_IPVLAN_IFACE = "decnet_ipvlan0"
# ---------------------------------------------------------------------------
# Interface / subnet auto-detection
# ---------------------------------------------------------------------------
def _run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess:
    """Execute ``cmd`` (an argv list, no shell) and return the completed process.

    stdout and stderr are captured and decoded as text. When ``check`` is
    true, ``subprocess.run`` raises ``CalledProcessError`` on a non-zero exit.
    """
    run_kwargs = {"capture_output": True, "text": True, "check": check}
    return subprocess.run(cmd, **run_kwargs)  # nosec B603 B404
def detect_interface() -> str:
    """Return the interface name of the first default route that names a device.

    Parses the output of ``ip route show default`` and returns the token
    following ``dev``. Raises ``RuntimeError`` when no default route line
    contains a ``dev`` token.
    """
    route_lines = _run(["ip", "route", "show", "default"]).stdout.splitlines()
    for route in route_lines:
        tokens = route.split()
        try:
            dev_pos = tokens.index("dev")
        except ValueError:
            # This route line does not name a device; try the next one.
            continue
        return tokens[dev_pos + 1]
    raise RuntimeError("Could not auto-detect network interface. Use --interface.")
def detect_subnet(interface: str) -> tuple[str, str]:
    """
    Return (subnet_cidr, gateway) for the given interface.
    e.g. ("192.168.1.0/24", "192.168.1.1")

    The subnet is derived from the first IPv4 address listed by
    ``ip addr show <interface>``. The gateway is read from
    ``ip route show default``: a default route bound to ``interface`` is
    preferred, so a multi-homed host does not return another NIC's gateway.
    If no default route names this interface, the first default route with a
    ``via`` hop is used (the previous behaviour).

    Raises:
        RuntimeError: if no IPv4 address or no default gateway can be found.
        subprocess.CalledProcessError: if an ``ip`` command exits non-zero.
    """

    def _token_after(tokens: list[str], keyword: str) -> str | None:
        # Token following ``keyword``; None if the keyword is absent or last.
        if keyword in tokens[:-1]:
            return tokens[tokens.index(keyword) + 1]
        return None

    addr_result = _run(["ip", "addr", "show", interface])
    subnet_cidr = None
    for line in addr_result.stdout.splitlines():
        line = line.strip()
        # The trailing space in "inet " already excludes "inet6 ..." lines.
        if line.startswith("inet "):
            # e.g. "inet 192.168.1.5/24 brd 192.168.1.255 scope global eth0"
            subnet_cidr = str(IPv4Interface(line.split()[1]).network)
            break
    if subnet_cidr is None:
        raise RuntimeError(f"Could not detect subnet for interface {interface}.")

    gw_result = _run(["ip", "route", "show", "default"])
    fallback_gateway = None
    for line in gw_result.stdout.splitlines():
        parts = line.split()
        via = _token_after(parts, "via")
        if via is None:
            continue
        if _token_after(parts, "dev") == interface:
            # Default route that actually leaves through the requested NIC.
            return subnet_cidr, via
        if fallback_gateway is None:
            fallback_gateway = via
    if fallback_gateway is None:
        raise RuntimeError("Could not detect gateway.")
    return subnet_cidr, fallback_gateway
def get_host_ip(interface: str) -> str:
    """Return the first IPv4 address (without prefix length) on ``interface``.

    Reads ``ip addr show <interface>`` and takes the address from the first
    ``inet`` line. Raises ``RuntimeError`` when no such line exists.
    """
    addr_output = _run(["ip", "addr", "show", interface]).stdout
    for raw_line in addr_output.splitlines():
        entry = raw_line.strip()
        if not entry.startswith("inet ") or entry.startswith("inet6"):
            continue
        # Second token looks like "192.168.1.5/24"; keep the part before "/".
        address, _, _ = entry.split()[1].partition("/")
        return address
    raise RuntimeError(f"Could not determine host IP for interface {interface}.")
# ---------------------------------------------------------------------------
# IP allocation
# ---------------------------------------------------------------------------
def allocate_ips(
subnet: str,
gateway: str,
host_ip: str,
count: int,
ip_start: str | None = None,
) -> list[str]:
"""
Return a list of `count` available IPs from the subnet,
skipping network addr, broadcast, gateway, and host IP.
Starts from ip_start if given, else from the first usable host.
"""
net = IPv4Network(subnet, strict=False)
reserved = {
net.network_address,
net.broadcast_address,
IPv4Address(gateway),
IPv4Address(host_ip),
}
start_addr = IPv4Address(ip_start) if ip_start else net.network_address + 1
allocated: list[str] = []
for addr in net.hosts():
if addr < start_addr:
continue
if addr in reserved:
continue
allocated.append(str(addr))
if len(allocated) == count:
break
if len(allocated) < count:
raise RuntimeError(
f"Not enough free IPs in {subnet} for {count} deckies "
f"(found {len(allocated)})."
)
return allocated
# ---------------------------------------------------------------------------
# Docker MACVLAN network
# ---------------------------------------------------------------------------
def _ensure_network(
    client: docker.DockerClient,
    *,
    driver: str,
    interface: str,
    subnet: str,
    gateway: str,
    ip_range: str,
    extra_options: dict | None = None,
) -> None:
    """Create the decnet docker network with ``driver``, replacing any
    existing network of the same name that was built with a different driver
    or whose IPAM pool (subnet / gateway / ip-range) differs from the request.

    Why the replace-on-driver-mismatch: macvlan and ipvlan slaves can't
    coexist on the same parent interface. If an earlier run left behind a
    macvlan-driver network and we're now asked for ipvlan (or vice versa),
    short-circuiting on name alone leaves Docker attaching new containers
    to the old driver and the host NIC ends up EBUSY on the next port
    create. So: when driver disagrees, disconnect everything and DROP it.

    Args:
        client: Docker client used to list, remove and create networks.
        driver: Docker network driver name to use (e.g. "macvlan", "ipvlan").
        interface: Parent host interface, passed as the ``parent`` option.
        subnet: IPAM pool subnet.
        gateway: IPAM pool gateway.
        ip_range: IPAM pool ip-range.
        extra_options: Additional driver options merged over ``parent``.
    """
    options = {"parent": interface}
    if extra_options:
        options.update(extra_options)

    for net in client.networks.list(names=[MACVLAN_NETWORK_NAME]):
        # Docker's name filter matches substrings, so e.g. a compose-prefixed
        # "proj_decnet_lan" would be listed too. Only touch the exact name.
        if net.name != MACVLAN_NETWORK_NAME:
            continue
        if net.attrs.get("Driver") == driver:
            # Same driver — but if the IPAM pool drifted (different subnet,
            # gateway, or ip-range than this deploy asks for), reusing it
            # hands out addresses from the old pool and we race the real LAN.
            # Compare and rebuild on mismatch.
            pools = (net.attrs.get("IPAM") or {}).get("Config") or []
            cur = pools[0] if pools else {}
            if (
                cur.get("Subnet") == subnet
                and cur.get("Gateway") == gateway
                and cur.get("IPRange") == ip_range
            ):
                return  # right driver AND matching pool, leave it alone
        # Driver mismatch OR IPAM drift — tear it down. Disconnect any live
        # containers first so `remove()` doesn't refuse with ErrNetworkInUse.
        # The list call does not fetch per-network details, so refresh the
        # attrs first; otherwise "Containers" is not populated.
        net.reload()
        for cid in (net.attrs.get("Containers") or {}):
            try:
                net.disconnect(cid, force=True)
            except docker.errors.APIError:
                # Best-effort: remove() below will surface a real blocker.
                pass
        net.remove()

    client.networks.create(
        name=MACVLAN_NETWORK_NAME,
        driver=driver,
        options=options,
        ipam=docker.types.IPAMConfig(
            driver="default",
            pool_configs=[
                docker.types.IPAMPool(
                    subnet=subnet,
                    gateway=gateway,
                    iprange=ip_range,
                )
            ],
        ),
    )
def create_macvlan_network(
    client: docker.DockerClient,
    interface: str,
    subnet: str,
    gateway: str,
    ip_range: str,
) -> None:
    """Ensure the decnet Docker network exists with the ``macvlan`` driver.

    Delegates to ``_ensure_network``, which replaces an existing network of
    the same name built with another driver (the parent NIC can't host both).
    """
    pool = {"subnet": subnet, "gateway": gateway, "ip_range": ip_range}
    _ensure_network(client, driver="macvlan", interface=interface, **pool)
def create_ipvlan_network(
    client: docker.DockerClient,
    interface: str,
    subnet: str,
    gateway: str,
    ip_range: str,
) -> None:
    """Ensure the decnet Docker network exists with the ``ipvlan`` driver in L2 mode.

    Delegates to ``_ensure_network``, which replaces an existing network of
    the same name built with another driver (the parent NIC can't host both).
    """
    pool = {"subnet": subnet, "gateway": gateway, "ip_range": ip_range}
    l2_mode = {"ipvlan_mode": "l2"}
    _ensure_network(
        client,
        driver="ipvlan",
        interface=interface,
        extra_options=l2_mode,
        **pool,
    )
def remove_macvlan_network(client: docker.DockerClient) -> None:
    """Remove every Docker network whose name is exactly ``MACVLAN_NETWORK_NAME``."""
    for candidate in client.networks.list():
        if candidate.name != MACVLAN_NETWORK_NAME:
            continue
        candidate.remove()
# ---------------------------------------------------------------------------
# Host-side macvlan interface (hairpin fix)
# ---------------------------------------------------------------------------
def _require_root() -> None:
    """Raise ``PermissionError`` unless the effective user id is 0 (root)."""
    running_as_root = os.geteuid() == 0
    if running_as_root:
        return
    raise PermissionError(
        "MACVLAN host-side interface setup requires root. Run with sudo."
    )
def setup_host_macvlan(interface: str, host_macvlan_ip: str, decky_ip_range: str) -> None:
    """
    Bring up the host-side macvlan helper so the deployer can reach deckies.

    Requires root. Safe to re-run: the link is only created when it does not
    already exist, and the address/route additions ignore failures. Any
    leftover ipvlan host helper is deleted first so a previous ipvlan deploy
    does not leave its interface behind after a driver swap.
    """
    _require_root()
    helper = HOST_MACVLAN_IFACE

    # Best-effort removal of the opposite driver's helper (may not exist).
    _run(["ip", "link", "del", HOST_IPVLAN_IFACE], check=False)

    # `ip link show` exits non-zero when the helper interface is absent.
    link_present = _run(["ip", "link", "show", helper], check=False).returncode == 0
    if not link_present:
        _run(["ip", "link", "add", helper, "link", interface, "type", "macvlan", "mode", "bridge"])

    # Address and route additions tolerate failure (presumably "already
    # exists" on a re-run); bringing the link up must succeed.
    _run(["ip", "addr", "add", f"{host_macvlan_ip}/32", "dev", helper], check=False)
    _run(["ip", "link", "set", helper, "up"])
    _run(["ip", "route", "add", decky_ip_range, "dev", helper], check=False)
def teardown_host_macvlan(decky_ip_range: str) -> None:
    """Remove the decky route and the host-side macvlan helper (requires root).

    Both commands are best-effort: a non-zero exit is ignored.
    """
    _require_root()
    cleanup_cmds = (
        ["ip", "route", "del", decky_ip_range, "dev", HOST_MACVLAN_IFACE],
        ["ip", "link", "del", HOST_MACVLAN_IFACE],
    )
    for cmd in cleanup_cmds:
        _run(cmd, check=False)
def setup_host_ipvlan(interface: str, host_ipvlan_ip: str, decky_ip_range: str) -> None:
    """
    Bring up the host-side IPvlan (L2) helper so the deployer can reach deckies.

    Requires root. Safe to re-run: the link is only created when it does not
    already exist, and the address/route additions ignore failures. Any
    leftover macvlan host helper is deleted first so a prior macvlan deploy
    doesn't leave its slave dangling on the parent NIC after the driver swap.
    """
    _require_root()
    helper = HOST_IPVLAN_IFACE

    # Best-effort removal of the opposite driver's helper (may not exist).
    _run(["ip", "link", "del", HOST_MACVLAN_IFACE], check=False)

    # `ip link show` exits non-zero when the helper interface is absent.
    link_present = _run(["ip", "link", "show", helper], check=False).returncode == 0
    if not link_present:
        _run(["ip", "link", "add", helper, "link", interface, "type", "ipvlan", "mode", "l2"])

    # Address and route additions tolerate failure (presumably "already
    # exists" on a re-run); bringing the link up must succeed.
    _run(["ip", "addr", "add", f"{host_ipvlan_ip}/32", "dev", helper], check=False)
    _run(["ip", "link", "set", helper, "up"])
    _run(["ip", "route", "add", decky_ip_range, "dev", helper], check=False)
def teardown_host_ipvlan(decky_ip_range: str) -> None:
    """Remove the decky route and the host-side IPvlan helper (requires root).

    Both commands are best-effort: a non-zero exit is ignored.
    """
    _require_root()
    cleanup_cmds = (
        ["ip", "route", "del", decky_ip_range, "dev", HOST_IPVLAN_IFACE],
        ["ip", "link", "del", HOST_IPVLAN_IFACE],
    )
    for cmd in cleanup_cmds:
        _run(cmd, check=False)
# ---------------------------------------------------------------------------
# Compute an ip_range CIDR that covers a list of IPs
# ---------------------------------------------------------------------------
def ips_to_range(ips: list[str]) -> str:
    """
    Given a list of IPs, return the tightest /N CIDR that covers them all.
    Used as the --ip-range for MACVLAN.

    The result is the longest common bit-prefix of the lowest and highest
    address, so it may also contain addresses that are not in `ips`.
    A single address yields a /32.

    Raises:
        ValueError: if `ips` is empty or contains an invalid IPv4 address.
    """
    if not ips:
        raise ValueError("ips_to_range requires at least one IP address.")

    addrs = [int(IPv4Address(ip)) for ip in ips]
    lowest, highest = min(addrs), max(addrs)
    # Bits in which lowest and highest differ are host bits; everything above
    # the highest differing bit is the shared network prefix.
    prefix_len = 32 - (lowest ^ highest).bit_length()
    # strict=False masks off the host bits of `lowest` for us.
    return str(IPv4Network((lowest, prefix_len), strict=False))