refactor(topology): introduce TopologyRepository protocol with DTO return types

Replace repo: BaseRepository with a structural TopologyRepository protocol
in persistence.py and allocator.py. All read methods now return typed DTOs
(TopologySummary, LANRow, DeckyRow, EdgeRow) instead of raw dicts, eliminating
silent field-shape regressions across the topology subsystem.

TopologySummary gains email_personas and language_default so api_personas.py
can continue reading those fields via attribute access. hydrate() converts
DTOs to dicts before passing to _backfill_decky_configs, keeping the mutable
working-state function dict-based at its boundary. All production callers
(router handlers, mutator, CLI, heartbeat) migrated from dict/get access to
attribute access. 134 tests pass.
This commit is contained in:
2026-04-30 23:51:41 -04:00
parent 3456d3ab45
commit fc1f0914b7
34 changed files with 231 additions and 175 deletions

View File

@@ -135,8 +135,8 @@ async def _reconcile_topology_report(
reported_hash = (reported or {}).get("applied_version_hash")
for topo in mine:
tid = topo["id"]
if topo.get("needs_resync"):
tid = topo.id
if topo.needs_resync:
continue
expected: Optional[str] = None
if reported_id == tid and reported_hash:

View File

@@ -1,8 +1,6 @@
"""Shared helpers for the Phase-3 child-CRUD routes."""
from __future__ import annotations
from typing import Any
from fastapi import HTTPException
from decnet.topology.status import (
@@ -10,17 +8,18 @@ from decnet.topology.status import (
TopologyStatus,
VersionConflict,
)
from decnet.web.db.models.topology import TopologySummary
from decnet.web.dependencies import repo
async def get_topology_or_404(topology_id: str) -> dict[str, Any]:
async def get_topology_or_404(topology_id: str) -> TopologySummary:
topo = await repo.get_topology(topology_id)
if topo is None:
raise HTTPException(status_code=404, detail="Topology not found")
return topo
async def assert_pending_or_409(topology_id: str) -> dict[str, Any]:
async def assert_pending_or_409(topology_id: str) -> TopologySummary:
"""Ensure the topology exists and is in ``pending`` state.
The repo layer enforces the same rule inside mutation methods, but the
@@ -28,11 +27,11 @@ async def assert_pending_or_409(topology_id: str) -> dict[str, Any]:
the pre-condition before any side effect.
"""
topo = await get_topology_or_404(topology_id)
if topo["status"] != TopologyStatus.PENDING:
if topo.status != TopologyStatus.PENDING:
raise HTTPException(
status_code=409,
detail=(
f"Topology is {topo['status']!r}; free-form child edits are "
f"Topology is {topo.status!r}; free-form child edits are "
f"pending-only. Use the mutation queue for active topologies."
),
)

View File

@@ -164,13 +164,13 @@ async def api_next_ip(
if await repo.get_topology(topology_id) is None:
raise HTTPException(status_code=404, detail="Topology not found")
lans = await repo.list_lans_for_topology(topology_id)
lan = next((ln for ln in lans if ln["id"] == lan_id), None)
lan = next((ln for ln in lans if ln.id == lan_id), None)
if lan is None:
raise HTTPException(status_code=404, detail="LAN not found")
deckies = await repo.list_topology_deckies(topology_id)
alloc = IPAllocator(subnet=lan["subnet"])
alloc = IPAllocator(subnet=lan.subnet)
for d in deckies:
ip = (d.get("decky_config") or {}).get("ips_by_lan", {}).get(lan["name"])
ip = (d.decky_config or {}).get("ips_by_lan", {}).get(lan.name)
if ip:
try:
alloc.reserve(ip)
@@ -180,4 +180,4 @@ async def api_next_ip(
ip = alloc.next_free()
except AllocatorExhausted as e:
raise HTTPException(status_code=409, detail=str(e))
return NextIPResponse(subnet=lan["subnet"], ip=ip)
return NextIPResponse(subnet=lan.subnet, ip=ip)

View File

@@ -58,10 +58,10 @@ async def api_create_decky(
raise map_repo_exception(exc) from exc
rows = await repo.list_topology_deckies(topology_id)
row = next((r for r in rows if r["uuid"] == decky_uuid), None)
row = next((r for r in rows if r.uuid == decky_uuid), None)
if row is None: # pragma: no cover
raise HTTPException(status_code=500, detail="Decky insert vanished")
return DeckyRow(**row)
return row
@router.patch(
@@ -99,10 +99,10 @@ async def api_update_decky(
raise HTTPException(status_code=404, detail=str(exc)) from exc
rows = await repo.list_topology_deckies(topology_id)
row = next((r for r in rows if r["uuid"] == decky_uuid), None)
row = next((r for r in rows if r.uuid == decky_uuid), None)
if row is None:
raise HTTPException(status_code=404, detail="Decky not found")
return DeckyRow(**row)
return row
@router.delete(
@@ -126,7 +126,7 @@ async def api_delete_decky(
await assert_pending_or_409(topology_id)
rows = await repo.list_topology_deckies(topology_id)
if not any(r["uuid"] == decky_uuid for r in rows):
if not any(r.uuid == decky_uuid for r in rows):
raise HTTPException(status_code=404, detail="Decky not found")
try:

View File

@@ -36,11 +36,11 @@ async def api_delete_topology(
topo = await repo.get_topology(topology_id)
if topo is None:
raise HTTPException(status_code=404, detail="Topology not found")
if topo["status"] not in _DELETABLE:
if topo.status not in _DELETABLE:
raise HTTPException(
status_code=409,
detail=(
f"Topology is {topo['status']!r}; teardown to 'torn_down' "
f"Topology is {topo.status!r}; teardown to 'torn_down' "
f"before delete."
),
)

View File

@@ -63,11 +63,11 @@ async def api_deploy_topology(
topo = await repo.get_topology(topology_id)
if topo is None:
raise HTTPException(status_code=404, detail="Topology not found")
if topo["status"] != TopologyStatus.PENDING:
if topo.status != TopologyStatus.PENDING:
raise HTTPException(
status_code=409,
detail=(
f"Topology is {topo['status']!r}; only 'pending' topologies "
f"Topology is {topo.status!r}; only 'pending' topologies "
f"can be deployed."
),
)

View File

@@ -46,13 +46,13 @@ async def api_create_edge(
# Referential integrity: decky + LAN must belong to this topology.
deckies = await repo.list_topology_deckies(topology_id)
if not any(d["uuid"] == body.decky_uuid for d in deckies):
if not any(d.uuid == body.decky_uuid for d in deckies):
raise HTTPException(
status_code=400,
detail=f"decky {body.decky_uuid!r} not in topology {topology_id!r}",
)
lans = await repo.list_lans_for_topology(topology_id)
if not any(r["id"] == body.lan_id for r in lans):
if not any(r.id == body.lan_id for r in lans):
raise HTTPException(
status_code=400,
detail=f"lan {body.lan_id!r} not in topology {topology_id!r}",
@@ -73,10 +73,10 @@ async def api_create_edge(
raise map_repo_exception(exc) from exc
edges = await repo.list_topology_edges(topology_id)
row = next((e for e in edges if e["id"] == edge_id), None)
row = next((e for e in edges if e.id == edge_id), None)
if row is None: # pragma: no cover
raise HTTPException(status_code=500, detail="Edge insert vanished")
return EdgeRow(**row)
return row
@router.delete(
@@ -100,7 +100,7 @@ async def api_delete_edge(
await assert_pending_or_409(topology_id)
edges = await repo.list_topology_edges(topology_id)
if not any(e["id"] == edge_id for e in edges):
if not any(e.id == edge_id for e in edges):
raise HTTPException(status_code=404, detail="Edge not found")
try:

View File

@@ -69,7 +69,7 @@ async def api_topology_events(
# GET /topologies/{id}/mutations). Adding a new event family here
# requires a threat-model review for F6/I (role leakage).
topo = await get_topology_or_404(topology_id)
snapshot_status = topo["status"]
snapshot_status = topo.status
in_flight: list[dict] = []
for state in _IN_FLIGHT_STATES:
in_flight.extend(await repo.list_topology_mutations(topology_id, state=state))

View File

@@ -73,11 +73,11 @@ async def api_create_lan(
raise map_repo_exception(exc) from exc
rows = await repo.list_lans_for_topology(topology_id)
row = next((r for r in rows if r["id"] == lan_id), None)
row = next((r for r in rows if r.id == lan_id), None)
if row is None: # pragma: no cover — would mean insert vanished
raise HTTPException(status_code=500, detail="LAN insert vanished")
return LANRow(**row)
return row
@router.patch(
@@ -115,10 +115,10 @@ async def api_update_lan(
raise HTTPException(status_code=404, detail=str(exc)) from exc
rows = await repo.list_lans_for_topology(topology_id)
row = next((r for r in rows if r["id"] == lan_id), None)
row = next((r for r in rows if r.id == lan_id), None)
if row is None:
raise HTTPException(status_code=404, detail="LAN not found")
return LANRow(**row)
return row
@router.delete(
@@ -142,7 +142,7 @@ async def api_delete_lan(
await assert_pending_or_409(topology_id)
rows = await repo.list_lans_for_topology(topology_id)
if not any(r["id"] == lan_id for r in rows):
if not any(r.id == lan_id for r in rows):
raise HTTPException(status_code=404, detail="LAN not found")
try:

View File

@@ -63,11 +63,11 @@ async def api_enqueue_mutation(
_admin: dict = Depends(require_admin),
) -> MutationEnqueueResponse:
topo = await get_topology_or_404(topology_id)
if topo["status"] not in _MUTATABLE:
if topo.status not in _MUTATABLE:
raise HTTPException(
status_code=409,
detail=(
f"Topology is {topo['status']!r}; the mutation queue is "
f"Topology is {topo.status!r}; the mutation queue is "
f"only open for 'active' or 'degraded' topologies. Use "
f"child-CRUD endpoints while pending."
),

View File

@@ -54,13 +54,13 @@ async def list_topology_personas(
topo = await repo.get_topology(topology_id)
if topo is None:
raise HTTPException(status_code=404, detail="Topology not found")
language_default = topo.get("language_default") or "en"
language_default = topo.language_default or "en"
personas = parse_personas(
topo.get("email_personas"), language_default=language_default,
topo.email_personas, language_default=language_default,
)
return {
"topology_id": topology_id,
"topology_name": topo.get("name", ""),
"topology_name": topo.name,
"language_default": language_default,
"personas": _serialize(personas),
}
@@ -100,7 +100,7 @@ async def replace_topology_personas(
topo = await repo.get_topology(topology_id)
if topo is None:
raise HTTPException(status_code=404, detail="Topology not found")
language_default = topo.get("language_default") or "en"
language_default = topo.language_default or "en"
parsed = parse_personas(raw, language_default=language_default)
if raw and not parsed:
@@ -125,7 +125,7 @@ async def replace_topology_personas(
)
return {
"topology_id": topology_id,
"topology_name": topo.get("name", ""),
"topology_name": topo.name,
"language_default": language_default,
"personas": serialized,
}

View File

@@ -66,11 +66,11 @@ async def api_teardown_topology(
topo = await repo.get_topology(topology_id)
if topo is None:
raise HTTPException(status_code=404, detail="Topology not found")
if topo["status"] not in _TEARDOWNABLE:
if topo.status not in _TEARDOWNABLE:
raise HTTPException(
status_code=409,
detail=(
f"Topology is {topo['status']!r}; cannot teardown "
f"Topology is {topo.status!r}; cannot teardown "
f"(allowed from: {sorted(_TEARDOWNABLE)})."
),
)