From e124f9e296a7cedbe26c31590c3a653e37ee7093 Mon Sep 17 00:00:00 2001
From: anti
Date: Thu, 30 Apr 2026 20:25:38 -0400
Subject: [PATCH] refactor(swarm): extract _shard_payload helper and promote _dispatch to module-level

---
NOTE(review): line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch. Nesting relative to the
`async with` blocks and the indentation of the hunk-2 context lines are
inferred from Python syntax; confirm against the target tree before applying.

 decnet/web/router/swarm/api_deploy_swarm.py | 118 +++++++++++---------
 1 file changed, 65 insertions(+), 53 deletions(-)

diff --git a/decnet/web/router/swarm/api_deploy_swarm.py b/decnet/web/router/swarm/api_deploy_swarm.py
index 1142df8e..56f62843 100644
--- a/decnet/web/router/swarm/api_deploy_swarm.py
+++ b/decnet/web/router/swarm/api_deploy_swarm.py
@@ -57,6 +57,67 @@ def _worker_config(
     return base.model_copy(update=updates)
 
 
+def _shard_payload(
+    d: DeckyConfig,
+    host_uuid: str,
+    state: str,
+    error: str | None,
+) -> dict[str, Any]:
+    return {
+        "decky_name": d.name,
+        "host_uuid": host_uuid,
+        "services": json.dumps(d.services),
+        "decky_config": d.model_dump_json(),
+        "decky_ip": d.ip,
+        "state": state,
+        "last_error": error,
+        "updated_at": datetime.now(timezone.utc),
+    }
+
+
+async def _dispatch(
+    host_uuid: str,
+    shard: list[DeckyConfig],
+    hosts: dict[str, dict[str, Any]],
+    config: DecnetConfig,
+    repo: BaseRepository,
+    dry_run: bool,
+    no_cache: bool,
+) -> SwarmHostResult:
+    host = hosts[host_uuid]
+    cfg = _worker_config(config, shard, host)
+    try:
+        async with AgentClient(host=host) as agent:
+            body = await agent.deploy(cfg, dry_run=dry_run, no_cache=no_cache)
+        for d in shard:
+            await repo.upsert_decky_shard(
+                _shard_payload(d, host_uuid, "running" if not dry_run else "pending", None)
+            )
+        await repo.update_swarm_host(host_uuid, {"status": "active"})
+        return SwarmHostResult(host_uuid=host_uuid, host_name=host["name"], ok=True, detail=body)
+    except Exception as exc:
+        log.exception("swarm.deploy dispatch failed host=%s", host["name"])
+        # Compose-up is partial-success-friendly: one decky failing to
+        # build doesn't roll back the ones that already came up. Ask the
+        # agent which containers actually exist before painting the whole
+        # shard red — otherwise decky1 and decky2 look "failed" even
+        # though they're live on the worker.
+        runtime: dict[str, Any] = {}
+        try:
+            async with AgentClient(host=host) as probe:
+                snap = await probe.status()
+            runtime = snap.get("runtime") or {}
+        except Exception:
+            log.warning("swarm.deploy: runtime probe failed host=%s — marking shard failed", host["name"])
+        for d in shard:
+            rstate = runtime.get(d.name) or {}
+            is_up = bool(rstate.get("running"))
+            await repo.upsert_decky_shard(
+                _shard_payload(d, host_uuid, "running" if is_up else "failed", None if is_up else str(exc)[:512])
+            )
+        return SwarmHostResult(host_uuid=host_uuid, host_name=host["name"], ok=False, detail=str(exc))
+
+
 async def dispatch_decnet_config(
     config: DecnetConfig,
     repo: BaseRepository,
@@ -77,60 +138,11 @@ async def dispatch_decnet_config(
             raise HTTPException(status_code=404, detail=f"unknown host_uuid: {host_uuid}")
         hosts[host_uuid] = row
 
-    async def _dispatch(host_uuid: str, shard: list[DeckyConfig]) -> SwarmHostResult:
-        host = hosts[host_uuid]
-        cfg = _worker_config(config, shard, host)
-        try:
-            async with AgentClient(host=host) as agent:
-                body = await agent.deploy(cfg, dry_run=dry_run, no_cache=no_cache)
-            for d in shard:
-                await repo.upsert_decky_shard(
-                    {
-                        "decky_name": d.name,
-                        "host_uuid": host_uuid,
-                        "services": json.dumps(d.services),
-                        "decky_config": d.model_dump_json(),
-                        "decky_ip": d.ip,
-                        "state": "running" if not dry_run else "pending",
-                        "last_error": None,
-                        "updated_at": datetime.now(timezone.utc),
-                    }
-                )
-            await repo.update_swarm_host(host_uuid, {"status": "active"})
-            return SwarmHostResult(host_uuid=host_uuid, host_name=host["name"], ok=True, detail=body)
-        except Exception as exc:
-            log.exception("swarm.deploy dispatch failed host=%s", host["name"])
-            # Compose-up is partial-success-friendly: one decky failing to
-            # build doesn't roll back the ones that already came up. Ask the
-            # agent which containers actually exist before painting the whole
-            # shard red — otherwise decky1 and decky2 look "failed" even
-            # though they're live on the worker.
-            runtime: dict[str, Any] = {}
-            try:
-                async with AgentClient(host=host) as probe:
-                    snap = await probe.status()
-                runtime = snap.get("runtime") or {}
-            except Exception:
-                log.warning("swarm.deploy: runtime probe failed host=%s — marking shard failed", host["name"])
-            for d in shard:
-                rstate = runtime.get(d.name) or {}
-                is_up = bool(rstate.get("running"))
-                await repo.upsert_decky_shard(
-                    {
-                        "decky_name": d.name,
-                        "host_uuid": host_uuid,
-                        "services": json.dumps(d.services),
-                        "decky_config": d.model_dump_json(),
-                        "decky_ip": d.ip,
-                        "state": "running" if is_up else "failed",
-                        "last_error": None if is_up else str(exc)[:512],
-                        "updated_at": datetime.now(timezone.utc),
-                    }
-                )
-            return SwarmHostResult(host_uuid=host_uuid, host_name=host["name"], ok=False, detail=str(exc))
-
     results = await asyncio.gather(
-        *(_dispatch(uuid_, shard) for uuid_, shard in buckets.items())
+        *(
+            _dispatch(uuid_, shard, hosts, config, repo, dry_run, no_cache)
+            for uuid_, shard in buckets.items()
+        )
     )
 
     return SwarmDeployResponse(results=list(results))