From 0fbb07c2ec6cd68b0db8846c8e0eb8e2e7733cb2 Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 22 Apr 2026 14:10:39 -0400 Subject: [PATCH] feat(workers): bus-backed Workers panel (registry, control, installed flag) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ships the backend half of Config → Workers: * Worker registry aggregates `system.*.health` + `system.bus.health` heartbeats into a last-seen dict; OK / STALE / UNKNOWN tiers drop out of a 90s window (3× the 30s heartbeat interval). * `GET /api/v1/workers` returns the snapshot plus `bus_connected` (so the UI can explain "all UNKNOWN" when the bus socket is down) and a per-row `installed` flag populated from `systemctl list-unit-files decnet-*.service` (cached 30s). * `POST /api/v1/workers/{name}/stop` publishes a stop intent on `system.<name>.control`; workers listen via the shared control listener in `bus/publish.py`. * Heartbeat + control listener wired into collector / profiler / sniffer / prober / mutator worker loops. API self-heartbeats too so the panel always has one ground-truth row. * Topic helper `system_control(name)` + tests covering builder validation, control listener shutdown path, and the API surface (auth gating, bus-connected field, unknown-name 404). Adds `StartFailure` / `StartAllResponse` models in anticipation of the upcoming start endpoints (DEBT-034). 
--- decnet/bus/publish.py | 102 +++++++++ decnet/bus/topics.py | 27 +++ decnet/collector/worker.py | 18 +- decnet/mutator/engine.py | 6 + decnet/prober/worker.py | 22 +- decnet/profiler/worker.py | 28 ++- decnet/sniffer/worker.py | 19 +- decnet/web/api.py | 27 ++- decnet/web/db/models.py | 48 ++++- decnet/web/router/__init__.py | 4 + decnet/web/router/workers/__init__.py | 0 .../web/router/workers/api_control_worker.py | 76 +++++++ decnet/web/router/workers/api_list_workers.py | 35 +++ decnet/web/worker_registry.py | 202 ++++++++++++++++++ tests/api/workers/__init__.py | 0 tests/api/workers/test_workers_api.py | 171 +++++++++++++++ tests/bus/test_control_listener.py | 77 +++++++ tests/bus/test_topics.py | 11 + 18 files changed, 863 insertions(+), 10 deletions(-) create mode 100644 decnet/web/router/workers/__init__.py create mode 100644 decnet/web/router/workers/api_control_worker.py create mode 100644 decnet/web/router/workers/api_list_workers.py create mode 100644 decnet/web/worker_registry.py create mode 100644 tests/api/workers/__init__.py create mode 100644 tests/api/workers/test_workers_api.py create mode 100644 tests/bus/test_control_listener.py diff --git a/decnet/bus/publish.py b/decnet/bus/publish.py index 8d5e15aa..dfe3276d 100644 --- a/decnet/bus/publish.py +++ b/decnet/bus/publish.py @@ -8,6 +8,8 @@ from __future__ import annotations import asyncio import contextlib +import os +import signal import time from typing import Any, Callable @@ -99,3 +101,103 @@ async def run_health_heartbeat( log.debug("heartbeat extra() failed worker=%s: %s", worker, exc) await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_HEALTH) await asyncio.sleep(interval) + + +async def run_control_listener( + bus: BaseBus | None, + worker: str, + shutdown: asyncio.Event, +) -> None: + """Subscribe to ``system..control`` and honour stop intents. 
+ + On a well-formed ``{"action": "stop", ...}`` message the function sets + *shutdown* and returns — the worker's main loop is expected to check + the event and unwind cleanly, matching the SIGTERM path. + + Malformed payloads (missing/unknown action, non-dict, exception from + the transport) are logged and ignored. A ``None`` bus yields a noop + coroutine that simply awaits *shutdown* — callers can ``create_task`` + this unconditionally regardless of bus state. + + Cancellation-safe. + """ + if bus is None: + with contextlib.suppress(asyncio.CancelledError): + await shutdown.wait() + return + + topic = _topics.system_control(worker) + with contextlib.suppress(asyncio.CancelledError): + try: + async with bus.subscribe(topic) as sub: + async for event in sub: + payload = event.payload or {} + action = payload.get("action") + requested_by = payload.get("requested_by", "") + if action == _topics.WORKER_CONTROL_STOP: + log.info( + "control: stop requested worker=%s by=%s", + worker, requested_by, + ) + shutdown.set() + return + log.debug( + "control: ignoring unknown action worker=%s action=%r", + worker, action, + ) + except Exception as exc: # noqa: BLE001 + log.warning( + "control listener failed worker=%s: %s — shutdown via bus disabled", + worker, exc, + ) + + +async def run_control_listener_signal( + bus: BaseBus | None, + worker: str, +) -> None: + """Like :func:`run_control_listener` but signals the process on stop. + + Preferred for workers whose main loop is a blocking thread + (container-log tail, PTY read, scapy sniff) — wiring an + ``asyncio.Event`` through the thread boundary is error-prone, and + every DECNET worker already has systemd-equivalent SIGTERM cleanup. + A SIGTERM self-signal routes the stop through that same path + without inventing a second shutdown mechanism. + + Cancellation-safe. Never raises: a failed self-signal is logged + and the loop simply exits (admin can fall back to ``systemctl``). 
+ """ + if bus is None: + return + + topic = _topics.system_control(worker) + with contextlib.suppress(asyncio.CancelledError): + try: + async with bus.subscribe(topic) as sub: + async for event in sub: + payload = event.payload or {} + action = payload.get("action") + requested_by = payload.get("requested_by", "") + if action == _topics.WORKER_CONTROL_STOP: + log.info( + "control: stop requested worker=%s by=%s → SIGTERM self", + worker, requested_by, + ) + try: + os.kill(os.getpid(), signal.SIGTERM) + except Exception as exc: # noqa: BLE001 + log.warning( + "control: self-signal failed worker=%s: %s", + worker, exc, + ) + return + log.debug( + "control: ignoring unknown action worker=%s action=%r", + worker, action, + ) + except Exception as exc: # noqa: BLE001 + log.warning( + "control signal listener failed worker=%s: %s", + worker, exc, + ) diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index af51d4b0..5f6900b7 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -82,6 +82,16 @@ SYSTEM_BUS_HEALTH = "bus.health" # :func:`system_health`. The leaf constant stays the same across workers; # the worker name goes in the middle token. SYSTEM_HEALTH = "health" +# Worker-control leaf — built per-worker as ``system..control`` via +# :func:`system_control`. Admin-originated stop intents travel on this +# topic; each worker subscribes to its own. +SYSTEM_CONTROL = "control" + +# Control payload ``action`` values — the wire vocabulary. Only ``stop`` is +# handled in v1; ``start`` is reserved because a stopped worker has no +# subscriber, so starting requires external supervision (systemd). +WORKER_CONTROL_STOP = "stop" +WORKER_CONTROL_START = "start" # ─── Builders ──────────────────────────────────────────────────────────────── @@ -152,6 +162,23 @@ def system_health(worker: str) -> str: return f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}" +def system_control(worker: str) -> str: + """Build ``system..control``. 
+ + Admin-originated stop (and, eventually, start) intents are published + here; the worker in question subscribes to its own address and reacts. + Payload shape:: + + {"action": "stop", "requested_by": "", "ts": } + + *action* must be one of :data:`WORKER_CONTROL_STOP` / + :data:`WORKER_CONTROL_START`; any other value is ignored by the + listener. Same segment rules as :func:`system_health`. + """ + _reject_tokens(worker) + return f"{SYSTEM}.{worker}.{SYSTEM_CONTROL}" + + def _reject_tokens(*parts: str) -> None: """Reject topic segments that would break NATS-style tokenization. diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index 582dc608..633f2f7f 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -20,7 +20,11 @@ from typing import Any, Callable, Optional from decnet.bus import topics as _topics from decnet.bus.factory import get_bus -from decnet.bus.publish import make_thread_safe_publisher +from decnet.bus.publish import ( + make_thread_safe_publisher, + run_control_listener_signal, + run_health_heartbeat, +) from decnet.logging import get_logger from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx @@ -416,6 +420,14 @@ async def log_collector_worker(log_file: str) -> None: _publish_log = _make_system_log_publisher(bus, loop) + # Workers panel health heartbeat + bus-driven stop control. The + # heartbeat beacons on system.collector.health every 30s; the + # control listener translates a bus stop intent into a SIGTERM to + # this process (collector's main loop is a blocking thread pool, so + # self-signalling is cleaner than threading a shutdown event). 
+ heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "collector")) + control_task = asyncio.create_task(run_control_listener_signal(bus, "collector")) + # Dedicated thread pool so long-running container log streams don't # saturate the default asyncio executor and starve short-lived # to_thread() calls elsewhere (e.g. load_state in the web API). @@ -465,6 +477,10 @@ async def log_collector_worker(log_file: str) -> None: logger.error("collector error: %s", exc) finally: collector_pool.shutdown(wait=False) + for t in (heartbeat_task, control_task): + t.cancel() + with contextlib.suppress(Exception, asyncio.CancelledError): + await t if bus is not None: with contextlib.suppress(Exception): await bus.close() diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index 3e839c03..1424f7f3 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -27,6 +27,7 @@ from decnet.bus.base import BaseBus from decnet.bus.factory import get_bus from decnet.bus.publish import ( publish_safely as _publish_safely, + run_control_listener_signal as _run_control_listener_signal, run_health_heartbeat as _run_health_heartbeat, ) from decnet.mutator.events import MutationTrigger, emit_decky_mutated @@ -332,6 +333,11 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> heartbeat_task = asyncio.create_task( _run_health_heartbeat(bus, "mutator"), ) + # Control listener: SIGTERM-based so the existing shutdown path + # (cancel wake_tasks + heartbeat_task) runs unchanged. 
+ wake_tasks.append(asyncio.create_task( + _run_control_listener_signal(bus, "mutator"), + )) except Exception as exc: # noqa: BLE001 log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc) diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 5be89954..cdf6e6cb 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -30,7 +30,11 @@ from typing import Any, Callable from decnet.bus import topics as _topics from decnet.bus.base import BaseBus from decnet.bus.factory import get_bus -from decnet.bus.publish import make_thread_safe_publisher +from decnet.bus.publish import ( + make_thread_safe_publisher, + run_control_listener, + run_health_heartbeat, +) from decnet.logging import get_logger from decnet.prober.hassh import hassh_server from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash @@ -519,8 +523,13 @@ async def prober_worker( event_type, ) + shutdown = asyncio.Event() + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober")) + control_task = asyncio.create_task( + run_control_listener(bus, "prober", shutdown), + ) try: - while True: + while not shutdown.is_set(): # Discover new attacker IPs from the log stream new_ips, log_position = await asyncio.to_thread( _discover_attackers, json_path, log_position, @@ -542,8 +551,15 @@ async def prober_worker( _publish_attacker, ) - await asyncio.sleep(interval) + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + except asyncio.TimeoutError: + pass finally: + for t in (heartbeat_task, control_task): + t.cancel() + with contextlib.suppress(Exception, asyncio.CancelledError): + await t if bus is not None: with contextlib.suppress(Exception): await bus.close() diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 26c7b89d..d33f4767 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -21,7 +21,11 @@ from typing import Any, Callable from decnet.bus import topics as _topics from decnet.bus.factory 
import get_bus -from decnet.bus.publish import make_thread_safe_publisher +from decnet.bus.publish import ( + make_thread_safe_publisher, + run_control_listener, + run_health_heartbeat, +) from decnet.correlation.engine import CorrelationEngine from decnet.correlation.parser import LogEvent from decnet.logging import get_logger @@ -87,14 +91,32 @@ async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) - state.last_log_id = _saved_cursor.get("last_log_id", 0) state.initialized = True logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id) + + # Workers panel wiring: heartbeat + bus-driven stop. Main loop is + # pure asyncio sleep/await, so an event-based control listener + # drops in cleanly without a SIGTERM self-signal. + shutdown = asyncio.Event() + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "profiler")) + control_task = asyncio.create_task( + run_control_listener(bus, "profiler", shutdown), + ) try: - while True: - await asyncio.sleep(interval) + while not shutdown.is_set(): + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + except asyncio.TimeoutError: + pass # normal tick + if shutdown.is_set(): + break try: await _incremental_update(repo, state) except Exception as exc: logger.error("attacker worker: update failed: %s", exc) finally: + for t in (heartbeat_task, control_task): + t.cancel() + with contextlib.suppress(Exception, asyncio.CancelledError): + await t if bus is not None: with contextlib.suppress(Exception): await bus.close() diff --git a/decnet/sniffer/worker.py b/decnet/sniffer/worker.py index 82565104..1aea5822 100644 --- a/decnet/sniffer/worker.py +++ b/decnet/sniffer/worker.py @@ -22,7 +22,11 @@ from typing import Any, Callable from decnet.bus import topics as _topics from decnet.bus.base import BaseBus from decnet.bus.factory import get_bus -from decnet.bus.publish import make_thread_safe_publisher +from decnet.bus.publish import ( + 
make_thread_safe_publisher, + run_control_listener_signal, + run_health_heartbeat, +) from decnet.logging import get_logger from decnet.network import HOST_IPVLAN_IFACE, HOST_MACVLAN_IFACE from decnet.sniffer.fingerprint import SnifferEngine @@ -198,6 +202,15 @@ async def sniffer_worker(log_file: str) -> None: if bus is not None: publish_fn = _make_decky_traffic_publisher(bus, loop) + # Workers panel: heartbeat + SIGTERM-based stop control. The + # sniff loop is a blocking scapy thread, so an asyncio shutdown + # event can't reach it — translating the bus stop into SIGTERM + # routes through the existing CancelledError path below. + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "sniffer")) + control_task = asyncio.create_task( + run_control_listener_signal(bus, "sniffer"), + ) + # Dedicated thread pool so the long-running sniff loop doesn't # occupy a slot in the default asyncio executor. sniffer_pool = ThreadPoolExecutor( @@ -216,6 +229,10 @@ async def sniffer_worker(log_file: str) -> None: raise finally: sniffer_pool.shutdown(wait=False) + for t in (heartbeat_task, control_task): + t.cancel() + with contextlib.suppress(Exception, asyncio.CancelledError): + await t if bus is not None: with contextlib.suppress(Exception): await bus.close() diff --git a/decnet/web/api.py b/decnet/web/api.py index 20151387..88863df9 100644 --- a/decnet/web/api.py +++ b/decnet/web/api.py @@ -30,6 +30,7 @@ ingestion_task: Optional[asyncio.Task[Any]] = None collector_task: Optional[asyncio.Task[Any]] = None attacker_task: Optional[asyncio.Task[Any]] = None sniffer_task: Optional[asyncio.Task[Any]] = None +heartbeat_task: Optional[asyncio.Task[Any]] = None def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]: @@ -45,6 +46,7 @@ def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]: @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: global ingestion_task, collector_task, attacker_task, sniffer_task + global 
heartbeat_task import resource soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) @@ -115,10 +117,33 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: else: log.info("Contract Test Mode: skipping background worker startup") + # Worker registry + API self-heartbeat — always on, even under + # contract-test mode, so the Workers panel can render the process + # without the dev needing to run a full stack. A missing bus turns + # both into no-ops inside the helpers. + try: + from decnet.bus.app import get_app_bus + from decnet.bus.publish import run_health_heartbeat + from decnet.web.worker_registry import get_registry + + _bus = await get_app_bus() + await get_registry().start(_bus) + if heartbeat_task is None or heartbeat_task.done(): + heartbeat_task = asyncio.create_task( + run_health_heartbeat(_bus, "api"), + ) + except Exception as exc: # noqa: BLE001 + log.warning("worker registry bootstrap failed: %s", exc) + yield log.info("API shutdown cancelling background tasks") - for task in (ingestion_task, collector_task, attacker_task, sniffer_task): + try: + from decnet.web.worker_registry import get_registry + await get_registry().stop() + except Exception as exc: # noqa: BLE001 + log.warning("worker registry stop raised: %s", exc) + for task in (ingestion_task, collector_task, attacker_task, sniffer_task, heartbeat_task): if task and not task.done(): task.cancel() try: diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index fd90f119..85684949 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -1,5 +1,5 @@ from datetime import datetime, timezone -from typing import Literal, Optional, Any, List, Annotated +from typing import Dict, Literal, Optional, Any, List, Annotated from uuid import uuid4 from sqlalchemy import Column, Index, Text, UniqueConstraint from sqlalchemy.dialects.mysql import MEDIUMTEXT @@ -452,6 +452,52 @@ class HealthResponse(BaseModel): components: dict[str, ComponentHealth] +# --- Workers panel 
(Config → Workers) --- +# Bus-backed health + control: workers heartbeat on ``system..health`` +# and listen on ``system..control``. The API aggregates last-seen +# heartbeats via the worker registry; these are the HTTP-facing shapes. + +class WorkerStatus(BaseModel): + name: str + # ``ok`` — heartbeat within 90s (3× 30s heartbeat interval) + # ``stale`` — worker was seen before but hasn't pulsed in 90s+ + # ``unknown`` — we've never received a heartbeat from this name + status: Literal["ok", "stale", "unknown"] + last_heartbeat_ts: Optional[float] = None + seconds_since: Optional[float] = None + # Whatever the worker's ``extra()`` callback put in the heartbeat; + # opaque to the panel, displayed only if the UI knows the key. + extra: Dict[str, Any] = PydanticField(default_factory=dict) + # True iff a ``decnet-.service`` unit file is present on the + # host. False flips the UI START button to disabled with a + # "Unit not installed" tooltip. Default True for backwards compat + # on clients that pre-date the field. + installed: bool = True + + +class WorkersResponse(BaseModel): + workers: List[WorkerStatus] + generated_at: float + bus_connected: bool + + +class WorkerControlResponse(BaseModel): + accepted: bool + worker: str + action: str + + +class StartFailure(BaseModel): + name: str + reason: str + + +class StartAllResponse(BaseModel): + started: List[str] + already_running: List[str] + failed: List[StartFailure] + + # --- Swarm API DTOs --- # Request/response contracts for the master-side swarm controller # (decnet/web/swarm_api.py). 
The underlying SQLModel tables — SwarmHost and diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 5fb6d993..6cbff07c 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -22,6 +22,8 @@ from .config.api_update_config import router as config_update_router from .config.api_manage_users import router as config_users_router from .config.api_reinit import router as config_reinit_router from .health.api_get_health import router as health_router +from .workers.api_list_workers import router as workers_list_router +from .workers.api_control_worker import router as workers_control_router from .artifacts.api_get_artifact import router as artifacts_router from .swarm_updates import swarm_updates_router from .swarm_mgmt import swarm_mgmt_router @@ -69,6 +71,8 @@ api_router.include_router(attacker_transcripts_router) api_router.include_router(stats_router) api_router.include_router(stream_router) api_router.include_router(health_router) +api_router.include_router(workers_list_router) +api_router.include_router(workers_control_router) # Configuration api_router.include_router(config_get_router) diff --git a/decnet/web/router/workers/__init__.py b/decnet/web/router/workers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/decnet/web/router/workers/api_control_worker.py b/decnet/web/router/workers/api_control_worker.py new file mode 100644 index 00000000..fcd56df7 --- /dev/null +++ b/decnet/web/router/workers/api_control_worker.py @@ -0,0 +1,76 @@ +import time + +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.responses import ORJSONResponse + +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.bus.publish import publish_safely +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.db.models import WorkerControlResponse +from decnet.web.dependencies import require_admin +from 
decnet.web.worker_registry import KNOWN_WORKERS + +log = get_logger("api") + +router = APIRouter() + + +@router.post( + "/workers/{name}/stop", + tags=["Observability"], + responses={ + 202: {"model": WorkerControlResponse, "description": "Stop intent queued on bus"}, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Unknown worker"}, + 503: {"description": "Bus unavailable"}, + }, +) +@_traced("api.stop_worker") +async def stop_worker( + name: str, + admin: dict = Depends(require_admin), +) -> ORJSONResponse: + """Publish a stop intent on ``system..control``. + + Fire-and-forget: the endpoint does not wait for the worker to + actually exit — the caller observes the status row in the Workers + panel flipping to ``stale`` as heartbeats stop. Consistent with the + rest of the bus contract (at-most-once, DB is source of truth for + any persistent state; the bus is the notification layer). + """ + if name not in KNOWN_WORKERS: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Unknown worker: {name!r}", + ) + + bus = await get_app_bus() + if bus is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="bus unavailable", + ) + topic = _topics.system_control(name) + payload = { + "action": _topics.WORKER_CONTROL_STOP, + "requested_by": admin.get("username") or admin.get("sub") or "admin", + "ts": time.time(), + } + await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_CONTROL) + log.info( + "workers: stop requested worker=%s by=%s", + name, payload["requested_by"], + ) + + body = WorkerControlResponse( + accepted=True, + worker=name, + action=_topics.WORKER_CONTROL_STOP, + ) + return ORJSONResponse( + content=body.model_dump(), + status_code=status.HTTP_202_ACCEPTED, + ) diff --git a/decnet/web/router/workers/api_list_workers.py b/decnet/web/router/workers/api_list_workers.py new file mode 100644 index 
00000000..cf2bf5fe --- /dev/null +++ b/decnet/web/router/workers/api_list_workers.py @@ -0,0 +1,35 @@ +import time + +from fastapi import APIRouter, Depends + +from decnet.bus.app import get_app_bus +from decnet.telemetry import traced as _traced +from decnet.web.db.models import WorkersResponse +from decnet.web.dependencies import require_viewer +from decnet.web.services import systemd_control +from decnet.web.worker_registry import get_registry + +router = APIRouter() + + +@router.get( + "/workers", + response_model=WorkersResponse, + tags=["Observability"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + }, +) +@_traced("api.list_workers") +async def list_workers(user: dict = Depends(require_viewer)) -> WorkersResponse: + workers = get_registry().snapshot() + bus = await get_app_bus() + installed = await systemd_control.list_installed() + for w in workers: + w.installed = w.name in installed + return WorkersResponse( + workers=workers, + generated_at=time.time(), + bus_connected=bus is not None, + ) diff --git a/decnet/web/worker_registry.py b/decnet/web/worker_registry.py new file mode 100644 index 00000000..6257341a --- /dev/null +++ b/decnet/web/worker_registry.py @@ -0,0 +1,202 @@ +"""In-process registry that aggregates worker heartbeats from the bus. + +The API process subscribes to ``system.*.health`` (plus the bare +``system.bus.health`` leaf that the bus daemon itself publishes) at +lifespan startup and keeps a simple last-seen dict. The +:func:`snapshot` call renders that into a stable list of known workers, +including those we have **never** heard from (surfaced as ``unknown`` — +which is distinct from ``stale``, a worker that used to pulse but went +silent). + +Names are the canonical singular forms used by the CLI table in +``CLAUDE.md``. 
Keeping the list hardcoded is deliberate: an unknown +topic segment would otherwise let any publisher inject a row into the +Workers panel. +""" +from __future__ import annotations + +import asyncio +import time +from typing import Any, Dict, List + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.logging import get_logger +from decnet.web.db.models import WorkerStatus + +log = get_logger("web.worker_registry") + +# Canonical worker names. Mirrors the CLAUDE.md worker table; keep in +# sync when a new worker CLI lands. The API process is included because +# it self-publishes its own heartbeat so the panel has at least one +# always-on row to ground "bus is reachable" sanity. +KNOWN_WORKERS: tuple[str, ...] = ( + "api", + "bus", + "collector", + "profiler", # hosts the correlation engine too — no separate daemon + "sniffer", + "prober", + "mutator", + "agent", + "forwarder", + "updater", +) + +# ``ok`` window: 3× the 30s heartbeat interval in +# :func:`decnet.bus.publish.run_health_heartbeat`. One missed beat is +# noise; three missed beats is a worker problem. +OK_WINDOW_SECONDS = 90.0 + + +class WorkerRegistry: + """Last-seen aggregator for worker heartbeats. + + Single-writer (the subscriber task) + many-reader (HTTP requests). + Python's GIL makes the dict mutations atomic enough that we don't + need a lock for the read path; ``snapshot`` copies the dict under a + single reference so a concurrent write can't produce a torn view. 
+ """ + + def __init__(self) -> None: + # name → {"ts": float, "payload": dict} + self._seen: Dict[str, Dict[str, Any]] = {} + self._task: asyncio.Task[None] | None = None + + def record(self, worker: str, ts: float, payload: Dict[str, Any]) -> None: + self._seen[worker] = {"ts": ts, "payload": payload} + + def snapshot(self) -> List[WorkerStatus]: + now = time.time() + seen = dict(self._seen) # point-in-time copy + out: List[WorkerStatus] = [] + for name in KNOWN_WORKERS: + entry = seen.get(name) + if entry is None: + out.append(WorkerStatus( + name=name, + status="unknown", + last_heartbeat_ts=None, + seconds_since=None, + extra={}, + )) + continue + ts = float(entry["ts"]) + seconds_since = max(0.0, now - ts) + status = "ok" if seconds_since < OK_WINDOW_SECONDS else "stale" + payload = dict(entry.get("payload") or {}) + # ``worker`` and ``ts`` are redundant with the row itself; + # strip them so ``extra`` only surfaces worker-contributed + # metadata (uptime, queue depth, etc.). + payload.pop("worker", None) + payload.pop("ts", None) + out.append(WorkerStatus( + name=name, + status=status, # type: ignore[arg-type] + last_heartbeat_ts=ts, + seconds_since=seconds_since, + extra=payload, + )) + return out + + async def start(self, bus: BaseBus | None) -> None: + """Begin subscribing. Idempotent; a second call is a no-op.""" + if self._task is not None and not self._task.done(): + return + if bus is None: + log.debug("worker registry: no bus — panel will show all UNKNOWN") + return + self._task = asyncio.create_task(self._run(bus)) + + async def stop(self) -> None: + """Cancel the subscriber task. Idempotent.""" + if self._task is None: + return + self._task.cancel() + try: + await self._task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass + self._task = None + + async def _run(self, bus: BaseBus) -> None: + """Fan in both ``system.*.health`` and ``system.bus.health``. 
+ + Two subscriptions because the bus's own heartbeat uses the + pre-existing ``system.bus.health`` string (not nested under + ``system..health``) and ``*`` matches exactly one token, + so the wildcard would miss it. + """ + worker_sub = bus.subscribe("system.*.health") + bus_sub = bus.subscribe(_topics.system("bus.health")) + try: + async with worker_sub, bus_sub: + async for event in _merge(worker_sub, bus_sub): + self._on_event(event) + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning("worker registry subscriber exited: %s", exc) + + def _on_event(self, event: Any) -> None: + payload = event.payload or {} + # ``system..health`` — middle token is the worker name. + # ``system.bus.health`` — special case, derive from topic. + tokens = event.topic.split(".") + name: str | None = None + if len(tokens) == 3 and tokens[0] == "system" and tokens[2] == "health": + # tokens[1] is the worker name; also handles "bus" when the + # bus daemon publishes ``system.bus.health``. + name = tokens[1] + if not name: + return + if name not in KNOWN_WORKERS: + # Unknown worker name — log once at debug; don't widen the + # panel beyond the hardcoded list. + log.debug("heartbeat from unknown worker=%r", name) + return + ts = float(payload.get("ts", time.time())) + self.record(name, ts, payload) + + +async def _merge(*subs: Any): + """Round-robin over multiple subscriptions without losing events. + + asyncio.wait + FIRST_COMPLETED keeps both streams live; a plain + ``async for`` over a merged generator would serialise them. 
+ """ + iters = [sub.__aiter__() for sub in subs] + pending = {asyncio.create_task(it.__anext__()): it for it in iters} + try: + while pending: + done, _ = await asyncio.wait( + pending.keys(), return_when=asyncio.FIRST_COMPLETED, + ) + for task in done: + it = pending.pop(task) + try: + yield task.result() + except StopAsyncIteration: + continue + pending[asyncio.create_task(it.__anext__())] = it + finally: + for task in pending: + task.cancel() + + +# Module-level singleton so the API lifespan and route handlers share +# one registry without threading it through every Depends. +_registry: WorkerRegistry | None = None + + +def get_registry() -> WorkerRegistry: + global _registry + if _registry is None: + _registry = WorkerRegistry() + return _registry + + +def reset_registry_for_tests() -> None: + """Drop the singleton — tests that spin up their own registry.""" + global _registry + _registry = None diff --git a/tests/api/workers/__init__.py b/tests/api/workers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/api/workers/test_workers_api.py b/tests/api/workers/test_workers_api.py new file mode 100644 index 00000000..0969ecae --- /dev/null +++ b/tests/api/workers/test_workers_api.py @@ -0,0 +1,171 @@ +"""Tests for the Workers panel API endpoints. + +Covers ``GET /api/v1/workers`` (viewer-readable, always surfaces every +known worker) and ``POST /api/v1/workers/{name}/stop`` (admin-only, +publishes a stop intent on the bus). 
+""" +from __future__ import annotations + +from typing import Any + +import httpx +import pytest + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.web import worker_registry as _wr +from decnet.web.router.workers import api_control_worker as _ctl +from decnet.web.router.workers import api_list_workers as _list +from decnet.web.worker_registry import KNOWN_WORKERS + + +@pytest.fixture(autouse=True) +def _reset_registry() -> None: + _wr.reset_registry_for_tests() + yield + _wr.reset_registry_for_tests() + + +@pytest.fixture +async def fake_bus(monkeypatch) -> FakeBus: + bus = FakeBus() + await bus.connect() + + async def _stub_get_app_bus() -> FakeBus: + return bus + + # Patch the symbol the control endpoint imported into its namespace. + monkeypatch.setattr(_ctl, "get_app_bus", _stub_get_app_bus) + yield bus + await bus.close() + + +@pytest.mark.asyncio +async def test_list_workers_viewer_sees_all_unknown( + client: httpx.AsyncClient, viewer_token: str, +) -> None: + resp = await client.get( + "/api/v1/workers", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 200 + body = resp.json() + names = {w["name"] for w in body["workers"]} + assert names == set(KNOWN_WORKERS) + # No heartbeats have arrived in the test harness, so every row is unknown. + for w in body["workers"]: + assert w["status"] == "unknown" + assert w["last_heartbeat_ts"] is None + assert w["seconds_since"] is None + assert "bus_connected" in body + assert isinstance(body["bus_connected"], bool) + # `installed` flag is always present + boolean. 
+ for w in body["workers"]: + assert "installed" in w + assert isinstance(w["installed"], bool) + + +@pytest.mark.asyncio +async def test_list_workers_requires_auth(client: httpx.AsyncClient) -> None: + resp = await client.get("/api/v1/workers") + assert resp.status_code == 401 + + +@pytest.mark.asyncio +async def test_list_workers_reports_bus_connected_false_when_no_bus( + client: httpx.AsyncClient, viewer_token: str, monkeypatch, +) -> None: + async def _no_bus() -> None: + return None + + monkeypatch.setattr(_list, "get_app_bus", _no_bus) + resp = await client.get( + "/api/v1/workers", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 200 + assert resp.json()["bus_connected"] is False + + +@pytest.mark.asyncio +async def test_list_workers_reports_bus_connected_true_with_fake_bus( + client: httpx.AsyncClient, viewer_token: str, monkeypatch, +) -> None: + bus = FakeBus() + await bus.connect() + + async def _fake_bus() -> FakeBus: + return bus + + monkeypatch.setattr(_list, "get_app_bus", _fake_bus) + try: + resp = await client.get( + "/api/v1/workers", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 200 + assert resp.json()["bus_connected"] is True + finally: + await bus.close() + + +@pytest.mark.asyncio +async def test_stop_worker_admin_publishes_on_bus( + client: httpx.AsyncClient, auth_token: str, fake_bus: FakeBus, +) -> None: + topic = _topics.system_control("mutator") + received: list[Any] = [] + + sub = fake_bus.subscribe(topic) + await sub.__aenter__() + + async def _drain() -> None: + async for event in sub: + received.append(event) + return + + import asyncio + reader = asyncio.create_task(_drain()) + # Give the subscribe a tick so the publish lands on a live reader. 
+ await asyncio.sleep(0) + + try: + resp = await client.post( + "/api/v1/workers/mutator/stop", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 202 + body = resp.json() + assert body == {"accepted": True, "worker": "mutator", "action": "stop"} + + await asyncio.wait_for(reader, timeout=1.0) + assert len(received) == 1 + ev = received[0] + assert ev.topic == topic + assert ev.payload["action"] == _topics.WORKER_CONTROL_STOP + assert "requested_by" in ev.payload + assert "ts" in ev.payload + finally: + await sub.__aexit__(None, None, None) + + +@pytest.mark.asyncio +async def test_stop_worker_viewer_forbidden( + client: httpx.AsyncClient, viewer_token: str, fake_bus: FakeBus, +) -> None: + resp = await client.post( + "/api/v1/workers/mutator/stop", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_stop_worker_unknown_name_404( + client: httpx.AsyncClient, auth_token: str, fake_bus: FakeBus, +) -> None: + resp = await client.post( + "/api/v1/workers/nonsense/stop", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 404 diff --git a/tests/bus/test_control_listener.py b/tests/bus/test_control_listener.py new file mode 100644 index 00000000..3a1cc14a --- /dev/null +++ b/tests/bus/test_control_listener.py @@ -0,0 +1,77 @@ +"""Tests for :func:`run_control_listener`. + +The listener is the worker-side half of the Workers panel stop flow: +consume ``system.<worker>.control`` messages, set a shutdown event on a +well-formed ``{"action": "stop"}``, and ignore everything else without +raising. 
+""" +from __future__ import annotations + +import asyncio + +import pytest + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.bus.publish import run_control_listener + + +@pytest.mark.asyncio +async def test_control_listener_sets_shutdown_on_stop() -> None: + bus = FakeBus() + await bus.connect() + shutdown = asyncio.Event() + try: + task = asyncio.create_task(run_control_listener(bus, "mutator", shutdown)) + # Give the subscribe() call a tick to register before we publish. + await asyncio.sleep(0) + await bus.publish( + _topics.system_control("mutator"), + {"action": _topics.WORKER_CONTROL_STOP, "requested_by": "admin"}, + event_type="control", + ) + await asyncio.wait_for(task, timeout=1.0) + assert shutdown.is_set() + finally: + await bus.close() + + +@pytest.mark.asyncio +async def test_control_listener_ignores_malformed() -> None: + bus = FakeBus() + await bus.connect() + shutdown = asyncio.Event() + try: + task = asyncio.create_task(run_control_listener(bus, "mutator", shutdown)) + await asyncio.sleep(0) + # Unknown action, non-dict-ish field, missing action — none of + # these should raise or trigger shutdown. + await bus.publish( + _topics.system_control("mutator"), + {"action": "bogus"}, event_type="control", + ) + await bus.publish( + _topics.system_control("mutator"), + {"requested_by": "admin"}, event_type="control", + ) + # Now send a real stop to unblock the task so the test terminates. + await bus.publish( + _topics.system_control("mutator"), + {"action": _topics.WORKER_CONTROL_STOP}, event_type="control", + ) + await asyncio.wait_for(task, timeout=1.0) + assert shutdown.is_set() + finally: + await bus.close() + + +@pytest.mark.asyncio +async def test_control_listener_none_bus_awaits_shutdown() -> None: + # With bus=None the listener degrades to awaiting the shutdown event + # directly — callers can create_task() unconditionally. 
+ shutdown = asyncio.Event() + task = asyncio.create_task(run_control_listener(None, "mutator", shutdown)) + await asyncio.sleep(0) + assert not task.done() + shutdown.set() + await asyncio.wait_for(task, timeout=1.0) diff --git a/tests/bus/test_topics.py b/tests/bus/test_topics.py index b06494de..f6b16526 100644 --- a/tests/bus/test_topics.py +++ b/tests/bus/test_topics.py @@ -61,3 +61,14 @@ def test_attacker_builder_rejects_empty() -> None: def test_system_health_builder() -> None: assert topics.system_health("sniffer") == "system.sniffer.health" assert topics.system_health("mutator") == "system.mutator.health" + + +def test_system_control_builder() -> None: + assert topics.system_control("mutator") == "system.mutator.control" + assert topics.system_control("collector") == "system.collector.control" + + +@pytest.mark.parametrize("bad", ["", "has.dot", "has*wildcard", "has>wild", "with space", "with\ttab"]) +def test_system_control_rejects_bad_segments(bad: str) -> None: + with pytest.raises(ValueError): + topics.system_control(bad)