From b5ce236cab1713c52e487a3f9780f7140cf2d1ab Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 2 May 2026 02:38:24 -0400 Subject: [PATCH] test(bus): pin scope-(2) producer wiring for reuse / clusterer / intel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three producer-side regression guards. Each drives the worker's run loop with a fake bus + stubbed repo and asserts the documented topic fires when the producer has data: - reuse correlator → credential.reuse.detected (one finding row) - clusterer → identity.formed + identity.merged (one ClusterResult) - intel worker → attacker.intel.enriched (one unenriched attacker + a fake provider returning a "malicious" verdict) These complement commit 1's attacker.session.ended producer test — together the four cover every TTP-relevant publisher in the tree (modulo email.received, which has no producer yet; tracked in DEBT.md). --- tests/clustering/test_worker_publish.py | 96 +++++++++++++++++ .../correlation/test_reuse_worker_publish.py | 102 ++++++++++++++++++ tests/intel/test_worker_publish.py | 97 +++++++++++++++++ 3 files changed, 295 insertions(+) create mode 100644 tests/clustering/test_worker_publish.py create mode 100644 tests/correlation/test_reuse_worker_publish.py create mode 100644 tests/intel/test_worker_publish.py diff --git a/tests/clustering/test_worker_publish.py b/tests/clustering/test_worker_publish.py new file mode 100644 index 00000000..17409c28 --- /dev/null +++ b/tests/clustering/test_worker_publish.py @@ -0,0 +1,96 @@ +"""Clusterer publishes ``identity.formed`` and ``identity.merged``. + +Pins the producer wiring. The clusterer reports its tick output via a +:class:`ClusterResult`; the worker fans the four sub-lists out to the +matching ``identity.*`` topics. This test runs one tick with a fake +clusterer that returns a result containing one formed and one merged +identity, and asserts the bus saw both envelopes. +""" +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.clustering import worker as _cw +from decnet.clustering.base import ClusterResult, Clusterer +from decnet.web.db.repository import BaseRepository + + +class _FakeClusterer(Clusterer): + name = "fake" + + def __init__(self, results: list[ClusterResult]) -> None: + self._results = list(results) + + async def tick(self, _repo: BaseRepository) -> ClusterResult: + if self._results: + return self._results.pop(0) + return ClusterResult() + + +@pytest.mark.asyncio +async def test_clusterer_publishes_identity_formed_and_merged( + monkeypatch: pytest.MonkeyPatch, +) -> None: + bus = FakeBus() + await bus.connect() + monkeypatch.setattr(_cw, "get_bus", lambda *_a, **_kw: bus) + + captured: list[tuple[str, dict[str, Any]]] = [] + sub = bus.subscribe("identity.>") + + async def drain() -> None: + try: + async with sub: + async for ev in sub: + captured.append((ev.topic, ev.payload)) + except Exception: + pass + + drain_task = asyncio.create_task(drain()) + await asyncio.sleep(0) + + result = ClusterResult( + identities_formed=[ + {"identity_uuid": "id-1", "observation_uuids": ["obs-1", "obs-2"]}, + ], + identities_merged=[ + {"winner_uuid": "id-1", "loser_uuid": "id-9"}, + ], + ) + fake = _FakeClusterer([result]) + + shutdown = asyncio.Event() + + class _RepoStub: + pass + + loop_task = asyncio.create_task(_cw.run_clusterer_loop( + _RepoStub(), # type: ignore[arg-type] + poll_interval_secs=0.05, clusterer=fake, + shutdown=shutdown, + )) + await asyncio.sleep(0.15) + shutdown.set() + await asyncio.wait_for(loop_task, timeout=2.0) + drain_task.cancel() + await bus.close() + + topics_seen = [t for t, _ in captured] + assert _topics.identity(_topics.IDENTITY_FORMED) in topics_seen + assert _topics.identity(_topics.IDENTITY_MERGED) in topics_seen + formed = next( + p for t, p in captured + if t == _topics.identity(_topics.IDENTITY_FORMED) + ) + assert formed["identity_uuid"] == "id-1" + merged = next( + p for t, p in captured + if t == _topics.identity(_topics.IDENTITY_MERGED) + ) + assert merged["winner_uuid"] == "id-1" + assert merged["loser_uuid"] == "id-9" diff --git a/tests/correlation/test_reuse_worker_publish.py b/tests/correlation/test_reuse_worker_publish.py new file mode 100644 index 00000000..a17ef259 --- /dev/null +++ b/tests/correlation/test_reuse_worker_publish.py @@ -0,0 +1,102 @@ +"""Reuse correlator publishes ``credential.reuse.detected``. + +Pins the producer wiring so a regression that silently drops the +publish (e.g. someone moves the loop body or mis-spells the topic +constant) trips this test on the next run. +""" +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.correlation import reuse_worker as _rw + + +@pytest.mark.asyncio +async def test_reuse_correlator_publishes_on_finding( + monkeypatch: pytest.MonkeyPatch, +) -> None: + bus = FakeBus() + await bus.connect() + + async def _fake_get_bus(*_a: Any, **_kw: Any) -> FakeBus: + return bus + + # Worker calls `get_bus(...)` synchronously; replace with a sync + # callable returning the live fake. `connect()` on the fake is a + # no-op, so calling it again from inside the worker is harmless. + monkeypatch.setattr( + _rw, "get_bus", lambda *_a, **_kw: bus, + ) + + captured: list[tuple[str, dict[str, Any]]] = [] + sub = bus.subscribe( + _topics.credential(_topics.CREDENTIAL_REUSE_DETECTED), + ) + + async def drain() -> None: + try: + async with sub: + async for ev in sub: + captured.append((ev.topic, ev.payload)) + except Exception: + pass + + drain_task = asyncio.create_task(drain()) + await asyncio.sleep(0) + + # Stub the engine's correlate to return a single reuse row on the + # first tick. Subsequent ticks return [] so the publish doesn't + # spam. + seen_ticks: list[int] = [] + finding = { + "id": "reuse-1", + "secret_kind": "password", + "target_count": 3, + "attacker_uuids": ["att-1", "att-2"], + "attacker_ips": ["1.2.3.4", "5.6.7.8"], + "deckies": ["decky-a", "decky-b"], + "services": ["ssh", "ftp"], + } + + async def _fake_correlate( + _self: Any, _repo: Any, *, min_targets: int = 2, + ) -> list[dict[str, Any]]: + seen_ticks.append(0) + return [finding] if len(seen_ticks) == 1 else [] + + monkeypatch.setattr( + _rw.CorrelationEngine, "correlate_credential_reuse", _fake_correlate, + ) + + shutdown = asyncio.Event() + + class _RepoStub: + async def get_state(self, _key: str) -> None: + return None + + async def set_state(self, _key: str, _val: dict[str, Any]) -> None: + return None + + loop_task = asyncio.create_task(_rw.run_reuse_loop( + _RepoStub(), # type: ignore[arg-type] + poll_interval_secs=0.05, shutdown=shutdown, + )) + # One tick is enough — the stub returns the finding immediately, + # publishes, then the next tick yields []. Settle, then stop. + await asyncio.sleep(0.15) + shutdown.set() + await asyncio.wait_for(loop_task, timeout=2.0) + drain_task.cancel() + await bus.close() + + assert len(captured) >= 1 + topic, payload = captured[0] + assert topic == _topics.credential(_topics.CREDENTIAL_REUSE_DETECTED) + assert payload["id"] == "reuse-1" + assert payload["target_count"] == 3 + assert payload["secret_kind"] == "password" diff --git a/tests/intel/test_worker_publish.py b/tests/intel/test_worker_publish.py new file mode 100644 index 00000000..d93ec468 --- /dev/null +++ b/tests/intel/test_worker_publish.py @@ -0,0 +1,97 @@ +"""Intel worker publishes ``attacker.intel.enriched`` per enriched row. + +Pins the producer wiring. The worker drains +``repo.get_unenriched_attackers``, calls the providers' ``lookup``, +upserts via ``repo.upsert_attacker_intel``, and publishes +``attacker.intel.enriched`` per row. +""" +from __future__ import annotations + +import asyncio +import os +from typing import Any + +import pytest + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.intel import worker as _iw +from decnet.intel.base import IntelProvider, IntelResult + + +class _FakeProvider(IntelProvider): + name = "fake" + + async def lookup(self, ip: str) -> IntelResult: + return IntelResult( + provider="fake", + column_updates={"fake_classification": "malicious"}, + verdict="malicious", + ) + + +class _RepoStub: + def __init__(self, pending: list[dict[str, Any]]) -> None: + self._pending = pending + self._yielded = False + self.upserts: list[dict[str, Any]] = [] + + async def get_unenriched_attackers( + self, *, limit: int = 100, + ) -> list[dict[str, Any]]: + if not self._yielded: + self._yielded = True + return list(self._pending) + return [] + + async def upsert_attacker_intel(self, row: dict[str, Any]) -> None: + self.upserts.append(row) + + +@pytest.mark.asyncio +async def test_intel_worker_publishes_intel_enriched( + monkeypatch: pytest.MonkeyPatch, +) -> None: + bus = FakeBus() + await bus.connect() + monkeypatch.setattr(_iw, "get_bus", lambda *_a, **_kw: bus) + + captured: list[tuple[str, dict[str, Any]]] = [] + sub = bus.subscribe(_topics.attacker(_topics.ATTACKER_INTEL_ENRICHED)) + + async def drain() -> None: + try: + async with sub: + async for ev in sub: + captured.append((ev.topic, ev.payload)) + except Exception: + pass + + drain_task = asyncio.create_task(drain()) + await asyncio.sleep(0) + + repo = _RepoStub([ + {"uuid": "att-1", "ip": "192.168.1.5"}, + ]) + shutdown = asyncio.Event() + + loop_task = asyncio.create_task(_iw.run_intel_loop( + repo, # type: ignore[arg-type] + poll_interval_secs=0.05, ttl_hours=24, + providers=[_FakeProvider()], + shutdown=shutdown, + )) + await asyncio.sleep(0.2) + shutdown.set() + await asyncio.wait_for(loop_task, timeout=2.0) + drain_task.cancel() + await bus.close() + + assert len(repo.upserts) == 1 + assert len(captured) == 1 + topic, payload = captured[0] + assert topic == _topics.attacker(_topics.ATTACKER_INTEL_ENRICHED) + assert payload["attacker_uuid"] == "att-1" + assert payload["attacker_ip"] == "192.168.1.5" + assert payload["aggregate_verdict"] == "malicious" + assert "fake" in payload["providers"]