feat(api): GET /deckies/lifecycle + master startup sweep
GET /deckies/lifecycle?ids=<uuid>&ids=<uuid> returns the matching DeckyLifecycle rows so the wizard can poll instead of holding an HTTP request open across compose work. require_viewer gating -- read-only. Startup sweep: on master boot, any pending/running row with started_at older than 1h flips to failed with error='master restarted during operation'. Pre-v1 substitute for a durable task queue: if the master crashes mid-deploy, the wizard sees FAILED on refresh and the operator retries. Idempotent + cheap; runs unconditionally including in contract-test mode.
This commit is contained in:
@@ -109,6 +109,22 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
|||||||
log.error("DB failed to initialize after 5 attempts — startup may be degraded")
|
log.error("DB failed to initialize after 5 attempts — startup may be degraded")
|
||||||
await _retry_sleep(0.5)
|
await _retry_sleep(0.5)
|
||||||
|
|
||||||
|
# Sweep stranded DeckyLifecycle rows from a prior master crash.
|
||||||
|
# Anything older than 1h that's still pending/running can never
|
||||||
|
# complete (the runner task died with the process), so flip it to
|
||||||
|
# failed. Cheap DB op; runs unconditionally including contract-test
|
||||||
|
# mode (idempotent and observable in tests).
|
||||||
|
try:
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
|
||||||
|
swept = await repo.sweep_stale_lifecycle(
|
||||||
|
cutoff, reason="master restarted during operation",
|
||||||
|
)
|
||||||
|
if swept:
|
||||||
|
log.warning("API startup: swept %d stranded lifecycle row(s)", swept)
|
||||||
|
except Exception:
|
||||||
|
log.exception("API startup: lifecycle sweep failed (non-fatal)")
|
||||||
|
|
||||||
# Conditionally enable OpenTelemetry tracing
|
# Conditionally enable OpenTelemetry tracing
|
||||||
from decnet.telemetry import setup_tracing
|
from decnet.telemetry import setup_tracing
|
||||||
setup_tracing(app)
|
setup_tracing(app)
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ from .fleet.api_get_deckies import router as get_deckies_router
|
|||||||
from .fleet.api_mutate_decky import router as mutate_decky_router
|
from .fleet.api_mutate_decky import router as mutate_decky_router
|
||||||
from .fleet.api_mutate_interval import router as mutate_interval_router
|
from .fleet.api_mutate_interval import router as mutate_interval_router
|
||||||
from .fleet.api_deploy_deckies import router as deploy_deckies_router
|
from .fleet.api_deploy_deckies import router as deploy_deckies_router
|
||||||
|
from .fleet.api_lifecycle import router as lifecycle_router
|
||||||
from .stream.api_stream_events import router as stream_router
|
from .stream.api_stream_events import router as stream_router
|
||||||
from .attackers.api_get_attackers import router as attackers_router
|
from .attackers.api_get_attackers import router as attackers_router
|
||||||
from .attackers.api_export_attackers import router as attackers_export_router
|
from .attackers.api_export_attackers import router as attackers_export_router
|
||||||
@@ -106,6 +107,7 @@ api_router.include_router(get_deckies_router)
|
|||||||
api_router.include_router(mutate_decky_router)
|
api_router.include_router(mutate_decky_router)
|
||||||
api_router.include_router(mutate_interval_router)
|
api_router.include_router(mutate_interval_router)
|
||||||
api_router.include_router(deploy_deckies_router)
|
api_router.include_router(deploy_deckies_router)
|
||||||
|
api_router.include_router(lifecycle_router)
|
||||||
|
|
||||||
# Attacker Profiles
|
# Attacker Profiles
|
||||||
api_router.include_router(attackers_router)
|
api_router.include_router(attackers_router)
|
||||||
|
|||||||
41
decnet/web/router/fleet/api_lifecycle.py
Normal file
41
decnet/web/router/fleet/api_lifecycle.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
"""GET /deckies/lifecycle?ids=… — poll lifecycle rows by id.
|
||||||
|
|
||||||
|
The wizard polls this endpoint every ~2 s after POSTing /deckies/deploy
|
||||||
|
or /deckies/{name}/mutate (which return 202 with the lifecycle ids) and
|
||||||
|
stops when every row reaches a terminal status.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends, Query
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.telemetry import traced as _traced
|
||||||
|
from decnet.web.db.models import DeckyLifecycleListResponse, DeckyLifecycleView
|
||||||
|
from decnet.web.dependencies import require_viewer, repo
|
||||||
|
|
||||||
|
log = get_logger("api.lifecycle")
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/deckies/lifecycle",
|
||||||
|
tags=["Fleet Management"],
|
||||||
|
response_model=DeckyLifecycleListResponse,
|
||||||
|
responses={
|
||||||
|
401: {"description": "Could not validate credentials"},
|
||||||
|
422: {"description": "Validation error (ids missing or malformed)"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
@_traced("api.lifecycle_get")
|
||||||
|
async def api_get_lifecycle(
|
||||||
|
ids: list[str] = Query(
|
||||||
|
..., description="One or more DeckyLifecycle row ids; pass repeated ?ids=<uuid>&ids=<uuid> in the URL.",
|
||||||
|
min_length=1, max_length=200,
|
||||||
|
),
|
||||||
|
user: dict = Depends(require_viewer),
|
||||||
|
) -> dict:
|
||||||
|
rows = await repo.get_lifecycle_by_ids(ids)
|
||||||
|
return {
|
||||||
|
"rows": [DeckyLifecycleView(**r).model_dump() for r in rows],
|
||||||
|
}
|
||||||
85
tests/api/fleet/test_lifecycle_endpoint.py
Normal file
85
tests/api/fleet/test_lifecycle_endpoint.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
"""GET /deckies/lifecycle?ids=… — poll endpoint for the wizard."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.web.dependencies import repo
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_lifecycle_unauthenticated_returns_401(client: httpx.AsyncClient):
|
||||||
|
resp = await client.get("/api/v1/deckies/lifecycle", params={"ids": "x"})
|
||||||
|
assert resp.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_lifecycle_missing_ids_returns_validation_error(
|
||||||
|
client: httpx.AsyncClient, auth_token: str,
|
||||||
|
):
|
||||||
|
"""No ?ids= → validation rejection (Starlette short-circuits with 400
|
||||||
|
for the body-parse path; either 400 or 422 is acceptable contract)."""
|
||||||
|
resp = await client.get(
|
||||||
|
"/api/v1/deckies/lifecycle",
|
||||||
|
headers={"Authorization": f"Bearer {auth_token}"},
|
||||||
|
)
|
||||||
|
assert resp.status_code in (400, 422)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_lifecycle_returns_matching_rows(
|
||||||
|
client: httpx.AsyncClient, auth_token: str,
|
||||||
|
):
|
||||||
|
a = await repo.create_lifecycle({"decky_name": "d1", "operation": "deploy"})
|
||||||
|
b = await repo.create_lifecycle({"decky_name": "d2", "operation": "mutate"})
|
||||||
|
await repo.update_lifecycle(a, {"status": "running"})
|
||||||
|
|
||||||
|
resp = await client.get(
|
||||||
|
"/api/v1/deckies/lifecycle",
|
||||||
|
params=[("ids", a), ("ids", b)],
|
||||||
|
headers={"Authorization": f"Bearer {auth_token}"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200, resp.text
|
||||||
|
rows = resp.json()["rows"]
|
||||||
|
assert len(rows) == 2
|
||||||
|
by_id = {r["id"]: r for r in rows}
|
||||||
|
assert by_id[a]["status"] == "running"
|
||||||
|
assert by_id[a]["operation"] == "deploy"
|
||||||
|
assert by_id[b]["status"] == "pending"
|
||||||
|
assert by_id[b]["operation"] == "mutate"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_get_lifecycle_unknown_id_silently_omitted(
|
||||||
|
client: httpx.AsyncClient, auth_token: str,
|
||||||
|
):
|
||||||
|
a = await repo.create_lifecycle({"decky_name": "d1", "operation": "deploy"})
|
||||||
|
resp = await client.get(
|
||||||
|
"/api/v1/deckies/lifecycle",
|
||||||
|
params=[("ids", a), ("ids", "no-such-id")],
|
||||||
|
headers={"Authorization": f"Bearer {auth_token}"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
rows = resp.json()["rows"]
|
||||||
|
assert len(rows) == 1
|
||||||
|
assert rows[0]["id"] == a
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_startup_sweep_marks_stale_rows_failed():
|
||||||
|
"""The sweep stamps reason='master restarted during operation' on
|
||||||
|
any non-terminal row older than the cutoff."""
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
lid = await repo.create_lifecycle({"decky_name": "stale", "operation": "deploy"})
|
||||||
|
# Backdate started_at into the past so the sweep picks it up.
|
||||||
|
await repo.update_lifecycle(
|
||||||
|
lid, {"started_at": datetime.now(timezone.utc) - timedelta(hours=2)},
|
||||||
|
)
|
||||||
|
swept = await repo.sweep_stale_lifecycle(
|
||||||
|
datetime.now(timezone.utc) - timedelta(hours=1),
|
||||||
|
reason="master restarted during operation",
|
||||||
|
)
|
||||||
|
assert swept >= 1
|
||||||
|
rows = await repo.get_lifecycle_by_ids([lid])
|
||||||
|
assert rows[0]["status"] == "failed"
|
||||||
|
assert rows[0]["error"] == "master restarted during operation"
|
||||||
Reference in New Issue
Block a user