Files
DECNET/tests/profiler/test_profiler_bus.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

147 lines
4.8 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Bus wiring for the profiler worker (DEBT-031, worker 4).
The profiler publishes ``attacker.scored`` once per profile upsert.
Payload is a compact summary of the record the profiler just wrote to
the DB — enough for the MazeNET attacker pool to redraw without another
round-trip.
Like every other bus-wired worker, ``DECNET_BUS_ENABLED=false`` must
leave the profiler fully functional (DB-only, no publish attempts).
"""
from __future__ import annotations
import asyncio
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock
import pytest
import pytest_asyncio
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.bus.publish import make_thread_safe_publisher
from decnet.correlation.engine import CorrelationEngine
from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424
from decnet.profiler.worker import _WorkerState, _update_profiles
_TS = "2026-04-21T10:00:00+00:00"
_DT = datetime.fromisoformat(_TS)
def _line(ip: str = "1.2.3.4", decky: str = "decky-01") -> str:
return format_rfc5424(
service="ssh",
hostname=decky,
event_type="connection",
severity=SEVERITY_INFO,
timestamp=_DT,
src_ip=ip,
)
def _stub_repo() -> MagicMock:
repo = MagicMock()
repo.get_bounties_for_ips = AsyncMock(return_value={})
repo.upsert_attacker = AsyncMock(return_value="mock-uuid")
repo.upsert_attacker_behavior = AsyncMock()
return repo
@pytest_asyncio.fixture
async def bus() -> FakeBus:
b = FakeBus()
await b.connect()
yield b
await b.close()
# ─── publish hook fires per upsert ───────────────────────────────────────────
@pytest.mark.asyncio
async def test_update_profiles_publishes_scored_per_ip() -> None:
captured: list[tuple[str, dict]] = []
engine = CorrelationEngine()
engine.ingest(_line(ip="1.1.1.1", decky="decky-01"))
engine.ingest(_line(ip="2.2.2.2", decky="decky-02"))
state = _WorkerState(
engine=engine,
publish_attacker=lambda event_type, payload: captured.append((event_type, payload)),
)
await _update_profiles(_stub_repo(), state, {"1.1.1.1", "2.2.2.2"})
assert len(captured) == 2
for event_type, payload in captured:
assert event_type == "scored"
assert payload["attacker_ip"] in {"1.1.1.1", "2.2.2.2"}
assert payload["event_count"] == 1
assert payload["decky_count"] == 1
assert payload["is_traversal"] is False
@pytest.mark.asyncio
async def test_update_profiles_runs_without_publish_hook() -> None:
# Pre-bus behavior. No crash, upsert still happens.
engine = CorrelationEngine()
engine.ingest(_line(ip="3.3.3.3"))
state = _WorkerState(engine=engine, publish_attacker=None)
repo = _stub_repo()
await _update_profiles(repo, state, {"3.3.3.3"})
repo.upsert_attacker.assert_awaited_once()
@pytest.mark.asyncio
async def test_update_profiles_swallows_publish_failures() -> None:
engine = CorrelationEngine()
engine.ingest(_line(ip="4.4.4.4"))
def _boom(_event_type, _payload):
raise RuntimeError("transport exploded")
state = _WorkerState(engine=engine, publish_attacker=_boom)
repo = _stub_repo()
# Must not raise; upsert still lands.
await _update_profiles(repo, state, {"4.4.4.4"})
repo.upsert_attacker.assert_awaited_once()
# ─── End-to-end through the bus ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_profiler_publishes_on_attacker_scored_topic(bus: FakeBus) -> None:
loop = asyncio.get_running_loop()
raw = make_thread_safe_publisher(bus, loop)
def publish(event_type: str, payload: dict) -> None:
raw(_topics.attacker(event_type), payload, event_type)
engine = CorrelationEngine()
engine.ingest(_line(ip="8.8.8.8", decky="decky-01"))
state = _WorkerState(engine=engine, publish_attacker=publish)
sub = bus.subscribe("attacker.scored")
async with sub:
await _update_profiles(_stub_repo(), state, {"8.8.8.8"})
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
assert event.topic == "attacker.scored"
assert event.type == "scored"
assert event.payload["attacker_ip"] == "8.8.8.8"
@pytest.mark.asyncio
async def test_profiler_degrades_cleanly_when_bus_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
from decnet.bus.factory import get_bus
monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
b = get_bus(client_name="profiler")
await b.connect()
await b.publish("attacker.scored", {"attacker_ip": "1.2.3.4"}, event_type="scored")
await b.close()