From 12e18b75dbfbf3964702b4f70e6df3cb7cd97892 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 01:41:30 -0400 Subject: [PATCH] feat(swarm): expose needs_resync on TopologySummary + upsert record_error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two small observability follow-ups to the phase-1 agent/topology wiring: TopologySummary now carries needs_resync so operators can see the heartbeat's resync flag via the topology list/detail API without dropping into the DB. TopologyStore.record_error becomes an upsert — when a docker/compose failure fires during the first materialise (put() never reached), we still land a marker row so GET /topology/state surfaces the error and the next heartbeat carries an empty applied_version_hash. That empty hash is what master's heartbeat check relies on to flag the topology for resync instead of assuming the apply succeeded. --- decnet/agent/topology_store.py | 18 ++++++++++--- decnet/web/db/models.py | 1 + tests/api/topology/test_models.py | 16 +++++++++++ tests/swarm/test_agent_topology_endpoints.py | 23 ++++++++++++++++ tests/swarm/test_agent_topology_store.py | 28 ++++++++++++++++++++ 5 files changed, 83 insertions(+), 3 deletions(-) diff --git a/decnet/agent/topology_store.py b/decnet/agent/topology_store.py index d3614927..f8f40bd1 100644 --- a/decnet/agent/topology_store.py +++ b/decnet/agent/topology_store.py @@ -131,10 +131,22 @@ class TopologyStore: self._conn.commit() def record_error(self, topology_id: str, message: str) -> None: - """Attach a last-error message to the current row (for debugging).""" + """Attach a last-error message for *topology_id*. + + Upserts a marker row when no apply has yet succeeded for this + topology — that way a failure *during* the first materialise + (put() hasn't been reached) still surfaces via GET + /topology/state and the next heartbeat. The marker row uses an + empty ``applied_version_hash`` so master's heartbeat check sees + the hash mismatch and schedules a resync. + """ self._conn.execute( - "UPDATE applied_topology SET last_error=? WHERE topology_id=?", - (message, topology_id), + "INSERT INTO applied_topology" + " (topology_id, applied_version_hash, hydrated_blob_json," + " applied_at, last_error)" + " VALUES (?, '', '{}', 0, ?)" + " ON CONFLICT(topology_id) DO UPDATE SET last_error=excluded.last_error", + (topology_id, message), ) self._conn.commit() diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index f5fb2e96..3dc9902e 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -687,6 +687,7 @@ class TopologySummary(BaseModel): target_host_uuid: Optional[str] = None status: str version: int + needs_resync: bool = False created_at: datetime status_changed_at: Optional[datetime] = None diff --git a/tests/api/topology/test_models.py b/tests/api/topology/test_models.py index 354f25c3..91cb7e6f 100644 --- a/tests/api/topology/test_models.py +++ b/tests/api/topology/test_models.py @@ -84,6 +84,22 @@ async def test_summary_accepts_repo_topology_row(repo): summary = TopologySummary(**row) assert summary.id == tid assert summary.version == 1 + # Defaults surface cleanly on a fresh topology. + assert summary.needs_resync is False + assert summary.target_host_uuid is None + + +@pytest.mark.anyio +async def test_summary_surfaces_needs_resync_flag(repo): + """When the heartbeat handler flags a topology for resync, the API + list/detail views must expose it so operators can debug without + shelling into the DB.""" + plan = generate(_cfg()) + tid = await persist(repo, plan) + await repo.set_topology_resync(tid, True) + row = await repo.get_topology(tid) + summary = TopologySummary(**row) + assert summary.needs_resync is True @pytest.mark.anyio diff --git a/tests/swarm/test_agent_topology_endpoints.py b/tests/swarm/test_agent_topology_endpoints.py index 7d886280..0417da22 100644 --- a/tests/swarm/test_agent_topology_endpoints.py +++ b/tests/swarm/test_agent_topology_endpoints.py @@ -101,6 +101,21 @@ def test_topology_apply_docker_failure_is_500_and_records_error( raise RuntimeError("docker down") monkeypatch.setattr(_ops, "apply", _boom) + + # Stub docker.from_env for the /topology/state observed() call so + # the state endpoint doesn't need a real daemon. + class _StubDocker: + class networks: + @staticmethod + def list(): return [] + + class containers: + @staticmethod + def list(all=False): return [] + + import docker as _docker + monkeypatch.setattr(_docker, "from_env", lambda: _StubDocker) + client = TestClient(_agent_app.app) resp = client.post( "/topology/apply", @@ -109,6 +124,14 @@ def test_topology_apply_docker_failure_is_500_and_records_error( assert resp.status_code == 500 assert "docker down" in resp.json()["detail"] + # The error must be persisted so GET /topology/state surfaces it, + # and the stored hash stays empty so master's heartbeat check flags + # the topology for resync rather than assuming it's applied. + state = client.get("/topology/state").json() + assert state["topology_id"] == "top-err" + assert state["applied_version_hash"] == "" + assert state["last_error"] == "docker down" + def test_topology_teardown_routes_to_ops(monkeypatch: pytest.MonkeyPatch) -> None: called: dict = {} diff --git a/tests/swarm/test_agent_topology_store.py b/tests/swarm/test_agent_topology_store.py index 4d96eba1..a4e4c0e4 100644 --- a/tests/swarm/test_agent_topology_store.py +++ b/tests/swarm/test_agent_topology_store.py @@ -64,6 +64,34 @@ def test_record_error_then_put_clears(tmp_path: pathlib.Path) -> None: s.close() +def test_record_error_upserts_when_no_prior_row(tmp_path: pathlib.Path) -> None: + """Apply failure mid-materialise: put() hasn't written a row yet but + we still want the error surfaced on GET /topology/state and the + next heartbeat. The marker uses empty hash so master sees drift.""" + s = _store(tmp_path) + s.record_error("t-fail", "docker refused connection") + row = s.current() + assert row is not None + assert row.topology_id == "t-fail" + assert row.applied_version_hash == "" + assert row.applied_at == 0 + assert row.last_error == "docker refused connection" + s.close() + + +def test_record_error_then_successful_put_replaces_marker(tmp_path: pathlib.Path) -> None: + """Once a retry succeeds, the marker row must be replaced with a + real applied row — no stale error or empty hash left behind.""" + s = _store(tmp_path) + s.record_error("t-retry", "first try failed") + s.put("t-retry", "real-hash", {"topology": {"id": "t-retry"}}) + row = s.current() + assert row.applied_version_hash == "real-hash" + assert row.last_error is None + assert row.applied_at > 0 + s.close() + + def test_clear(tmp_path: pathlib.Path) -> None: s = _store(tmp_path) s.put("t1", "h", {})