diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 8cccac2b..26c7b89d 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -17,7 +17,7 @@ import contextlib import json from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any +from typing import Any, Callable from decnet.bus import topics as _topics from decnet.bus.factory import get_bus @@ -49,6 +49,9 @@ class _WorkerState: engine: CorrelationEngine = field(default_factory=CorrelationEngine) last_log_id: int = 0 initialized: bool = False + # Optional bus hook — fires ``("scored", payload)`` per profile upsert. + # None when the bus is disabled or unreachable. + publish_attacker: Callable[[str, dict[str, Any]], None] | None = None async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None: @@ -75,7 +78,10 @@ async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) - return raw_publish(_topics.attacker(event_type), payload, event_type) - state = _WorkerState(engine=CorrelationEngine(publish_fn=_publish_attacker)) + state = _WorkerState( + engine=CorrelationEngine(publish_fn=_publish_attacker), + publish_attacker=_publish_attacker, + ) _saved_cursor = await repo.get_state(_STATE_KEY) if _saved_cursor: state.last_log_id = _saved_cursor.get("last_log_id", 0) @@ -160,6 +166,20 @@ async def _update_profiles( _span.set_attribute("bounty_count", len(bounties)) _span.set_attribute("command_count", len(commands)) + if state.publish_attacker is not None: + try: + state.publish_attacker("scored", { + "attacker_ip": ip, + "event_count": record["event_count"], + "service_count": record["service_count"], + "decky_count": record["decky_count"], + "bounty_count": record["bounty_count"], + "credential_count": record["credential_count"], + "is_traversal": record["is_traversal"], + }) + except Exception as exc: + logger.warning("attacker worker: scored publish failed for %s: %s", ip, exc) + # Behavioral / fingerprint rollup lives in a sibling table so failures # here never block the core attacker profile upsert. try: diff --git a/tests/profiler/__init__.py b/tests/profiler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/profiler/test_profiler_bus.py b/tests/profiler/test_profiler_bus.py new file mode 100644 index 00000000..952b3a34 --- /dev/null +++ b/tests/profiler/test_profiler_bus.py @@ -0,0 +1,145 @@ +"""Bus wiring for the profiler worker (DEBT-031, worker 4). + +The profiler publishes ``attacker.scored`` once per profile upsert. +Payload is a compact summary of the record the profiler just wrote to +the DB — enough for the MazeNET attacker pool to redraw without another +round-trip. + +Like every other bus-wired worker, ``DECNET_BUS_ENABLED=false`` must +leave the profiler fully functional (DB-only, no publish attempts). +""" +from __future__ import annotations + +import asyncio +from datetime import datetime +from unittest.mock import AsyncMock, MagicMock + +import pytest +import pytest_asyncio + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.bus.publish import make_thread_safe_publisher +from decnet.correlation.engine import CorrelationEngine +from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424 +from decnet.profiler.worker import _WorkerState, _update_profiles + + +_TS = "2026-04-21T10:00:00+00:00" +_DT = datetime.fromisoformat(_TS) + + +def _line(ip: str = "1.2.3.4", decky: str = "decky-01") -> str: + return format_rfc5424( + service="ssh", + hostname=decky, + event_type="connection", + severity=SEVERITY_INFO, + timestamp=_DT, + src_ip=ip, + ) + + +def _stub_repo() -> MagicMock: + repo = MagicMock() + repo.get_bounties_for_ips = AsyncMock(return_value={}) + repo.upsert_attacker = AsyncMock(return_value="mock-uuid") + repo.upsert_attacker_behavior = AsyncMock() + return repo + + +@pytest_asyncio.fixture +async def bus() -> FakeBus: + b = FakeBus() + await b.connect() + yield b + await b.close() + + +# ─── publish hook fires per upsert ─────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_update_profiles_publishes_scored_per_ip() -> None: + captured: list[tuple[str, dict]] = [] + engine = CorrelationEngine() + engine.ingest(_line(ip="1.1.1.1", decky="decky-01")) + engine.ingest(_line(ip="2.2.2.2", decky="decky-02")) + + state = _WorkerState( + engine=engine, + publish_attacker=lambda event_type, payload: captured.append((event_type, payload)), + ) + + await _update_profiles(_stub_repo(), state, {"1.1.1.1", "2.2.2.2"}) + + assert len(captured) == 2 + for event_type, payload in captured: + assert event_type == "scored" + assert payload["attacker_ip"] in {"1.1.1.1", "2.2.2.2"} + assert payload["event_count"] == 1 + assert payload["decky_count"] == 1 + assert payload["is_traversal"] is False + + +@pytest.mark.asyncio +async def test_update_profiles_runs_without_publish_hook() -> None: + # Pre-bus behavior. No crash, upsert still happens. + engine = CorrelationEngine() + engine.ingest(_line(ip="3.3.3.3")) + + state = _WorkerState(engine=engine, publish_attacker=None) + repo = _stub_repo() + + await _update_profiles(repo, state, {"3.3.3.3"}) + repo.upsert_attacker.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_update_profiles_swallows_publish_failures() -> None: + engine = CorrelationEngine() + engine.ingest(_line(ip="4.4.4.4")) + + def _boom(_event_type, _payload): + raise RuntimeError("transport exploded") + + state = _WorkerState(engine=engine, publish_attacker=_boom) + repo = _stub_repo() + + # Must not raise; upsert still lands. + await _update_profiles(repo, state, {"4.4.4.4"}) + repo.upsert_attacker.assert_awaited_once() + + +# ─── End-to-end through the bus ────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_profiler_publishes_on_attacker_scored_topic(bus: FakeBus) -> None: + loop = asyncio.get_running_loop() + raw = make_thread_safe_publisher(bus, loop) + + def publish(event_type: str, payload: dict) -> None: + raw(_topics.attacker(event_type), payload, event_type) + + engine = CorrelationEngine() + engine.ingest(_line(ip="8.8.8.8", decky="decky-01")) + state = _WorkerState(engine=engine, publish_attacker=publish) + + sub = bus.subscribe("attacker.scored") + async with sub: + await _update_profiles(_stub_repo(), state, {"8.8.8.8"}) + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + + assert event.topic == "attacker.scored" + assert event.type == "scored" + assert event.payload["attacker_ip"] == "8.8.8.8" + + +@pytest.mark.asyncio +async def test_profiler_degrades_cleanly_when_bus_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + from decnet.bus.factory import get_bus + + monkeypatch.setenv("DECNET_BUS_ENABLED", "false") + b = get_bus(client_name="profiler") + await b.connect() + await b.publish("attacker.scored", {"attacker_ip": "1.2.3.4"}, event_type="scored") + await b.close()