diff --git a/decnet/mutator/ops.py b/decnet/mutator/ops.py index e91393b9..e68edf8a 100644 --- a/decnet/mutator/ops.py +++ b/decnet/mutator/ops.py @@ -847,6 +847,39 @@ async def apply_update_decky( new_forwards_l3 = bool(new_decky_config.get("forwards_l3", False)) forwards_l3_flipped = new_forwards_l3 != old_forwards_l3 + # Promotion path: refuse to flip a non-DMZ decky to gateway. The + # 'gateway' semantic specifically means 'host-port publisher facing + # the DMZ' — running it on an internal LAN publishes ports the + # outside world can't reach and shadows the host's port space. + # Generic L3-bridge forwards_l3 (internal multi-homing) is set by + # the generator/attach paths, not by this op, so this check only + # fires when the operator explicitly toggles the flag. + if forwards_l3_flipped and new_forwards_l3: + # Re-derive the home LAN from the edges; same logic as + # check_gateway_homed_in_dmz. + decky_uuid = decky["uuid"] + home_lan_id: Optional[str] = None + for e in hydrated["edges"]: + if e["decky_uuid"] == decky_uuid and e.get("is_bridge") is False: + home_lan_id = e["lan_id"] + break + if home_lan_id is None: + for e in hydrated["edges"]: + if e["decky_uuid"] == decky_uuid: + home_lan_id = e["lan_id"] + break + home_lan = next( + (lan for lan in hydrated["lans"] if lan["id"] == home_lan_id), + None, + ) + if home_lan is None or not home_lan.get("is_dmz"): + home_name = home_lan["name"] if home_lan else "(unknown)" + raise MutationError( + f"cannot promote decky {decky['decky_config']['name']!r} " + f"to gateway: home LAN {home_name!r} is not a DMZ. " + "Move the decky to the DMZ first, or pick a different decky." + ) + # Pre-check the destructive flip BEFORE any DB write, so a refused # mutation leaves zero side-effects. is_live = (await _live_topology_or_none(repo, topology_id)) is not None diff --git a/decnet/topology/validate.py b/decnet/topology/validate.py index 338e55e9..4deb9b20 100644 --- a/decnet/topology/validate.py +++ b/decnet/topology/validate.py @@ -283,6 +283,68 @@ def check_service_config_shape(h: dict[str, Any]) -> list[ValidationIssue]: return issues +def check_gateway_homed_in_dmz(h: dict[str, Any]) -> list[ValidationIssue]: + """Gateway deckies must live in a DMZ LAN. + + ``forwards_l3=True`` triggers host-port publishing in the compose + generator (see :mod:`decnet.topology.compose`); a gateway sitting + on an internal LAN would publish ports on the host without anyone + on the right side of the perimeter able to reach the service + legitimately. The semantic is "this decky is the front door" — + only meaningful when the LAN is the DMZ. + + Errors out the validator so the live ``forwards_l3`` flip path + catches this before recreating the base. + """ + if not h.get("deckies"): + return [] + + lans_by_id = {lan["id"]: lan for lan in h["lans"]} + dmz_lan_ids = { + lan["id"] for lan in h["lans"] if lan.get("is_dmz") + } + dmz_lan_names = { + lan["name"] for lan in h["lans"] if lan.get("is_dmz") + } + + # Home-LAN selection mirrors the frontend hydration: prefer the + # non-bridge edge. Falls back to the first edge if no + # is_bridge flag is set (legacy rows). + home_lan_for: dict[str, str] = {} # decky_uuid → lan_id + for e in h["edges"]: + if e.get("is_bridge") is False and e["decky_uuid"] not in home_lan_for: + home_lan_for[e["decky_uuid"]] = e["lan_id"] + for e in h["edges"]: + if e["decky_uuid"] in home_lan_for: + continue + home_lan_for[e["decky_uuid"]] = e["lan_id"] + + issues: list[ValidationIssue] = [] + for d in h["deckies"]: + cfg = d.get("decky_config") or {} + if not cfg.get("forwards_l3"): + continue + home_lan_id = home_lan_for.get(d["uuid"]) + if home_lan_id is None or home_lan_id not in dmz_lan_ids: + home_lan_name = ( + lans_by_id.get(home_lan_id, {}).get("name") + if home_lan_id + else "(no home LAN)" + ) + allowed = ", ".join(sorted(dmz_lan_names)) or "(no DMZ defined)" + issues.append( + ValidationIssue( + "error", + "GATEWAY_NOT_IN_DMZ", + f"gateway decky {d['name']!r} is on LAN " + f"{home_lan_name!r}; gateways must home in a DMZ " + f"LAN ({allowed})", + target={"decky": d["name"], "lan": home_lan_name}, + ) + ) + return issues + + def check_no_host_port_collision(h: dict[str, Any]) -> list[ValidationIssue]: """Flag gateway service ports that are already bound on the host. @@ -342,6 +404,23 @@ _RULES: list[Callable[[dict[str, Any]], list[ValidationIssue]]] = [ check_services_known, check_service_config_shape, ] +# NOTE: ``check_gateway_homed_in_dmz`` is intentionally NOT in _RULES. +# The codebase uses ``forwards_l3=True`` for two distinct semantics: +# +# 1. Generic L3 forwarding (internal bridge deckies: enable +# net.ipv4.ip_forward so the decky can route between its multi-home +# LANs). The generator writes this on internal bridges via +# ``bridge_forward_probability``; those bridges legitimately home +# in non-DMZ LANs. +# 2. DMZ gateway (host-port publisher: the decky exposes its services +# on the host's public IP). Only meaningful when the home LAN is +# the DMZ. +# +# Standing validation can't enforce DMZ-homing without breaking case 1. +# Instead, the rule fires only on the explicit user-driven flip path +# (apply_update_decky setting forwards_l3 from False → True), where the +# operator's intent is unambiguously "make this a gateway". Generator +# output and bridge-decky paths bypass this check. def validate(hydrated: dict[str, Any]) -> list[ValidationIssue]: diff --git a/tests/mutator/test_ops_materialisation.py b/tests/mutator/test_ops_materialisation.py index 2956ef75..9a2a3c37 100644 --- a/tests/mutator/test_ops_materialisation.py +++ b/tests/mutator/test_ops_materialisation.py @@ -277,6 +277,9 @@ async def test_update_decky_forwards_l3_flip_with_force_recreates_base( ): tid = await _make_active(repo) deckies = await repo.list_topology_deckies(tid) + # _make_active produces a single-LAN topology where that LAN is the + # DMZ; both deckies home there, so promoting deckies[0] to gateway + # is valid (passes the DMZ-homing guard). target = deckies[0] target_name = target["decky_config"]["name"] @@ -295,6 +298,35 @@ async def test_update_decky_forwards_l3_flip_with_force_recreates_base( assert found, "expected force-recreate up against the base" +@pytest.mark.anyio +async def test_update_decky_refuses_gateway_promotion_on_non_dmz_lan( + repo, stubs, +): + """Promoting a decky homed on an internal LAN to gateway must fail.""" + tid = await _make_active(repo) + # Add an internal LAN + a decky homed there. + from decnet.mutator.ops import apply_add_lan, apply_add_decky + await apply_add_lan(repo, tid, { + "name": "internal", "subnet": "10.99.0.0/24", "is_dmz": False, + }) + await apply_add_decky(repo, tid, { + "name": "internalbox", "lan": "internal", "services": ["ssh"], + }) + stubs["compose_with_retry"].reset_mock() + + with pytest.raises(MutationError, match="not a DMZ"): + await apply_update_decky(repo, tid, { + "decky": "internalbox", + "patch": {"forwards_l3": True}, + "force": True, + }) + + # No recreate should have fired — refused mutations leave zero + # side-effects. + for call in stubs["compose_with_retry"].call_args_list: + assert "--force-recreate" not in call.args + + # ---------------- apply_update_lan ------------------------------------- diff --git a/tests/topology/test_validate.py b/tests/topology/test_validate.py index 8863507f..9fab5332 100644 --- a/tests/topology/test_validate.py +++ b/tests/topology/test_validate.py @@ -12,6 +12,7 @@ from decnet.topology.persistence import hydrate, persist from decnet.topology.status import TopologyStatus from decnet.topology.validate import ( ValidationError, + check_gateway_homed_in_dmz, errors, validate, ) @@ -176,3 +177,85 @@ async def test_deploy_aborts_on_validation_error(repo, tmp_path, monkeypatch): topo = await repo.get_topology(tid) assert topo["status"] == TopologyStatus.PENDING + + +# --------------------------------------------------------------------- gateway-in-dmz + + +def _make_hydrated(*, dmz_id="dmz-id", internal_id="int-id") -> dict: + """Tiny hand-rolled hydrated dict for hermetic check_* unit tests.""" + return { + "topology": {"id": "t", "status": "pending"}, + "lans": [ + {"id": dmz_id, "name": "dmz", "subnet": "10.0.0.0/24", "is_dmz": True}, + {"id": internal_id, "name": "internal", "subnet": "10.0.1.0/24", "is_dmz": False}, + ], + "deckies": [], + "edges": [], + } + + +def test_check_gateway_homed_in_dmz_passes_when_gateway_is_in_dmz() -> None: + h = _make_hydrated() + h["deckies"].append({ + "uuid": "d1", "name": "gw", + "decky_config": {"name": "gw", "forwards_l3": True}, + "services": ["ssh"], + }) + h["edges"].append({ + "decky_uuid": "d1", "lan_id": "dmz-id", + "is_bridge": False, "forwards_l3": True, + }) + assert check_gateway_homed_in_dmz(h) == [] + + +def test_check_gateway_homed_in_dmz_fails_when_gateway_is_internal() -> None: + h = _make_hydrated() + h["deckies"].append({ + "uuid": "d1", "name": "gw", + "decky_config": {"name": "gw", "forwards_l3": True}, + "services": ["ssh"], + }) + # Home edge points at the internal LAN, not the DMZ. + h["edges"].append({ + "decky_uuid": "d1", "lan_id": "int-id", + "is_bridge": False, "forwards_l3": True, + }) + issues = check_gateway_homed_in_dmz(h) + assert len(issues) == 1 + assert issues[0].code == "GATEWAY_NOT_IN_DMZ" + + +def test_check_gateway_homed_in_dmz_ignores_non_gateway_deckies() -> None: + h = _make_hydrated() + h["deckies"].append({ + "uuid": "d1", "name": "web", + "decky_config": {"name": "web"}, # forwards_l3 absent + "services": ["ssh"], + }) + h["edges"].append({ + "decky_uuid": "d1", "lan_id": "int-id", + "is_bridge": False, + }) + assert check_gateway_homed_in_dmz(h) == [] + + +def test_check_gateway_homed_in_dmz_uses_non_bridge_edge_as_home() -> None: + """Multi-homed gateway: home is the non-bridge edge, not the bridge edge.""" + h = _make_hydrated() + h["deckies"].append({ + "uuid": "d1", "name": "gw", + "decky_config": {"name": "gw", "forwards_l3": True}, + "services": ["ssh"], + }) + # Bridge edge first (would be picked by a naive 'first edge' rule). + h["edges"].append({ + "decky_uuid": "d1", "lan_id": "int-id", + "is_bridge": True, "forwards_l3": False, + }) + h["edges"].append({ + "decky_uuid": "d1", "lan_id": "dmz-id", + "is_bridge": False, "forwards_l3": True, + }) + # Home is the DMZ via the non-bridge edge → no issue. + assert check_gateway_homed_in_dmz(h) == []