diff --git a/decnet/agent/topology_ops.py b/decnet/agent/topology_ops.py
index f8f156f2..7a03233d 100644
--- a/decnet/agent/topology_ops.py
+++ b/decnet/agent/topology_ops.py
@@ -59,6 +59,73 @@ def _topology_id(hydrated: dict[str, Any]) -> str:
return str(tid)
+def _check_hash_and_validate(hydrated: dict[str, Any], version_hash: str) -> str:
+ """Verify hash integrity and structural validity; return topology_id."""
+ local_hash = canonical_hash(hydrated)
+ if local_hash != version_hash:
+ raise HashMismatch(
+ f"master hash {version_hash!r} does not match agent hash "
+ f"{local_hash!r} — refusing to apply"
+ )
+ issues = _validate_topology(hydrated)
+ if _validation_errors(issues):
+ raise ValidationError(issues)
+ return _topology_id(hydrated)
+
+
+async def _teardown_superseded(topology_id: str, store: TopologyStore) -> None:
+ """Tear down the current topology if it differs from topology_id.
+
+ Master is authoritative — a different pinned topology (fully applied,
+ partially applied, or drifted) is torn down before the new apply proceeds.
+ Refusing with 409 would leave the agent stuck in a state only a human
+ could resolve.
+ """
+ existing = store.current()
+ if existing is None or existing.topology_id == topology_id:
+ return
+ log.info(
+ "superseding topology %s with %s on master authority",
+ existing.topology_id, topology_id,
+ )
+ try:
+ await teardown(existing.topology_id, store)
+ except Exception as exc: # noqa: BLE001 — we still want to try applying
+ log.warning(
+ "best-effort teardown of superseded topology %s failed: %s",
+ existing.topology_id, exc,
+ )
+ # Hard-clear the store row so the new apply isn't blocked by a
+ # half-torn-down predecessor. Leftover docker objects surface via
+ # the next heartbeat's observed block.
+ store.clear(existing.topology_id)
+
+
+def _materialise(hydrated: dict[str, Any], topology_id: str) -> None:
+ """Create bridge networks, write compose file, and bring up containers.
+
+ Sync/blocking — callers must dispatch via asyncio.to_thread.
+
+ ``--always-recreate-deps`` keeps service containers' netns shares
+ fresh: every decky service joins its base's netns via
+ ``network_mode: container:``, and that share is bound at
+ service start time. If a base is recreated (e.g. when ``ports:``
+ changes after toggling ``forwards_l3``) but compose decides the
+ services are unchanged, the services keep a stale netns FD
+ pointing at the destroyed base — they end up in an empty
+ namespace with only ``lo``, and external traffic hits a closed
+ port on the live base. Forcing dependents to recreate alongside
+ the base is the cheapest way to make this race impossible.
+ """
+ compose_path = _topology_compose_path(topology_id)
+ client = docker.from_env()
+ for lan in hydrated["lans"]:
+ net_name = _topology_network_name(topology_id, lan["name"])
+ create_bridge_network(client, net_name, lan["subnet"], internal=not lan["is_dmz"])
+ write_topology_compose(hydrated, compose_path)
+ _compose_with_retry("up", "--build", "-d", "--always-recreate-deps", compose_file=compose_path)
+
+
async def apply(
hydrated: dict[str, Any],
version_hash: str,
@@ -73,76 +140,11 @@ async def apply(
Any docker / compose error propagates up; the endpoint maps it
to 500 and records the message on the store row.
"""
- local_hash = canonical_hash(hydrated)
- if local_hash != version_hash:
- raise HashMismatch(
- f"master hash {version_hash!r} does not match agent hash "
- f"{local_hash!r} — refusing to apply"
- )
-
- issues = _validate_topology(hydrated)
- if _validation_errors(issues):
- raise ValidationError(issues)
-
- topology_id = _topology_id(hydrated)
- # Master is authoritative. If a different topology is pinned here
- # — whether it fully applied, only partially applied (failure
- # marker row + orphan containers), or drifted — teardown first,
- # then accept the new one. Refusing with 409 would leave the
- # agent stuck in a state only a human could resolve.
- existing = store.current()
- if existing is not None and existing.topology_id != topology_id:
- log.info(
- "superseding topology %s with %s on master authority",
- existing.topology_id, topology_id,
- )
- try:
- await teardown(existing.topology_id, store)
- except Exception as exc: # noqa: BLE001 — we still want to try applying
- log.warning(
- "best-effort teardown of superseded topology %s failed: %s",
- existing.topology_id, exc,
- )
- # Hard-clear the store row so the new apply isn't blocked
- # by a half-torn-down predecessor. Leftover docker objects
- # will surface via the next heartbeat's observed block.
- store.clear(existing.topology_id)
-
- lans = hydrated["lans"]
- compose_path = _topology_compose_path(topology_id)
- client = docker.from_env()
-
- # Bridges + compose are sync/blocking; hop to a thread so we don't
- # stall the event loop on a slow docker daemon.
- def _materialise() -> None:
- for lan in lans:
- net_name = _topology_network_name(topology_id, lan["name"])
- internal = not lan["is_dmz"]
- create_bridge_network(
- client, net_name, lan["subnet"], internal=internal
- )
- write_topology_compose(hydrated, compose_path)
- # ``--always-recreate-deps`` keeps service containers' netns shares
- # fresh: every decky service joins its base's netns via
- # ``network_mode: container:``, and that share is bound at
- # service start time. If a base is recreated (e.g. when ``ports:``
- # changes after toggling ``forwards_l3``) but compose decides the
- # services are unchanged, the services keep a stale netns FD
- # pointing at the destroyed base — they end up in an empty
- # namespace with only ``lo``, and external traffic hits a closed
- # port on the live base. Forcing dependents to recreate alongside
- # the base is the cheapest way to make this race impossible.
- _compose_with_retry(
- "up", "--build", "-d", "--always-recreate-deps",
- compose_file=compose_path,
- )
-
- await asyncio.to_thread(_materialise)
-
+ topology_id = _check_hash_and_validate(hydrated, version_hash)
+ await _teardown_superseded(topology_id, store)
+ await asyncio.to_thread(_materialise, hydrated, topology_id)
store.put(topology_id, version_hash, hydrated)
- log.info(
- "topology %s applied on agent (%d LANs)", topology_id, len(lans)
- )
+ log.info("topology %s applied on agent (%d LANs)", topology_id, len(hydrated["lans"]))
async def teardown(