From aea3e7e05b031066bb71c330b3867df0630f288d Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 01:22:01 -0400 Subject: [PATCH] feat(agent): sqlite-backed topology_store as applied-state cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single-row sqlite tracking which topology the agent last applied and its version hash. Sync/stdlib, same pattern as the log-forwarder offset store. v1 is one-topology-per-agent; attempting to apply a different topology over a populated row raises AlreadyApplied so the endpoint can return 409. observed() snapshots live docker state (decnet-topology-* bridges + decnet-* containers) for the heartbeat. The store is a cache, not authority — no auto-restore on boot. Master remains the only source of truth. Step 3 of the agent <-> topology integration. --- decnet/agent/topology_store.py | 182 +++++++++++++++++++++++ tests/swarm/test_agent_topology_store.py | 132 ++++++++++++++++ 2 files changed, 314 insertions(+) create mode 100644 decnet/agent/topology_store.py create mode 100644 tests/swarm/test_agent_topology_store.py diff --git a/decnet/agent/topology_store.py b/decnet/agent/topology_store.py new file mode 100644 index 00000000..f02bdf09 --- /dev/null +++ b/decnet/agent/topology_store.py @@ -0,0 +1,182 @@ +"""Agent-side sqlite cache of the currently-applied topology. + +**This is a cache, not a source of truth.** The master is the only +authority for what the agent should be running. This store exists so +the agent can answer two questions quickly and offline: + +1. What topology did I last apply, and with what version hash? +2. Is what docker is currently doing consistent with that? + +The hash goes out on every heartbeat; the master compares it to what +it thinks this host should be running and schedules a re-push on +mismatch. + +Why sqlite when the blob is JSON? Consistent with +:mod:`decnet.swarm.log_forwarder._OffsetStore` — single-row sqlite is +the project-wide pattern for agent-local persistent state. Keeps +operational mental model small: "one state.db per thing". + +Design choices worth calling out: + +- **One row, one topology.** v1 only supports a single topology per + agent. Attempting to :meth:`put` a different ``topology_id`` while + a row already exists raises :class:`AlreadyApplied` — the agent + rejects the apply with 409 and the master is expected to teardown + the old one first. +- **No auto-restore on boot.** The agent does NOT read this db at + startup and try to re-apply. Whatever docker has after a restart + is what it has; the next heartbeat reports the truth and the + master decides whether to re-push. Same reason we don't sync + mutations from agent → master anywhere else: split-brain is worse + than temporary drift. +""" +from __future__ import annotations + +import json +import pathlib +import sqlite3 +import time +from dataclasses import dataclass +from typing import Any, Optional + + +class AlreadyApplied(RuntimeError): + """Raised when a different topology is already pinned to this agent.""" + + +@dataclass(frozen=True) +class AppliedRow: + topology_id: str + applied_version_hash: str + hydrated: dict[str, Any] + applied_at: int + last_error: Optional[str] + + +class TopologyStore: + """Single-row sqlite cache. Stdlib only, sync (called from endpoints).""" + + def __init__(self, db_path: pathlib.Path) -> None: + db_path.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(str(db_path)) + self._conn.execute( + "CREATE TABLE IF NOT EXISTS applied_topology (" + " topology_id TEXT PRIMARY KEY," + " applied_version_hash TEXT NOT NULL," + " hydrated_blob_json TEXT NOT NULL," + " applied_at INTEGER NOT NULL," + " last_error TEXT)" + ) + self._conn.commit() + + # ----------------------------------------------------------------- reads + + def current(self) -> Optional[AppliedRow]: + """Return the single applied topology, or ``None`` if idle.""" + row = self._conn.execute( + "SELECT topology_id, applied_version_hash, hydrated_blob_json," + " applied_at, last_error FROM applied_topology LIMIT 1" + ).fetchone() + if row is None: + return None + return AppliedRow( + topology_id=row[0], + applied_version_hash=row[1], + hydrated=json.loads(row[2]), + applied_at=int(row[3]), + last_error=row[4], + ) + + # ---------------------------------------------------------------- writes + + def put( + self, + topology_id: str, + applied_version_hash: str, + hydrated: dict[str, Any], + ) -> None: + """Record an applied topology. + + If a *different* topology is already recorded, raises + :class:`AlreadyApplied`. Re-applying the same ``topology_id`` + just updates the hash + blob (idempotent re-push). + """ + existing = self.current() + if existing is not None and existing.topology_id != topology_id: + raise AlreadyApplied( + f"agent already has topology {existing.topology_id!r}; " + f"cannot apply {topology_id!r}" + ) + self._conn.execute( + "INSERT INTO applied_topology" + " (topology_id, applied_version_hash, hydrated_blob_json," + " applied_at, last_error)" + " VALUES (?, ?, ?, ?, NULL)" + " ON CONFLICT(topology_id) DO UPDATE SET" + " applied_version_hash=excluded.applied_version_hash," + " hydrated_blob_json=excluded.hydrated_blob_json," + " applied_at=excluded.applied_at," + " last_error=NULL", + ( + topology_id, + applied_version_hash, + json.dumps(hydrated, sort_keys=True), + int(time.time()), + ), + ) + self._conn.commit() + + def record_error(self, topology_id: str, message: str) -> None: + """Attach a last-error message to the current row (for debugging).""" + self._conn.execute( + "UPDATE applied_topology SET last_error=? WHERE topology_id=?", + (message, topology_id), + ) + self._conn.commit() + + def clear(self, topology_id: str) -> None: + """Remove the row for *topology_id* (post-teardown). + + No-op if the row doesn't exist — makes teardown idempotent. + """ + self._conn.execute( + "DELETE FROM applied_topology WHERE topology_id=?", + (topology_id,), + ) + self._conn.commit() + + def close(self) -> None: + self._conn.close() + + +# --------------------------------------------------- live docker observation + + +def observed(docker_client: Any) -> dict[str, Any]: + """Snapshot what docker is *actually* running on this agent. + + Returns a compact dict the heartbeat can ship so the master can + cross-check ``applied_version_hash`` against reality (a matching + hash with missing bridges is still drift). Best-effort: if docker + is unreachable we return an ``error`` marker rather than raising — + the agent still needs to heartbeat, and the master can treat + ``error`` as "unknown, re-push". + """ + try: + bridges = [ + n.name + for n in docker_client.networks.list() + if n.attrs.get("Driver") == "bridge" + and n.name.startswith("decnet-topology-") + ] + containers = [ + c.name + for c in docker_client.containers.list(all=False) + if c.name.startswith("decnet-") + ] + return {"bridges": sorted(bridges), "containers": sorted(containers)} + except Exception as exc: # noqa: BLE001 — best-effort observation + return {"error": str(exc)[:200]} + + +__all__ = ["TopologyStore", "AppliedRow", "AlreadyApplied", "observed"] diff --git a/tests/swarm/test_agent_topology_store.py b/tests/swarm/test_agent_topology_store.py new file mode 100644 index 00000000..4d96eba1 --- /dev/null +++ b/tests/swarm/test_agent_topology_store.py @@ -0,0 +1,132 @@ +"""Tests for :mod:`decnet.agent.topology_store`.""" +from __future__ import annotations + +import pathlib + +import pytest + +from decnet.agent.topology_store import ( + AlreadyApplied, + TopologyStore, + observed, +) + + +def _store(tmp_path: pathlib.Path) -> TopologyStore: + return TopologyStore(tmp_path / "topology.db") + + +def test_idle_by_default(tmp_path: pathlib.Path) -> None: + s = _store(tmp_path) + assert s.current() is None + s.close() + + +def test_put_then_current(tmp_path: pathlib.Path) -> None: + s = _store(tmp_path) + s.put("t1", "hash-a", {"topology": {"id": "t1"}, "lans": []}) + row = s.current() + assert row is not None + assert row.topology_id == "t1" + assert row.applied_version_hash == "hash-a" + assert row.hydrated["topology"]["id"] == "t1" + assert row.last_error is None + s.close() + + +def test_put_same_id_is_idempotent_update(tmp_path: pathlib.Path) -> None: + s = _store(tmp_path) + s.put("t1", "hash-a", {"k": 1}) + s.put("t1", "hash-b", {"k": 2}) + row = s.current() + assert row is not None + assert row.applied_version_hash == "hash-b" + assert row.hydrated == {"k": 2} + s.close() + + +def test_put_different_id_rejected(tmp_path: pathlib.Path) -> None: + s = _store(tmp_path) + s.put("t1", "hash-a", {}) + with pytest.raises(AlreadyApplied): + s.put("t2", "hash-b", {}) + s.close() + + +def test_record_error_then_put_clears(tmp_path: pathlib.Path) -> None: + s = _store(tmp_path) + s.put("t1", "h", {}) + s.record_error("t1", "kaboom") + assert s.current().last_error == "kaboom" + # Re-applying clears the error flag. + s.put("t1", "h2", {}) + assert s.current().last_error is None + s.close() + + +def test_clear(tmp_path: pathlib.Path) -> None: + s = _store(tmp_path) + s.put("t1", "h", {}) + s.clear("t1") + assert s.current() is None + # Clearing a missing id is a no-op (teardown idempotency). + s.clear("t1") + s.close() + + +def test_persists_across_reopen(tmp_path: pathlib.Path) -> None: + s = _store(tmp_path) + s.put("t1", "h", {"x": 1}) + s.close() + s2 = _store(tmp_path) + row = s2.current() + assert row is not None + assert row.topology_id == "t1" + s2.close() + + +# -------------------------------------------------------- observed() helper + + +class _FakeNet: + def __init__(self, name: str, driver: str) -> None: + self.name = name + self.attrs = {"Driver": driver} + + +class _FakeContainer: + def __init__(self, name: str) -> None: + self.name = name + + +class _FakeDocker: + def __init__(self, nets, containers) -> None: + self.networks = type("N", (), {"list": lambda _self: nets})() + self.containers = type( + "C", (), {"list": lambda _self, all=False: containers} + )() + + +def test_observed_filters_by_prefix() -> None: + nets = [ + _FakeNet("decnet-topology-abc", "bridge"), + _FakeNet("bridge", "bridge"), + _FakeNet("decnet-topology-xyz", "overlay"), # wrong driver — filtered + ] + containers = [_FakeContainer("decnet-deaddeck"), _FakeContainer("sshd")] + snap = observed(_FakeDocker(nets, containers)) + assert snap == { + "bridges": ["decnet-topology-abc"], + "containers": ["decnet-deaddeck"], + } + + +def test_observed_reports_error_on_failure() -> None: + class _Broken: + @property + def networks(self): + raise RuntimeError("docker down") + + snap = observed(_Broken()) + assert "error" in snap + assert "docker down" in snap["error"]