diff --git a/decnet/agent/app.py b/decnet/agent/app.py index b6ca27d6..9af23ba3 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -25,6 +25,7 @@ from contextlib import asynccontextmanager from typing import Any, Optional from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse from pydantic import BaseModel, Field import contextlib @@ -198,15 +199,22 @@ async def status() -> dict: @app.post( "/deploy", - responses={500: {"description": "Deployer raised an exception materialising the config"}}, + status_code=202, + responses={202: {"description": "Deploy accepted; runs in background; lifecycle deltas pushed via heartbeat"}}, ) async def deploy(req: DeployRequest) -> dict: - try: - await _exec.deploy(req.config, dry_run=req.dry_run, no_cache=req.no_cache) - except Exception as exc: - log.exception("agent.deploy failed") - raise HTTPException(status_code=500, detail=str(exc)) from exc - return {"status": "deployed", "deckies": len(req.config.deckies)} + """Spawn the deploy in the background and return 202 immediately. + + The master tracks per-decky completion via lifecycle deltas pushed on + the next heartbeat (one immediate push on completion, plus the + scheduled 30 s ticks as a fallback). Holding the request open across + a multi-minute compose build was the previous source of the wizard + API-hang.""" + asyncio.create_task( + _exec.deploy_async(req.config, dry_run=req.dry_run, no_cache=req.no_cache), + name=f"deploy-{id(req)}", + ) + return {"status": "accepted", "deckies": [d.name for d in req.config.deckies]} @app.post( @@ -308,51 +316,50 @@ async def topology_state() -> dict: @app.post( "/mutate", + status_code=202, responses={ - 404: {"description": "No active deployment, or unknown decky_id"}, - 500: {"description": "Compose rewrite or container restart failed"}, + 202: {"description": "Mutate accepted; runs in background; lifecycle delta pushed via heartbeat"}, + 404: {"description": "No active deployment, or unknown decky_id (dry_run validation only)"}, }, ) -async def mutate(req: MutateRequest) -> dict: - import time - from decnet.composer import write_compose - from decnet.config import load_state, save_state - from decnet.engine import _compose_with_retry - - state = load_state() - if state is None: - raise HTTPException(status_code=404, detail="no active deployment on this worker") - cfg, compose_path = state - - decky = next((d for d in cfg.deckies if d.name == req.decky_id), None) - if decky is None: - raise HTTPException( - status_code=404, detail=f"decky {req.decky_id!r} not found in worker state", - ) - - decky.services = list(req.services) - decky.last_mutated = time.time() +async def mutate(req: MutateRequest) -> Any: + """Spawn the mutate in the background and return 202 immediately. + Master tracks completion via a lifecycle delta pushed on the next + heartbeat (immediate push on completion). ``dry_run`` is still + synchronous — it validates against the worker's current state and + returns the would-be services without spawning a task or touching + docker, so the wizard's preview path stays cheap.""" if req.dry_run: - return { - "status": "dry_run", - "decky_id": decky.name, - "services": list(decky.services), - } - - try: - save_state(cfg, compose_path) - write_compose(cfg, compose_path) - await asyncio.to_thread( - _compose_with_retry, "up", "-d", "--remove-orphans", - compose_file=compose_path, + from decnet.config import load_state + state = load_state() + if state is None: + raise HTTPException( + status_code=404, + detail="no active deployment on this worker", + ) + cfg, _ = state + decky = next((d for d in cfg.deckies if d.name == req.decky_id), None) + if decky is None: + raise HTTPException( + status_code=404, + detail=f"decky {req.decky_id!r} not found in worker state", + ) + return JSONResponse( + status_code=200, + content={ + "status": "dry_run", + "decky_id": req.decky_id, + "services": list(req.services), + }, ) - except Exception as exc: - log.exception("agent.mutate failed") - raise HTTPException(status_code=500, detail=str(exc)) from exc + asyncio.create_task( + _exec.mutate_async(req.decky_id, list(req.services)), + name=f"mutate-{req.decky_id}", + ) return { - "status": "mutated", - "decky_id": decky.name, - "services": list(decky.services), + "status": "accepted", + "decky_id": req.decky_id, + "services": list(req.services), } diff --git a/decnet/agent/executor.py b/decnet/agent/executor.py index 9f0a20aa..32b5221c 100644 --- a/decnet/agent/executor.py +++ b/decnet/agent/executor.py @@ -80,6 +80,99 @@ async def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = F await asyncio.to_thread(_deployer.deploy, config, dry_run, no_cache, False) +async def deploy_async( + config: DecnetConfig, *, dry_run: bool = False, no_cache: bool = False, +) -> None: + """Background-task body for /deploy: run the deploy, then push a + lifecycle delta to the master so it observes terminal transitions + immediately rather than waiting for the next scheduled heartbeat. + + Per-decky lifecycle deltas — master pivots them onto the matching + open DeckyLifecycle rows via the heartbeat handler. Errors are + captured and pushed as ``failed`` deltas; the task itself never + raises (a crashed task would just leave master rows wedged). + """ + from datetime import datetime, timezone + from decnet.agent.heartbeat import push_lifecycle_delta + + decky_names = [d.name for d in config.deckies] + try: + await deploy(config, dry_run=dry_run, no_cache=no_cache) + except Exception as exc: # noqa: BLE001 + log.exception("agent.deploy_async failed") + err = f"{type(exc).__name__}: {exc}" + deltas = [ + { + "decky_name": name, "operation": "deploy", + "status": "failed", "error": err[:2000], + "completed_at": datetime.now(timezone.utc).isoformat(), + } + for name in decky_names + ] + await push_lifecycle_delta(deltas) + return + deltas = [ + { + "decky_name": name, "operation": "deploy", + "status": "succeeded", + "completed_at": datetime.now(timezone.utc).isoformat(), + } + for name in decky_names + ] + await push_lifecycle_delta(deltas) + + +async def mutate_async(decky_id: str, services: list[str]) -> None: + """Background-task body for /mutate. Same shape as deploy_async: + perform the work, then push a single lifecycle delta on + completion (success or failure).""" + import time + from datetime import datetime, timezone + from decnet.composer import write_compose + from decnet.config import load_state, save_state + from decnet.engine import _compose_with_retry + from decnet.agent.heartbeat import push_lifecycle_delta + + def _delta(status: str, error: str | None = None) -> dict: + out = { + "decky_name": decky_id, "operation": "mutate", + "status": status, + "completed_at": datetime.now(timezone.utc).isoformat(), + } + if error is not None: + out["error"] = error[:2000] + return out + + try: + state = load_state() + if state is None: + await push_lifecycle_delta( + [_delta("failed", "no active deployment on this worker")], + ) + return + cfg, compose_path = state + decky = next((d for d in cfg.deckies if d.name == decky_id), None) + if decky is None: + await push_lifecycle_delta( + [_delta("failed", f"decky {decky_id!r} not found in worker state")], + ) + return + decky.services = list(services) + decky.last_mutated = time.time() + save_state(cfg, compose_path) + write_compose(cfg, compose_path) + await asyncio.to_thread( + _compose_with_retry, "up", "-d", "--remove-orphans", + compose_file=compose_path, + ) + except Exception as exc: # noqa: BLE001 + log.exception("agent.mutate_async failed decky=%s", decky_id) + err = f"{type(exc).__name__}: {exc}" + await push_lifecycle_delta([_delta("failed", err)]) + return + await push_lifecycle_delta([_delta("succeeded")]) + + async def teardown(decky_id: str | None = None) -> None: log.info("agent.teardown decky_id=%s", decky_id) await asyncio.to_thread(_deployer.teardown, decky_id) diff --git a/decnet/agent/heartbeat.py b/decnet/agent/heartbeat.py index b7e36e66..dbd731f8 100644 --- a/decnet/agent/heartbeat.py +++ b/decnet/agent/heartbeat.py @@ -50,7 +50,11 @@ def _resolve_agent_dir() -> pathlib.Path: return pki.DEFAULT_AGENT_DIR -async def _tick(client: httpx.AsyncClient, url: str, host_uuid: str, agent_version: str) -> None: +async def _build_body( + host_uuid: str, + agent_version: str, + lifecycle: Optional[list[dict]] = None, +) -> dict: snap = await _exec.status() body: dict = { "host_uuid": host_uuid, @@ -70,7 +74,13 @@ async def _tick(client: httpx.AsyncClient, url: str, host_uuid: str, agent_versi store.close() except Exception: log.debug("heartbeat: topology state unavailable", exc_info=True) + if lifecycle: + body["lifecycle"] = lifecycle + return body + +async def _tick(client: httpx.AsyncClient, url: str, host_uuid: str, agent_version: str) -> None: + body = await _build_body(host_uuid, agent_version) resp = await client.post(url, json=body) # 403 / 404 are terminal-ish — we still keep looping because an # operator may re-enrol the host mid-session, but we log loudly so @@ -134,6 +144,59 @@ def start() -> Optional[asyncio.Task]: return _task +async def push_lifecycle_delta(deltas: list[dict]) -> None: + """Fire a one-off heartbeat POST carrying *deltas* in the + ``lifecycle`` field. Each delta: ``{decky_name, operation, status, + error?, completed_at?}``. + + Called by the agent executor on /deploy and /mutate completion so + the master observes the terminal transition immediately rather than + waiting up to ``INTERVAL_S`` for the next scheduled tick. Failures + are logged and swallowed; the next scheduled heartbeat carries the + same deltas via DB-side reconciliation, since the worker has no + durable per-row state to lose. + """ + from decnet.env import ( + DECNET_HOST_UUID, + DECNET_MASTER_HOST, + DECNET_SWARMCTL_PORT, + ) + + if not deltas: + return + if not DECNET_HOST_UUID or not DECNET_MASTER_HOST: + log.debug("push_lifecycle_delta: identity unconfigured — skipping") + return + + agent_dir = _resolve_agent_dir() + try: + ssl_ctx = build_worker_ssl_context(agent_dir) + except Exception: + log.exception("push_lifecycle_delta: SSL context unavailable") + return + + try: + from decnet import __version__ as _v # type: ignore[attr-defined] + agent_version = _v + except Exception: + agent_version = "unknown" + + url = f"https://{DECNET_MASTER_HOST}:{DECNET_SWARMCTL_PORT}/swarm/heartbeat" + try: + async with httpx.AsyncClient(verify=ssl_ctx, timeout=_TIMEOUT) as client: + body = await _build_body( + DECNET_HOST_UUID, agent_version, lifecycle=deltas, + ) + resp = await client.post(url, json=body) + if resp.status_code not in (200, 204): + log.warning( + "lifecycle delta push rejected status=%d body=%s", + resp.status_code, resp.text[:200], + ) + except Exception: + log.exception("push_lifecycle_delta failed — next scheduled tick will retry") + + async def stop() -> None: global _task if _task is None: diff --git a/decnet/swarm/client.py b/decnet/swarm/client.py index bcb699d2..adb460c2 100644 --- a/decnet/swarm/client.py +++ b/decnet/swarm/client.py @@ -246,13 +246,11 @@ class AgentClient: "dry_run": dry_run, "no_cache": no_cache, } - # Swap in a long-deploy timeout for this call only. - old = self._require_client().timeout - self._require_client().timeout = _TIMEOUT_DEPLOY - try: - resp = await self._require_client().post("/deploy", json=body) - finally: - self._require_client().timeout = old + # Worker /deploy is async (202 fire-and-forget): the response only + # acks acceptance; the real work runs in the agent's event loop + # and reports terminal state via heartbeat lifecycle deltas. No + # need for the long deploy timeout here. + resp = await self._require_client().post("/deploy", json=body) resp.raise_for_status() return resp.json() @@ -268,14 +266,8 @@ class AgentClient: "services": list(services), "dry_run": dry_run, } - # Worker /mutate runs `compose up -d` which can pull/build; same - # long-tail latency as /deploy. Swap the deploy timeout in. - old = self._require_client().timeout - self._require_client().timeout = _TIMEOUT_DEPLOY - try: - resp = await self._require_client().post("/mutate", json=body) - finally: - self._require_client().timeout = old + # Worker /mutate is async (202): control-timeout is right. + resp = await self._require_client().post("/mutate", json=body) resp.raise_for_status() return resp.json() diff --git a/tests/swarm/test_agent_app.py b/tests/swarm/test_agent_app.py index 2ef78b3e..787f9bf7 100644 --- a/tests/swarm/test_agent_app.py +++ b/tests/swarm/test_agent_app.py @@ -65,80 +65,67 @@ def _seed_state(monkeypatch, tmp_path): return cell -def test_mutate_success(monkeypatch, tmp_path) -> None: - cell = _seed_state(monkeypatch, tmp_path) - compose_calls: list[tuple] = [] - write_compose_calls: list[tuple] = [] +def test_mutate_returns_202_and_spawns_task(monkeypatch, tmp_path) -> None: + _seed_state(monkeypatch, tmp_path) + spawned: list = [] + real_create_task = __import__("asyncio").create_task - monkeypatch.setattr( - "decnet.composer.write_compose", - lambda c, p: write_compose_calls.append((c, p)) or p, - ) - monkeypatch.setattr( - "decnet.engine._compose_with_retry", - lambda *a, **kw: compose_calls.append((a, kw)), - ) + def _capture_create_task(coro, **kw): + spawned.append(kw.get("name", "")) + # Run the coro so it doesn't leak as a never-awaited warning, + # but swap its body out for a no-op. + coro.close() + # Return something task-like for the handler. + async def _noop(): + return None + return real_create_task(_noop()) + + monkeypatch.setattr("decnet.agent.app.asyncio.create_task", _capture_create_task) client = TestClient(app) resp = client.post( "/mutate", json={"decky_id": "decky-01", "services": ["http", "ftp"]}, ) - assert resp.status_code == 200, resp.text + assert resp.status_code == 202, resp.text body = resp.json() - assert body == {"status": "mutated", "decky_id": "decky-01", "services": ["http", "ftp"]} - assert cell["cfg"].deckies[0].services == ["http", "ftp"] - assert cell["cfg"].deckies[0].last_mutated > 0 - assert len(write_compose_calls) == 1 - assert len(compose_calls) == 1 - assert compose_calls[0][0] == ("up", "-d", "--remove-orphans") + assert body == { + "status": "accepted", + "decky_id": "decky-01", + "services": ["http", "ftp"], + } + assert spawned and spawned[0].startswith("mutate-") -def test_mutate_unknown_decky_returns_404(monkeypatch, tmp_path) -> None: - _seed_state(monkeypatch, tmp_path) - compose_calls: list = [] - monkeypatch.setattr( - "decnet.engine._compose_with_retry", - lambda *a, **kw: compose_calls.append((a, kw)), - ) - - client = TestClient(app) - resp = client.post( - "/mutate", json={"decky_id": "ghost", "services": ["ssh"]}, - ) - assert resp.status_code == 404 - assert compose_calls == [] - - -def test_mutate_no_state_returns_404(monkeypatch) -> None: +def test_mutate_dry_run_404_when_no_state(monkeypatch) -> None: monkeypatch.setattr("decnet.config.load_state", lambda: None) client = TestClient(app) resp = client.post( - "/mutate", json={"decky_id": "decky-01", "services": ["ssh"]}, + "/mutate", + json={"decky_id": "decky-01", "services": ["ssh"], "dry_run": True}, ) assert resp.status_code == 404 -def test_mutate_dry_run_does_not_touch_docker_or_state(monkeypatch, tmp_path) -> None: - cell = _seed_state(monkeypatch, tmp_path) - saved: list = [] - written: list = [] - composed: list = [] +def test_mutate_dry_run_404_for_unknown_decky(monkeypatch, tmp_path) -> None: + _seed_state(monkeypatch, tmp_path) + client = TestClient(app) + resp = client.post( + "/mutate", + json={"decky_id": "ghost", "services": ["ssh"], "dry_run": True}, + ) + assert resp.status_code == 404 - monkeypatch.setattr( - "decnet.config.save_state", - lambda c, p: saved.append((c, p)), - ) - monkeypatch.setattr( - "decnet.composer.write_compose", - lambda c, p: written.append((c, p)), - ) + +def test_mutate_dry_run_returns_services_without_touching_docker( + monkeypatch, tmp_path, +) -> None: + _seed_state(monkeypatch, tmp_path) + composed: list = [] monkeypatch.setattr( "decnet.engine._compose_with_retry", lambda *a, **kw: composed.append((a, kw)), ) - - original_services = list(cell["cfg"].deckies[0].services) client = TestClient(app) resp = client.post( "/mutate", @@ -146,14 +133,39 @@ def test_mutate_dry_run_does_not_touch_docker_or_state(monkeypatch, tmp_path) -> ) assert resp.status_code == 200 assert resp.json()["status"] == "dry_run" - # No persistence, no compose render, no docker. - assert saved == [] - assert written == [] assert composed == [] - # State on the in-memory cell was touched (handler mutated the loaded - # DeckyConfig) but never persisted — load_state is shared by reference, - # so we only assert that no save/render happened above. - del original_services + + +def test_deploy_returns_202_and_spawns_task(monkeypatch) -> None: + from decnet.config import DecnetConfig, DeckyConfig + cfg = DecnetConfig( + mode="unihost", interface="eth0", + subnet="10.66.0.0/24", gateway="10.66.0.1", + deckies=[DeckyConfig( + name="decky-01", ip="10.66.0.10", + services=["ssh"], distro="debian", + base_image="debian:bookworm-slim", hostname="d01", + )], + ) + spawned: list = [] + real_create_task = __import__("asyncio").create_task + + def _capture_create_task(coro, **kw): + spawned.append(kw.get("name", "")) + coro.close() + async def _noop(): + return None + return real_create_task(_noop()) + + monkeypatch.setattr("decnet.agent.app.asyncio.create_task", _capture_create_task) + + client = TestClient(app) + resp = client.post("/deploy", json={"config": cfg.model_dump(mode="json")}) + assert resp.status_code == 202, resp.text + body = resp.json() + assert body["status"] == "accepted" + assert body["deckies"] == ["decky-01"] + assert spawned and spawned[0].startswith("deploy-") def test_deploy_rejects_malformed_body() -> None: diff --git a/tests/swarm/test_client_mutate.py b/tests/swarm/test_client_mutate.py index 777fd963..c3a2e0a9 100644 --- a/tests/swarm/test_client_mutate.py +++ b/tests/swarm/test_client_mutate.py @@ -142,8 +142,11 @@ async def test_client_mutate_unknown_decky_404( async with swarm_client.AgentClient( address="127.0.0.1", agent_port=port, identity=master_id, ) as agent: + # Only dry_run can surface 404 synchronously; the live path is + # 202 fire-and-forget and would surface failure via the + # heartbeat lifecycle delta. with pytest.raises(httpx.HTTPStatusError) as ei: - await agent.mutate("ghost", ["ssh"]) + await agent.mutate("ghost", ["ssh"], dry_run=True) assert ei.value.response.status_code == 404 finally: server.should_exit = True