feat(swarm): heartbeat handler applies lifecycle deltas
HeartbeatRequest grows an optional lifecycle field carrying per-decky
completion records from the worker:
[{decky_name, operation, status, error?, completed_at?}]
For each delta, the master finds the most-recently-started open
DeckyLifecycle row for (decky_name, operation, host_uuid) and flips
it to terminal with the worker's error text + timestamp. Stale
duplicates (row already sealed or never existed) are logged and
dropped -- not errors.
Each successful pivot also emits decky.<name>.lifecycle on the bus
so the dashboard sees the transition without waiting for its next
poll tick.
This is the master-side completion channel for the worker's 202
fire-and-forget /deploy and /mutate.
This commit is contained in:
@@ -38,6 +38,11 @@ class HeartbeatRequest(BaseModel):
|
||||
agent_version: Optional[str] = None
|
||||
status: dict[str, Any]
|
||||
topology: Optional[dict[str, Any]] = None
|
||||
# Per-decky lifecycle deltas the worker pushes on /deploy or /mutate
|
||||
# completion (success / failure). Master pivots each one onto the
|
||||
# matching open DeckyLifecycle row. Optional + best-effort: a missing
|
||||
# field is fine, an unknown decky_name is logged-and-dropped.
|
||||
lifecycle: Optional[list[dict[str, Any]]] = None
|
||||
|
||||
|
||||
def _extract_peer_fingerprint(scope: MutableMapping[str, Any]) -> Optional[str]:
|
||||
@@ -165,6 +170,90 @@ async def _reconcile_topology_report(
|
||||
log.exception("heartbeat: failed to flag resync tid=%s", tid)
|
||||
|
||||
|
||||
async def _apply_lifecycle_deltas(
|
||||
repo: BaseRepository,
|
||||
host_uuid: str,
|
||||
deltas: Optional[list[dict[str, Any]]],
|
||||
) -> None:
|
||||
"""Pivot worker-side completion records onto the open
|
||||
DeckyLifecycle rows for this host.
|
||||
|
||||
Each delta: ``{decky_name, operation, status, error?, completed_at?}``.
|
||||
We find the most-recently-started open row for that (decky_name,
|
||||
operation, host_uuid) and apply the terminal status + error. A
|
||||
missing open row is logged and dropped — the worker may be pushing
|
||||
a stale duplicate after a master sweep, or the row was already
|
||||
sealed by an earlier delta. Either way it's not an error.
|
||||
"""
|
||||
if not deltas:
|
||||
return
|
||||
from decnet.lifecycle.events import emit_lifecycle
|
||||
try:
|
||||
from decnet.bus.factory import get_bus
|
||||
bus = get_bus(client_name="swarm.heartbeat")
|
||||
except Exception:
|
||||
bus = None
|
||||
for delta in deltas:
|
||||
try:
|
||||
decky_name = str(delta["decky_name"])
|
||||
operation = str(delta["operation"])
|
||||
status = str(delta["status"])
|
||||
except (KeyError, TypeError):
|
||||
log.warning("heartbeat lifecycle: malformed delta host=%s payload=%s",
|
||||
host_uuid, delta)
|
||||
continue
|
||||
if status not in ("running", "succeeded", "failed"):
|
||||
log.warning(
|
||||
"heartbeat lifecycle: unexpected status=%r host=%s decky=%s",
|
||||
status, host_uuid, decky_name,
|
||||
)
|
||||
continue
|
||||
try:
|
||||
row = await repo.find_open_lifecycle(
|
||||
decky_name=decky_name, operation=operation, host_uuid=host_uuid,
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
log.exception(
|
||||
"heartbeat lifecycle: find_open_lifecycle failed host=%s decky=%s",
|
||||
host_uuid, decky_name,
|
||||
)
|
||||
continue
|
||||
if row is None:
|
||||
log.info(
|
||||
"heartbeat lifecycle: no open row for host=%s decky=%s op=%s "
|
||||
"(stale duplicate or already-sealed)",
|
||||
host_uuid, decky_name, operation,
|
||||
)
|
||||
continue
|
||||
fields: dict[str, Any] = {"status": status}
|
||||
if delta.get("error") is not None:
|
||||
fields["error"] = str(delta["error"])[:2000]
|
||||
if delta.get("completed_at") is not None:
|
||||
try:
|
||||
fields["completed_at"] = datetime.fromisoformat(
|
||||
str(delta["completed_at"]),
|
||||
)
|
||||
except ValueError:
|
||||
# Bad timestamp from worker — let the repo stamp its own.
|
||||
pass
|
||||
try:
|
||||
await repo.update_lifecycle(row["id"], fields)
|
||||
except SQLAlchemyError:
|
||||
log.exception(
|
||||
"heartbeat lifecycle: update_lifecycle failed id=%s",
|
||||
row.get("id"),
|
||||
)
|
||||
continue
|
||||
await emit_lifecycle(
|
||||
bus,
|
||||
lifecycle_id=row["id"],
|
||||
decky_name=decky_name,
|
||||
operation=operation,
|
||||
status=status,
|
||||
error=fields.get("error"),
|
||||
)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/heartbeat",
|
||||
status_code=204,
|
||||
@@ -190,6 +279,7 @@ async def heartbeat(
|
||||
)
|
||||
|
||||
await _reconcile_topology_report(repo, req.host_uuid, req.topology)
|
||||
await _apply_lifecycle_deltas(repo, req.host_uuid, req.lifecycle)
|
||||
|
||||
status_body = req.status or {}
|
||||
if not status_body.get("deployed"):
|
||||
|
||||
Reference in New Issue
Block a user