diff --git a/decnet/cli.py b/decnet/cli.py index 047ba9c..ebb2190 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -124,6 +124,64 @@ def api( console.print("[red]Failed to start API. Ensure 'uvicorn' is installed in the current environment.[/]") +@app.command() +def swarmctl( + port: int = typer.Option(8770, "--port", help="Port for the swarm controller"), + host: str = typer.Option("127.0.0.1", "--host", help="Bind address for the swarm controller"), + daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"), +) -> None: + """Run the DECNET SWARM controller (master-side, separate process from `decnet api`).""" + import subprocess # nosec B404 + import sys + import os + import signal + + if daemon: + log.info("swarmctl daemonizing host=%s port=%d", host, port) + _daemonize() + + log.info("swarmctl command invoked host=%s port=%d", host, port) + console.print(f"[green]Starting DECNET SWARM controller on {host}:{port}...[/]") + _cmd = [sys.executable, "-m", "uvicorn", "decnet.web.swarm_api:app", + "--host", host, "--port", str(port)] + try: + proc = subprocess.Popen(_cmd, start_new_session=True) # nosec B603 B404 + try: + proc.wait() + except KeyboardInterrupt: + try: + os.killpg(proc.pid, signal.SIGTERM) + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + os.killpg(proc.pid, signal.SIGKILL) + proc.wait() + except ProcessLookupError: + pass + except (FileNotFoundError, subprocess.SubprocessError): + console.print("[red]Failed to start swarmctl. Ensure 'uvicorn' is installed in the current environment.[/]") + + +@app.command() +def agent( + port: int = typer.Option(8765, "--port", help="Port for the worker agent"), + host: str = typer.Option("0.0.0.0", "--host", help="Bind address for the worker agent"), # nosec B104 + daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"), +) -> None: + """Run the DECNET SWARM worker agent (requires a cert bundle in ~/.decnet/agent/).""" + from decnet.agent import server as _agent_server + + if daemon: + log.info("agent daemonizing host=%s port=%d", host, port) + _daemonize() + + log.info("agent command invoked host=%s port=%d", host, port) + console.print(f"[green]Starting DECNET worker agent on {host}:{port} (mTLS)...[/]") + rc = _agent_server.run(host, port) + if rc != 0: + raise typer.Exit(rc) + + @app.command() def deploy( mode: str = typer.Option("unihost", "--mode", "-m", help="Deployment mode: unihost | swarm"), diff --git a/decnet/web/router/swarm/__init__.py b/decnet/web/router/swarm/__init__.py new file mode 100644 index 0000000..b1fac7b --- /dev/null +++ b/decnet/web/router/swarm/__init__.py @@ -0,0 +1,16 @@ +"""Swarm controller routers. + +Mounted onto the swarm-api FastAPI app under the ``/swarm`` prefix. The +controller is a separate process from the main DECNET API so swarm +failures cannot cascade into log ingestion / dashboard serving. +""" +from fastapi import APIRouter + +from .hosts import router as hosts_router +from .deployments import router as deployments_router +from .health import router as health_router + +swarm_router = APIRouter(prefix="/swarm") +swarm_router.include_router(hosts_router) +swarm_router.include_router(deployments_router) +swarm_router.include_router(health_router) diff --git a/decnet/web/router/swarm/deployments.py b/decnet/web/router/swarm/deployments.py new file mode 100644 index 0000000..7afbc0a --- /dev/null +++ b/decnet/web/router/swarm/deployments.py @@ -0,0 +1,164 @@ +"""Deployment dispatch: shard deckies across enrolled workers and push. + +The master owns the DecnetConfig. Per worker we build a filtered copy +containing only the deckies assigned to that worker (via ``host_uuid``), +then POST it to the worker agent. Sharding strategy is explicit: the +caller is expected to have already set ``host_uuid`` on every decky. If +any decky arrives without one, we fail fast — auto-sharding lives in the +CLI layer (task #7), not here. +""" +from __future__ import annotations + +import asyncio +import json +from datetime import datetime, timezone +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, Field + +from decnet.config import DecnetConfig, DeckyConfig +from decnet.logging import get_logger +from decnet.swarm.client import AgentClient +from decnet.web.db.repository import BaseRepository +from decnet.web.dependencies import get_repo + +log = get_logger("swarm.deployments") + +router = APIRouter(tags=["swarm-deployments"]) + + +class DeployRequest(BaseModel): + config: DecnetConfig + dry_run: bool = False + no_cache: bool = False + + +class TeardownRequest(BaseModel): + host_uuid: str | None = Field( + default=None, + description="If set, tear down only this worker; otherwise tear down all hosts", + ) + decky_id: str | None = None + + +class HostResult(BaseModel): + host_uuid: str + host_name: str + ok: bool + detail: Any | None = None + + +class DeployResponse(BaseModel): + results: list[HostResult] + + +# ----------------------------------------------------------------- helpers + + +def _shard_by_host(config: DecnetConfig) -> dict[str, list[DeckyConfig]]: + buckets: dict[str, list[DeckyConfig]] = {} + for d in config.deckies: + if not d.host_uuid: + raise HTTPException( + status_code=400, + detail=f"decky '{d.name}' has no host_uuid — caller must shard before dispatch", + ) + buckets.setdefault(d.host_uuid, []).append(d) + return buckets + + +def _worker_config(base: DecnetConfig, shard: list[DeckyConfig]) -> DecnetConfig: + return base.model_copy(update={"deckies": shard}) + + +# ------------------------------------------------------------------ routes + + +@router.post("/deploy", response_model=DeployResponse) +async def deploy( + req: DeployRequest, + repo: BaseRepository = Depends(get_repo), +) -> DeployResponse: + if req.config.mode != "swarm": + raise HTTPException(status_code=400, detail="mode must be 'swarm'") + + buckets = _shard_by_host(req.config) + + # Resolve host rows in one query-per-host pass; fail fast on unknown uuids. + hosts: dict[str, dict[str, Any]] = {} + for host_uuid in buckets: + row = await repo.get_swarm_host_by_uuid(host_uuid) + if row is None: + raise HTTPException(status_code=404, detail=f"unknown host_uuid: {host_uuid}") + hosts[host_uuid] = row + + async def _dispatch(host_uuid: str, shard: list[DeckyConfig]) -> HostResult: + host = hosts[host_uuid] + cfg = _worker_config(req.config, shard) + try: + async with AgentClient(host=host) as agent: + body = await agent.deploy(cfg, dry_run=req.dry_run, no_cache=req.no_cache) + # Persist a DeckyShard row per decky for status lookups. + for d in shard: + await repo.upsert_decky_shard( + { + "decky_name": d.name, + "host_uuid": host_uuid, + "services": json.dumps(d.services), + "state": "running" if not req.dry_run else "pending", + "last_error": None, + "updated_at": datetime.now(timezone.utc), + } + ) + await repo.update_swarm_host(host_uuid, {"status": "active"}) + return HostResult(host_uuid=host_uuid, host_name=host["name"], ok=True, detail=body) + except Exception as exc: + log.exception("swarm.deploy dispatch failed host=%s", host["name"]) + for d in shard: + await repo.upsert_decky_shard( + { + "decky_name": d.name, + "host_uuid": host_uuid, + "services": json.dumps(d.services), + "state": "failed", + "last_error": str(exc)[:512], + "updated_at": datetime.now(timezone.utc), + } + ) + return HostResult(host_uuid=host_uuid, host_name=host["name"], ok=False, detail=str(exc)) + + results = await asyncio.gather( + *(_dispatch(uuid_, shard) for uuid_, shard in buckets.items()) + ) + return DeployResponse(results=list(results)) + + +@router.post("/teardown", response_model=DeployResponse) +async def teardown( + req: TeardownRequest, + repo: BaseRepository = Depends(get_repo), +) -> DeployResponse: + if req.host_uuid is not None: + row = await repo.get_swarm_host_by_uuid(req.host_uuid) + if row is None: + raise HTTPException(status_code=404, detail="host not found") + targets = [row] + else: + targets = await repo.list_swarm_hosts() + + async def _call(host: dict[str, Any]) -> HostResult: + try: + async with AgentClient(host=host) as agent: + body = await agent.teardown(req.decky_id) + if req.decky_id is None: + await repo.delete_decky_shards_for_host(host["uuid"]) + return HostResult(host_uuid=host["uuid"], host_name=host["name"], ok=True, detail=body) + except Exception as exc: + log.exception("swarm.teardown failed host=%s", host["name"]) + return HostResult( + host_uuid=host["uuid"], host_name=host["name"], ok=False, detail=str(exc) + ) + + results = await asyncio.gather(*(_call(h) for h in targets)) + return DeployResponse(results=list(results)) diff --git a/decnet/web/router/swarm/health.py b/decnet/web/router/swarm/health.py new file mode 100644 index 0000000..c7df01d --- /dev/null +++ b/decnet/web/router/swarm/health.py @@ -0,0 +1,79 @@ +"""Health endpoints for the swarm controller. + +* ``GET /swarm/health`` — liveness of the controller itself (no I/O). +* ``POST /swarm/check`` — active probe of every enrolled worker over mTLS. + Updates ``SwarmHost.status`` and ``last_heartbeat``. +""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from typing import Any + +from fastapi import APIRouter, Depends +from pydantic import BaseModel + +from decnet.logging import get_logger +from decnet.swarm.client import AgentClient +from decnet.web.db.repository import BaseRepository +from decnet.web.dependencies import get_repo + +log = get_logger("swarm.health") + +router = APIRouter(tags=["swarm-health"]) + + +class HostHealth(BaseModel): + host_uuid: str + name: str + address: str + reachable: bool + detail: Any | None = None + + +class CheckResponse(BaseModel): + results: list[HostHealth] + + +@router.get("/health") +async def health() -> dict[str, str]: + return {"status": "ok", "role": "swarm-controller"} + + +@router.post("/check", response_model=CheckResponse) +async def check( + repo: BaseRepository = Depends(get_repo), +) -> CheckResponse: + hosts = await repo.list_swarm_hosts() + + async def _probe(host: dict[str, Any]) -> HostHealth: + try: + async with AgentClient(host=host) as agent: + body = await agent.health() + await repo.update_swarm_host( + host["uuid"], + { + "status": "active", + "last_heartbeat": datetime.now(timezone.utc), + }, + ) + return HostHealth( + host_uuid=host["uuid"], + name=host["name"], + address=host["address"], + reachable=True, + detail=body, + ) + except Exception as exc: + log.warning("swarm.check unreachable host=%s err=%s", host["name"], exc) + await repo.update_swarm_host(host["uuid"], {"status": "unreachable"}) + return HostHealth( + host_uuid=host["uuid"], + name=host["name"], + address=host["address"], + reachable=False, + detail=str(exc), + ) + + results = await asyncio.gather(*(_probe(h) for h in hosts)) + return CheckResponse(results=list(results)) diff --git a/decnet/web/router/swarm/hosts.py b/decnet/web/router/swarm/hosts.py new file mode 100644 index 0000000..4c1fb2b --- /dev/null +++ b/decnet/web/router/swarm/hosts.py @@ -0,0 +1,162 @@ +"""Swarm host lifecycle endpoints: enroll, list, decommission. + +Enrollment design +----------------- +The master controller holds the CA private key. On ``POST /swarm/enroll`` +it generates a fresh worker keypair + cert (signed by the master CA) and +returns the full bundle to the operator. The operator is responsible for +delivering that bundle to the worker's ``~/.decnet/agent/`` directory +(scp/sshpass/ansible — outside this process's trust boundary). + +Rationale: the worker agent speaks ONLY mTLS. There is no pre-auth +bootstrap endpoint, so there is nothing to attack before the worker is +enrolled. The bundle-delivery step is explicit and auditable. +""" +from __future__ import annotations + +import pathlib +import uuid as _uuid +from datetime import datetime, timezone +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel, Field + +from decnet.swarm import pki +from decnet.web.db.repository import BaseRepository +from decnet.web.dependencies import get_repo + +router = APIRouter(tags=["swarm-hosts"]) + + +# ------------------------------------------------------------------- schemas + + +class EnrollRequest(BaseModel): + name: str = Field(..., min_length=1, max_length=128) + address: str = Field(..., description="IP or DNS the master uses to reach the worker") + agent_port: int = Field(default=8765, ge=1, le=65535) + sans: list[str] = Field( + default_factory=list, + description="Extra SANs (IPs / hostnames) to embed in the worker cert", + ) + notes: Optional[str] = None + + +class EnrolledBundle(BaseModel): + """Cert bundle returned to the operator — must be delivered to the worker.""" + + host_uuid: str + name: str + address: str + agent_port: int + fingerprint: str + ca_cert_pem: str + worker_cert_pem: str + worker_key_pem: str + + +class SwarmHostView(BaseModel): + uuid: str + name: str + address: str + agent_port: int + status: str + last_heartbeat: Optional[datetime] = None + client_cert_fingerprint: str + enrolled_at: datetime + notes: Optional[str] = None + + +# ------------------------------------------------------------------- routes + + +@router.post("/enroll", response_model=EnrolledBundle, status_code=status.HTTP_201_CREATED) +async def enroll( + req: EnrollRequest, + repo: BaseRepository = Depends(get_repo), +) -> EnrolledBundle: + existing = await repo.get_swarm_host_by_name(req.name) + if existing is not None: + raise HTTPException(status_code=409, detail=f"Worker '{req.name}' is already enrolled") + + ca = pki.ensure_ca() + sans = list({*req.sans, req.address, req.name}) + issued = pki.issue_worker_cert(ca, req.name, sans) + + # Persist the bundle under ~/.decnet/ca/workers// so the master + # can replay it if the operator loses the original delivery. + bundle_dir = pki.DEFAULT_CA_DIR / "workers" / req.name + pki.write_worker_bundle(issued, bundle_dir) + + host_uuid = str(_uuid.uuid4()) + await repo.add_swarm_host( + { + "uuid": host_uuid, + "name": req.name, + "address": req.address, + "agent_port": req.agent_port, + "status": "enrolled", + "client_cert_fingerprint": issued.fingerprint_sha256, + "cert_bundle_path": str(bundle_dir), + "enrolled_at": datetime.now(timezone.utc), + "notes": req.notes, + } + ) + return EnrolledBundle( + host_uuid=host_uuid, + name=req.name, + address=req.address, + agent_port=req.agent_port, + fingerprint=issued.fingerprint_sha256, + ca_cert_pem=issued.ca_cert_pem.decode(), + worker_cert_pem=issued.cert_pem.decode(), + worker_key_pem=issued.key_pem.decode(), + ) + + +@router.get("/hosts", response_model=list[SwarmHostView]) +async def list_hosts( + host_status: Optional[str] = None, + repo: BaseRepository = Depends(get_repo), +) -> list[SwarmHostView]: + rows = await repo.list_swarm_hosts(host_status) + return [SwarmHostView(**r) for r in rows] + + +@router.get("/hosts/{uuid}", response_model=SwarmHostView) +async def get_host( + uuid: str, + repo: BaseRepository = Depends(get_repo), +) -> SwarmHostView: + row = await repo.get_swarm_host_by_uuid(uuid) + if row is None: + raise HTTPException(status_code=404, detail="host not found") + return SwarmHostView(**row) + + +@router.delete("/hosts/{uuid}", status_code=status.HTTP_204_NO_CONTENT) +async def decommission( + uuid: str, + repo: BaseRepository = Depends(get_repo), +) -> None: + row = await repo.get_swarm_host_by_uuid(uuid) + if row is None: + raise HTTPException(status_code=404, detail="host not found") + + # Remove shard rows first (we own them; cascade is portable via the repo). + await repo.delete_decky_shards_for_host(uuid) + await repo.delete_swarm_host(uuid) + + # Best-effort bundle cleanup; if the dir was moved manually, don't fail. + bundle_dir = pathlib.Path(row.get("cert_bundle_path") or "") + if bundle_dir.is_dir(): + for child in bundle_dir.iterdir(): + try: + child.unlink() + except OSError: + pass + try: + bundle_dir.rmdir() + except OSError: + pass diff --git a/decnet/web/swarm_api.py b/decnet/web/swarm_api.py new file mode 100644 index 0000000..669252c --- /dev/null +++ b/decnet/web/swarm_api.py @@ -0,0 +1,65 @@ +"""DECNET SWARM Controller — master-side control plane. + +Runs as an independent FastAPI/uvicorn process. Isolated from +``decnet.web.api`` so controller failure cannot cascade to the main API, +ingester, or dashboard (mirrors the existing pattern used by +``decnet api`` with ``start_new_session=True``). + +Responsibilities: +* host enrollment (issues CA-signed worker bundles); +* dispatching DecnetConfig shards to worker agents over mTLS; +* active health probes of enrolled workers. + +The controller *reuses* the same ``get_repo`` dependency as the main API, +so SwarmHost / DeckyShard state is visible to both processes via the +shared DB. +""" +from __future__ import annotations + +from contextlib import asynccontextmanager +from typing import AsyncGenerator + +from fastapi import FastAPI +from fastapi.responses import ORJSONResponse + +from decnet.logging import get_logger +from decnet.swarm import pki +from decnet.swarm.client import ensure_master_identity +from decnet.web.dependencies import repo +from decnet.web.router.swarm import swarm_router + +log = get_logger("swarm_api") + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + log.info("swarm-controller starting up") + # Make sure the CA and master client cert exist before we accept any + # request — enrollment needs them and AgentClient needs them. + pki.ensure_ca() + ensure_master_identity() + await repo.initialize() + log.info("swarm-controller ready") + yield + log.info("swarm-controller shutdown") + + +app: FastAPI = FastAPI( + title="DECNET SWARM Controller", + version="0.1.0", + lifespan=lifespan, + default_response_class=ORJSONResponse, + # No interactive docs: the controller is an internal management plane, + # not a public surface. Enable explicitly in dev if needed. + docs_url=None, + redoc_url=None, + openapi_url=None, +) + +app.include_router(swarm_router) + + +@app.get("/health") +async def root_health() -> dict[str, str]: + """Top-level liveness probe (no DB I/O).""" + return {"status": "ok", "role": "swarm-controller"} diff --git a/tests/swarm/test_swarm_api.py b/tests/swarm/test_swarm_api.py new file mode 100644 index 0000000..1c2cc03 --- /dev/null +++ b/tests/swarm/test_swarm_api.py @@ -0,0 +1,294 @@ +"""Unit tests for the SWARM controller FastAPI app. + +Covers the enrollment, host-management, and deployment dispatch routes. +The AgentClient is stubbed so we exercise the controller's logic without +a live mTLS peer (that path has its own roundtrip test). +""" +from __future__ import annotations + +import pathlib +from typing import Any + +import pytest +from fastapi.testclient import TestClient + +from decnet.web.db.factory import get_repository +from decnet.web.dependencies import get_repo + + +@pytest.fixture +def ca_dir(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: + """Redirect the PKI default CA path into tmp so the test CA never + touches ``~/.decnet/ca``.""" + ca = tmp_path / "ca" + from decnet.swarm import pki + + monkeypatch.setattr(pki, "DEFAULT_CA_DIR", ca) + # Also patch the already-imported references inside client.py / routers. + from decnet.swarm import client as swarm_client + from decnet.web.router.swarm import hosts as swarm_hosts + + monkeypatch.setattr(swarm_client, "pki", pki) + monkeypatch.setattr(swarm_hosts, "pki", pki) + return ca + + +@pytest.fixture +def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch): + r = get_repository(db_path=str(tmp_path / "swarm.db")) + # The controller's lifespan initialises the module-level `repo` in + # decnet.web.dependencies. Swap that singleton for our test repo so + # schema creation targets the temp DB. + import decnet.web.dependencies as deps + import decnet.web.swarm_api as swarm_api_mod + + monkeypatch.setattr(deps, "repo", r) + monkeypatch.setattr(swarm_api_mod, "repo", r) + return r + + +@pytest.fixture +def client(repo, ca_dir: pathlib.Path): + from decnet.web.swarm_api import app + + async def _override() -> Any: + return repo + + app.dependency_overrides[get_repo] = _override + with TestClient(app) as c: + yield c + app.dependency_overrides.clear() + + +# ---------------------------------------------------------------- /enroll + + +def test_enroll_creates_host_and_returns_bundle(client: TestClient) -> None: + resp = client.post( + "/swarm/enroll", + json={"name": "worker-a", "address": "10.0.0.5", "agent_port": 8765}, + ) + assert resp.status_code == 201, resp.text + body = resp.json() + assert body["name"] == "worker-a" + assert body["address"] == "10.0.0.5" + assert "-----BEGIN CERTIFICATE-----" in body["worker_cert_pem"] + assert "-----BEGIN PRIVATE KEY-----" in body["worker_key_pem"] + assert "-----BEGIN CERTIFICATE-----" in body["ca_cert_pem"] + assert len(body["fingerprint"]) == 64 # sha256 hex + + +def test_enroll_rejects_duplicate_name(client: TestClient) -> None: + payload = {"name": "worker-dup", "address": "10.0.0.6", "agent_port": 8765} + assert client.post("/swarm/enroll", json=payload).status_code == 201 + resp2 = client.post("/swarm/enroll", json=payload) + assert resp2.status_code == 409 + + +# ---------------------------------------------------------------- /hosts + + +def test_list_hosts_empty(client: TestClient) -> None: + resp = client.get("/swarm/hosts") + assert resp.status_code == 200 + assert resp.json() == [] + + +def test_list_and_get_host_after_enroll(client: TestClient) -> None: + reg = client.post( + "/swarm/enroll", + json={"name": "worker-b", "address": "10.0.0.7", "agent_port": 8765}, + ).json() + uuid = reg["host_uuid"] + + lst = client.get("/swarm/hosts").json() + assert len(lst) == 1 + assert lst[0]["name"] == "worker-b" + + one = client.get(f"/swarm/hosts/{uuid}").json() + assert one["uuid"] == uuid + assert one["status"] == "enrolled" + + +def test_decommission_removes_host_and_bundle( + client: TestClient, ca_dir: pathlib.Path +) -> None: + reg = client.post( + "/swarm/enroll", + json={"name": "worker-c", "address": "10.0.0.8", "agent_port": 8765}, + ).json() + uuid = reg["host_uuid"] + + bundle_dir = ca_dir / "workers" / "worker-c" + assert bundle_dir.is_dir() + + resp = client.delete(f"/swarm/hosts/{uuid}") + assert resp.status_code == 204 + assert client.get(f"/swarm/hosts/{uuid}").status_code == 404 + assert not bundle_dir.exists() + + +# ---------------------------------------------------------------- /deploy + + +class _StubAgentClient: + """Minimal async-context-manager stub mirroring ``AgentClient``.""" + + deployed: list[dict[str, Any]] = [] + torn_down: list[dict[str, Any]] = [] + + def __init__(self, host: dict[str, Any] | None = None, **_: Any) -> None: + self._host = host or {} + + async def __aenter__(self) -> "_StubAgentClient": + return self + + async def __aexit__(self, *exc: Any) -> None: + return None + + async def health(self) -> dict[str, Any]: + return {"status": "ok"} + + async def deploy(self, config: Any, **kw: Any) -> dict[str, Any]: + _StubAgentClient.deployed.append( + {"host": self._host.get("name"), "deckies": [d.name for d in config.deckies]} + ) + return {"status": "deployed", "deckies": len(config.deckies)} + + async def teardown(self, decky_id: str | None = None) -> dict[str, Any]: + _StubAgentClient.torn_down.append( + {"host": self._host.get("name"), "decky_id": decky_id} + ) + return {"status": "torn_down"} + + +@pytest.fixture +def stub_agent(monkeypatch: pytest.MonkeyPatch): + _StubAgentClient.deployed.clear() + _StubAgentClient.torn_down.clear() + from decnet.web.router.swarm import deployments as dep_mod + from decnet.web.router.swarm import health as hlt_mod + + monkeypatch.setattr(dep_mod, "AgentClient", _StubAgentClient) + monkeypatch.setattr(hlt_mod, "AgentClient", _StubAgentClient) + return _StubAgentClient + + +def _decky_dict(name: str, host_uuid: str, ip: str) -> dict[str, Any]: + return { + "name": name, + "ip": ip, + "services": ["ssh"], + "distro": "debian", + "base_image": "debian:bookworm-slim", + "hostname": name, + "host_uuid": host_uuid, + } + + +def test_deploy_shards_across_hosts(client: TestClient, stub_agent) -> None: + h1 = client.post( + "/swarm/enroll", + json={"name": "w1", "address": "10.0.0.1", "agent_port": 8765}, + ).json() + h2 = client.post( + "/swarm/enroll", + json={"name": "w2", "address": "10.0.0.2", "agent_port": 8765}, + ).json() + + cfg = { + "mode": "swarm", + "interface": "eth0", + "subnet": "192.168.1.0/24", + "gateway": "192.168.1.1", + "deckies": [ + _decky_dict("decky-01", h1["host_uuid"], "192.168.1.10"), + _decky_dict("decky-02", h1["host_uuid"], "192.168.1.11"), + _decky_dict("decky-03", h2["host_uuid"], "192.168.1.12"), + ], + } + resp = client.post("/swarm/deploy", json={"config": cfg}) + assert resp.status_code == 200, resp.text + body = resp.json() + assert len(body["results"]) == 2 + assert all(r["ok"] for r in body["results"]) + + by_host = {d["host"]: d["deckies"] for d in stub_agent.deployed} + assert by_host["w1"] == ["decky-01", "decky-02"] + assert by_host["w2"] == ["decky-03"] + + +def test_deploy_rejects_missing_host_uuid(client: TestClient, stub_agent) -> None: + cfg = { + "mode": "swarm", + "interface": "eth0", + "subnet": "192.168.1.0/24", + "gateway": "192.168.1.1", + "deckies": [ + { + "name": "decky-01", + "ip": "192.168.1.10", + "services": ["ssh"], + "distro": "debian", + "base_image": "debian:bookworm-slim", + "hostname": "decky-01", + # host_uuid deliberately omitted + } + ], + } + resp = client.post("/swarm/deploy", json={"config": cfg}) + assert resp.status_code == 400 + assert "host_uuid" in resp.json()["detail"] + + +def test_deploy_rejects_non_swarm_mode(client: TestClient, stub_agent) -> None: + cfg = { + "mode": "unihost", + "interface": "eth0", + "subnet": "192.168.1.0/24", + "gateway": "192.168.1.1", + "deckies": [_decky_dict("decky-01", "fake-uuid", "192.168.1.10")], + } + resp = client.post("/swarm/deploy", json={"config": cfg}) + assert resp.status_code == 400 + + +def test_teardown_all_hosts(client: TestClient, stub_agent) -> None: + for i, addr in enumerate(("10.0.0.1", "10.0.0.2"), start=1): + client.post( + "/swarm/enroll", + json={"name": f"td{i}", "address": addr, "agent_port": 8765}, + ) + resp = client.post("/swarm/teardown", json={}) + assert resp.status_code == 200 + assert len(resp.json()["results"]) == 2 + assert {t["host"] for t in stub_agent.torn_down} == {"td1", "td2"} + + +# ---------------------------------------------------------------- /check + + +def test_check_marks_hosts_active(client: TestClient, stub_agent) -> None: + h = client.post( + "/swarm/enroll", + json={"name": "probe-w", "address": "10.0.0.9", "agent_port": 8765}, + ).json() + + resp = client.post("/swarm/check") + assert resp.status_code == 200 + results = resp.json()["results"] + assert len(results) == 1 + assert results[0]["reachable"] is True + + one = client.get(f"/swarm/hosts/{h['host_uuid']}").json() + assert one["status"] == "active" + assert one["last_heartbeat"] is not None + + +# ---------------------------------------------------------------- /health (root) + + +def test_root_health(client: TestClient) -> None: + resp = client.get("/health") + assert resp.status_code == 200 + assert resp.json()["role"] == "swarm-controller"