diff --git a/tests/ttp/test_worker_bus.py b/tests/ttp/test_worker_bus.py index b77ccb54..bdf1ca0c 100644 --- a/tests/ttp/test_worker_bus.py +++ b/tests/ttp/test_worker_bus.py @@ -25,6 +25,7 @@ from decnet.bus.fake import FakeBus from decnet.ttp import worker as _worker from decnet.ttp.base import Tagger, TaggerEvent from decnet.ttp.worker import _TOPICS, run_ttp_worker_loop +from decnet.web.db.models.attacker_intel import AttackerIntel from decnet.web.db.models.ttp import TTPTag @@ -87,11 +88,19 @@ class _StubRepo: First call with a given uuid set returns the row count; replays return zero (idempotent). Mirrors :meth:`SQLiteRepository. _insert_tags_or_ignore` for tests without a real DB. + + ``intel_rows`` maps attacker_uuid → :class:`AttackerIntel` instance so + the E.3.14b catch-up test can inject a persisted intel row without a + real DB. Defaults to empty (None return) for all other tests. """ - def __init__(self) -> None: + def __init__( + self, + intel_rows: dict[str, AttackerIntel] | None = None, + ) -> None: self._seen: set[str] = set() self.calls: int = 0 + self._intel_rows: dict[str, AttackerIntel] = intel_rows or {} async def insert_tags(self, rows: list[TTPTag]) -> int: self.calls += 1 @@ -100,6 +109,11 @@ class _StubRepo: self._seen.add(r.uuid) return len(new) + async def get_attacker_intel_row_by_uuid( + self, uuid: str, + ) -> AttackerIntel | None: + return self._intel_rows.get(uuid) + async def _drive_worker( bus: FakeBus, @@ -283,15 +297,59 @@ def test_run_ttp_worker_loop_signature() -> None: # ── Bus delivery asymmetry (still xfail — catch-up paths are E.3.14b) ─ -@pytest.mark.xfail( - strict=True, - reason="catch-up via attacker.session.ended is design-deferred to " - "E.3.14b; today the worker fans events 1:1 by source_kind", -) async def test_dropped_intel_enriched_still_produces_intel_tags( fake_bus: FakeBus, ) -> None: - pytest.fail("intel catch-up path not yet implemented") + """Dropping ``attacker.intel.enriched`` still produces intel-derived tags. + + The catch-up path (E.3.14b): on ``attacker.session.ended`` the worker + reads the persisted ``AttackerIntel`` row and synthesizes an + ``source_kind="intel"`` TaggerEvent. Idempotent UUIDs mean a later + ``attacker.intel.enriched`` event would deduplicate; the asymmetry with + email (no catch-up) is pinned by the sibling test below. + """ + from datetime import datetime, timezone + + intel_row = AttackerIntel( + uuid="row-uuid-1", + attacker_uuid="att-catchup", + attacker_ip="10.0.0.1", + abuseipdb_score=90, + abuseipdb_categories="[18, 22]", + greynoise_classification="malicious", + greynoise_name="", + greynoise_tags="[]", + feodo_listed=None, + threatfox_listed=None, + threatfox_threat_types="[]", + threatfox_ioc_types="[]", + threatfox_malware_families="[]", + aggregate_verdict="malicious", + expires_at=datetime(2099, 1, 1, tzinfo=timezone.utc), + ) + tagger = _FixedTagger(tags=[_make_tag()]) + repo = _StubRepo(intel_rows={"att-catchup": intel_row}) + await _drive_worker( + fake_bus, tagger, repo, + # Only session.ended — intel.enriched is intentionally NOT published. + [(_topics.attacker(_topics.ATTACKER_SESSION_ENDED), { + "session_id": "sess-catchup", + "attacker_uuid": "att-catchup", + })], + ) + intel_calls = [c for c in tagger.calls if c.source_kind == "intel"] + assert intel_calls, ( + "expected tagger called with source_kind='intel' via catch-up path; " + "got source_kinds=" + str([c.source_kind for c in tagger.calls]) + ) + call = intel_calls[0] + assert call.attacker_uuid == "att-catchup" + assert call.session_id == "sess-catchup" + # source_id must be deterministic so replays hit INSERT OR IGNORE + assert "att-catchup" in call.source_id or "sess-catchup" in call.source_id + # The catch-up payload carries the intel fields IntelLifter predicates on + assert call.payload.get("abuseipdb_score") == 90 + assert call.payload.get("greynoise_classification") == "malicious" async def test_dropped_email_received_produces_no_email_tags(