feat(profiler): publish attacker.scored per profile upsert (DEBT-031 worker 4)

The profiler worker threads its bus publisher through _WorkerState so
_update_profiles can emit a compact attacker.scored event for every
upsert.  Payload carries the headline counts (event/service/decky/
bounty/credential) plus is_traversal, so the MazeNET attacker pool can
redraw without a round-trip.

Bus stays optional: publish_attacker=None when DECNET_BUS_ENABLED=false
or get_bus() fails, and hook exceptions are logged without breaking the
upsert path.
This commit is contained in:
2026-04-21 16:54:40 -04:00
parent e51b65d7c3
commit 67c2e30f89
3 changed files with 167 additions and 2 deletions

View File

@@ -17,7 +17,7 @@ import contextlib
import json import json
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any from typing import Any, Callable
from decnet.bus import topics as _topics from decnet.bus import topics as _topics
from decnet.bus.factory import get_bus from decnet.bus.factory import get_bus
@@ -49,6 +49,9 @@ class _WorkerState:
engine: CorrelationEngine = field(default_factory=CorrelationEngine) engine: CorrelationEngine = field(default_factory=CorrelationEngine)
last_log_id: int = 0 last_log_id: int = 0
initialized: bool = False initialized: bool = False
# Optional bus hook — fires ``("scored", payload)`` per profile upsert.
# None when the bus is disabled or unreachable.
publish_attacker: Callable[[str, dict[str, Any]], None] | None = None
async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None: async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None:
@@ -75,7 +78,10 @@ async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -
return return
raw_publish(_topics.attacker(event_type), payload, event_type) raw_publish(_topics.attacker(event_type), payload, event_type)
state = _WorkerState(engine=CorrelationEngine(publish_fn=_publish_attacker)) state = _WorkerState(
engine=CorrelationEngine(publish_fn=_publish_attacker),
publish_attacker=_publish_attacker,
)
_saved_cursor = await repo.get_state(_STATE_KEY) _saved_cursor = await repo.get_state(_STATE_KEY)
if _saved_cursor: if _saved_cursor:
state.last_log_id = _saved_cursor.get("last_log_id", 0) state.last_log_id = _saved_cursor.get("last_log_id", 0)
@@ -160,6 +166,20 @@ async def _update_profiles(
_span.set_attribute("bounty_count", len(bounties)) _span.set_attribute("bounty_count", len(bounties))
_span.set_attribute("command_count", len(commands)) _span.set_attribute("command_count", len(commands))
if state.publish_attacker is not None:
try:
state.publish_attacker("scored", {
"attacker_ip": ip,
"event_count": record["event_count"],
"service_count": record["service_count"],
"decky_count": record["decky_count"],
"bounty_count": record["bounty_count"],
"credential_count": record["credential_count"],
"is_traversal": record["is_traversal"],
})
except Exception as exc:
logger.warning("attacker worker: scored publish failed for %s: %s", ip, exc)
# Behavioral / fingerprint rollup lives in a sibling table so failures # Behavioral / fingerprint rollup lives in a sibling table so failures
# here never block the core attacker profile upsert. # here never block the core attacker profile upsert.
try: try:

View File

View File

@@ -0,0 +1,145 @@
"""Bus wiring for the profiler worker (DEBT-031, worker 4).
The profiler publishes ``attacker.scored`` once per profile upsert.
Payload is a compact summary of the record the profiler just wrote to
the DB — enough for the MazeNET attacker pool to redraw without another
round-trip.
Like every other bus-wired worker, ``DECNET_BUS_ENABLED=false`` must
leave the profiler fully functional (DB-only, no publish attempts).
"""
from __future__ import annotations
import asyncio
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock
import pytest
import pytest_asyncio
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.bus.publish import make_thread_safe_publisher
from decnet.correlation.engine import CorrelationEngine
from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424
from decnet.profiler.worker import _WorkerState, _update_profiles
_TS = "2026-04-21T10:00:00+00:00"
_DT = datetime.fromisoformat(_TS)
def _line(ip: str = "1.2.3.4", decky: str = "decky-01") -> str:
return format_rfc5424(
service="ssh",
hostname=decky,
event_type="connection",
severity=SEVERITY_INFO,
timestamp=_DT,
src_ip=ip,
)
def _stub_repo() -> MagicMock:
repo = MagicMock()
repo.get_bounties_for_ips = AsyncMock(return_value={})
repo.upsert_attacker = AsyncMock(return_value="mock-uuid")
repo.upsert_attacker_behavior = AsyncMock()
return repo
@pytest_asyncio.fixture
async def bus() -> FakeBus:
b = FakeBus()
await b.connect()
yield b
await b.close()
# ─── publish hook fires per upsert ───────────────────────────────────────────
@pytest.mark.asyncio
async def test_update_profiles_publishes_scored_per_ip() -> None:
captured: list[tuple[str, dict]] = []
engine = CorrelationEngine()
engine.ingest(_line(ip="1.1.1.1", decky="decky-01"))
engine.ingest(_line(ip="2.2.2.2", decky="decky-02"))
state = _WorkerState(
engine=engine,
publish_attacker=lambda event_type, payload: captured.append((event_type, payload)),
)
await _update_profiles(_stub_repo(), state, {"1.1.1.1", "2.2.2.2"})
assert len(captured) == 2
for event_type, payload in captured:
assert event_type == "scored"
assert payload["attacker_ip"] in {"1.1.1.1", "2.2.2.2"}
assert payload["event_count"] == 1
assert payload["decky_count"] == 1
assert payload["is_traversal"] is False
@pytest.mark.asyncio
async def test_update_profiles_runs_without_publish_hook() -> None:
# Pre-bus behavior. No crash, upsert still happens.
engine = CorrelationEngine()
engine.ingest(_line(ip="3.3.3.3"))
state = _WorkerState(engine=engine, publish_attacker=None)
repo = _stub_repo()
await _update_profiles(repo, state, {"3.3.3.3"})
repo.upsert_attacker.assert_awaited_once()
@pytest.mark.asyncio
async def test_update_profiles_swallows_publish_failures() -> None:
engine = CorrelationEngine()
engine.ingest(_line(ip="4.4.4.4"))
def _boom(_event_type, _payload):
raise RuntimeError("transport exploded")
state = _WorkerState(engine=engine, publish_attacker=_boom)
repo = _stub_repo()
# Must not raise; upsert still lands.
await _update_profiles(repo, state, {"4.4.4.4"})
repo.upsert_attacker.assert_awaited_once()
# ─── End-to-end through the bus ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_profiler_publishes_on_attacker_scored_topic(bus: FakeBus) -> None:
loop = asyncio.get_running_loop()
raw = make_thread_safe_publisher(bus, loop)
def publish(event_type: str, payload: dict) -> None:
raw(_topics.attacker(event_type), payload, event_type)
engine = CorrelationEngine()
engine.ingest(_line(ip="8.8.8.8", decky="decky-01"))
state = _WorkerState(engine=engine, publish_attacker=publish)
sub = bus.subscribe("attacker.scored")
async with sub:
await _update_profiles(_stub_repo(), state, {"8.8.8.8"})
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
assert event.topic == "attacker.scored"
assert event.type == "scored"
assert event.payload["attacker_ip"] == "8.8.8.8"
@pytest.mark.asyncio
async def test_profiler_degrades_cleanly_when_bus_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
from decnet.bus.factory import get_bus
monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
b = get_bus(client_name="profiler")
await b.connect()
await b.publish("attacker.scored", {"attacker_ip": "1.2.3.4"}, event_type="scored")
await b.close()