feat(correlation,profiler): publish attacker.observed on first sighting (DEBT-031 worker 3)

CorrelationEngine gains an optional publish_fn hook fired once per unique
attacker IP.  The profiler worker — sole caller of the engine today —
carries the bus physically, builds a thread-safe publisher, and wraps it
with the attacker.observed topic before handing it in.

Bus stays optional: if get_bus() fails or DECNET_BUS_ENABLED=false, the
engine runs publish_fn=None and the worker degrades to DB-only.  Hook
failures log a warning and never break ingestion.
This commit is contained in:
2026-04-21 16:53:03 -04:00
parent 34d9e37ab0
commit e51b65d7c3
4 changed files with 224 additions and 8 deletions

View File

@@ -24,6 +24,7 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from pathlib import Path from pathlib import Path
from typing import Any, Callable
from rich.table import Table from rich.table import Table
@@ -33,17 +34,35 @@ from decnet.logging.syslog_formatter import (
SEVERITY_WARNING, SEVERITY_WARNING,
format_rfc5424, format_rfc5424,
) )
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer from decnet.telemetry import traced as _traced, get_tracer as _get_tracer
log = get_logger("correlation.engine")
# ``publish_fn(event_type, payload_dict)``. Sync to avoid rippling
# ``async`` through every call site of :meth:`CorrelationEngine.ingest`;
# the caller wraps bus-publish via
# :func:`decnet.bus.publish.make_thread_safe_publisher`, which is safe to
# invoke from any thread including the event-loop thread.
CorrelationPublishFn = Callable[[str, dict[str, Any]], None]
class CorrelationEngine: class CorrelationEngine:
def __init__(self) -> None: def __init__(
self,
*,
publish_fn: CorrelationPublishFn | None = None,
) -> None:
# attacker_ip → chronological list of events (only events with an IP) # attacker_ip → chronological list of events (only events with an IP)
self._events: dict[str, list[LogEvent]] = defaultdict(list) self._events: dict[str, list[LogEvent]] = defaultdict(list)
# Total lines parsed (including no-IP and non-DECNET lines) # Total lines parsed (including no-IP and non-DECNET lines)
self.lines_parsed: int = 0 self.lines_parsed: int = 0
# Total events indexed (had an attacker_ip) # Total events indexed (had an attacker_ip)
self.events_indexed: int = 0 self.events_indexed: int = 0
# Optional bus hook — invoked on first-sighting of an attacker IP.
# Always fires exactly once per IP for the lifetime of the engine.
self._publish_fn = publish_fn
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
# Ingestion # # Ingestion #
@@ -61,8 +80,23 @@ class CorrelationEngine:
if event is None: if event is None:
return None return None
if event.attacker_ip: if event.attacker_ip:
first_sighting = event.attacker_ip not in self._events
self._events[event.attacker_ip].append(event) self._events[event.attacker_ip].append(event)
self.events_indexed += 1 self.events_indexed += 1
if first_sighting and self._publish_fn is not None:
try:
self._publish_fn(
"observed",
{
"attacker_ip": event.attacker_ip,
"decky": event.decky,
"service": event.service,
"event_type": event.event_type,
"first_seen": event.timestamp.isoformat(),
},
)
except Exception as exc:
log.warning("correlation publish hook failed: %s", exc)
return event return event
@_traced("correlation.ingest_file") @_traced("correlation.ingest_file")

View File

@@ -13,11 +13,15 @@ Complexity per cycle: O(new_logs + affected_ips) instead of O(total_logs²).
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import contextlib
import json import json
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any from typing import Any
from decnet.bus import topics as _topics
from decnet.bus.factory import get_bus
from decnet.bus.publish import make_thread_safe_publisher
from decnet.correlation.engine import CorrelationEngine from decnet.correlation.engine import CorrelationEngine
from decnet.correlation.parser import LogEvent from decnet.correlation.parser import LogEvent
from decnet.logging import get_logger from decnet.logging import get_logger
@@ -50,18 +54,44 @@ class _WorkerState:
async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None: async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None:
"""Periodically updates the Attacker table incrementally. Designed to run as an asyncio Task.""" """Periodically updates the Attacker table incrementally. Designed to run as an asyncio Task."""
logger.info("attacker profile worker started interval=%ds", interval) logger.info("attacker profile worker started interval=%ds", interval)
state = _WorkerState()
# Optional bus wiring — correlator-family publishes ride on the profiler
# worker because CorrelationEngine lives inside it. If the bus is off or
# unreachable the engine runs with publish_fn=None and downstream degrades
# to DB-only.
bus = None
try:
bus = get_bus(client_name="profiler")
await bus.connect()
except Exception as exc:
logger.warning("profiler: bus unavailable, continuing without publish: %s", exc)
bus = None
loop = asyncio.get_running_loop()
raw_publish = make_thread_safe_publisher(bus, loop) if bus is not None else None
def _publish_attacker(event_type: str, payload: dict[str, Any]) -> None:
if raw_publish is None:
return
raw_publish(_topics.attacker(event_type), payload, event_type)
state = _WorkerState(engine=CorrelationEngine(publish_fn=_publish_attacker))
_saved_cursor = await repo.get_state(_STATE_KEY) _saved_cursor = await repo.get_state(_STATE_KEY)
if _saved_cursor: if _saved_cursor:
state.last_log_id = _saved_cursor.get("last_log_id", 0) state.last_log_id = _saved_cursor.get("last_log_id", 0)
state.initialized = True state.initialized = True
logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id) logger.info("attacker worker: resumed from cursor last_log_id=%d", state.last_log_id)
while True: try:
await asyncio.sleep(interval) while True:
try: await asyncio.sleep(interval)
await _incremental_update(repo, state) try:
except Exception as exc: await _incremental_update(repo, state)
logger.error("attacker worker: update failed: %s", exc) except Exception as exc:
logger.error("attacker worker: update failed: %s", exc)
finally:
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
@_traced("profiler.incremental_update") @_traced("profiler.incremental_update")

View File

View File

@@ -0,0 +1,152 @@
"""Bus wiring for the correlation engine (DEBT-031, worker 3).
The correlator is not a standalone worker — ``CorrelationEngine`` is a
batch class instantiated inside the profiler worker. DEBT-031 wires it
via an optional ``publish_fn`` constructor arg: on the first sighting of
an attacker IP, the engine emits ``("observed", payload)`` through the
hook. The profiler worker carries the bus physically and translates
those sync hook calls into ``attacker.observed`` publishes.
"""
from __future__ import annotations
import asyncio
from datetime import datetime
import pytest
import pytest_asyncio
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.bus.publish import make_thread_safe_publisher
from decnet.correlation.engine import CorrelationEngine
from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424
_TS = "2026-04-21T10:00:00+00:00"
def _line(ip: str = "1.2.3.4", decky: str = "decky-01", event_type: str = "connection") -> str:
return format_rfc5424(
service="http",
hostname=decky,
event_type=event_type,
severity=SEVERITY_INFO,
timestamp=datetime.fromisoformat(_TS),
src_ip=ip,
)
@pytest_asyncio.fixture
async def bus() -> FakeBus:
b = FakeBus()
await b.connect()
yield b
await b.close()
# ─── Engine-level publish hook ───────────────────────────────────────────────
def test_engine_publishes_once_on_first_sighting() -> None:
captured: list[tuple[str, dict]] = []
engine = CorrelationEngine(
publish_fn=lambda event_type, payload: captured.append((event_type, payload)),
)
# Same IP three times: only the first should publish.
engine.ingest(_line(ip="9.9.9.9"))
engine.ingest(_line(ip="9.9.9.9", event_type="login"))
engine.ingest(_line(ip="9.9.9.9", decky="decky-02"))
assert len(captured) == 1
event_type, payload = captured[0]
assert event_type == "observed"
assert payload["attacker_ip"] == "9.9.9.9"
assert payload["decky"] == "decky-01"
assert payload["service"] == "http"
assert payload["event_type"] == "connection"
assert payload["first_seen"].startswith("2026-04-21T10:00:00")
def test_engine_publishes_per_unique_ip() -> None:
captured: list[tuple[str, dict]] = []
engine = CorrelationEngine(
publish_fn=lambda event_type, payload: captured.append((event_type, payload)),
)
engine.ingest(_line(ip="1.1.1.1"))
engine.ingest(_line(ip="2.2.2.2"))
engine.ingest(_line(ip="1.1.1.1")) # dup, no publish
engine.ingest(_line(ip="3.3.3.3"))
ips = [p["attacker_ip"] for _, p in captured]
assert ips == ["1.1.1.1", "2.2.2.2", "3.3.3.3"]
def test_engine_swallows_publish_fn_failures() -> None:
# A publish hook that blows up must never break ingestion.
def _boom(_event_type, _payload):
raise RuntimeError("transport exploded")
engine = CorrelationEngine(publish_fn=_boom)
result = engine.ingest(_line(ip="5.5.5.5"))
assert result is not None
assert engine.events_indexed == 1
def test_engine_runs_unchanged_without_publish_fn() -> None:
# Pre-bus behavior. No hook, no publishes, same indexing result.
engine = CorrelationEngine()
engine.ingest(_line(ip="7.7.7.7"))
engine.ingest(_line(ip="7.7.7.7"))
assert engine.events_indexed == 2
def test_engine_ignores_lines_without_attacker_ip() -> None:
captured: list[tuple[str, dict]] = []
engine = CorrelationEngine(
publish_fn=lambda event_type, payload: captured.append((event_type, payload)),
)
# Line without src_ip — parser still returns a LogEvent but attacker_ip is empty.
line_no_ip = format_rfc5424(
service="http",
hostname="decky-01",
event_type="boot",
severity=SEVERITY_INFO,
timestamp=datetime.fromisoformat(_TS),
)
engine.ingest(line_no_ip)
assert captured == []
# ─── End-to-end through the bus ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_correlator_publishes_on_attacker_observed_topic(bus: FakeBus) -> None:
loop = asyncio.get_running_loop()
raw = make_thread_safe_publisher(bus, loop)
def publish(event_type: str, payload: dict) -> None:
raw(_topics.attacker(_topics.ATTACKER_OBSERVED), payload, event_type)
engine = CorrelationEngine(publish_fn=publish)
sub = bus.subscribe("attacker.observed")
async with sub:
engine.ingest(_line(ip="8.8.8.8"))
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
assert event.topic == "attacker.observed"
assert event.type == "observed"
assert event.payload["attacker_ip"] == "8.8.8.8"
@pytest.mark.asyncio
async def test_correlator_degrades_cleanly_when_bus_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
# DECNET_BUS_ENABLED=false returns NullBus; connect()+publish must never raise.
from decnet.bus.factory import get_bus
monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
b = get_bus(client_name="profiler")
await b.connect()
await b.publish("attacker.observed", {"attacker_ip": "1.2.3.4"}, event_type="observed")
await b.close()