From e5e2bec3aac42dc79c89b450848b85b22995465f Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 22 May 2026 16:33:48 -0400 Subject: [PATCH] feat(swarm): heartbeat handler applies lifecycle deltas HeartbeatRequest grows an optional lifecycle field carrying per-decky completion records from the worker: [{decky_name, operation, status, error?, completed_at?}] For each delta, the master finds the most-recently-started open DeckyLifecycle row for (decky_name, operation, host_uuid) and flips it to terminal with the worker's error text + timestamp. Stale duplicates (row already sealed or never existed) are logged and dropped -- not errors. Each successful pivot also emits decky..lifecycle on the bus so the dashboard sees the transition without waiting for its next poll tick. This is the master-side completion channel for the worker's 202 fire-and-forget /deploy and /mutate. --- decnet/web/router/swarm/api_heartbeat.py | 90 ++++++++++ tests/swarm/test_heartbeat_lifecycle.py | 202 +++++++++++++++++++++++ 2 files changed, 292 insertions(+) create mode 100644 tests/swarm/test_heartbeat_lifecycle.py diff --git a/decnet/web/router/swarm/api_heartbeat.py b/decnet/web/router/swarm/api_heartbeat.py index dfcb4e3a..b3d757fd 100644 --- a/decnet/web/router/swarm/api_heartbeat.py +++ b/decnet/web/router/swarm/api_heartbeat.py @@ -38,6 +38,11 @@ class HeartbeatRequest(BaseModel): agent_version: Optional[str] = None status: dict[str, Any] topology: Optional[dict[str, Any]] = None + # Per-decky lifecycle deltas the worker pushes on /deploy or /mutate + # completion (success / failure). Master pivots each one onto the + # matching open DeckyLifecycle row. Optional + best-effort: a missing + # field is fine, an unknown decky_name is logged-and-dropped. + lifecycle: Optional[list[dict[str, Any]]] = None def _extract_peer_fingerprint(scope: MutableMapping[str, Any]) -> Optional[str]: @@ -165,6 +170,90 @@ async def _reconcile_topology_report( log.exception("heartbeat: failed to flag resync tid=%s", tid) +async def _apply_lifecycle_deltas( + repo: BaseRepository, + host_uuid: str, + deltas: Optional[list[dict[str, Any]]], +) -> None: + """Pivot worker-side completion records onto the open + DeckyLifecycle rows for this host. + + Each delta: ``{decky_name, operation, status, error?, completed_at?}``. + We find the most-recently-started open row for that (decky_name, + operation, host_uuid) and apply the terminal status + error. A + missing open row is logged and dropped — the worker may be pushing + a stale duplicate after a master sweep, or the row was already + sealed by an earlier delta. Either way it's not an error. + """ + if not deltas: + return + from decnet.lifecycle.events import emit_lifecycle + try: + from decnet.bus.factory import get_bus + bus = get_bus(client_name="swarm.heartbeat") + except Exception: + bus = None + for delta in deltas: + try: + decky_name = str(delta["decky_name"]) + operation = str(delta["operation"]) + status = str(delta["status"]) + except (KeyError, TypeError): + log.warning("heartbeat lifecycle: malformed delta host=%s payload=%s", + host_uuid, delta) + continue + if status not in ("running", "succeeded", "failed"): + log.warning( + "heartbeat lifecycle: unexpected status=%r host=%s decky=%s", + status, host_uuid, decky_name, + ) + continue + try: + row = await repo.find_open_lifecycle( + decky_name=decky_name, operation=operation, host_uuid=host_uuid, + ) + except SQLAlchemyError: + log.exception( + "heartbeat lifecycle: find_open_lifecycle failed host=%s decky=%s", + host_uuid, decky_name, + ) + continue + if row is None: + log.info( + "heartbeat lifecycle: no open row for host=%s decky=%s op=%s " + "(stale duplicate or already-sealed)", + host_uuid, decky_name, operation, + ) + continue + fields: dict[str, Any] = {"status": status} + if delta.get("error") is not None: + fields["error"] = str(delta["error"])[:2000] + if delta.get("completed_at") is not None: + try: + fields["completed_at"] = datetime.fromisoformat( + str(delta["completed_at"]), + ) + except ValueError: + # Bad timestamp from worker — let the repo stamp its own. + pass + try: + await repo.update_lifecycle(row["id"], fields) + except SQLAlchemyError: + log.exception( + "heartbeat lifecycle: update_lifecycle failed id=%s", + row.get("id"), + ) + continue + await emit_lifecycle( + bus, + lifecycle_id=row["id"], + decky_name=decky_name, + operation=operation, + status=status, + error=fields.get("error"), + ) + + @router.post( "/heartbeat", status_code=204, @@ -190,6 +279,7 @@ async def heartbeat( ) await _reconcile_topology_report(repo, req.host_uuid, req.topology) + await _apply_lifecycle_deltas(repo, req.host_uuid, req.lifecycle) status_body = req.status or {} if not status_body.get("deployed"): diff --git a/tests/swarm/test_heartbeat_lifecycle.py b/tests/swarm/test_heartbeat_lifecycle.py new file mode 100644 index 00000000..482caee7 --- /dev/null +++ b/tests/swarm/test_heartbeat_lifecycle.py @@ -0,0 +1,202 @@ +"""POST /swarm/heartbeat — lifecycle delta application. + +Worker pushes one or more ``lifecycle`` deltas in the heartbeat body +on /deploy or /mutate completion; the master must pivot each delta +onto the matching open DeckyLifecycle row and flip it to terminal. +""" +from __future__ import annotations + +import asyncio +import pathlib +from typing import Any + +import pytest +from fastapi.testclient import TestClient + +from decnet.web.db.factory import get_repository +from decnet.web.dependencies import get_repo +from decnet.web.router.swarm import api_heartbeat as hb_mod + + +@pytest.fixture +def ca_dir(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: + ca = tmp_path / "ca" + from decnet.swarm import pki + from decnet.swarm import client as swarm_client + from decnet.web.router.swarm import api_enroll_host as enroll_mod + monkeypatch.setattr(pki, "DEFAULT_CA_DIR", ca) + monkeypatch.setattr(swarm_client, "pki", pki) + monkeypatch.setattr(enroll_mod, "pki", pki) + return ca + + +@pytest.fixture +def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch): + r = get_repository(db_path=str(tmp_path / "hb.db")) + import decnet.web.dependencies as deps + import decnet.web.swarm_api as swarm_api_mod + monkeypatch.setattr(deps, "repo", r) + monkeypatch.setattr(swarm_api_mod, "repo", r) + return r + + +@pytest.fixture +def client(repo, ca_dir: pathlib.Path): + from decnet.web.swarm_api import app + async def _override() -> Any: + return repo + app.dependency_overrides[get_repo] = _override + with TestClient(app) as c: + yield c + app.dependency_overrides.clear() + + +def _enroll(client: TestClient, name: str = "worker-a") -> dict: + resp = client.post( + "/swarm/enroll", + json={"name": name, "address": "10.0.0.5", "agent_port": 8765}, + ) + assert resp.status_code == 201, resp.text + return resp.json() + + +def _pin(monkeypatch: pytest.MonkeyPatch, fp: str | None) -> None: + monkeypatch.setattr(hb_mod, "_extract_peer_fingerprint", lambda scope: fp) + + +def _hb_body(host_uuid: str, lifecycle: list[dict] | None = None) -> dict: + body: dict = { + "host_uuid": host_uuid, + "agent_version": "1.0", + "status": {"deployed": False, "deckies": []}, + } + if lifecycle is not None: + body["lifecycle"] = lifecycle + return body + + +def test_lifecycle_delta_flips_open_row_to_succeeded( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch, +) -> None: + host = _enroll(client) + _pin(monkeypatch, host["fingerprint"]) + + async def _seed_row() -> str: + return await repo.create_lifecycle({ + "decky_name": "decky-01", + "host_uuid": host["host_uuid"], + "operation": "deploy", + "status": "running", + }) + lid = asyncio.run(_seed_row()) + + resp = client.post("/swarm/heartbeat", json=_hb_body( + host["host_uuid"], + lifecycle=[{ + "decky_name": "decky-01", + "operation": "deploy", + "status": "succeeded", + }], + )) + assert resp.status_code == 204, resp.text + + async def _verify() -> None: + rows = await repo.get_lifecycle_by_ids([lid]) + assert rows[0]["status"] == "succeeded" + assert rows[0]["completed_at"] is not None + asyncio.run(_verify()) + + +def test_lifecycle_delta_carries_error_on_failure( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch, +) -> None: + host = _enroll(client) + _pin(monkeypatch, host["fingerprint"]) + + async def _seed() -> str: + return await repo.create_lifecycle({ + "decky_name": "decky-01", + "host_uuid": host["host_uuid"], + "operation": "mutate", + "status": "running", + }) + lid = asyncio.run(_seed()) + + resp = client.post("/swarm/heartbeat", json=_hb_body( + host["host_uuid"], + lifecycle=[{ + "decky_name": "decky-01", + "operation": "mutate", + "status": "failed", + "error": "compose blew up", + }], + )) + assert resp.status_code == 204 + + async def _verify() -> None: + rows = await repo.get_lifecycle_by_ids([lid]) + assert rows[0]["status"] == "failed" + assert rows[0]["error"] == "compose blew up" + assert rows[0]["completed_at"] is not None + asyncio.run(_verify()) + + +def test_lifecycle_delta_without_open_row_is_silent_noop( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Stale duplicate (e.g. master swept the row out before the delta + arrived) must not error the heartbeat.""" + host = _enroll(client) + _pin(monkeypatch, host["fingerprint"]) + + resp = client.post("/swarm/heartbeat", json=_hb_body( + host["host_uuid"], + lifecycle=[{ + "decky_name": "ghost-decky", + "operation": "deploy", + "status": "succeeded", + }], + )) + assert resp.status_code == 204 + + +def test_lifecycle_delta_only_matches_same_host( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch, +) -> None: + """A delta from host A must not flip an open row that belongs to host B.""" + host_a = _enroll(client, name="worker-a") + host_b = _enroll(client, name="worker-b") + _pin(monkeypatch, host_a["fingerprint"]) + + async def _seed() -> str: + return await repo.create_lifecycle({ + "decky_name": "decky-01", + "host_uuid": host_b["host_uuid"], + "operation": "deploy", + "status": "running", + }) + lid_b = asyncio.run(_seed()) + + resp = client.post("/swarm/heartbeat", json=_hb_body( + host_a["host_uuid"], + lifecycle=[{ + "decky_name": "decky-01", + "operation": "deploy", + "status": "succeeded", + }], + )) + assert resp.status_code == 204 + + async def _verify() -> None: + rows = await repo.get_lifecycle_by_ids([lid_b]) + assert rows[0]["status"] == "running" # untouched + asyncio.run(_verify()) + + +def test_heartbeat_without_lifecycle_field_still_works( + client: TestClient, repo, monkeypatch: pytest.MonkeyPatch, +) -> None: + host = _enroll(client) + _pin(monkeypatch, host["fingerprint"]) + resp = client.post("/swarm/heartbeat", json=_hb_body(host["host_uuid"])) + assert resp.status_code == 204