feat(topology): add pre-deploy validator and wire into deploy_topology

MazeNET phase 2 step 3. Blocks deploys of hand-authored topologies that
would fail mid-bring-up (orphan deckies, duplicate IPs, overlapping
subnets, unknown services) with a structured error list instead of a
docker error at startup.

Rules (one function each, composable by the editor for inline hints):
- exactly one DMZ
- every LAN has a bridge chain to the DMZ (BFS via multi-homed deckies)
- no orphan deckies
- unique LAN and decky names per topology
- no IP collisions + IPs inside their LAN's subnet
- no LAN subnet overlaps
- every service in decnet.fleet.all_service_names()
- service_config keys match the decky's declared services

deploy_topology runs the validator after hydrate, before any status
transition or Docker call; errors raise ValidationError and status
stays at pending.
This commit is contained in:
2026-04-20 17:45:32 -04:00
parent d4f4c58277
commit 2544d0294a
3 changed files with 491 additions and 0 deletions

View File

@@ -35,6 +35,7 @@ from decnet.topology.compose import (
)
from decnet.topology.persistence import hydrate, transition_status
from decnet.topology.status import TopologyStatus
from decnet.topology.validate import ValidationError, errors as _validation_errors, validate as _validate_topology
log = get_logger("engine")
console = Console()
@@ -318,6 +319,12 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N
if hydrated is None:
raise ValueError(f"topology {topology_id!r} not found")
# Precondition: validate before any status transition or Docker call.
# Errors bubble up as ValidationError and leave status untouched.
issues = _validate_topology(hydrated)
if _validation_errors(issues):
raise ValidationError(issues)
lans = hydrated["lans"]
compose_path = _topology_compose_path(topology_id)

306
decnet/topology/validate.py Normal file
View File

@@ -0,0 +1,306 @@
"""Pre-deploy validator for MazeNET topologies.
Consumes a hydrated dict (output of
:func:`decnet.topology.persistence.hydrate`) and returns a list of
:class:`ValidationIssue` records. The deployer calls :func:`validate`
before transitioning to ``DEPLOYING`` and refuses to proceed if any
issue has ``severity=="error"``.
Rules are independent functions so the web editor can surface them as
inline diagnostics without running the full list.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from ipaddress import IPv4Address, IPv4Network
from typing import Any, Callable, Literal
from decnet.fleet import all_service_names
Severity = Literal["error", "warning"]
@dataclass
class ValidationIssue:
severity: Severity
code: str
message: str
target: dict = field(default_factory=dict)
class ValidationError(Exception):
"""Raised by the deployer when a topology fails pre-deploy checks."""
def __init__(self, issues: list[ValidationIssue]) -> None:
self.issues = issues
errors = [i for i in issues if i.severity == "error"]
super().__init__(
f"{len(errors)} topology validation error(s): "
+ "; ".join(f"[{i.code}] {i.message}" for i in errors)
)
# --------------------------------------------------------------------- rules
def check_exactly_one_dmz(h: dict[str, Any]) -> list[ValidationIssue]:
dmzs = [lan for lan in h["lans"] if lan.get("is_dmz")]
if len(dmzs) == 1:
return []
if not dmzs:
return [
ValidationIssue("error", "DMZ_MISSING", "no LAN is marked is_dmz=True")
]
return [
ValidationIssue(
"error",
"DMZ_MULTIPLE",
f"{len(dmzs)} LANs marked is_dmz=True; exactly one allowed",
target={"lans": [lan["name"] for lan in dmzs]},
)
]
def check_all_lans_connected_to_dmz(
h: dict[str, Any],
) -> list[ValidationIssue]:
lans = {lan["id"]: lan for lan in h["lans"]}
if not lans:
return []
dmz = next((lan for lan in h["lans"] if lan.get("is_dmz")), None)
if dmz is None:
return [] # covered by check_exactly_one_dmz
# Adjacency: LANs share an edge if ≥1 bridge decky is attached to both.
decky_lans: dict[str, set[str]] = {}
for edge in h["edges"]:
decky_lans.setdefault(edge["decky_uuid"], set()).add(edge["lan_id"])
adj: dict[str, set[str]] = {lid: set() for lid in lans}
for lan_ids in decky_lans.values():
if len(lan_ids) < 2:
continue
for a in lan_ids:
for b in lan_ids:
if a != b:
adj[a].add(b)
reachable = {dmz["id"]}
frontier = [dmz["id"]]
while frontier:
nxt: list[str] = []
for lid in frontier:
for peer in adj[lid]:
if peer not in reachable:
reachable.add(peer)
nxt.append(peer)
frontier = nxt
orphans = [lans[lid]["name"] for lid in lans if lid not in reachable]
if not orphans:
return []
return [
ValidationIssue(
"error",
"DMZ_ORPHAN",
f"LAN(s) have no bridge path to the DMZ: {', '.join(orphans)}",
target={"lans": orphans},
)
]
def check_no_orphan_deckies(h: dict[str, Any]) -> list[ValidationIssue]:
attached: set[str] = {e["decky_uuid"] for e in h["edges"]}
issues: list[ValidationIssue] = []
for d in h["deckies"]:
if d["uuid"] not in attached:
issues.append(
ValidationIssue(
"error",
"DECKY_ORPHAN",
f"decky {d['name']!r} has no LAN edges",
target={"decky": d["name"]},
)
)
return issues
def check_names_unique(h: dict[str, Any]) -> list[ValidationIssue]:
issues: list[ValidationIssue] = []
seen_lan: set[str] = set()
for lan in h["lans"]:
if lan["name"] in seen_lan:
issues.append(
ValidationIssue(
"error",
"LAN_NAME_DUP",
f"duplicate LAN name {lan['name']!r}",
target={"lan": lan["name"]},
)
)
seen_lan.add(lan["name"])
seen_decky: set[str] = set()
for d in h["deckies"]:
if d["name"] in seen_decky:
issues.append(
ValidationIssue(
"error",
"DECKY_NAME_DUP",
f"duplicate decky name {d['name']!r}",
target={"decky": d["name"]},
)
)
seen_decky.add(d["name"])
return issues
def check_no_ip_collisions(h: dict[str, Any]) -> list[ValidationIssue]:
lans_by_name = {lan["name"]: lan for lan in h["lans"]}
per_lan_ips: dict[str, dict[str, str]] = {} # lan_name → {ip: decky_name}
issues: list[ValidationIssue] = []
for d in h["deckies"]:
ips_by_lan: dict[str, str] = (d.get("decky_config") or {}).get(
"ips_by_lan", {}
)
for lan_name, ip in ips_by_lan.items():
lan = lans_by_name.get(lan_name)
if lan is None:
issues.append(
ValidationIssue(
"error",
"IP_UNKNOWN_LAN",
f"decky {d['name']!r} claims IP in unknown LAN "
f"{lan_name!r}",
target={"decky": d["name"], "lan": lan_name},
)
)
continue
# Out-of-subnet check.
try:
if IPv4Address(ip) not in IPv4Network(lan["subnet"]):
issues.append(
ValidationIssue(
"error",
"IP_OUT_OF_SUBNET",
f"{ip} not inside {lan['subnet']} "
f"(decky {d['name']!r}, LAN {lan_name!r})",
target={"decky": d["name"], "lan": lan_name, "ip": ip},
)
)
except (ValueError, TypeError):
issues.append(
ValidationIssue(
"error",
"IP_MALFORMED",
f"decky {d['name']!r}: malformed IP {ip!r}",
target={"decky": d["name"], "ip": ip},
)
)
continue
bucket = per_lan_ips.setdefault(lan_name, {})
if ip in bucket:
issues.append(
ValidationIssue(
"error",
"IP_COLLISION",
f"IP {ip} claimed by both {bucket[ip]!r} and "
f"{d['name']!r} in LAN {lan_name!r}",
target={
"lan": lan_name,
"ip": ip,
"deckies": [bucket[ip], d["name"]],
},
)
)
else:
bucket[ip] = d["name"]
return issues
def check_no_subnet_overlap(h: dict[str, Any]) -> list[ValidationIssue]:
nets: list[tuple[str, IPv4Network]] = []
issues: list[ValidationIssue] = []
for lan in h["lans"]:
try:
nets.append((lan["name"], IPv4Network(lan["subnet"])))
except ValueError:
issues.append(
ValidationIssue(
"error",
"SUBNET_MALFORMED",
f"LAN {lan['name']!r}: malformed subnet {lan['subnet']!r}",
target={"lan": lan["name"]},
)
)
for i, (na, a) in enumerate(nets):
for nb, b in nets[i + 1 :]:
if a.overlaps(b):
issues.append(
ValidationIssue(
"error",
"SUBNET_OVERLAP",
f"LAN {na!r} ({a}) overlaps LAN {nb!r} ({b})",
target={"lans": [na, nb]},
)
)
return issues
def check_services_known(h: dict[str, Any]) -> list[ValidationIssue]:
known = set(all_service_names())
issues: list[ValidationIssue] = []
for d in h["deckies"]:
for svc in d.get("services", []):
if svc not in known:
issues.append(
ValidationIssue(
"error",
"UNKNOWN_SERVICE",
f"decky {d['name']!r}: unknown service {svc!r}",
target={"decky": d["name"], "service": svc},
)
)
return issues
def check_service_config_shape(h: dict[str, Any]) -> list[ValidationIssue]:
issues: list[ValidationIssue] = []
for d in h["deckies"]:
svc_cfg = (d.get("decky_config") or {}).get("service_config") or {}
declared = set(d.get("services", []))
for svc_name in svc_cfg:
if svc_name not in declared:
issues.append(
ValidationIssue(
"error",
"SERVICE_CFG_UNDECLARED",
f"decky {d['name']!r}: service_config for "
f"{svc_name!r} but service not in services list",
target={"decky": d["name"], "service": svc_name},
)
)
return issues
_RULES: list[Callable[[dict[str, Any]], list[ValidationIssue]]] = [
check_exactly_one_dmz,
check_all_lans_connected_to_dmz,
check_no_orphan_deckies,
check_names_unique,
check_no_ip_collisions,
check_no_subnet_overlap,
check_services_known,
check_service_config_shape,
]
def validate(hydrated: dict[str, Any]) -> list[ValidationIssue]:
"""Run every rule and return the flat list of issues (may be empty)."""
out: list[ValidationIssue] = []
for rule in _RULES:
out.extend(rule(hydrated))
return out
def errors(issues: list[ValidationIssue]) -> list[ValidationIssue]:
return [i for i in issues if i.severity == "error"]

View File

@@ -0,0 +1,178 @@
"""Validator-rule unit tests + deployer precondition integration."""
from __future__ import annotations
from unittest.mock import patch
import pytest
from decnet.engine.deployer import deploy_topology
from decnet.topology.config import TopologyConfig
from decnet.topology.generator import generate
from decnet.topology.persistence import hydrate, persist
from decnet.topology.status import TopologyStatus
from decnet.topology.validate import (
ValidationError,
errors,
validate,
)
from decnet.web.db.factory import get_repository
def _cfg(**kw) -> TopologyConfig:
base = dict(
name="val",
depth=1,
branching_factor=1,
deckies_per_lan_min=1,
deckies_per_lan_max=1,
cross_edge_probability=0.0,
randomize_services=False,
services_explicit=["ssh"],
seed=9,
)
base.update(kw)
return TopologyConfig(**base)
@pytest.fixture
async def repo(tmp_path):
r = get_repository(db_path=str(tmp_path / "val.db"))
await r.initialize()
return r
async def _hydrate_plan(repo, plan) -> dict:
tid = await persist(repo, plan)
return await hydrate(repo, tid), tid
# --------------------------------------------------------------------- rules
@pytest.mark.anyio
async def test_valid_topology_has_no_errors(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
assert errors(validate(h)) == []
@pytest.mark.anyio
async def test_dmz_missing(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
for lan in h["lans"]:
lan["is_dmz"] = False
codes = [i.code for i in validate(h) if i.severity == "error"]
# DMZ_MISSING plus cascaded DMZ_ORPHAN checks are both acceptable;
# the specific rule must fire at minimum.
assert "DMZ_MISSING" in codes
@pytest.mark.anyio
async def test_dmz_multiple(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
for lan in h["lans"]:
lan["is_dmz"] = True
assert "DMZ_MULTIPLE" in [i.code for i in validate(h)]
@pytest.mark.anyio
async def test_orphan_decky(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
h["edges"] = [e for e in h["edges"] if e["decky_uuid"] != h["deckies"][0]["uuid"]]
assert "DECKY_ORPHAN" in [i.code for i in validate(h)]
@pytest.mark.anyio
async def test_ip_collision(repo):
plan = generate(_cfg(deckies_per_lan_max=2, deckies_per_lan_min=2))
h, _ = await _hydrate_plan(repo, plan)
# Force two deckies in the same LAN to claim the same IP.
deckies = [
d for d in h["deckies"]
if any(
e["decky_uuid"] == d["uuid"]
for e in h["edges"]
if e["lan_id"] == h["lans"][0]["id"]
)
]
assert len(deckies) >= 2
shared_ip = next(iter(deckies[0]["decky_config"]["ips_by_lan"].values()))
deckies[1]["decky_config"]["ips_by_lan"][h["lans"][0]["name"]] = shared_ip
assert "IP_COLLISION" in [i.code for i in validate(h)]
@pytest.mark.anyio
async def test_ip_out_of_subnet(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
d = h["deckies"][0]
lan_name = next(iter(d["decky_config"]["ips_by_lan"]))
d["decky_config"]["ips_by_lan"][lan_name] = "10.99.99.99"
assert "IP_OUT_OF_SUBNET" in [i.code for i in validate(h)]
@pytest.mark.anyio
async def test_subnet_overlap(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
# Shrink two LANs onto overlapping /16s.
h["lans"][0]["subnet"] = "10.0.0.0/16"
if len(h["lans"]) > 1:
h["lans"][1]["subnet"] = "10.0.5.0/24"
codes = [i.code for i in validate(h)]
assert "SUBNET_OVERLAP" in codes
@pytest.mark.anyio
async def test_unknown_service(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
h["deckies"][0]["services"].append("teleporter-xyz")
assert "UNKNOWN_SERVICE" in [i.code for i in validate(h)]
@pytest.mark.anyio
async def test_service_config_undeclared(repo):
plan = generate(_cfg())
h, _ = await _hydrate_plan(repo, plan)
h["deckies"][0]["decky_config"]["service_config"] = {
"rdp": {"password": "no"}
}
# "rdp" is not in the decky's services list (which is ["ssh"]).
assert "SERVICE_CFG_UNDECLARED" in [i.code for i in validate(h)]
# --------------------------------------------------------------------- deployer hook
@pytest.mark.anyio
async def test_deploy_aborts_on_validation_error(repo, tmp_path, monkeypatch):
"""Broken topology must be rejected before any Docker call."""
monkeypatch.chdir(tmp_path)
plan = generate(_cfg())
tid = await persist(repo, plan)
# Corrupt the persisted state: strip the DMZ flag.
lan = (await repo.list_lans_for_topology(tid))[0]
# Use raw repo path — SQLModel UPDATE via get + setattr.
from sqlmodel import select
from decnet.web.db.models import LAN
async with repo._session() as s:
row = (await s.execute(select(LAN).where(LAN.id == lan["id"]))).scalar_one()
row.is_dmz = False
s.add(row)
await s.commit()
class _ShouldNotCall:
def from_env(self): # noqa: D401
raise AssertionError("docker must not be called on a rejected topology")
with patch("decnet.engine.deployer.docker", _ShouldNotCall()):
with pytest.raises(ValidationError):
await deploy_topology(repo, tid)
topo = await repo.get_topology(tid)
assert topo["status"] == TopologyStatus.PENDING