feat(mazenet): topology package — config, status machine, generator, persistence

Adds decnet/topology/ with:

- config.TopologyConfig: pydantic model driving generation (depth,
  branching_factor, deckies_per_lan_min/max, bridge_forward_probability,
  cross_edge_probability, subnet_base_prefix, service selection, seed).
  Emits GeneratedTopology dataclass (lans, deckies, edges).

- status.TopologyStatus + assert_transition: seven-state machine with
  an explicit legal-transition table.  torn_down is terminal; degraded
  is schema-reserved for future Healer use.

- generator.generate: deterministic DAG generation under config.seed.
  Builds a tree of LANs (DMZ at root), plants deckies in each LAN,
  promotes one decky per non-DMZ LAN to a parent bridge, and rolls
  cross-edges per cross_edge_probability for DAG shape.

- persistence: persist() writes a plan to the repo as pending;
  transition_status() enforces state-machine legality; hydrate() loads
  topology + children into a single dict.

Covered by tests/topology/{test_status,test_generator,test_persistence}.
This commit is contained in:
2026-04-20 16:48:20 -04:00
parent 201d246c07
commit 33f139ecfa
8 changed files with 834 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
"""MazeNET — nested deception topologies.
A topology is an arbitrary-depth DAG of LANs, connected by multi-homed
"bridge deckies" that optionally forward L3 between segments. One LAN
is marked as the DMZ (Internet-facing). Persisted via the repo pattern;
deployed via :mod:`decnet.engine.deployer`.
"""
from decnet.topology.config import TopologyConfig, GeneratedTopology
from decnet.topology.generator import generate
from decnet.topology.status import (
TopologyStatus,
assert_transition,
TopologyStatusError,
)
__all__ = [
"TopologyConfig",
"GeneratedTopology",
"generate",
"TopologyStatus",
"assert_transition",
"TopologyStatusError",
]

94
decnet/topology/config.py Normal file
View File

@@ -0,0 +1,94 @@
"""MazeNET topology config + in-memory generation output."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
from pydantic import BaseModel, Field, model_validator
class TopologyConfig(BaseModel):
"""Parameters driving :func:`decnet.topology.generator.generate`."""
name: str = Field(..., min_length=1, max_length=64)
mode: str = Field(default="unihost", pattern=r"^(unihost|agent)$")
# Topology shape
depth: int = Field(..., ge=1, le=16, description="Max depth from DMZ")
branching_factor: int = Field(..., ge=1, le=8, description="Max child LANs per LAN")
deckies_per_lan_min: int = Field(default=1, ge=0, le=32)
deckies_per_lan_max: int = Field(default=3, ge=1, le=32)
# Probability a given non-DMZ LAN's connection to its parent uses a
# bridge decky that forwards L3 (enables attacker pivot). Bridge
# existence between parent/child is implicit — every non-DMZ LAN
# has exactly one parent bridge. This controls *forwarding*, not
# the existence of the bridge.
bridge_forward_probability: float = Field(default=1.0, ge=0.0, le=1.0)
# Probability of injecting a DAG cross-edge: a decky also bridged
# from its LAN to a non-parent, non-child LAN. 0.0 yields a tree.
cross_edge_probability: float = Field(default=0.0, ge=0.0, le=1.0)
# IP allocation base. LANs get sequential /24s starting here.
subnet_base_prefix: str = Field(default="172.20", pattern=r"^\d{1,3}\.\d{1,3}$")
# Service selection — reuses decnet.fleet.build_deckies' randomizer.
randomize_services: bool = Field(default=True)
services_explicit: Optional[list[str]] = None
seed: Optional[int] = Field(default=None, ge=0)
@model_validator(mode="after")
def _check_min_max(self) -> "TopologyConfig":
if self.deckies_per_lan_min > self.deckies_per_lan_max:
raise ValueError(
"deckies_per_lan_min must be <= deckies_per_lan_max"
)
if not self.randomize_services and not self.services_explicit:
raise ValueError(
"either randomize_services=True or services_explicit must be set"
)
return self
@dataclass
class _PlannedLAN:
"""In-memory LAN record emitted by the generator."""
name: str
subnet: str
is_dmz: bool
parent: Optional[str] # name of parent LAN, None for DMZ
@dataclass
class _PlannedDecky:
"""In-memory decky record emitted by the generator."""
name: str
services: list[str]
# Mapping LAN-name → assigned IP within that LAN's subnet.
ips_by_lan: dict[str, str] = field(default_factory=dict)
forwards_l3: bool = False # only meaningful when present on ≥2 LANs
@dataclass
class _PlannedEdge:
"""In-memory (decky, LAN) membership edge."""
decky_name: str
lan_name: str
is_bridge: bool
forwards_l3: bool
@dataclass
class GeneratedTopology:
"""Full in-memory output of :func:`decnet.topology.generator.generate`.
Names are unique within the topology. No UUIDs are assigned here —
those are minted by :mod:`decnet.topology.persistence` when the
topology is written to the repo.
"""
config: TopologyConfig
lans: list[_PlannedLAN]
deckies: list[_PlannedDecky]
edges: list[_PlannedEdge]

View File

@@ -0,0 +1,239 @@
"""MazeNET topology generator.
Produces a :class:`GeneratedTopology` — an in-memory DAG of LANs and
multi-homed deckies. Deterministic under ``config.seed``: the same seed
always yields the same structure, service assignments, and IP layout.
The generator only plans the structure. Persisting UUIDs to the repo
is :mod:`decnet.topology.persistence`; spawning Docker networks and
containers is :mod:`decnet.engine.deployer`.
"""
from __future__ import annotations
import random
from ipaddress import IPv4Network
from typing import Optional
from decnet.fleet import all_service_names
from decnet.topology.config import (
GeneratedTopology,
TopologyConfig,
_PlannedDecky,
_PlannedEdge,
_PlannedLAN,
)
# Range of services per randomly assigned decky (matches decnet.fleet).
_SVC_MIN = 1
_SVC_MAX = 3
def _plan_lans(
config: TopologyConfig, rng: random.Random
) -> list[_PlannedLAN]:
"""Plan LANs as a tree of depth ``config.depth``.
Each non-leaf level adds [1, branching_factor] children per parent.
LAN names and subnets are assigned in BFS order.
"""
lans: list[_PlannedLAN] = []
def _subnet(idx: int) -> str:
# Exhausting /24s at 172.X.0..255 caps topologies at 256 LANs on
# the default base. Well above the v1 envelope (depth=16 cap).
if idx > 255:
raise ValueError("too many LANs for the configured subnet_base_prefix")
return f"{config.subnet_base_prefix}.{idx}.0/24"
# DMZ root.
lans.append(
_PlannedLAN(name="LAN-00", subnet=_subnet(0), is_dmz=True, parent=None)
)
frontier: list[_PlannedLAN] = [lans[0]]
for _level in range(1, config.depth + 1):
next_frontier: list[_PlannedLAN] = []
for parent in frontier:
n_children = rng.randint(1, config.branching_factor) # nosec B311
for _ in range(n_children):
idx = len(lans)
child = _PlannedLAN(
name=f"LAN-{idx:02d}",
subnet=_subnet(idx),
is_dmz=False,
parent=parent.name,
)
lans.append(child)
next_frontier.append(child)
frontier = next_frontier
if not frontier:
break
return lans
def _host_pool(subnet: str) -> list[str]:
"""Usable host IPs in ``subnet``, skipping .1 (gateway)."""
net = IPv4Network(subnet, strict=False)
gateway = str(next(net.hosts()))
return [str(ip) for ip in net.hosts() if str(ip) != gateway]
def _pick_services(
rng: random.Random,
services_explicit: Optional[list[str]],
pool: list[str],
used_combos: set[frozenset],
) -> list[str]:
if services_explicit:
return list(services_explicit)
if not pool:
return []
attempts = 0
while True:
count = rng.randint(_SVC_MIN, min(_SVC_MAX, len(pool))) # nosec B311
chosen = frozenset(rng.sample(pool, count)) # nosec B311
attempts += 1
if chosen not in used_combos or attempts > 20:
break
used_combos.add(chosen)
return list(chosen)
def generate(config: TopologyConfig) -> GeneratedTopology:
"""Generate a topology plan deterministically under ``config.seed``.
The caller is responsible for persisting the plan via
:mod:`decnet.topology.persistence` and then deploying it.
"""
rng = random.Random(config.seed) # nosec B311
svc_pool = all_service_names() if config.randomize_services else []
used_combos: set[frozenset] = set()
lans = _plan_lans(config, rng)
lans_by_name = {lan.name: lan for lan in lans}
# Per-LAN IP pools for deterministic assignment.
ip_iters: dict[str, list[str]] = {
lan.name: _host_pool(lan.subnet) for lan in lans
}
ip_cursors: dict[str, int] = {lan.name: 0 for lan in lans}
def _take_ip(lan_name: str) -> str:
pool = ip_iters[lan_name]
i = ip_cursors[lan_name]
if i >= len(pool):
raise RuntimeError(f"LAN {lan_name} ran out of IPs")
ip_cursors[lan_name] = i + 1
return pool[i]
deckies: list[_PlannedDecky] = []
edges: list[_PlannedEdge] = []
decky_counter = 0
def _new_decky(home_lan: str) -> _PlannedDecky:
nonlocal decky_counter
decky_counter += 1
name = f"decky-{decky_counter:03d}"
services = _pick_services(
rng, config.services_explicit, svc_pool, used_combos
)
decky = _PlannedDecky(
name=name,
services=services,
ips_by_lan={home_lan: _take_ip(home_lan)},
)
deckies.append(decky)
return decky
# Populate each LAN with its own deckies.
for lan in lans:
if lan.is_dmz:
count = 1 # single DMZ decky (deaddeck)
else:
count = rng.randint( # nosec B311
config.deckies_per_lan_min, config.deckies_per_lan_max
)
if count < 1:
count = 1 # every LAN needs ≥1 decky to host the bridge
for _ in range(count):
decky = _new_decky(lan.name)
edges.append(
_PlannedEdge(
decky_name=decky.name,
lan_name=lan.name,
is_bridge=False,
forwards_l3=False,
)
)
# Parent↔child bridges. For every non-DMZ LAN, pick one of its
# deckies and multi-home it to the parent LAN. This decky becomes
# the bridge between the two segments.
deckies_by_lan: dict[str, list[_PlannedDecky]] = {lan.name: [] for lan in lans}
for e in edges:
deckies_by_lan[e.lan_name].append(
next(d for d in deckies if d.name == e.decky_name)
)
for lan in lans:
if lan.is_dmz or lan.parent is None:
continue
candidates = deckies_by_lan[lan.name]
bridge = rng.choice(candidates) # nosec B311
bridge.ips_by_lan[lan.parent] = _take_ip(lan.parent)
forwards = rng.random() < config.bridge_forward_probability # nosec B311
bridge.forwards_l3 = bridge.forwards_l3 or forwards
# Mark both existing edges as bridge edges for this decky, and
# add a new edge connecting it to the parent LAN.
for e in edges:
if e.decky_name == bridge.name:
e.is_bridge = True
e.forwards_l3 = bridge.forwards_l3
edges.append(
_PlannedEdge(
decky_name=bridge.name,
lan_name=lan.parent,
is_bridge=True,
forwards_l3=bridge.forwards_l3,
)
)
# Cross-edges: with probability p, pick a non-parent, non-child,
# non-self LAN and attach a random decky to it too. Turns the tree
# into a DAG. Only rolls on non-DMZ LANs with ≥1 candidate peer.
if config.cross_edge_probability > 0:
for lan in lans:
if lan.is_dmz:
continue
if rng.random() >= config.cross_edge_probability: # nosec B311
continue
forbidden = {lan.name, lan.parent}
forbidden |= {c.name for c in lans if c.parent == lan.name}
peers = [p for p in lans if p.name not in forbidden]
if not peers:
continue
peer = rng.choice(peers) # nosec B311
decky = rng.choice(deckies_by_lan[lan.name]) # nosec B311
if peer.name in decky.ips_by_lan:
continue # already connected, skip
decky.ips_by_lan[peer.name] = _take_ip(peer.name)
forwards = rng.random() < config.bridge_forward_probability # nosec B311
decky.forwards_l3 = decky.forwards_l3 or forwards
for e in edges:
if e.decky_name == decky.name:
e.is_bridge = True
e.forwards_l3 = decky.forwards_l3
edges.append(
_PlannedEdge(
decky_name=decky.name,
lan_name=peer.name,
is_bridge=True,
forwards_l3=decky.forwards_l3,
)
)
del lans_by_name # intermediate lookup, drop before returning
return GeneratedTopology(
config=config, lans=lans, deckies=deckies, edges=edges
)

View File

@@ -0,0 +1,123 @@
"""Adapter between :class:`GeneratedTopology` and the repository layer."""
from __future__ import annotations
from typing import Any
from decnet.topology.config import GeneratedTopology
from decnet.topology.status import TopologyStatus, assert_transition
async def persist(repo: Any, plan: GeneratedTopology) -> str:
"""Write a generated plan to the repo as a ``pending`` topology.
Returns the newly created topology id. All child rows are written
atomically relative to each other (SQLite transactions are per-call
here; the repo methods each commit — good enough for initial create
since the whole chain is invoked before any external side effects).
"""
topology_id = await repo.create_topology(
{
"name": plan.config.name,
"mode": plan.config.mode,
"config_snapshot": plan.config.model_dump(),
}
)
lan_ids: dict[str, str] = {}
for lan in plan.lans:
lan_id = await repo.add_lan(
{
"topology_id": topology_id,
"name": lan.name,
"subnet": lan.subnet,
"is_dmz": lan.is_dmz,
}
)
lan_ids[lan.name] = lan_id
decky_ids: dict[str, str] = {}
for decky in plan.deckies:
# Primary IP: the first LAN the decky was assigned to (insertion
# order of ips_by_lan, which reflects generator ordering —
# home LAN first, then any bridge targets).
primary_lan = next(iter(decky.ips_by_lan))
primary_ip = decky.ips_by_lan[primary_lan]
decky_uuid = await repo.add_topology_decky(
{
"topology_id": topology_id,
"name": decky.name,
"services": decky.services,
"decky_config": {
"name": decky.name,
"services": decky.services,
"ips_by_lan": decky.ips_by_lan,
"forwards_l3": decky.forwards_l3,
},
"ip": primary_ip,
}
)
decky_ids[decky.name] = decky_uuid
for edge in plan.edges:
await repo.add_topology_edge(
{
"topology_id": topology_id,
"decky_uuid": decky_ids[edge.decky_name],
"lan_id": lan_ids[edge.lan_name],
"is_bridge": edge.is_bridge,
"forwards_l3": edge.forwards_l3,
}
)
return topology_id
async def transition_status(
repo: Any,
topology_id: str,
new_status: str,
reason: str | None = None,
) -> None:
"""Legal-only status transition.
Raises :class:`decnet.topology.status.TopologyStatusError` if the
current status cannot legally transition to ``new_status``.
"""
topo = await repo.get_topology(topology_id)
if topo is None:
raise ValueError(f"topology {topology_id!r} not found")
assert_transition(topo["status"], new_status)
await repo.update_topology_status(topology_id, new_status, reason=reason)
async def hydrate(repo: Any, topology_id: str) -> dict[str, Any] | None:
"""Load a topology + children into a single dict for callers.
Shape::
{
"topology": { ...row... },
"lans": [ {...}, ... ],
"deckies": [ {...}, ... ],
"edges": [ {...}, ... ],
}
Returns ``None`` if the topology does not exist.
"""
topo = await repo.get_topology(topology_id)
if topo is None:
return None
lans = await repo.list_lans_for_topology(topology_id)
deckies = await repo.list_topology_deckies(topology_id)
edges = await repo.list_topology_edges(topology_id)
return {
"topology": topo,
"lans": lans,
"deckies": deckies,
"edges": edges,
}
# Re-export the status constants so callers can ``from decnet.topology.persistence
# import TopologyStatus`` without chasing modules.
__all__ = ["persist", "transition_status", "hydrate", "TopologyStatus"]

72
decnet/topology/status.py Normal file
View File

@@ -0,0 +1,72 @@
"""MazeNET topology status state machine.
Seven states — six active in v1. ``degraded`` is schema-reserved for the
future Healer worker and has no transitions into it from v1 code paths.
"""
from __future__ import annotations
class TopologyStatus:
PENDING = "pending"
DEPLOYING = "deploying"
ACTIVE = "active"
DEGRADED = "degraded"
FAILED = "failed"
TEARING_DOWN = "tearing_down"
TORN_DOWN = "torn_down"
ALL: frozenset[str] = frozenset(
{PENDING, DEPLOYING, ACTIVE, DEGRADED, FAILED, TEARING_DOWN, TORN_DOWN}
)
# Directed transitions. torn_down is terminal. degraded is unreachable
# in v1 (Healer would be the only writer), but its outbound edges stay
# defined so when Healer lands the state machine already accepts them.
_LEGAL: dict[str, frozenset[str]] = {
TopologyStatus.PENDING: frozenset(
{TopologyStatus.DEPLOYING, TopologyStatus.TORN_DOWN}
),
TopologyStatus.DEPLOYING: frozenset(
{
TopologyStatus.ACTIVE,
TopologyStatus.FAILED,
TopologyStatus.DEGRADED,
TopologyStatus.TEARING_DOWN,
}
),
TopologyStatus.ACTIVE: frozenset(
{TopologyStatus.DEGRADED, TopologyStatus.TEARING_DOWN}
),
TopologyStatus.DEGRADED: frozenset(
{TopologyStatus.ACTIVE, TopologyStatus.TEARING_DOWN}
),
TopologyStatus.FAILED: frozenset({TopologyStatus.TEARING_DOWN}),
TopologyStatus.TEARING_DOWN: frozenset(
{TopologyStatus.TORN_DOWN, TopologyStatus.DEGRADED}
),
TopologyStatus.TORN_DOWN: frozenset(),
}
class TopologyStatusError(ValueError):
"""Raised when an illegal topology status transition is attempted."""
def assert_transition(current: str, new: str) -> None:
"""Validate ``current → new`` or raise :class:`TopologyStatusError`."""
if current not in TopologyStatus.ALL:
raise TopologyStatusError(f"unknown current status: {current!r}")
if new not in TopologyStatus.ALL:
raise TopologyStatusError(f"unknown new status: {new!r}")
if new not in _LEGAL[current]:
raise TopologyStatusError(
f"illegal transition: {current!r}{new!r}"
)
def legal_next(current: str) -> frozenset[str]:
"""Return the set of legal successor statuses from ``current``."""
if current not in _LEGAL:
raise TopologyStatusError(f"unknown status: {current!r}")
return _LEGAL[current]

View File

@@ -0,0 +1,137 @@
"""MazeNET generator determinism + DAG shape tests."""
from __future__ import annotations
from collections import Counter
import pytest
from decnet.topology.config import TopologyConfig
from decnet.topology.generator import generate
def _cfg(**kw) -> TopologyConfig:
base = dict(
name="test",
depth=3,
branching_factor=2,
deckies_per_lan_min=2,
deckies_per_lan_max=2,
bridge_forward_probability=1.0,
cross_edge_probability=0.0,
randomize_services=True,
seed=42,
)
base.update(kw)
return TopologyConfig(**base)
def test_seed_is_deterministic():
a = generate(_cfg())
b = generate(_cfg())
# Same structure: same LAN names, same decky names, same edge set.
assert [lan.name for lan in a.lans] == [lan.name for lan in b.lans]
assert [d.name for d in a.deckies] == [d.name for d in b.deckies]
assert [(d.name, sorted(d.services)) for d in a.deckies] == [
(d.name, sorted(d.services)) for d in b.deckies
]
assert sorted((e.decky_name, e.lan_name) for e in a.edges) == sorted(
(e.decky_name, e.lan_name) for e in b.edges
)
def test_different_seed_yields_different_structure():
a = generate(_cfg(seed=1))
b = generate(_cfg(seed=2))
# With modest depth/branching, at least one of structure, service
# assignment, or edge count will differ — fail only if everything is
# byte-identical, which would indicate the seed is being ignored.
a_sig = (
[lan.name for lan in a.lans],
[(d.name, sorted(d.services)) for d in a.deckies],
sorted((e.decky_name, e.lan_name) for e in a.edges),
)
b_sig = (
[lan.name for lan in b.lans],
[(d.name, sorted(d.services)) for d in b.deckies],
sorted((e.decky_name, e.lan_name) for e in b.edges),
)
assert a_sig != b_sig
def test_dmz_is_exactly_one_lan():
t = generate(_cfg())
dmz = [lan for lan in t.lans if lan.is_dmz]
assert len(dmz) == 1
assert dmz[0].parent is None
assert dmz[0].name == "LAN-00"
def test_every_non_dmz_lan_has_exactly_one_bridge_into_parent():
t = generate(_cfg(branching_factor=2, depth=3))
# For each non-DMZ LAN, find the decky that is multi-homed to its parent.
for lan in t.lans:
if lan.is_dmz:
continue
bridges_to_parent = [
d for d in t.deckies
if lan.name in d.ips_by_lan and lan.parent in d.ips_by_lan
]
assert len(bridges_to_parent) >= 1, (
f"{lan.name} has no bridge into parent {lan.parent}"
)
def test_cross_edge_probability_zero_yields_tree():
"""With cross_edge_probability=0, a decky is bridged only to its home
LAN and (if it's the chosen bridge) its parent LAN — never to a
sibling or cousin. Validates by checking no decky is connected to
both a parent AND a non-parent non-home LAN."""
t = generate(_cfg(cross_edge_probability=0.0))
lans_by_name = {lan.name: lan for lan in t.lans}
for d in t.deckies:
if len(d.ips_by_lan) <= 1:
continue
# Home LAN = first membership. Other memberships must all be
# the parent of the home LAN, i.e. a single parent bridge.
home = next(iter(d.ips_by_lan))
others = [name for name in list(d.ips_by_lan.keys())[1:]]
parent = lans_by_name[home].parent
assert all(o == parent for o in others), (
f"tree mode but decky {d.name} bridges {home}{others} (parent={parent})"
)
def test_cross_edge_probability_one_produces_cross_edges_over_runs():
"""With probability=1, every non-DMZ LAN rolls a cross-edge (may be
skipped if no valid peer), so across a moderately branching topology
we expect ≥1 cross-edge."""
t = generate(_cfg(cross_edge_probability=1.0, depth=3, branching_factor=3))
lans_by_name = {lan.name: lan for lan in t.lans}
cross_edges = 0
for d in t.deckies:
if len(d.ips_by_lan) < 2:
continue
home = next(iter(d.ips_by_lan))
others = list(d.ips_by_lan.keys())[1:]
parent = lans_by_name[home].parent
for o in others:
if o != parent:
cross_edges += 1
assert cross_edges >= 1
def test_every_decky_has_at_least_one_edge():
t = generate(_cfg())
edge_deckies = Counter(e.decky_name for e in t.edges)
for d in t.deckies:
assert edge_deckies[d.name] >= 1
def test_dmz_has_exactly_one_decky():
t = generate(_cfg(deckies_per_lan_min=5, deckies_per_lan_max=5))
dmz_edges = [e for e in t.edges if e.lan_name == "LAN-00"]
# The DMZ LAN itself gets 1 decky + possibly acts as parent for
# bridge deckies from LAN-01/LAN-02 etc. The "home" decky count
# should be exactly 1.
home_only = [e for e in dmz_edges if not e.is_bridge]
assert len(home_only) == 1

View File

@@ -0,0 +1,91 @@
"""MazeNET persistence-layer tests: generator → repo → hydrate roundtrip."""
import pytest
from decnet.topology.config import TopologyConfig
from decnet.topology.generator import generate
from decnet.topology.persistence import (
hydrate,
persist,
transition_status,
)
from decnet.topology.status import TopologyStatus, TopologyStatusError
from decnet.web.db.factory import get_repository
@pytest.fixture
async def repo(tmp_path):
r = get_repository(db_path=str(tmp_path / "persist.db"))
await r.initialize()
return r
def _config(**kw) -> TopologyConfig:
base = dict(
name="roundtrip",
depth=2,
branching_factor=2,
deckies_per_lan_min=1,
deckies_per_lan_max=2,
cross_edge_probability=0.0,
randomize_services=True,
seed=7,
)
base.update(kw)
return TopologyConfig(**base)
@pytest.mark.anyio
async def test_persist_then_hydrate(repo):
plan = generate(_config())
tid = await persist(repo, plan)
hydrated = await hydrate(repo, tid)
assert hydrated is not None
assert hydrated["topology"]["name"] == "roundtrip"
assert hydrated["topology"]["status"] == TopologyStatus.PENDING
assert len(hydrated["lans"]) == len(plan.lans)
assert len(hydrated["deckies"]) == len(plan.deckies)
assert len(hydrated["edges"]) == len(plan.edges)
# LANs round-trip with their DMZ flag and subnet.
by_name = {lan["name"]: lan for lan in hydrated["lans"]}
for planned in plan.lans:
assert by_name[planned.name]["subnet"] == planned.subnet
assert by_name[planned.name]["is_dmz"] == planned.is_dmz
# Deckies round-trip their services as a list, not a string.
for d in hydrated["deckies"]:
assert isinstance(d["services"], list)
@pytest.mark.anyio
async def test_transition_status_enforces_legality(repo):
plan = generate(_config())
tid = await persist(repo, plan)
await transition_status(repo, tid, TopologyStatus.DEPLOYING, reason="go")
await transition_status(repo, tid, TopologyStatus.ACTIVE)
topo = await repo.get_topology(tid)
assert topo["status"] == TopologyStatus.ACTIVE
# Can't go from active directly back to pending.
with pytest.raises(TopologyStatusError):
await transition_status(repo, tid, TopologyStatus.PENDING)
# Unknown topology raises ValueError, not silent no-op.
with pytest.raises(ValueError):
await transition_status(repo, "does-not-exist", TopologyStatus.ACTIVE)
@pytest.mark.anyio
async def test_hydrate_missing_topology(repo):
assert await hydrate(repo, "no-such-id") is None
@pytest.mark.anyio
async def test_config_snapshot_preserves_seed(repo):
plan = generate(_config(seed=12345))
tid = await persist(repo, plan)
topo = await repo.get_topology(tid)
assert topo["config_snapshot"]["seed"] == 12345
assert topo["config_snapshot"]["depth"] == 2

View File

@@ -0,0 +1,55 @@
"""MazeNET status state-machine tests.
Every legal transition declared in the plan is permitted; every other
pair (including self-loops and unknowns) must raise.
"""
import pytest
from decnet.topology.status import (
TopologyStatus,
TopologyStatusError,
assert_transition,
legal_next,
)
LEGAL = {
(TopologyStatus.PENDING, TopologyStatus.DEPLOYING),
(TopologyStatus.PENDING, TopologyStatus.TORN_DOWN),
(TopologyStatus.DEPLOYING, TopologyStatus.ACTIVE),
(TopologyStatus.DEPLOYING, TopologyStatus.FAILED),
(TopologyStatus.DEPLOYING, TopologyStatus.DEGRADED),
(TopologyStatus.DEPLOYING, TopologyStatus.TEARING_DOWN),
(TopologyStatus.ACTIVE, TopologyStatus.DEGRADED),
(TopologyStatus.ACTIVE, TopologyStatus.TEARING_DOWN),
(TopologyStatus.DEGRADED, TopologyStatus.ACTIVE),
(TopologyStatus.DEGRADED, TopologyStatus.TEARING_DOWN),
(TopologyStatus.FAILED, TopologyStatus.TEARING_DOWN),
(TopologyStatus.TEARING_DOWN, TopologyStatus.TORN_DOWN),
(TopologyStatus.TEARING_DOWN, TopologyStatus.DEGRADED),
}
def test_every_legal_transition_permitted():
for cur, nxt in LEGAL:
assert_transition(cur, nxt) # no raise
def test_every_illegal_transition_raises():
for cur in TopologyStatus.ALL:
for nxt in TopologyStatus.ALL:
if (cur, nxt) in LEGAL:
continue
with pytest.raises(TopologyStatusError):
assert_transition(cur, nxt)
def test_torn_down_is_terminal():
assert legal_next(TopologyStatus.TORN_DOWN) == frozenset()
def test_unknown_status_raises():
with pytest.raises(TopologyStatusError):
assert_transition("pending", "bogus")
with pytest.raises(TopologyStatusError):
assert_transition("bogus", "active")
with pytest.raises(TopologyStatusError):
legal_next("bogus")