diff --git a/decnet/agent/app.py b/decnet/agent/app.py index 7b71cf6f..8302314d 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -27,9 +27,13 @@ from typing import Any, Optional from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field +import contextlib + from decnet.agent import executor as _exec from decnet.agent import heartbeat as _heartbeat from decnet.agent import topology_ops as _topology_ops +from decnet.bus.factory import get_bus +from decnet.bus.publish import run_health_heartbeat from decnet.swarm.pki import DEFAULT_AGENT_DIR from decnet.agent.topology_store import AlreadyApplied, TopologyStore from decnet.config import DecnetConfig @@ -95,15 +99,45 @@ def _ensure_collector_started() -> None: log.info("agent log collector started log_file=%s", DECNET_AGENT_LOG_FILE) +_bus_heartbeat_task: Optional[asyncio.Task] = None + + @asynccontextmanager async def _lifespan(app: FastAPI): # Best-effort: if identity/bundle plumbing isn't configured (e.g. dev # runs or non-enrolled hosts), heartbeat.start() is a silent no-op. _heartbeat.start() + + # Host-local bus heartbeat (system.agent.health). Separate channel + # from the mTLS master-facing heartbeat above; this one lets peers on + # the same host (dashboard, updater) see the agent is alive without + # hitting its HTTPS endpoint. Bus-disabled path is a no-op loop. + bus = None + try: + bus = get_bus(client_name="agent") + await bus.connect() + except Exception as exc: # noqa: BLE001 + log.warning("agent: bus unavailable, skipping health heartbeat: %s", exc) + bus = None + + global _bus_heartbeat_task + _bus_heartbeat_task = asyncio.create_task( + run_health_heartbeat(bus, "agent"), + name="agent-bus-heartbeat", + ) + try: yield finally: await _heartbeat.stop() + if _bus_heartbeat_task is not None: + _bus_heartbeat_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await _bus_heartbeat_task + _bus_heartbeat_task = None + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() global _collector_task if _collector_task is not None and not _collector_task.done(): _collector_task.cancel() diff --git a/decnet/bus/publish.py b/decnet/bus/publish.py index 4849d978..8d5e15aa 100644 --- a/decnet/bus/publish.py +++ b/decnet/bus/publish.py @@ -7,8 +7,11 @@ loop" guarantee is audited in exactly one place. from __future__ import annotations import asyncio +import contextlib +import time from typing import Any, Callable +from decnet.bus import topics as _topics from decnet.bus.base import BaseBus from decnet.logging import get_logger @@ -65,3 +68,34 @@ def make_thread_safe_publisher( log.debug("cross-thread bus publish failed topic=%s: %s", topic, exc) return _publish + + +async def run_health_heartbeat( + bus: BaseBus | None, + worker: str, + *, + interval: float = 30.0, + extra: Callable[[], dict[str, Any]] | None = None, +) -> None: + """Publish ``system..health`` every *interval* seconds. + + Standard heartbeat loop shared across agent/forwarder/updater. Emits + ``{"worker": , "ts": , **extra()}`` on each tick. A + ``None`` bus turns the loop into a no-op sleep cycle — still cancellable + so the caller can use the same ``asyncio.create_task``/``.cancel()`` + pattern regardless of bus state. + + Cancellation-safe: unwraps the ``CancelledError`` so callers awaiting + the task during shutdown see a clean exit. + """ + topic = _topics.system_health(worker) + with contextlib.suppress(asyncio.CancelledError): + while True: + payload: dict[str, Any] = {"worker": worker, "ts": time.time()} + if extra is not None: + try: + payload.update(extra()) + except Exception as exc: # noqa: BLE001 + log.debug("heartbeat extra() failed worker=%s: %s", worker, exc) + await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_HEALTH) + await asyncio.sleep(interval) diff --git a/decnet/swarm/log_forwarder.py b/decnet/swarm/log_forwarder.py index 0a873431..0e3d129f 100644 --- a/decnet/swarm/log_forwarder.py +++ b/decnet/swarm/log_forwarder.py @@ -21,6 +21,7 @@ losing the log tail, and vice versa. from __future__ import annotations import asyncio +import contextlib import os import pathlib import sqlite3 @@ -28,6 +29,8 @@ import ssl from dataclasses import dataclass from typing import Optional +from decnet.bus.factory import get_bus +from decnet.bus.publish import run_health_heartbeat from decnet.logging import get_logger from decnet.swarm import pki @@ -187,6 +190,22 @@ async def run_forwarder( cfg.log_path, cfg.master_host, cfg.master_port, offset, ) + # Host-local bus heartbeat (system.forwarder.health). Peers on the + # same host can tail "is the log shipper alive" without hitting the + # master. Bus-disabled path is a no-op loop. + bus = None + try: + bus = get_bus(client_name="forwarder") + await bus.connect() + except Exception as exc: # noqa: BLE001 + log.warning("forwarder: bus unavailable, skipping heartbeat: %s", exc) + bus = None + + heartbeat_task = asyncio.create_task( + run_health_heartbeat(bus, "forwarder"), + name="forwarder-bus-heartbeat", + ) + try: while stop_event is None or not stop_event.is_set(): try: @@ -219,6 +238,12 @@ async def run_forwarder( pass backoff = min(_MAX_BACKOFF, backoff * 2) finally: + heartbeat_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await heartbeat_task + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() store.close() log.info("forwarder stopped offset=%d", offset) diff --git a/decnet/updater/app.py b/decnet/updater/app.py index 5c5d8795..91cbd0e2 100644 --- a/decnet/updater/app.py +++ b/decnet/updater/app.py @@ -9,24 +9,67 @@ only — agent certs are rejected). """ from __future__ import annotations +import asyncio +import contextlib import os as _os import pathlib +from contextlib import asynccontextmanager +from typing import Optional from fastapi import FastAPI, File, Form, HTTPException, UploadFile from pydantic import BaseModel +from decnet.bus.factory import get_bus +from decnet.bus.publish import run_health_heartbeat from decnet.logging import get_logger from decnet.swarm import pki from decnet.updater import executor as _exec log = get_logger("updater.app") + +_bus_heartbeat_task: Optional[asyncio.Task] = None + + +@asynccontextmanager +async def _lifespan(_app: FastAPI): + # Host-local bus heartbeat (system.updater.health). Lets the agent + # and dashboard tell "updater's up" without hitting the HTTPS port. + # Bus-disabled path is a no-op loop; the updater serves requests + # either way. + bus = None + try: + bus = get_bus(client_name="updater") + await bus.connect() + except Exception as exc: # noqa: BLE001 + log.warning("updater: bus unavailable, skipping heartbeat: %s", exc) + bus = None + + global _bus_heartbeat_task + _bus_heartbeat_task = asyncio.create_task( + run_health_heartbeat(bus, "updater"), + name="updater-bus-heartbeat", + ) + try: + yield + finally: + if _bus_heartbeat_task is not None: + _bus_heartbeat_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await _bus_heartbeat_task + _bus_heartbeat_task = None + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + app = FastAPI( title="DECNET Self-Updater", version="0.1.0", docs_url=None, redoc_url=None, openapi_url=None, + lifespan=_lifespan, ) diff --git a/tests/bus/test_heartbeat.py b/tests/bus/test_heartbeat.py new file mode 100644 index 00000000..5d4fcba4 --- /dev/null +++ b/tests/bus/test_heartbeat.py @@ -0,0 +1,104 @@ +"""Shared ``run_health_heartbeat`` helper (DEBT-031 workers 7–9). + +Three workers (agent, forwarder, updater) publish identical +``system..health`` heartbeats. Rather than copy the loop three +times, ``decnet.bus.publish.run_health_heartbeat`` carries it. These +tests pin: + +* topic is ``system..health`` via the builder; +* payload carries worker name and monotonic-ish timestamp; +* ``extra()`` hook merges per-worker fields; +* ``None`` bus yields a benign no-op loop (still cancellable); +* ``extra()`` failure doesn't break the tick. +""" +from __future__ import annotations + +import asyncio + +import pytest +import pytest_asyncio + +from decnet.bus.fake import FakeBus +from decnet.bus.publish import run_health_heartbeat + + +@pytest_asyncio.fixture +async def bus() -> FakeBus: + b = FakeBus() + await b.connect() + yield b + await b.close() + + +@pytest.mark.asyncio +async def test_heartbeat_publishes_under_system_worker_health(bus: FakeBus) -> None: + task = asyncio.create_task( + run_health_heartbeat(bus, "agent", interval=0.05), + ) + try: + sub = bus.subscribe("system.*.health") + async with sub: + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + finally: + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + assert event.topic == "system.agent.health" + assert event.type == "health" + assert event.payload["worker"] == "agent" + assert isinstance(event.payload["ts"], float) + + +@pytest.mark.asyncio +async def test_heartbeat_merges_extra_payload(bus: FakeBus) -> None: + task = asyncio.create_task( + run_health_heartbeat( + bus, "forwarder", interval=0.05, + extra=lambda: {"offset": 4096, "connected": True}, + ), + ) + try: + sub = bus.subscribe("system.forwarder.health") + async with sub: + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + finally: + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + assert event.payload["offset"] == 4096 + assert event.payload["connected"] is True + assert event.payload["worker"] == "forwarder" + + +@pytest.mark.asyncio +async def test_heartbeat_survives_extra_failure(bus: FakeBus) -> None: + # An extra() that blows up must not abort the heartbeat loop. + def _boom(): + raise RuntimeError("extras exploded") + + task = asyncio.create_task( + run_health_heartbeat(bus, "updater", interval=0.05, extra=_boom), + ) + try: + sub = bus.subscribe("system.updater.health") + async with sub: + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + finally: + task.cancel() + await asyncio.gather(task, return_exceptions=True) + + # Base payload still present despite extra() blowing up. + assert event.payload["worker"] == "updater" + + +@pytest.mark.asyncio +async def test_heartbeat_is_cancellable_with_none_bus() -> None: + # Bus-disabled path: loop runs but publishes nothing. Must still + # cancel cleanly so lifespan teardown doesn't hang. + task = asyncio.create_task( + run_health_heartbeat(None, "agent", interval=0.01), + ) + await asyncio.sleep(0.05) + task.cancel() + await asyncio.gather(task, return_exceptions=True) + assert task.done()