Files
DECNET/tests/swarm/test_heartbeat_topology_resync.py
anti e8f9c955b3 feat(swarm): heartbeat-driven topology resync for agent-pinned deployments
Agent heartbeats now carry an applied-topology snapshot. The master
heartbeat handler compares the reported version_hash against what
canonical_hash yields for the hydrated topology pinned to that host
and flags Topology.needs_resync on divergence (or when the agent
reports no topology at all while master expects one).

The mutator watch loop gains reconcile_agent_resyncs, which re-pushes
the current hydrated blob via AgentClient.apply_topology without
touching status, then clears the flag on success. Push failures leave
the flag set so the next tick retries.
2026-04-21 01:35:12 -04:00

225 lines
7.5 KiB
Python

"""Heartbeat-driven topology resync: master flags divergent agents.
When an agent reports an applied_version_hash that differs from what
master computed for the topology pinned to that host (or reports no
topology at all while master expects one), the heartbeat handler must
set ``needs_resync=True`` on the topology row. The mutator reconcile
loop picks it up later — tested separately.
"""
from __future__ import annotations
import pathlib
from typing import Any
import pytest
from fastapi.testclient import TestClient
from decnet.topology.config import TopologyConfig
from decnet.topology.generator import generate
from decnet.topology.hashing import canonical_hash
from decnet.topology.persistence import hydrate, persist, transition_status
from decnet.topology.status import TopologyStatus
from decnet.web.db.factory import get_repository
from decnet.web.dependencies import get_repo
from decnet.web.router.swarm import api_heartbeat as hb_mod
@pytest.fixture
def ca_dir(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
ca = tmp_path / "ca"
from decnet.swarm import pki
from decnet.swarm import client as swarm_client
from decnet.web.router.swarm import api_enroll_host as enroll_mod
monkeypatch.setattr(pki, "DEFAULT_CA_DIR", ca)
monkeypatch.setattr(swarm_client, "pki", pki)
monkeypatch.setattr(enroll_mod, "pki", pki)
return ca
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch):
r = get_repository(db_path=str(tmp_path / "hb-resync.db"))
import decnet.web.dependencies as deps
import decnet.web.swarm_api as swarm_api_mod
monkeypatch.setattr(deps, "repo", r)
monkeypatch.setattr(swarm_api_mod, "repo", r)
return r
@pytest.fixture
def client(repo, ca_dir):
from decnet.web.swarm_api import app
async def _override() -> Any:
return repo
app.dependency_overrides[get_repo] = _override
with TestClient(app) as c:
yield c
app.dependency_overrides.clear()
def _enroll(c: TestClient, name: str) -> dict:
r = c.post("/swarm/enroll", json={"name": name, "address": "10.0.0.5", "agent_port": 8765})
assert r.status_code == 201, r.text
return r.json()
def _cfg(**kw) -> TopologyConfig:
base = dict(
name="hb-resync",
mode="agent",
depth=1,
branching_factor=1,
deckies_per_lan_min=1,
deckies_per_lan_max=1,
cross_edge_probability=0.0,
randomize_services=False,
services_explicit=["ssh"],
seed=3,
)
base.update(kw)
return TopologyConfig(**base)
async def _persist_active(repo, host_uuid: str) -> tuple[str, str]:
plan = generate(_cfg())
tid = await persist(repo, plan, target_host_uuid=host_uuid)
await transition_status(repo, tid, TopologyStatus.DEPLOYING)
await transition_status(repo, tid, TopologyStatus.ACTIVE)
hydrated = await hydrate(repo, tid)
return tid, canonical_hash(hydrated)
@pytest.mark.anyio
async def test_heartbeat_matching_hash_does_not_flag(
client: TestClient, repo, monkeypatch: pytest.MonkeyPatch
) -> None:
host = _enroll(client, "worker-match")
monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"])
tid, expected = await _persist_active(repo, host["host_uuid"])
resp = client.post(
"/swarm/heartbeat",
json={
"host_uuid": host["host_uuid"],
"status": {"deployed": False},
"topology": {
"topology_id": tid,
"applied_version_hash": expected,
"observed": {"bridges": [], "containers": []},
},
},
)
assert resp.status_code == 204, resp.text
row = await repo.get_topology(tid)
assert row["needs_resync"] is False
@pytest.mark.anyio
async def test_heartbeat_hash_mismatch_flags_resync(
client: TestClient, repo, monkeypatch: pytest.MonkeyPatch
) -> None:
host = _enroll(client, "worker-drift")
monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"])
tid, _ = await _persist_active(repo, host["host_uuid"])
resp = client.post(
"/swarm/heartbeat",
json={
"host_uuid": host["host_uuid"],
"status": {"deployed": False},
"topology": {
"topology_id": tid,
"applied_version_hash": "stale-hash-" + "0" * 40,
"observed": {"bridges": [], "containers": []},
},
},
)
assert resp.status_code == 204, resp.text
row = await repo.get_topology(tid)
assert row["needs_resync"] is True
@pytest.mark.anyio
async def test_heartbeat_agent_reports_no_topology_flags_resync(
client: TestClient, repo, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Fresh-boot / wiped-cache case: agent says `null` but master expects
an ACTIVE topology pinned here → flag for re-push."""
host = _enroll(client, "worker-fresh")
monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"])
tid, _ = await _persist_active(repo, host["host_uuid"])
resp = client.post(
"/swarm/heartbeat",
json={
"host_uuid": host["host_uuid"],
"status": {"deployed": False},
"topology": {
"topology_id": None,
"applied_version_hash": None,
"observed": {"bridges": [], "containers": []},
},
},
)
assert resp.status_code == 204, resp.text
row = await repo.get_topology(tid)
assert row["needs_resync"] is True
@pytest.mark.anyio
async def test_heartbeat_without_topology_block_is_noop_for_resync(
client: TestClient, repo, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Legacy agents that don't send a topology block are still valid;
they just don't contribute to resync detection. But we still should
treat the absence as 'no topology reported' for a pinned ACTIVE
topology → flag."""
host = _enroll(client, "worker-legacy")
monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host["fingerprint"])
tid, _ = await _persist_active(repo, host["host_uuid"])
resp = client.post(
"/swarm/heartbeat",
json={"host_uuid": host["host_uuid"], "status": {"deployed": False}},
)
assert resp.status_code == 204, resp.text
row = await repo.get_topology(tid)
# Absence of the topology block means agent hasn't reported anything
# → treat like no topology reported → flag.
assert row["needs_resync"] is True
@pytest.mark.anyio
async def test_heartbeat_other_host_topology_unaffected(
client: TestClient, repo, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Reports from one host must not flip resync flags on another
host's topologies."""
host_a = _enroll(client, "worker-a")
host_b = client.post(
"/swarm/enroll",
json={"name": "worker-b", "address": "10.0.0.6", "agent_port": 8765},
).json()
monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda s: host_b["fingerprint"])
tid_a, hash_a = await _persist_active(repo, host_a["host_uuid"])
resp = client.post(
"/swarm/heartbeat",
json={
"host_uuid": host_b["host_uuid"],
"status": {"deployed": False},
"topology": {
"topology_id": None,
"applied_version_hash": None,
"observed": {},
},
},
)
assert resp.status_code == 204, resp.text
row = await repo.get_topology(tid_a)
assert row["needs_resync"] is False