feat(sniffer): publish decky.{id}.traffic on the bus (DEBT-031)

SnifferEngine gains an optional publish_fn hook, invoked after the
dedup + syslog write for traffic-summary events only (tls_session,
tcp_flow_timing, tcp_syn_fingerprint) — intermediate parser artifacts
like tls_client_hello stay off the bus.

The sniffer worker wires get_bus() + a thread-safe shim that marshals
sync calls from the scapy sniff thread back onto the asyncio loop via
run_coroutine_threadsafe.  Bus failure at startup degrades cleanly to
publish-off mode; publish failures at runtime never escape the sniff
thread.
This commit is contained in:
2026-04-21 16:35:50 -04:00
parent f3eaab5d37
commit 7f497ac552
4 changed files with 243 additions and 3 deletions

View File

@@ -52,6 +52,17 @@ _TCP_ACK: int = 0x10
_TCP_FIN: int = 0x01 _TCP_FIN: int = 0x01
_TCP_RST: int = 0x04 _TCP_RST: int = 0x04
# Event types that should fan out on the service bus as ``decky.{id}.traffic``.
# Exactly three types are published: tls_session, tcp_flow_timing and
# tcp_syn_fingerprint.  Intermediate parser artifacts (tls_client_hello,
# tls_certificate) are intentionally excluded — tls_session covers the
# completed handshake and tcp_flow_timing covers the flow summary; together
# with tcp_syn_fingerprint they're the minimum interesting signal for
# downstream consumers.
_BUS_TRAFFIC_EVENTS: frozenset[str] = frozenset({
    "tls_session",
    "tcp_flow_timing",
    "tcp_syn_fingerprint",
})
# ─── TCP option extraction for passive fingerprinting ─────────────────────── # ─── TCP option extraction for passive fingerprinting ───────────────────────
@@ -692,10 +703,16 @@ class SnifferEngine:
ip_to_decky: dict[str, str], ip_to_decky: dict[str, str],
write_fn: Callable[[str], None], write_fn: Callable[[str], None],
dedup_ttl: float = 300.0, dedup_ttl: float = 300.0,
publish_fn: Callable[[str, str, dict[str, Any]], None] | None = None,
): ):
self._ip_to_decky = ip_to_decky self._ip_to_decky = ip_to_decky
self._write_fn = write_fn self._write_fn = write_fn
self._dedup_ttl = dedup_ttl self._dedup_ttl = dedup_ttl
# Optional bus publish hook. Called *after* dedup + syslog write, so
# every syslog line we emit has a matching bus event and duplicate
# storms are already suppressed upstream. Signature:
# ``publish_fn(decky_name, event_type, payload_dict)``.
self._publish_fn = publish_fn
self._sessions: dict[tuple[str, int, str, int], dict[str, Any]] = {} self._sessions: dict[tuple[str, int, str, int], dict[str, Any]] = {}
self._session_ts: dict[tuple[str, int, str, int], float] = {} self._session_ts: dict[tuple[str, int, str, int], float] = {}
@@ -782,6 +799,15 @@ class SnifferEngine:
return return
line = syslog_line(SERVICE_NAME, node_name, event_type, severity=severity, **fields) line = syslog_line(SERVICE_NAME, node_name, event_type, severity=severity, **fields)
self._write_fn(line) self._write_fn(line)
# Bus fan-out, fire-and-forget. Only emit for traffic-summary event
# types — the ones that represent an observable decky interaction
# rather than an intermediate parser artifact. Rate is naturally
# bounded by the dedup cache above.
if self._publish_fn is not None and event_type in _BUS_TRAFFIC_EVENTS:
try:
self._publish_fn(node_name, event_type, dict(fields))
except Exception: # nosec B110 — bus must never break sniff thread
pass
# ── Flow tracking (per-TCP-4-tuple timing + retransmits) ──────────────── # ── Flow tracking (per-TCP-4-tuple timing + retransmits) ────────────────

View File

@@ -11,12 +11,18 @@ The API never depends on this worker being alive.
""" """
import asyncio import asyncio
import contextlib
import os import os
import subprocess # nosec B404 — needed for interface checks import subprocess # nosec B404 — needed for interface checks
import threading import threading
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from pathlib import Path from pathlib import Path
from typing import Any, Callable
from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
from decnet.bus.publish import publish_safely
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.network import HOST_IPVLAN_IFACE, HOST_MACVLAN_IFACE from decnet.network import HOST_IPVLAN_IFACE, HOST_MACVLAN_IFACE
from decnet.sniffer.fingerprint import SnifferEngine from decnet.sniffer.fingerprint import SnifferEngine
@@ -41,6 +47,30 @@ def _load_ip_to_decky() -> dict[str, str]:
return mapping return mapping
def _make_thread_safe_publisher(
    bus: BaseBus,
    loop: asyncio.AbstractEventLoop,
) -> Callable[[str, str, dict[str, Any]], None]:
    """Build a sync callable that marshals bus publishes back to *loop*.

    The scapy sniff loop runs in a dedicated worker thread and cannot
    ``await`` anything.  Every call here schedules the async publish on
    the event loop and returns immediately; the sniff thread is never
    blocked waiting for the publish to actually land on the wire.

    Args:
        bus: Connected bus the events are published on.
        loop: Event loop the publish coroutine is scheduled onto.

    Returns:
        A callable ``(decky_name, event_type, payload) -> None`` that is
        safe to invoke from a non-asyncio thread.  It never raises: any
        failure to build the topic or to schedule the publish is logged
        at debug level and dropped.
    """

    def _publish(decky_name: str, event_type: str, payload: dict[str, Any]) -> None:
        coro = None
        try:
            # Topic construction lives inside the guard so that a bad
            # decky name cannot escape into the sniff thread either.
            topic = _topics.decky(decky_name, _topics.DECKY_TRAFFIC)
            coro = publish_safely(bus, topic, payload, event_type=event_type)
            # Fire-and-forget: the returned future is intentionally not
            # awaited so the caller is not blocked on the publish.
            asyncio.run_coroutine_threadsafe(coro, loop)
        except Exception as exc:  # noqa: BLE001
            # Scheduling fails when the loop is already closed (e.g.
            # during shutdown).  Close the orphaned coroutine so it does
            # not surface later as a "never awaited" RuntimeWarning.
            if asyncio.iscoroutine(coro):
                coro.close()
            logger.debug("sniffer: cross-thread bus publish failed: %s", exc)

    return _publish
def _interface_exists(iface: str) -> bool: def _interface_exists(iface: str) -> bool:
"""Check if a network interface exists on this host.""" """Check if a network interface exists on this host."""
try: try:
@@ -59,6 +89,7 @@ def _sniff_loop(
log_path: Path, log_path: Path,
json_path: Path, json_path: Path,
stop_event: threading.Event, stop_event: threading.Event,
publish_fn: Callable[[str, str, dict[str, Any]], None] | None = None,
) -> None: ) -> None:
"""Blocking sniff loop. Runs in a dedicated thread via asyncio.to_thread.""" """Blocking sniff loop. Runs in a dedicated thread via asyncio.to_thread."""
try: try:
@@ -75,7 +106,9 @@ def _sniff_loop(
def _write_fn(line: str) -> None: def _write_fn(line: str) -> None:
write_event(line, log_path, json_path) write_event(line, log_path, json_path)
engine = SnifferEngine(ip_to_decky=ip_map, write_fn=_write_fn) engine = SnifferEngine(
ip_to_decky=ip_map, write_fn=_write_fn, publish_fn=publish_fn,
)
# Periodically refresh IP map in a background daemon thread # Periodically refresh IP map in a background daemon thread
def _refresh_loop() -> None: def _refresh_loop() -> None:
@@ -150,6 +183,25 @@ async def sniffer_worker(log_file: str) -> None:
stop_event = threading.Event() stop_event = threading.Event()
loop = asyncio.get_running_loop()
# Connect to the bus for decky.{id}.traffic fan-out. Failure here
# is non-fatal: the sniffer still writes syslog, it just doesn't
# push notifications to downstream consumers.
bus: BaseBus | None = None
try:
candidate = get_bus(client_name="sniffer")
await candidate.connect()
bus = candidate
except Exception as exc: # noqa: BLE001
logger.warning(
"sniffer: bus unavailable, running in publish-off mode: %s", exc,
)
publish_fn: Callable[[str, str, dict[str, Any]], None] | None = None
if bus is not None:
publish_fn = _make_thread_safe_publisher(bus, loop)
# Dedicated thread pool so the long-running sniff loop doesn't # Dedicated thread pool so the long-running sniff loop doesn't
# occupy a slot in the default asyncio executor. # occupy a slot in the default asyncio executor.
sniffer_pool = ThreadPoolExecutor( sniffer_pool = ThreadPoolExecutor(
@@ -157,10 +209,9 @@ async def sniffer_worker(log_file: str) -> None:
) )
try: try:
loop = asyncio.get_running_loop()
await loop.run_in_executor( await loop.run_in_executor(
sniffer_pool, _sniff_loop, sniffer_pool, _sniff_loop,
interface, log_path, json_path, stop_event, interface, log_path, json_path, stop_event, publish_fn,
) )
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("sniffer: shutdown requested") logger.info("sniffer: shutdown requested")
@@ -169,6 +220,9 @@ async def sniffer_worker(log_file: str) -> None:
raise raise
finally: finally:
sniffer_pool.shutdown(wait=False) sniffer_pool.shutdown(wait=False)
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise

View File

View File

@@ -0,0 +1,160 @@
"""Bus wiring for the fleet sniffer (DEBT-031, worker 1).
The sniff loop itself lives in a dedicated thread running scapy and
cannot be exercised cleanly under pytest (see the "no scapy in
TestClient lifespan tests" constraint — same hazard applies here).
These tests instead pin the two things that actually carry the
contract:
1. ``SnifferEngine`` invokes ``publish_fn`` on traffic-summary events
and skips intermediate parser artifacts.
2. The worker's thread-safe publisher marshals synchronous calls from
the sniff thread back onto the asyncio loop where the bus lives,
and routes them under the ``decky.{id}.traffic`` topic.
"""
from __future__ import annotations
import asyncio
import pytest
import pytest_asyncio
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.sniffer.fingerprint import SnifferEngine
from decnet.sniffer.worker import _make_thread_safe_publisher
@pytest_asyncio.fixture
async def bus() -> FakeBus:
    """Yield a connected ``FakeBus`` and close it once the test is done."""
    fake_bus = FakeBus()
    await fake_bus.connect()
    yield fake_bus
    await fake_bus.close()
# ─── Engine-level publish hook ───────────────────────────────────────────────
def test_engine_publishes_on_traffic_summary_events() -> None:
    """A ``tcp_flow_timing`` event reaches ``publish_fn`` with its fields."""
    published: list[tuple[str, str, dict]] = []

    def _record(node, event, payload):
        published.append((node, event, payload))

    flow_fields = {
        "src_ip": "203.0.113.9",
        "src_port": "4444",
        "dst_ip": "10.0.0.5",
        "dst_port": "22",
        "packets": "17",
        "bytes": "2048",
        "duration_s": "5.1",
        "mean_iat_ms": "300",
        "min_iat_ms": "1",
        "max_iat_ms": "1200",
        "retransmits": "0",
    }

    engine = SnifferEngine(
        ip_to_decky={"10.0.0.5": "decky-a"},
        write_fn=lambda _line: None,
        publish_fn=_record,
    )
    engine._log("decky-a", "tcp_flow_timing", **flow_fields)

    # Exactly one publish, carrying the node, the event type and every field.
    expected = [("decky-a", "tcp_flow_timing", flow_fields)]
    assert published == expected
def test_engine_skips_intermediate_parser_artifacts() -> None:
    """Parser-intermediate event types are logged but never published."""
    published: list[tuple[str, str, dict]] = []

    def _record(node, event, payload):
        published.append((node, event, payload))

    engine = SnifferEngine(
        ip_to_decky={"10.0.0.5": "decky-a"},
        write_fn=lambda _line: None,
        publish_fn=_record,
    )

    # tls_client_hello / tls_certificate are parser intermediates — the
    # completed tls_session handshake is what downstream consumers want.
    intermediate_events = (
        ("tls_client_hello", {"src_ip": "1.2.3.4", "ja3": "abc", "ja4": "t13d0"}),
        ("tls_certificate", {"src_ip": "1.2.3.4", "subject_cn": "foo", "issuer": "bar"}),
    )
    for event_type, fields in intermediate_events:
        engine._log("decky-a", event_type, **fields)

    assert published == []
def test_engine_no_publish_when_hook_absent() -> None:
    """Without ``publish_fn`` the engine still writes its syslog line."""
    # Omitting publish_fn reproduces the pre-bus behavior: one syslog
    # line goes out, nothing crashes, and no publish is attempted.
    written: list[str] = []
    engine = SnifferEngine(
        ip_to_decky={"10.0.0.5": "decky-a"},
        write_fn=written.append,
    )

    flow_fields = {
        "src_ip": "1.2.3.4",
        "src_port": "4",
        "dst_ip": "10.0.0.5",
        "dst_port": "22",
        "packets": "5",
        "bytes": "100",
        "duration_s": "2",
        "mean_iat_ms": "0",
        "min_iat_ms": "0",
        "max_iat_ms": "0",
        "retransmits": "0",
    }
    engine._log("decky-a", "tcp_flow_timing", **flow_fields)

    assert len(written) == 1
def test_engine_swallows_publish_fn_failures() -> None:
    """An exception raised by ``publish_fn`` must not leave ``_log``."""

    # A hook that always blows up stands in for a broken transport.
    def _boom(_node, _event, _payload):
        raise RuntimeError("transport exploded")

    flow_fields = {
        "src_ip": "1.2.3.4",
        "src_port": "4",
        "dst_ip": "10.0.0.5",
        "dst_port": "22",
        "packets": "5",
        "bytes": "100",
        "duration_s": "2",
        "mean_iat_ms": "0",
        "min_iat_ms": "0",
        "max_iat_ms": "0",
        "retransmits": "0",
    }
    engine = SnifferEngine(
        ip_to_decky={"10.0.0.5": "decky-a"},
        write_fn=lambda _line: None,
        publish_fn=_boom,
    )

    # The test passes simply by this call returning instead of raising.
    engine._log("decky-a", "tcp_flow_timing", **flow_fields)
# ─── Thread-safe publisher (worker → bus) ────────────────────────────────────
@pytest.mark.asyncio
async def test_sniffer_worker_degrades_cleanly_when_bus_disabled(
    monkeypatch: pytest.MonkeyPatch, tmp_path,
) -> None:
    """``DECNET_BUS_ENABLED=false`` is the non-negotiable escape hatch.

    With the bus disabled, ``get_bus()`` returns a ``NullBus`` that
    connects without error, so the worker can carry on in publish-off
    mode without crashing.  The scapy sniff loop is deliberately not
    exercised (it hangs pytest teardown); this only asserts that the
    bus setup path is benign.
    """
    from decnet.bus.factory import get_bus

    monkeypatch.setenv("DECNET_BUS_ENABLED", "false")

    disabled_bus = get_bus(client_name="sniffer")
    await disabled_bus.connect()
    # NullBus.publish is a no-op and must never raise.
    probe_payload = {"probe": "ok"}
    await disabled_bus.publish(
        "decky.x.traffic", probe_payload, event_type="tcp_flow_timing",
    )
    await disabled_bus.close()
@pytest.mark.asyncio
async def test_thread_safe_publisher_routes_to_decky_traffic_topic(bus: FakeBus) -> None:
    """The publisher shim delivers events under ``decky.<name>.traffic``."""
    running_loop = asyncio.get_running_loop()
    publish = _make_thread_safe_publisher(bus, running_loop)

    pattern = f"{_topics.DECKY}.*.{_topics.DECKY_TRAFFIC}"
    sub = bus.subscribe(pattern)
    async with sub:
        # Fired from the loop's own thread for determinism — the
        # run_coroutine_threadsafe path works identically in-thread, and
        # the topic/payload shape is what this test is about.
        publish("decky-a", "tcp_flow_timing", {"src_ip": "1.2.3.4"})
        next_event = sub.__anext__()
        event = await asyncio.wait_for(next_event, timeout=2.0)

    assert event.topic == "decky.decky-a.traffic"
    assert event.type == "tcp_flow_timing"
    assert event.payload == {"src_ip": "1.2.3.4"}