feat(agent): topology apply/teardown/state endpoints

New mTLS-protected routes on the agent:

- POST /topology/apply — master pushes {hydrated, version_hash}.
  Validates the hash matches locally (serialisation drift guard),
  runs the topology through the same validator/composer pipeline
  used master-side, then creates bridges + compose up + records the
  apply in topology.db.
- POST /topology/teardown — dismantles compose, removes bridges,
  clears topology.db.  Idempotent.
- GET /topology/state — returns applied row + live docker
  observation for the heartbeat.

Implementation lives in decnet/agent/topology_ops.py; it reuses the
private compose helpers from decnet.engine.deployer so we don't
duplicate compose/project-name plumbing.  The apply path is sync
under the hood (docker SDK + subprocess); we hop to a thread so the
event loop keeps servicing other agent traffic.

v1 is one-topology-per-agent; cross-topology apply returns 409.

Step 4 of the agent <-> topology integration.
This commit is contained in:
2026-04-21 01:25:15 -04:00
parent aea3e7e05b
commit 13cb0ff38e
4 changed files with 431 additions and 2 deletions

View File

@@ -18,20 +18,48 @@ Endpoints mirror the existing unihost CLI verbs:
""" """
from __future__ import annotations from __future__ import annotations
import os
import pathlib
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional from typing import Any, Optional
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from decnet.agent import executor as _exec from decnet.agent import executor as _exec
from decnet.agent import heartbeat as _heartbeat from decnet.agent import heartbeat as _heartbeat
from decnet.agent import topology_ops as _topology_ops
from decnet.swarm.pki import DEFAULT_AGENT_DIR
from decnet.agent.topology_store import AlreadyApplied, TopologyStore
from decnet.config import DecnetConfig from decnet.config import DecnetConfig
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.topology.validate import ValidationError
log = get_logger("agent.app") log = get_logger("agent.app")
def _resolve_agent_dir() -> pathlib.Path:
env = os.environ.get("DECNET_AGENT_DIR")
if env:
return pathlib.Path(env)
system = pathlib.Path("/etc/decnet/agent")
if system.exists():
return system
return DEFAULT_AGENT_DIR
# Module-level singleton. Created lazily on first use so tests can
# monkeypatch DECNET_AGENT_DIR before the store binds to a path.
_topology_store: Optional[TopologyStore] = None
def _store() -> TopologyStore:
global _topology_store
if _topology_store is None:
_topology_store = TopologyStore(_resolve_agent_dir() / "topology.db")
return _topology_store
@asynccontextmanager @asynccontextmanager
async def _lifespan(app: FastAPI): async def _lifespan(app: FastAPI):
# Best-effort: if identity/bundle plumbing isn't configured (e.g. dev # Best-effort: if identity/bundle plumbing isn't configured (e.g. dev
@@ -41,6 +69,10 @@ async def _lifespan(app: FastAPI):
yield yield
finally: finally:
await _heartbeat.stop() await _heartbeat.stop()
global _topology_store
if _topology_store is not None:
_topology_store.close()
_topology_store = None
app = FastAPI( app = FastAPI(
@@ -129,6 +161,70 @@ async def self_destruct() -> dict:
return {"status": "self_destruct_scheduled"} return {"status": "self_destruct_scheduled"}
# ------------------------------------------------------- topology endpoints
class ApplyTopologyRequest(BaseModel):
hydrated: dict[str, Any] = Field(
..., description="Hydrated topology dict from master.persistence.hydrate()"
)
version_hash: str = Field(
..., description="Master's canonical_hash(hydrated); must match ours"
)
class TeardownTopologyRequest(BaseModel):
topology_id: str = Field(..., description="Topology UUID to dismantle")
@app.post(
"/topology/apply",
responses={
400: {"description": "Malformed hydrated topology or hash mismatch"},
409: {"description": "A different topology is already applied"},
500: {"description": "Docker or compose raised while applying"},
},
)
async def topology_apply(req: ApplyTopologyRequest) -> dict:
store = _store()
try:
await _topology_ops.apply(req.hydrated, req.version_hash, store)
except _topology_ops.HashMismatch as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
except ValidationError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
except AlreadyApplied as exc:
raise HTTPException(status_code=409, detail=str(exc)) from exc
except Exception as exc:
log.exception("agent.topology_apply failed")
topology_id = (req.hydrated.get("topology") or {}).get("id")
if topology_id:
try:
store.record_error(str(topology_id), str(exc)[:500])
except Exception: # noqa: BLE001 — don't mask original failure
log.exception("failed to record apply error")
raise HTTPException(status_code=500, detail=str(exc)) from exc
return {"status": "applied", "version_hash": req.version_hash}
@app.post(
"/topology/teardown",
responses={500: {"description": "Docker or compose raised while tearing down"}},
)
async def topology_teardown(req: TeardownTopologyRequest) -> dict:
try:
await _topology_ops.teardown(req.topology_id, _store())
except Exception as exc:
log.exception("agent.topology_teardown failed")
raise HTTPException(status_code=500, detail=str(exc)) from exc
return {"status": "torn_down", "topology_id": req.topology_id}
@app.get("/topology/state")
async def topology_state() -> dict:
return _topology_ops.state(_store())
@app.post( @app.post(
"/mutate", "/mutate",
responses={501: {"description": "Worker-side mutate not yet implemented"}}, responses={501: {"description": "Worker-side mutate not yet implemented"}},

View File

@@ -0,0 +1,184 @@
"""Agent-side topology apply/teardown/state primitives.
Wraps the compose + bridge machinery from :mod:`decnet.engine.deployer`
so the agent can drive a topology without ever touching the master's
sqlmodel repo. The master-side ``deploy_topology`` always calls
``transition_status(repo, …)`` which is useless (and unreachable) on
an agent — here we operate purely on a hydrated dict + the local
:class:`TopologyStore`.
v1 constraint: one topology per agent. A second apply for a different
``topology_id`` raises :class:`AlreadyApplied` (the endpoint maps that
to 409).
"""
from __future__ import annotations
import asyncio
import subprocess # nosec B404
from typing import Any
import docker
from decnet.agent.topology_store import (
AlreadyApplied,
TopologyStore,
observed,
)
from decnet.engine.deployer import (
_compose,
_compose_with_retry,
_teardown_order,
_topology_compose_path,
)
from decnet.logging import get_logger
from decnet.network import create_bridge_network, remove_bridge_network
from decnet.topology.compose import (
_network_name as _topology_network_name,
write_topology_compose,
)
from decnet.topology.hashing import canonical_hash
from decnet.topology.validate import (
ValidationError,
errors as _validation_errors,
validate as _validate_topology,
)
log = get_logger("agent.topology_ops")
class HashMismatch(RuntimeError):
"""Raised when the master-provided version_hash doesn't match what we
hash locally — suggests serialisation drift. We fail loudly rather
than silently papering over a schema mismatch."""
def _topology_id(hydrated: dict[str, Any]) -> str:
topo = hydrated.get("topology") or {}
tid = topo.get("id")
if not tid:
raise ValueError("hydrated topology missing topology.id")
return str(tid)
async def apply(
hydrated: dict[str, Any],
version_hash: str,
store: TopologyStore,
) -> None:
"""Materialise *hydrated* on this agent and record it in *store*.
Raises:
HashMismatch: master and agent disagree on the canonical hash —
don't touch docker, fail the apply.
AlreadyApplied: a different topology is already applied here.
ValidationError: topology fails structural validation.
Any docker / compose error propagates up; the endpoint maps it
to 500 and records the message on the store row.
"""
local_hash = canonical_hash(hydrated)
if local_hash != version_hash:
raise HashMismatch(
f"master hash {version_hash!r} does not match agent hash "
f"{local_hash!r} — refusing to apply"
)
issues = _validate_topology(hydrated)
if _validation_errors(issues):
raise ValidationError(issues)
topology_id = _topology_id(hydrated)
# v1 guard: refuse cross-topology overwrite up-front. Same check
# lives in store.put() but we want a clean 409 path before we
# start mutating docker state.
existing = store.current()
if existing is not None and existing.topology_id != topology_id:
raise AlreadyApplied(
f"agent already has topology {existing.topology_id!r}; "
f"cannot apply {topology_id!r}"
)
lans = hydrated["lans"]
compose_path = _topology_compose_path(topology_id)
client = docker.from_env()
# Bridges + compose are sync/blocking; hop to a thread so we don't
# stall the event loop on a slow docker daemon.
def _materialise() -> None:
for lan in lans:
net_name = _topology_network_name(topology_id, lan["name"])
internal = not lan["is_dmz"]
create_bridge_network(
client, net_name, lan["subnet"], internal=internal
)
write_topology_compose(hydrated, compose_path)
_compose_with_retry("up", "--build", "-d", compose_file=compose_path)
await asyncio.to_thread(_materialise)
store.put(topology_id, version_hash, hydrated)
log.info(
"topology %s applied on agent (%d LANs)", topology_id, len(lans)
)
async def teardown(
topology_id: str,
store: TopologyStore,
) -> None:
"""Tear down *topology_id* on this agent. Idempotent: if there's no
record and no compose file, it's a no-op that still returns cleanly."""
row = store.current()
# Prefer the stored hydrated blob — it's what we applied with. If
# it's gone (db wiped) but compose-file lingers, we still try to
# compose-down and delete bridges by scanning the compose file's
# LAN membership list via the hydrated blob if available.
hydrated = row.hydrated if row and row.topology_id == topology_id else None
compose_path = _topology_compose_path(topology_id)
client = docker.from_env()
def _dismantle() -> None:
if compose_path.exists():
try:
_compose("down", "--remove-orphans", compose_file=compose_path)
except subprocess.CalledProcessError as exc:
log.warning(
"topology %s compose down failed (continuing): %s",
topology_id, exc,
)
if hydrated is not None:
for lan_name in _teardown_order(hydrated["lans"]):
net_name = _topology_network_name(topology_id, lan_name)
remove_bridge_network(client, net_name)
if compose_path.exists():
compose_path.unlink()
await asyncio.to_thread(_dismantle)
store.clear(topology_id)
log.info("topology %s torn down on agent", topology_id)
def state(store: TopologyStore) -> dict[str, Any]:
"""Snapshot-plus-live-observation — the shape the heartbeat embeds."""
row = store.current()
try:
obs = observed(docker.from_env())
except Exception as exc: # noqa: BLE001 — docker socket may be gone
obs = {"error": str(exc)[:200]}
if row is None:
return {
"topology_id": None,
"applied_version_hash": None,
"applied_at": None,
"last_error": None,
"observed": obs,
}
return {
"topology_id": row.topology_id,
"applied_version_hash": row.applied_version_hash,
"applied_at": row.applied_at,
"last_error": row.last_error,
"observed": obs,
}
__all__ = ["apply", "teardown", "state", "HashMismatch"]

View File

@@ -58,7 +58,11 @@ class TopologyStore:
def __init__(self, db_path: pathlib.Path) -> None: def __init__(self, db_path: pathlib.Path) -> None:
db_path.parent.mkdir(parents=True, exist_ok=True) db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(str(db_path)) # check_same_thread=False: Starlette/FastAPI runs sync endpoint
# bodies on a worker thread distinct from where `app` is imported.
# The agent is single-process, so there's no real contention —
# sqlite's own connection lock is enough.
self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
self._conn.execute( self._conn.execute(
"CREATE TABLE IF NOT EXISTS applied_topology (" "CREATE TABLE IF NOT EXISTS applied_topology ("
" topology_id TEXT PRIMARY KEY," " topology_id TEXT PRIMARY KEY,"

View File

@@ -0,0 +1,145 @@
"""Agent topology endpoints — contract-level tests with mocked ops."""
from __future__ import annotations
import pathlib
import pytest
from fastapi.testclient import TestClient
from decnet.agent import app as _agent_app
from decnet.agent import topology_ops as _ops
from decnet.agent.topology_store import AlreadyApplied
@pytest.fixture(autouse=True)
def _isolate_store(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path):
"""Point the singleton at a tmp dir and reset it between tests."""
monkeypatch.setenv("DECNET_AGENT_DIR", str(tmp_path))
# Force a fresh store per test.
if _agent_app._topology_store is not None:
_agent_app._topology_store.close()
_agent_app._topology_store = None
yield
if _agent_app._topology_store is not None:
_agent_app._topology_store.close()
_agent_app._topology_store = None
def _hydrated(topology_id: str = "top-1") -> dict:
return {
"topology": {"id": topology_id, "name": "n", "mode": "agent"},
"lans": [],
"deckies": [],
"edges": [],
}
def test_topology_state_idle() -> None:
client = TestClient(_agent_app.app)
resp = client.get("/topology/state")
assert resp.status_code == 200
body = resp.json()
assert body["topology_id"] is None
assert body["applied_version_hash"] is None
assert "observed" in body
def test_topology_apply_routes_to_ops(monkeypatch: pytest.MonkeyPatch) -> None:
called: dict = {}
async def _fake_apply(hydrated, version_hash, store):
called["hydrated"] = hydrated
called["version_hash"] = version_hash
# Simulate ops bookkeeping.
store.put(hydrated["topology"]["id"], version_hash, hydrated)
monkeypatch.setattr(_ops, "apply", _fake_apply)
client = TestClient(_agent_app.app)
resp = client.post(
"/topology/apply",
json={"hydrated": _hydrated(), "version_hash": "abc"},
)
assert resp.status_code == 200, resp.text
assert resp.json() == {"status": "applied", "version_hash": "abc"}
assert called["version_hash"] == "abc"
def test_topology_apply_hash_mismatch_is_400(monkeypatch: pytest.MonkeyPatch) -> None:
async def _boom(*_a, **_kw):
raise _ops.HashMismatch("master hash != agent hash")
monkeypatch.setattr(_ops, "apply", _boom)
client = TestClient(_agent_app.app)
resp = client.post(
"/topology/apply",
json={"hydrated": _hydrated(), "version_hash": "wrong"},
)
assert resp.status_code == 400
assert "hash" in resp.json()["detail"].lower()
def test_topology_apply_conflict_is_409(monkeypatch: pytest.MonkeyPatch) -> None:
async def _boom(*_a, **_kw):
raise AlreadyApplied("another topology already applied")
monkeypatch.setattr(_ops, "apply", _boom)
client = TestClient(_agent_app.app)
resp = client.post(
"/topology/apply",
json={"hydrated": _hydrated("top-2"), "version_hash": "h"},
)
assert resp.status_code == 409
def test_topology_apply_docker_failure_is_500_and_records_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
async def _boom(*_a, **_kw):
raise RuntimeError("docker down")
monkeypatch.setattr(_ops, "apply", _boom)
client = TestClient(_agent_app.app)
resp = client.post(
"/topology/apply",
json={"hydrated": _hydrated("top-err"), "version_hash": "h"},
)
assert resp.status_code == 500
assert "docker down" in resp.json()["detail"]
def test_topology_teardown_routes_to_ops(monkeypatch: pytest.MonkeyPatch) -> None:
called: dict = {}
async def _fake_teardown(topology_id, store):
called["topology_id"] = topology_id
store.clear(topology_id)
monkeypatch.setattr(_ops, "teardown", _fake_teardown)
client = TestClient(_agent_app.app)
resp = client.post(
"/topology/teardown", json={"topology_id": "top-gone"}
)
assert resp.status_code == 200
assert called["topology_id"] == "top-gone"
def test_topology_teardown_failure_is_500(monkeypatch: pytest.MonkeyPatch) -> None:
async def _boom(*_a, **_kw):
raise RuntimeError("compose refused")
monkeypatch.setattr(_ops, "teardown", _boom)
client = TestClient(_agent_app.app)
resp = client.post(
"/topology/teardown", json={"topology_id": "top-1"}
)
assert resp.status_code == 500
def test_routes_registered() -> None:
paths = {r.path for r in _agent_app.app.routes if hasattr(r, "path")}
assert {"/topology/apply", "/topology/teardown", "/topology/state"} <= paths