From 05d1ebbaaa4de9511f42deb3baa4edecaa87a48d Mon Sep 17 00:00:00 2001
From: anti
Date: Tue, 21 Apr 2026 01:27:59 -0400
Subject: [PATCH] feat(engine): route agent-pinned topologies via AgentClient
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

deploy_topology and teardown_topology now branch on target_host_uuid.
When set:

- Hydrate the topology locally (validator runs exactly as before).
- Compute canonical_hash; push {hydrated, version_hash} to the pinned
  agent through AgentClient.apply_topology.
- Status machine still moves PENDING -> DEPLOYING -> ACTIVE on 2xx,
  PENDING -> DEPLOYING -> FAILED on error; master remains the sole
  owner of the row.

Teardown flips to TEARING_DOWN, fires /topology/teardown, then
TORN_DOWN — we log a warning on agent error but still settle to
TORN_DOWN so operators can delete the row (agent garbage is cleaned
on the next re-enroll).

Unihost deploys are unchanged — the field defaults to NULL so every
existing flow takes the local path.

Step 6 of the agent <-> topology integration.
--- decnet/engine/deployer.py | 75 +++++++++ tests/topology/test_deploy_agent_branch.py | 168 +++++++++++++++++++++ 2 files changed, 243 insertions(+) create mode 100644 tests/topology/test_deploy_agent_branch.py diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py index 8db85014..e6a13e81 100644 --- a/decnet/engine/deployer.py +++ b/decnet/engine/deployer.py @@ -11,6 +11,8 @@ import docker from rich.console import Console from rich.table import Table +from decnet.topology.hashing import canonical_hash + from decnet.logging import get_logger from decnet.telemetry import traced as _traced from decnet.config import DecnetConfig, clear_state, load_state, save_state @@ -311,6 +313,69 @@ def _topology_compose_path(topology_id: str) -> Path: return Path(f"decnet-topology-{topology_id[:8]}-compose.yml") +async def _resolve_swarm_host(repo, host_uuid: str) -> dict: + host = await repo.get_swarm_host_by_uuid(host_uuid) + if host is None: + raise ValueError( + f"topology pinned to unknown swarm host {host_uuid!r}" + ) + return host + + +async def _deploy_on_agent(repo, topology_id: str, hydrated: dict) -> None: + """Route a topology apply to the agent pinned by ``target_host_uuid``. + + Local imports avoid a circular dependency: decnet.swarm.client already + pulls decnet.engine indirectly via decnet.config. 
+ """ + from decnet.swarm.client import AgentClient + + target_host_uuid = hydrated["topology"]["target_host_uuid"] + host = await _resolve_swarm_host(repo, target_host_uuid) + version_hash = canonical_hash(hydrated) + + await transition_status(repo, topology_id, TopologyStatus.DEPLOYING) + try: + async with AgentClient(host=host) as agent: + await agent.apply_topology(hydrated, version_hash) + except Exception as exc: + log.error( + "topology %s agent-apply failed on %s: %s", + topology_id, host.get("name"), exc, + ) + await transition_status( + repo, topology_id, TopologyStatus.FAILED, reason=str(exc) + ) + raise + + await transition_status(repo, topology_id, TopologyStatus.ACTIVE) + log.info( + "topology %s deployed on agent %s (hash=%s)", + topology_id, host.get("name"), version_hash[:12], + ) + + +async def _teardown_on_agent(repo, topology_id: str, hydrated: dict) -> None: + """Route a topology teardown to the pinned agent.""" + from decnet.swarm.client import AgentClient + + target_host_uuid = hydrated["topology"]["target_host_uuid"] + host = await _resolve_swarm_host(repo, target_host_uuid) + + await transition_status(repo, topology_id, TopologyStatus.TEARING_DOWN) + try: + async with AgentClient(host=host) as agent: + await agent.teardown_topology(topology_id) + except Exception as exc: + log.warning( + "topology %s agent-teardown failed on %s (continuing): %s", + topology_id, host.get("name"), exc, + ) + + await transition_status(repo, topology_id, TopologyStatus.TORN_DOWN) + log.info("topology %s torn down on agent %s", topology_id, host.get("name")) + + def _warn_if_userland_proxy_enabled(hydrated: dict) -> None: """Soft warning: docker-proxy masks attacker source IPs. @@ -378,6 +443,12 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N _warn_if_userland_proxy_enabled(hydrated) + # Pinned to an agent? Hand off to the mTLS path. Everything below + # this line is the master-local deploy. 
+ if hydrated["topology"].get("target_host_uuid"): + await _deploy_on_agent(repo, topology_id, hydrated) + return + await transition_status(repo, topology_id, TopologyStatus.DEPLOYING) client = docker.from_env() @@ -418,6 +489,10 @@ async def teardown_topology(repo, topology_id: str) -> None: if hydrated is None: raise ValueError(f"topology {topology_id!r} not found") + if hydrated["topology"].get("target_host_uuid"): + await _teardown_on_agent(repo, topology_id, hydrated) + return + await transition_status(repo, topology_id, TopologyStatus.TEARING_DOWN) client = docker.from_env() diff --git a/tests/topology/test_deploy_agent_branch.py b/tests/topology/test_deploy_agent_branch.py new file mode 100644 index 00000000..d5ce80ad --- /dev/null +++ b/tests/topology/test_deploy_agent_branch.py @@ -0,0 +1,168 @@ +"""Agent-branch routing inside deploy_topology / teardown_topology. + +Exercises the target_host_uuid branch added in Step 6. We never hit a +real agent — AgentClient is swapped out for a recording fake so we +assert the right hydrated blob + version hash are forwarded and the +master's status machine advances as expected. 
+""" +from __future__ import annotations + +from typing import Any + +import pytest + +from decnet.engine import deployer as _deployer +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.hashing import canonical_hash +from decnet.topology.persistence import persist +from decnet.topology.status import TopologyStatus +from decnet.web.db.factory import get_repository + + +def _cfg(**kw) -> TopologyConfig: + base = dict( + name="agent-branch", + mode="agent", + depth=1, + branching_factor=1, + deckies_per_lan_min=1, + deckies_per_lan_max=1, + cross_edge_probability=0.0, + randomize_services=False, + services_explicit=["ssh"], + seed=7, + ) + base.update(kw) + return TopologyConfig(**base) + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "agent-branch.db")) + await r.initialize() + return r + + +async def _seed_host(repo, uuid_: str = "h-1") -> None: + await repo.add_swarm_host( + { + "uuid": uuid_, + "name": f"host-{uuid_}", + "address": "10.9.9.9", + "agent_port": 8765, + "status": "active", + "client_cert_fingerprint": "a" * 64, + "cert_bundle_path": "/tmp/ignored", + } + ) + + +class _FakeAgentClient: + """Records every call; never touches the network.""" + + instances: list["_FakeAgentClient"] = [] + + def __init__(self, *, host: dict[str, Any]) -> None: + self.host = host + self.calls: list[tuple[str, tuple, dict]] = [] + _FakeAgentClient.instances.append(self) + + async def __aenter__(self) -> "_FakeAgentClient": + return self + + async def __aexit__(self, *_exc) -> None: + return None + + async def apply_topology(self, hydrated, version_hash): + self.calls.append(("apply", (hydrated, version_hash), {})) + return {"status": "applied", "version_hash": version_hash} + + async def teardown_topology(self, topology_id): + self.calls.append(("teardown", (topology_id,), {})) + return {"status": "torn_down", "topology_id": topology_id} + + +@pytest.fixture +def 
fake_agent(monkeypatch: pytest.MonkeyPatch): + _FakeAgentClient.instances.clear() + # Patch the import site inside the local functions; they do + # `from decnet.swarm.client import AgentClient` at call time. + import decnet.swarm.client as _swarm_client + monkeypatch.setattr(_swarm_client, "AgentClient", _FakeAgentClient) + return _FakeAgentClient + + +@pytest.mark.anyio +async def test_deploy_on_agent_routes_via_agent_client(repo, fake_agent) -> None: + await _seed_host(repo, "h-deploy") + plan = generate(_cfg()) + tid = await persist(repo, plan, target_host_uuid="h-deploy") + + await _deployer.deploy_topology(repo, tid) + + # Exactly one AgentClient, one apply call. + assert len(fake_agent.instances) == 1 + inst = fake_agent.instances[0] + assert inst.host["uuid"] == "h-deploy" + assert len(inst.calls) == 1 + verb, (hydrated, version_hash), _ = inst.calls[0] + assert verb == "apply" + assert hydrated["topology"]["id"] == tid + assert version_hash == canonical_hash(hydrated) + + topo = await repo.get_topology(tid) + assert topo["status"] == TopologyStatus.ACTIVE + + +@pytest.mark.anyio +async def test_deploy_on_agent_failure_marks_failed(repo, monkeypatch) -> None: + await _seed_host(repo, "h-fail") + plan = generate(_cfg(name="agent-fail")) + tid = await persist(repo, plan, target_host_uuid="h-fail") + + class _BoomClient(_FakeAgentClient): + async def apply_topology(self, hydrated, version_hash): + raise RuntimeError("agent refused") + + import decnet.swarm.client as _swarm_client + monkeypatch.setattr(_swarm_client, "AgentClient", _BoomClient) + + with pytest.raises(RuntimeError, match="agent refused"): + await _deployer.deploy_topology(repo, tid) + + topo = await repo.get_topology(tid) + assert topo["status"] == TopologyStatus.FAILED + + +@pytest.mark.anyio +async def test_deploy_on_agent_unknown_host_raises(repo, fake_agent) -> None: + plan = generate(_cfg(name="agent-missing")) + tid = await persist(repo, plan, target_host_uuid="nope") + + with 
pytest.raises(ValueError, match="unknown swarm host"): + await _deployer.deploy_topology(repo, tid) + + # No AgentClient should ever be constructed for a nonexistent host. + assert fake_agent.instances == [] + + +@pytest.mark.anyio +async def test_teardown_on_agent_routes_via_agent_client(repo, fake_agent) -> None: + await _seed_host(repo, "h-teardown") + plan = generate(_cfg(name="agent-down")) + tid = await persist(repo, plan, target_host_uuid="h-teardown") + + # Seed into an ACTIVE state the teardown will accept. + from decnet.topology.persistence import transition_status + await transition_status(repo, tid, TopologyStatus.DEPLOYING) + await transition_status(repo, tid, TopologyStatus.ACTIVE) + + await _deployer.teardown_topology(repo, tid) + + inst = fake_agent.instances[-1] + assert inst.host["uuid"] == "h-teardown" + assert inst.calls == [("teardown", (tid,), {})] + + topo = await repo.get_topology(tid) + assert topo["status"] == TopologyStatus.TORN_DOWN