"""POST /swarm/deploy — shard a DecnetConfig across enrolled workers. Per worker we build a filtered copy containing only the deckies assigned to that worker (via ``host_uuid``), then POST it to the worker agent. The caller is expected to have already set ``host_uuid`` on every decky; if any decky arrives without one, we fail fast. Auto-sharding lives in the CLI layer, not here. """ from __future__ import annotations import asyncio import json from datetime import datetime, timezone from typing import Any from fastapi import APIRouter, Depends, HTTPException from decnet.config import DecnetConfig, DeckyConfig from decnet.logging import get_logger from decnet.swarm.client import AgentClient from decnet.web.db.repository import BaseRepository from decnet.web.dependencies import get_repo from decnet.web.router.swarm._schemas import ( DeployRequest, DeployResponse, HostResult, ) log = get_logger("swarm.deploy") router = APIRouter() def _shard_by_host(config: DecnetConfig) -> dict[str, list[DeckyConfig]]: buckets: dict[str, list[DeckyConfig]] = {} for d in config.deckies: if not d.host_uuid: raise HTTPException( status_code=400, detail=f"decky '{d.name}' has no host_uuid — caller must shard before dispatch", ) buckets.setdefault(d.host_uuid, []).append(d) return buckets def _worker_config(base: DecnetConfig, shard: list[DeckyConfig]) -> DecnetConfig: return base.model_copy(update={"deckies": shard}) @router.post("/deploy", response_model=DeployResponse, tags=["Swarm Deployments"]) async def api_deploy_swarm( req: DeployRequest, repo: BaseRepository = Depends(get_repo), ) -> DeployResponse: if req.config.mode != "swarm": raise HTTPException(status_code=400, detail="mode must be 'swarm'") buckets = _shard_by_host(req.config) hosts: dict[str, dict[str, Any]] = {} for host_uuid in buckets: row = await repo.get_swarm_host_by_uuid(host_uuid) if row is None: raise HTTPException(status_code=404, detail=f"unknown host_uuid: {host_uuid}") hosts[host_uuid] = row async def _dispatch(host_uuid: str, shard: list[DeckyConfig]) -> HostResult: host = hosts[host_uuid] cfg = _worker_config(req.config, shard) try: async with AgentClient(host=host) as agent: body = await agent.deploy(cfg, dry_run=req.dry_run, no_cache=req.no_cache) for d in shard: await repo.upsert_decky_shard( { "decky_name": d.name, "host_uuid": host_uuid, "services": json.dumps(d.services), "state": "running" if not req.dry_run else "pending", "last_error": None, "updated_at": datetime.now(timezone.utc), } ) await repo.update_swarm_host(host_uuid, {"status": "active"}) return HostResult(host_uuid=host_uuid, host_name=host["name"], ok=True, detail=body) except Exception as exc: log.exception("swarm.deploy dispatch failed host=%s", host["name"]) for d in shard: await repo.upsert_decky_shard( { "decky_name": d.name, "host_uuid": host_uuid, "services": json.dumps(d.services), "state": "failed", "last_error": str(exc)[:512], "updated_at": datetime.now(timezone.utc), } ) return HostResult(host_uuid=host_uuid, host_name=host["name"], ok=False, detail=str(exc)) results = await asyncio.gather( *(_dispatch(uuid_, shard) for uuid_, shard in buckets.items()) ) return DeployResponse(results=list(results))