Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
430 lines
15 KiB
Python
430 lines
15 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Pre-deploy validator for MazeNET topologies.
|
|
|
|
Consumes a hydrated dict (output of
|
|
:func:`decnet.topology.persistence.hydrate`) and returns a list of
|
|
:class:`ValidationIssue` records. The deployer calls :func:`validate`
|
|
before transitioning to ``DEPLOYING`` and refuses to proceed if any
|
|
issue has ``severity=="error"``.
|
|
|
|
Rules are independent functions so the web editor can surface them as
|
|
inline diagnostics without running the full list.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from ipaddress import IPv4Address, IPv4Network
|
|
from typing import Any, Callable, Literal
|
|
|
|
from decnet.fleet import all_service_names
|
|
from decnet.logging import get_logger
|
|
from decnet.services.registry import get_service
|
|
|
|
# Module-level logger; used below to report a skipped host-port probe.
log = get_logger("topology.validate")

# The only two severities a ValidationIssue may carry. Per the module
# docstring, the deployer refuses to proceed when any issue is an "error".
Severity = Literal["error", "warning"]
|
|
|
|
|
|
@dataclass
class ValidationIssue:
    """A single finding produced by one validation rule."""

    # Either "error" or "warning" (see ``Severity``).
    severity: Severity
    # Short machine-readable identifier, e.g. "DMZ_MISSING" or "IP_COLLISION".
    code: str
    # Human-readable description of the problem.
    message: str
    # Names/values of the objects the issue refers to (the rules in this
    # module use keys such as "lan", "lans", "decky", "ip", "service",
    # "port"). Left empty when a rule has no specific object to point at.
    target: dict = field(default_factory=dict)
|
|
|
|
|
|
class ValidationError(Exception):
    """Raised by the deployer when a topology fails pre-deploy checks.

    The full list of issues (warnings included) is kept on ``self.issues``;
    the exception message summarises only those with severity ``"error"``.
    """

    def __init__(self, issues: list[ValidationIssue]) -> None:
        self.issues = issues
        # One "[CODE] message" fragment per error-severity issue, in order.
        summaries = [
            f"[{issue.code}] {issue.message}"
            for issue in issues
            if issue.severity == "error"
        ]
        headline = f"{len(summaries)} topology validation error(s): "
        super().__init__(headline + "; ".join(summaries))
|
|
|
|
|
|
# --------------------------------------------------------------------- rules
|
|
|
|
|
|
def check_exactly_one_dmz(h: dict[str, Any]) -> list[ValidationIssue]:
    """Require that exactly one LAN carries a truthy ``is_dmz`` flag.

    Returns an empty list when there is exactly one DMZ, a ``DMZ_MISSING``
    error when there is none, and a ``DMZ_MULTIPLE`` error (naming the
    offending LANs) when there is more than one.
    """
    flagged = []
    for lan in h["lans"]:
        if lan.get("is_dmz"):
            flagged.append(lan)

    count = len(flagged)
    if count == 0:
        return [
            ValidationIssue("error", "DMZ_MISSING", "no LAN is marked is_dmz=True")
        ]
    if count > 1:
        names = [lan["name"] for lan in flagged]
        return [
            ValidationIssue(
                "error",
                "DMZ_MULTIPLE",
                f"{count} LANs marked is_dmz=True; exactly one allowed",
                target={"lans": names},
            )
        ]
    return []
|
|
|
|
|
|
def check_all_lans_connected_to_dmz(
    h: dict[str, Any],
) -> list[ValidationIssue]:
    """Report LANs that have no bridge path to the DMZ.

    Two LANs are considered adjacent when at least one decky has an edge
    to both of them. Starting from the first LAN flagged ``is_dmz``, every
    LAN reachable through such adjacencies is collected; the rest are
    orphans.

    Returns a single ``DMZ_ORPHAN`` error naming the unreachable LANs, or
    an empty list when all LANs are reachable, when there are no LANs, or
    when no DMZ exists (that case is reported by ``check_exactly_one_dmz``).

    Edges whose ``lan_id`` does not match any LAN in ``h["lans"]`` are
    ignored here instead of raising ``KeyError``, so a dangling edge can
    never crash validation.
    """
    lans = {lan["id"]: lan for lan in h["lans"]}
    if not lans:
        return []
    dmz = next((lan for lan in h["lans"] if lan.get("is_dmz")), None)
    if dmz is None:
        return []  # covered by check_exactly_one_dmz

    # decky_uuid -> set of *known* LAN ids that decky is attached to.
    decky_lans: dict[str, set[str]] = {}
    for edge in h["edges"]:
        lan_id = edge["lan_id"]
        if lan_id in lans:  # skip dangling references
            decky_lans.setdefault(edge["decky_uuid"], set()).add(lan_id)

    # Adjacency: LANs share an edge if >=1 bridge decky is attached to both.
    adj: dict[str, set[str]] = {lid: set() for lid in lans}
    for lan_ids in decky_lans.values():
        if len(lan_ids) < 2:
            continue
        for lid in lan_ids:
            adj[lid] |= lan_ids - {lid}

    # Graph traversal from the DMZ; visiting order does not affect the
    # resulting reachable set.
    reachable = {dmz["id"]}
    frontier = [dmz["id"]]
    while frontier:
        current = frontier.pop()
        for peer in adj[current] - reachable:
            reachable.add(peer)
            frontier.append(peer)

    # Preserve the order of h["lans"] in the report.
    orphans = [lans[lid]["name"] for lid in lans if lid not in reachable]
    if not orphans:
        return []
    return [
        ValidationIssue(
            "error",
            "DMZ_ORPHAN",
            f"LAN(s) have no bridge path to the DMZ: {', '.join(orphans)}",
            target={"lans": orphans},
        )
    ]
|
|
|
|
|
|
def check_no_orphan_deckies(h: dict[str, Any]) -> list[ValidationIssue]:
    """Emit one ``DECKY_ORPHAN`` error per decky that appears in no edge.

    A decky counts as attached when its ``uuid`` is the ``decky_uuid`` of
    at least one entry in ``h["edges"]``.
    """
    attached: set[str] = set()
    for edge in h["edges"]:
        attached.add(edge["decky_uuid"])

    return [
        ValidationIssue(
            "error",
            "DECKY_ORPHAN",
            f"decky {decky['name']!r} has no LAN edges",
            target={"decky": decky["name"]},
        )
        for decky in h["deckies"]
        if decky["uuid"] not in attached
    ]
|
|
|
|
|
|
def check_names_unique(h: dict[str, Any]) -> list[ValidationIssue]:
    """Flag repeated LAN names and repeated decky names.

    LANs and deckies are checked independently (a LAN and a decky may
    share a name). Every occurrence after the first yields one issue:
    ``LAN_NAME_DUP`` for LANs, then ``DECKY_NAME_DUP`` for deckies.
    """
    issues: list[ValidationIssue] = []

    lan_names_seen: set[str] = set()
    for name in (lan["name"] for lan in h["lans"]):
        if name not in lan_names_seen:
            lan_names_seen.add(name)
            continue
        issues.append(
            ValidationIssue(
                "error",
                "LAN_NAME_DUP",
                f"duplicate LAN name {name!r}",
                target={"lan": name},
            )
        )

    decky_names_seen: set[str] = set()
    for name in (decky["name"] for decky in h["deckies"]):
        if name not in decky_names_seen:
            decky_names_seen.add(name)
            continue
        issues.append(
            ValidationIssue(
                "error",
                "DECKY_NAME_DUP",
                f"duplicate decky name {name!r}",
                target={"decky": name},
            )
        )

    return issues
|
|
|
|
|
|
def check_no_ip_collisions(h: dict[str, Any]) -> list[ValidationIssue]:
    """Validate the per-LAN IPs each decky claims in ``ips_by_lan``.

    For every ``lan_name -> ip`` entry under a decky's
    ``decky_config["ips_by_lan"]`` this emits:

    * ``IP_UNKNOWN_LAN``   - the LAN name is not in ``h["lans"]``;
    * ``IP_MALFORMED``     - the value is not a valid IPv4 address;
    * ``IP_OUT_OF_SUBNET`` - the address lies outside the LAN's subnet;
    * ``IP_COLLISION``     - another decky already claimed the same IP
      string in the same LAN.

    A LAN whose *subnet* cannot be parsed is not blamed on the decky's IP:
    the membership test is skipped for that LAN (the subnet itself is
    reported by ``check_no_subnet_overlap``) while IP syntax and collision
    checks still run. A missing or ``None`` ``decky_config`` /
    ``ips_by_lan`` is treated as "no IPs claimed".
    """
    lans_by_name = {lan["name"]: lan for lan in h["lans"]}

    # Parse each LAN's subnet once; None marks a subnet that failed to parse.
    subnets: dict[str, IPv4Network | None] = {}
    for name, lan in lans_by_name.items():
        try:
            subnets[name] = IPv4Network(lan.get("subnet"))
        except (ValueError, TypeError):
            subnets[name] = None

    per_lan_ips: dict[str, dict[str, str]] = {}  # lan_name -> {ip: decky_name}
    issues: list[ValidationIssue] = []
    for d in h["deckies"]:
        ips_by_lan: dict[str, str] = (
            (d.get("decky_config") or {}).get("ips_by_lan") or {}
        )
        for lan_name, ip in ips_by_lan.items():
            lan = lans_by_name.get(lan_name)
            if lan is None:
                issues.append(
                    ValidationIssue(
                        "error",
                        "IP_UNKNOWN_LAN",
                        f"decky {d['name']!r} claims IP in unknown LAN "
                        f"{lan_name!r}",
                        target={"decky": d["name"], "lan": lan_name},
                    )
                )
                continue

            # Syntax check on the IP alone, so a bad subnet cannot be
            # misreported as a bad IP.
            try:
                addr = IPv4Address(ip)
            except (ValueError, TypeError):
                issues.append(
                    ValidationIssue(
                        "error",
                        "IP_MALFORMED",
                        f"decky {d['name']!r}: malformed IP {ip!r}",
                        target={"decky": d["name"], "ip": ip},
                    )
                )
                continue

            # Out-of-subnet check (only possible when the subnet parsed).
            subnet = subnets[lan_name]
            if subnet is not None and addr not in subnet:
                issues.append(
                    ValidationIssue(
                        "error",
                        "IP_OUT_OF_SUBNET",
                        f"{ip} not inside {lan['subnet']} "
                        f"(decky {d['name']!r}, LAN {lan_name!r})",
                        target={"decky": d["name"], "lan": lan_name, "ip": ip},
                    )
                )

            # Collision check: the first claimant keeps the IP on record.
            bucket = per_lan_ips.setdefault(lan_name, {})
            if ip in bucket:
                issues.append(
                    ValidationIssue(
                        "error",
                        "IP_COLLISION",
                        f"IP {ip} claimed by both {bucket[ip]!r} and "
                        f"{d['name']!r} in LAN {lan_name!r}",
                        target={
                            "lan": lan_name,
                            "ip": ip,
                            "deckies": [bucket[ip], d["name"]],
                        },
                    )
                )
            else:
                bucket[ip] = d["name"]
    return issues
|
|
|
|
|
|
def check_no_subnet_overlap(h: dict[str, Any]) -> list[ValidationIssue]:
    """Check that LAN subnets parse and that no two of them overlap.

    Each LAN whose ``subnet`` is rejected by ``IPv4Network`` yields a
    ``SUBNET_MALFORMED`` error and is left out of the overlap comparison.
    Every unordered pair of the remaining LANs whose networks overlap
    yields one ``SUBNET_OVERLAP`` error. Malformed-subnet issues come
    first in the returned list.
    """
    issues: list[ValidationIssue] = []
    parsed: list[tuple[str, IPv4Network]] = []

    for lan in h["lans"]:
        lan_name = lan["name"]
        try:
            network = IPv4Network(lan["subnet"])
        except ValueError:
            issues.append(
                ValidationIssue(
                    "error",
                    "SUBNET_MALFORMED",
                    f"LAN {lan_name!r}: malformed subnet {lan['subnet']!r}",
                    target={"lan": lan_name},
                )
            )
        else:
            parsed.append((lan_name, network))

    # Compare each network only against those that come after it, so a
    # given pair is reported once.
    for position, (first_name, first_net) in enumerate(parsed):
        later = parsed[position + 1 :]
        issues.extend(
            ValidationIssue(
                "error",
                "SUBNET_OVERLAP",
                f"LAN {first_name!r} ({first_net}) overlaps LAN "
                f"{second_name!r} ({second_net})",
                target={"lans": [first_name, second_name]},
            )
            for second_name, second_net in later
            if first_net.overlaps(second_net)
        )
    return issues
|
|
|
|
|
|
def check_services_known(h: dict[str, Any]) -> list[ValidationIssue]:
    """Emit ``UNKNOWN_SERVICE`` for every service name not in the fleet list.

    The set of valid names is whatever ``all_service_names()`` returns at
    call time. Issues are ordered by decky, then by position in that
    decky's ``services`` list.
    """
    registry = set(all_service_names())
    return [
        ValidationIssue(
            "error",
            "UNKNOWN_SERVICE",
            f"decky {decky['name']!r}: unknown service {svc!r}",
            target={"decky": decky["name"], "service": svc},
        )
        for decky in h["deckies"]
        for svc in decky.get("services", [])
        if svc not in registry
    ]
|
|
|
|
|
|
def check_service_config_shape(h: dict[str, Any]) -> list[ValidationIssue]:
    """Flag ``service_config`` entries for services a decky does not declare.

    For each decky, every key of ``decky_config["service_config"]`` must
    also appear in the decky's ``services`` list; otherwise a
    ``SERVICE_CFG_UNDECLARED`` error is emitted. A missing or empty
    ``decky_config`` / ``service_config`` produces no issues.
    """
    issues: list[ValidationIssue] = []
    for decky in h["deckies"]:
        decky_config = decky.get("decky_config") or {}
        configured = decky_config.get("service_config") or {}
        declared = set(decky.get("services", []))
        undeclared = [name for name in configured if name not in declared]
        issues.extend(
            ValidationIssue(
                "error",
                "SERVICE_CFG_UNDECLARED",
                f"decky {decky['name']!r}: service_config for "
                f"{svc_name!r} but service not in services list",
                target={"decky": decky["name"], "service": svc_name},
            )
            for svc_name in undeclared
        )
    return issues
|
|
|
|
|
|
def check_gateway_homed_in_dmz(h: dict[str, Any]) -> list[ValidationIssue]:
    """Require every gateway decky to have its home LAN in a DMZ.

    A decky is treated as a gateway when ``decky_config["forwards_l3"]``
    is truthy. That flag triggers host-port publishing in the compose
    generator (see :mod:`decnet.topology.compose`); a gateway homed on an
    internal LAN would publish host ports that nobody on the correct side
    of the perimeter can legitimately reach. "Front door" only makes
    sense on the DMZ.

    This rule is deliberately *not* part of ``_RULES``. ``forwards_l3``
    carries two meanings - internal bridge routing (assigned by the
    generator, valid on non-DMZ LANs) and DMZ gateway publication
    (assigned by the operator, must be DMZ-homed) - and standing
    validation cannot tell them apart. The check is therefore
    path-specific and run only on the explicit operator flip
    (``forwards_l3: False → True`` via ``apply_update_decky``).

    Returns one ``GATEWAY_NOT_IN_DMZ`` error per offending gateway; an
    empty list when there are no deckies at all.
    """
    if not h.get("deckies"):
        return []

    # Index LANs by id and collect the ids/names of those flagged as DMZ.
    lans_by_id: dict[str, dict] = {}
    dmz_ids: set[str] = set()
    dmz_names: set[str] = set()
    for lan in h["lans"]:
        lans_by_id[lan["id"]] = lan
        if lan.get("is_dmz"):
            dmz_ids.add(lan["id"])
            dmz_names.add(lan["name"])

    # Home-LAN selection mirrors the frontend hydration: the first edge
    # explicitly marked is_bridge=False wins; if a decky has none (legacy
    # rows without the flag), its first edge of any kind is used.
    first_any: dict[str, str] = {}  # decky_uuid → lan_id
    first_non_bridge: dict[str, str] = {}  # decky_uuid → lan_id
    for edge in h["edges"]:
        owner = edge["decky_uuid"]
        if owner not in first_any:
            first_any[owner] = edge["lan_id"]
        if edge.get("is_bridge") is False and owner not in first_non_bridge:
            first_non_bridge[owner] = edge["lan_id"]
    home_lan_for = {**first_any, **first_non_bridge}

    issues: list[ValidationIssue] = []
    for decky in h["deckies"]:
        if not (decky.get("decky_config") or {}).get("forwards_l3"):
            continue  # not a gateway
        home_id = home_lan_for.get(decky["uuid"])
        if home_id is not None and home_id in dmz_ids:
            continue  # correctly homed

        if home_id:
            home_name = lans_by_id.get(home_id, {}).get("name")
        else:
            home_name = "(no home LAN)"
        allowed = ", ".join(sorted(dmz_names)) or "(no DMZ defined)"
        issues.append(
            ValidationIssue(
                "error",
                "GATEWAY_NOT_IN_DMZ",
                f"gateway decky {decky['name']!r} is on LAN "
                f"{home_name!r}; gateways must home in a DMZ "
                f"LAN ({allowed})",
                target={"decky": decky["name"], "lan": home_name},
            )
        )
    return issues
|
|
|
|
|
|
def check_no_host_port_collision(h: dict[str, Any]) -> list[ValidationIssue]:
    """Flag gateway service ports that are already bound on the host.

    Only gateway deckies (``forwards_l3=True`` in decky_config) publish
    ports (see decnet/topology/compose.py). For each such decky, the
    ``ports`` of every declared service are collected, skipping services
    that ``get_service`` does not know and services marked
    ``fleet_singleton``. When several gateways want the same port, the
    first one encountered is the one named in the issue.

    The host is probed with ``psutil`` for sockets in the LISTEN state.
    Each wanted port found there yields a ``PORT_COLLISION`` *warning*.

    Best-effort: if ``psutil`` isn't importable, or the probe itself
    fails (``psutil.Error`` such as access-denied, or ``OSError``), a
    warning is logged and no issues are returned.
    """
    wanted: dict[int, str] = {}  # host_port → gateway decky name
    for d in h["deckies"]:
        cfg = d.get("decky_config") or {}
        if not cfg.get("forwards_l3"):
            continue
        for svc_name in d.get("services", []):
            svc = get_service(svc_name)
            if svc is None or getattr(svc, "fleet_singleton", False):
                continue
            for port in getattr(svc, "ports", []) or []:
                wanted.setdefault(int(port), d["name"])
    if not wanted:
        return []  # nothing to publish, so no need to touch the host

    # Imported lazily so the module stays usable without psutil installed.
    try:
        import psutil
    except ImportError:
        log.warning("psutil not available; skipping host port collision check")
        return []

    # Enumerating connections can fail on some hosts (e.g. insufficient
    # privileges); honour the documented best-effort contract.
    try:
        bound = {
            c.laddr.port
            for c in psutil.net_connections(kind="inet")
            if c.status == psutil.CONN_LISTEN and c.laddr
        }
    except (psutil.Error, OSError) as exc:
        log.warning(
            f"host port probe failed ({exc!r}); "
            "skipping host port collision check"
        )
        return []

    issues: list[ValidationIssue] = []
    for port, decky_name in wanted.items():
        if port in bound:
            issues.append(
                ValidationIssue(
                    "warning",
                    "PORT_COLLISION",
                    f"host port {port} is already bound; "
                    f"gateway {decky_name!r} may fail to publish it",
                    target={"decky": decky_name, "port": port},
                )
            )
    return issues
|
|
|
|
|
|
# Pure-data rules. Host-state rules (like PORT_COLLISION) are
# *not* listed here — they're called separately by the live deployer
# so that unit tests exercising validate() stay hermetic.
#
# validate() runs these in list order and concatenates their results, so
# this order is also the order in which issues are reported.
_RULES: list[Callable[[dict[str, Any]], list[ValidationIssue]]] = [
    check_exactly_one_dmz,
    check_all_lans_connected_to_dmz,
    check_no_orphan_deckies,
    check_names_unique,
    check_no_ip_collisions,
    check_no_subnet_overlap,
    check_services_known,
    check_service_config_shape,
]
# check_gateway_homed_in_dmz is intentionally absent — it is path-specific
# (forwards_l3 overloads two semantics). See its docstring.
|
|
|
|
|
|
def validate(hydrated: dict[str, Any]) -> list[ValidationIssue]:
    """Apply each rule in ``_RULES`` to *hydrated* and flatten the results.

    Rules run in list order; the returned list keeps that order and is
    empty when no rule reports anything.
    """
    return [issue for rule in _RULES for issue in rule(hydrated)]
|
|
|
|
|
|
def errors(issues: list[ValidationIssue]) -> list[ValidationIssue]:
    """Return the subset of *issues* with severity ``"error"``, in order."""
    blocking: list[ValidationIssue] = []
    for issue in issues:
        if issue.severity == "error":
            blocking.append(issue)
    return blocking
|