async def publish_safely(
    bus: BaseBus | None,
    topic: str,
    payload: dict[str, Any],
    event_type: str = "",
) -> None:
    """Publish *payload* on *bus* without ever raising back at the caller.

    The DB row (or equivalent side-effect) has already been committed by
    the time a worker calls this; the bus is the notification layer, not
    the source of truth. A dropped publish is at most a few seconds of
    UI latency until the next poll tick. A raised exception here, by
    contrast, would crash the worker — which is strictly worse.

    Args:
        bus: Bus to publish on, or ``None`` when the worker runs without
            one; ``None`` turns the call into a silent no-op.
        topic: Topic string, handed to ``bus.publish`` unchanged.
        payload: Event body, handed to ``bus.publish`` unchanged.
        event_type: Forwarded as the ``event_type`` keyword argument;
            defaults to the empty string.

    Returns:
        Always ``None``. A failed publish is reported only through a
        warning log line.
    """
    if bus is None:
        return
    try:
        await bus.publish(topic, payload, event_type=event_type)
    except Exception as exc:  # noqa: BLE001
        # Deliberately broad: any transport error is downgraded to a log
        # line. ``asyncio.CancelledError`` derives from ``BaseException``
        # on Python 3.8+, so task cancellation still propagates.
        #
        # ``%r`` instead of ``%s``: exceptions raised without a message
        # (``TimeoutError()``, ``ConnectionResetError()``) stringify to
        # "", which left the log line without any failure reason. The
        # repr always carries at least the exception class name.
        log.warning("bus publish failed topic=%s: %r", topic, exc)
# Emitted by correlator + profiler on first sighting, session boundary
# transitions, and score-threshold crossings. Consumers typically
# subscribe to the wildcard ``attacker.>``.
ATTACKER_OBSERVED = "observed"
ATTACKER_SCORED = "scored"
ATTACKER_SESSION_STARTED = "session.started"
ATTACKER_SESSION_ENDED = "session.ended"

# System event types.
SYSTEM_LOG = "log"
SYSTEM_BUS_HEALTH = "bus.health"
# Worker-health leaf — built per-worker as ``system.<worker>.health`` via
# :func:`system_health`. The leaf constant stays the same across workers;
# the worker name goes in the middle token.
SYSTEM_HEALTH = "health"


# ─── Builders ────────────────────────────────────────────────────────────────


def attacker(event_type: str) -> str:
    """Build ``attacker.<event_type>``.

    *event_type* is typically one of ``ATTACKER_OBSERVED``,
    ``ATTACKER_SCORED``, ``ATTACKER_SESSION_STARTED``,
    ``ATTACKER_SESSION_ENDED``. Dotted leaves (``session.started``) are
    permitted — same rationale as :func:`system` — so the value is only
    checked for being non-empty, not validated as a single segment.

    Raises:
        ValueError: if *event_type* is empty.
    """
    if not event_type:
        raise ValueError("attacker topic requires a non-empty event_type")
    return f"{ATTACKER}.{event_type}"


def system_health(worker: str) -> str:
    """Build ``system.<worker>.health``.

    Worker-health heartbeats live as a nested leaf under ``system`` so
    consumers can subscribe to ``system.*.health`` for every worker at
    once, or to ``system.mutator.health`` for a single one. *worker* is
    validated as a regular segment — no dots, wildcards, or whitespace.

    Raises:
        ValueError: if :func:`_reject_tokens` rejects *worker*.
    """
    # Validate before formatting so a bad worker name can never leak
    # into a topic string.
    _reject_tokens(worker)
    return f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}"
diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index 4f3f9192..f0c22ede 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -25,32 +25,13 @@ import contextlib from decnet.bus import topics as _topics from decnet.bus.base import BaseBus from decnet.bus.factory import get_bus +from decnet.bus.publish import publish_safely as _publish_safely from decnet.web.db.repository import BaseRepository log = get_logger("mutator") console = Console() -async def _publish_safely( - bus: BaseBus | None, - topic: str, - payload: dict, - event_type: str = "", -) -> None: - """Fire-and-forget bus publish. - - A bus failure must never break the reconciler — the DB write already - happened before we got here, so losing the notification is at most a - few seconds of UI latency (the next poll tick picks it up). - """ - if bus is None: - return - try: - await bus.publish(topic, payload, event_type=event_type) - except Exception as exc: # noqa: BLE001 - log.warning("bus publish failed topic=%s: %s", topic, exc) - - @_traced("mutator.mutate_decky") async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool: """ diff --git a/tests/bus/test_publish.py b/tests/bus/test_publish.py new file mode 100644 index 00000000..4fae7d19 --- /dev/null +++ b/tests/bus/test_publish.py @@ -0,0 +1,64 @@ +"""Tests for :mod:`decnet.bus.publish`. + +The whole point of ``publish_safely`` is that it never raises back at the +caller. These tests pin that contract: ``None`` bus is a no-op, a real +bus publishes, and a raising bus is swallowed + logged. 
# ─── tests/bus/test_publish.py ───────────────────────────────────────────────


class _ExplodingBus(BaseBus):
    """Minimal bus whose ``publish`` always raises."""

    async def connect(self) -> None:  # pragma: no cover - trivial
        return None

    async def publish(self, topic, payload, *, event_type=""):
        raise RuntimeError("transport exploded")

    def subscribe(self, pattern: str) -> Subscription:  # pragma: no cover
        raise NotImplementedError

    async def close(self) -> None:  # pragma: no cover - trivial
        return None


@pytest.mark.asyncio
async def test_publish_safely_none_bus_is_noop() -> None:
    # Must not raise. A worker that couldn't connect at startup passes
    # bus=None and expects every call to silently no-op.
    await publish_safely(None, "system.log", {"msg": "hi"})


@pytest.mark.asyncio
async def test_publish_safely_delivers_on_live_bus() -> None:
    # Function-scope import keeps this test self-contained.
    import asyncio

    bus = FakeBus()
    await bus.connect()
    try:
        sub = bus.subscribe("system.log")
        async with sub:
            await publish_safely(bus, "system.log", {"msg": "hi"}, event_type="log")
            # publish_safely swallows publish errors, so a dropped event
            # would leave a bare ``__anext__`` waiting forever. Bound the
            # wait so a regression fails fast instead of hanging the run.
            event = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
        assert isinstance(event, Event)
        assert event.topic == "system.log"
        assert event.type == "log"
        assert event.payload == {"msg": "hi"}
    finally:
        await bus.close()


@pytest.mark.asyncio
async def test_publish_safely_swallows_transport_errors(caplog: pytest.LogCaptureFixture) -> None:
    caplog.set_level(logging.WARNING, logger="bus.publish")
    # The exploding bus would crash the caller without publish_safely.
    # After wrapping, the caller sees nothing but a log line.
    await publish_safely(_ExplodingBus(), "system.log", {"msg": "hi"})
    assert any("bus publish failed" in rec.message for rec in caplog.records)


# ─── tests/bus/test_topics.py (added tests) ──────────────────────────────────


def test_attacker_builder() -> None:
    assert topics.attacker(topics.ATTACKER_OBSERVED) == "attacker.observed"
    assert topics.attacker(topics.ATTACKER_SCORED) == "attacker.scored"
    # Dotted leaf is intentional — same as system.bus.health.
    assert topics.attacker(topics.ATTACKER_SESSION_STARTED) == "attacker.session.started"
    assert topics.attacker(topics.ATTACKER_SESSION_ENDED) == "attacker.session.ended"


def test_attacker_builder_rejects_empty() -> None:
    with pytest.raises(ValueError):
        topics.attacker("")


def test_system_health_builder() -> None:
    assert topics.system_health("sniffer") == "system.sniffer.health"
    assert topics.system_health("mutator") == "system.mutator.health"