feat(topology): pending-only mutation repo methods with cascade + guards

MazeNET phase 2 step 6. Equips the repo layer with the CRUD the web
editor needs before deploy.

- TopologyNotEditable exception: raised when a pending-only method hits
  a non-pending topology. The intent is "free-form edits stop at deploy;
  the mutator (step 7) takes over for live topologies."
- _assert_pending helper checks status inside the session.
- update_lan / update_topology_decky accept enforce_pending=True for
  pre-deploy callers (existing internal callers default to False so
  behavior is unchanged).
- delete_lan: cascades edges; refuses if any decky has only one edge
  (= this LAN is its home) to prevent orphans.
- delete_topology_decky: cascades edges.
- delete_topology_edge: bare-bones removal.

All four mutators accept expected_version for optimistic concurrency.
Existing tests continue to pass (no behavior change for persist/deploy).
This commit is contained in:
2026-04-20 17:50:29 -04:00
parent 9afaac7612
commit 91df57d36b
4 changed files with 321 additions and 15 deletions

View File

@@ -53,6 +53,24 @@ class TopologyStatusError(ValueError):
"""Raised when an illegal topology status transition is attempted."""
class TopologyNotEditable(RuntimeError):
"""Raised when a pending-only mutation hits a non-pending topology.
Pre-deploy edits (update_lan, delete_lan, update/delete decky,
delete_edge) are only legal while the topology is ``pending``.
After deploy the mutator's reconciler + topology_mutations table
take over.
"""
def __init__(self, *, status: str, reason: str = "") -> None:
self.status = status
self.reason = reason
super().__init__(
f"topology not editable (status={status!r})"
+ (f": {reason}" if reason else "")
)
class VersionConflict(RuntimeError):
"""Raised when a topology write is supplied a stale ``expected_version``.

View File

@@ -265,7 +265,14 @@ class BaseRepository(ABC):
async def add_lan(self, data: dict[str, Any]) -> str:
raise NotImplementedError
async def update_lan(self, lan_id: str, fields: dict[str, Any]) -> None:
async def update_lan(
self,
lan_id: str,
fields: dict[str, Any],
*,
expected_version: Optional[int] = None,
enforce_pending: bool = False,
) -> None:
raise NotImplementedError
async def list_lans_for_topology(
@@ -277,7 +284,12 @@ class BaseRepository(ABC):
raise NotImplementedError
async def update_topology_decky(
self, decky_uuid: str, fields: dict[str, Any]
self,
decky_uuid: str,
fields: dict[str, Any],
*,
expected_version: Optional[int] = None,
enforce_pending: bool = False,
) -> None:
raise NotImplementedError
@@ -298,3 +310,20 @@ class BaseRepository(ABC):
self, topology_id: str, limit: int = 100
) -> list[dict[str, Any]]:
raise NotImplementedError
# -------------------- pre-deploy (pending-only) mutations --------------------
async def delete_lan(
self, lan_id: str, *, expected_version: Optional[int] = None
) -> None:
raise NotImplementedError
async def delete_topology_decky(
self, decky_uuid: str, *, expected_version: Optional[int] = None
) -> None:
raise NotImplementedError
async def delete_topology_edge(
self, edge_id: str, *, expected_version: Optional[int] = None
) -> None:
raise NotImplementedError

View File

@@ -1027,6 +1027,23 @@ class SQLModelRepository(BaseRepository):
await session.commit()
return True
async def _assert_pending(self, session, topology_id: str) -> None:
"""Pre-deploy edits are pending-only. Raises TopologyNotEditable."""
from decnet.topology.status import TopologyNotEditable, TopologyStatus
result = await session.execute(
select(Topology).where(Topology.id == topology_id)
)
topo = result.scalar_one_or_none()
if topo is None:
raise ValueError(f"topology {topology_id!r} not found")
if topo.status != TopologyStatus.PENDING:
raise TopologyNotEditable(
status=topo.status,
reason="free-form edits are pending-only; use the "
"mutator (topology_mutations) after deploy",
)
async def _check_and_bump_version(
self,
session,
@@ -1082,18 +1099,20 @@ class SQLModelRepository(BaseRepository):
fields: dict[str, Any],
*,
expected_version: Optional[int] = None,
enforce_pending: bool = False,
) -> None:
if not fields:
return
async with self._session() as session:
if expected_version is not None:
# Need the LAN's topology_id to check version.
result = await session.execute(
select(LAN).where(LAN.id == lan_id)
)
lan = result.scalar_one_or_none()
if lan is None:
raise ValueError(f"lan {lan_id!r} not found")
if enforce_pending:
await self._assert_pending(session, lan.topology_id)
if expected_version is not None:
await self._check_and_bump_version(
session, lan.topology_id, expected_version
)
@@ -1102,6 +1121,58 @@ class SQLModelRepository(BaseRepository):
)
await session.commit()
async def delete_lan(
self,
lan_id: str,
*,
expected_version: Optional[int] = None,
) -> None:
"""Cascade-delete a LAN from a pending topology.
Rejects if any decky declares this LAN as its home (i.e. has a
non-bridge edge to it — the only LAN that decky lives in). The
caller must delete or reassign the home-deckies first.
"""
from decnet.topology.status import TopologyNotEditable # noqa: F401
async with self._session() as session:
result = await session.execute(select(LAN).where(LAN.id == lan_id))
lan = result.scalar_one_or_none()
if lan is None:
return
await self._assert_pending(session, lan.topology_id)
# Home-decky check: any decky whose only edge lands here?
edges_result = await session.execute(
select(TopologyEdge).where(TopologyEdge.lan_id == lan_id)
)
edges_here = edges_result.scalars().all()
decky_uuids_on_this_lan = {e.decky_uuid for e in edges_here}
for decky_uuid in decky_uuids_on_this_lan:
other = await session.execute(
select(TopologyEdge).where(
TopologyEdge.decky_uuid == decky_uuid,
TopologyEdge.lan_id != lan_id,
)
)
if other.scalar_one_or_none() is None:
raise ValueError(
f"cannot delete LAN {lan.name!r}: decky "
f"{decky_uuid} has no other LAN (would be orphaned)"
)
if expected_version is not None:
await self._check_and_bump_version(
session, lan.topology_id, expected_version
)
# Cascade edges → LAN.
await session.execute(
text("DELETE FROM topology_edges WHERE lan_id = :l"),
{"l": lan_id},
)
await session.execute(text("DELETE FROM lans WHERE id = :l"), {"l": lan_id})
await session.commit()
async def list_lans_for_topology(
self, topology_id: str
) -> list[dict[str, Any]]:
@@ -1134,19 +1205,22 @@ class SQLModelRepository(BaseRepository):
fields: dict[str, Any],
*,
expected_version: Optional[int] = None,
enforce_pending: bool = False,
) -> None:
if not fields:
return
payload = self._serialize_json_fields(fields, ("services", "decky_config"))
payload.setdefault("updated_at", datetime.now(timezone.utc))
async with self._session() as session:
if expected_version is not None:
result = await session.execute(
select(TopologyDecky).where(TopologyDecky.uuid == decky_uuid)
)
d = result.scalar_one_or_none()
if d is None:
raise ValueError(f"decky {decky_uuid!r} not found")
if enforce_pending:
await self._assert_pending(session, d.topology_id)
if expected_version is not None:
await self._check_and_bump_version(
session, d.topology_id, expected_version
)
@@ -1157,6 +1231,35 @@ class SQLModelRepository(BaseRepository):
)
await session.commit()
async def delete_topology_decky(
self,
decky_uuid: str,
*,
expected_version: Optional[int] = None,
) -> None:
"""Cascade-delete a decky + all its edges from a pending topology."""
async with self._session() as session:
result = await session.execute(
select(TopologyDecky).where(TopologyDecky.uuid == decky_uuid)
)
d = result.scalar_one_or_none()
if d is None:
return
await self._assert_pending(session, d.topology_id)
if expected_version is not None:
await self._check_and_bump_version(
session, d.topology_id, expected_version
)
await session.execute(
text("DELETE FROM topology_edges WHERE decky_uuid = :u"),
{"u": decky_uuid},
)
await session.execute(
text("DELETE FROM topology_deckies WHERE uuid = :u"),
{"u": decky_uuid},
)
await session.commit()
async def list_topology_deckies(
self, topology_id: str
) -> list[dict[str, Any]]:
@@ -1189,6 +1292,30 @@ class SQLModelRepository(BaseRepository):
await session.refresh(row)
return row.id
async def delete_topology_edge(
self,
edge_id: str,
*,
expected_version: Optional[int] = None,
) -> None:
async with self._session() as session:
result = await session.execute(
select(TopologyEdge).where(TopologyEdge.id == edge_id)
)
edge = result.scalar_one_or_none()
if edge is None:
return
await self._assert_pending(session, edge.topology_id)
if expected_version is not None:
await self._check_and_bump_version(
session, edge.topology_id, expected_version
)
await session.execute(
text("DELETE FROM topology_edges WHERE id = :e"),
{"e": edge_id},
)
await session.commit()
async def list_topology_edges(
self, topology_id: str
) -> list[dict[str, Any]]:

View File

@@ -0,0 +1,132 @@
"""Pre-deploy mutation repo methods: pending-only, version-aware."""
from __future__ import annotations
import pytest
from decnet.topology.config import TopologyConfig
from decnet.topology.generator import generate
from decnet.topology.persistence import persist, transition_status
from decnet.topology.status import TopologyNotEditable, TopologyStatus
from decnet.web.db.factory import get_repository
def _cfg(**kw) -> TopologyConfig:
base = dict(
name="edit",
depth=1,
branching_factor=1,
deckies_per_lan_min=2,
deckies_per_lan_max=2,
cross_edge_probability=0.0,
randomize_services=False,
services_explicit=["ssh"],
seed=6,
)
base.update(kw)
return TopologyConfig(**base)
@pytest.fixture
async def repo(tmp_path):
r = get_repository(db_path=str(tmp_path / "edit.db"))
await r.initialize()
return r
@pytest.mark.anyio
async def test_add_lan_to_pending_bumps_version(repo):
plan = generate(_cfg())
tid = await persist(repo, plan)
await repo.add_lan(
{"topology_id": tid, "name": "LAN-NEW", "subnet": "10.55.0.0/24", "is_dmz": False},
expected_version=1,
)
topo = await repo.get_topology(tid)
assert topo["version"] == 2
lans = {l["name"] for l in await repo.list_lans_for_topology(tid)}
assert "LAN-NEW" in lans
@pytest.mark.anyio
async def test_update_decky_roundtrips_service_config(repo):
plan = generate(_cfg())
tid = await persist(repo, plan)
decky = (await repo.list_topology_deckies(tid))[0]
patch = dict(decky["decky_config"])
patch["service_config"] = {"ssh": {"password": "megapassword"}}
await repo.update_topology_decky(
decky["uuid"], {"decky_config": patch}, expected_version=1,
)
fresh = next(
d for d in await repo.list_topology_deckies(tid)
if d["uuid"] == decky["uuid"]
)
assert fresh["decky_config"]["service_config"]["ssh"]["password"] == "megapassword"
@pytest.mark.anyio
async def test_update_decky_rejected_on_active_topology(repo):
plan = generate(_cfg())
tid = await persist(repo, plan)
decky = (await repo.list_topology_deckies(tid))[0]
# pending → deploying → active
await transition_status(repo, tid, TopologyStatus.DEPLOYING)
await transition_status(repo, tid, TopologyStatus.ACTIVE)
with pytest.raises(TopologyNotEditable) as ei:
await repo.update_topology_decky(
decky["uuid"],
{"decky_config": decky["decky_config"]},
enforce_pending=True,
)
assert ei.value.status == TopologyStatus.ACTIVE
@pytest.mark.anyio
async def test_delete_lan_with_home_decky_refused(repo):
"""A LAN whose decky has no other edge cannot be deleted — it'd orphan."""
plan = generate(_cfg(depth=1, branching_factor=1, deckies_per_lan_max=1, deckies_per_lan_min=1))
tid = await persist(repo, plan)
lan = (await repo.list_lans_for_topology(tid))[0]
with pytest.raises(ValueError, match="orphaned"):
await repo.delete_lan(lan["id"])
@pytest.mark.anyio
async def test_delete_edge_leaves_decky_intact(repo):
"""Deleting one bridge edge of a multi-homed decky should succeed."""
# depth=1 branching=1 gives DMZ(LAN-00) + LAN-01 with a bridge decky.
plan = generate(_cfg())
tid = await persist(repo, plan)
edges = await repo.list_topology_edges(tid)
bridge_edges = [e for e in edges if e["is_bridge"]]
assert bridge_edges, "generator should produce at least one bridge edge"
# Delete exactly one — the bridge decky should keep at least one edge.
edge = bridge_edges[0]
before_deckies = {d["uuid"] for d in await repo.list_topology_deckies(tid)}
await repo.delete_topology_edge(edge["id"])
after_deckies = {d["uuid"] for d in await repo.list_topology_deckies(tid)}
assert before_deckies == after_deckies
remaining = await repo.list_topology_edges(tid)
assert edge["id"] not in {e["id"] for e in remaining}
@pytest.mark.anyio
async def test_delete_decky_cascades_edges(repo):
plan = generate(_cfg())
tid = await persist(repo, plan)
decky = (await repo.list_topology_deckies(tid))[0]
await repo.delete_topology_decky(decky["uuid"])
# No edge pointing to the removed decky remains.
remaining = await repo.list_topology_edges(tid)
assert decky["uuid"] not in {e["decky_uuid"] for e in remaining}
@pytest.mark.anyio
async def test_delete_edge_rejected_on_active(repo):
plan = generate(_cfg())
tid = await persist(repo, plan)
edges = await repo.list_topology_edges(tid)
await transition_status(repo, tid, TopologyStatus.DEPLOYING)
await transition_status(repo, tid, TopologyStatus.ACTIVE)
with pytest.raises(TopologyNotEditable):
await repo.delete_topology_edge(edges[0]["id"])