Files
DECNET/decnet/network.py
anti 988732f4f9
Some checks failed
CI / Test (pytest) (3.11) (push) Has been cancelled
CI / Test (pytest) (3.12) (push) Has been cancelled
Security / SAST (bandit) (push) Has been cancelled
Security / Dependency audit (pip-audit) (push) Has been cancelled
CI / Lint (ruff) (push) Has been cancelled
Fix all ruff lint errors across decnet/, templates/, and tests/
2026-04-04 17:36:16 -03:00

264 lines
8.6 KiB
Python

"""
Network management for DECNET.
Handles:
- Auto-detection of the host's active interface + subnet + gateway
- MACVLAN Docker network creation
- Host-side macvlan interface (hairpin fix so the deployer can reach deckies)
- IP allocation (sequential, skipping reserved addresses)
"""
import os
import subprocess
from ipaddress import IPv4Address, IPv4Interface, IPv4Network
import docker
MACVLAN_NETWORK_NAME = "decnet_lan"
HOST_MACVLAN_IFACE = "decnet_macvlan0"
HOST_IPVLAN_IFACE = "decnet_ipvlan0"
# ---------------------------------------------------------------------------
# Interface / subnet auto-detection
# ---------------------------------------------------------------------------
def _run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess:
return subprocess.run(cmd, capture_output=True, text=True, check=check)
def detect_interface() -> str:
"""Return the name of the default outbound interface."""
result = _run(["ip", "route", "show", "default"])
for line in result.stdout.splitlines():
parts = line.split()
if "dev" in parts:
return parts[parts.index("dev") + 1]
raise RuntimeError("Could not auto-detect network interface. Use --interface.")
def detect_subnet(interface: str) -> tuple[str, str]:
"""
Return (subnet_cidr, gateway) for the given interface.
e.g. ("192.168.1.0/24", "192.168.1.1")
"""
result = _run(["ip", "addr", "show", interface])
subnet_cidr = None
for line in result.stdout.splitlines():
line = line.strip()
if line.startswith("inet ") and not line.startswith("inet6"):
# e.g. "inet 192.168.1.5/24 brd 192.168.1.255 scope global eth0"
addr_cidr = line.split()[1]
iface = IPv4Interface(addr_cidr)
subnet_cidr = str(iface.network)
break
if subnet_cidr is None:
raise RuntimeError(f"Could not detect subnet for interface {interface}.")
gw_result = _run(["ip", "route", "show", "default"])
gateway = None
for line in gw_result.stdout.splitlines():
parts = line.split()
if "via" in parts:
gateway = parts[parts.index("via") + 1]
break
if gateway is None:
raise RuntimeError("Could not detect gateway.")
return subnet_cidr, gateway
def get_host_ip(interface: str) -> str:
"""Return the host's IP on the given interface."""
result = _run(["ip", "addr", "show", interface])
for line in result.stdout.splitlines():
line = line.strip()
if line.startswith("inet ") and not line.startswith("inet6"):
return line.split()[1].split("/")[0]
raise RuntimeError(f"Could not determine host IP for interface {interface}.")
# ---------------------------------------------------------------------------
# IP allocation
# ---------------------------------------------------------------------------
def allocate_ips(
subnet: str,
gateway: str,
host_ip: str,
count: int,
ip_start: str | None = None,
) -> list[str]:
"""
Return a list of `count` available IPs from the subnet,
skipping network addr, broadcast, gateway, and host IP.
Starts from ip_start if given, else from the first usable host.
"""
net = IPv4Network(subnet, strict=False)
reserved = {
net.network_address,
net.broadcast_address,
IPv4Address(gateway),
IPv4Address(host_ip),
}
start_addr = IPv4Address(ip_start) if ip_start else net.network_address + 1
allocated: list[str] = []
for addr in net.hosts():
if addr < start_addr:
continue
if addr in reserved:
continue
allocated.append(str(addr))
if len(allocated) == count:
break
if len(allocated) < count:
raise RuntimeError(
f"Not enough free IPs in {subnet} for {count} deckies "
f"(found {len(allocated)})."
)
return allocated
# ---------------------------------------------------------------------------
# Docker MACVLAN network
# ---------------------------------------------------------------------------
def create_macvlan_network(
client: docker.DockerClient,
interface: str,
subnet: str,
gateway: str,
ip_range: str,
) -> None:
"""Create the MACVLAN Docker network. No-op if it already exists."""
existing = [n.name for n in client.networks.list()]
if MACVLAN_NETWORK_NAME in existing:
return
client.networks.create(
name=MACVLAN_NETWORK_NAME,
driver="macvlan",
options={"parent": interface},
ipam=docker.types.IPAMConfig(
driver="default",
pool_configs=[
docker.types.IPAMPool(
subnet=subnet,
gateway=gateway,
iprange=ip_range,
)
],
),
)
def create_ipvlan_network(
client: docker.DockerClient,
interface: str,
subnet: str,
gateway: str,
ip_range: str,
) -> None:
"""Create an IPvlan L2 Docker network. No-op if it already exists."""
existing = [n.name for n in client.networks.list()]
if MACVLAN_NETWORK_NAME in existing:
return
client.networks.create(
name=MACVLAN_NETWORK_NAME,
driver="ipvlan",
options={"parent": interface, "ipvlan_mode": "l2"},
ipam=docker.types.IPAMConfig(
driver="default",
pool_configs=[
docker.types.IPAMPool(
subnet=subnet,
gateway=gateway,
iprange=ip_range,
)
],
),
)
def remove_macvlan_network(client: docker.DockerClient) -> None:
nets = [n for n in client.networks.list() if n.name == MACVLAN_NETWORK_NAME]
for n in nets:
n.remove()
# ---------------------------------------------------------------------------
# Host-side macvlan interface (hairpin fix)
# ---------------------------------------------------------------------------
def _require_root() -> None:
if os.geteuid() != 0:
raise PermissionError(
"MACVLAN host-side interface setup requires root. Run with sudo."
)
def setup_host_macvlan(interface: str, host_macvlan_ip: str, decky_ip_range: str) -> None:
"""
Create a macvlan interface on the host so the deployer can reach deckies.
Idempotent — skips steps that are already done.
"""
_require_root()
# Check if interface already exists
result = _run(["ip", "link", "show", HOST_MACVLAN_IFACE], check=False)
if result.returncode != 0:
_run(["ip", "link", "add", HOST_MACVLAN_IFACE, "link", interface, "type", "macvlan", "mode", "bridge"])
_run(["ip", "addr", "add", f"{host_macvlan_ip}/32", "dev", HOST_MACVLAN_IFACE], check=False)
_run(["ip", "link", "set", HOST_MACVLAN_IFACE, "up"])
_run(["ip", "route", "add", decky_ip_range, "dev", HOST_MACVLAN_IFACE], check=False)
def teardown_host_macvlan(decky_ip_range: str) -> None:
_require_root()
_run(["ip", "route", "del", decky_ip_range, "dev", HOST_MACVLAN_IFACE], check=False)
_run(["ip", "link", "del", HOST_MACVLAN_IFACE], check=False)
def setup_host_ipvlan(interface: str, host_ipvlan_ip: str, decky_ip_range: str) -> None:
"""
Create an IPvlan interface on the host so the deployer can reach deckies.
Idempotent — skips steps that are already done.
"""
_require_root()
result = _run(["ip", "link", "show", HOST_IPVLAN_IFACE], check=False)
if result.returncode != 0:
_run(["ip", "link", "add", HOST_IPVLAN_IFACE, "link", interface, "type", "ipvlan", "mode", "l2"])
_run(["ip", "addr", "add", f"{host_ipvlan_ip}/32", "dev", HOST_IPVLAN_IFACE], check=False)
_run(["ip", "link", "set", HOST_IPVLAN_IFACE, "up"])
_run(["ip", "route", "add", decky_ip_range, "dev", HOST_IPVLAN_IFACE], check=False)
def teardown_host_ipvlan(decky_ip_range: str) -> None:
_require_root()
_run(["ip", "route", "del", decky_ip_range, "dev", HOST_IPVLAN_IFACE], check=False)
_run(["ip", "link", "del", HOST_IPVLAN_IFACE], check=False)
# ---------------------------------------------------------------------------
# Compute an ip_range CIDR that covers a list of IPs
# ---------------------------------------------------------------------------
def ips_to_range(ips: list[str]) -> str:
"""
Given a list of IPs, return the tightest /N CIDR that covers them all.
Used as the --ip-range for MACVLAN so Docker assigns exactly those IPs.
"""
addrs = [IPv4Address(ip) for ip in ips]
network = IPv4Network(
(int(min(addrs)), 32 - (int(max(addrs)) ^ int(min(addrs))).bit_length()),
strict=False,
)
return str(network)