diff --git a/decnet/web/api.py b/decnet/web/api.py index 7d8dff12..efe8483e 100644 --- a/decnet/web/api.py +++ b/decnet/web/api.py @@ -109,6 +109,22 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: log.error("DB failed to initialize after 5 attempts — startup may be degraded") await _retry_sleep(0.5) + # Sweep stranded DeckyLifecycle rows from a prior master crash. + # Anything older than 1h that's still pending/running can never + # complete (the runner task died with the process), so flip it to + # failed. Cheap DB op; runs unconditionally including contract-test + # mode (idempotent and observable in tests). + try: + from datetime import datetime, timedelta, timezone + cutoff = datetime.now(timezone.utc) - timedelta(hours=1) + swept = await repo.sweep_stale_lifecycle( + cutoff, reason="master restarted during operation", + ) + if swept: + log.warning("API startup: swept %d stranded lifecycle row(s)", swept) + except Exception: + log.exception("API startup: lifecycle sweep failed (non-fatal)") + # Conditionally enable OpenTelemetry tracing from decnet.telemetry import setup_tracing setup_tracing(app) diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 096d2f84..fff144f5 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -12,6 +12,7 @@ from .fleet.api_get_deckies import router as get_deckies_router from .fleet.api_mutate_decky import router as mutate_decky_router from .fleet.api_mutate_interval import router as mutate_interval_router from .fleet.api_deploy_deckies import router as deploy_deckies_router +from .fleet.api_lifecycle import router as lifecycle_router from .stream.api_stream_events import router as stream_router from .attackers.api_get_attackers import router as attackers_router from .attackers.api_export_attackers import router as attackers_export_router @@ -106,6 +107,7 @@ api_router.include_router(get_deckies_router) api_router.include_router(mutate_decky_router) 
api_router.include_router(mutate_interval_router)
 api_router.include_router(deploy_deckies_router)
+api_router.include_router(lifecycle_router)
 
 # Attacker Profiles
 api_router.include_router(attackers_router)
diff --git a/decnet/web/router/fleet/api_lifecycle.py b/decnet/web/router/fleet/api_lifecycle.py
new file mode 100644
index 00000000..aa3916db
--- /dev/null
+++ b/decnet/web/router/fleet/api_lifecycle.py
@@ -0,0 +1,48 @@
+"""GET /deckies/lifecycle?ids=… — poll lifecycle rows by id.
+
+The wizard polls this endpoint every ~2 s after POSTing /deckies/deploy
+or /deckies/{name}/mutate (which return 202 with the lifecycle ids) and
+stops when every row reaches a terminal status.
+"""
+from __future__ import annotations
+
+from fastapi import APIRouter, Depends, Query
+
+from decnet.logging import get_logger
+from decnet.telemetry import traced as _traced
+from decnet.web.db.models import DeckyLifecycleListResponse, DeckyLifecycleView
+from decnet.web.dependencies import require_viewer, repo
+
+log = get_logger("api.lifecycle")
+
+router = APIRouter()
+
+
+# Poll endpoint: looks up the requested lifecycle ids through the
+# repository and returns each row found, serialized via DeckyLifecycleView.
+# Kept as a comment rather than a docstring because FastAPI publishes an
+# endpoint docstring as the OpenAPI operation description.
+@router.get(
+    "/deckies/lifecycle",
+    tags=["Fleet Management"],
+    response_model=DeckyLifecycleListResponse,
+    responses={
+        401: {"description": "Could not validate credentials"},
+        422: {"description": "Validation error (ids missing or malformed)"},
+    },
+)
+@_traced("api.lifecycle_get")
+async def api_get_lifecycle(
+    ids: list[str] = Query(
+        ...,
+        description="One or more DeckyLifecycle row ids; pass repeated ?ids=&ids= in the URL.",
+        min_length=1,
+        max_length=200,
+    ),
+    user: dict = Depends(require_viewer),
+) -> dict:
+    # `user` is not read below; declaring the dependency is what runs
+    # require_viewer for this route.
+    found = await repo.get_lifecycle_by_ids(ids)
+    views = [DeckyLifecycleView(**record).model_dump() for record in found]
+    return {"rows": views}
diff --git a/tests/api/fleet/test_lifecycle_endpoint.py b/tests/api/fleet/test_lifecycle_endpoint.py
new file mode 100644
index 00000000..e64997a9
--- /dev/null
+++ b/tests/api/fleet/test_lifecycle_endpoint.py
@@ -0,0 +1,85 @@
+"""GET /deckies/lifecycle?ids=… — poll
endpoint for the wizard."""
+from __future__ import annotations
+
+import httpx
+import pytest
+
+from decnet.web.dependencies import repo
+
+
+@pytest.mark.anyio
+async def test_get_lifecycle_unauthenticated_returns_401(client: httpx.AsyncClient):
+    # No Authorization header is sent with this request.
+    response = await client.get("/api/v1/deckies/lifecycle", params={"ids": "x"})
+    assert response.status_code == 401
+
+
+@pytest.mark.anyio
+async def test_get_lifecycle_missing_ids_returns_validation_error(
+    client: httpx.AsyncClient, auth_token: str,
+):
+    """An authenticated call without ?ids= must be rejected; the test
+    accepts either 400 or 422 as the rejection status."""
+    bearer = {"Authorization": f"Bearer {auth_token}"}
+    response = await client.get("/api/v1/deckies/lifecycle", headers=bearer)
+    assert response.status_code in (400, 422)
+
+
+@pytest.mark.anyio
+async def test_get_lifecycle_returns_matching_rows(
+    client: httpx.AsyncClient, auth_token: str,
+):
+    # Seed two rows; only the deploy row is moved on to "running".
+    deploy_id = await repo.create_lifecycle({"decky_name": "d1", "operation": "deploy"})
+    mutate_id = await repo.create_lifecycle({"decky_name": "d2", "operation": "mutate"})
+    await repo.update_lifecycle(deploy_id, {"status": "running"})
+
+    bearer = {"Authorization": f"Bearer {auth_token}"}
+    query = [("ids", deploy_id), ("ids", mutate_id)]
+    response = await client.get("/api/v1/deckies/lifecycle", params=query, headers=bearer)
+    assert response.status_code == 200, response.text
+
+    payload = response.json()["rows"]
+    assert len(payload) == 2
+    indexed = {entry["id"]: entry for entry in payload}
+    assert indexed[deploy_id]["status"] == "running"
+    assert indexed[deploy_id]["operation"] == "deploy"
+    assert indexed[mutate_id]["status"] == "pending"
+    assert indexed[mutate_id]["operation"] == "mutate"
+
+
+@pytest.mark.anyio
+async def test_get_lifecycle_unknown_id_silently_omitted(
+    client: httpx.AsyncClient, auth_token: str,
+):
+    # One real id plus one that matches no row.
+    known_id = await repo.create_lifecycle({"decky_name": "d1", "operation": "deploy"})
+    bearer = {"Authorization": f"Bearer {auth_token}"}
+    query = [("ids", known_id), ("ids", "no-such-id")]
+    response = await client.get("/api/v1/deckies/lifecycle", params=query, headers=bearer)
+    assert response.status_code == 200
+
+    payload = response.json()["rows"]
+    assert len(payload) == 1
+    assert payload[0]["id"] == known_id
+
+
+@pytest.mark.anyio
+async def test_startup_sweep_marks_stale_rows_failed():
+    """A non-terminal row started before the cutoff is flipped to
+    'failed' and carries the sweep reason as its error."""
+    from datetime import datetime, timedelta, timezone
+
+    sweep_reason = "master restarted during operation"
+    row_id = await repo.create_lifecycle({"decky_name": "stale", "operation": "deploy"})
+    # Move started_at two hours back so it falls before the one-hour cutoff.
+    two_hours_ago = datetime.now(timezone.utc) - timedelta(hours=2)
+    await repo.update_lifecycle(row_id, {"started_at": two_hours_ago})
+
+    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
+    swept = await repo.sweep_stale_lifecycle(cutoff, reason=sweep_reason)
+    assert swept >= 1
+
+    fetched = await repo.get_lifecycle_by_ids([row_id])
+    assert fetched[0]["status"] == "failed"
+    assert fetched[0]["error"] == sweep_reason