From 13cb0ff38e40008fbbb23b45685900e76e5e9b33 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 01:25:15 -0400 Subject: [PATCH] feat(agent): topology apply/teardown/state endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New mTLS-protected routes on the agent: - POST /topology/apply — master pushes {hydrated, version_hash}. Validates the hash matches locally (serialisation drift guard), runs the topology through the same validator/composer pipeline used master-side, then creates bridges + compose up + records the apply in topology.db. - POST /topology/teardown — dismantles compose, removes bridges, clears topology.db. Idempotent. - GET /topology/state — returns applied row + live docker observation for the heartbeat. Implementation lives in decnet/agent/topology_ops.py; it reuses the private compose helpers from decnet.engine.deployer so we don't duplicate compose/project-name plumbing. The apply path is sync under the hood (docker SDK + subprocess); we hop to a thread so the event loop keeps servicing other agent traffic. v1 is one-topology-per-agent; cross-topology apply returns 409. Step 4 of the agent <-> topology integration. --- decnet/agent/app.py | 98 +++++++++- decnet/agent/topology_ops.py | 184 +++++++++++++++++++ decnet/agent/topology_store.py | 6 +- tests/swarm/test_agent_topology_endpoints.py | 145 +++++++++++++++ 4 files changed, 431 insertions(+), 2 deletions(-) create mode 100644 decnet/agent/topology_ops.py create mode 100644 tests/swarm/test_agent_topology_endpoints.py diff --git a/decnet/agent/app.py b/decnet/agent/app.py index 16639f40..b37e22da 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -18,20 +18,48 @@ Endpoints mirror the existing unihost CLI verbs: """ from __future__ import annotations +import os +import pathlib from contextlib import asynccontextmanager -from typing import Optional +from typing import Any, Optional from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field from decnet.agent import executor as _exec from decnet.agent import heartbeat as _heartbeat +from decnet.agent import topology_ops as _topology_ops +from decnet.swarm.pki import DEFAULT_AGENT_DIR +from decnet.agent.topology_store import AlreadyApplied, TopologyStore from decnet.config import DecnetConfig from decnet.logging import get_logger +from decnet.topology.validate import ValidationError log = get_logger("agent.app") +def _resolve_agent_dir() -> pathlib.Path: + env = os.environ.get("DECNET_AGENT_DIR") + if env: + return pathlib.Path(env) + system = pathlib.Path("/etc/decnet/agent") + if system.exists(): + return system + return DEFAULT_AGENT_DIR + + +# Module-level singleton. Created lazily on first use so tests can +# monkeypatch DECNET_AGENT_DIR before the store binds to a path. +_topology_store: Optional[TopologyStore] = None + + +def _store() -> TopologyStore: + global _topology_store + if _topology_store is None: + _topology_store = TopologyStore(_resolve_agent_dir() / "topology.db") + return _topology_store + + @asynccontextmanager async def _lifespan(app: FastAPI): # Best-effort: if identity/bundle plumbing isn't configured (e.g. dev @@ -41,6 +69,10 @@ async def _lifespan(app: FastAPI): yield finally: await _heartbeat.stop() + global _topology_store + if _topology_store is not None: + _topology_store.close() + _topology_store = None app = FastAPI( @@ -129,6 +161,70 @@ async def self_destruct() -> dict: return {"status": "self_destruct_scheduled"} +# ------------------------------------------------------- topology endpoints + + +class ApplyTopologyRequest(BaseModel): + hydrated: dict[str, Any] = Field( + ..., description="Hydrated topology dict from master.persistence.hydrate()" + ) + version_hash: str = Field( + ..., description="Master's canonical_hash(hydrated); must match ours" + ) + + +class TeardownTopologyRequest(BaseModel): + topology_id: str = Field(..., description="Topology UUID to dismantle") + + +@app.post( + "/topology/apply", + responses={ + 400: {"description": "Malformed hydrated topology or hash mismatch"}, + 409: {"description": "A different topology is already applied"}, + 500: {"description": "Docker or compose raised while applying"}, + }, +) +async def topology_apply(req: ApplyTopologyRequest) -> dict: + store = _store() + try: + await _topology_ops.apply(req.hydrated, req.version_hash, store) + except _topology_ops.HashMismatch as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + except ValidationError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + except AlreadyApplied as exc: + raise HTTPException(status_code=409, detail=str(exc)) from exc + except Exception as exc: + log.exception("agent.topology_apply failed") + topology_id = (req.hydrated.get("topology") or {}).get("id") + if topology_id: + try: + store.record_error(str(topology_id), str(exc)[:500]) + except Exception: # noqa: BLE001 — don't mask original failure + log.exception("failed to record apply error") + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"status": "applied", "version_hash": req.version_hash} + + +@app.post( + "/topology/teardown", + responses={500: {"description": "Docker or compose raised while tearing down"}}, +) +async def topology_teardown(req: TeardownTopologyRequest) -> dict: + try: + await _topology_ops.teardown(req.topology_id, _store()) + except Exception as exc: + log.exception("agent.topology_teardown failed") + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"status": "torn_down", "topology_id": req.topology_id} + + +@app.get("/topology/state") +async def topology_state() -> dict: + return _topology_ops.state(_store()) + + @app.post( "/mutate", responses={501: {"description": "Worker-side mutate not yet implemented"}}, diff --git a/decnet/agent/topology_ops.py b/decnet/agent/topology_ops.py new file mode 100644 index 00000000..c11e7a52 --- /dev/null +++ b/decnet/agent/topology_ops.py @@ -0,0 +1,184 @@ +"""Agent-side topology apply/teardown/state primitives. + +Wraps the compose + bridge machinery from :mod:`decnet.engine.deployer` +so the agent can drive a topology without ever touching the master's +sqlmodel repo. The master-side ``deploy_topology`` always calls +``transition_status(repo, …)`` which is useless (and unreachable) on +an agent — here we operate purely on a hydrated dict + the local +:class:`TopologyStore`. + +v1 constraint: one topology per agent. A second apply for a different +``topology_id`` raises :class:`AlreadyApplied` (the endpoint maps that +to 409). +""" +from __future__ import annotations + +import asyncio +import subprocess # nosec B404 +from typing import Any + +import docker + +from decnet.agent.topology_store import ( + AlreadyApplied, + TopologyStore, + observed, +) +from decnet.engine.deployer import ( + _compose, + _compose_with_retry, + _teardown_order, + _topology_compose_path, +) +from decnet.logging import get_logger +from decnet.network import create_bridge_network, remove_bridge_network +from decnet.topology.compose import ( + _network_name as _topology_network_name, + write_topology_compose, +) +from decnet.topology.hashing import canonical_hash +from decnet.topology.validate import ( + ValidationError, + errors as _validation_errors, + validate as _validate_topology, +) + +log = get_logger("agent.topology_ops") + + +class HashMismatch(RuntimeError): + """Raised when the master-provided version_hash doesn't match what we + hash locally — suggests serialisation drift. We fail loudly rather + than silently papering over a schema mismatch.""" + + +def _topology_id(hydrated: dict[str, Any]) -> str: + topo = hydrated.get("topology") or {} + tid = topo.get("id") + if not tid: + raise ValueError("hydrated topology missing topology.id") + return str(tid) + + +async def apply( + hydrated: dict[str, Any], + version_hash: str, + store: TopologyStore, +) -> None: + """Materialise *hydrated* on this agent and record it in *store*. + + Raises: + HashMismatch: master and agent disagree on the canonical hash — + don't touch docker, fail the apply. + AlreadyApplied: a different topology is already applied here. + ValidationError: topology fails structural validation. + Any docker / compose error propagates up; the endpoint maps it + to 500 and records the message on the store row. + """ + local_hash = canonical_hash(hydrated) + if local_hash != version_hash: + raise HashMismatch( + f"master hash {version_hash!r} does not match agent hash " + f"{local_hash!r} — refusing to apply" + ) + + issues = _validate_topology(hydrated) + if _validation_errors(issues): + raise ValidationError(issues) + + topology_id = _topology_id(hydrated) + # v1 guard: refuse cross-topology overwrite up-front. Same check + # lives in store.put() but we want a clean 409 path before we + # start mutating docker state. + existing = store.current() + if existing is not None and existing.topology_id != topology_id: + raise AlreadyApplied( + f"agent already has topology {existing.topology_id!r}; " + f"cannot apply {topology_id!r}" + ) + + lans = hydrated["lans"] + compose_path = _topology_compose_path(topology_id) + client = docker.from_env() + + # Bridges + compose are sync/blocking; hop to a thread so we don't + # stall the event loop on a slow docker daemon. + def _materialise() -> None: + for lan in lans: + net_name = _topology_network_name(topology_id, lan["name"]) + internal = not lan["is_dmz"] + create_bridge_network( + client, net_name, lan["subnet"], internal=internal + ) + write_topology_compose(hydrated, compose_path) + _compose_with_retry("up", "--build", "-d", compose_file=compose_path) + + await asyncio.to_thread(_materialise) + + store.put(topology_id, version_hash, hydrated) + log.info( + "topology %s applied on agent (%d LANs)", topology_id, len(lans) + ) + + +async def teardown( + topology_id: str, + store: TopologyStore, +) -> None: + """Tear down *topology_id* on this agent. Idempotent: if there's no + record and no compose file, it's a no-op that still returns cleanly.""" + row = store.current() + # Prefer the stored hydrated blob — it's what we applied with. If + # it's gone (db wiped) but compose-file lingers, we still try to + # compose-down and delete bridges by scanning the compose file's + # LAN membership list via the hydrated blob if available. + hydrated = row.hydrated if row and row.topology_id == topology_id else None + compose_path = _topology_compose_path(topology_id) + client = docker.from_env() + + def _dismantle() -> None: + if compose_path.exists(): + try: + _compose("down", "--remove-orphans", compose_file=compose_path) + except subprocess.CalledProcessError as exc: + log.warning( + "topology %s compose down failed (continuing): %s", + topology_id, exc, + ) + if hydrated is not None: + for lan_name in _teardown_order(hydrated["lans"]): + net_name = _topology_network_name(topology_id, lan_name) + remove_bridge_network(client, net_name) + if compose_path.exists(): + compose_path.unlink() + + await asyncio.to_thread(_dismantle) + store.clear(topology_id) + log.info("topology %s torn down on agent", topology_id) + + +def state(store: TopologyStore) -> dict[str, Any]: + """Snapshot-plus-live-observation — the shape the heartbeat embeds.""" + row = store.current() + try: + obs = observed(docker.from_env()) + except Exception as exc: # noqa: BLE001 — docker socket may be gone + obs = {"error": str(exc)[:200]} + if row is None: + return { + "topology_id": None, + "applied_version_hash": None, + "applied_at": None, + "last_error": None, + "observed": obs, + } + return { + "topology_id": row.topology_id, + "applied_version_hash": row.applied_version_hash, + "applied_at": row.applied_at, + "last_error": row.last_error, + "observed": obs, + } + + +__all__ = ["apply", "teardown", "state", "HashMismatch"] diff --git a/decnet/agent/topology_store.py b/decnet/agent/topology_store.py index f02bdf09..d3614927 100644 --- a/decnet/agent/topology_store.py +++ b/decnet/agent/topology_store.py @@ -58,7 +58,11 @@ class TopologyStore: def __init__(self, db_path: pathlib.Path) -> None: db_path.parent.mkdir(parents=True, exist_ok=True) - self._conn = sqlite3.connect(str(db_path)) + # check_same_thread=False: Starlette/FastAPI runs sync endpoint + # bodies on a worker thread distinct from where `app` is imported. + # The agent is single-process, so there's no real contention — + # sqlite's own connection lock is enough. + self._conn = sqlite3.connect(str(db_path), check_same_thread=False) self._conn.execute( "CREATE TABLE IF NOT EXISTS applied_topology (" " topology_id TEXT PRIMARY KEY," diff --git a/tests/swarm/test_agent_topology_endpoints.py b/tests/swarm/test_agent_topology_endpoints.py new file mode 100644 index 00000000..7d886280 --- /dev/null +++ b/tests/swarm/test_agent_topology_endpoints.py @@ -0,0 +1,145 @@ +"""Agent topology endpoints — contract-level tests with mocked ops.""" +from __future__ import annotations + +import pathlib + +import pytest +from fastapi.testclient import TestClient + +from decnet.agent import app as _agent_app +from decnet.agent import topology_ops as _ops +from decnet.agent.topology_store import AlreadyApplied + + +@pytest.fixture(autouse=True) +def _isolate_store(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path): + """Point the singleton at a tmp dir and reset it between tests.""" + monkeypatch.setenv("DECNET_AGENT_DIR", str(tmp_path)) + # Force a fresh store per test. + if _agent_app._topology_store is not None: + _agent_app._topology_store.close() + _agent_app._topology_store = None + yield + if _agent_app._topology_store is not None: + _agent_app._topology_store.close() + _agent_app._topology_store = None + + +def _hydrated(topology_id: str = "top-1") -> dict: + return { + "topology": {"id": topology_id, "name": "n", "mode": "agent"}, + "lans": [], + "deckies": [], + "edges": [], + } + + +def test_topology_state_idle() -> None: + client = TestClient(_agent_app.app) + resp = client.get("/topology/state") + assert resp.status_code == 200 + body = resp.json() + assert body["topology_id"] is None + assert body["applied_version_hash"] is None + assert "observed" in body + + +def test_topology_apply_routes_to_ops(monkeypatch: pytest.MonkeyPatch) -> None: + called: dict = {} + + async def _fake_apply(hydrated, version_hash, store): + called["hydrated"] = hydrated + called["version_hash"] = version_hash + # Simulate ops bookkeeping. + store.put(hydrated["topology"]["id"], version_hash, hydrated) + + monkeypatch.setattr(_ops, "apply", _fake_apply) + + client = TestClient(_agent_app.app) + resp = client.post( + "/topology/apply", + json={"hydrated": _hydrated(), "version_hash": "abc"}, + ) + assert resp.status_code == 200, resp.text + assert resp.json() == {"status": "applied", "version_hash": "abc"} + assert called["version_hash"] == "abc" + + +def test_topology_apply_hash_mismatch_is_400(monkeypatch: pytest.MonkeyPatch) -> None: + async def _boom(*_a, **_kw): + raise _ops.HashMismatch("master hash != agent hash") + + monkeypatch.setattr(_ops, "apply", _boom) + + client = TestClient(_agent_app.app) + resp = client.post( + "/topology/apply", + json={"hydrated": _hydrated(), "version_hash": "wrong"}, + ) + assert resp.status_code == 400 + assert "hash" in resp.json()["detail"].lower() + + +def test_topology_apply_conflict_is_409(monkeypatch: pytest.MonkeyPatch) -> None: + async def _boom(*_a, **_kw): + raise AlreadyApplied("another topology already applied") + + monkeypatch.setattr(_ops, "apply", _boom) + + client = TestClient(_agent_app.app) + resp = client.post( + "/topology/apply", + json={"hydrated": _hydrated("top-2"), "version_hash": "h"}, + ) + assert resp.status_code == 409 + + +def test_topology_apply_docker_failure_is_500_and_records_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def _boom(*_a, **_kw): + raise RuntimeError("docker down") + + monkeypatch.setattr(_ops, "apply", _boom) + client = TestClient(_agent_app.app) + resp = client.post( + "/topology/apply", + json={"hydrated": _hydrated("top-err"), "version_hash": "h"}, + ) + assert resp.status_code == 500 + assert "docker down" in resp.json()["detail"] + + +def test_topology_teardown_routes_to_ops(monkeypatch: pytest.MonkeyPatch) -> None: + called: dict = {} + + async def _fake_teardown(topology_id, store): + called["topology_id"] = topology_id + store.clear(topology_id) + + monkeypatch.setattr(_ops, "teardown", _fake_teardown) + + client = TestClient(_agent_app.app) + resp = client.post( + "/topology/teardown", json={"topology_id": "top-gone"} + ) + assert resp.status_code == 200 + assert called["topology_id"] == "top-gone" + + +def test_topology_teardown_failure_is_500(monkeypatch: pytest.MonkeyPatch) -> None: + async def _boom(*_a, **_kw): + raise RuntimeError("compose refused") + + monkeypatch.setattr(_ops, "teardown", _boom) + + client = TestClient(_agent_app.app) + resp = client.post( + "/topology/teardown", json={"topology_id": "top-1"} + ) + assert resp.status_code == 500 + + +def test_routes_registered() -> None: + paths = {r.path for r in _agent_app.app.routes if hasattr(r, "path")} + assert {"/topology/apply", "/topology/teardown", "/topology/state"} <= paths