feat(workers): bus-backed Workers panel (registry, control, installed flag)
Ships the backend half of Config → Workers:
* Worker registry aggregates `system.*.health` + `system.bus.health`
heartbeats into a last-seen dict; the OK / STALE / UNKNOWN tiers are
derived from a 90s window (3× the 30s heartbeat interval).
* `GET /api/v1/workers` returns the snapshot plus `bus_connected`
(so the UI can explain "all UNKNOWN" when the bus socket is down)
and a per-row `installed` flag populated from
`systemctl list-unit-files decnet-*.service` (cached 30s).
* `POST /api/v1/workers/{name}/stop` publishes a stop intent on
`system.<name>.control`; workers listen via the shared control
listener in `bus/publish.py`.
* Heartbeat + control listener wired into collector / profiler /
sniffer / prober / mutator worker loops. API self-heartbeats too
so the panel always has one ground-truth row.
* Topic helper `system_control(name)` + tests covering builder
validation, control listener shutdown path, and the API surface
(auth gating, bus-connected field, unknown-name 404).
Adds `StartFailure` / `StartAllResponse` models in anticipation of
the upcoming start endpoints (DEBT-034).
This commit is contained in:
@@ -8,6 +8,8 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
import time
|
import time
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
@@ -99,3 +101,103 @@ async def run_health_heartbeat(
|
|||||||
log.debug("heartbeat extra() failed worker=%s: %s", worker, exc)
|
log.debug("heartbeat extra() failed worker=%s: %s", worker, exc)
|
||||||
await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_HEALTH)
|
await publish_safely(bus, topic, payload, event_type=_topics.SYSTEM_HEALTH)
|
||||||
await asyncio.sleep(interval)
|
await asyncio.sleep(interval)
|
||||||
|
|
||||||
|
|
||||||
|
async def run_control_listener(
    bus: BaseBus | None,
    worker: str,
    shutdown: asyncio.Event,
) -> None:
    """Subscribe to ``system.<worker>.control`` and honour stop intents.

    On a well-formed ``{"action": "stop", ...}`` message the function sets
    *shutdown* and returns — the worker's main loop is expected to check
    the event and unwind cleanly, matching the SIGTERM path.

    Malformed payloads (empty, non-mapping, missing/unknown action) are
    logged at debug level and skipped; the listener keeps running. An
    exception raised by the transport itself is logged as a warning and
    ends the listener (bus-driven stop is then disabled for this worker).

    A ``None`` bus yields a noop coroutine that simply awaits *shutdown* —
    callers can ``create_task`` this unconditionally regardless of bus
    state.

    Cancellation-safe: ``asyncio.CancelledError`` is suppressed so that
    cancelling the task makes the coroutine return quietly.

    Args:
        bus: Connected bus, or ``None`` when the bus is unavailable.
        worker: Worker name used as the middle topic token.
        shutdown: Event set when a stop intent is received.
    """
    # Function-scope import keeps this block self-contained.
    from collections.abc import Mapping

    if bus is None:
        with contextlib.suppress(asyncio.CancelledError):
            await shutdown.wait()
        return

    topic = _topics.system_control(worker)
    with contextlib.suppress(asyncio.CancelledError):
        try:
            async with bus.subscribe(topic) as sub:
                async for event in sub:
                    payload = event.payload or {}
                    if not isinstance(payload, Mapping):
                        # A str/list/number payload has no ``.get``; without
                        # this guard the AttributeError would escape to the
                        # outer handler and permanently kill the listener.
                        log.debug(
                            "control: ignoring non-mapping payload worker=%s type=%s",
                            worker, type(payload).__name__,
                        )
                        continue
                    action = payload.get("action")
                    requested_by = payload.get("requested_by", "<unknown>")
                    if action == _topics.WORKER_CONTROL_STOP:
                        log.info(
                            "control: stop requested worker=%s by=%s",
                            worker, requested_by,
                        )
                        shutdown.set()
                        return
                    log.debug(
                        "control: ignoring unknown action worker=%s action=%r",
                        worker, action,
                    )
        except Exception as exc:  # noqa: BLE001
            # Best-effort by design: a broken transport must never take the
            # worker down, it only loses the bus-driven stop path.
            log.warning(
                "control listener failed worker=%s: %s — shutdown via bus disabled",
                worker, exc,
            )
|
||||||
|
|
||||||
|
|
||||||
|
async def run_control_listener_signal(
    bus: BaseBus | None,
    worker: str,
) -> None:
    """Like :func:`run_control_listener` but signals the process on stop.

    Preferred for workers whose main loop is a blocking thread
    (container-log tail, PTY read, scapy sniff) — wiring an
    ``asyncio.Event`` through the thread boundary is error-prone, and
    every DECNET worker already has systemd-equivalent SIGTERM cleanup.
    A SIGTERM self-signal routes the stop through that same path
    without inventing a second shutdown mechanism.

    Malformed payloads (empty, non-mapping, missing/unknown action) are
    logged at debug level and skipped; the listener keeps running.

    Cancellation-safe. Never raises: a failed self-signal is logged
    and the loop simply exits (admin can fall back to ``systemctl``).
    With a ``None`` bus the coroutine returns immediately.

    Args:
        bus: Connected bus, or ``None`` when the bus is unavailable.
        worker: Worker name used as the middle topic token.
    """
    # Function-scope import keeps this block self-contained.
    from collections.abc import Mapping

    if bus is None:
        return

    topic = _topics.system_control(worker)
    with contextlib.suppress(asyncio.CancelledError):
        try:
            async with bus.subscribe(topic) as sub:
                async for event in sub:
                    payload = event.payload or {}
                    if not isinstance(payload, Mapping):
                        # A str/list/number payload has no ``.get``; without
                        # this guard the AttributeError would escape to the
                        # outer handler and permanently kill the listener.
                        log.debug(
                            "control: ignoring non-mapping payload worker=%s type=%s",
                            worker, type(payload).__name__,
                        )
                        continue
                    action = payload.get("action")
                    requested_by = payload.get("requested_by", "<unknown>")
                    if action == _topics.WORKER_CONTROL_STOP:
                        log.info(
                            "control: stop requested worker=%s by=%s → SIGTERM self",
                            worker, requested_by,
                        )
                        try:
                            os.kill(os.getpid(), signal.SIGTERM)
                        except Exception as exc:  # noqa: BLE001
                            log.warning(
                                "control: self-signal failed worker=%s: %s",
                                worker, exc,
                            )
                        # Exit whether or not the signal was delivered; a
                        # stop intent is handled at most once.
                        return
                    log.debug(
                        "control: ignoring unknown action worker=%s action=%r",
                        worker, action,
                    )
        except Exception as exc:  # noqa: BLE001
            # Best-effort by design: a broken transport only loses the
            # bus-driven stop path, it must not crash the worker.
            log.warning(
                "control signal listener failed worker=%s: %s",
                worker, exc,
            )
|
||||||
|
|||||||
@@ -82,6 +82,16 @@ SYSTEM_BUS_HEALTH = "bus.health"
|
|||||||
# :func:`system_health`. The leaf constant stays the same across workers;
|
# :func:`system_health`. The leaf constant stays the same across workers;
|
||||||
# the worker name goes in the middle token.
|
# the worker name goes in the middle token.
|
||||||
SYSTEM_HEALTH = "health"
|
SYSTEM_HEALTH = "health"
|
||||||
|
# Worker-control leaf — built per-worker as ``system.<worker>.control`` via
|
||||||
|
# :func:`system_control`. Admin-originated stop intents travel on this
|
||||||
|
# topic; each worker subscribes to its own.
|
||||||
|
SYSTEM_CONTROL = "control"
|
||||||
|
|
||||||
|
# Control payload ``action`` values — the wire vocabulary. Only ``stop`` is
|
||||||
|
# handled in v1; ``start`` is reserved because a stopped worker has no
|
||||||
|
# subscriber, so starting requires external supervision (systemd).
|
||||||
|
WORKER_CONTROL_STOP = "stop"
|
||||||
|
WORKER_CONTROL_START = "start"
|
||||||
|
|
||||||
|
|
||||||
# ─── Builders ────────────────────────────────────────────────────────────────
|
# ─── Builders ────────────────────────────────────────────────────────────────
|
||||||
@@ -152,6 +162,23 @@ def system_health(worker: str) -> str:
|
|||||||
return f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}"
|
return f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}"
|
||||||
|
|
||||||
|
|
||||||
|
def system_control(worker: str) -> str:
    """Return the per-worker control topic ``system.<worker>.control``.

    Admin-originated control intents are published on this address and
    the named worker subscribes to its own topic. Expected payload::

        {"action": "stop", "requested_by": "<username>", "ts": <unix>}

    The wire vocabulary for *action* is :data:`WORKER_CONTROL_STOP` and
    the reserved :data:`WORKER_CONTROL_START`; listeners act on ``stop``
    and ignore every other value. *worker* is validated by
    :func:`_reject_tokens`, the same segment check used by
    :func:`system_health`.
    """
    _reject_tokens(worker)
    topic = f"{SYSTEM}.{worker}.{SYSTEM_CONTROL}"
    return topic
|
||||||
|
|
||||||
|
|
||||||
def _reject_tokens(*parts: str) -> None:
|
def _reject_tokens(*parts: str) -> None:
|
||||||
"""Reject topic segments that would break NATS-style tokenization.
|
"""Reject topic segments that would break NATS-style tokenization.
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,11 @@ from typing import Any, Callable, Optional
|
|||||||
|
|
||||||
from decnet.bus import topics as _topics
|
from decnet.bus import topics as _topics
|
||||||
from decnet.bus.factory import get_bus
|
from decnet.bus.factory import get_bus
|
||||||
from decnet.bus.publish import make_thread_safe_publisher
|
from decnet.bus.publish import (
|
||||||
|
make_thread_safe_publisher,
|
||||||
|
run_control_listener_signal,
|
||||||
|
run_health_heartbeat,
|
||||||
|
)
|
||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx
|
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx
|
||||||
|
|
||||||
@@ -416,6 +420,14 @@ async def log_collector_worker(log_file: str) -> None:
|
|||||||
|
|
||||||
_publish_log = _make_system_log_publisher(bus, loop)
|
_publish_log = _make_system_log_publisher(bus, loop)
|
||||||
|
|
||||||
|
# Workers panel health heartbeat + bus-driven stop control. The
|
||||||
|
# heartbeat beacons on system.collector.health every 30s; the
|
||||||
|
# control listener translates a bus stop intent into a SIGTERM to
|
||||||
|
# this process (collector's main loop is a blocking thread pool, so
|
||||||
|
# self-signalling is cleaner than threading a shutdown event).
|
||||||
|
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "collector"))
|
||||||
|
control_task = asyncio.create_task(run_control_listener_signal(bus, "collector"))
|
||||||
|
|
||||||
# Dedicated thread pool so long-running container log streams don't
|
# Dedicated thread pool so long-running container log streams don't
|
||||||
# saturate the default asyncio executor and starve short-lived
|
# saturate the default asyncio executor and starve short-lived
|
||||||
# to_thread() calls elsewhere (e.g. load_state in the web API).
|
# to_thread() calls elsewhere (e.g. load_state in the web API).
|
||||||
@@ -465,6 +477,10 @@ async def log_collector_worker(log_file: str) -> None:
|
|||||||
logger.error("collector error: %s", exc)
|
logger.error("collector error: %s", exc)
|
||||||
finally:
|
finally:
|
||||||
collector_pool.shutdown(wait=False)
|
collector_pool.shutdown(wait=False)
|
||||||
|
for t in (heartbeat_task, control_task):
|
||||||
|
t.cancel()
|
||||||
|
with contextlib.suppress(Exception, asyncio.CancelledError):
|
||||||
|
await t
|
||||||
if bus is not None:
|
if bus is not None:
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
await bus.close()
|
await bus.close()
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ from decnet.bus.base import BaseBus
|
|||||||
from decnet.bus.factory import get_bus
|
from decnet.bus.factory import get_bus
|
||||||
from decnet.bus.publish import (
|
from decnet.bus.publish import (
|
||||||
publish_safely as _publish_safely,
|
publish_safely as _publish_safely,
|
||||||
|
run_control_listener_signal as _run_control_listener_signal,
|
||||||
run_health_heartbeat as _run_health_heartbeat,
|
run_health_heartbeat as _run_health_heartbeat,
|
||||||
)
|
)
|
||||||
from decnet.mutator.events import MutationTrigger, emit_decky_mutated
|
from decnet.mutator.events import MutationTrigger, emit_decky_mutated
|
||||||
@@ -332,6 +333,11 @@ async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) ->
|
|||||||
heartbeat_task = asyncio.create_task(
|
heartbeat_task = asyncio.create_task(
|
||||||
_run_health_heartbeat(bus, "mutator"),
|
_run_health_heartbeat(bus, "mutator"),
|
||||||
)
|
)
|
||||||
|
# Control listener: SIGTERM-based so the existing shutdown path
|
||||||
|
# (cancel wake_tasks + heartbeat_task) runs unchanged.
|
||||||
|
wake_tasks.append(asyncio.create_task(
|
||||||
|
_run_control_listener_signal(bus, "mutator"),
|
||||||
|
))
|
||||||
except Exception as exc: # noqa: BLE001
|
except Exception as exc: # noqa: BLE001
|
||||||
log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc)
|
log.warning("mutator: bus unavailable, running in poll-only mode: %s", exc)
|
||||||
|
|
||||||
|
|||||||
@@ -30,7 +30,11 @@ from typing import Any, Callable
|
|||||||
from decnet.bus import topics as _topics
|
from decnet.bus import topics as _topics
|
||||||
from decnet.bus.base import BaseBus
|
from decnet.bus.base import BaseBus
|
||||||
from decnet.bus.factory import get_bus
|
from decnet.bus.factory import get_bus
|
||||||
from decnet.bus.publish import make_thread_safe_publisher
|
from decnet.bus.publish import (
|
||||||
|
make_thread_safe_publisher,
|
||||||
|
run_control_listener,
|
||||||
|
run_health_heartbeat,
|
||||||
|
)
|
||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
from decnet.prober.hassh import hassh_server
|
from decnet.prober.hassh import hassh_server
|
||||||
from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash
|
from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash
|
||||||
@@ -519,8 +523,13 @@ async def prober_worker(
|
|||||||
event_type,
|
event_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
shutdown = asyncio.Event()
|
||||||
|
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober"))
|
||||||
|
control_task = asyncio.create_task(
|
||||||
|
run_control_listener(bus, "prober", shutdown),
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
while True:
|
while not shutdown.is_set():
|
||||||
# Discover new attacker IPs from the log stream
|
# Discover new attacker IPs from the log stream
|
||||||
new_ips, log_position = await asyncio.to_thread(
|
new_ips, log_position = await asyncio.to_thread(
|
||||||
_discover_attackers, json_path, log_position,
|
_discover_attackers, json_path, log_position,
|
||||||
@@ -542,8 +551,15 @@ async def prober_worker(
|
|||||||
_publish_attacker,
|
_publish_attacker,
|
||||||
)
|
)
|
||||||
|
|
||||||
await asyncio.sleep(interval)
|
try:
|
||||||
|
await asyncio.wait_for(shutdown.wait(), timeout=interval)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
finally:
|
finally:
|
||||||
|
for t in (heartbeat_task, control_task):
|
||||||
|
t.cancel()
|
||||||
|
with contextlib.suppress(Exception, asyncio.CancelledError):
|
||||||
|
await t
|
||||||
if bus is not None:
|
if bus is not None:
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
await bus.close()
|
await bus.close()
|
||||||
|
|||||||
@@ -21,7 +21,11 @@ from typing import Any, Callable
|
|||||||
|
|
||||||
from decnet.bus import topics as _topics
|
from decnet.bus import topics as _topics
|
||||||
from decnet.bus.factory import get_bus
|
from decnet.bus.factory import get_bus
|
||||||
from decnet.bus.publish import make_thread_safe_publisher
|
from decnet.bus.publish import (
|
||||||
|
make_thread_safe_publisher,
|
||||||
|
run_control_listener,
|
||||||
|
run_health_heartbeat,
|
||||||
|
)
|
||||||
from decnet.correlation.engine import CorrelationEngine
|
from decnet.correlation.engine import CorrelationEngine
|
||||||
from decnet.correlation.parser import LogEvent
|
from decnet.correlation.parser import LogEvent
|
||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
@@ -87,14 +91,32 @@ async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -
|
|||||||
state.last_log_id = _saved_cursor.get("last_log_id", 0)
|
state.last_log_id = _saved_cursor.get("last_log_id", 0)
|
||||||
state.initialized = True
|
state.initialized = True
|
||||||
logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id)
|
logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id)
|
||||||
|
|
||||||
|
# Workers panel wiring: heartbeat + bus-driven stop. Main loop is
|
||||||
|
# pure asyncio sleep/await, so an event-based control listener
|
||||||
|
# drops in cleanly without a SIGTERM self-signal.
|
||||||
|
shutdown = asyncio.Event()
|
||||||
|
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "profiler"))
|
||||||
|
control_task = asyncio.create_task(
|
||||||
|
run_control_listener(bus, "profiler", shutdown),
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
while True:
|
while not shutdown.is_set():
|
||||||
await asyncio.sleep(interval)
|
try:
|
||||||
|
await asyncio.wait_for(shutdown.wait(), timeout=interval)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass # normal tick
|
||||||
|
if shutdown.is_set():
|
||||||
|
break
|
||||||
try:
|
try:
|
||||||
await _incremental_update(repo, state)
|
await _incremental_update(repo, state)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.error("attacker worker: update failed: %s", exc)
|
logger.error("attacker worker: update failed: %s", exc)
|
||||||
finally:
|
finally:
|
||||||
|
for t in (heartbeat_task, control_task):
|
||||||
|
t.cancel()
|
||||||
|
with contextlib.suppress(Exception, asyncio.CancelledError):
|
||||||
|
await t
|
||||||
if bus is not None:
|
if bus is not None:
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
await bus.close()
|
await bus.close()
|
||||||
|
|||||||
@@ -22,7 +22,11 @@ from typing import Any, Callable
|
|||||||
from decnet.bus import topics as _topics
|
from decnet.bus import topics as _topics
|
||||||
from decnet.bus.base import BaseBus
|
from decnet.bus.base import BaseBus
|
||||||
from decnet.bus.factory import get_bus
|
from decnet.bus.factory import get_bus
|
||||||
from decnet.bus.publish import make_thread_safe_publisher
|
from decnet.bus.publish import (
|
||||||
|
make_thread_safe_publisher,
|
||||||
|
run_control_listener_signal,
|
||||||
|
run_health_heartbeat,
|
||||||
|
)
|
||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
from decnet.network import HOST_IPVLAN_IFACE, HOST_MACVLAN_IFACE
|
from decnet.network import HOST_IPVLAN_IFACE, HOST_MACVLAN_IFACE
|
||||||
from decnet.sniffer.fingerprint import SnifferEngine
|
from decnet.sniffer.fingerprint import SnifferEngine
|
||||||
@@ -198,6 +202,15 @@ async def sniffer_worker(log_file: str) -> None:
|
|||||||
if bus is not None:
|
if bus is not None:
|
||||||
publish_fn = _make_decky_traffic_publisher(bus, loop)
|
publish_fn = _make_decky_traffic_publisher(bus, loop)
|
||||||
|
|
||||||
|
# Workers panel: heartbeat + SIGTERM-based stop control. The
|
||||||
|
# sniff loop is a blocking scapy thread, so an asyncio shutdown
|
||||||
|
# event can't reach it — translating the bus stop into SIGTERM
|
||||||
|
# routes through the existing CancelledError path below.
|
||||||
|
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "sniffer"))
|
||||||
|
control_task = asyncio.create_task(
|
||||||
|
run_control_listener_signal(bus, "sniffer"),
|
||||||
|
)
|
||||||
|
|
||||||
# Dedicated thread pool so the long-running sniff loop doesn't
|
# Dedicated thread pool so the long-running sniff loop doesn't
|
||||||
# occupy a slot in the default asyncio executor.
|
# occupy a slot in the default asyncio executor.
|
||||||
sniffer_pool = ThreadPoolExecutor(
|
sniffer_pool = ThreadPoolExecutor(
|
||||||
@@ -216,6 +229,10 @@ async def sniffer_worker(log_file: str) -> None:
|
|||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
sniffer_pool.shutdown(wait=False)
|
sniffer_pool.shutdown(wait=False)
|
||||||
|
for t in (heartbeat_task, control_task):
|
||||||
|
t.cancel()
|
||||||
|
with contextlib.suppress(Exception, asyncio.CancelledError):
|
||||||
|
await t
|
||||||
if bus is not None:
|
if bus is not None:
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
await bus.close()
|
await bus.close()
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ ingestion_task: Optional[asyncio.Task[Any]] = None
|
|||||||
collector_task: Optional[asyncio.Task[Any]] = None
|
collector_task: Optional[asyncio.Task[Any]] = None
|
||||||
attacker_task: Optional[asyncio.Task[Any]] = None
|
attacker_task: Optional[asyncio.Task[Any]] = None
|
||||||
sniffer_task: Optional[asyncio.Task[Any]] = None
|
sniffer_task: Optional[asyncio.Task[Any]] = None
|
||||||
|
heartbeat_task: Optional[asyncio.Task[Any]] = None
|
||||||
|
|
||||||
|
|
||||||
def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]:
|
def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]:
|
||||||
@@ -45,6 +46,7 @@ def get_background_tasks() -> dict[str, Optional[asyncio.Task[Any]]]:
|
|||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||||
global ingestion_task, collector_task, attacker_task, sniffer_task
|
global ingestion_task, collector_task, attacker_task, sniffer_task
|
||||||
|
global heartbeat_task
|
||||||
|
|
||||||
import resource
|
import resource
|
||||||
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
|
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
|
||||||
@@ -115,10 +117,33 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
|||||||
else:
|
else:
|
||||||
log.info("Contract Test Mode: skipping background worker startup")
|
log.info("Contract Test Mode: skipping background worker startup")
|
||||||
|
|
||||||
|
# Worker registry + API self-heartbeat — always on, even under
|
||||||
|
# contract-test mode, so the Workers panel can render the process
|
||||||
|
# without the dev needing to run a full stack. A missing bus turns
|
||||||
|
# both into no-ops inside the helpers.
|
||||||
|
try:
|
||||||
|
from decnet.bus.app import get_app_bus
|
||||||
|
from decnet.bus.publish import run_health_heartbeat
|
||||||
|
from decnet.web.worker_registry import get_registry
|
||||||
|
|
||||||
|
_bus = await get_app_bus()
|
||||||
|
await get_registry().start(_bus)
|
||||||
|
if heartbeat_task is None or heartbeat_task.done():
|
||||||
|
heartbeat_task = asyncio.create_task(
|
||||||
|
run_health_heartbeat(_bus, "api"),
|
||||||
|
)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
log.warning("worker registry bootstrap failed: %s", exc)
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
log.info("API shutdown cancelling background tasks")
|
log.info("API shutdown cancelling background tasks")
|
||||||
for task in (ingestion_task, collector_task, attacker_task, sniffer_task):
|
try:
|
||||||
|
from decnet.web.worker_registry import get_registry
|
||||||
|
await get_registry().stop()
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
log.warning("worker registry stop raised: %s", exc)
|
||||||
|
for task in (ingestion_task, collector_task, attacker_task, sniffer_task, heartbeat_task):
|
||||||
if task and not task.done():
|
if task and not task.done():
|
||||||
task.cancel()
|
task.cancel()
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Literal, Optional, Any, List, Annotated
|
from typing import Dict, Literal, Optional, Any, List, Annotated
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from sqlalchemy import Column, Index, Text, UniqueConstraint
|
from sqlalchemy import Column, Index, Text, UniqueConstraint
|
||||||
from sqlalchemy.dialects.mysql import MEDIUMTEXT
|
from sqlalchemy.dialects.mysql import MEDIUMTEXT
|
||||||
@@ -452,6 +452,52 @@ class HealthResponse(BaseModel):
|
|||||||
components: dict[str, ComponentHealth]
|
components: dict[str, ComponentHealth]
|
||||||
|
|
||||||
|
|
||||||
|
# --- Workers panel (Config → Workers) ---
|
||||||
|
# Bus-backed health + control: workers heartbeat on ``system.<name>.health``
|
||||||
|
# and listen on ``system.<name>.control``. The API aggregates last-seen
|
||||||
|
# heartbeats via the worker registry; these are the HTTP-facing shapes.
|
||||||
|
|
||||||
|
class WorkerStatus(BaseModel):
    # One row of the Workers panel, keyed by worker name.
    name: str
    # Health tier derived from the last heartbeat:
    #   ``ok``      — heartbeat within 90s (3× the 30s heartbeat interval)
    #   ``stale``   — seen before, but silent for 90s or more
    #   ``unknown`` — no heartbeat has ever been received for this name
    status: Literal["ok", "stale", "unknown"]
    # Timestamp of the most recent heartbeat and its age in seconds;
    # both default to ``None``.
    last_heartbeat_ts: Optional[float] = None
    seconds_since: Optional[float] = None
    # Free-form data from the worker's ``extra()`` heartbeat callback.
    # Opaque to the panel; the UI shows only the keys it recognises.
    extra: Dict[str, Any] = PydanticField(default_factory=dict)
    # True iff a ``decnet-<name>.service`` unit file is present on the
    # host. When False the UI disables the START button and shows a
    # "Unit not installed" tooltip. Defaults to True for backwards
    # compatibility with clients that pre-date the field.
    installed: bool = True
|
||||||
|
|
||||||
|
|
||||||
|
class WorkersResponse(BaseModel):
    # Response body of ``GET /workers``.
    # One status row per worker in the registry snapshot.
    workers: List[WorkerStatus]
    # Time the response was built (the endpoint fills it with ``time.time()``).
    generated_at: float
    # False when the API has no bus handle, which lets the UI explain an
    # all-``unknown`` table.
    bus_connected: bool
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerControlResponse(BaseModel):
    # Acknowledgement returned by the worker control endpoint.
    # True once the intent has been handed to the bus; the endpoint is
    # fire-and-forget, so this does not confirm the worker acted on it.
    accepted: bool
    # Name of the worker the intent was addressed to.
    worker: str
    # Control action that was published (e.g. ``stop``).
    action: str
|
||||||
|
|
||||||
|
|
||||||
|
class StartFailure(BaseModel):
    # One failed entry in a start-all result (start endpoints are not
    # part of this change; the model is added in anticipation of them).
    # Worker that could not be started.
    name: str
    # Human-readable explanation of the failure.
    reason: str
|
||||||
|
|
||||||
|
|
||||||
|
class StartAllResponse(BaseModel):
    # Aggregate result shape for the upcoming start endpoints.
    # Workers that were started.
    started: List[str]
    # Workers that were already running.
    already_running: List[str]
    # Workers that failed to start, each with a reason.
    failed: List[StartFailure]
|
||||||
|
|
||||||
|
|
||||||
# --- Swarm API DTOs ---
|
# --- Swarm API DTOs ---
|
||||||
# Request/response contracts for the master-side swarm controller
|
# Request/response contracts for the master-side swarm controller
|
||||||
# (decnet/web/swarm_api.py). The underlying SQLModel tables — SwarmHost and
|
# (decnet/web/swarm_api.py). The underlying SQLModel tables — SwarmHost and
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ from .config.api_update_config import router as config_update_router
|
|||||||
from .config.api_manage_users import router as config_users_router
|
from .config.api_manage_users import router as config_users_router
|
||||||
from .config.api_reinit import router as config_reinit_router
|
from .config.api_reinit import router as config_reinit_router
|
||||||
from .health.api_get_health import router as health_router
|
from .health.api_get_health import router as health_router
|
||||||
|
from .workers.api_list_workers import router as workers_list_router
|
||||||
|
from .workers.api_control_worker import router as workers_control_router
|
||||||
from .artifacts.api_get_artifact import router as artifacts_router
|
from .artifacts.api_get_artifact import router as artifacts_router
|
||||||
from .swarm_updates import swarm_updates_router
|
from .swarm_updates import swarm_updates_router
|
||||||
from .swarm_mgmt import swarm_mgmt_router
|
from .swarm_mgmt import swarm_mgmt_router
|
||||||
@@ -69,6 +71,8 @@ api_router.include_router(attacker_transcripts_router)
|
|||||||
api_router.include_router(stats_router)
|
api_router.include_router(stats_router)
|
||||||
api_router.include_router(stream_router)
|
api_router.include_router(stream_router)
|
||||||
api_router.include_router(health_router)
|
api_router.include_router(health_router)
|
||||||
|
api_router.include_router(workers_list_router)
|
||||||
|
api_router.include_router(workers_control_router)
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
api_router.include_router(config_get_router)
|
api_router.include_router(config_get_router)
|
||||||
|
|||||||
0
decnet/web/router/workers/__init__.py
Normal file
0
decnet/web/router/workers/__init__.py
Normal file
76
decnet/web/router/workers/api_control_worker.py
Normal file
76
decnet/web/router/workers/api_control_worker.py
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends, HTTPException, status
|
||||||
|
from fastapi.responses import ORJSONResponse
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.bus.app import get_app_bus
|
||||||
|
from decnet.bus.publish import publish_safely
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.telemetry import traced as _traced
|
||||||
|
from decnet.web.db.models import WorkerControlResponse
|
||||||
|
from decnet.web.dependencies import require_admin
|
||||||
|
from decnet.web.worker_registry import KNOWN_WORKERS
|
||||||
|
|
||||||
|
log = get_logger("api")
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
    "/workers/{name}/stop",
    tags=["Observability"],
    responses={
        202: {"model": WorkerControlResponse, "description": "Stop intent queued on bus"},
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Unknown worker"},
        503: {"description": "Bus unavailable"},
    },
)
@_traced("api.stop_worker")
async def stop_worker(
    name: str,
    admin: dict = Depends(require_admin),
) -> ORJSONResponse:
    """Queue a stop intent for worker *name* on ``system.<name>.control``.

    The call is fire-and-forget: it returns 202 as soon as the intent has
    been handed to the bus and does not wait for the worker to exit. The
    caller learns the outcome from the Workers panel, where the row turns
    ``stale`` once heartbeats stop. This matches the bus contract
    (at-most-once delivery; the bus is a notification layer, not the
    source of truth for persistent state).

    Raises:
        HTTPException: 404 if *name* is not in ``KNOWN_WORKERS``;
            503 if no bus is available.
    """
    # Reject names outside the hardcoded worker list before touching the bus.
    if name not in KNOWN_WORKERS:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Unknown worker: {name!r}",
        )

    app_bus = await get_app_bus()
    if app_bus is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="bus unavailable",
        )

    control_topic = _topics.system_control(name)
    # Attribute the request to the caller; fall back through the token
    # subject to a generic label.
    requester = admin.get("username") or admin.get("sub") or "admin"
    intent = {
        "action": _topics.WORKER_CONTROL_STOP,
        "requested_by": requester,
        "ts": time.time(),
    }
    await publish_safely(
        app_bus, control_topic, intent, event_type=_topics.SYSTEM_CONTROL,
    )
    log.info("workers: stop requested worker=%s by=%s", name, requester)

    ack = WorkerControlResponse(
        accepted=True,
        worker=name,
        action=_topics.WORKER_CONTROL_STOP,
    )
    return ORJSONResponse(
        content=ack.model_dump(),
        status_code=status.HTTP_202_ACCEPTED,
    )
|
||||||
35
decnet/web/router/workers/api_list_workers.py
Normal file
35
decnet/web/router/workers/api_list_workers.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends
|
||||||
|
|
||||||
|
from decnet.bus.app import get_app_bus
|
||||||
|
from decnet.telemetry import traced as _traced
|
||||||
|
from decnet.web.db.models import WorkersResponse
|
||||||
|
from decnet.web.dependencies import require_viewer
|
||||||
|
from decnet.web.services import systemd_control
|
||||||
|
from decnet.web.worker_registry import get_registry
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
    "/workers",
    response_model=WorkersResponse,
    tags=["Observability"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
    },
)
@_traced("api.list_workers")
async def list_workers(user: dict = Depends(require_viewer)) -> WorkersResponse:
    """Return the worker registry snapshot for the Workers panel.

    Each row's ``installed`` flag is overwritten according to whether the
    worker's name appears in ``systemd_control.list_installed()``.
    ``bus_connected`` reports whether the API currently holds a bus
    handle, so the UI can explain a table full of ``unknown`` rows.
    """
    rows = get_registry().snapshot()
    app_bus = await get_app_bus()
    installed_names = await systemd_control.list_installed()
    for row in rows:
        row.installed = row.name in installed_names
    bus_up = app_bus is not None
    return WorkersResponse(
        workers=rows,
        generated_at=time.time(),
        bus_connected=bus_up,
    )
|
||||||
202
decnet/web/worker_registry.py
Normal file
202
decnet/web/worker_registry.py
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
"""In-process registry that aggregates worker heartbeats from the bus.
|
||||||
|
|
||||||
|
The API process subscribes to ``system.*.health`` (plus the bare
|
||||||
|
``system.bus.health`` leaf that the bus daemon itself publishes) at
|
||||||
|
lifespan startup and keeps a simple last-seen dict. The
|
||||||
|
:func:`snapshot` call renders that into a stable list of known workers,
|
||||||
|
including those we have **never** heard from (surfaced as ``unknown`` —
|
||||||
|
which is distinct from ``stale``, a worker that used to pulse but went
|
||||||
|
silent).
|
||||||
|
|
||||||
|
Names are the canonical singular forms used by the CLI table in
|
||||||
|
``CLAUDE.md``. Keeping the list hardcoded is deliberate: an unknown
|
||||||
|
topic segment would otherwise let any publisher inject a row into the
|
||||||
|
Workers panel.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.bus.base import BaseBus
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.web.db.models import WorkerStatus
|
||||||
|
|
||||||
|
# Module-level logger for the registry and its subscriber task.
log = get_logger("web.worker_registry")
|
||||||
|
|
||||||
|
# Fixed allow-list of worker names the panel will ever show. It mirrors
# the worker table in CLAUDE.md, so update both together when a new
# worker CLI is added. "api" is listed because the API process publishes
# its own heartbeat, giving the panel one always-on row that doubles as
# a "bus is reachable" sanity check.
KNOWN_WORKERS: tuple[str, ...] = (
    "api",
    "bus",
    "collector",
    "profiler",  # hosts the correlation engine too — no separate daemon
    "sniffer",
    "prober",
    "mutator",
    "agent",
    "forwarder",
    "updater",
)

# Freshness window for the ``ok`` status: three times the 30s heartbeat
# interval used by :func:`decnet.bus.publish.run_health_heartbeat`.
# A single missed beat is noise; three in a row points at the worker.
OK_WINDOW_SECONDS = 3 * 30.0
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerRegistry:
    """Last-seen aggregator for worker heartbeats.

    A single subscriber task (started by :meth:`start`) writes into the
    registry through :meth:`record`; HTTP handlers read it through
    :meth:`snapshot`. No lock is taken: ``snapshot`` works on a shallow
    copy of the last-seen dict, so a write landing while a snapshot is
    being rendered cannot produce a torn view.
    """

    def __init__(self) -> None:
        # name → {"ts": float, "payload": dict}
        self._seen: Dict[str, Dict[str, Any]] = {}
        # Subscriber task; ``None`` until :meth:`start` is given a bus.
        self._task: asyncio.Task[None] | None = None

    def record(self, worker: str, ts: float, payload: Dict[str, Any]) -> None:
        """Store ``payload`` as the latest heartbeat of ``worker`` at ``ts``."""
        self._seen[worker] = {"ts": ts, "payload": payload}

    def snapshot(self) -> List[WorkerStatus]:
        """Render one :class:`WorkerStatus` per name in ``KNOWN_WORKERS``.

        Workers never heard from are ``unknown``; workers whose last
        heartbeat is younger than ``OK_WINDOW_SECONDS`` are ``ok``; the
        rest are ``stale``. Rows come out in ``KNOWN_WORKERS`` order.
        """
        now = time.time()
        seen = dict(self._seen)  # point-in-time copy
        out: List[WorkerStatus] = []
        for name in KNOWN_WORKERS:
            entry = seen.get(name)
            if entry is None:
                out.append(WorkerStatus(
                    name=name,
                    status="unknown",
                    last_heartbeat_ts=None,
                    seconds_since=None,
                    extra={},
                ))
                continue
            ts = float(entry["ts"])
            # A heartbeat stamped in the future (publisher clock ahead of
            # ours) clamps to zero rather than going negative.
            seconds_since = max(0.0, now - ts)
            status = "ok" if seconds_since < OK_WINDOW_SECONDS else "stale"
            payload = dict(entry.get("payload") or {})
            # ``worker`` and ``ts`` are redundant with the row itself;
            # strip them so ``extra`` only surfaces worker-contributed
            # metadata (uptime, queue depth, etc.).
            payload.pop("worker", None)
            payload.pop("ts", None)
            out.append(WorkerStatus(
                name=name,
                status=status,  # type: ignore[arg-type]
                last_heartbeat_ts=ts,
                seconds_since=seconds_since,
                extra=payload,
            ))
        return out

    async def start(self, bus: BaseBus | None) -> None:
        """Begin subscribing. Idempotent; a second call is a no-op.

        With ``bus=None`` nothing is started and every row stays
        ``unknown``.
        """
        if self._task is not None and not self._task.done():
            return
        if bus is None:
            log.debug("worker registry: no bus — panel will show all UNKNOWN")
            return
        self._task = asyncio.create_task(self._run(bus))

    async def stop(self) -> None:
        """Cancel the subscriber task and wait for it to finish. Idempotent."""
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except (asyncio.CancelledError, Exception):  # noqa: BLE001
            # Shutdown path: whatever the task ended with is irrelevant.
            pass
        self._task = None

    async def _run(self, bus: BaseBus) -> None:
        """Fan in both ``system.*.health`` and ``system.bus.health``.

        Two subscriptions because the bus's own heartbeat uses the
        pre-existing ``system.bus.health`` string (not nested under
        ``system.<worker>.health``) and ``*`` matches exactly one token,
        so the wildcard would miss it.

        Any non-cancellation error ends the subscriber with a warning;
        it is not restarted from here.
        """
        worker_sub = bus.subscribe("system.*.health")
        bus_sub = bus.subscribe(_topics.system("bus.health"))
        try:
            async with worker_sub, bus_sub:
                async for event in _merge(worker_sub, bus_sub):
                    self._on_event(event)
        except asyncio.CancelledError:
            raise
        except Exception as exc:  # noqa: BLE001
            log.warning("worker registry subscriber exited: %s", exc)

    def _on_event(self, event: Any) -> None:
        """Record one heartbeat event, ignoring anything malformed.

        Only ``system.<name>.health`` topics whose ``<name>`` is in
        ``KNOWN_WORKERS`` are recorded. A bad payload must never raise
        here: an exception would propagate into :meth:`_run` and end the
        subscriber, freezing the whole panel on one bad message.
        """
        raw_payload = event.payload
        # Anything that is not a dict carries no usable metadata.
        payload: Dict[str, Any] = raw_payload if isinstance(raw_payload, dict) else {}

        # ``system.<name>.health`` — middle token is the worker name; this
        # also covers "bus" for the daemon's own ``system.bus.health``.
        tokens = event.topic.split(".")
        if len(tokens) != 3 or tokens[0] != "system" or tokens[2] != "health":
            return
        name = tokens[1]
        if not name:
            return
        if name not in KNOWN_WORKERS:
            # Unknown worker name — log at debug; don't widen the panel
            # beyond the hardcoded list.
            log.debug("heartbeat from unknown worker=%r", name)
            return
        self.record(name, self._coerce_ts(payload.get("ts")), payload)

    @staticmethod
    def _coerce_ts(raw: Any) -> float:
        """Return ``raw`` as a finite float, or the receive time if unusable."""
        received_at = time.time()
        if raw is None:
            return received_at
        try:
            ts = float(raw)
        except (TypeError, ValueError, OverflowError):
            return received_at
        # NaN / ±inf would make the freshness arithmetic meaningless.
        if ts != ts or ts in (float("inf"), float("-inf")):
            return received_at
        return ts
|
||||||
|
|
||||||
|
|
||||||
|
async def _merge(*subs: Any):
    """Yield items from several async iterables as each becomes available.

    One ``__anext__`` task per source is kept in flight and awaited with
    ``asyncio.wait(..., return_when=FIRST_COMPLETED)``, so a quiet source
    never blocks a busy one. Items are yielded in completion order (not
    strict round-robin). A source that raises ``StopAsyncIteration`` is
    dropped; the generator ends once every source is exhausted. Any other
    exception from a source propagates to the consumer.

    On exit (exhaustion, error, cancellation or ``aclose()``) the
    in-flight tasks are cancelled *and awaited*, so none is left pending
    and no task exception goes un-retrieved.
    """
    iters = [sub.__aiter__() for sub in subs]
    pending = {asyncio.create_task(it.__anext__()): it for it in iters}
    try:
        while pending:
            done, _ = await asyncio.wait(
                list(pending), return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                it = pending.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    # Source exhausted — do not re-arm it.
                    continue
                # Yield outside the ``try`` so an exception thrown into
                # the generator at this point is never mistaken for
                # source exhaustion.
                yield item
                pending[asyncio.create_task(it.__anext__())] = it
    finally:
        leftovers = list(pending)
        for task in leftovers:
            task.cancel()
        if leftovers:
            # Reap the cancelled tasks; ``return_exceptions`` also
            # retrieves errors from tasks that finished but were never
            # consumed.
            await asyncio.gather(*leftovers, return_exceptions=True)
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level singleton so the API lifespan and route handlers share
|
||||||
|
# one registry without threading it through every Depends.
|
||||||
|
_registry: WorkerRegistry | None = None  # created on first get_registry() call
|
||||||
|
|
||||||
|
|
||||||
|
def get_registry() -> WorkerRegistry:
    """Return the shared :class:`WorkerRegistry`, creating it on first use."""
    global _registry
    current = _registry
    if current is not None:
        return current
    current = WorkerRegistry()
    _registry = current
    return current
|
||||||
|
|
||||||
|
|
||||||
|
def reset_registry_for_tests() -> None:
    """Forget the shared registry so the next ``get_registry()`` builds a new one.

    Intended for tests that need a clean registry; it only drops the
    module-level reference and does not stop a running subscriber task.
    """
    global _registry
    _registry = None
|
||||||
0
tests/api/workers/__init__.py
Normal file
0
tests/api/workers/__init__.py
Normal file
171
tests/api/workers/test_workers_api.py
Normal file
171
tests/api/workers/test_workers_api.py
Normal file
@@ -0,0 +1,171 @@
|
|||||||
|
"""Tests for the Workers panel API endpoints.
|
||||||
|
|
||||||
|
Covers ``GET /api/v1/workers`` (viewer-readable, always surfaces every
|
||||||
|
known worker) and ``POST /api/v1/workers/{name}/stop`` (admin-only,
|
||||||
|
publishes a stop intent on the bus).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.bus.fake import FakeBus
|
||||||
|
from decnet.web import worker_registry as _wr
|
||||||
|
from decnet.web.router.workers import api_control_worker as _ctl
|
||||||
|
from decnet.web.router.workers import api_list_workers as _list
|
||||||
|
from decnet.web.worker_registry import KNOWN_WORKERS
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
def _reset_registry() -> None:
    """Give every test a fresh registry singleton, and drop it again afterwards."""
    # Before the test: discard whatever a previous test left behind.
    _wr.reset_registry_for_tests()
    yield
    # After the test: don't leak this test's registry into the next one.
    _wr.reset_registry_for_tests()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
async def fake_bus(monkeypatch) -> FakeBus:
    """Yield a connected ``FakeBus`` that the control endpoint will use.

    Only the control endpoint module is patched; the bus is closed once
    the test finishes.
    """
    connected = FakeBus()
    await connected.connect()

    async def _stub_get_app_bus() -> FakeBus:
        return connected

    # Patch the name as imported into the control endpoint's namespace —
    # that is the binding the handler actually resolves.
    monkeypatch.setattr(_ctl, "get_app_bus", _stub_get_app_bus)
    yield connected
    await connected.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_list_workers_viewer_sees_all_unknown(
    client: httpx.AsyncClient, viewer_token: str,
) -> None:
    """A viewer gets one row per known worker, all ``unknown`` without heartbeats."""
    auth = {"Authorization": f"Bearer {viewer_token}"}
    resp = await client.get("/api/v1/workers", headers=auth)
    assert resp.status_code == 200

    body = resp.json()
    rows = body["workers"]
    assert {row["name"] for row in rows} == set(KNOWN_WORKERS)

    # No heartbeats have arrived in the test harness, so every row is unknown.
    for row in rows:
        assert row["status"] == "unknown"
        assert row["last_heartbeat_ts"] is None
        assert row["seconds_since"] is None

    assert "bus_connected" in body
    assert isinstance(body["bus_connected"], bool)

    # `installed` flag is always present + boolean.
    for row in rows:
        assert "installed" in row
        assert isinstance(row["installed"], bool)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_list_workers_requires_auth(client: httpx.AsyncClient) -> None:
    """Without a bearer token the listing endpoint answers 401."""
    unauthenticated = await client.get("/api/v1/workers")
    assert unauthenticated.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_list_workers_reports_bus_connected_false_when_no_bus(
    client: httpx.AsyncClient, viewer_token: str, monkeypatch,
) -> None:
    """``bus_connected`` is ``False`` when the app has no bus."""

    async def _no_bus() -> None:
        return None

    # Patch the binding inside the listing module, which is what the
    # handler looks up.
    monkeypatch.setattr(_list, "get_app_bus", _no_bus)

    auth = {"Authorization": f"Bearer {viewer_token}"}
    resp = await client.get("/api/v1/workers", headers=auth)

    assert resp.status_code == 200
    assert resp.json()["bus_connected"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_list_workers_reports_bus_connected_true_with_fake_bus(
    client: httpx.AsyncClient, viewer_token: str, monkeypatch,
) -> None:
    """``bus_connected`` is ``True`` when the app bus lookup returns a bus."""
    live_bus = FakeBus()
    await live_bus.connect()

    async def _fake_bus() -> FakeBus:
        return live_bus

    monkeypatch.setattr(_list, "get_app_bus", _fake_bus)
    auth = {"Authorization": f"Bearer {viewer_token}"}
    try:
        resp = await client.get("/api/v1/workers", headers=auth)
        assert resp.status_code == 200
        assert resp.json()["bus_connected"] is True
    finally:
        # Close the bus even when an assertion above fails.
        await live_bus.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_stop_worker_admin_publishes_on_bus(
    client: httpx.AsyncClient, auth_token: str, fake_bus: FakeBus,
) -> None:
    """An admin stop request is accepted (202) and lands on the control topic."""
    import asyncio

    topic = _topics.system_control("mutator")
    received: list[Any] = []
    sub = fake_bus.subscribe(topic)

    async def _drain() -> None:
        # Stop after the first event: exactly one publish is expected.
        async for event in sub:
            received.append(event)
            return

    # ``async with`` guarantees the subscription is released even if
    # anything between entering it and the assertions raises.
    async with sub:
        reader = asyncio.create_task(_drain())
        # Give the subscribe a tick so the publish lands on a live reader.
        await asyncio.sleep(0)
        try:
            resp = await client.post(
                "/api/v1/workers/mutator/stop",
                headers={"Authorization": f"Bearer {auth_token}"},
            )
            assert resp.status_code == 202
            body = resp.json()
            assert body == {"accepted": True, "worker": "mutator", "action": "stop"}

            await asyncio.wait_for(reader, timeout=1.0)
            assert len(received) == 1
            ev = received[0]
            assert ev.topic == topic
            assert ev.payload["action"] == _topics.WORKER_CONTROL_STOP
            assert "requested_by" in ev.payload
            assert "ts" in ev.payload
        finally:
            # If an assertion failed before the reader finished, don't
            # leave it pending on the event loop.
            if not reader.done():
                reader.cancel()
            await asyncio.gather(reader, return_exceptions=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_stop_worker_viewer_forbidden(
    client: httpx.AsyncClient, viewer_token: str, fake_bus: FakeBus,
) -> None:
    """A viewer-level token may not stop a worker (403)."""
    viewer_auth = {"Authorization": f"Bearer {viewer_token}"}
    resp = await client.post("/api/v1/workers/mutator/stop", headers=viewer_auth)
    assert resp.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_stop_worker_unknown_name_404(
    client: httpx.AsyncClient, auth_token: str, fake_bus: FakeBus,
) -> None:
    """Stopping a name that is not a known worker answers 404."""
    admin_auth = {"Authorization": f"Bearer {auth_token}"}
    resp = await client.post("/api/v1/workers/nonsense/stop", headers=admin_auth)
    assert resp.status_code == 404
|
||||||
77
tests/bus/test_control_listener.py
Normal file
77
tests/bus/test_control_listener.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
"""Tests for :func:`run_control_listener`.
|
||||||
|
|
||||||
|
The listener is the worker-side half of the Workers panel stop flow:
|
||||||
|
consume ``system.<worker>.control`` messages, set a shutdown event on a
|
||||||
|
well-formed ``{"action": "stop"}``, and ignore everything else without
|
||||||
|
raising.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.bus.fake import FakeBus
|
||||||
|
from decnet.bus.publish import run_control_listener
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_control_listener_sets_shutdown_on_stop() -> None:
    """A well-formed stop message sets the shutdown event and ends the listener."""
    bus = FakeBus()
    await bus.connect()
    shutdown = asyncio.Event()
    stop_message = {"action": _topics.WORKER_CONTROL_STOP, "requested_by": "admin"}
    try:
        listener = asyncio.create_task(
            run_control_listener(bus, "mutator", shutdown),
        )
        # Give the subscribe() call a tick to register before we publish.
        await asyncio.sleep(0)
        await bus.publish(
            _topics.system_control("mutator"), stop_message, event_type="control",
        )
        await asyncio.wait_for(listener, timeout=1.0)
        assert shutdown.is_set()
    finally:
        await bus.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_control_listener_ignores_malformed() -> None:
    """Unknown or missing actions neither raise nor trigger shutdown."""
    bus = FakeBus()
    await bus.connect()
    shutdown = asyncio.Event()
    topic = _topics.system_control("mutator")
    task: asyncio.Task | None = None
    try:
        task = asyncio.create_task(run_control_listener(bus, "mutator", shutdown))
        await asyncio.sleep(0)
        # Unknown action, then a message with no action at all.
        await bus.publish(topic, {"action": "bogus"}, event_type="control")
        await bus.publish(topic, {"requested_by": "admin"}, event_type="control")

        # Let the listener process both messages, then check it really
        # ignored them. Without this the test would still pass if a
        # malformed message had triggered shutdown.
        await asyncio.sleep(0.05)
        assert not shutdown.is_set()
        assert not task.done()

        # Now send a real stop to unblock the task so the test terminates.
        await bus.publish(
            topic, {"action": _topics.WORKER_CONTROL_STOP}, event_type="control",
        )
        await asyncio.wait_for(task, timeout=1.0)
        assert shutdown.is_set()
    finally:
        # Don't leave the listener pending if an assertion failed early.
        if task is not None and not task.done():
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)
        await bus.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_control_listener_none_bus_awaits_shutdown() -> None:
    """With ``bus=None`` the listener simply waits for the shutdown event.

    This lets callers ``create_task()`` the listener unconditionally.
    """
    shutdown = asyncio.Event()
    listener = asyncio.create_task(
        run_control_listener(None, "mutator", shutdown),
    )

    # One loop tick: the listener must still be waiting.
    await asyncio.sleep(0)
    assert not listener.done()

    # Setting the event is what lets it finish.
    shutdown.set()
    await asyncio.wait_for(listener, timeout=1.0)
|
||||||
@@ -61,3 +61,14 @@ def test_attacker_builder_rejects_empty() -> None:
|
|||||||
def test_system_health_builder() -> None:
    """``system_health`` builds ``system.<worker>.health``."""
    for worker in ("sniffer", "mutator"):
        assert topics.system_health(worker) == f"system.{worker}.health"
|
||||||
|
|
||||||
|
|
||||||
|
def test_system_control_builder() -> None:
    """``system_control`` builds ``system.<worker>.control``."""
    for worker in ("mutator", "collector"):
        assert topics.system_control(worker) == f"system.{worker}.control"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "bad",
    ["", "has.dot", "has*wildcard", "has>wild", "with space", "with\ttab"],
)
def test_system_control_rejects_bad_segments(bad: str) -> None:
    """Empty names and names containing separators, wildcards or whitespace are rejected."""
    with pytest.raises(ValueError):
        topics.system_control(bad)
|
||||||
|
|||||||
Reference in New Issue
Block a user