def _decode_json_list(value: Any) -> list[Any]:
    """Coerce a stored taxonomy column into a native list.

    Accepts either an already-decoded list or a JSON document held as
    ``str``/``bytes``/``bytearray`` (``json.loads`` takes all three).
    Anything else -- ``None``, an empty string, malformed JSON, or JSON
    whose top-level value is not an array -- yields ``[]`` so callers
    never have to branch on absence.

    Args:
        value: Raw column value as read from the row.

    Returns:
        A list. When ``value`` was already a list the result is a
        shallow copy, so it never aliases the caller's row.
    """
    if isinstance(value, list):
        return list(value)
    if isinstance(value, (str, bytes, bytearray)) and value:
        try:
            decoded = json.loads(value)
        except (ValueError, RecursionError):
            # ValueError covers json.JSONDecodeError as well as the
            # UnicodeDecodeError raised for undecodable bytes;
            # RecursionError guards against pathologically nested input.
            return []
        return decoded if isinstance(decoded, list) else []
    return []


def _build_intel_event_payload(
    attacker_uuid: str,
    ip: str,
    row: dict[str, Any],
    providers: list[IntelProvider],
) -> dict[str, Any]:
    """Project the AttackerIntel row into the bus event the TTP worker
    consumes as ``source_kind="intel"``.

    The TTP worker forwards the payload verbatim to the IntelLifter.
    Per-provider taxonomy fields (categories, tags, threat_types) are
    decoded back to native lists here so the lifter does not have to
    care that the storage layer JSON-encodes them.

    Args:
        attacker_uuid: Identifier published as ``attacker_uuid``.
        ip: Address published as ``attacker_ip``.
        row: Enrichment row; every column is read with ``.get`` so a
            missing column becomes ``None`` (scalars) or ``[]`` (lists).
        providers: Providers whose ``name`` attributes are listed under
            ``providers``.

    Returns:
        The event payload as a plain dict.
    """
    return {
        "attacker_uuid": attacker_uuid,
        "attacker_ip": ip,
        "aggregate_verdict": row.get("aggregate_verdict"),
        "providers": [p.name for p in providers],
        # AbuseIPDB
        "abuseipdb_score": row.get("abuseipdb_score"),
        "abuseipdb_categories": _decode_json_list(
            row.get("abuseipdb_categories"),
        ),
        # GreyNoise
        "greynoise_classification": row.get("greynoise_classification"),
        "greynoise_name": row.get("greynoise_name"),
        "greynoise_tags": _decode_json_list(row.get("greynoise_tags")),
        # Feodo
        "feodo_listed": row.get("feodo_listed"),
        "feodo_malware_family": row.get("feodo_malware_family"),
        # ThreatFox
        "threatfox_listed": row.get("threatfox_listed"),
        "threatfox_threat_types": _decode_json_list(
            row.get("threatfox_threat_types"),
        ),
        "threatfox_ioc_types": _decode_json_list(
            row.get("threatfox_ioc_types"),
        ),
        "threatfox_malware_families": _decode_json_list(
            row.get("threatfox_malware_families"),
        ),
    }
def test_build_intel_event_payload_projects_taxonomy_fields() -> None:
    """Post-2026-05-02 audit: the bus payload now carries the per-
    provider taxonomy fields the IntelLifter needs (categories, tags,
    threat_types). JSON-string columns are decoded back to native
    lists so the consumer does not have to know about storage shape.

    Envelope and scalar fields are asserted too, so a regression in
    the plain pass-through columns cannot hide behind the list checks.
    """
    import json as _json

    row = {
        "aggregate_verdict": "malicious",
        "abuseipdb_score": 87,
        "abuseipdb_categories": _json.dumps([14, 18, 22]),
        "greynoise_classification": "malicious",
        "greynoise_name": "Mirai",
        "greynoise_tags": _json.dumps(["ssh_bruteforcer"]),
        "feodo_listed": True,
        "feodo_malware_family": "Emotet",
        "threatfox_listed": True,
        "threatfox_threat_types": _json.dumps(["botnet_cc"]),
        "threatfox_ioc_types": _json.dumps(["ip:port"]),
        "threatfox_malware_families": _json.dumps(["Sliver"]),
    }
    payload = _iw._build_intel_event_payload(
        "att-2", "203.0.113.7", row, [_FakeProvider()],
    )
    # Envelope fields.
    assert payload["attacker_uuid"] == "att-2"
    assert payload["attacker_ip"] == "203.0.113.7"
    assert payload["aggregate_verdict"] == "malicious"
    assert len(payload["providers"]) == 1
    # Scalar columns pass through untouched.
    assert payload["abuseipdb_score"] == 87
    assert payload["greynoise_classification"] == "malicious"
    assert payload["greynoise_name"] == "Mirai"
    assert payload["feodo_listed"] is True
    assert payload["feodo_malware_family"] == "Emotet"
    assert payload["threatfox_listed"] is True
    # JSON-encoded columns come back as native lists.
    assert payload["abuseipdb_categories"] == [14, 18, 22]
    assert payload["greynoise_tags"] == ["ssh_bruteforcer"]
    assert payload["threatfox_threat_types"] == ["botnet_cc"]
    assert payload["threatfox_ioc_types"] == ["ip:port"]
    assert payload["threatfox_malware_families"] == ["Sliver"]


def test_build_intel_event_payload_tolerates_absent_columns() -> None:
    """A pre-enrichment row should produce a payload with empty lists
    rather than raising — the IntelLifter contract is to absorb
    absence silently. Scalar columns are expected to surface as
    ``None`` and an empty provider list as ``[]``."""
    payload = _iw._build_intel_event_payload(
        "att-3", "10.0.0.1", {}, [],
    )
    assert payload["attacker_uuid"] == "att-3"
    assert payload["attacker_ip"] == "10.0.0.1"
    assert payload["providers"] == []
    # Every list-typed column defaults to an empty list.
    assert payload["abuseipdb_categories"] == []
    assert payload["greynoise_tags"] == []
    assert payload["threatfox_threat_types"] == []
    assert payload["threatfox_ioc_types"] == []
    assert payload["threatfox_malware_families"] == []
    # Every scalar column defaults to None.
    assert payload["aggregate_verdict"] is None
    assert payload["abuseipdb_score"] is None
    assert payload["greynoise_classification"] is None
    assert payload["greynoise_name"] is None
    assert payload["feodo_listed"] is None
    assert payload["feodo_malware_family"] is None
    assert payload["threatfox_listed"] is None