Files
DECNET/decnet/web/router/swarm/api_deploy_swarm.py
anti e2d6f857b5 refactor(swarm): move router DTOs into decnet/web/db/models.py
_schemas.py was a local exception to the codebase convention. The rest
of the app keeps all API request/response DTOs in decnet/web/db/models.py
alongside UserResponse, DeployIniRequest, etc. — the swarm endpoints now
follow the same convention (SwarmEnrollRequest, SwarmHostView, etc).
Deletes decnet/web/router/swarm/_schemas.py.
2026-04-18 19:28:15 -04:00

105 lines
3.9 KiB
Python

"""POST /swarm/deploy — shard a DecnetConfig across enrolled workers.
Per worker we build a filtered copy containing only the deckies assigned
to that worker (via ``host_uuid``), then POST it to the worker agent.
The caller is expected to have already set ``host_uuid`` on every decky;
if any decky arrives without one, we fail fast. Auto-sharding lives in
the CLI layer, not here.
"""
from __future__ import annotations
import asyncio
import json
from datetime import datetime, timezone
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from decnet.config import DecnetConfig, DeckyConfig
from decnet.logging import get_logger
from decnet.swarm.client import AgentClient
from decnet.web.db.repository import BaseRepository
from decnet.web.dependencies import get_repo
from decnet.web.db.models import (
SwarmDeployRequest,
SwarmDeployResponse,
SwarmHostResult,
)
# Module-level logger, registered under the "swarm.deploy" name.
log = get_logger("swarm.deploy")
# Router instance that the POST /deploy handler in this module is attached to.
router = APIRouter()
def _shard_by_host(config: DecnetConfig) -> dict[str, list[DeckyConfig]]:
    """Group ``config.deckies`` by the ``host_uuid`` each one carries.

    Returns a mapping of host UUID to the deckies assigned to it. Hosts
    appear in first-seen order and deckies keep their relative order from
    ``config.deckies``.

    Raises:
        HTTPException: 400 as soon as a decky with a falsy ``host_uuid``
            is encountered; no sharding is attempted on its behalf.
    """
    by_host: dict[str, list[DeckyConfig]] = {}
    for d in config.deckies:
        if d.host_uuid:
            if d.host_uuid not in by_host:
                by_host[d.host_uuid] = []
            by_host[d.host_uuid].append(d)
            continue
        # Unassigned decky: this layer refuses to pick a worker for it.
        raise HTTPException(
            status_code=400,
            detail=f"decky '{d.name}' has no host_uuid — caller must shard before dispatch",
        )
    return by_host
def _worker_config(base: DecnetConfig, shard: list[DeckyConfig]) -> DecnetConfig:
    """Return a copy of ``base`` whose ``deckies`` field is replaced by ``shard``.

    The copy is produced by ``base.model_copy(update=...)``; every other
    field is whatever that call carries over from ``base``.
    NOTE(review): presumably DecnetConfig is a pydantic model, in which case
    the copy is shallow and the update is not re-validated — confirm.
    """
    overrides = {"deckies": shard}
    return base.model_copy(update=overrides)
def _shard_record(
    decky: DeckyConfig,
    host_uuid: str,
    *,
    state: str,
    last_error: str | None = None,
) -> dict[str, Any]:
    """Build the row handed to ``repo.upsert_decky_shard`` for one decky."""
    return {
        "decky_name": decky.name,
        "host_uuid": host_uuid,
        "services": json.dumps(decky.services),
        "state": state,
        "last_error": last_error,
        "updated_at": datetime.now(timezone.utc),
    }


@router.post("/deploy", response_model=SwarmDeployResponse, tags=["Swarm Deployments"])
async def api_deploy_swarm(
    req: SwarmDeployRequest,
    repo: BaseRepository = Depends(get_repo),
) -> SwarmDeployResponse:
    """Shard ``req.config`` by host and deploy each shard to its worker agent.

    Every target host is looked up before anything is dispatched, so an
    unknown ``host_uuid`` rejects the whole request up front. Shards are then
    dispatched concurrently; a failure on one host is reported in that host's
    result instead of failing the request.

    Returns:
        A ``SwarmDeployResponse`` holding one ``SwarmHostResult`` per host.

    Raises:
        HTTPException: 400 if ``req.config.mode`` is not ``"swarm"`` or a
            decky has no ``host_uuid``; 404 if a ``host_uuid`` is not known
            to the repository.
    """
    if req.config.mode != "swarm":
        raise HTTPException(status_code=400, detail="mode must be 'swarm'")

    buckets = _shard_by_host(req.config)

    # Resolve all hosts first: nothing is sent to any worker unless every
    # referenced host exists.
    hosts: dict[str, dict[str, Any]] = {}
    for host_uuid in buckets:
        row = await repo.get_swarm_host_by_uuid(host_uuid)
        if row is None:
            raise HTTPException(status_code=404, detail=f"unknown host_uuid: {host_uuid}")
        hosts[host_uuid] = row

    async def _record_shards(
        host_uuid: str,
        shard: list[DeckyConfig],
        *,
        state: str,
        last_error: str | None = None,
    ) -> None:
        """Upsert one shard row per decky with the given state."""
        for d in shard:
            await repo.upsert_decky_shard(
                _shard_record(d, host_uuid, state=state, last_error=last_error)
            )

    async def _dispatch(host_uuid: str, shard: list[DeckyConfig]) -> SwarmHostResult:
        """Deploy one shard to its host and record the outcome."""
        host = hosts[host_uuid]
        cfg = _worker_config(req.config, shard)
        try:
            async with AgentClient(host=host) as agent:
                body = await agent.deploy(cfg, dry_run=req.dry_run, no_cache=req.no_cache)
            # A dry run is recorded as "pending" rather than "running".
            await _record_shards(
                host_uuid,
                shard,
                state="pending" if req.dry_run else "running",
            )
            await repo.update_swarm_host(host_uuid, {"status": "active"})
            return SwarmHostResult(host_uuid=host_uuid, host_name=host["name"], ok=True, detail=body)
        except Exception as exc:
            log.exception("swarm.deploy dispatch failed host=%s", host["name"])
            message = str(exc)
            try:
                # The stored error is capped at 512 characters; the response
                # below carries the full message.
                await _record_shards(
                    host_uuid,
                    shard,
                    state="failed",
                    last_error=message[:512],
                )
            except Exception:
                # If recording the failure itself fails, still return this
                # host's result: an exception escaping here would make
                # gather() raise and discard every other host's outcome.
                log.exception(
                    "swarm.deploy could not record failure host=%s", host["name"]
                )
            return SwarmHostResult(host_uuid=host_uuid, host_name=host["name"], ok=False, detail=message)

    results = await asyncio.gather(
        *(_dispatch(uuid_, shard) for uuid_, shard in buckets.items())
    )
    return SwarmDeployResponse(results=list(results))