Agent heartbeats now carry an applied-topology snapshot. The master heartbeat handler compares the reported version_hash against what canonical_hash yields for the hydrated topology pinned to that host and flags Topology.needs_resync on divergence (or when the agent reports no topology at all while master expects one). The mutator watch loop gains reconcile_agent_resyncs, which re-pushes the current hydrated blob via AgentClient.apply_topology without touching status, then clears the flag on success. Push failures leave the flag set so the next tick retries.
169 lines
5.4 KiB
Python
169 lines
5.4 KiB
Python
"""Mutator reconcile loop + deployer.resync_agent_topology.
|
|
|
|
Covers the last mile of Step 7: once the heartbeat handler flags a
topology as ``needs_resync``, the mutator's ``reconcile_agent_resyncs``
pass must pick it up, re-push via AgentClient, and clear the flag.
Failures must leave the flag set so the next tick retries.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from decnet.engine import deployer as _deployer
|
|
from decnet.mutator import engine as _mut_engine
|
|
from decnet.topology.config import TopologyConfig
|
|
from decnet.topology.generator import generate
|
|
from decnet.topology.hashing import canonical_hash
|
|
from decnet.topology.persistence import hydrate, persist, transition_status
|
|
from decnet.topology.status import TopologyStatus
|
|
from decnet.web.db.factory import get_repository
|
|
|
|
|
|
def _cfg(**kw) -> TopologyConfig:
    """Build the ``TopologyConfig`` used by the tests in this module.

    Starts from a fixed set of defaults (name ``"resync"``, agent mode,
    ``seed=9``, a single explicit ``ssh`` service, ...). Every keyword in
    ``kw`` either replaces the default of the same name or is forwarded to
    ``TopologyConfig`` as an additional argument.
    """
    defaults = {
        "name": "resync",
        "mode": "agent",
        "depth": 1,
        "branching_factor": 1,
        "deckies_per_lan_min": 1,
        "deckies_per_lan_max": 1,
        "cross_edge_probability": 0.0,
        "randomize_services": False,
        "services_explicit": ["ssh"],
        "seed": 9,
    }
    # Caller-supplied keywords win over the defaults.
    return TopologyConfig(**{**defaults, **kw})
|
|
|
|
|
|
@pytest.fixture
async def repo(tmp_path):
    """Return an initialized repository backed by ``resync.db`` in ``tmp_path``."""
    db_file = tmp_path / "resync.db"
    repository = get_repository(db_path=str(db_file))
    await repository.initialize()
    return repository
|
|
|
|
|
|
async def _seed_host(repo, uuid_: str) -> None:
    """Register a swarm host with id ``uuid_`` in ``repo``.

    Only ``uuid`` and ``name`` vary with ``uuid_``; every other field of the
    row is a fixed placeholder value.
    """
    host_row = {
        "uuid": uuid_,
        "name": f"host-{uuid_}",
        "address": "10.9.9.9",
        "agent_port": 8765,
        "status": "active",
        "client_cert_fingerprint": "a" * 64,
        "cert_bundle_path": "/tmp/ignored",
    }
    await repo.add_swarm_host(host_row)
|
|
|
|
|
|
class _FakeAgentClient:
|
|
instances: list["_FakeAgentClient"] = []
|
|
|
|
def __init__(self, *, host: dict[str, Any]) -> None:
|
|
self.host = host
|
|
self.calls: list[tuple[str, tuple]] = []
|
|
_FakeAgentClient.instances.append(self)
|
|
|
|
async def __aenter__(self) -> "_FakeAgentClient":
|
|
return self
|
|
|
|
async def __aexit__(self, *_exc) -> None:
|
|
return None
|
|
|
|
async def apply_topology(self, hydrated, version_hash):
|
|
self.calls.append(("apply", (hydrated, version_hash)))
|
|
return {"status": "applied", "version_hash": version_hash}
|
|
|
|
|
|
@pytest.fixture
def fake_agent(monkeypatch: pytest.MonkeyPatch):
    """Replace ``decnet.swarm.client.AgentClient`` with ``_FakeAgentClient``.

    The fake's shared ``instances`` list is emptied first, so a test only
    sees clients created during its own run. Returns the fake class itself.
    """
    import decnet.swarm.client as swarm_client_module

    _FakeAgentClient.instances.clear()
    monkeypatch.setattr(swarm_client_module, "AgentClient", _FakeAgentClient)
    return _FakeAgentClient
|
|
|
|
|
|
async def _active_topology(repo, host_uuid: str) -> tuple[str, str]:
    """Persist a generated topology for ``host_uuid`` and walk it to ACTIVE.

    Returns:
        ``(topology_id, version_hash)``, where the hash is ``canonical_hash``
        of the topology as hydrated after the status transitions.
    """
    topology_id = await persist(repo, generate(_cfg()), target_host_uuid=host_uuid)
    # Transitions are applied one step at a time: DEPLOYING, then ACTIVE.
    for next_status in (TopologyStatus.DEPLOYING, TopologyStatus.ACTIVE):
        await transition_status(repo, topology_id, next_status)
    snapshot = await hydrate(repo, topology_id)
    return topology_id, canonical_hash(snapshot)
|
|
|
|
|
|
@pytest.mark.anyio
async def test_resync_agent_topology_pushes_current_hash(repo, fake_agent) -> None:
    """``resync_agent_topology`` pushes the hydrated blob with its current hash."""
    host_uuid = "h-sync"
    await _seed_host(repo, host_uuid)
    tid, expected_hash = await _active_topology(repo, host_uuid)

    await _deployer.resync_agent_topology(repo, tid)

    created_clients = fake_agent.instances
    assert len(created_clients) == 1
    operation, payload = created_clients[0].calls[0]
    assert operation == "apply"
    pushed_blob, pushed_hash = payload
    assert pushed_hash == expected_hash
    assert pushed_blob["topology"]["id"] == tid

    stored = await repo.get_topology(tid)
    # The status must still be ACTIVE after the resync.
    assert stored["status"] == TopologyStatus.ACTIVE
|
|
|
|
|
|
@pytest.mark.anyio
async def test_resync_rejects_master_local_topology(repo) -> None:
    """Resyncing a topology persisted without a target host raises ``ValueError``."""
    local_plan = generate(_cfg(mode="unihost"))
    tid = await persist(repo, local_plan, target_host_uuid=None)
    for next_status in (TopologyStatus.DEPLOYING, TopologyStatus.ACTIVE):
        await transition_status(repo, tid, next_status)

    with pytest.raises(ValueError, match="no target_host_uuid"):
        await _deployer.resync_agent_topology(repo, tid)
|
|
|
|
|
|
@pytest.mark.anyio
async def test_reconcile_agent_resyncs_drains_flag(repo, fake_agent) -> None:
    """A flagged topology is re-pushed once and its ``needs_resync`` flag cleared."""
    host_uuid = "h-drain"
    await _seed_host(repo, host_uuid)
    tid, _unused_hash = await _active_topology(repo, host_uuid)
    await repo.set_topology_resync(tid, True)

    drained_count = await _mut_engine.reconcile_agent_resyncs(repo)

    assert drained_count == 1
    stored = await repo.get_topology(tid)
    assert stored["needs_resync"] is False
    assert len(fake_agent.instances) == 1
|
|
|
|
|
|
@pytest.mark.anyio
async def test_reconcile_retains_flag_on_push_failure(repo, monkeypatch) -> None:
    """When the push raises, nothing is drained and ``needs_resync`` stays set."""
    host_uuid = "h-boom"
    await _seed_host(repo, host_uuid)
    tid, _unused_hash = await _active_topology(repo, host_uuid)
    await repo.set_topology_resync(tid, True)

    class _Boom:
        """Agent-client stand-in whose ``apply_topology`` always raises."""

        def __init__(self, *, host):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, *_):
            return None

        async def apply_topology(self, *_a, **_k):
            raise RuntimeError("agent unreachable")

    import decnet.swarm.client as swarm_client_module

    monkeypatch.setattr(swarm_client_module, "AgentClient", _Boom)

    drained_count = await _mut_engine.reconcile_agent_resyncs(repo)

    assert drained_count == 0
    stored = await repo.get_topology(tid)
    # Still flagged, so the next reconcile tick can retry the push.
    assert stored["needs_resync"] is True
|
|
|
|
|
|
@pytest.mark.anyio
async def test_reconcile_noop_when_nothing_flagged(repo, fake_agent) -> None:
    """With no topology flagged, reconcile drains nothing and builds no client."""
    host_uuid = "h-idle"
    await _seed_host(repo, host_uuid)
    await _active_topology(repo, host_uuid)

    drained_count = await _mut_engine.reconcile_agent_resyncs(repo)

    assert drained_count == 0
    assert fake_agent.instances == []
|