feat(agent/collector): topology-label discovery and master-authoritative supersede

Legacy fleet deckies live in decnet-state.json; MazeNET topology
containers don't. Tag them at compose-time with
decnet.topology.service=true and let the collector match on that label.
Spin up the agent's log collector on the first successful /topology/apply
(not in the lifespan — that would break the no-docker-on-boot invariant)
and tear it down with the app. Land log lines in DECNET_AGENT_LOG_FILE,
separate from master-side DECNET_INGEST_LOG_FILE, so a dev box running
both roles can't forward its own ingest back to itself.

When master pushes a topology that differs from whatever is pinned
locally, teardown the predecessor and accept the new one. Refusing with
409 left the agent stranded after partial deploys. record_error now
persists the hydrated blob so a later teardown can still walk the LAN
list — otherwise a half-failed apply strands containers + bridges with
no breadcrumb back to them.
This commit is contained in:
2026-04-21 10:23:10 -04:00
parent 050607e00d
commit 0cdcfe2653
8 changed files with 221 additions and 20 deletions

View File

@@ -18,6 +18,7 @@ Endpoints mirror the existing unihost CLI verbs:
"""
from __future__ import annotations
import asyncio
import os
import pathlib
from contextlib import asynccontextmanager
@@ -60,6 +61,40 @@ def _store() -> TopologyStore:
return _topology_store
_collector_task: Optional[asyncio.Task] = None
def _ensure_collector_started() -> None:
"""Spawn the log collector on demand — called from /topology/apply
after a successful materialise. We must NOT start this in the
lifespan hook: the agent's boot invariant is "never touch docker
until master tells us to" (see tests/swarm/test_agent_no_auto_restore.py).
The collector watches ``decnet.topology.service=true`` labels via
docker events, writing RFC 5424 lines to ``DECNET_AGENT_LOG_FILE``
which the forwarder ships to the master over syslog-TLS. Idempotent:
subsequent calls while the task is still running are no-ops.
"""
global _collector_task
if _collector_task is not None and not _collector_task.done():
return
from decnet.env import DECNET_AGENT_LOG_FILE
try:
from decnet.collector.worker import log_collector_worker
except Exception: # noqa: BLE001 — docker may be unavailable on dev
log.warning(
"agent log collector not starting — collector worker import failed",
exc_info=True,
)
return
_collector_task = asyncio.create_task(
log_collector_worker(DECNET_AGENT_LOG_FILE),
name="agent-log-collector",
)
log.info("agent log collector started log_file=%s", DECNET_AGENT_LOG_FILE)
@asynccontextmanager
async def _lifespan(app: FastAPI):
# Best-effort: if identity/bundle plumbing isn't configured (e.g. dev
@@ -69,6 +104,14 @@ async def _lifespan(app: FastAPI):
yield
finally:
await _heartbeat.stop()
global _collector_task
if _collector_task is not None and not _collector_task.done():
_collector_task.cancel()
try:
await _collector_task
except (asyncio.CancelledError, Exception): # noqa: BLE001
pass
_collector_task = None
global _topology_store
if _topology_store is not None:
_topology_store.close()
@@ -200,10 +243,13 @@ async def topology_apply(req: ApplyTopologyRequest) -> dict:
topology_id = (req.hydrated.get("topology") or {}).get("id")
if topology_id:
try:
store.record_error(str(topology_id), str(exc)[:500])
store.record_error(
str(topology_id), str(exc)[:500], hydrated=req.hydrated,
)
except Exception: # noqa: BLE001 — don't mask original failure
log.exception("failed to record apply error")
raise HTTPException(status_code=500, detail=str(exc)) from exc
_ensure_collector_started()
return {"status": "applied", "version_hash": req.version_hash}

View File

@@ -8,8 +8,8 @@ an agent — here we operate purely on a hydrated dict + the local
:class:`TopologyStore`.
v1 constraint: one topology per agent. A second apply for a different
``topology_id`` raises :class:`AlreadyApplied` (the endpoint maps that
to 409).
``topology_id`` triggers an on-the-spot teardown of the predecessor
before the new apply proceeds — master is authoritative.
"""
from __future__ import annotations
@@ -20,7 +20,6 @@ from typing import Any
import docker
from decnet.agent.topology_store import (
AlreadyApplied,
TopologyStore,
observed,
)
@@ -70,7 +69,6 @@ async def apply(
Raises:
HashMismatch: master and agent disagree on the canonical hash —
don't touch docker, fail the apply.
AlreadyApplied: a different topology is already applied here.
ValidationError: topology fails structural validation.
Any docker / compose error propagates up; the endpoint maps it
to 500 and records the message on the store row.
@@ -87,15 +85,28 @@ async def apply(
raise ValidationError(issues)
topology_id = _topology_id(hydrated)
# v1 guard: refuse cross-topology overwrite up-front. Same check
# lives in store.put() but we want a clean 409 path before we
# start mutating docker state.
# Master is authoritative. If a different topology is pinned here
# — whether it fully applied, only partially applied (failure
# marker row + orphan containers), or drifted — teardown first,
# then accept the new one. Refusing with 409 would leave the
# agent stuck in a state only a human could resolve.
existing = store.current()
if existing is not None and existing.topology_id != topology_id:
raise AlreadyApplied(
f"agent already has topology {existing.topology_id!r}; "
f"cannot apply {topology_id!r}"
log.info(
"superseding topology %s with %s on master authority",
existing.topology_id, topology_id,
)
try:
await teardown(existing.topology_id, store)
except Exception as exc: # noqa: BLE001 — we still want to try applying
log.warning(
"best-effort teardown of superseded topology %s failed: %s",
existing.topology_id, exc,
)
# Hard-clear the store row so the new apply isn't blocked
# by a half-torn-down predecessor. Leftover docker objects
# will surface via the next heartbeat's observed block.
store.clear(existing.topology_id)
lans = hydrated["lans"]
compose_path = _topology_compose_path(topology_id)

View File

@@ -130,7 +130,12 @@ class TopologyStore:
)
self._conn.commit()
def record_error(self, topology_id: str, message: str) -> None:
def record_error(
self,
topology_id: str,
message: str,
hydrated: Optional[dict[str, Any]] = None,
) -> None:
"""Attach a last-error message for *topology_id*.
Upserts a marker row when no apply has yet succeeded for this
@@ -139,14 +144,24 @@ class TopologyStore:
/topology/state and the next heartbeat. The marker row uses an
empty ``applied_version_hash`` so master's heartbeat check sees
the hash mismatch and schedules a resync.
If *hydrated* is provided it is stored so a later teardown can
still walk the LAN list — otherwise a partial deploy is strands
containers + bridges with no breadcrumb back to them.
"""
blob = json.dumps(hydrated, sort_keys=True) if hydrated else "{}"
self._conn.execute(
"INSERT INTO applied_topology"
" (topology_id, applied_version_hash, hydrated_blob_json,"
" applied_at, last_error)"
" VALUES (?, '', '{}', 0, ?)"
" ON CONFLICT(topology_id) DO UPDATE SET last_error=excluded.last_error",
(topology_id, message),
" VALUES (?, '', ?, 0, ?)"
" ON CONFLICT(topology_id) DO UPDATE SET"
" last_error=excluded.last_error,"
" hydrated_blob_json=CASE"
" WHEN applied_topology.hydrated_blob_json='{}'"
" THEN excluded.hydrated_blob_json"
" ELSE applied_topology.hydrated_blob_json END",
(topology_id, blob, message),
)
self._conn.commit()