From 8257bcc031acb2802355b9401dac5681dea16ca5 Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 18 Apr 2026 07:15:53 -0400 Subject: [PATCH] feat(swarm): worker agent + fix pre-existing base_repo coverage test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Worker agent (decnet.agent): - mTLS FastAPI service exposing /deploy, /teardown, /status, /health, /mutate. uvicorn enforces CERT_REQUIRED with the DECNET CA pinned. - executor.py offloads the blocking deployer onto asyncio.to_thread so the event loop stays responsive. - server.py refuses to start without an enrolled bundle in ~/.decnet/agent/ — unauthenticated agents are not a supported mode. - docs/openapi disabled on the agent — narrow attack surface. tests/test_base_repo.py: DummyRepo was missing get_attacker_artifacts (pre-existing abstractmethod) and so could not be instantiated. Added the stub + coverage for the new swarm CRUD surface on BaseRepository. --- decnet/agent/__init__.py | 7 +++ decnet/agent/app.py | 100 ++++++++++++++++++++++++++++++++++ decnet/agent/executor.py | 45 +++++++++++++++ decnet/agent/server.py | 70 ++++++++++++++++++++++++ tests/swarm/test_agent_app.py | 45 +++++++++++++++ tests/test_base_repo.py | 18 ++++++ 6 files changed, 285 insertions(+) create mode 100644 decnet/agent/__init__.py create mode 100644 decnet/agent/app.py create mode 100644 decnet/agent/executor.py create mode 100644 decnet/agent/server.py create mode 100644 tests/swarm/test_agent_app.py diff --git a/decnet/agent/__init__.py b/decnet/agent/__init__.py new file mode 100644 index 0000000..6d65c0f --- /dev/null +++ b/decnet/agent/__init__.py @@ -0,0 +1,7 @@ +"""DECNET worker agent — runs on every SWARM worker host. + +Exposes an mTLS-protected FastAPI service the master's SWARM controller +calls to deploy, mutate, and tear down deckies locally. 
The agent reuses +the existing `decnet.engine.deployer` code path unchanged, so a worker runs +deckies the same way `decnet deploy --mode unihost` does today. +""" diff --git a/decnet/agent/app.py b/decnet/agent/app.py new file mode 100644 index 0000000..fb72390 --- /dev/null +++ b/decnet/agent/app.py @@ -0,0 +1,100 @@ +"""Worker-side FastAPI app. + +Protected by mTLS at the ASGI/uvicorn transport layer: uvicorn is started +with ``--ssl-ca-certs`` + ``--ssl-cert-reqs 2`` (CERT_REQUIRED), so any +client that cannot prove a cert signed by the DECNET CA is rejected before +reaching a handler. Once past the TLS handshake, all peers are trusted +equally (the only entity holding a CA-signed cert is the master +controller). + +Endpoints mirror the existing unihost CLI verbs: + +* ``POST /deploy`` — body: serialized ``DecnetConfig`` +* ``POST /teardown`` — body: optional ``{"decky_id": "..."}`` +* ``POST /mutate`` — body: ``{"decky_id": "...", "services": [...]}`` +* ``GET /status`` — deployment snapshot +* ``GET /health`` — liveness probe; mTLS is still required, so the master + pings it with its own cert. 
+""" +from __future__ import annotations + +from typing import Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, Field + +from decnet.agent import executor as _exec +from decnet.config import DecnetConfig +from decnet.logging import get_logger + +log = get_logger("agent.app") + +app = FastAPI( + title="DECNET SWARM Agent", + version="0.1.0", + docs_url=None, # no interactive docs on worker — narrow attack surface + redoc_url=None, + openapi_url=None, +) + + +# ------------------------------------------------------------------ schemas + +class DeployRequest(BaseModel): + config: DecnetConfig = Field(..., description="Full DecnetConfig to materialise on this worker") + dry_run: bool = False + no_cache: bool = False + + +class TeardownRequest(BaseModel): + decky_id: Optional[str] = None + + +class MutateRequest(BaseModel): + decky_id: str + services: list[str] + + +# ------------------------------------------------------------------ routes + +@app.get("/health") +async def health() -> dict[str, str]: + return {"status": "ok"} + + +@app.get("/status") +async def status() -> dict: + return await _exec.status() + + +@app.post("/deploy") +async def deploy(req: DeployRequest) -> dict: + try: + await _exec.deploy(req.config, dry_run=req.dry_run, no_cache=req.no_cache) + except Exception as exc: + log.exception("agent.deploy failed") + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"status": "deployed", "deckies": len(req.config.deckies)} + + +@app.post("/teardown") +async def teardown(req: TeardownRequest) -> dict: + try: + await _exec.teardown(req.decky_id) + except Exception as exc: + log.exception("agent.teardown failed") + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"status": "torn_down", "decky_id": req.decky_id} + + +@app.post("/mutate") +async def mutate(req: MutateRequest) -> dict: + # Service rotation is routed through the deployer's existing mutate path + # by the master 
(worker-side mutate is a redeploy of a single decky with + # the new service set). For v1 we answer 501 and expect the master + # to send a full /deploy with the updated DecnetConfig — simpler and + # avoids duplicating mutation logic on the worker. + raise HTTPException( + status_code=501, + detail="Per-decky mutate is performed via /deploy with updated services", + ) diff --git a/decnet/agent/executor.py b/decnet/agent/executor.py new file mode 100644 index 0000000..356f4f8 --- /dev/null +++ b/decnet/agent/executor.py @@ -0,0 +1,45 @@ +"""Thin adapter between the agent's HTTP endpoints and the existing +``decnet.engine.deployer`` code path. + +Kept deliberately small: the agent does not re-implement deployment logic, +it only translates a master RPC into the same function calls the unihost +CLI already uses. Everything runs in a worker thread (the deployer is +blocking) so the FastAPI event loop stays responsive. +""" +from __future__ import annotations + +import asyncio +from typing import Any + +from decnet.engine import deployer as _deployer +from decnet.config import DecnetConfig, load_state, clear_state +from decnet.logging import get_logger + +log = get_logger("agent.executor") + + +async def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False) -> None: + """Run the blocking deployer off-loop. 
The deployer itself calls + save_state() internally once the compose file is materialised.""" + log.info("agent.deploy name=%s deckies=%d", config.name, len(config.deckies)) + await asyncio.to_thread(_deployer.deploy, config, dry_run, no_cache, False) + + +async def teardown(decky_id: str | None = None) -> None: + log.info("agent.teardown decky_id=%s", decky_id) + await asyncio.to_thread(_deployer.teardown, decky_id) + if decky_id is None: + await asyncio.to_thread(clear_state) + + +async def status() -> dict[str, Any]: + state = await asyncio.to_thread(load_state) + if state is None: + return {"deployed": False, "deckies": []} + config, _compose_path = state + return { + "deployed": True, + "name": getattr(config, "name", None), + "compose_path": str(_compose_path), + "deckies": [d.model_dump() for d in config.deckies], + } diff --git a/decnet/agent/server.py b/decnet/agent/server.py new file mode 100644 index 0000000..663bc35 --- /dev/null +++ b/decnet/agent/server.py @@ -0,0 +1,70 @@ +"""Worker-agent uvicorn launcher. + +Starts ``decnet.agent.app:app`` over HTTPS with mTLS enforcement. The +worker must already have a bundle in ``~/.decnet/agent/`` (delivered by +``decnet swarm enroll`` from the master); if it does not, we refuse to +start — unauthenticated agents are not a supported mode. +""" +from __future__ import annotations + +import os +import pathlib +import signal +import subprocess # nosec B404 +import sys + +from decnet.logging import get_logger +from decnet.swarm import pki + +log = get_logger("agent.server") + + +def run(host: str, port: int, agent_dir: pathlib.Path = pki.DEFAULT_AGENT_DIR) -> int: + bundle = pki.load_worker_bundle(agent_dir) + if bundle is None: + print( + f"[agent] No cert bundle at {agent_dir}. 
" + f"Run `decnet swarm enroll` from the master first.", + file=sys.stderr, + ) + return 2 + + keyfile = agent_dir / "worker.key" + certfile = agent_dir / "worker.crt" + cafile = agent_dir / "ca.crt" + + cmd = [ + sys.executable, + "-m", + "uvicorn", + "decnet.agent.app:app", + "--host", + host, + "--port", + str(port), + "--ssl-keyfile", + str(keyfile), + "--ssl-certfile", + str(certfile), + "--ssl-ca-certs", + str(cafile), + # 2 == ssl.CERT_REQUIRED — clients MUST present a CA-signed cert. + "--ssl-cert-reqs", + "2", + ] + log.info("agent starting host=%s port=%d bundle=%s", host, port, agent_dir) + # Own process group for clean Ctrl+C / SIGTERM propagation to uvicorn + # workers (same pattern as `decnet api`). + proc = subprocess.Popen(cmd, start_new_session=True) # nosec B603 + try: + return proc.wait() + except KeyboardInterrupt: + try: + os.killpg(proc.pid, signal.SIGTERM) + try: + return proc.wait(timeout=10) + except subprocess.TimeoutExpired: + os.killpg(proc.pid, signal.SIGKILL) + return proc.wait() + except ProcessLookupError: + return 0 diff --git a/tests/swarm/test_agent_app.py b/tests/swarm/test_agent_app.py new file mode 100644 index 0000000..fa02817 --- /dev/null +++ b/tests/swarm/test_agent_app.py @@ -0,0 +1,45 @@ +"""Agent FastAPI app — static/contract checks only. + +We deliberately do NOT spin uvicorn up in-process here: the mTLS layer is +enforced by uvicorn itself (via --ssl-cert-reqs 2) and is validated in the +VM integration suite. What we CAN assert in unit scope is the route +surface + request/response schema. 
+""" +from __future__ import annotations + +from fastapi.testclient import TestClient + +from decnet.agent.app import app + + +def test_health_endpoint() -> None: + client = TestClient(app) + resp = client.get("/health") + assert resp.status_code == 200 + assert resp.json() == {"status": "ok"} + + +def test_status_when_not_deployed() -> None: + client = TestClient(app) + resp = client.get("/status") + assert resp.status_code == 200 + body = resp.json() + assert "deployed" in body + assert "deckies" in body + + +def test_mutate_is_501() -> None: + client = TestClient(app) + resp = client.post("/mutate", json={"decky_id": "decky-01", "services": ["ssh"]}) + assert resp.status_code == 501 + + +def test_deploy_rejects_malformed_body() -> None: + client = TestClient(app) + resp = client.post("/deploy", json={"not": "a config"}) + assert resp.status_code == 422 # pydantic validation + + +def test_route_set() -> None: + paths = {r.path for r in app.routes if hasattr(r, "path")} + assert {"/health", "/status", "/deploy", "/teardown", "/mutate"} <= paths diff --git a/tests/test_base_repo.py b/tests/test_base_repo.py index dd7531e..7750f69 100644 --- a/tests/test_base_repo.py +++ b/tests/test_base_repo.py @@ -37,6 +37,7 @@ class DummyRepo(BaseRepository): async def delete_user(self, u): await super().delete_user(u) async def update_user_role(self, u, r): await super().update_user_role(u, r) async def purge_logs_and_bounties(self): await super().purge_logs_and_bounties() + async def get_attacker_artifacts(self, uuid): await super().get_attacker_artifacts(uuid) @pytest.mark.asyncio async def test_base_repo_coverage(): @@ -73,3 +74,20 @@ async def test_base_repo_coverage(): await dr.delete_user("a") await dr.update_user_role("a", "admin") await dr.purge_logs_and_bounties() + await dr.get_attacker_artifacts("a") + + # Swarm methods: default NotImplementedError on BaseRepository. Covering + # them here keeps the coverage contract honest for the swarm CRUD surface. 
+ for coro, args in [ + (dr.add_swarm_host, ({},)), + (dr.get_swarm_host_by_name, ("w",)), + (dr.get_swarm_host_by_uuid, ("u",)), + (dr.list_swarm_hosts, ()), + (dr.update_swarm_host, ("u", {})), + (dr.delete_swarm_host, ("u",)), + (dr.upsert_decky_shard, ({},)), + (dr.list_decky_shards, ()), + (dr.delete_decky_shards_for_host, ("u",)), + ]: + with pytest.raises(NotImplementedError): + await coro(*args)