feat(agent,forwarder,updater): publish system.<worker>.health heartbeats (DEBT-031 workers 7-9)

All three workers now share a run_health_heartbeat helper in
decnet.bus.publish.  Each publishes system.<worker>.health on a 30s tick
with {worker, ts} plus optional per-worker extras.  Subscribers can
watch system.*.health to see every DECNET worker on a host at once.

- agent: heartbeat runs inside the FastAPI lifespan alongside the
  existing master-facing heartbeat; bus-disabled path is a no-op.
- forwarder: heartbeat task spawned at run_forwarder entry, cancelled
  in the finally block so a crashed master loop never leaks the task.
- updater: new FastAPI lifespan hosts the heartbeat.

Heartbeat helper swallows extra() failures and is cancellation-safe so
lifespan teardown never hangs on it.
This commit is contained in:
2026-04-21 17:02:10 -04:00
parent cbb394a160
commit 5c0631e12c
5 changed files with 240 additions and 0 deletions

View File

@@ -27,9 +27,13 @@ from typing import Any, Optional
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
import contextlib
from decnet.agent import executor as _exec from decnet.agent import executor as _exec
from decnet.agent import heartbeat as _heartbeat from decnet.agent import heartbeat as _heartbeat
from decnet.agent import topology_ops as _topology_ops from decnet.agent import topology_ops as _topology_ops
from decnet.bus.factory import get_bus
from decnet.bus.publish import run_health_heartbeat
from decnet.swarm.pki import DEFAULT_AGENT_DIR from decnet.swarm.pki import DEFAULT_AGENT_DIR
from decnet.agent.topology_store import AlreadyApplied, TopologyStore from decnet.agent.topology_store import AlreadyApplied, TopologyStore
from decnet.config import DecnetConfig from decnet.config import DecnetConfig
@@ -95,15 +99,45 @@ def _ensure_collector_started() -> None:
log.info("agent log collector started log_file=%s", DECNET_AGENT_LOG_FILE) log.info("agent log collector started log_file=%s", DECNET_AGENT_LOG_FILE)
_bus_heartbeat_task: Optional[asyncio.Task] = None
@asynccontextmanager @asynccontextmanager
async def _lifespan(app: FastAPI): async def _lifespan(app: FastAPI):
# Best-effort: if identity/bundle plumbing isn't configured (e.g. dev # Best-effort: if identity/bundle plumbing isn't configured (e.g. dev
# runs or non-enrolled hosts), heartbeat.start() is a silent no-op. # runs or non-enrolled hosts), heartbeat.start() is a silent no-op.
_heartbeat.start() _heartbeat.start()
# Host-local bus heartbeat (system.agent.health). Separate channel
# from the mTLS master-facing heartbeat above; this one lets peers on
# the same host (dashboard, updater) see the agent is alive without
# hitting its HTTPS endpoint. Bus-disabled path is a no-op loop.
bus = None
try:
bus = get_bus(client_name="agent")
await bus.connect()
except Exception as exc: # noqa: BLE001
log.warning("agent: bus unavailable, skipping health heartbeat: %s", exc)
bus = None
global _bus_heartbeat_task
_bus_heartbeat_task = asyncio.create_task(
run_health_heartbeat(bus, "agent"),
name="agent-bus-heartbeat",
)
try: try:
yield yield
finally: finally:
await _heartbeat.stop() await _heartbeat.stop()
if _bus_heartbeat_task is not None:
_bus_heartbeat_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await _bus_heartbeat_task
_bus_heartbeat_task = None
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
global _collector_task global _collector_task
if _collector_task is not None and not _collector_task.done(): if _collector_task is not None and not _collector_task.done():
_collector_task.cancel() _collector_task.cancel()

View File

@@ -7,8 +7,11 @@ loop" guarantee is audited in exactly one place.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import contextlib
import time
from typing import Any, Callable from typing import Any, Callable
from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus from decnet.bus.base import BaseBus
from decnet.logging import get_logger from decnet.logging import get_logger
@@ -65,3 +68,34 @@ def make_thread_safe_publisher(
log.debug("cross-thread bus publish failed topic=%s: %s", topic, exc) log.debug("cross-thread bus publish failed topic=%s: %s", topic, exc)
return _publish return _publish
async def run_health_heartbeat(
bus: BaseBus | None,
worker: str,
*,
interval: float = 30.0,
extra: Callable[[], dict[str, Any]] | None = None,
) -> None:
"""Publish ``system.<worker>.health`` every *interval* seconds.
Standard heartbeat loop shared across agent/forwarder/updater. Emits
``{"worker": <name>, "ts": <unix-ts>, **extra()}`` on each tick. A
``None`` bus turns the loop into a no-op sleep cycle — still cancellable
so the caller can use the same ``asyncio.create_task``/``.cancel()``
pattern regardless of bus state.
Cancellation-safe: unwraps the ``CancelledError`` so callers awaiting
the task during shutdown see a clean exit.
"""
topic = _topics.system_health(worker)
with contextlib.suppress(asyncio.CancelledError):
while True:
payload: dict[str, Any] = {"worker": worker, "ts": time.time()}
if extra is not None:
try:
payload.update(extra())
except Exception as exc: # noqa: BLE001
log.debug("heartbeat extra() failed worker=%s: %s", worker, exc)
await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_HEALTH)
await asyncio.sleep(interval)

View File

@@ -21,6 +21,7 @@ losing the log tail, and vice versa.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import contextlib
import os import os
import pathlib import pathlib
import sqlite3 import sqlite3
@@ -28,6 +29,8 @@ import ssl
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional from typing import Optional
from decnet.bus.factory import get_bus
from decnet.bus.publish import run_health_heartbeat
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.swarm import pki from decnet.swarm import pki
@@ -187,6 +190,22 @@ async def run_forwarder(
cfg.log_path, cfg.master_host, cfg.master_port, offset, cfg.log_path, cfg.master_host, cfg.master_port, offset,
) )
# Host-local bus heartbeat (system.forwarder.health). Peers on the
# same host can tail "is the log shipper alive" without hitting the
# master. Bus-disabled path is a no-op loop.
bus = None
try:
bus = get_bus(client_name="forwarder")
await bus.connect()
except Exception as exc: # noqa: BLE001
log.warning("forwarder: bus unavailable, skipping heartbeat: %s", exc)
bus = None
heartbeat_task = asyncio.create_task(
run_health_heartbeat(bus, "forwarder"),
name="forwarder-bus-heartbeat",
)
try: try:
while stop_event is None or not stop_event.is_set(): while stop_event is None or not stop_event.is_set():
try: try:
@@ -219,6 +238,12 @@ async def run_forwarder(
pass pass
backoff = min(_MAX_BACKOFF, backoff * 2) backoff = min(_MAX_BACKOFF, backoff * 2)
finally: finally:
heartbeat_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await heartbeat_task
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
store.close() store.close()
log.info("forwarder stopped offset=%d", offset) log.info("forwarder stopped offset=%d", offset)

View File

@@ -9,24 +9,67 @@ only — agent certs are rejected).
""" """
from __future__ import annotations from __future__ import annotations
import asyncio
import contextlib
import os as _os import os as _os
import pathlib import pathlib
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from pydantic import BaseModel from pydantic import BaseModel
from decnet.bus.factory import get_bus
from decnet.bus.publish import run_health_heartbeat
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.swarm import pki from decnet.swarm import pki
from decnet.updater import executor as _exec from decnet.updater import executor as _exec
log = get_logger("updater.app") log = get_logger("updater.app")
_bus_heartbeat_task: Optional[asyncio.Task] = None
@asynccontextmanager
async def _lifespan(_app: FastAPI):
# Host-local bus heartbeat (system.updater.health). Lets the agent
# and dashboard tell "updater's up" without hitting the HTTPS port.
# Bus-disabled path is a no-op loop; the updater serves requests
# either way.
bus = None
try:
bus = get_bus(client_name="updater")
await bus.connect()
except Exception as exc: # noqa: BLE001
log.warning("updater: bus unavailable, skipping heartbeat: %s", exc)
bus = None
global _bus_heartbeat_task
_bus_heartbeat_task = asyncio.create_task(
run_health_heartbeat(bus, "updater"),
name="updater-bus-heartbeat",
)
try:
yield
finally:
if _bus_heartbeat_task is not None:
_bus_heartbeat_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await _bus_heartbeat_task
_bus_heartbeat_task = None
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
app = FastAPI( app = FastAPI(
title="DECNET Self-Updater", title="DECNET Self-Updater",
version="0.1.0", version="0.1.0",
docs_url=None, docs_url=None,
redoc_url=None, redoc_url=None,
openapi_url=None, openapi_url=None,
lifespan=_lifespan,
) )

104
tests/bus/test_heartbeat.py Normal file
View File

@@ -0,0 +1,104 @@
"""Shared ``run_health_heartbeat`` helper (DEBT-031 workers 79).
Three workers (agent, forwarder, updater) publish identical
``system.<worker>.health`` heartbeats. Rather than copy the loop three
times, ``decnet.bus.publish.run_health_heartbeat`` carries it. These
tests pin:
* topic is ``system.<worker>.health`` via the builder;
* payload carries worker name and monotonic-ish timestamp;
* ``extra()`` hook merges per-worker fields;
* ``None`` bus yields a benign no-op loop (still cancellable);
* ``extra()`` failure doesn't break the tick.
"""
from __future__ import annotations
import asyncio
import pytest
import pytest_asyncio
from decnet.bus.fake import FakeBus
from decnet.bus.publish import run_health_heartbeat
@pytest_asyncio.fixture
async def bus() -> FakeBus:
b = FakeBus()
await b.connect()
yield b
await b.close()
@pytest.mark.asyncio
async def test_heartbeat_publishes_under_system_worker_health(bus: FakeBus) -> None:
task = asyncio.create_task(
run_health_heartbeat(bus, "agent", interval=0.05),
)
try:
sub = bus.subscribe("system.*.health")
async with sub:
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
finally:
task.cancel()
await asyncio.gather(task, return_exceptions=True)
assert event.topic == "system.agent.health"
assert event.type == "health"
assert event.payload["worker"] == "agent"
assert isinstance(event.payload["ts"], float)
@pytest.mark.asyncio
async def test_heartbeat_merges_extra_payload(bus: FakeBus) -> None:
task = asyncio.create_task(
run_health_heartbeat(
bus, "forwarder", interval=0.05,
extra=lambda: {"offset": 4096, "connected": True},
),
)
try:
sub = bus.subscribe("system.forwarder.health")
async with sub:
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
finally:
task.cancel()
await asyncio.gather(task, return_exceptions=True)
assert event.payload["offset"] == 4096
assert event.payload["connected"] is True
assert event.payload["worker"] == "forwarder"
@pytest.mark.asyncio
async def test_heartbeat_survives_extra_failure(bus: FakeBus) -> None:
# An extra() that blows up must not abort the heartbeat loop.
def _boom():
raise RuntimeError("extras exploded")
task = asyncio.create_task(
run_health_heartbeat(bus, "updater", interval=0.05, extra=_boom),
)
try:
sub = bus.subscribe("system.updater.health")
async with sub:
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
finally:
task.cancel()
await asyncio.gather(task, return_exceptions=True)
# Base payload still present despite extra() blowing up.
assert event.payload["worker"] == "updater"
@pytest.mark.asyncio
async def test_heartbeat_is_cancellable_with_none_bus() -> None:
# Bus-disabled path: loop runs but publishes nothing. Must still
# cancel cleanly so lifespan teardown doesn't hang.
task = asyncio.create_task(
run_health_heartbeat(None, "agent", interval=0.01),
)
await asyncio.sleep(0.05)
task.cancel()
await asyncio.gather(task, return_exceptions=True)
assert task.done()