Files
DECNET/decnet/agent/app.py
anti 65fc9ac2b9 fix(tests): clean up two pre-existing failures before config work
- decnet/agent/app.py /health: drop leftover 'push-test-2' canary
  planted during live VM push verification and never cleaned up;
  test_health_endpoint asserts the exact dict shape.
- tests/test_factory.py: switch the lazy-engine check from
  mysql+aiomysql (not in pyproject) to mysql+asyncmy (the driver the
  project actually ships). The test does not hit the wire so the
  dialect swap is safe.

Both were red on `pytest tests/` before any config/auto-spawn work
began; fixing them here so the upcoming commits land on a green
full-suite baseline.
2026-04-19 03:17:17 -04:00

101 lines
3.2 KiB
Python

"""Worker-side FastAPI app.
Protected by mTLS at the ASGI/uvicorn transport layer: uvicorn is started
with ``--ssl-ca-certs`` + ``--ssl-cert-reqs 2`` (CERT_REQUIRED), so any
client that cannot prove a cert signed by the DECNET CA is rejected before
reaching a handler. Once past the TLS handshake, all peers are trusted
equally (the only entity holding a CA-signed cert is the master
controller).
Endpoints mirror the existing unihost CLI verbs:
* ``POST /deploy`` — body: serialized ``DecnetConfig``
* ``POST /teardown`` — body: optional ``{"decky_id": "..."}``
* ``POST /mutate`` — body: ``{"decky_id": "...", "services": [...]}``
* ``GET /status`` — deployment snapshot
* ``GET /health`` — liveness probe, does NOT require mTLS? No — mTLS
still required; master pings it with its cert.
"""
from __future__ import annotations
from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from decnet.agent import executor as _exec
from decnet.config import DecnetConfig
from decnet.logging import get_logger
log = get_logger("agent.app")
app = FastAPI(
title="DECNET SWARM Agent",
version="0.1.0",
docs_url=None, # no interactive docs on worker — narrow attack surface
redoc_url=None,
openapi_url=None,
)
# ------------------------------------------------------------------ schemas
class DeployRequest(BaseModel):
config: DecnetConfig = Field(..., description="Full DecnetConfig to materialise on this worker")
dry_run: bool = False
no_cache: bool = False
class TeardownRequest(BaseModel):
decky_id: Optional[str] = None
class MutateRequest(BaseModel):
decky_id: str
services: list[str]
# ------------------------------------------------------------------ routes
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok"}
@app.get("/status")
async def status() -> dict:
return await _exec.status()
@app.post("/deploy")
async def deploy(req: DeployRequest) -> dict:
try:
await _exec.deploy(req.config, dry_run=req.dry_run, no_cache=req.no_cache)
except Exception as exc:
log.exception("agent.deploy failed")
raise HTTPException(status_code=500, detail=str(exc)) from exc
return {"status": "deployed", "deckies": len(req.config.deckies)}
@app.post("/teardown")
async def teardown(req: TeardownRequest) -> dict:
try:
await _exec.teardown(req.decky_id)
except Exception as exc:
log.exception("agent.teardown failed")
raise HTTPException(status_code=500, detail=str(exc)) from exc
return {"status": "torn_down", "decky_id": req.decky_id}
@app.post("/mutate")
async def mutate(req: MutateRequest) -> dict:
# Service rotation is routed through the deployer's existing mutate path
# by the master (worker-side mutate is a redeploy of a single decky with
# the new service set). For v1 we accept the request and ask the master
# to send a full /deploy with the updated DecnetConfig — simpler and
# avoids duplicating mutation logic on the worker.
raise HTTPException(
status_code=501,
detail="Per-decky mutate is performed via /deploy with updated services",
)