refactor(topology_ops): decompose apply() into focused helpers
apply() was an 85-line function handling hash verification, validation, superseding teardown, bridge/compose provisioning, and store persistence. Extracted _check_hash_and_validate(), _teardown_superseded(), and _materialise() so each step is independently readable and testable.
This commit is contained in:
@@ -59,6 +59,73 @@ def _topology_id(hydrated: dict[str, Any]) -> str:
|
|||||||
return str(tid)
|
return str(tid)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_hash_and_validate(hydrated: dict[str, Any], version_hash: str) -> str:
|
||||||
|
"""Verify hash integrity and structural validity; return topology_id."""
|
||||||
|
local_hash = canonical_hash(hydrated)
|
||||||
|
if local_hash != version_hash:
|
||||||
|
raise HashMismatch(
|
||||||
|
f"master hash {version_hash!r} does not match agent hash "
|
||||||
|
f"{local_hash!r} — refusing to apply"
|
||||||
|
)
|
||||||
|
issues = _validate_topology(hydrated)
|
||||||
|
if _validation_errors(issues):
|
||||||
|
raise ValidationError(issues)
|
||||||
|
return _topology_id(hydrated)
|
||||||
|
|
||||||
|
|
||||||
|
async def _teardown_superseded(topology_id: str, store: TopologyStore) -> None:
|
||||||
|
"""Tear down the current topology if it differs from topology_id.
|
||||||
|
|
||||||
|
Master is authoritative — a different pinned topology (fully applied,
|
||||||
|
partially applied, or drifted) is torn down before the new apply proceeds.
|
||||||
|
Refusing with 409 would leave the agent stuck in a state only a human
|
||||||
|
could resolve.
|
||||||
|
"""
|
||||||
|
existing = store.current()
|
||||||
|
if existing is None or existing.topology_id == topology_id:
|
||||||
|
return
|
||||||
|
log.info(
|
||||||
|
"superseding topology %s with %s on master authority",
|
||||||
|
existing.topology_id, topology_id,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
await teardown(existing.topology_id, store)
|
||||||
|
except Exception as exc: # noqa: BLE001 — we still want to try applying
|
||||||
|
log.warning(
|
||||||
|
"best-effort teardown of superseded topology %s failed: %s",
|
||||||
|
existing.topology_id, exc,
|
||||||
|
)
|
||||||
|
# Hard-clear the store row so the new apply isn't blocked by a
|
||||||
|
# half-torn-down predecessor. Leftover docker objects surface via
|
||||||
|
# the next heartbeat's observed block.
|
||||||
|
store.clear(existing.topology_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _materialise(hydrated: dict[str, Any], topology_id: str) -> None:
|
||||||
|
"""Create bridge networks, write compose file, and bring up containers.
|
||||||
|
|
||||||
|
Sync/blocking — callers must dispatch via asyncio.to_thread.
|
||||||
|
|
||||||
|
``--always-recreate-deps`` keeps service containers' netns shares
|
||||||
|
fresh: every decky service joins its base's netns via
|
||||||
|
``network_mode: container:<base>``, and that share is bound at
|
||||||
|
service start time. If a base is recreated (e.g. when ``ports:``
|
||||||
|
changes after toggling ``forwards_l3``) but compose decides the
|
||||||
|
services are unchanged, the services keep a stale netns FD
|
||||||
|
pointing at the destroyed base — they end up in an empty
|
||||||
|
namespace with only ``lo``, and external traffic hits a closed
|
||||||
|
port on the live base. Forcing dependents to recreate alongside
|
||||||
|
the base is the cheapest way to make this race impossible.
|
||||||
|
"""
|
||||||
|
compose_path = _topology_compose_path(topology_id)
|
||||||
|
client = docker.from_env()
|
||||||
|
for lan in hydrated["lans"]:
|
||||||
|
net_name = _topology_network_name(topology_id, lan["name"])
|
||||||
|
create_bridge_network(client, net_name, lan["subnet"], internal=not lan["is_dmz"])
|
||||||
|
write_topology_compose(hydrated, compose_path)
|
||||||
|
_compose_with_retry("up", "--build", "-d", "--always-recreate-deps", compose_file=compose_path)
|
||||||
|
|
||||||
|
|
||||||
async def apply(
|
async def apply(
|
||||||
hydrated: dict[str, Any],
|
hydrated: dict[str, Any],
|
||||||
version_hash: str,
|
version_hash: str,
|
||||||
@@ -73,76 +140,11 @@ async def apply(
|
|||||||
Any docker / compose error propagates up; the endpoint maps it
|
Any docker / compose error propagates up; the endpoint maps it
|
||||||
to 500 and records the message on the store row.
|
to 500 and records the message on the store row.
|
||||||
"""
|
"""
|
||||||
local_hash = canonical_hash(hydrated)
|
topology_id = _check_hash_and_validate(hydrated, version_hash)
|
||||||
if local_hash != version_hash:
|
await _teardown_superseded(topology_id, store)
|
||||||
raise HashMismatch(
|
await asyncio.to_thread(_materialise, hydrated, topology_id)
|
||||||
f"master hash {version_hash!r} does not match agent hash "
|
|
||||||
f"{local_hash!r} — refusing to apply"
|
|
||||||
)
|
|
||||||
|
|
||||||
issues = _validate_topology(hydrated)
|
|
||||||
if _validation_errors(issues):
|
|
||||||
raise ValidationError(issues)
|
|
||||||
|
|
||||||
topology_id = _topology_id(hydrated)
|
|
||||||
# Master is authoritative. If a different topology is pinned here
|
|
||||||
# — whether it fully applied, only partially applied (failure
|
|
||||||
# marker row + orphan containers), or drifted — teardown first,
|
|
||||||
# then accept the new one. Refusing with 409 would leave the
|
|
||||||
# agent stuck in a state only a human could resolve.
|
|
||||||
existing = store.current()
|
|
||||||
if existing is not None and existing.topology_id != topology_id:
|
|
||||||
log.info(
|
|
||||||
"superseding topology %s with %s on master authority",
|
|
||||||
existing.topology_id, topology_id,
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
await teardown(existing.topology_id, store)
|
|
||||||
except Exception as exc: # noqa: BLE001 — we still want to try applying
|
|
||||||
log.warning(
|
|
||||||
"best-effort teardown of superseded topology %s failed: %s",
|
|
||||||
existing.topology_id, exc,
|
|
||||||
)
|
|
||||||
# Hard-clear the store row so the new apply isn't blocked
|
|
||||||
# by a half-torn-down predecessor. Leftover docker objects
|
|
||||||
# will surface via the next heartbeat's observed block.
|
|
||||||
store.clear(existing.topology_id)
|
|
||||||
|
|
||||||
lans = hydrated["lans"]
|
|
||||||
compose_path = _topology_compose_path(topology_id)
|
|
||||||
client = docker.from_env()
|
|
||||||
|
|
||||||
# Bridges + compose are sync/blocking; hop to a thread so we don't
|
|
||||||
# stall the event loop on a slow docker daemon.
|
|
||||||
def _materialise() -> None:
|
|
||||||
for lan in lans:
|
|
||||||
net_name = _topology_network_name(topology_id, lan["name"])
|
|
||||||
internal = not lan["is_dmz"]
|
|
||||||
create_bridge_network(
|
|
||||||
client, net_name, lan["subnet"], internal=internal
|
|
||||||
)
|
|
||||||
write_topology_compose(hydrated, compose_path)
|
|
||||||
# ``--always-recreate-deps`` keeps service containers' netns shares
|
|
||||||
# fresh: every decky service joins its base's netns via
|
|
||||||
# ``network_mode: container:<base>``, and that share is bound at
|
|
||||||
# service start time. If a base is recreated (e.g. when ``ports:``
|
|
||||||
# changes after toggling ``forwards_l3``) but compose decides the
|
|
||||||
# services are unchanged, the services keep a stale netns FD
|
|
||||||
# pointing at the destroyed base — they end up in an empty
|
|
||||||
# namespace with only ``lo``, and external traffic hits a closed
|
|
||||||
# port on the live base. Forcing dependents to recreate alongside
|
|
||||||
# the base is the cheapest way to make this race impossible.
|
|
||||||
_compose_with_retry(
|
|
||||||
"up", "--build", "-d", "--always-recreate-deps",
|
|
||||||
compose_file=compose_path,
|
|
||||||
)
|
|
||||||
|
|
||||||
await asyncio.to_thread(_materialise)
|
|
||||||
|
|
||||||
store.put(topology_id, version_hash, hydrated)
|
store.put(topology_id, version_hash, hydrated)
|
||||||
log.info(
|
log.info("topology %s applied on agent (%d LANs)", topology_id, len(hydrated["lans"]))
|
||||||
"topology %s applied on agent (%d LANs)", topology_id, len(lans)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def teardown(
|
async def teardown(
|
||||||
|
|||||||
Reference in New Issue
Block a user