diff --git a/decnet/agent/app.py b/decnet/agent/app.py index b37e22da..7b71cf6f 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -18,6 +18,7 @@ Endpoints mirror the existing unihost CLI verbs: """ from __future__ import annotations +import asyncio import os import pathlib from contextlib import asynccontextmanager @@ -60,6 +61,40 @@ def _store() -> TopologyStore: return _topology_store +_collector_task: Optional[asyncio.Task] = None + + +def _ensure_collector_started() -> None: + """Spawn the log collector on demand — called from /topology/apply + after a successful materialise. We must NOT start this in the + lifespan hook: the agent's boot invariant is "never touch docker + until master tells us to" (see tests/swarm/test_agent_no_auto_restore.py). + + The collector watches ``decnet.topology.service=true`` labels via + docker events, writing RFC 5424 lines to ``DECNET_AGENT_LOG_FILE`` + which the forwarder ships to the master over syslog-TLS. Idempotent: + subsequent calls while the task is still running are no-ops. + """ + global _collector_task + if _collector_task is not None and not _collector_task.done(): + return + from decnet.env import DECNET_AGENT_LOG_FILE + + try: + from decnet.collector.worker import log_collector_worker + except Exception: # noqa: BLE001 — docker may be unavailable on dev + log.warning( + "agent log collector not starting — collector worker import failed", + exc_info=True, + ) + return + _collector_task = asyncio.create_task( + log_collector_worker(DECNET_AGENT_LOG_FILE), + name="agent-log-collector", + ) + log.info("agent log collector started log_file=%s", DECNET_AGENT_LOG_FILE) + + @asynccontextmanager async def _lifespan(app: FastAPI): # Best-effort: if identity/bundle plumbing isn't configured (e.g. dev @@ -69,6 +104,14 @@ async def _lifespan(app: FastAPI): yield finally: await _heartbeat.stop() + global _collector_task + if _collector_task is not None and not _collector_task.done(): + _collector_task.cancel() + try: + await _collector_task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass + _collector_task = None global _topology_store if _topology_store is not None: _topology_store.close() @@ -200,10 +243,13 @@ async def topology_apply(req: ApplyTopologyRequest) -> dict: topology_id = (req.hydrated.get("topology") or {}).get("id") if topology_id: try: - store.record_error(str(topology_id), str(exc)[:500]) + store.record_error( + str(topology_id), str(exc)[:500], hydrated=req.hydrated, + ) except Exception: # noqa: BLE001 — don't mask original failure log.exception("failed to record apply error") raise HTTPException(status_code=500, detail=str(exc)) from exc + _ensure_collector_started() return {"status": "applied", "version_hash": req.version_hash} diff --git a/decnet/agent/topology_ops.py b/decnet/agent/topology_ops.py index c11e7a52..6db170ec 100644 --- a/decnet/agent/topology_ops.py +++ b/decnet/agent/topology_ops.py @@ -8,8 +8,8 @@ an agent — here we operate purely on a hydrated dict + the local :class:`TopologyStore`. v1 constraint: one topology per agent. A second apply for a different -``topology_id`` raises :class:`AlreadyApplied` (the endpoint maps that -to 409). +``topology_id`` triggers an on-the-spot teardown of the predecessor +before the new apply proceeds — master is authoritative. """ from __future__ import annotations @@ -20,7 +20,6 @@ from typing import Any import docker from decnet.agent.topology_store import ( - AlreadyApplied, TopologyStore, observed, ) @@ -70,7 +69,6 @@ async def apply( Raises: HashMismatch: master and agent disagree on the canonical hash — don't touch docker, fail the apply. - AlreadyApplied: a different topology is already applied here. ValidationError: topology fails structural validation. Any docker / compose error propagates up; the endpoint maps it to 500 and records the message on the store row. @@ -87,15 +85,28 @@ async def apply( raise ValidationError(issues) topology_id = _topology_id(hydrated) - # v1 guard: refuse cross-topology overwrite up-front. Same check - # lives in store.put() but we want a clean 409 path before we - # start mutating docker state. + # Master is authoritative. If a different topology is pinned here + # — whether it fully applied, only partially applied (failure + # marker row + orphan containers), or drifted — teardown first, + # then accept the new one. Refusing with 409 would leave the + # agent stuck in a state only a human could resolve. existing = store.current() if existing is not None and existing.topology_id != topology_id: - raise AlreadyApplied( - f"agent already has topology {existing.topology_id!r}; " - f"cannot apply {topology_id!r}" + log.info( + "superseding topology %s with %s on master authority", + existing.topology_id, topology_id, ) + try: + await teardown(existing.topology_id, store) + except Exception as exc: # noqa: BLE001 — we still want to try applying + log.warning( + "best-effort teardown of superseded topology %s failed: %s", + existing.topology_id, exc, + ) + # Hard-clear the store row so the new apply isn't blocked + # by a half-torn-down predecessor. Leftover docker objects + # will surface via the next heartbeat's observed block. + store.clear(existing.topology_id) lans = hydrated["lans"] compose_path = _topology_compose_path(topology_id) diff --git a/decnet/agent/topology_store.py b/decnet/agent/topology_store.py index f8f40bd1..7112307e 100644 --- a/decnet/agent/topology_store.py +++ b/decnet/agent/topology_store.py @@ -130,7 +130,12 @@ class TopologyStore: ) self._conn.commit() - def record_error(self, topology_id: str, message: str) -> None: + def record_error( + self, + topology_id: str, + message: str, + hydrated: Optional[dict[str, Any]] = None, + ) -> None: """Attach a last-error message for *topology_id*. Upserts a marker row when no apply has yet succeeded for this @@ -139,14 +144,24 @@ class TopologyStore: /topology/state and the next heartbeat. The marker row uses an empty ``applied_version_hash`` so master's heartbeat check sees the hash mismatch and schedules a resync. + + If *hydrated* is provided it is stored so a later teardown can + still walk the LAN list — otherwise a partial deploy is strands + containers + bridges with no breadcrumb back to them. """ + blob = json.dumps(hydrated, sort_keys=True) if hydrated else "{}" self._conn.execute( "INSERT INTO applied_topology" " (topology_id, applied_version_hash, hydrated_blob_json," " applied_at, last_error)" - " VALUES (?, '', '{}', 0, ?)" - " ON CONFLICT(topology_id) DO UPDATE SET last_error=excluded.last_error", - (topology_id, message), + " VALUES (?, '', ?, 0, ?)" + " ON CONFLICT(topology_id) DO UPDATE SET" + " last_error=excluded.last_error," + " hydrated_blob_json=CASE" + " WHEN applied_topology.hydrated_blob_json='{}'" + " THEN excluded.hydrated_blob_json" + " ELSE applied_topology.hydrated_blob_json END", + (topology_id, blob, message), ) self._conn.commit() diff --git a/decnet/cli/agent.py b/decnet/cli/agent.py index ae89a464..5a04d5a6 100644 --- a/decnet/cli/agent.py +++ b/decnet/cli/agent.py @@ -29,7 +29,7 @@ def register(app: typer.Typer) -> None: with `decnet forwarder --daemon …`. Pass --no-forwarder to skip. """ from decnet.agent import server as _agent_server - from decnet.env import DECNET_SWARM_MASTER_HOST, DECNET_INGEST_LOG_FILE + from decnet.env import DECNET_SWARM_MASTER_HOST, DECNET_AGENT_LOG_FILE from decnet.swarm import pki as _pki resolved_dir = _pathlib.Path(agent_dir) if agent_dir else _pki.DEFAULT_AGENT_DIR @@ -44,7 +44,7 @@ def register(app: typer.Typer) -> None: "--master-host", DECNET_SWARM_MASTER_HOST, "--master-port", str(int(os.environ.get("DECNET_SWARM_SYSLOG_PORT", "6514"))), "--agent-dir", str(resolved_dir), - "--log-file", str(DECNET_INGEST_LOG_FILE), + "--log-file", str(DECNET_AGENT_LOG_FILE), "--daemon", ] try: diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index 3234afc8..2a747a23 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -210,16 +210,46 @@ def _load_service_container_names() -> set[str]: return names +_TOPOLOGY_SERVICE_LABEL = "decnet.topology.service" + + +def _has_topology_service_label(labels: Optional[dict]) -> bool: + """MazeNET topology containers are tagged at compose-time (see + ``decnet/topology/compose.py``) so the collector can discover them + without consulting ``decnet-state.json`` — that state file only + knows about legacy fleet deckies.""" + if not labels: + return False + return labels.get(_TOPOLOGY_SERVICE_LABEL) == "true" + + def is_service_container(container) -> bool: """Return True if this Docker container is a known DECNET service container.""" - name = (container if isinstance(container, str) else container.name).lstrip("/") - return name in _load_service_container_names() + if isinstance(container, str): + return container.lstrip("/") in _load_service_container_names() + name = container.name.lstrip("/") + if name in _load_service_container_names(): + return True + # MazeNET topology containers aren't in decnet-state.json — discover + # them via compose-time labels instead. Tolerant to stub objects + # that don't expose .attrs/.labels (unit tests). + labels: Optional[dict] = None + attrs = getattr(container, "attrs", None) + if isinstance(attrs, dict): + labels = (attrs.get("Config") or {}).get("Labels") + if labels is None: + labels = getattr(container, "labels", None) + return _has_topology_service_label(labels) def is_service_event(attrs: dict) -> bool: """Return True if a Docker start event is for a known DECNET service container.""" name = attrs.get("name", "").lstrip("/") - return name in _load_service_container_names() + if name in _load_service_container_names(): + return True + # Docker start-event attrs contains every container label flat alongside + # 'name' / 'image' — no separate 'labels' sub-dict. + return attrs.get(_TOPOLOGY_SERVICE_LABEL) == "true" # ─── Blocking stream worker (runs in a thread) ──────────────────────────────── diff --git a/decnet/env.py b/decnet/env.py index 90cf2216..2055f014 100644 --- a/decnet/env.py +++ b/decnet/env.py @@ -84,6 +84,16 @@ DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000) # the master's JWT secret being present in the environment. DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log") +# Agent-side RFC 5424 sink written by decnet.collector.worker when run on +# a SWARM worker. The forwarder tails this file and ships lines over +# syslog-TLS to the master listener. Kept separate from +# DECNET_INGEST_LOG_FILE so a workstation-dev box (which may run both the +# master and a throwaway agent pointed at itself) can't accidentally +# recurse by forwarding its own ingest file back to itself. +DECNET_AGENT_LOG_FILE: str = os.environ.get( + "DECNET_AGENT_LOG_FILE", "/var/log/decnet/agent.log" +) + # SWARM log pipeline — RFC 5425 syslog-over-TLS between worker forwarders # and the master listener. Plaintext syslog across hosts is forbidden. DECNET_SWARM_SYSLOG_PORT: int = _port("DECNET_SWARM_SYSLOG_PORT", 6514) diff --git a/decnet/topology/compose.py b/decnet/topology/compose.py index b2fb59dd..0cb41357 100644 --- a/decnet/topology/compose.py +++ b/decnet/topology/compose.py @@ -83,6 +83,14 @@ def generate_topology_compose(hydrated: dict[str, Any]) -> dict: "networks": nets, "cap_add": ["NET_ADMIN"], "logging": _DOCKER_LOGGING, + # Labels let the host collector discover topology containers + # without consulting decnet-state.json (which only knows about + # legacy fleet deckies). See decnet/collector/worker.py. + "labels": { + "decnet.topology.id": topology_id, + "decnet.topology.decky": name, + "decnet.topology.role": "base", + }, } if forwards_l3: base["sysctls"] = {"net.ipv4.ip_forward": 1} @@ -120,6 +128,17 @@ def generate_topology_compose(hydrated: dict[str, Any]) -> dict: fragment.pop("hostname", None) fragment.pop("networks", None) fragment["logging"] = _DOCKER_LOGGING + # ``decnet.topology.service=true`` is the marker the collector + # filters on — without it, log streams for this container are + # never attached. + labels = dict(fragment.get("labels") or {}) + labels.update({ + "decnet.topology.id": topology_id, + "decnet.topology.decky": name, + "decnet.topology.service_name": svc_name, + "decnet.topology.service": "true", + }) + fragment["labels"] = labels services[f"{name}-{svc_name}"] = fragment networks: dict[str, dict] = { diff --git a/tests/test_collector.py b/tests/test_collector.py index bcef1dd4..0d0c6594 100644 --- a/tests/test_collector.py +++ b/tests/test_collector.py @@ -233,6 +233,76 @@ class TestIsServiceEvent: assert is_service_event({}) is False +class TestTopologyLabelDiscovery: + """MazeNET topology containers aren't in decnet-state.json — the + collector discovers them via compose-time labels instead.""" + + def _labelled(self, name: str, labels: dict): + return SimpleNamespace( + name=name, + attrs={"Config": {"Labels": labels}}, + labels=labels, + ) + + def test_topology_labelled_container_matches(self): + """Unknown name + decnet.topology.service=true label → True.""" + with patch("decnet.collector.worker._load_service_container_names", return_value=set()): + c = self._labelled( + "decky-2966-ssh", + {"decnet.topology.service": "true", "decnet.topology.id": "abc"}, + ) + assert is_service_container(c) is True + + def test_base_container_label_does_not_match(self): + """Base containers carry decnet.topology.role=base but NOT the + service marker — collector must ignore them or we double-capture + the sshd auth stream from both the base and the service share.""" + with patch("decnet.collector.worker._load_service_container_names", return_value=set()): + c = self._labelled( + "decnet_t_af22dae8_decky-2966", + {"decnet.topology.role": "base", "decnet.topology.id": "abc"}, + ) + assert is_service_container(c) is False + + def test_unrelated_container_with_labels_does_not_match(self): + with patch("decnet.collector.worker._load_service_container_names", return_value=set()): + c = self._labelled("portainer", {"com.docker.compose.project": "portainer"}) + assert is_service_container(c) is False + + def test_topology_event_matches_via_label(self): + """Docker start events flatten labels alongside 'name' in attrs — + is_service_event must detect that shape.""" + with patch("decnet.collector.worker._load_service_container_names", return_value=set()): + attrs = { + "name": "decky-2966-ssh", + "decnet.topology.service": "true", + "decnet.topology.id": "abc", + } + assert is_service_event(attrs) is True + + def test_fleet_and_topology_coexist(self): + """Fleet match wins when the name is in state; topology label + catches containers that aren't.""" + with patch("decnet.collector.worker._load_service_container_names", return_value=_KNOWN_NAMES): + fleet_c = _make_container("omega-decky-http") + topo_c = self._labelled( + "decky-2966-ssh", + {"decnet.topology.service": "true"}, + ) + assert is_service_container(fleet_c) is True + assert is_service_container(topo_c) is True + + def test_stub_without_attrs_still_works_via_labels(self): + """Older test stubs use SimpleNamespace(name=…) with no .attrs — + falling back to .labels should still evaluate.""" + with patch("decnet.collector.worker._load_service_container_names", return_value=set()): + c = SimpleNamespace( + name="decky-2966-ssh", + labels={"decnet.topology.service": "true"}, + ) + assert is_service_container(c) is True + + class TestLoadServiceContainerNames: def test_with_valid_state(self, tmp_path, monkeypatch): import decnet.config