From 1bd1846e40cb9529bcdc3e301592f3061ec0e453 Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 20 Apr 2026 17:41:17 -0400 Subject: [PATCH] feat(topology): extract IP + subnet allocators as reusable services MazeNET phase 2 step 1. Pulls inline IP/subnet allocation out of the generator into decnet/topology/allocator.py so the editor + reconciler can reuse the same primitives without duplicating logic. - IPAllocator: stateful host-IP handout with reserve/release/is_free. - SubnetAllocator: /24 handout under a base prefix, skips reservations. - reserved_subnets(repo): collects claimed subnets across every non-torn_down topology so concurrent drafts cannot collide. - generate() accepts reserved_subnets= to skip existing claims. Generator output is byte-identical under seed (behavior preserved). --- decnet/topology/allocator.py | 129 +++++++++++++++++++++++++ decnet/topology/generator.py | 60 ++++++------ tests/topology/test_allocator.py | 155 +++++++++++++++++++++++++++++++ 3 files changed, 313 insertions(+), 31 deletions(-) create mode 100644 decnet/topology/allocator.py create mode 100644 tests/topology/test_allocator.py diff --git a/decnet/topology/allocator.py b/decnet/topology/allocator.py new file mode 100644 index 00000000..2749688b --- /dev/null +++ b/decnet/topology/allocator.py @@ -0,0 +1,129 @@ +"""IP and subnet allocators for MazeNET topologies. + +Extracted from :mod:`decnet.topology.generator` so the same primitives +can be reused by the generator, the pre-deploy editor (REST), and the +mutator reconciler. The allocators are pure — persistence lives in the +repo; these objects hold in-memory state for a single planning pass. + +``reserved_subnets`` queries the repo for every subnet currently claimed +by a non-``torn_down`` topology so a new draft cannot collide with an +open one. +""" +from __future__ import annotations + +from ipaddress import IPv4Network +from typing import Any, Iterable + +from decnet.topology.status import TopologyStatus + + +class AllocatorExhausted(RuntimeError): + """Raised when an allocator cannot produce another value.""" + + +class IPAllocator: + """Hands out host IPs within a single LAN subnet. + + Skips the ``.1`` gateway. Callers may pre-seed taken IPs via + :meth:`reserve` before requesting :meth:`next_free`. + """ + + def __init__(self, subnet: str) -> None: + self._net = IPv4Network(subnet, strict=False) + self._gateway = str(next(self._net.hosts())) + self._pool: list[str] = [ + str(ip) for ip in self._net.hosts() if str(ip) != self._gateway + ] + self._taken: set[str] = set() + self._cursor = 0 + + def next_free(self) -> str: + while self._cursor < len(self._pool): + ip = self._pool[self._cursor] + self._cursor += 1 + if ip not in self._taken: + self._taken.add(ip) + return ip + # Cursor past the end — fall back to a linear scan in case + # releases opened up earlier slots. + for ip in self._pool: + if ip not in self._taken: + self._taken.add(ip) + return ip + raise AllocatorExhausted( + f"no free IPs left in {self._net.with_prefixlen}" + ) + + def reserve(self, ip: str) -> None: + if ip == self._gateway: + raise ValueError(f"{ip} is the gateway of {self._net.with_prefixlen}") + if ip not in {str(h) for h in self._net.hosts()}: + raise ValueError(f"{ip} not in {self._net.with_prefixlen}") + self._taken.add(ip) + + def release(self, ip: str) -> None: + self._taken.discard(ip) + + def is_free(self, ip: str) -> bool: + return ip not in self._taken and ip in {str(h) for h in self._net.hosts()} and ip != self._gateway + + +class SubnetAllocator: + """Hands out ``/24`` subnets under a base prefix (e.g. ``172.20``).""" + + _MAX_INDEX = 256 # 172.20.0/24 .. 172.20.255/24 + + def __init__( + self, + base_prefix: str, + reserved: Iterable[str] = (), + ) -> None: + self._base = base_prefix.rstrip(".") + self._reserved: set[str] = {s for s in reserved} + self._cursor = 0 + + def _candidate(self, idx: int) -> str: + return f"{self._base}.{idx}.0/24" + + def next_free(self) -> str: + while self._cursor < self._MAX_INDEX: + subnet = self._candidate(self._cursor) + self._cursor += 1 + if subnet not in self._reserved: + self._reserved.add(subnet) + return subnet + raise AllocatorExhausted( + f"no free /24s left under {self._base}.0.0/16" + ) + + def reserve(self, subnet: str) -> None: + self._reserved.add(subnet) + + def is_free(self, subnet: str) -> bool: + return subnet not in self._reserved + + +# Topology statuses whose LANs still claim subnets. torn_down is the +# only state that releases its networks back to the pool. +_SUBNET_CLAIMING_STATES: frozenset[str] = frozenset( + { + TopologyStatus.PENDING, + TopologyStatus.DEPLOYING, + TopologyStatus.ACTIVE, + TopologyStatus.DEGRADED, + TopologyStatus.FAILED, + TopologyStatus.TEARING_DOWN, + } +) + + +async def reserved_subnets(repo: Any) -> set[str]: + """All LAN subnets currently claimed by non-torn-down topologies.""" + out: set[str] = set() + for status in _SUBNET_CLAIMING_STATES: + for topo in await repo.list_topologies(status=status): + for lan in await repo.list_lans_for_topology(topo["id"]): + subnet = lan.get("subnet") + if subnet: + out.add(subnet) + return out diff --git a/decnet/topology/generator.py b/decnet/topology/generator.py index bd3c468b..7933f189 100644 --- a/decnet/topology/generator.py +++ b/decnet/topology/generator.py @@ -11,10 +11,10 @@ containers is :mod:`decnet.engine.deployer`. from __future__ import annotations import random -from ipaddress import IPv4Network from typing import Optional from decnet.fleet import all_service_names +from decnet.topology.allocator import IPAllocator, SubnetAllocator from decnet.topology.config import ( GeneratedTopology, TopologyConfig, @@ -29,25 +29,24 @@ _SVC_MAX = 3 def _plan_lans( - config: TopologyConfig, rng: random.Random + config: TopologyConfig, + rng: random.Random, + subnets: SubnetAllocator, ) -> list[_PlannedLAN]: """Plan LANs as a tree of depth ``config.depth``. Each non-leaf level adds [1, branching_factor] children per parent. - LAN names and subnets are assigned in BFS order. + LAN names and subnets are assigned in BFS order; subnets come from + ``subnets``, which the caller may have pre-seeded with reservations + from other topologies. """ lans: list[_PlannedLAN] = [] - def _subnet(idx: int) -> str: - # Exhausting /24s at 172.X.0..255 caps topologies at 256 LANs on - # the default base. Well above the v1 envelope (depth=16 cap). - if idx > 255: - raise ValueError("too many LANs for the configured subnet_base_prefix") - return f"{config.subnet_base_prefix}.{idx}.0/24" - # DMZ root. lans.append( - _PlannedLAN(name="LAN-00", subnet=_subnet(0), is_dmz=True, parent=None) + _PlannedLAN( + name="LAN-00", subnet=subnets.next_free(), is_dmz=True, parent=None + ) ) frontier: list[_PlannedLAN] = [lans[0]] @@ -59,7 +58,7 @@ def _plan_lans( idx = len(lans) child = _PlannedLAN( name=f"LAN-{idx:02d}", - subnet=_subnet(idx), + subnet=subnets.next_free(), is_dmz=False, parent=parent.name, ) @@ -71,13 +70,6 @@ def _plan_lans( return lans -def _host_pool(subnet: str) -> list[str]: - """Usable host IPs in ``subnet``, skipping .1 (gateway).""" - net = IPv4Network(subnet, strict=False) - gateway = str(next(net.hosts())) - return [str(ip) for ip in net.hosts() if str(ip) != gateway] - - def _pick_services( rng: random.Random, services_explicit: Optional[list[str]], @@ -99,32 +91,38 @@ def _pick_services( return list(chosen) -def generate(config: TopologyConfig) -> GeneratedTopology: +def generate( + config: TopologyConfig, + *, + reserved_subnets: Optional[set[str]] = None, +) -> GeneratedTopology: """Generate a topology plan deterministically under ``config.seed``. The caller is responsible for persisting the plan via :mod:`decnet.topology.persistence` and then deploying it. + + ``reserved_subnets`` (optional): /24s already claimed by other + topologies. The subnet allocator skips these so two concurrent + drafts can't collide. Populate via + :func:`decnet.topology.allocator.reserved_subnets`. """ rng = random.Random(config.seed) # nosec B311 svc_pool = all_service_names() if config.randomize_services else [] used_combos: set[frozenset] = set() - lans = _plan_lans(config, rng) + subnets = SubnetAllocator( + config.subnet_base_prefix, reserved=reserved_subnets or set() + ) + lans = _plan_lans(config, rng, subnets) lans_by_name = {lan.name: lan for lan in lans} - # Per-LAN IP pools for deterministic assignment. - ip_iters: dict[str, list[str]] = { - lan.name: _host_pool(lan.subnet) for lan in lans + # Per-LAN IP allocators for deterministic assignment. + ip_allocs: dict[str, IPAllocator] = { + lan.name: IPAllocator(lan.subnet) for lan in lans } - ip_cursors: dict[str, int] = {lan.name: 0 for lan in lans} def _take_ip(lan_name: str) -> str: - pool = ip_iters[lan_name] - i = ip_cursors[lan_name] - if i >= len(pool): - raise RuntimeError(f"LAN {lan_name} ran out of IPs") - ip_cursors[lan_name] = i + 1 - return pool[i] + return ip_allocs[lan_name].next_free() deckies: list[_PlannedDecky] = [] edges: list[_PlannedEdge] = [] diff --git a/tests/topology/test_allocator.py b/tests/topology/test_allocator.py new file mode 100644 index 00000000..472a81a4 --- /dev/null +++ b/tests/topology/test_allocator.py @@ -0,0 +1,155 @@ +"""Allocator unit + integration tests.""" +from __future__ import annotations + +import pytest + +from decnet.topology.allocator import ( + AllocatorExhausted, + IPAllocator, + SubnetAllocator, + reserved_subnets, +) +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.persistence import persist, transition_status +from decnet.topology.status import TopologyStatus +from decnet.web.db.factory import get_repository + + +# --------------------------------------------------------------------- IPAllocator + + +def test_ip_allocator_sequential_skips_gateway(): + a = IPAllocator("10.0.0.0/29") # hosts: .1 .. .6; .1 is gateway + got = [a.next_free() for _ in range(5)] + assert got == ["10.0.0.2", "10.0.0.3", "10.0.0.4", "10.0.0.5", "10.0.0.6"] + + +def test_ip_allocator_reserve_release_roundtrip(): + a = IPAllocator("10.0.0.0/29") + a.reserve("10.0.0.3") + assert not a.is_free("10.0.0.3") + a.release("10.0.0.3") + assert a.is_free("10.0.0.3") + + +def test_ip_allocator_reserve_rejects_gateway(): + a = IPAllocator("10.0.0.0/29") + with pytest.raises(ValueError): + a.reserve("10.0.0.1") + + +def test_ip_allocator_reserve_rejects_out_of_subnet(): + a = IPAllocator("10.0.0.0/29") + with pytest.raises(ValueError): + a.reserve("10.0.0.100") + + +def test_ip_allocator_next_free_after_reserve_skips(): + a = IPAllocator("10.0.0.0/29") + a.reserve("10.0.0.2") + assert a.next_free() == "10.0.0.3" + + +def test_ip_allocator_exhaustion_raises(): + a = IPAllocator("10.0.0.0/30") # hosts: .1 .. .2; .1 gateway → only .2 usable + assert a.next_free() == "10.0.0.2" + with pytest.raises(AllocatorExhausted): + a.next_free() + + +# --------------------------------------------------------------------- SubnetAllocator + + +def test_subnet_allocator_sequential(): + s = SubnetAllocator("172.20") + assert s.next_free() == "172.20.0.0/24" + assert s.next_free() == "172.20.1.0/24" + assert s.next_free() == "172.20.2.0/24" + + +def test_subnet_allocator_skips_reserved(): + s = SubnetAllocator("172.20", reserved={"172.20.0.0/24", "172.20.1.0/24"}) + assert s.next_free() == "172.20.2.0/24" + + +def test_subnet_allocator_reserve_is_idempotent(): + s = SubnetAllocator("172.20") + s.reserve("172.20.0.0/24") + assert s.next_free() == "172.20.1.0/24" + + +def test_subnet_allocator_exhaustion_raises(): + reserved = {f"10.0.{i}.0/24" for i in range(256)} + s = SubnetAllocator("10.0", reserved=reserved) + with pytest.raises(AllocatorExhausted): + s.next_free() + + +# --------------------------------------------------------------------- reserved_subnets + + +def _cfg(**kw) -> TopologyConfig: + base = dict( + name="alloc", + depth=1, + branching_factor=1, + deckies_per_lan_min=1, + deckies_per_lan_max=1, + cross_edge_probability=0.0, + randomize_services=False, + services_explicit=["ssh"], + seed=3, + ) + base.update(kw) + return TopologyConfig(**base) + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "alloc.db")) + await r.initialize() + return r + + +@pytest.mark.anyio +async def test_reserved_subnets_includes_pending_and_active(repo): + plan_a = generate(_cfg(name="a")) + tid_a = await persist(repo, plan_a) # pending + + plan_b = generate(_cfg(name="b", subnet_base_prefix="172.21")) + tid_b = await persist(repo, plan_b) + await transition_status(repo, tid_b, TopologyStatus.DEPLOYING) + # DEPLOYING → ACTIVE + await transition_status(repo, tid_b, TopologyStatus.ACTIVE) + + claimed = await reserved_subnets(repo) + for lan in plan_a.lans: + assert lan.subnet in claimed + for lan in plan_b.lans: + assert lan.subnet in claimed + + +@pytest.mark.anyio +async def test_reserved_subnets_excludes_torn_down(repo): + plan = generate(_cfg(name="gone")) + tid = await persist(repo, plan) + # pending → torn_down is legal + await transition_status(repo, tid, TopologyStatus.TORN_DOWN) + + claimed = await reserved_subnets(repo) + for lan in plan.lans: + assert lan.subnet not in claimed + + +@pytest.mark.anyio +async def test_generate_respects_reserved(repo): + plan_a = generate(_cfg(name="a")) + await persist(repo, plan_a) + claimed = await reserved_subnets(repo) + # Second topology on the same base, told about reservations: must + # pick subnets not in the first one's set. + plan_b = generate(_cfg(name="b"), reserved_subnets=claimed) + b_subnets = {lan.subnet for lan in plan_b.lans} + a_subnets = {lan.subnet for lan in plan_a.lans} + assert b_subnets.isdisjoint(a_subnets)