test(ttp): enable test_dropped_intel_enriched_still_produces_intel_tags

Removes the E.3.14b xfail marker and writes the test body:
- _StubRepo gains get_attacker_intel_row_by_uuid(uuid) backed by an
  optional intel_rows dict; existing tests pass None (no catch-up, no
  change to their behaviour).
- The test drives a session.ended event with NO intel.enriched published,
  injects an AttackerIntel row into the stub repo, and asserts the
  tagger is called with source_kind='intel' carrying the correct payload
  fields (abuseipdb_score, greynoise_classification).
- Pins the asymmetry contract: email.received has no catch-up path
  (sibling test already green); intel does.
This commit is contained in:
2026-05-10 08:30:44 -04:00
parent 6e7020f2aa
commit c39b63a431

View File

@@ -25,6 +25,7 @@ from decnet.bus.fake import FakeBus
from decnet.ttp import worker as _worker
from decnet.ttp.base import Tagger, TaggerEvent
from decnet.ttp.worker import _TOPICS, run_ttp_worker_loop
from decnet.web.db.models.attacker_intel import AttackerIntel
from decnet.web.db.models.ttp import TTPTag
@@ -87,11 +88,19 @@ class _StubRepo:
First call with a given uuid set returns the row count; replays
return zero (idempotent). Mirrors :meth:`SQLiteRepository.
_insert_tags_or_ignore` for tests without a real DB.
``intel_rows`` maps attacker_uuid → :class:`AttackerIntel` instance so
the E.3.14b catch-up test can inject a persisted intel row without a
real DB. Defaults to empty (None return) for all other tests.
"""
def __init__(self) -> None:
def __init__(
self,
intel_rows: dict[str, AttackerIntel] | None = None,
) -> None:
self._seen: set[str] = set()
self.calls: int = 0
self._intel_rows: dict[str, AttackerIntel] = intel_rows or {}
async def insert_tags(self, rows: list[TTPTag]) -> int:
self.calls += 1
@@ -100,6 +109,11 @@ class _StubRepo:
self._seen.add(r.uuid)
return len(new)
async def get_attacker_intel_row_by_uuid(
self, uuid: str,
) -> AttackerIntel | None:
return self._intel_rows.get(uuid)
async def _drive_worker(
bus: FakeBus,
@@ -283,15 +297,59 @@ def test_run_ttp_worker_loop_signature() -> None:
# ── Bus delivery asymmetry (still xfail — catch-up paths are E.3.14b) ─
@pytest.mark.xfail(
strict=True,
reason="catch-up via attacker.session.ended is design-deferred to "
"E.3.14b; today the worker fans events 1:1 by source_kind",
)
async def test_dropped_intel_enriched_still_produces_intel_tags(
fake_bus: FakeBus,
) -> None:
pytest.fail("intel catch-up path not yet implemented")
"""Dropping ``attacker.intel.enriched`` still produces intel-derived tags.
The catch-up path (E.3.14b): on ``attacker.session.ended`` the worker
reads the persisted ``AttackerIntel`` row and synthesizes an
``source_kind="intel"`` TaggerEvent. Idempotent UUIDs mean a later
``attacker.intel.enriched`` event would deduplicate; the asymmetry with
email (no catch-up) is pinned by the sibling test below.
"""
from datetime import datetime, timezone
intel_row = AttackerIntel(
uuid="row-uuid-1",
attacker_uuid="att-catchup",
attacker_ip="10.0.0.1",
abuseipdb_score=90,
abuseipdb_categories="[18, 22]",
greynoise_classification="malicious",
greynoise_name="",
greynoise_tags="[]",
feodo_listed=None,
threatfox_listed=None,
threatfox_threat_types="[]",
threatfox_ioc_types="[]",
threatfox_malware_families="[]",
aggregate_verdict="malicious",
expires_at=datetime(2099, 1, 1, tzinfo=timezone.utc),
)
tagger = _FixedTagger(tags=[_make_tag()])
repo = _StubRepo(intel_rows={"att-catchup": intel_row})
await _drive_worker(
fake_bus, tagger, repo,
# Only session.ended — intel.enriched is intentionally NOT published.
[(_topics.attacker(_topics.ATTACKER_SESSION_ENDED), {
"session_id": "sess-catchup",
"attacker_uuid": "att-catchup",
})],
)
intel_calls = [c for c in tagger.calls if c.source_kind == "intel"]
assert intel_calls, (
"expected tagger called with source_kind='intel' via catch-up path; "
"got source_kinds=" + str([c.source_kind for c in tagger.calls])
)
call = intel_calls[0]
assert call.attacker_uuid == "att-catchup"
assert call.session_id == "sess-catchup"
# source_id must be deterministic so replays hit INSERT OR IGNORE
assert "att-catchup" in call.source_id or "sess-catchup" in call.source_id
# The catch-up payload carries the intel fields IntelLifter predicates on
assert call.payload.get("abuseipdb_score") == 90
assert call.payload.get("greynoise_classification") == "malicious"
async def test_dropped_email_received_produces_no_email_tags(