From e8f9c955b31ac6dc6c24925c4693b57cb9944989 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 01:35:12 -0400 Subject: [PATCH] feat(swarm): heartbeat-driven topology resync for agent-pinned deployments Agent heartbeats now carry an applied-topology snapshot. The master heartbeat handler compares the reported version_hash against what canonical_hash yields for the hydrated topology pinned to that host and flags Topology.needs_resync on divergence (or when the agent reports no topology at all while master expects one). The mutator watch loop gains reconcile_agent_resyncs, which re-pushes the current hydrated blob via AgentClient.apply_topology without touching status, then clears the flag on success. Push failures leave the flag set so the next tick retries. --- decnet/agent/heartbeat.py | 28 ++- decnet/engine/deployer.py | 30 +++ decnet/mutator/engine.py | 40 ++++ decnet/web/db/models.py | 5 + decnet/web/db/repository.py | 6 + decnet/web/db/sqlmodel_repo.py | 24 ++ decnet/web/router/swarm/api_heartbeat.py | 64 +++++ tests/swarm/test_heartbeat_topology_resync.py | 224 ++++++++++++++++++ tests/topology/test_resync_reconcile.py | 168 +++++++++++++ 9 files changed, 581 insertions(+), 8 deletions(-) create mode 100644 tests/swarm/test_heartbeat_topology_resync.py create mode 100644 tests/topology/test_resync_reconcile.py diff --git a/decnet/agent/heartbeat.py b/decnet/agent/heartbeat.py index bbc00aad..f8d8eae2 100644 --- a/decnet/agent/heartbeat.py +++ b/decnet/agent/heartbeat.py @@ -52,14 +52,26 @@ def _resolve_agent_dir() -> pathlib.Path: async def _tick(client: httpx.AsyncClient, url: str, host_uuid: str, agent_version: str) -> None: snap = await _exec.status() - resp = await client.post( - url, - json={ - "host_uuid": host_uuid, - "agent_version": agent_version, - "status": snap, - }, - ) + body: dict = { + "host_uuid": host_uuid, + "agent_version": agent_version, + "status": snap, + } + # Best-effort: fold in applied-topology snapshot. Failures must never + # wedge the heartbeat loop — master will fall back to "no topology + # reported" which triggers a resync if it expected one. + try: + from decnet.agent import topology_ops as _topo_ops + from decnet.agent.topology_store import TopologyStore + store = TopologyStore(_resolve_agent_dir() / "topology.db") + try: + body["topology"] = _topo_ops.state(store) + finally: + store.close() + except Exception: + log.debug("heartbeat: topology state unavailable", exc_info=True) + + resp = await client.post(url, json=body) # 403 / 404 are terminal-ish — we still keep looping because an # operator may re-enrol the host mid-session, but we log loudly so # prod ops can spot cert-pinning drift. diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py index e6a13e81..fd349d26 100644 --- a/decnet/engine/deployer.py +++ b/decnet/engine/deployer.py @@ -355,6 +355,36 @@ async def _deploy_on_agent(repo, topology_id: str, hydrated: dict) -> None: ) +async def resync_agent_topology(repo, topology_id: str) -> None: + """Re-push an ACTIVE agent-targeted topology without status churn. + + Used by the mutator reconcile loop when an agent's reported + applied_version_hash drifts from what master expects. Unlike the + initial deploy, we do NOT flip status — the topology is already + ACTIVE; we just want the agent's cache + live state to match + master's current hydrated blob. + """ + from decnet.swarm.client import AgentClient + + hydrated = await hydrate(repo, topology_id) + if hydrated is None: + raise ValueError(f"topology {topology_id!r} not found") + target_host_uuid = hydrated["topology"].get("target_host_uuid") + if not target_host_uuid: + raise ValueError( + f"topology {topology_id!r} has no target_host_uuid; " + "resync is agent-only" + ) + host = await _resolve_swarm_host(repo, target_host_uuid) + version_hash = canonical_hash(hydrated) + async with AgentClient(host=host) as agent: + await agent.apply_topology(hydrated, version_hash) + log.info( + "topology %s resynced to agent %s (hash=%s)", + topology_id, host.get("name"), version_hash[:12], + ) + + async def _teardown_on_agent(repo, topology_id: str, hydrated: dict) -> None: """Route a topology teardown to the pinned agent.""" from decnet.swarm.client import AgentClient diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index cc636c19..c9364b0d 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -190,6 +190,40 @@ async def reconcile_topologies(repo: BaseRepository) -> int: return drained +@_traced("mutator.reconcile_agent_resyncs") +async def reconcile_agent_resyncs(repo: BaseRepository) -> int: + """Re-push agent-targeted topologies flagged by the heartbeat handler. + + The heartbeat sets ``needs_resync=True`` when an agent's reported + applied_version_hash diverges from master's expectation. Here we + re-run the agent branch of ``deploy_topology`` which pushes the + current hydrated blob back down over mTLS and clears the flag on + success. Any push failure leaves the flag set so the next tick + retries — it also logs loudly so ops can see that a specific agent + is stuck. + """ + from decnet.engine import deployer as _deployer + + try: + pending = await repo.list_topologies_needing_resync() + except NotImplementedError: + return 0 + drained = 0 + for topo in pending: + tid = topo["id"] + try: + await _deployer.resync_agent_topology(repo, tid) + await repo.set_topology_resync(tid, False) + drained += 1 + log.info("topology %s resynced to agent %s", + tid, topo.get("target_host_uuid")) + except Exception as exc: # noqa: BLE001 + log.warning( + "topology %s resync failed (will retry): %s", tid, exc, + ) + return drained + + @_traced("mutator.watch_loop") async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None: """Run an infinite loop checking for deckies that need mutation. @@ -216,6 +250,12 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> except NotImplementedError: # Backend without MazeNET support — nothing to reconcile. pass + try: + await reconcile_agent_resyncs(repo) + except NotImplementedError: + pass + except Exception: + log.exception("reconcile_agent_resyncs tick raised") await asyncio.sleep(poll_interval_secs) except KeyboardInterrupt: log.info("mutator watch loop stopped") diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index 0840f019..f5fb2e96 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -225,6 +225,11 @@ class Topology(SQLModel, table=True): # the topology or any child row when an expected_version is supplied. # Callers pass their last-seen version; mismatch raises VersionConflict. version: int = Field(default=1, nullable=False) + # Set by the heartbeat handler when an agent's reported + # ``applied_version_hash`` diverges from what we expect it to be + # running. Drained by the mutator watch loop, which re-pushes via + # AgentClient and clears the flag. NULL for unihost topologies. + needs_resync: bool = Field(default=False, nullable=False) class LAN(SQLModel, table=True): diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index acdcc638..4b52bda1 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -268,6 +268,12 @@ class BaseRepository(ABC): async def delete_topology_cascade(self, topology_id: str) -> bool: raise NotImplementedError + async def set_topology_resync(self, topology_id: str, value: bool) -> None: + raise NotImplementedError + + async def list_topologies_needing_resync(self) -> list[dict[str, Any]]: + raise NotImplementedError + async def add_lan(self, data: dict[str, Any]) -> str: raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index b9c0ed76..ad88556f 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -1013,6 +1013,30 @@ class SQLModelRepository(BaseRepository): ) await session.commit() + async def set_topology_resync(self, topology_id: str, value: bool) -> None: + async with self._session() as session: + result = await session.execute( + select(Topology).where(Topology.id == topology_id) + ) + topo = result.scalar_one_or_none() + if topo is None: + return + topo.needs_resync = bool(value) + session.add(topo) + await session.commit() + + async def list_topologies_needing_resync(self) -> list[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(Topology).where(Topology.needs_resync == True) # noqa: E712 + ) + return [ + self._deserialize_json_fields( + r.model_dump(mode="json"), ("config_snapshot",) + ) + for r in result.scalars().all() + ] + async def delete_topology_cascade(self, topology_id: str) -> bool: """Delete topology and all children. No portable ON DELETE CASCADE.""" async with self._session() as session: diff --git a/decnet/web/router/swarm/api_heartbeat.py b/decnet/web/router/swarm/api_heartbeat.py index 52487ca6..0a8b3607 100644 --- a/decnet/web/router/swarm/api_heartbeat.py +++ b/decnet/web/router/swarm/api_heartbeat.py @@ -35,6 +35,7 @@ class HeartbeatRequest(BaseModel): host_uuid: str agent_version: Optional[str] = None status: dict[str, Any] + topology: Optional[dict[str, Any]] = None def _extract_peer_fingerprint(scope: dict[str, Any]) -> Optional[str]: @@ -96,6 +97,67 @@ async def _verify_peer_matches_host( return host +async def _reconcile_topology_report( + repo: BaseRepository, + host_uuid: str, + reported: Optional[dict[str, Any]], +) -> None: + """Compare the agent's reported applied_version_hash against what + master expects for any topology pinned to *host_uuid*. + + Sets ``needs_resync=True`` when: + - master has an ACTIVE topology targeted here but the agent reports + a different hash, OR + - master has an ACTIVE topology targeted here but the agent reports + no topology at all (fresh boot / wiped cache). + + The actual re-push is handled by the mutator reconcile loop so the + heartbeat endpoint stays cheap. + """ + from decnet.topology.hashing import canonical_hash + from decnet.topology.persistence import hydrate + from decnet.topology.status import TopologyStatus + + try: + topos = await repo.list_topologies(status=TopologyStatus.ACTIVE) + except Exception: + log.exception("heartbeat: could not list active topologies") + return + mine = [t for t in topos if t.get("target_host_uuid") == host_uuid] + if not mine: + return + + reported_id = (reported or {}).get("topology_id") + reported_hash = (reported or {}).get("applied_version_hash") + + for topo in mine: + tid = topo["id"] + if topo.get("needs_resync"): + continue + expected: Optional[str] = None + if reported_id == tid and reported_hash: + try: + hydrated = await hydrate(repo, tid) + except Exception: + log.exception("heartbeat: hydrate failed tid=%s", tid) + continue + if hydrated is None: + continue + expected = canonical_hash(hydrated) + if expected == reported_hash: + continue + # Either mismatch or agent reports no/other topology — flag it. + try: + await repo.set_topology_resync(tid, True) + log.info( + "heartbeat: flagged topology %s for resync (host=%s " + "reported_id=%s reported_hash=%s expected=%s)", + tid, host_uuid, reported_id, reported_hash, expected, + ) + except Exception: + log.exception("heartbeat: failed to flag resync tid=%s", tid) + + @router.post( "/heartbeat", status_code=204, @@ -120,6 +182,8 @@ async def heartbeat( {"status": "active", "last_heartbeat": now}, ) + await _reconcile_topology_report(repo, req.host_uuid, req.topology) + status_body = req.status or {} if not status_body.get("deployed"): return diff --git a/tests/swarm/test_heartbeat_topology_resync.py b/tests/swarm/test_heartbeat_topology_resync.py new file mode 100644 index 00000000..fbe8114d --- /dev/null +++ b/tests/swarm/test_heartbeat_topology_resync.py @@ -0,0 +1,224 @@ +"""Heartbeat-driven topology resync: master flags divergent agents. + +When an agent reports an applied_version_hash that differs from what +master computed for the topology pinned to that host (or reports no +topology at all while master expects one), the heartbeat handler must +set ``needs_resync=True`` on the topology row. The mutator reconcile +loop picks it up later — tested separately. +""" +from __future__ import annotations + +import pathlib +from typing import Any + +import pytest +from fastapi.testclient import TestClient + +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.hashing import canonical_hash +from decnet.topology.persistence import hydrate, persist, transition_status +from decnet.topology.status import TopologyStatus +from decnet.web.db.factory import get_repository +from decnet.web.dependencies import get_repo +from decnet.web.router.swarm import api_heartbeat as hb_mod + + +@pytest.fixture +def ca_dir(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: + ca = tmp_path / "ca" + from decnet.swarm import pki + from decnet.swarm import client as swarm_client + from decnet.web.router.swarm import api_enroll_host as enroll_mod + + monkeypatch.setattr(pki, "DEFAULT_CA_DIR", ca) + monkeypatch.setattr(swarm_client, "pki", pki) + monkeypatch.setattr(enroll_mod, "pki", pki) + return ca + + +@pytest.fixture +def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch): + r = get_repository(db_path=str(tmp_path / "hb-resync.db")) + import decnet.web.dependencies as deps + import decnet.web.swarm_api as swarm_api_mod + + monkeypatch.setattr(deps, "repo", r) + monkeypatch.setattr(swarm_api_mod, "repo", r) + return r + + +@pytest.fixture +def client(repo, ca_dir): + from decnet.web.swarm_api import app + + async def _override() -> Any: + return repo + + app.dependency_overrides[get_repo] = _override + with TestClient(app) as c: + yield c + app.dependency_overrides.clear() + + +def _enroll(c: TestClient, name: str) -> dict: + r = c.post("/swarm/enroll", json={"name": name, "address": "10.0.0.5", "agent_port": 8765}) + assert r.status_code == 201, r.text + return r.json() + + +def _cfg(**kw) -> TopologyConfig: + base = dict( + name="hb-resync", + mode="agent", + depth=1, + branching_factor=1, + deckies_per_lan_min=1, + deckies_per_lan_max=1, + cross_edge_probability=0.0, + randomize_services=False, + services_explicit=["ssh"], + seed=3, + ) + base.update(kw) + return TopologyConfig(**base) + + +async def _persist_active(repo, host_uuid: str) -> tuple[str, str]: + plan = generate(_cfg()) + tid = await persist(repo, plan, target_host_uuid=host_uuid) + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await transition_status(repo, tid, TopologyStatus.ACTIVE) + hydrated = await hydrate(repo, tid) + return tid, canonical_hash(hydrated) + + +@pytest.mark.anyio +async def test_heartbeat_matching_hash_does_not_flag( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch +) -> None: + host = _enroll(client, "worker-match") + monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"]) + tid, expected = await _persist_active(repo, host["host_uuid"]) + + resp = client.post( + "/swarm/heartbeat", + json={ + "host_uuid": host["host_uuid"], + "status": {"deployed": False}, + "topology": { + "topology_id": tid, + "applied_version_hash": expected, + "observed": {"bridges": [], "containers": []}, + }, + }, + ) + assert resp.status_code == 204, resp.text + row = await repo.get_topology(tid) + assert row["needs_resync"] is False + + +@pytest.mark.anyio +async def test_heartbeat_hash_mismatch_flags_resync( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch +) -> None: + host = _enroll(client, "worker-drift") + monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"]) + tid, _ = await _persist_active(repo, host["host_uuid"]) + + resp = client.post( + "/swarm/heartbeat", + json={ + "host_uuid": host["host_uuid"], + "status": {"deployed": False}, + "topology": { + "topology_id": tid, + "applied_version_hash": "stale-hash-" + "0" * 40, + "observed": {"bridges": [], "containers": []}, + }, + }, + ) + assert resp.status_code == 204, resp.text + row = await repo.get_topology(tid) + assert row["needs_resync"] is True + + +@pytest.mark.anyio +async def test_heartbeat_agent_reports_no_topology_flags_resync( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch +) -> None: + """Fresh-boot / wiped-cache case: agent says `null` but master expects + an ACTIVE topology pinned here → flag for re-push.""" + host = _enroll(client, "worker-fresh") + monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"]) + tid, _ = await _persist_active(repo, host["host_uuid"]) + + resp = client.post( + "/swarm/heartbeat", + json={ + "host_uuid": host["host_uuid"], + "status": {"deployed": False}, + "topology": { + "topology_id": None, + "applied_version_hash": None, + "observed": {"bridges": [], "containers": []}, + }, + }, + ) + assert resp.status_code == 204, resp.text + row = await repo.get_topology(tid) + assert row["needs_resync"] is True + + +@pytest.mark.anyio +async def test_heartbeat_without_topology_block_is_noop_for_resync( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch +) -> None: + """Legacy agents that don't send a topology block are still valid; + they just don't contribute to resync detection. But we still should + treat the absence as 'no topology reported' for a pinned ACTIVE + topology → flag.""" + host = _enroll(client, "worker-legacy") + monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"]) + tid, _ = await _persist_active(repo, host["host_uuid"]) + + resp = client.post( + "/swarm/heartbeat", + json={"host_uuid": host["host_uuid"], "status": {"deployed": False}}, + ) + assert resp.status_code == 204, resp.text + row = await repo.get_topology(tid) + # Absence of the topology block means agent hasn't reported anything + # → treat like no topology reported → flag. + assert row["needs_resync"] is True + + +@pytest.mark.anyio +async def test_heartbeat_other_host_topology_unaffected( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch +) -> None: + """Reports from one host must not flip resync flags on another + host's topologies.""" + host_a = _enroll(client, "worker-a") + host_b = client.post( + "/swarm/enroll", + json={"name": "worker-b", "address": "10.0.0.6", "agent_port": 8765}, + ).json() + monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host_b["fingerprint"]) + tid_a, hash_a = await _persist_active(repo, host_a["host_uuid"]) + + resp = client.post( + "/swarm/heartbeat", + json={ + "host_uuid": host_b["host_uuid"], + "status": {"deployed": False}, + "topology": { + "topology_id": None, + "applied_version_hash": None, + "observed": {}, + }, + }, + ) + assert resp.status_code == 204, resp.text + row = await repo.get_topology(tid_a) + assert row["needs_resync"] is False diff --git a/tests/topology/test_resync_reconcile.py b/tests/topology/test_resync_reconcile.py new file mode 100644 index 00000000..a9ada8f6 --- /dev/null +++ b/tests/topology/test_resync_reconcile.py @@ -0,0 +1,168 @@ +"""Mutator reconcile loop + deployer.resync_agent_topology. + +Covers the last mile of Step 7: once the heartbeat handler flags a +topology as ``needs_resync``, the mutator's ``reconcile_agent_resyncs`` +pass must pick it up, re-push via AgentClient, and clear the flag. +Failures must leave the flag set so the next tick retries. +""" +from __future__ import annotations + +from typing import Any + +import pytest + +from decnet.engine import deployer as _deployer +from decnet.mutator import engine as _mut_engine +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.hashing import canonical_hash +from decnet.topology.persistence import hydrate, persist, transition_status +from decnet.topology.status import TopologyStatus +from decnet.web.db.factory import get_repository + + +def _cfg(**kw) -> TopologyConfig: + base = dict( + name="resync", + mode="agent", + depth=1, + branching_factor=1, + deckies_per_lan_min=1, + deckies_per_lan_max=1, + cross_edge_probability=0.0, + randomize_services=False, + services_explicit=["ssh"], + seed=9, + ) + base.update(kw) + return TopologyConfig(**base) + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "resync.db")) + await r.initialize() + return r + + +async def _seed_host(repo, uuid_: str) -> None: + await repo.add_swarm_host( + { + "uuid": uuid_, + "name": f"host-{uuid_}", + "address": "10.9.9.9", + "agent_port": 8765, + "status": "active", + "client_cert_fingerprint": "a" * 64, + "cert_bundle_path": "/tmp/ignored", + } + ) + + +class _FakeAgentClient: + instances: list["_FakeAgentClient"] = [] + + def __init__(self, *, host: dict[str, Any]) -> None: + self.host = host + self.calls: list[tuple[str, tuple]] = [] + _FakeAgentClient.instances.append(self) + + async def __aenter__(self) -> "_FakeAgentClient": + return self + + async def __aexit__(self, *_exc) -> None: + return None + + async def apply_topology(self, hydrated, version_hash): + self.calls.append(("apply", (hydrated, version_hash))) + return {"status": "applied", "version_hash": version_hash} + + +@pytest.fixture +def fake_agent(monkeypatch: pytest.MonkeyPatch): + _FakeAgentClient.instances.clear() + import decnet.swarm.client as _swarm_client + monkeypatch.setattr(_swarm_client, "AgentClient", _FakeAgentClient) + return _FakeAgentClient + + +async def _active_topology(repo, host_uuid: str) -> tuple[str, str]: + plan = generate(_cfg()) + tid = await persist(repo, plan, target_host_uuid=host_uuid) + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await transition_status(repo, tid, TopologyStatus.ACTIVE) + hydrated = await hydrate(repo, tid) + return tid, canonical_hash(hydrated) + + +@pytest.mark.anyio +async def test_resync_agent_topology_pushes_current_hash(repo, fake_agent) -> None: + await _seed_host(repo, "h-sync") + tid, expected = await _active_topology(repo, "h-sync") + + await _deployer.resync_agent_topology(repo, tid) + + assert len(fake_agent.instances) == 1 + inst = fake_agent.instances[0] + assert inst.calls[0][0] == "apply" + _, (hydrated, version_hash) = inst.calls[0] + assert version_hash == expected + assert hydrated["topology"]["id"] == tid + + row = await repo.get_topology(tid) + assert row["status"] == TopologyStatus.ACTIVE # unchanged + + +@pytest.mark.anyio +async def test_resync_rejects_master_local_topology(repo) -> None: + plan = generate(_cfg(mode="unihost")) + tid = await persist(repo, plan, target_host_uuid=None) + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await transition_status(repo, tid, TopologyStatus.ACTIVE) + + with pytest.raises(ValueError, match="no target_host_uuid"): + await _deployer.resync_agent_topology(repo, tid) + + +@pytest.mark.anyio +async def test_reconcile_agent_resyncs_drains_flag(repo, fake_agent) -> None: + await _seed_host(repo, "h-drain") + tid, _ = await _active_topology(repo, "h-drain") + await repo.set_topology_resync(tid, True) + + drained = await _mut_engine.reconcile_agent_resyncs(repo) + assert drained == 1 + row = await repo.get_topology(tid) + assert row["needs_resync"] is False + assert len(fake_agent.instances) == 1 + + +@pytest.mark.anyio +async def test_reconcile_retains_flag_on_push_failure(repo, monkeypatch) -> None: + await _seed_host(repo, "h-boom") + tid, _ = await _active_topology(repo, "h-boom") + await repo.set_topology_resync(tid, True) + + class _Boom: + def __init__(self, *, host): ... + async def __aenter__(self): return self + async def __aexit__(self, *_): return None + async def apply_topology(self, *_a, **_k): + raise RuntimeError("agent unreachable") + + import decnet.swarm.client as _swarm_client + monkeypatch.setattr(_swarm_client, "AgentClient", _Boom) + + drained = await _mut_engine.reconcile_agent_resyncs(repo) + assert drained == 0 + row = await repo.get_topology(tid) + assert row["needs_resync"] is True # still flagged — next tick retries + + +@pytest.mark.anyio +async def test_reconcile_noop_when_nothing_flagged(repo, fake_agent) -> None: + await _seed_host(repo, "h-idle") + await _active_topology(repo, "h-idle") + drained = await _mut_engine.reconcile_agent_resyncs(repo) + assert drained == 0 + assert fake_agent.instances == []