feat(workers): bus-backed Workers panel (registry, control, installed flag)

Ships the backend half of Config → Workers:

* Worker registry aggregates `system.*.health` + `system.bus.health`
  heartbeats into a last-seen dict; OK / STALE / UNKNOWN tiers drop
  out of a 90s window (3× the 30s heartbeat interval).
* `GET /api/v1/workers` returns the snapshot plus `bus_connected`
  (so the UI can explain "all UNKNOWN" when the bus socket is down)
  and a per-row `installed` flag populated from
  `systemctl list-unit-files decnet-*.service` (cached 30s).
* `POST /api/v1/workers/{name}/stop` publishes a stop intent on
  `system.<name>.control`; workers listen via the shared control
  listener in `bus/publish.py`.
* Heartbeat + control listener wired into collector / profiler /
  sniffer / prober / mutator worker loops. API self-heartbeats too
  so the panel always has one ground-truth row.
* Topic helper `system_control(name)` + tests covering builder
  validation, control listener shutdown path, and the API surface
  (auth gating, bus-connected field, unknown-name 404).

Adds `StartFailure` / `StartAllResponse` models in anticipation of
the upcoming start endpoints (DEBT-034).
This commit is contained in:
2026-04-22 14:10:39 -04:00
parent fcaac648a4
commit 0fbb07c2ec
18 changed files with 863 additions and 10 deletions

View File

@@ -8,6 +8,8 @@ from __future__ import annotations
import asyncio import asyncio
import contextlib import contextlib
import os
import signal
import time import time
from typing import Any, Callable from typing import Any, Callable
@@ -99,3 +101,103 @@ async def run_health_heartbeat(
log.debug("heartbeat extra() failed worker=%s: %s", worker, exc) log.debug("heartbeat extra() failed worker=%s: %s", worker, exc)
await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_HEALTH) await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_HEALTH)
await asyncio.sleep(interval) await asyncio.sleep(interval)
async def run_control_listener(
bus: BaseBus | None,
worker: str,
shutdown: asyncio.Event,
) -> None:
"""Subscribe to ``system.<worker>.control`` and honour stop intents.
On a well-formed ``{"action": "stop", ...}`` message the function sets
*shutdown* and returns — the worker's main loop is expected to check
the event and unwind cleanly, matching the SIGTERM path.
Malformed payloads (missing/unknown action, non-dict, exception from
the transport) are logged and ignored. A ``None`` bus yields a noop
coroutine that simply awaits *shutdown* — callers can ``create_task``
this unconditionally regardless of bus state.
Cancellation-safe.
"""
if bus is None:
with contextlib.suppress(asyncio.CancelledError):
await shutdown.wait()
return
topic = _topics.system_control(worker)
with contextlib.suppress(asyncio.CancelledError):
try:
async with bus.subscribe(topic) as sub:
async for event in sub:
payload = event.payload or {}
action = payload.get("action")
requested_by = payload.get("requested_by", "<unknown>")
if action == _topics.WORKER_CONTROL_STOP:
log.info(
"control: stop requested worker=%s by=%s",
worker, requested_by,
)
shutdown.set()
return
log.debug(
"control: ignoring unknown action worker=%s action=%r",
worker, action,
)
except Exception as exc: # noqa: BLE001
log.warning(
"control listener failed worker=%s: %s — shutdown via bus disabled",
worker, exc,
)
async def run_control_listener_signal(
bus: BaseBus | None,
worker: str,
) -> None:
"""Like :func:`run_control_listener` but signals the process on stop.
Preferred for workers whose main loop is a blocking thread
(container-log tail, PTY read, scapy sniff) — wiring an
``asyncio.Event`` through the thread boundary is error-prone, and
every DECNET worker already has systemd-equivalent SIGTERM cleanup.
A SIGTERM self-signal routes the stop through that same path
without inventing a second shutdown mechanism.
Cancellation-safe. Never raises: a failed self-signal is logged
and the loop simply exits (admin can fall back to ``systemctl``).
"""
if bus is None:
return
topic = _topics.system_control(worker)
with contextlib.suppress(asyncio.CancelledError):
try:
async with bus.subscribe(topic) as sub:
async for event in sub:
payload = event.payload or {}
action = payload.get("action")
requested_by = payload.get("requested_by", "<unknown>")
if action == _topics.WORKER_CONTROL_STOP:
log.info(
"control: stop requested worker=%s by=%s → SIGTERM self",
worker, requested_by,
)
try:
os.kill(os.getpid(), signal.SIGTERM)
except Exception as exc: # noqa: BLE001
log.warning(
"control: self-signal failed worker=%s: %s",
worker, exc,
)
return
log.debug(
"control: ignoring unknown action worker=%s action=%r",
worker, action,
)
except Exception as exc: # noqa: BLE001
log.warning(
"control signal listener failed worker=%s: %s",
worker, exc,
)

View File

@@ -82,6 +82,16 @@ SYSTEM_BUS_HEALTH = "bus.health"
# :func:`system_health`. The leaf constant stays the same across workers; # :func:`system_health`. The leaf constant stays the same across workers;
# the worker name goes in the middle token. # the worker name goes in the middle token.
SYSTEM_HEALTH = "health" SYSTEM_HEALTH = "health"
# Worker-control leaf — built per-worker as ``system.<worker>.control`` via
# :func:`system_control`. Admin-originated stop intents travel on this
# topic; each worker subscribes to its own.
SYSTEM_CONTROL = "control"
# Control payload ``action`` values — the wire vocabulary. Only ``stop`` is
# handled in v1; ``start`` is reserved because a stopped worker has no
# subscriber, so starting requires external supervision (systemd).
WORKER_CONTROL_STOP = "stop"
WORKER_CONTROL_START = "start"
# ─── Builders ──────────────────────────────────────────────────────────────── # ─── Builders ────────────────────────────────────────────────────────────────
@@ -152,6 +162,23 @@ def system_health(worker: str) -> str:
return f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}" return f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}"
def system_control(worker: str) -> str:
"""Build ``system.<worker>.control``.
Admin-originated stop (and, eventually, start) intents are published
here; the worker in question subscribes to its own address and reacts.
Payload shape::
{"action": "stop", "requested_by": "<username>", "ts": <unix>}
*action* must be one of :data:`WORKER_CONTROL_STOP` /
:data:`WORKER_CONTROL_START`; any other value is ignored by the
listener. Same segment rules as :func:`system_health`.
"""
_reject_tokens(worker)
return f"{SYSTEM}.{worker}.{SYSTEM_CONTROL}"
def _reject_tokens(*parts: str) -> None: def _reject_tokens(*parts: str) -> None:
"""Reject topic segments that would break NATS-style tokenization. """Reject topic segments that would break NATS-style tokenization.

View File

@@ -20,7 +20,11 @@ from typing import Any, Callable, Optional
from decnet.bus import topics as _topics from decnet.bus import topics as _topics
from decnet.bus.factory import get_bus from decnet.bus.factory import get_bus
from decnet.bus.publish import make_thread_safe_publisher from decnet.bus.publish import (
make_thread_safe_publisher,
run_control_listener_signal,
run_health_heartbeat,
)
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx
@@ -416,6 +420,14 @@ async def log_collector_worker(log_file: str) -> None:
_publish_log = _make_system_log_publisher(bus, loop) _publish_log = _make_system_log_publisher(bus, loop)
# Workers panel health heartbeat + bus-driven stop control. The
# heartbeat beacons on system.collector.health every 30s; the
# control listener translates a bus stop intent into a SIGTERM to
# this process (collector's main loop is a blocking thread pool, so
# self-signalling is cleaner than threading a shutdown event).
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "collector"))
control_task = asyncio.create_task(run_control_listener_signal(bus, "collector"))
# Dedicated thread pool so long-running container log streams don't # Dedicated thread pool so long-running container log streams don't
# saturate the default asyncio executor and starve short-lived # saturate the default asyncio executor and starve short-lived
# to_thread() calls elsewhere (e.g. load_state in the web API). # to_thread() calls elsewhere (e.g. load_state in the web API).
@@ -465,6 +477,10 @@ async def log_collector_worker(log_file: str) -> None:
logger.error("collector error: %s", exc) logger.error("collector error: %s", exc)
finally: finally:
collector_pool.shutdown(wait=False) collector_pool.shutdown(wait=False)
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None: if bus is not None:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await bus.close() await bus.close()

View File

@@ -27,6 +27,7 @@ from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus from decnet.bus.factory import get_bus
from decnet.bus.publish import ( from decnet.bus.publish import (
publish_safely as _publish_safely, publish_safely as _publish_safely,
run_control_listener_signal as _run_control_listener_signal,
run_health_heartbeat as _run_health_heartbeat, run_health_heartbeat as _run_health_heartbeat,
) )
from decnet.mutator.events import MutationTrigger, emit_decky_mutated from decnet.mutator.events import MutationTrigger, emit_decky_mutated
@@ -332,6 +333,11 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) ->
heartbeat_task = asyncio.create_task( heartbeat_task = asyncio.create_task(
_run_health_heartbeat(bus, "mutator"), _run_health_heartbeat(bus, "mutator"),
) )
# Control listener: SIGTERM-based so the existing shutdown path
# (cancel wake_tasks + heartbeat_task) runs unchanged.
wake_tasks.append(asyncio.create_task(
_run_control_listener_signal(bus, "mutator"),
))
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc) log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc)

View File

@@ -30,7 +30,11 @@ from typing import Any, Callable
from decnet.bus import topics as _topics from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus from decnet.bus.factory import get_bus
from decnet.bus.publish import make_thread_safe_publisher from decnet.bus.publish import (
make_thread_safe_publisher,
run_control_listener,
run_health_heartbeat,
)
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.prober.hassh import hassh_server from decnet.prober.hassh import hassh_server
from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash
@@ -519,8 +523,13 @@ async def prober_worker(
event_type, event_type,
) )
shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober"))
control_task = asyncio.create_task(
run_control_listener(bus, "prober", shutdown),
)
try: try:
while True: while not shutdown.is_set():
# Discover new attacker IPs from the log stream # Discover new attacker IPs from the log stream
new_ips, log_position = await asyncio.to_thread( new_ips, log_position = await asyncio.to_thread(
_discover_attackers, json_path, log_position, _discover_attackers, json_path, log_position,
@@ -542,8 +551,15 @@ async def prober_worker(
_publish_attacker, _publish_attacker,
) )
await asyncio.sleep(interval) try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
except asyncio.TimeoutError:
pass
finally: finally:
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None: if bus is not None:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await bus.close() await bus.close()

View File

@@ -21,7 +21,11 @@ from typing import Any, Callable
from decnet.bus import topics as _topics from decnet.bus import topics as _topics
from decnet.bus.factory import get_bus from decnet.bus.factory import get_bus
from decnet.bus.publish import make_thread_safe_publisher from decnet.bus.publish import (
make_thread_safe_publisher,
run_control_listener,
run_health_heartbeat,
)
from decnet.correlation.engine import CorrelationEngine from decnet.correlation.engine import CorrelationEngine
from decnet.correlation.parser import LogEvent from decnet.correlation.parser import LogEvent
from decnet.logging import get_logger from decnet.logging import get_logger
@@ -87,14 +91,32 @@ async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -
state.last_log_id = _saved_cursor.get("last_log_id", 0) state.last_log_id = _saved_cursor.get("last_log_id", 0)
state.initialized = True state.initialized = True
logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id) logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id)
# Workers panel wiring: heartbeat + bus-driven stop. Main loop is
# pure asyncio sleep/await, so an event-based control listener
# drops in cleanly without a SIGTERM self-signal.
shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "profiler"))
control_task = asyncio.create_task(
run_control_listener(bus, "profiler", shutdown),
)
try: try:
while True: while not shutdown.is_set():
await asyncio.sleep(interval) try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
except asyncio.TimeoutError:
pass # normal tick
if shutdown.is_set():
break
try: try:
await _incremental_update(repo, state) await _incremental_update(repo, state)
except Exception as exc: except Exception as exc:
logger.error("attacker worker: update failed: %s", exc) logger.error("attacker worker: update failed: %s", exc)
finally: finally:
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None: if bus is not None:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await bus.close() await bus.close()

View File

@@ -22,7 +22,11 @@ from typing import Any, Callable
from decnet.bus import topics as _topics from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus from decnet.bus.factory import get_bus
from decnet.bus.publish import make_thread_safe_publisher from decnet.bus.publish import (
make_thread_safe_publisher,
run_control_listener_signal,
run_health_heartbeat,
)
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.network import HOST_IPVLAN_IFACE, HOST_MACVLAN_IFACE from decnet.network import HOST_IPVLAN_IFACE, HOST_MACVLAN_IFACE
from decnet.sniffer.fingerprint import SnifferEngine from decnet.sniffer.fingerprint import SnifferEngine
@@ -198,6 +202,15 @@ async def sniffer_worker(log_file: str) -> None:
if bus is not None: if bus is not None:
publish_fn = _make_decky_traffic_publisher(bus, loop) publish_fn = _make_decky_traffic_publisher(bus, loop)
# Workers panel: heartbeat + SIGTERM-based stop control. The
# sniff loop is a blocking scapy thread, so an asyncio shutdown
# event can't reach it — translating the bus stop into SIGTERM
# routes through the existing CancelledError path below.
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "sniffer"))
control_task = asyncio.create_task(
run_control_listener_signal(bus, "sniffer"),
)
# Dedicated thread pool so the long-running sniff loop doesn't # Dedicated thread pool so the long-running sniff loop doesn't
# occupy a slot in the default asyncio executor. # occupy a slot in the default asyncio executor.
sniffer_pool = ThreadPoolExecutor( sniffer_pool = ThreadPoolExecutor(
@@ -216,6 +229,10 @@ async def sniffer_worker(log_file: str) -> None:
raise raise
finally: finally:
sniffer_pool.shutdown(wait=False) sniffer_pool.shutdown(wait=False)
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None: if bus is not None:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await bus.close() await bus.close()

View File

@@ -30,6 +30,7 @@ ingestion_task: Optional[asyncio.Task[Any]] = None
collector_task: Optional[asyncio.Task[Any]] = None collector_task: Optional[asyncio.Task[Any]] = None
attacker_task: Optional[asyncio.Task[Any]] = None attacker_task: Optional[asyncio.Task[Any]] = None
sniffer_task: Optional[asyncio.Task[Any]] = None sniffer_task: Optional[asyncio.Task[Any]] = None
heartbeat_task: Optional[asyncio.Task[Any]] = None
def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]: def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]:
@@ -45,6 +46,7 @@ def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]:
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
global ingestion_task, collector_task, attacker_task, sniffer_task global ingestion_task, collector_task, attacker_task, sniffer_task
global heartbeat_task
import resource import resource
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
@@ -115,10 +117,33 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
else: else:
log.info("Contract Test Mode: skipping background worker startup") log.info("Contract Test Mode: skipping background worker startup")
# Worker registry + API self-heartbeat — always on, even under
# contract-test mode, so the Workers panel can render the process
# without the dev needing to run a full stack. A missing bus turns
# both into no-ops inside the helpers.
try:
from decnet.bus.app import get_app_bus
from decnet.bus.publish import run_health_heartbeat
from decnet.web.worker_registry import get_registry
_bus = await get_app_bus()
await get_registry().start(_bus)
if heartbeat_task is None or heartbeat_task.done():
heartbeat_task = asyncio.create_task(
run_health_heartbeat(_bus, "api"),
)
except Exception as exc: # noqa: BLE001
log.warning("worker registry bootstrap failed: %s", exc)
yield yield
log.info("API shutdown cancelling background tasks") log.info("API shutdown cancelling background tasks")
for task in (ingestion_task, collector_task, attacker_task, sniffer_task): try:
from decnet.web.worker_registry import get_registry
await get_registry().stop()
except Exception as exc: # noqa: BLE001
log.warning("worker registry stop raised: %s", exc)
for task in (ingestion_task, collector_task, attacker_task, sniffer_task, heartbeat_task):
if task and not task.done(): if task and not task.done():
task.cancel() task.cancel()
try: try:

View File

@@ -1,5 +1,5 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Literal, Optional, Any, List, Annotated from typing import Dict, Literal, Optional, Any, List, Annotated
from uuid import uuid4 from uuid import uuid4
from sqlalchemy import Column, Index, Text, UniqueConstraint from sqlalchemy import Column, Index, Text, UniqueConstraint
from sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlalchemy.dialects.mysql import MEDIUMTEXT
@@ -452,6 +452,52 @@ class HealthResponse(BaseModel):
components: dict[str, ComponentHealth] components: dict[str, ComponentHealth]
# --- Workers panel (Config → Workers) ---
# Bus-backed health + control: workers heartbeat on ``system.<name>.health``
# and listen on ``system.<name>.control``. The API aggregates last-seen
# heartbeats via the worker registry; these are the HTTP-facing shapes.
class WorkerStatus(BaseModel):
name: str
# ``ok`` — heartbeat within 90s (3× 30s heartbeat interval)
# ``stale`` — worker was seen before but hasn't pulsed in 90s+
# ``unknown`` — we've never received a heartbeat from this name
status: Literal["ok", "stale", "unknown"]
last_heartbeat_ts: Optional[float] = None
seconds_since: Optional[float] = None
# Whatever the worker's ``extra()`` callback put in the heartbeat;
# opaque to the panel, displayed only if the UI knows the key.
extra: Dict[str, Any] = PydanticField(default_factory=dict)
# True iff a ``decnet-<name>.service`` unit file is present on the
# host. False flips the UI START button to disabled with a
# "Unit not installed" tooltip. Default True for backwards compat
# on clients that pre-date the field.
installed: bool = True
class WorkersResponse(BaseModel):
workers: List[WorkerStatus]
generated_at: float
bus_connected: bool
class WorkerControlResponse(BaseModel):
accepted: bool
worker: str
action: str
class StartFailure(BaseModel):
name: str
reason: str
class StartAllResponse(BaseModel):
started: List[str]
already_running: List[str]
failed: List[StartFailure]
# --- Swarm API DTOs --- # --- Swarm API DTOs ---
# Request/response contracts for the master-side swarm controller # Request/response contracts for the master-side swarm controller
# (decnet/web/swarm_api.py). The underlying SQLModel tables — SwarmHost and # (decnet/web/swarm_api.py). The underlying SQLModel tables — SwarmHost and

View File

@@ -22,6 +22,8 @@ from .config.api_update_config import router as config_update_router
from .config.api_manage_users import router as config_users_router from .config.api_manage_users import router as config_users_router
from .config.api_reinit import router as config_reinit_router from .config.api_reinit import router as config_reinit_router
from .health.api_get_health import router as health_router from .health.api_get_health import router as health_router
from .workers.api_list_workers import router as workers_list_router
from .workers.api_control_worker import router as workers_control_router
from .artifacts.api_get_artifact import router as artifacts_router from .artifacts.api_get_artifact import router as artifacts_router
from .swarm_updates import swarm_updates_router from .swarm_updates import swarm_updates_router
from .swarm_mgmt import swarm_mgmt_router from .swarm_mgmt import swarm_mgmt_router
@@ -69,6 +71,8 @@ api_router.include_router(attacker_transcripts_router)
api_router.include_router(stats_router) api_router.include_router(stats_router)
api_router.include_router(stream_router) api_router.include_router(stream_router)
api_router.include_router(health_router) api_router.include_router(health_router)
api_router.include_router(workers_list_router)
api_router.include_router(workers_control_router)
# Configuration # Configuration
api_router.include_router(config_get_router) api_router.include_router(config_get_router)

View File

View File

@@ -0,0 +1,76 @@
import time
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import ORJSONResponse
from decnet.bus import topics as _topics
from decnet.bus.app import get_app_bus
from decnet.bus.publish import publish_safely
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.web.db.models import WorkerControlResponse
from decnet.web.dependencies import require_admin
from decnet.web.worker_registry import KNOWN_WORKERS
log = get_logger("api")
router = APIRouter()
@router.post(
"/workers/{name}/stop",
tags=["Observability"],
responses={
202: {"model": WorkerControlResponse, "description": "Stop intent queued on bus"},
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
404: {"description": "Unknown worker"},
503: {"description": "Bus unavailable"},
},
)
@_traced("api.stop_worker")
async def stop_worker(
name: str,
admin: dict = Depends(require_admin),
) -> ORJSONResponse:
"""Publish a stop intent on ``system.<name>.control``.
Fire-and-forget: the endpoint does not wait for the worker to
actually exit — the caller observes the status row in the Workers
panel flipping to ``stale`` as heartbeats stop. Consistent with the
rest of the bus contract (at-most-once, DB is source of truth for
any persistent state; the bus is the notification layer).
"""
if name not in KNOWN_WORKERS:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Unknown worker: {name!r}",
)
bus = await get_app_bus()
if bus is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="bus unavailable",
)
topic = _topics.system_control(name)
payload = {
"action": _topics.WORKER_CONTROL_STOP,
"requested_by": admin.get("username") or admin.get("sub") or "admin",
"ts": time.time(),
}
await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_CONTROL)
log.info(
"workers: stop requested worker=%s by=%s",
name, payload["requested_by"],
)
body = WorkerControlResponse(
accepted=True,
worker=name,
action=_topics.WORKER_CONTROL_STOP,
)
return ORJSONResponse(
content=body.model_dump(),
status_code=status.HTTP_202_ACCEPTED,
)

View File

@@ -0,0 +1,35 @@
import time
from fastapi import APIRouter, Depends
from decnet.bus.app import get_app_bus
from decnet.telemetry import traced as _traced
from decnet.web.db.models import WorkersResponse
from decnet.web.dependencies import require_viewer
from decnet.web.services import systemd_control
from decnet.web.worker_registry import get_registry
router = APIRouter()
@router.get(
"/workers",
response_model=WorkersResponse,
tags=["Observability"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
},
)
@_traced("api.list_workers")
async def list_workers(user: dict = Depends(require_viewer)) -> WorkersResponse:
workers = get_registry().snapshot()
bus = await get_app_bus()
installed = await systemd_control.list_installed()
for w in workers:
w.installed = w.name in installed
return WorkersResponse(
workers=workers,
generated_at=time.time(),
bus_connected=bus is not None,
)

View File

@@ -0,0 +1,202 @@
"""In-process registry that aggregates worker heartbeats from the bus.
The API process subscribes to ``system.*.health`` (plus the bare
``system.bus.health`` leaf that the bus daemon itself publishes) at
lifespan startup and keeps a simple last-seen dict. The
:func:`snapshot` call renders that into a stable list of known workers,
including those we have **never** heard from (surfaced as ``unknown`` —
which is distinct from ``stale``, a worker that used to pulse but went
silent).
Names are the canonical singular forms used by the CLI table in
``CLAUDE.md``. Keeping the list hardcoded is deliberate: an unknown
topic segment would otherwise let any publisher inject a row into the
Workers panel.
"""
from __future__ import annotations
import asyncio
import time
from typing import Any, Dict, List
from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus
from decnet.logging import get_logger
from decnet.web.db.models import WorkerStatus
log = get_logger("web.worker_registry")
# Canonical worker names. Mirrors the CLAUDE.md worker table; keep in
# sync when a new worker CLI lands. The API process is included because
# it self-publishes its own heartbeat so the panel has at least one
# always-on row to ground "bus is reachable" sanity.
KNOWN_WORKERS: tuple[str, ...] = (
"api",
"bus",
"collector",
"profiler", # hosts the correlation engine too — no separate daemon
"sniffer",
"prober",
"mutator",
"agent",
"forwarder",
"updater",
)
# ``ok`` window: 3× the 30s heartbeat interval in
# :func:`decnet.bus.publish.run_health_heartbeat`. One missed beat is
# noise; three missed beats is a worker problem.
OK_WINDOW_SECONDS = 90.0
class WorkerRegistry:
"""Last-seen aggregator for worker heartbeats.
Single-writer (the subscriber task) + many-reader (HTTP requests).
Python's GIL makes the dict mutations atomic enough that we don't
need a lock for the read path; ``snapshot`` copies the dict under a
single reference so a concurrent write can't produce a torn view.
"""
def __init__(self) -> None:
# name → {"ts": float, "payload": dict}
self._seen: Dict[str, Dict[str, Any]] = {}
self._task: asyncio.Task[None] | None = None
def record(self, worker: str, ts: float, payload: Dict[str, Any]) -> None:
self._seen[worker] = {"ts": ts, "payload": payload}
def snapshot(self) -> List[WorkerStatus]:
now = time.time()
seen = dict(self._seen) # point-in-time copy
out: List[WorkerStatus] = []
for name in KNOWN_WORKERS:
entry = seen.get(name)
if entry is None:
out.append(WorkerStatus(
name=name,
status="unknown",
last_heartbeat_ts=None,
seconds_since=None,
extra={},
))
continue
ts = float(entry["ts"])
seconds_since = max(0.0, now - ts)
status = "ok" if seconds_since < OK_WINDOW_SECONDS else "stale"
payload = dict(entry.get("payload") or {})
# ``worker`` and ``ts`` are redundant with the row itself;
# strip them so ``extra`` only surfaces worker-contributed
# metadata (uptime, queue depth, etc.).
payload.pop("worker", None)
payload.pop("ts", None)
out.append(WorkerStatus(
name=name,
status=status, # type: ignore[arg-type]
last_heartbeat_ts=ts,
seconds_since=seconds_since,
extra=payload,
))
return out
async def start(self, bus: BaseBus | None) -> None:
"""Begin subscribing. Idempotent; a second call is a no-op."""
if self._task is not None and not self._task.done():
return
if bus is None:
log.debug("worker registry: no bus — panel will show all UNKNOWN")
return
self._task = asyncio.create_task(self._run(bus))
async def stop(self) -> None:
"""Cancel the subscriber task. Idempotent."""
if self._task is None:
return
self._task.cancel()
try:
await self._task
except (asyncio.CancelledError, Exception): # noqa: BLE001
pass
self._task = None
async def _run(self, bus: BaseBus) -> None:
"""Fan in both ``system.*.health`` and ``system.bus.health``.
Two subscriptions because the bus's own heartbeat uses the
pre-existing ``system.bus.health`` string (not nested under
``system.<worker>.health``) and ``*`` matches exactly one token,
so the wildcard would miss it.
"""
worker_sub = bus.subscribe("system.*.health")
bus_sub = bus.subscribe(_topics.system("bus.health"))
try:
async with worker_sub, bus_sub:
async for event in _merge(worker_sub, bus_sub):
self._on_event(event)
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
log.warning("worker registry subscriber exited: %s", exc)
def _on_event(self, event: Any) -> None:
payload = event.payload or {}
# ``system.<name>.health`` — middle token is the worker name.
# ``system.bus.health`` — special case, derive from topic.
tokens = event.topic.split(".")
name: str | None = None
if len(tokens) == 3 and tokens[0] == "system" and tokens[2] == "health":
# tokens[1] is the worker name; also handles "bus" when the
# bus daemon publishes ``system.bus.health``.
name = tokens[1]
if not name:
return
if name not in KNOWN_WORKERS:
# Unknown worker name — log once at debug; don't widen the
# panel beyond the hardcoded list.
log.debug("heartbeat from unknown worker=%r", name)
return
ts = float(payload.get("ts", time.time()))
self.record(name, ts, payload)
async def _merge(*subs: Any):
"""Round-robin over multiple subscriptions without losing events.
asyncio.wait + FIRST_COMPLETED keeps both streams live; a plain
``async for`` over a merged generator would serialise them.
"""
iters = [sub.__aiter__() for sub in subs]
pending = {asyncio.create_task(it.__anext__()): it for it in iters}
try:
while pending:
done, _ = await asyncio.wait(
pending.keys(), return_when=asyncio.FIRST_COMPLETED,
)
for task in done:
it = pending.pop(task)
try:
yield task.result()
except StopAsyncIteration:
continue
pending[asyncio.create_task(it.__anext__())] = it
finally:
for task in pending:
task.cancel()
# Module-level singleton so the API lifespan and route handlers share
# one registry without threading it through every Depends.
_registry: WorkerRegistry | None = None
def get_registry() -> WorkerRegistry:
global _registry
if _registry is None:
_registry = WorkerRegistry()
return _registry
def reset_registry_for_tests() -> None:
"""Drop the singleton — tests that spin up their own registry."""
global _registry
_registry = None

View File

View File

@@ -0,0 +1,171 @@
"""Tests for the Workers panel API endpoints.
Covers ``GET /api/v1/workers`` (viewer-readable, always surfaces every
known worker) and ``POST /api/v1/workers/{name}/stop`` (admin-only,
publishes a stop intent on the bus).
"""
from __future__ import annotations
from typing import Any
import httpx
import pytest
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.web import worker_registry as _wr
from decnet.web.router.workers import api_control_worker as _ctl
from decnet.web.router.workers import api_list_workers as _list
from decnet.web.worker_registry import KNOWN_WORKERS
@pytest.fixture(autouse=True)
def _reset_registry() -> None:
_wr.reset_registry_for_tests()
yield
_wr.reset_registry_for_tests()
@pytest.fixture
async def fake_bus(monkeypatch) -> FakeBus:
bus = FakeBus()
await bus.connect()
async def _stub_get_app_bus() -> FakeBus:
return bus
# Patch the symbol the control endpoint imported into its namespace.
monkeypatch.setattr(_ctl, "get_app_bus", _stub_get_app_bus)
yield bus
await bus.close()
@pytest.mark.asyncio
async def test_list_workers_viewer_sees_all_unknown(
client: httpx.AsyncClient, viewer_token: str,
) -> None:
resp = await client.get(
"/api/v1/workers",
headers={"Authorization": f"Bearer {viewer_token}"},
)
assert resp.status_code == 200
body = resp.json()
names = {w["name"] for w in body["workers"]}
assert names == set(KNOWN_WORKERS)
# No heartbeats have arrived in the test harness, so every row is unknown.
for w in body["workers"]:
assert w["status"] == "unknown"
assert w["last_heartbeat_ts"] is None
assert w["seconds_since"] is None
assert "bus_connected" in body
assert isinstance(body["bus_connected"], bool)
# `installed` flag is always present + boolean.
for w in body["workers"]:
assert "installed" in w
assert isinstance(w["installed"], bool)
@pytest.mark.asyncio
async def test_list_workers_requires_auth(client: httpx.AsyncClient) -> None:
resp = await client.get("/api/v1/workers")
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_list_workers_reports_bus_connected_false_when_no_bus(
client: httpx.AsyncClient, viewer_token: str, monkeypatch,
) -> None:
async def _no_bus() -> None:
return None
monkeypatch.setattr(_list, "get_app_bus", _no_bus)
resp = await client.get(
"/api/v1/workers",
headers={"Authorization": f"Bearer {viewer_token}"},
)
assert resp.status_code == 200
assert resp.json()["bus_connected"] is False
@pytest.mark.asyncio
async def test_list_workers_reports_bus_connected_true_with_fake_bus(
client: httpx.AsyncClient, viewer_token: str, monkeypatch,
) -> None:
bus = FakeBus()
await bus.connect()
async def _fake_bus() -> FakeBus:
return bus
monkeypatch.setattr(_list, "get_app_bus", _fake_bus)
try:
resp = await client.get(
"/api/v1/workers",
headers={"Authorization": f"Bearer {viewer_token}"},
)
assert resp.status_code == 200
assert resp.json()["bus_connected"] is True
finally:
await bus.close()
@pytest.mark.asyncio
async def test_stop_worker_admin_publishes_on_bus(
client: httpx.AsyncClient, auth_token: str, fake_bus: FakeBus,
) -> None:
topic = _topics.system_control("mutator")
received: list[Any] = []
sub = fake_bus.subscribe(topic)
await sub.__aenter__()
async def _drain() -> None:
async for event in sub:
received.append(event)
return
import asyncio
reader = asyncio.create_task(_drain())
# Give the subscribe a tick so the publish lands on a live reader.
await asyncio.sleep(0)
try:
resp = await client.post(
"/api/v1/workers/mutator/stop",
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code == 202
body = resp.json()
assert body == {"accepted": True, "worker": "mutator", "action": "stop"}
await asyncio.wait_for(reader, timeout=1.0)
assert len(received) == 1
ev = received[0]
assert ev.topic == topic
assert ev.payload["action"] == _topics.WORKER_CONTROL_STOP
assert "requested_by" in ev.payload
assert "ts" in ev.payload
finally:
await sub.__aexit__(None, None, None)
@pytest.mark.asyncio
async def test_stop_worker_viewer_forbidden(
client: httpx.AsyncClient, viewer_token: str, fake_bus: FakeBus,
) -> None:
resp = await client.post(
"/api/v1/workers/mutator/stop",
headers={"Authorization": f"Bearer {viewer_token}"},
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_stop_worker_unknown_name_404(
client: httpx.AsyncClient, auth_token: str, fake_bus: FakeBus,
) -> None:
resp = await client.post(
"/api/v1/workers/nonsense/stop",
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code == 404

View File

@@ -0,0 +1,77 @@
"""Tests for :func:`run_control_listener`.
The listener is the worker-side half of the Workers panel stop flow:
consume ``system.<worker>.control`` messages, set a shutdown event on a
well-formed ``{"action": "stop"}``, and ignore everything else without
raising.
"""
from __future__ import annotations
import asyncio
import pytest
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.bus.publish import run_control_listener
@pytest.mark.asyncio
async def test_control_listener_sets_shutdown_on_stop() -> None:
bus = FakeBus()
await bus.connect()
shutdown = asyncio.Event()
try:
task = asyncio.create_task(run_control_listener(bus, "mutator", shutdown))
# Give the subscribe() call a tick to register before we publish.
await asyncio.sleep(0)
await bus.publish(
_topics.system_control("mutator"),
{"action": _topics.WORKER_CONTROL_STOP, "requested_by": "admin"},
event_type="control",
)
await asyncio.wait_for(task, timeout=1.0)
assert shutdown.is_set()
finally:
await bus.close()
@pytest.mark.asyncio
async def test_control_listener_ignores_malformed() -> None:
bus = FakeBus()
await bus.connect()
shutdown = asyncio.Event()
try:
task = asyncio.create_task(run_control_listener(bus, "mutator", shutdown))
await asyncio.sleep(0)
# Unknown action, non-dict-ish field, missing action — none of
# these should raise or trigger shutdown.
await bus.publish(
_topics.system_control("mutator"),
{"action": "bogus"}, event_type="control",
)
await bus.publish(
_topics.system_control("mutator"),
{"requested_by": "admin"}, event_type="control",
)
# Now send a real stop to unblock the task so the test terminates.
await bus.publish(
_topics.system_control("mutator"),
{"action": _topics.WORKER_CONTROL_STOP}, event_type="control",
)
await asyncio.wait_for(task, timeout=1.0)
assert shutdown.is_set()
finally:
await bus.close()
@pytest.mark.asyncio
async def test_control_listener_none_bus_awaits_shutdown() -> None:
# With bus=None the listener degrades to awaiting the shutdown event
# directly — callers can create_task() unconditionally.
shutdown = asyncio.Event()
task = asyncio.create_task(run_control_listener(None, "mutator", shutdown))
await asyncio.sleep(0)
assert not task.done()
shutdown.set()
await asyncio.wait_for(task, timeout=1.0)

View File

@@ -61,3 +61,14 @@ def test_attacker_builder_rejects_empty() -> None:
def test_system_health_builder() -> None: def test_system_health_builder() -> None:
assert topics.system_health("sniffer") == "system.sniffer.health" assert topics.system_health("sniffer") == "system.sniffer.health"
assert topics.system_health("mutator") == "system.mutator.health" assert topics.system_health("mutator") == "system.mutator.health"
def test_system_control_builder() -> None:
assert topics.system_control("mutator") == "system.mutator.control"
assert topics.system_control("collector") == "system.collector.control"
@pytest.mark.parametrize("bad", ["", "has.dot", "has*wildcard", "has>wild", "with space", "with\ttab"])
def test_system_control_rejects_bad_segments(bad: str) -> None:
with pytest.raises(ValueError):
topics.system_control(bad)