From 91df57d36bfebdc4c31017f2c5ad4c630d79171f Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 20 Apr 2026 17:50:29 -0400 Subject: [PATCH] feat(topology): pending-only mutation repo methods with cascade + guards MazeNET phase 2 step 6. Equips the repo layer with the CRUD the web editor needs before deploy. - TopologyNotEditable exception: raised when a pending-only method hits a non-pending topology. The intent is "free-form edits stop at deploy; the mutator (step 7) takes over for live topologies." - _assert_pending helper checks status inside the session. - update_lan / update_topology_decky accept enforce_pending=True for pre-deploy callers (existing internal callers default to False so behavior is unchanged). - delete_lan: cascades edges; refuses if any decky has only one edge (= this LAN is its home) to prevent orphans. - delete_topology_decky: cascades edges. - delete_topology_edge: bare-bones removal. All four mutators accept expected_version for optimistic concurrency. Existing tests continue to pass (no behavior change for persist/deploy). --- decnet/topology/status.py | 18 ++++ decnet/web/db/repository.py | 33 ++++++- decnet/web/db/sqlmodel_repo.py | 153 ++++++++++++++++++++++++++++++--- tests/topology/test_editing.py | 132 ++++++++++++++++++++++++++++ 4 files changed, 321 insertions(+), 15 deletions(-) create mode 100644 tests/topology/test_editing.py diff --git a/decnet/topology/status.py b/decnet/topology/status.py index cc5c818c..b01267b5 100644 --- a/decnet/topology/status.py +++ b/decnet/topology/status.py @@ -53,6 +53,24 @@ class TopologyStatusError(ValueError): """Raised when an illegal topology status transition is attempted.""" +class TopologyNotEditable(RuntimeError): + """Raised when a pending-only mutation hits a non-pending topology. + + Pre-deploy edits (update_lan, delete_lan, update/delete decky, + delete_edge) are only legal while the topology is ``pending``. + After deploy the mutator's reconciler + topology_mutations table + take over. + """ + + def __init__(self, *, status: str, reason: str = "") -> None: + self.status = status + self.reason = reason + super().__init__( + f"topology not editable (status={status!r})" + + (f": {reason}" if reason else "") + ) + + class VersionConflict(RuntimeError): """Raised when a topology write is supplied a stale ``expected_version``. diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index d7f1f53d..67af535b 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -265,7 +265,14 @@ class BaseRepository(ABC): async def add_lan(self, data: dict[str, Any]) -> str: raise NotImplementedError - async def update_lan(self, lan_id: str, fields: dict[str, Any]) -> None: + async def update_lan( + self, + lan_id: str, + fields: dict[str, Any], + *, + expected_version: Optional[int] = None, + enforce_pending: bool = False, + ) -> None: raise NotImplementedError async def list_lans_for_topology( @@ -277,7 +284,12 @@ class BaseRepository(ABC): raise NotImplementedError async def update_topology_decky( - self, decky_uuid: str, fields: dict[str, Any] + self, + decky_uuid: str, + fields: dict[str, Any], + *, + expected_version: Optional[int] = None, + enforce_pending: bool = False, ) -> None: raise NotImplementedError @@ -298,3 +310,20 @@ class BaseRepository(ABC): self, topology_id: str, limit: int = 100 ) -> list[dict[str, Any]]: raise NotImplementedError + + # -------------------- pre-deploy (pending-only) mutations -------------------- + + async def delete_lan( + self, lan_id: str, *, expected_version: Optional[int] = None + ) -> None: + raise NotImplementedError + + async def delete_topology_decky( + self, decky_uuid: str, *, expected_version: Optional[int] = None + ) -> None: + raise NotImplementedError + + async def delete_topology_edge( + self, edge_id: str, *, expected_version: Optional[int] = None + ) -> None: + raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 306d3d19..5d3da3ce 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -1027,6 +1027,23 @@ class SQLModelRepository(BaseRepository): await session.commit() return True + async def _assert_pending(self, session, topology_id: str) -> None: + """Pre-deploy edits are pending-only. Raises TopologyNotEditable.""" + from decnet.topology.status import TopologyNotEditable, TopologyStatus + + result = await session.execute( + select(Topology).where(Topology.id == topology_id) + ) + topo = result.scalar_one_or_none() + if topo is None: + raise ValueError(f"topology {topology_id!r} not found") + if topo.status != TopologyStatus.PENDING: + raise TopologyNotEditable( + status=topo.status, + reason="free-form edits are pending-only; use the " + "mutator (topology_mutations) after deploy", + ) + async def _check_and_bump_version( self, session, @@ -1082,18 +1099,20 @@ class SQLModelRepository(BaseRepository): fields: dict[str, Any], *, expected_version: Optional[int] = None, + enforce_pending: bool = False, ) -> None: if not fields: return async with self._session() as session: + result = await session.execute( + select(LAN).where(LAN.id == lan_id) + ) + lan = result.scalar_one_or_none() + if lan is None: + raise ValueError(f"lan {lan_id!r} not found") + if enforce_pending: + await self._assert_pending(session, lan.topology_id) if expected_version is not None: - # Need the LAN's topology_id to check version. - result = await session.execute( - select(LAN).where(LAN.id == lan_id) - ) - lan = result.scalar_one_or_none() - if lan is None: - raise ValueError(f"lan {lan_id!r} not found") await self._check_and_bump_version( session, lan.topology_id, expected_version ) @@ -1102,6 +1121,58 @@ class SQLModelRepository(BaseRepository): ) await session.commit() + async def delete_lan( + self, + lan_id: str, + *, + expected_version: Optional[int] = None, + ) -> None: + """Cascade-delete a LAN from a pending topology. + + Rejects if any decky declares this LAN as its home (i.e. has a + non-bridge edge to it — the only LAN that decky lives in). The + caller must delete or reassign the home-deckies first. + """ + from decnet.topology.status import TopologyNotEditable # noqa: F401 + + async with self._session() as session: + result = await session.execute(select(LAN).where(LAN.id == lan_id)) + lan = result.scalar_one_or_none() + if lan is None: + return + await self._assert_pending(session, lan.topology_id) + + # Home-decky check: any decky whose only edge lands here? + edges_result = await session.execute( + select(TopologyEdge).where(TopologyEdge.lan_id == lan_id) + ) + edges_here = edges_result.scalars().all() + decky_uuids_on_this_lan = {e.decky_uuid for e in edges_here} + for decky_uuid in decky_uuids_on_this_lan: + other = await session.execute( + select(TopologyEdge).where( + TopologyEdge.decky_uuid == decky_uuid, + TopologyEdge.lan_id != lan_id, + ) + ) + if other.scalar_one_or_none() is None: + raise ValueError( + f"cannot delete LAN {lan.name!r}: decky " + f"{decky_uuid} has no other LAN (would be orphaned)" + ) + + if expected_version is not None: + await self._check_and_bump_version( + session, lan.topology_id, expected_version + ) + # Cascade edges → LAN. + await session.execute( + text("DELETE FROM topology_edges WHERE lan_id = :l"), + {"l": lan_id}, + ) + await session.execute(text("DELETE FROM lans WHERE id = :l"), {"l": lan_id}) + await session.commit() + async def list_lans_for_topology( self, topology_id: str ) -> list[dict[str, Any]]: @@ -1134,19 +1205,22 @@ class SQLModelRepository(BaseRepository): fields: dict[str, Any], *, expected_version: Optional[int] = None, + enforce_pending: bool = False, ) -> None: if not fields: return payload = self._serialize_json_fields(fields, ("services", "decky_config")) payload.setdefault("updated_at", datetime.now(timezone.utc)) async with self._session() as session: + result = await session.execute( + select(TopologyDecky).where(TopologyDecky.uuid == decky_uuid) + ) + d = result.scalar_one_or_none() + if d is None: + raise ValueError(f"decky {decky_uuid!r} not found") + if enforce_pending: + await self._assert_pending(session, d.topology_id) if expected_version is not None: - result = await session.execute( - select(TopologyDecky).where(TopologyDecky.uuid == decky_uuid) - ) - d = result.scalar_one_or_none() - if d is None: - raise ValueError(f"decky {decky_uuid!r} not found") await self._check_and_bump_version( session, d.topology_id, expected_version ) @@ -1157,6 +1231,35 @@ class SQLModelRepository(BaseRepository): ) await session.commit() + async def delete_topology_decky( + self, + decky_uuid: str, + *, + expected_version: Optional[int] = None, + ) -> None: + """Cascade-delete a decky + all its edges from a pending topology.""" + async with self._session() as session: + result = await session.execute( + select(TopologyDecky).where(TopologyDecky.uuid == decky_uuid) + ) + d = result.scalar_one_or_none() + if d is None: + return + await self._assert_pending(session, d.topology_id) + if expected_version is not None: + await self._check_and_bump_version( + session, d.topology_id, expected_version + ) + await session.execute( + text("DELETE FROM topology_edges WHERE decky_uuid = :u"), + {"u": decky_uuid}, + ) + await session.execute( + text("DELETE FROM topology_deckies WHERE uuid = :u"), + {"u": decky_uuid}, + ) + await session.commit() + async def list_topology_deckies( self, topology_id: str ) -> list[dict[str, Any]]: @@ -1189,6 +1292,30 @@ class SQLModelRepository(BaseRepository): await session.refresh(row) return row.id + async def delete_topology_edge( + self, + edge_id: str, + *, + expected_version: Optional[int] = None, + ) -> None: + async with self._session() as session: + result = await session.execute( + select(TopologyEdge).where(TopologyEdge.id == edge_id) + ) + edge = result.scalar_one_or_none() + if edge is None: + return + await self._assert_pending(session, edge.topology_id) + if expected_version is not None: + await self._check_and_bump_version( + session, edge.topology_id, expected_version + ) + await session.execute( + text("DELETE FROM topology_edges WHERE id = :e"), + {"e": edge_id}, + ) + await session.commit() + async def list_topology_edges( self, topology_id: str ) -> list[dict[str, Any]]: diff --git a/tests/topology/test_editing.py b/tests/topology/test_editing.py new file mode 100644 index 00000000..3927be6b --- /dev/null +++ b/tests/topology/test_editing.py @@ -0,0 +1,132 @@ +"""Pre-deploy mutation repo methods: pending-only, version-aware.""" +from __future__ import annotations + +import pytest + +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.persistence import persist, transition_status +from decnet.topology.status import TopologyNotEditable, TopologyStatus +from decnet.web.db.factory import get_repository + + +def _cfg(**kw) -> TopologyConfig: + base = dict( + name="edit", + depth=1, + branching_factor=1, + deckies_per_lan_min=2, + deckies_per_lan_max=2, + cross_edge_probability=0.0, + randomize_services=False, + services_explicit=["ssh"], + seed=6, + ) + base.update(kw) + return TopologyConfig(**base) + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "edit.db")) + await r.initialize() + return r + + +@pytest.mark.anyio +async def test_add_lan_to_pending_bumps_version(repo): + plan = generate(_cfg()) + tid = await persist(repo, plan) + await repo.add_lan( + {"topology_id": tid, "name": "LAN-NEW", "subnet": "10.55.0.0/24", "is_dmz": False}, + expected_version=1, + ) + topo = await repo.get_topology(tid) + assert topo["version"] == 2 + lans = {l["name"] for l in await repo.list_lans_for_topology(tid)} + assert "LAN-NEW" in lans + + +@pytest.mark.anyio +async def test_update_decky_roundtrips_service_config(repo): + plan = generate(_cfg()) + tid = await persist(repo, plan) + decky = (await repo.list_topology_deckies(tid))[0] + patch = dict(decky["decky_config"]) + patch["service_config"] = {"ssh": {"password": "megapassword"}} + await repo.update_topology_decky( + decky["uuid"], {"decky_config": patch}, expected_version=1, + ) + fresh = next( + d for d in await repo.list_topology_deckies(tid) + if d["uuid"] == decky["uuid"] + ) + assert fresh["decky_config"]["service_config"]["ssh"]["password"] == "megapassword" + + +@pytest.mark.anyio +async def test_update_decky_rejected_on_active_topology(repo): + plan = generate(_cfg()) + tid = await persist(repo, plan) + decky = (await repo.list_topology_deckies(tid))[0] + # pending → deploying → active + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await transition_status(repo, tid, TopologyStatus.ACTIVE) + with pytest.raises(TopologyNotEditable) as ei: + await repo.update_topology_decky( + decky["uuid"], + {"decky_config": decky["decky_config"]}, + enforce_pending=True, + ) + assert ei.value.status == TopologyStatus.ACTIVE + + +@pytest.mark.anyio +async def test_delete_lan_with_home_decky_refused(repo): + """A LAN whose decky has no other edge cannot be deleted — it'd orphan.""" + plan = generate(_cfg(depth=1, branching_factor=1, deckies_per_lan_max=1, deckies_per_lan_min=1)) + tid = await persist(repo, plan) + lan = (await repo.list_lans_for_topology(tid))[0] + with pytest.raises(ValueError, match="orphaned"): + await repo.delete_lan(lan["id"]) + + +@pytest.mark.anyio +async def test_delete_edge_leaves_decky_intact(repo): + """Deleting one bridge edge of a multi-homed decky should succeed.""" + # depth=1 branching=1 gives DMZ(LAN-00) + LAN-01 with a bridge decky. + plan = generate(_cfg()) + tid = await persist(repo, plan) + edges = await repo.list_topology_edges(tid) + bridge_edges = [e for e in edges if e["is_bridge"]] + assert bridge_edges, "generator should produce at least one bridge edge" + # Delete exactly one — the bridge decky should keep at least one edge. + edge = bridge_edges[0] + before_deckies = {d["uuid"] for d in await repo.list_topology_deckies(tid)} + await repo.delete_topology_edge(edge["id"]) + after_deckies = {d["uuid"] for d in await repo.list_topology_deckies(tid)} + assert before_deckies == after_deckies + remaining = await repo.list_topology_edges(tid) + assert edge["id"] not in {e["id"] for e in remaining} + + +@pytest.mark.anyio +async def test_delete_decky_cascades_edges(repo): + plan = generate(_cfg()) + tid = await persist(repo, plan) + decky = (await repo.list_topology_deckies(tid))[0] + await repo.delete_topology_decky(decky["uuid"]) + # No edge pointing to the removed decky remains. + remaining = await repo.list_topology_edges(tid) + assert decky["uuid"] not in {e["decky_uuid"] for e in remaining} + + +@pytest.mark.anyio +async def test_delete_edge_rejected_on_active(repo): + plan = generate(_cfg()) + tid = await persist(repo, plan) + edges = await repo.list_topology_edges(tid) + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await transition_status(repo, tid, TopologyStatus.ACTIVE) + with pytest.raises(TopologyNotEditable): + await repo.delete_topology_edge(edges[0]["id"])