diff --git a/decnet/correlation/engine.py b/decnet/correlation/engine.py index 198d5442..c8340df0 100644 --- a/decnet/correlation/engine.py +++ b/decnet/correlation/engine.py @@ -24,6 +24,7 @@ from __future__ import annotations from collections import defaultdict from pathlib import Path +from typing import Any, Callable from rich.table import Table @@ -33,17 +34,35 @@ from decnet.logging.syslog_formatter import ( SEVERITY_WARNING, format_rfc5424, ) +from decnet.logging import get_logger from decnet.telemetry import traced as _traced, get_tracer as _get_tracer +log = get_logger("correlation.engine") + + +# ``publish_fn(event_type, payload_dict)``. Sync to avoid rippling +# ``async`` through every call site of :meth:`CorrelationEngine.ingest`; +# the caller wraps bus-publish via +# :func:`decnet.bus.publish.make_thread_safe_publisher`, which is safe to +# invoke from any thread including the event-loop thread. +CorrelationPublishFn = Callable[[str, dict[str, Any]], None] + class CorrelationEngine: - def __init__(self) -> None: + def __init__( + self, + *, + publish_fn: CorrelationPublishFn | None = None, + ) -> None: # attacker_ip → chronological list of events (only events with an IP) self._events: dict[str, list[LogEvent]] = defaultdict(list) # Total lines parsed (including no-IP and non-DECNET lines) self.lines_parsed: int = 0 # Total events indexed (had an attacker_ip) self.events_indexed: int = 0 + # Optional bus hook — invoked on first-sighting of an attacker IP. + # Always fires exactly once per IP for the lifetime of the engine. + self._publish_fn = publish_fn # ------------------------------------------------------------------ # # Ingestion # @@ -61,8 +80,23 @@ class CorrelationEngine: if event is None: return None if event.attacker_ip: + first_sighting = event.attacker_ip not in self._events self._events[event.attacker_ip].append(event) self.events_indexed += 1 + if first_sighting and self._publish_fn is not None: + try: + self._publish_fn( + "observed", + { + "attacker_ip": event.attacker_ip, + "decky": event.decky, + "service": event.service, + "event_type": event.event_type, + "first_seen": event.timestamp.isoformat(), + }, + ) + except Exception as exc: + log.warning("correlation publish hook failed: %s", exc) return event @_traced("correlation.ingest_file") diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 3abaf8ef..8cccac2b 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -13,11 +13,15 @@ Complexity per cycle: O(new_logs + affected_ips) instead of O(total_logs²). from __future__ import annotations import asyncio +import contextlib import json from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any +from decnet.bus import topics as _topics +from decnet.bus.factory import get_bus +from decnet.bus.publish import make_thread_safe_publisher from decnet.correlation.engine import CorrelationEngine from decnet.correlation.parser import LogEvent from decnet.logging import get_logger @@ -50,18 +54,44 @@ class _WorkerState: async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None: """Periodically updates the Attacker table incrementally. Designed to run as an asyncio Task.""" logger.info("attacker profile worker started interval=%ds", interval) - state = _WorkerState() + + # Optional bus wiring — correlator-family publishes ride on the profiler + # worker because CorrelationEngine lives inside it. If the bus is off or + # unreachable the engine runs with publish_fn=None and downstream degrades + # to DB-only. + bus = None + try: + bus = get_bus(client_name="profiler") + await bus.connect() + except Exception as exc: + logger.warning("profiler: bus unavailable, continuing without publish: %s", exc) + bus = None + + loop = asyncio.get_running_loop() + raw_publish = make_thread_safe_publisher(bus, loop) if bus is not None else None + + def _publish_attacker(event_type: str, payload: dict[str, Any]) -> None: + if raw_publish is None: + return + raw_publish(_topics.attacker(event_type), payload, event_type) + + state = _WorkerState(engine=CorrelationEngine(publish_fn=_publish_attacker)) _saved_cursor = await repo.get_state(_STATE_KEY) if _saved_cursor: state.last_log_id = _saved_cursor.get("last_log_id", 0) state.initialized = True logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id) - while True: - await asyncio.sleep(interval) - try: - await _incremental_update(repo, state) - except Exception as exc: - logger.error("attacker worker: update failed: %s", exc) + try: + while True: + await asyncio.sleep(interval) + try: + await _incremental_update(repo, state) + except Exception as exc: + logger.error("attacker worker: update failed: %s", exc) + finally: + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() @_traced("profiler.incremental_update") diff --git a/tests/correlation/__init__.py b/tests/correlation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/correlation/test_correlation_bus.py b/tests/correlation/test_correlation_bus.py new file mode 100644 index 00000000..08ebda74 --- /dev/null +++ b/tests/correlation/test_correlation_bus.py @@ -0,0 +1,152 @@ +"""Bus wiring for the correlation engine (DEBT-031, worker 3). + +The correlator is not a standalone worker — ``CorrelationEngine`` is a +batch class instantiated inside the profiler worker. DEBT-031 wires it +via an optional ``publish_fn`` constructor arg: on the first sighting of +an attacker IP, the engine emits ``("observed", payload)`` through the +hook. The profiler worker carries the bus physically and translates +those sync hook calls into ``attacker.observed`` publishes. +""" +from __future__ import annotations + +import asyncio +from datetime import datetime + +import pytest +import pytest_asyncio + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.bus.publish import make_thread_safe_publisher +from decnet.correlation.engine import CorrelationEngine +from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424 + + +_TS = "2026-04-21T10:00:00+00:00" + + +def _line(ip: str = "1.2.3.4", decky: str = "decky-01", event_type: str = "connection") -> str: + return format_rfc5424( + service="http", + hostname=decky, + event_type=event_type, + severity=SEVERITY_INFO, + timestamp=datetime.fromisoformat(_TS), + src_ip=ip, + ) + + +@pytest_asyncio.fixture +async def bus() -> FakeBus: + b = FakeBus() + await b.connect() + yield b + await b.close() + + +# ─── Engine-level publish hook ─────────────────────────────────────────────── + +def test_engine_publishes_once_on_first_sighting() -> None: + captured: list[tuple[str, dict]] = [] + engine = CorrelationEngine( + publish_fn=lambda event_type, payload: captured.append((event_type, payload)), + ) + + # Same IP three times: only the first should publish. + engine.ingest(_line(ip="9.9.9.9")) + engine.ingest(_line(ip="9.9.9.9", event_type="login")) + engine.ingest(_line(ip="9.9.9.9", decky="decky-02")) + + assert len(captured) == 1 + event_type, payload = captured[0] + assert event_type == "observed" + assert payload["attacker_ip"] == "9.9.9.9" + assert payload["decky"] == "decky-01" + assert payload["service"] == "http" + assert payload["event_type"] == "connection" + assert payload["first_seen"].startswith("2026-04-21T10:00:00") + + +def test_engine_publishes_per_unique_ip() -> None: + captured: list[tuple[str, dict]] = [] + engine = CorrelationEngine( + publish_fn=lambda event_type, payload: captured.append((event_type, payload)), + ) + + engine.ingest(_line(ip="1.1.1.1")) + engine.ingest(_line(ip="2.2.2.2")) + engine.ingest(_line(ip="1.1.1.1")) # dup, no publish + engine.ingest(_line(ip="3.3.3.3")) + + ips = [p["attacker_ip"] for _, p in captured] + assert ips == ["1.1.1.1", "2.2.2.2", "3.3.3.3"] + + +def test_engine_swallows_publish_fn_failures() -> None: + # A publish hook that blows up must never break ingestion. + def _boom(_event_type, _payload): + raise RuntimeError("transport exploded") + + engine = CorrelationEngine(publish_fn=_boom) + result = engine.ingest(_line(ip="5.5.5.5")) + assert result is not None + assert engine.events_indexed == 1 + + +def test_engine_runs_unchanged_without_publish_fn() -> None: + # Pre-bus behavior. No hook, no publishes, same indexing result. + engine = CorrelationEngine() + engine.ingest(_line(ip="7.7.7.7")) + engine.ingest(_line(ip="7.7.7.7")) + assert engine.events_indexed == 2 + + +def test_engine_ignores_lines_without_attacker_ip() -> None: + captured: list[tuple[str, dict]] = [] + engine = CorrelationEngine( + publish_fn=lambda event_type, payload: captured.append((event_type, payload)), + ) + # Line without src_ip — parser still returns a LogEvent but attacker_ip is empty. + line_no_ip = format_rfc5424( + service="http", + hostname="decky-01", + event_type="boot", + severity=SEVERITY_INFO, + timestamp=datetime.fromisoformat(_TS), + ) + engine.ingest(line_no_ip) + assert captured == [] + + +# ─── End-to-end through the bus ────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_correlator_publishes_on_attacker_observed_topic(bus: FakeBus) -> None: + loop = asyncio.get_running_loop() + raw = make_thread_safe_publisher(bus, loop) + + def publish(event_type: str, payload: dict) -> None: + raw(_topics.attacker(_topics.ATTACKER_OBSERVED), payload, event_type) + + engine = CorrelationEngine(publish_fn=publish) + + sub = bus.subscribe("attacker.observed") + async with sub: + engine.ingest(_line(ip="8.8.8.8")) + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + + assert event.topic == "attacker.observed" + assert event.type == "observed" + assert event.payload["attacker_ip"] == "8.8.8.8" + + +@pytest.mark.asyncio +async def test_correlator_degrades_cleanly_when_bus_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + # DECNET_BUS_ENABLED=false returns NullBus; connect()+publish must never raise. + from decnet.bus.factory import get_bus + + monkeypatch.setenv("DECNET_BUS_ENABLED", "false") + b = get_bus(client_name="profiler") + await b.connect() + await b.publish("attacker.observed", {"attacker_ip": "1.2.3.4"}, event_type="observed") + await b.close()