feat(swarm): heartbeat-driven topology resync for agent-pinned deployments

Agent heartbeats now carry an applied-topology snapshot. The master
heartbeat handler compares the reported version_hash against what
canonical_hash yields for the hydrated topology pinned to that host
and flags Topology.needs_resync on divergence (or when the agent
reports no topology at all while master expects one).

The mutator watch loop gains reconcile_agent_resyncs, which re-pushes
the current hydrated blob via AgentClient.apply_topology without
touching status, then clears the flag on success. Push failures leave
the flag set so the next tick retries.
This commit is contained in:
2026-04-21 01:35:12 -04:00
parent 05d1ebbaaa
commit e8f9c955b3
9 changed files with 581 additions and 8 deletions

View File

@@ -52,14 +52,26 @@ def _resolve_agent_dir() -> pathlib.Path:
async def _tick(client: httpx.AsyncClient, url: str, host_uuid: str, agent_version: str) -> None:
snap = await _exec.status()
resp = await client.post(
url,
json={
"host_uuid": host_uuid,
"agent_version": agent_version,
"status": snap,
},
)
body: dict = {
"host_uuid": host_uuid,
"agent_version": agent_version,
"status": snap,
}
# Best-effort: fold in applied-topology snapshot. Failures must never
# wedge the heartbeat loop — master will fall back to "no topology
# reported" which triggers a resync if it expected one.
try:
from decnet.agent import topology_ops as _topo_ops
from decnet.agent.topology_store import TopologyStore
store = TopologyStore(_resolve_agent_dir() / "topology.db")
try:
body["topology"] = _topo_ops.state(store)
finally:
store.close()
except Exception:
log.debug("heartbeat: topology state unavailable", exc_info=True)
resp = await client.post(url, json=body)
# 403 / 404 are terminal-ish — we still keep looping because an
# operator may re-enrol the host mid-session, but we log loudly so
# prod ops can spot cert-pinning drift.

View File

@@ -355,6 +355,36 @@ async def _deploy_on_agent(repo, topology_id: str, hydrated: dict) -> None:
)
async def resync_agent_topology(repo, topology_id: str) -> None:
"""Re-push an ACTIVE agent-targeted topology without status churn.
Used by the mutator reconcile loop when an agent's reported
applied_version_hash drifts from what master expects. Unlike the
initial deploy, we do NOT flip status — the topology is already
ACTIVE; we just want the agent's cache + live state to match
master's current hydrated blob.
"""
from decnet.swarm.client import AgentClient
hydrated = await hydrate(repo, topology_id)
if hydrated is None:
raise ValueError(f"topology {topology_id!r} not found")
target_host_uuid = hydrated["topology"].get("target_host_uuid")
if not target_host_uuid:
raise ValueError(
f"topology {topology_id!r} has no target_host_uuid; "
"resync is agent-only"
)
host = await _resolve_swarm_host(repo, target_host_uuid)
version_hash = canonical_hash(hydrated)
async with AgentClient(host=host) as agent:
await agent.apply_topology(hydrated, version_hash)
log.info(
"topology %s resynced to agent %s (hash=%s)",
topology_id, host.get("name"), version_hash[:12],
)
async def _teardown_on_agent(repo, topology_id: str, hydrated: dict) -> None:
"""Route a topology teardown to the pinned agent."""
from decnet.swarm.client import AgentClient

View File

@@ -190,6 +190,40 @@ async def reconcile_topologies(repo: BaseRepository) -> int:
return drained
@_traced("mutator.reconcile_agent_resyncs")
async def reconcile_agent_resyncs(repo: BaseRepository) -> int:
"""Re-push agent-targeted topologies flagged by the heartbeat handler.
The heartbeat sets ``needs_resync=True`` when an agent's reported
applied_version_hash diverges from master's expectation. Here we
re-run the agent branch of ``deploy_topology`` which pushes the
current hydrated blob back down over mTLS and clears the flag on
success. Any push failure leaves the flag set so the next tick
retries — it also logs loudly so ops can see that a specific agent
is stuck.
"""
from decnet.engine import deployer as _deployer
try:
pending = await repo.list_topologies_needing_resync()
except NotImplementedError:
return 0
drained = 0
for topo in pending:
tid = topo["id"]
try:
await _deployer.resync_agent_topology(repo, tid)
await repo.set_topology_resync(tid, False)
drained += 1
log.info("topology %s resynced to agent %s",
tid, topo.get("target_host_uuid"))
except Exception as exc: # noqa: BLE001
log.warning(
"topology %s resync failed (will retry): %s", tid, exc,
)
return drained
@_traced("mutator.watch_loop")
async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None:
"""Run an infinite loop checking for deckies that need mutation.
@@ -216,6 +250,12 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) ->
except NotImplementedError:
# Backend without MazeNET support — nothing to reconcile.
pass
try:
await reconcile_agent_resyncs(repo)
except NotImplementedError:
pass
except Exception:
log.exception("reconcile_agent_resyncs tick raised")
await asyncio.sleep(poll_interval_secs)
except KeyboardInterrupt:
log.info("mutator watch loop stopped")

View File

@@ -225,6 +225,11 @@ class Topology(SQLModel, table=True):
# the topology or any child row when an expected_version is supplied.
# Callers pass their last-seen version; mismatch raises VersionConflict.
version: int = Field(default=1, nullable=False)
# Set by the heartbeat handler when an agent's reported
# ``applied_version_hash`` diverges from what we expect it to be
# running. Drained by the mutator watch loop, which re-pushes via
# AgentClient and clears the flag. NULL for unihost topologies.
needs_resync: bool = Field(default=False, nullable=False)
class LAN(SQLModel, table=True):

View File

@@ -268,6 +268,12 @@ class BaseRepository(ABC):
async def delete_topology_cascade(self, topology_id: str) -> bool:
raise NotImplementedError
async def set_topology_resync(self, topology_id: str, value: bool) -> None:
raise NotImplementedError
async def list_topologies_needing_resync(self) -> list[dict[str, Any]]:
raise NotImplementedError
async def add_lan(self, data: dict[str, Any]) -> str:
raise NotImplementedError

View File

@@ -1013,6 +1013,30 @@ class SQLModelRepository(BaseRepository):
)
await session.commit()
async def set_topology_resync(self, topology_id: str, value: bool) -> None:
async with self._session() as session:
result = await session.execute(
select(Topology).where(Topology.id == topology_id)
)
topo = result.scalar_one_or_none()
if topo is None:
return
topo.needs_resync = bool(value)
session.add(topo)
await session.commit()
async def list_topologies_needing_resync(self) -> list[dict[str, Any]]:
async with self._session() as session:
result = await session.execute(
select(Topology).where(Topology.needs_resync == True) # noqa: E712
)
return [
self._deserialize_json_fields(
r.model_dump(mode="json"), ("config_snapshot",)
)
for r in result.scalars().all()
]
async def delete_topology_cascade(self, topology_id: str) -> bool:
"""Delete topology and all children. No portable ON DELETE CASCADE."""
async with self._session() as session:

View File

@@ -35,6 +35,7 @@ class HeartbeatRequest(BaseModel):
host_uuid: str
agent_version: Optional[str] = None
status: dict[str, Any]
topology: Optional[dict[str, Any]] = None
def _extract_peer_fingerprint(scope: dict[str, Any]) -> Optional[str]:
@@ -96,6 +97,67 @@ async def _verify_peer_matches_host(
return host
async def _reconcile_topology_report(
repo: BaseRepository,
host_uuid: str,
reported: Optional[dict[str, Any]],
) -> None:
"""Compare the agent's reported applied_version_hash against what
master expects for any topology pinned to *host_uuid*.
Sets ``needs_resync=True`` when:
- master has an ACTIVE topology targeted here but the agent reports
a different hash, OR
- master has an ACTIVE topology targeted here but the agent reports
no topology at all (fresh boot / wiped cache).
The actual re-push is handled by the mutator reconcile loop so the
heartbeat endpoint stays cheap.
"""
from decnet.topology.hashing import canonical_hash
from decnet.topology.persistence import hydrate
from decnet.topology.status import TopologyStatus
try:
topos = await repo.list_topologies(status=TopologyStatus.ACTIVE)
except Exception:
log.exception("heartbeat: could not list active topologies")
return
mine = [t for t in topos if t.get("target_host_uuid") == host_uuid]
if not mine:
return
reported_id = (reported or {}).get("topology_id")
reported_hash = (reported or {}).get("applied_version_hash")
for topo in mine:
tid = topo["id"]
if topo.get("needs_resync"):
continue
expected: Optional[str] = None
if reported_id == tid and reported_hash:
try:
hydrated = await hydrate(repo, tid)
except Exception:
log.exception("heartbeat: hydrate failed tid=%s", tid)
continue
if hydrated is None:
continue
expected = canonical_hash(hydrated)
if expected == reported_hash:
continue
# Either mismatch or agent reports no/other topology — flag it.
try:
await repo.set_topology_resync(tid, True)
log.info(
"heartbeat: flagged topology %s for resync (host=%s "
"reported_id=%s reported_hash=%s expected=%s)",
tid, host_uuid, reported_id, reported_hash, expected,
)
except Exception:
log.exception("heartbeat: failed to flag resync tid=%s", tid)
@router.post(
"/heartbeat",
status_code=204,
@@ -120,6 +182,8 @@ async def heartbeat(
{"status": "active", "last_heartbeat": now},
)
await _reconcile_topology_report(repo, req.host_uuid, req.topology)
status_body = req.status or {}
if not status_body.get("deployed"):
return