refactor(bus): extract publish_safely + extend topics for DEBT-031
Shared publish_safely helper at decnet/bus/publish.py so the nine workers about to be wired into the bus don't each copy-paste the "never raise back at the caller" contract. Mutator drops its private copy and imports the canonical one. topics.py gains the attacker.* hierarchy (observed, scored, session.started, session.ended) and a system_health(worker) builder for per-worker health heartbeats — both prerequisites for the worker rollout under DEBT-031.
This commit is contained in:
36
decnet/bus/publish.py
Normal file
36
decnet/bus/publish.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
"""Fire-and-forget publish helper shared across every worker.
|
||||||
|
|
||||||
|
Lifted out of ``decnet/mutator/engine.py`` once a second caller showed up
|
||||||
|
(DEBT-031). Keeping one implementation means the "never break the worker
|
||||||
|
loop" guarantee is audited in exactly one place.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from decnet.bus.base import BaseBus
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
|
||||||
|
log = get_logger("bus.publish")
|
||||||
|
|
||||||
|
|
||||||
|
async def publish_safely(
|
||||||
|
bus: BaseBus | None,
|
||||||
|
topic: str,
|
||||||
|
payload: dict[str, Any],
|
||||||
|
event_type: str = "",
|
||||||
|
) -> None:
|
||||||
|
"""Publish on *bus* without ever raising back at the caller.
|
||||||
|
|
||||||
|
The DB row (or equivalent side-effect) has already been committed by
|
||||||
|
the time a worker calls this; the bus is the notification layer, not
|
||||||
|
the source of truth. A dropped publish is at most a few seconds of
|
||||||
|
UI latency until the next poll tick. A raised exception here, by
|
||||||
|
contrast, would crash the worker — which is strictly worse.
|
||||||
|
"""
|
||||||
|
if bus is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
await bus.publish(topic, payload, event_type=event_type)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
log.warning("bus publish failed topic=%s: %s", topic, exc)
|
||||||
@@ -11,8 +11,12 @@ Token structure (NATS-style, dot-separated):
|
|||||||
decky.{decky_id}.state
|
decky.{decky_id}.state
|
||||||
decky.{decky_id}.traffic
|
decky.{decky_id}.traffic
|
||||||
attacker.observed
|
attacker.observed
|
||||||
|
attacker.scored
|
||||||
|
attacker.session.started
|
||||||
|
attacker.session.ended
|
||||||
system.log
|
system.log
|
||||||
system.bus.health
|
system.bus.health
|
||||||
|
system.{worker}.health
|
||||||
|
|
||||||
Wildcards (per :func:`decnet.bus.base.matches`):
|
Wildcards (per :func:`decnet.bus.base.matches`):
|
||||||
|
|
||||||
@@ -47,9 +51,22 @@ TOPOLOGY_STATUS = "status"
|
|||||||
DECKY_STATE = "state"
|
DECKY_STATE = "state"
|
||||||
DECKY_TRAFFIC = "traffic"
|
DECKY_TRAFFIC = "traffic"
|
||||||
|
|
||||||
|
# Attacker event types (second token under the ``attacker`` root). First
|
||||||
|
# sighting, session boundary transitions, and score-threshold crossings
|
||||||
|
# published by correlator + profiler. Consumers typically subscribe to
|
||||||
|
# the wildcard ``attacker.>``.
|
||||||
|
ATTACKER_OBSERVED = "observed"
|
||||||
|
ATTACKER_SCORED = "scored"
|
||||||
|
ATTACKER_SESSION_STARTED = "session.started"
|
||||||
|
ATTACKER_SESSION_ENDED = "session.ended"
|
||||||
|
|
||||||
# System event types.
|
# System event types.
|
||||||
SYSTEM_LOG = "log"
|
SYSTEM_LOG = "log"
|
||||||
SYSTEM_BUS_HEALTH = "bus.health"
|
SYSTEM_BUS_HEALTH = "bus.health"
|
||||||
|
# Worker-health leaf — built per-worker as ``system.<worker>.health`` via
|
||||||
|
# :func:`system_health`. The leaf constant stays the same across workers;
|
||||||
|
# the worker name goes in the middle token.
|
||||||
|
SYSTEM_HEALTH = "health"
|
||||||
|
|
||||||
|
|
||||||
# ─── Builders ────────────────────────────────────────────────────────────────
|
# ─── Builders ────────────────────────────────────────────────────────────────
|
||||||
@@ -89,6 +106,31 @@ def system(event_type: str) -> str:
|
|||||||
return f"{SYSTEM}.{event_type}"
|
return f"{SYSTEM}.{event_type}"
|
||||||
|
|
||||||
|
|
||||||
|
def attacker(event_type: str) -> str:
|
||||||
|
"""Build ``attacker.<event_type>``.
|
||||||
|
|
||||||
|
*event_type* is typically one of ``ATTACKER_OBSERVED``,
|
||||||
|
``ATTACKER_SCORED``, ``ATTACKER_SESSION_STARTED``,
|
||||||
|
``ATTACKER_SESSION_ENDED``. Dotted leaves (``session.started``) are
|
||||||
|
permitted — same rationale as :func:`system`.
|
||||||
|
"""
|
||||||
|
if not event_type:
|
||||||
|
raise ValueError("attacker topic requires a non-empty event_type")
|
||||||
|
return f"{ATTACKER}.{event_type}"
|
||||||
|
|
||||||
|
|
||||||
|
def system_health(worker: str) -> str:
|
||||||
|
"""Build ``system.<worker>.health``.
|
||||||
|
|
||||||
|
Worker-health heartbeats live as a nested leaf under ``system`` so
|
||||||
|
consumers can subscribe to ``system.*.health`` for every worker at
|
||||||
|
once, or to ``system.mutator.health`` for a single one. *worker* is
|
||||||
|
validated as a regular segment — no dots, wildcards, or whitespace.
|
||||||
|
"""
|
||||||
|
_reject_tokens(worker)
|
||||||
|
return f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}"
|
||||||
|
|
||||||
|
|
||||||
def _reject_tokens(*parts: str) -> None:
|
def _reject_tokens(*parts: str) -> None:
|
||||||
"""Reject topic segments that would break NATS-style tokenization.
|
"""Reject topic segments that would break NATS-style tokenization.
|
||||||
|
|
||||||
|
|||||||
@@ -25,32 +25,13 @@ import contextlib
|
|||||||
from decnet.bus import topics as _topics
|
from decnet.bus import topics as _topics
|
||||||
from decnet.bus.base import BaseBus
|
from decnet.bus.base import BaseBus
|
||||||
from decnet.bus.factory import get_bus
|
from decnet.bus.factory import get_bus
|
||||||
|
from decnet.bus.publish import publish_safely as _publish_safely
|
||||||
from decnet.web.db.repository import BaseRepository
|
from decnet.web.db.repository import BaseRepository
|
||||||
|
|
||||||
log = get_logger("mutator")
|
log = get_logger("mutator")
|
||||||
console = Console()
|
console = Console()
|
||||||
|
|
||||||
|
|
||||||
async def _publish_safely(
|
|
||||||
bus: BaseBus | None,
|
|
||||||
topic: str,
|
|
||||||
payload: dict,
|
|
||||||
event_type: str = "",
|
|
||||||
) -> None:
|
|
||||||
"""Fire-and-forget bus publish.
|
|
||||||
|
|
||||||
A bus failure must never break the reconciler — the DB write already
|
|
||||||
happened before we got here, so losing the notification is at most a
|
|
||||||
few seconds of UI latency (the next poll tick picks it up).
|
|
||||||
"""
|
|
||||||
if bus is None:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
await bus.publish(topic, payload, event_type=event_type)
|
|
||||||
except Exception as exc: # noqa: BLE001
|
|
||||||
log.warning("bus publish failed topic=%s: %s", topic, exc)
|
|
||||||
|
|
||||||
|
|
||||||
@_traced("mutator.mutate_decky")
|
@_traced("mutator.mutate_decky")
|
||||||
async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
|
async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
|
||||||
"""
|
"""
|
||||||
|
|||||||
64
tests/bus/test_publish.py
Normal file
64
tests/bus/test_publish.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
"""Tests for :mod:`decnet.bus.publish`.
|
||||||
|
|
||||||
|
The whole point of ``publish_safely`` is that it never raises back at the
|
||||||
|
caller. These tests pin that contract: ``None`` bus is a no-op, a real
|
||||||
|
bus publishes, and a raising bus is swallowed + logged.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.bus.base import BaseBus, Event, Subscription
|
||||||
|
from decnet.bus.fake import FakeBus
|
||||||
|
from decnet.bus.publish import publish_safely
|
||||||
|
|
||||||
|
|
||||||
|
class _ExplodingBus(BaseBus):
|
||||||
|
"""Minimal bus whose ``publish`` always raises."""
|
||||||
|
|
||||||
|
async def connect(self) -> None: # pragma: no cover - trivial
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def publish(self, topic, payload, *, event_type=""):
|
||||||
|
raise RuntimeError("transport exploded")
|
||||||
|
|
||||||
|
def subscribe(self, pattern: str) -> Subscription: # pragma: no cover
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def close(self) -> None: # pragma: no cover - trivial
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_safely_none_bus_is_noop() -> None:
|
||||||
|
# Must not raise. A worker that couldn't connect at startup passes
|
||||||
|
# bus=None and expects every call to silently no-op.
|
||||||
|
await publish_safely(None, "system.log", {"msg": "hi"})
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_safely_delivers_on_live_bus() -> None:
|
||||||
|
bus = FakeBus()
|
||||||
|
await bus.connect()
|
||||||
|
try:
|
||||||
|
sub = bus.subscribe("system.log")
|
||||||
|
async with sub:
|
||||||
|
await publish_safely(bus, "system.log", {"msg": "hi"}, event_type="log")
|
||||||
|
event = await sub.__anext__()
|
||||||
|
assert isinstance(event, Event)
|
||||||
|
assert event.topic == "system.log"
|
||||||
|
assert event.type == "log"
|
||||||
|
assert event.payload == {"msg": "hi"}
|
||||||
|
finally:
|
||||||
|
await bus.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_safely_swallows_transport_errors(caplog: pytest.LogCaptureFixture) -> None:
|
||||||
|
caplog.set_level(logging.WARNING, logger="bus.publish")
|
||||||
|
# The exploding bus would crash the caller without publish_safely.
|
||||||
|
# After wrapping, the caller sees nothing but a log line.
|
||||||
|
await publish_safely(_ExplodingBus(), "system.log", {"msg": "hi"})
|
||||||
|
assert any("bus publish failed" in rec.message for rec in caplog.records)
|
||||||
@@ -40,3 +40,23 @@ def test_segment_validation(bad: str) -> None:
|
|||||||
topics.topology_status(bad)
|
topics.topology_status(bad)
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
topics.decky(bad, topics.DECKY_STATE)
|
topics.decky(bad, topics.DECKY_STATE)
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
topics.system_health(bad)
|
||||||
|
|
||||||
|
|
||||||
|
def test_attacker_builder() -> None:
|
||||||
|
assert topics.attacker(topics.ATTACKER_OBSERVED) == "attacker.observed"
|
||||||
|
assert topics.attacker(topics.ATTACKER_SCORED) == "attacker.scored"
|
||||||
|
# Dotted leaf is intentional — same as system.bus.health.
|
||||||
|
assert topics.attacker(topics.ATTACKER_SESSION_STARTED) == "attacker.session.started"
|
||||||
|
assert topics.attacker(topics.ATTACKER_SESSION_ENDED) == "attacker.session.ended"
|
||||||
|
|
||||||
|
|
||||||
|
def test_attacker_builder_rejects_empty() -> None:
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
topics.attacker("")
|
||||||
|
|
||||||
|
|
||||||
|
def test_system_health_builder() -> None:
|
||||||
|
assert topics.system_health("sniffer") == "system.sniffer.health"
|
||||||
|
assert topics.system_health("mutator") == "system.mutator.health"
|
||||||
|
|||||||
Reference in New Issue
Block a user