From bf01804736dc1ae49207ac0f83dd942103fb57dc Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 19 Apr 2026 21:49:34 -0400 Subject: [PATCH] feat(agent): periodic heartbeat loop posting status to swarmctl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New decnet.agent.heartbeat asyncio loop wired into the agent FastAPI lifespan. Every 30 s the worker POSTs executor.status() to the master's /swarm/heartbeat with its DECNET_HOST_UUID for self-identity; the existing agent mTLS bundle provides the client cert the master pins against SwarmHost.client_cert_fingerprint. start() is a silent no-op when identity env (HOST_UUID, MASTER_HOST) is unset or the worker bundle is missing, so dev runs and un-enrolled hosts don't crash the agent app. On non-204 responses the loop logs loudly but keeps ticking — an operator may re-enrol mid-session, and fail-closed pinning shouldn't be self-silencing. --- decnet/agent/app.py | 15 ++++ decnet/agent/heartbeat.py | 134 ++++++++++++++++++++++++++++ tests/swarm/test_agent_heartbeat.py | 122 +++++++++++++++++++++++++ 3 files changed, 271 insertions(+) create mode 100644 decnet/agent/heartbeat.py create mode 100644 tests/swarm/test_agent_heartbeat.py diff --git a/decnet/agent/app.py b/decnet/agent/app.py index ce1213d..bb72dff 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -18,23 +18,38 @@ Endpoints mirror the existing unihost CLI verbs: """ from __future__ import annotations +from contextlib import asynccontextmanager from typing import Optional from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field from decnet.agent import executor as _exec +from decnet.agent import heartbeat as _heartbeat from decnet.config import DecnetConfig from decnet.logging import get_logger log = get_logger("agent.app") + +@asynccontextmanager +async def _lifespan(app: FastAPI): + # Best-effort: if identity/bundle plumbing isn't configured (e.g. dev + # runs or non-enrolled hosts), heartbeat.start() is a silent no-op. + _heartbeat.start() + try: + yield + finally: + await _heartbeat.stop() + + app = FastAPI( title="DECNET SWARM Agent", version="0.1.0", docs_url=None, # no interactive docs on worker — narrow attack surface redoc_url=None, openapi_url=None, + lifespan=_lifespan, ) diff --git a/decnet/agent/heartbeat.py b/decnet/agent/heartbeat.py new file mode 100644 index 0000000..bbc00aa --- /dev/null +++ b/decnet/agent/heartbeat.py @@ -0,0 +1,134 @@ +"""Agent → master liveness heartbeat loop. + +Every ``INTERVAL_S`` seconds the worker posts ``executor.status()`` to +``POST /swarm/heartbeat`` over mTLS. The master pins the +presented client cert's SHA-256 against the ``SwarmHost`` row for the +claimed ``host_uuid``; a match refreshes ``last_heartbeat`` + each +``DeckyShard``'s snapshot + runtime state. + +Identity comes from ``/etc/decnet/decnet.ini`` (seeded by the enroll +bundle) — specifically ``DECNET_HOST_UUID`` and ``DECNET_MASTER_HOST``. +The worker's existing ``~/.decnet/agent/`` bundle (or +``/etc/decnet/agent/``) provides the mTLS client cert. + +Started/stopped via the agent FastAPI app's lifespan. If identity +plumbing is missing (pre-enrollment dev runs) the loop logs at DEBUG and +declines to start — callers don't have to guard it. +""" +from __future__ import annotations + +import asyncio +import pathlib +from typing import Optional + +import httpx + +from decnet.agent import executor as _exec +from decnet.logging import get_logger +from decnet.swarm import pki +from decnet.swarm.log_forwarder import build_worker_ssl_context + +log = get_logger("agent.heartbeat") + +INTERVAL_S = 30.0 +_TIMEOUT = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0) + +_task: Optional[asyncio.Task] = None + + +def _resolve_agent_dir() -> pathlib.Path: + """Match the agent-dir resolution order used by the agent server: + DECNET_AGENT_DIR env, else /etc/decnet/agent (production install), + else ~/.decnet/agent (dev).""" + import os + env = os.environ.get("DECNET_AGENT_DIR") + if env: + return pathlib.Path(env) + system = pathlib.Path("/etc/decnet/agent") + if system.exists(): + return system + return pki.DEFAULT_AGENT_DIR + + +async def _tick(client: httpx.AsyncClient, url: str, host_uuid: str, agent_version: str) -> None: + snap = await _exec.status() + resp = await client.post( + url, + json={ + "host_uuid": host_uuid, + "agent_version": agent_version, + "status": snap, + }, + ) + # 403 / 404 are terminal-ish — we still keep looping because an + # operator may re-enrol the host mid-session, but we log loudly so + # prod ops can spot cert-pinning drift. + if resp.status_code == 204: + return + log.warning( + "heartbeat rejected status=%d body=%s", + resp.status_code, resp.text[:200], + ) + + +async def _loop(url: str, host_uuid: str, agent_version: str, ssl_ctx) -> None: + log.info("heartbeat loop starting url=%s host_uuid=%s interval=%ss", + url, host_uuid, INTERVAL_S) + async with httpx.AsyncClient(verify=ssl_ctx, timeout=_TIMEOUT) as client: + while True: + try: + await _tick(client, url, host_uuid, agent_version) + except asyncio.CancelledError: + raise + except Exception: + log.exception("heartbeat tick failed — will retry in %ss", INTERVAL_S) + await asyncio.sleep(INTERVAL_S) + + +def start() -> Optional[asyncio.Task]: + """Kick off the background heartbeat task. No-op if identity is + unconfigured (dev mode) — the caller doesn't need to check.""" + global _task + from decnet.env import ( + DECNET_HOST_UUID, + DECNET_MASTER_HOST, + DECNET_SWARMCTL_PORT, + ) + + if _task is not None and not _task.done(): + return _task + if not DECNET_HOST_UUID or not DECNET_MASTER_HOST: + log.debug("heartbeat not starting — DECNET_HOST_UUID or DECNET_MASTER_HOST unset") + return None + + agent_dir = _resolve_agent_dir() + try: + ssl_ctx = build_worker_ssl_context(agent_dir) + except Exception: + log.exception("heartbeat not starting — worker SSL context unavailable at %s", agent_dir) + return None + + try: + from decnet import __version__ as _v + agent_version = _v + except Exception: + agent_version = "unknown" + + url = f"https://{DECNET_MASTER_HOST}:{DECNET_SWARMCTL_PORT}/swarm/heartbeat" + _task = asyncio.create_task( + _loop(url, DECNET_HOST_UUID, agent_version, ssl_ctx), + name="agent-heartbeat", + ) + return _task + + +async def stop() -> None: + global _task + if _task is None: + return + _task.cancel() + try: + await _task + except (asyncio.CancelledError, Exception): + pass + _task = None diff --git a/tests/swarm/test_agent_heartbeat.py b/tests/swarm/test_agent_heartbeat.py new file mode 100644 index 0000000..103f939 --- /dev/null +++ b/tests/swarm/test_agent_heartbeat.py @@ -0,0 +1,122 @@ +"""Tests for the worker-side heartbeat loop (decnet.agent.heartbeat).""" +from __future__ import annotations + +import asyncio +from typing import Any + +import httpx +import pytest + +from decnet.agent import heartbeat as hb + + +@pytest.fixture(autouse=True) +def _reset_module_task(monkeypatch: pytest.MonkeyPatch): + # Each test gets a fresh _task slot so start()/stop() state doesn't + # leak between cases. + monkeypatch.setattr(hb, "_task", None) + yield + monkeypatch.setattr(hb, "_task", None) + + +class _StubTransport(httpx.AsyncBaseTransport): + """Record each POST and respond according to ``responder(req)``.""" + def __init__(self, responder): + self.calls: list[dict[str, Any]] = [] + self._responder = responder + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + body = request.read() + self.calls.append({"url": str(request.url), "body": body}) + return self._responder(request) + + +@pytest.mark.asyncio +async def test_tick_posts_status_snapshot_and_accepts_204(monkeypatch) -> None: + async def fake_status() -> dict: + return {"deployed": False, "deckies": []} + + monkeypatch.setattr(hb._exec, "status", fake_status) + + transport = _StubTransport(lambda req: httpx.Response(204)) + async with httpx.AsyncClient(transport=transport) as client: + await hb._tick(client, "https://m/swarm/heartbeat", "uuid-a", "1.2.3") + + assert len(transport.calls) == 1 + import json + payload = json.loads(transport.calls[0]["body"]) + assert payload["host_uuid"] == "uuid-a" + assert payload["agent_version"] == "1.2.3" + assert payload["status"]["deployed"] is False + + +@pytest.mark.asyncio +async def test_tick_logs_on_non_204_response(monkeypatch, caplog) -> None: + async def fake_status() -> dict: + return {"deployed": False} + + monkeypatch.setattr(hb._exec, "status", fake_status) + transport = _StubTransport(lambda req: httpx.Response(403, text="mismatch")) + + async with httpx.AsyncClient(transport=transport) as client: + with caplog.at_level("WARNING", logger="agent.heartbeat"): + await hb._tick(client, "https://m/swarm/heartbeat", "uuid-a", "1.2.3") + + assert any("rejected" in rec.getMessage() for rec in caplog.records) + + +def test_start_is_noop_when_identity_missing(monkeypatch) -> None: + # Neither DECNET_HOST_UUID nor DECNET_MASTER_HOST set → start() must + # return None, never raise. Dev runs exercise this path every time. + import decnet.env as env + monkeypatch.setattr(env, "DECNET_HOST_UUID", None) + monkeypatch.setattr(env, "DECNET_MASTER_HOST", None) + assert hb.start() is None + assert hb._task is None + + +@pytest.mark.asyncio +async def test_start_is_noop_when_ssl_context_unavailable( + monkeypatch, tmp_path +) -> None: + # Identity plumbed, but worker bundle missing on disk → start() logs + # and bails instead of crashing the FastAPI app. + import decnet.env as env + monkeypatch.setattr(env, "DECNET_HOST_UUID", "uuid-a") + monkeypatch.setattr(env, "DECNET_MASTER_HOST", "master.lan") + monkeypatch.setattr(env, "DECNET_SWARMCTL_PORT", 8770) + monkeypatch.setenv("DECNET_AGENT_DIR", str(tmp_path / "empty")) + assert hb.start() is None + + +@pytest.mark.asyncio +async def test_loop_keeps_ticking_after_5xx_failures(monkeypatch) -> None: + # Simulates a flapping master: first two ticks raise/5xx, third succeeds. + # The loop must not crash — it must sleep and retry. + call_count = {"n": 0} + + def _responder(req): + call_count["n"] += 1 + if call_count["n"] < 3: + return httpx.Response(503, text="unavailable") + return httpx.Response(204) + + async def fake_status() -> dict: + return {"deployed": False} + + monkeypatch.setattr(hb._exec, "status", fake_status) + monkeypatch.setattr(hb, "INTERVAL_S", 0.01) # fast-forward the sleep + + transport = _StubTransport(_responder) + + async def _run(): + async with httpx.AsyncClient(transport=transport) as client: + while call_count["n"] < 3: + try: + await hb._tick(client, "https://m/swarm/heartbeat", "uuid-a", "1.2.3") + except Exception: + pass + await asyncio.sleep(0.01) + + await asyncio.wait_for(_run(), timeout=2.0) + assert call_count["n"] >= 3