feat(intel): project per-provider taxonomy into attacker.intel.enriched payload

The TTP worker forwards the bus payload verbatim to the IntelLifter as
TaggerEvent.payload. The pre-audit publish payload only carried
{attacker_uuid, attacker_ip, aggregate_verdict, providers}, so even with
the new AttackerIntel taxonomy columns populated the lifter still saw
nothing. Lift the relevant fields (categories / tags / threat_types /
malware family / score / classification) into the bus event and decode
JSON-string list columns back to native lists at the boundary.
This commit is contained in:
2026-05-02 18:08:29 -04:00
parent 999d3494b4
commit a31ad82880
2 changed files with 107 additions and 6 deletions

View File

@@ -20,6 +20,7 @@ from __future__ import annotations
import asyncio import asyncio
import contextlib import contextlib
import json
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any, Optional from typing import Any, Optional
@@ -59,6 +60,63 @@ def _aggregate(verdicts: list[Optional[str]]) -> Optional[str]:
return None return None
def _decode_json_list(value: Any) -> list[Any]:
if isinstance(value, list):
return value
if isinstance(value, str) and value:
try:
decoded = json.loads(value)
except (json.JSONDecodeError, TypeError):
return []
return decoded if isinstance(decoded, list) else []
return []
def _build_intel_event_payload(
    attacker_uuid: str,
    ip: str,
    row: dict[str, Any],
    providers: list[IntelProvider],
) -> dict[str, Any]:
    """Build the ``attacker.intel.enriched`` bus payload from an intel row.

    The TTP worker consumes this event as ``source_kind="intel"`` and
    forwards it verbatim to the IntelLifter. List-valued taxonomy columns
    (categories, tags, threat types, ...) are passed through
    ``_decode_json_list`` so the consumer receives native lists and does
    not need to know that the storage layer JSON-encodes them.

    Args:
        attacker_uuid: Emitted unchanged as ``attacker_uuid``.
        ip: Emitted unchanged as ``attacker_ip``.
        row: Mapping of AttackerIntel column names to values. Missing
            scalar columns become ``None``; missing list columns become
            empty lists.
        providers: Providers whose ``name`` attributes are listed under
            ``providers``.

    Returns:
        A new dict holding the identity fields, the aggregate verdict,
        the provider names and the per-provider taxonomy fields.
    """
    # (column name, stored as a JSON-encoded list?) in emission order.
    projected_columns = (
        # AbuseIPDB
        ("abuseipdb_score", False),
        ("abuseipdb_categories", True),
        # GreyNoise
        ("greynoise_classification", False),
        ("greynoise_name", False),
        ("greynoise_tags", True),
        # Feodo
        ("feodo_listed", False),
        ("feodo_malware_family", False),
        # ThreatFox
        ("threatfox_listed", False),
        ("threatfox_threat_types", True),
        ("threatfox_ioc_types", True),
        ("threatfox_malware_families", True),
    )

    event: dict[str, Any] = {
        "attacker_uuid": attacker_uuid,
        "attacker_ip": ip,
    }
    event["aggregate_verdict"] = row.get("aggregate_verdict")
    event["providers"] = [provider.name for provider in providers]

    for column, is_json_list in projected_columns:
        stored = row.get(column)
        event[column] = _decode_json_list(stored) if is_json_list else stored
    return event
async def _enrich_one( async def _enrich_one(
attacker_uuid: str, attacker_uuid: str,
ip: str, ip: str,
@@ -172,12 +230,9 @@ async def run_intel_loop(
await publish_safely( await publish_safely(
bus, bus,
_topics.attacker(_topics.ATTACKER_INTEL_ENRICHED), _topics.attacker(_topics.ATTACKER_INTEL_ENRICHED),
{ _build_intel_event_payload(
"attacker_uuid": attacker_uuid, attacker_uuid, ip, row, providers,
"attacker_ip": ip, ),
"aggregate_verdict": row.get("aggregate_verdict"),
"providers": [p.name for p in providers],
},
event_type=_topics.ATTACKER_INTEL_ENRICHED, event_type=_topics.ATTACKER_INTEL_ENRICHED,
) )
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001

View File

@@ -95,3 +95,49 @@ async def test_intel_worker_publishes_intel_enriched(
assert payload["attacker_ip"] == "192.168.1.5" assert payload["attacker_ip"] == "192.168.1.5"
assert payload["aggregate_verdict"] == "malicious" assert payload["aggregate_verdict"] == "malicious"
assert "fake" in payload["providers"] assert "fake" in payload["providers"]
def test_build_intel_event_payload_projects_taxonomy_fields() -> None:
    """Post-2026-05-02 audit: the bus payload now carries the per-
    provider taxonomy fields the IntelLifter needs (categories, tags,
    threat_types). JSON-string columns are decoded back to native
    lists so the consumer does not have to know about storage shape.

    Every projected key is asserted, including identity fields, scores,
    classifications and listed flags, so dropping any single field from
    the payload fails this test.
    """
    import json as _json

    row = {
        "aggregate_verdict": "malicious",
        "abuseipdb_score": 87,
        "abuseipdb_categories": _json.dumps([14, 18, 22]),
        "greynoise_classification": "malicious",
        "greynoise_name": "Mirai",
        "greynoise_tags": _json.dumps(["ssh_bruteforcer"]),
        "feodo_listed": True,
        "feodo_malware_family": "Emotet",
        "threatfox_listed": True,
        "threatfox_threat_types": _json.dumps(["botnet_cc"]),
        "threatfox_ioc_types": _json.dumps(["ip:port"]),
        "threatfox_malware_families": _json.dumps(["Sliver"]),
    }
    payload = _iw._build_intel_event_payload(
        "att-2", "203.0.113.7", row, [_FakeProvider()],
    )

    # Identity and verdict fields must survive alongside the new ones.
    assert payload["attacker_uuid"] == "att-2"
    assert payload["attacker_ip"] == "203.0.113.7"
    assert payload["aggregate_verdict"] == "malicious"
    assert len(payload["providers"]) == 1

    # Scalar columns pass through untouched.
    assert payload["abuseipdb_score"] == 87
    assert payload["greynoise_classification"] == "malicious"
    assert payload["greynoise_name"] == "Mirai"
    assert payload["feodo_listed"] is True
    assert payload["feodo_malware_family"] == "Emotet"
    assert payload["threatfox_listed"] is True

    # JSON-string columns arrive as native lists.
    assert payload["abuseipdb_categories"] == [14, 18, 22]
    assert payload["greynoise_tags"] == ["ssh_bruteforcer"]
    assert payload["threatfox_threat_types"] == ["botnet_cc"]
    assert payload["threatfox_ioc_types"] == ["ip:port"]
    assert payload["threatfox_malware_families"] == ["Sliver"]
def test_build_intel_event_payload_tolerates_absent_columns() -> None:
    """A pre-enrichment row should produce a payload with empty lists
    rather than raising — the IntelLifter contract is to absorb
    absence silently.

    All five list-valued fields are checked, plus the ``None`` defaults
    for the scalar fields and the empty provider list.
    """
    payload = _iw._build_intel_event_payload(
        "att-3", "10.0.0.1", {}, [],
    )

    assert payload["attacker_uuid"] == "att-3"
    assert payload["attacker_ip"] == "10.0.0.1"
    assert payload["providers"] == []

    # Every list-valued column falls back to an empty list.
    for list_key in (
        "abuseipdb_categories",
        "greynoise_tags",
        "threatfox_threat_types",
        "threatfox_ioc_types",
        "threatfox_malware_families",
    ):
        assert payload[list_key] == [], list_key

    # Scalar columns that are absent from the row surface as None.
    for scalar_key in (
        "aggregate_verdict",
        "abuseipdb_score",
        "greynoise_classification",
        "greynoise_name",
        "feodo_listed",
        "feodo_malware_family",
        "threatfox_listed",
    ):
        assert payload[scalar_key] is None, scalar_key