feat(topology): extract IP + subnet allocators as reusable services

MazeNET phase 2 step 1. Pulls inline IP/subnet allocation out of the
generator into decnet/topology/allocator.py so the editor + reconciler
can reuse the same primitives without duplicating logic.

- IPAllocator: stateful host-IP handout with reserve/release/is_free.
- SubnetAllocator: /24 handout under a base prefix, skips reservations.
- reserved_subnets(repo): collects claimed subnets across every
  non-torn_down topology so concurrent drafts cannot collide.
- generate() accepts reserved_subnets= to skip existing claims.

Generator output is byte-identical under seed (behavior preserved).
This commit is contained in:
2026-04-20 17:41:17 -04:00
parent 80e3c28234
commit 1bd1846e40
3 changed files with 313 additions and 31 deletions

View File

@@ -0,0 +1,129 @@
"""IP and subnet allocators for MazeNET topologies.
Extracted from :mod:`decnet.topology.generator` so the same primitives
can be reused by the generator, the pre-deploy editor (REST), and the
mutator reconciler. The allocators are pure — persistence lives in the
repo; these objects hold in-memory state for a single planning pass.
``reserved_subnets`` queries the repo for every subnet currently claimed
by a non-``torn_down`` topology so a new draft cannot collide with an
open one.
"""
from __future__ import annotations
from ipaddress import IPv4Network
from typing import Any, Iterable
from decnet.topology.status import TopologyStatus
class AllocatorExhausted(RuntimeError):
"""Raised when an allocator cannot produce another value."""
class IPAllocator:
"""Hands out host IPs within a single LAN subnet.
Skips the ``.1`` gateway. Callers may pre-seed taken IPs via
:meth:`reserve` before requesting :meth:`next_free`.
"""
def __init__(self, subnet: str) -> None:
self._net = IPv4Network(subnet, strict=False)
self._gateway = str(next(self._net.hosts()))
self._pool: list[str] = [
str(ip) for ip in self._net.hosts() if str(ip) != self._gateway
]
self._taken: set[str] = set()
self._cursor = 0
def next_free(self) -> str:
while self._cursor < len(self._pool):
ip = self._pool[self._cursor]
self._cursor += 1
if ip not in self._taken:
self._taken.add(ip)
return ip
# Cursor past the end — fall back to a linear scan in case
# releases opened up earlier slots.
for ip in self._pool:
if ip not in self._taken:
self._taken.add(ip)
return ip
raise AllocatorExhausted(
f"no free IPs left in {self._net.with_prefixlen}"
)
def reserve(self, ip: str) -> None:
if ip == self._gateway:
raise ValueError(f"{ip} is the gateway of {self._net.with_prefixlen}")
if ip not in {str(h) for h in self._net.hosts()}:
raise ValueError(f"{ip} not in {self._net.with_prefixlen}")
self._taken.add(ip)
def release(self, ip: str) -> None:
self._taken.discard(ip)
def is_free(self, ip: str) -> bool:
return ip not in self._taken and ip in {str(h) for h in self._net.hosts()} and ip != self._gateway
class SubnetAllocator:
"""Hands out ``/24`` subnets under a base prefix (e.g. ``172.20``)."""
_MAX_INDEX = 256 # 172.20.0/24 .. 172.20.255/24
def __init__(
self,
base_prefix: str,
reserved: Iterable[str] = (),
) -> None:
self._base = base_prefix.rstrip(".")
self._reserved: set[str] = {s for s in reserved}
self._cursor = 0
def _candidate(self, idx: int) -> str:
return f"{self._base}.{idx}.0/24"
def next_free(self) -> str:
while self._cursor < self._MAX_INDEX:
subnet = self._candidate(self._cursor)
self._cursor += 1
if subnet not in self._reserved:
self._reserved.add(subnet)
return subnet
raise AllocatorExhausted(
f"no free /24s left under {self._base}.0.0/16"
)
def reserve(self, subnet: str) -> None:
self._reserved.add(subnet)
def is_free(self, subnet: str) -> bool:
return subnet not in self._reserved
# Topology statuses whose LANs still claim subnets. torn_down is the
# only state that releases its networks back to the pool.
_SUBNET_CLAIMING_STATES: frozenset[str] = frozenset(
{
TopologyStatus.PENDING,
TopologyStatus.DEPLOYING,
TopologyStatus.ACTIVE,
TopologyStatus.DEGRADED,
TopologyStatus.FAILED,
TopologyStatus.TEARING_DOWN,
}
)
async def reserved_subnets(repo: Any) -> set[str]:
"""All LAN subnets currently claimed by non-torn-down topologies."""
out: set[str] = set()
for status in _SUBNET_CLAIMING_STATES:
for topo in await repo.list_topologies(status=status):
for lan in await repo.list_lans_for_topology(topo["id"]):
subnet = lan.get("subnet")
if subnet:
out.add(subnet)
return out

View File

@@ -11,10 +11,10 @@ containers is :mod:`decnet.engine.deployer`.
from __future__ import annotations
import random
from ipaddress import IPv4Network
from typing import Optional
from decnet.fleet import all_service_names
from decnet.topology.allocator import IPAllocator, SubnetAllocator
from decnet.topology.config import (
GeneratedTopology,
TopologyConfig,
@@ -29,25 +29,24 @@ _SVC_MAX = 3
def _plan_lans(
config: TopologyConfig, rng: random.Random
config: TopologyConfig,
rng: random.Random,
subnets: SubnetAllocator,
) -> list[_PlannedLAN]:
"""Plan LANs as a tree of depth ``config.depth``.
Each non-leaf level adds [1, branching_factor] children per parent.
LAN names and subnets are assigned in BFS order.
LAN names and subnets are assigned in BFS order; subnets come from
``subnets``, which the caller may have pre-seeded with reservations
from other topologies.
"""
lans: list[_PlannedLAN] = []
def _subnet(idx: int) -> str:
# Exhausting /24s at 172.X.0..255 caps topologies at 256 LANs on
# the default base. Well above the v1 envelope (depth=16 cap).
if idx > 255:
raise ValueError("too many LANs for the configured subnet_base_prefix")
return f"{config.subnet_base_prefix}.{idx}.0/24"
# DMZ root.
lans.append(
_PlannedLAN(name="LAN-00", subnet=_subnet(0), is_dmz=True, parent=None)
_PlannedLAN(
name="LAN-00", subnet=subnets.next_free(), is_dmz=True, parent=None
)
)
frontier: list[_PlannedLAN] = [lans[0]]
@@ -59,7 +58,7 @@ def _plan_lans(
idx = len(lans)
child = _PlannedLAN(
name=f"LAN-{idx:02d}",
subnet=_subnet(idx),
subnet=subnets.next_free(),
is_dmz=False,
parent=parent.name,
)
@@ -71,13 +70,6 @@ def _plan_lans(
return lans
def _host_pool(subnet: str) -> list[str]:
"""Usable host IPs in ``subnet``, skipping .1 (gateway)."""
net = IPv4Network(subnet, strict=False)
gateway = str(next(net.hosts()))
return [str(ip) for ip in net.hosts() if str(ip) != gateway]
def _pick_services(
rng: random.Random,
services_explicit: Optional[list[str]],
@@ -99,32 +91,38 @@ def _pick_services(
return list(chosen)
def generate(config: TopologyConfig) -> GeneratedTopology:
def generate(
config: TopologyConfig,
*,
reserved_subnets: Optional[set[str]] = None,
) -> GeneratedTopology:
"""Generate a topology plan deterministically under ``config.seed``.
The caller is responsible for persisting the plan via
:mod:`decnet.topology.persistence` and then deploying it.
``reserved_subnets`` (optional): /24s already claimed by other
topologies. The subnet allocator skips these so two concurrent
drafts can't collide. Populate via
:func:`decnet.topology.allocator.reserved_subnets`.
"""
rng = random.Random(config.seed) # nosec B311
svc_pool = all_service_names() if config.randomize_services else []
used_combos: set[frozenset] = set()
lans = _plan_lans(config, rng)
subnets = SubnetAllocator(
config.subnet_base_prefix, reserved=reserved_subnets or set()
)
lans = _plan_lans(config, rng, subnets)
lans_by_name = {lan.name: lan for lan in lans}
# Per-LAN IP pools for deterministic assignment.
ip_iters: dict[str, list[str]] = {
lan.name: _host_pool(lan.subnet) for lan in lans
# Per-LAN IP allocators for deterministic assignment.
ip_allocs: dict[str, IPAllocator] = {
lan.name: IPAllocator(lan.subnet) for lan in lans
}
ip_cursors: dict[str, int] = {lan.name: 0 for lan in lans}
def _take_ip(lan_name: str) -> str:
pool = ip_iters[lan_name]
i = ip_cursors[lan_name]
if i >= len(pool):
raise RuntimeError(f"LAN {lan_name} ran out of IPs")
ip_cursors[lan_name] = i + 1
return pool[i]
return ip_allocs[lan_name].next_free()
deckies: list[_PlannedDecky] = []
edges: list[_PlannedEdge] = []