The 2026-05-02 ship-time audit of the R0054-R0058 intel rule pack found that AbuseIPDB / GreyNoise / ThreatFox stored only the aggregate verdict (score / classification / listed-bool) plus the raw response blob. The TTP IntelLifter expects per-provider taxonomy fields (categories, tags, threat_types) that were never populated, so R0054 / R0055 / R0057 emitted zero tags in production despite passing unit tests. Add typed columns: abuseipdb_categories, greynoise_tags, greynoise_name, feodo_malware_family, threatfox_threat_types, threatfox_ioc_types, threatfox_malware_families. Each provider now parses the relevant taxonomy out of the upstream response and writes it through column_updates. JSON-list columns ride as TEXT with default "[]" to keep the SQLite/MySQL backend split honest, deserialised back to native lists by the repo on read.
167 lines
5.5 KiB
Python
167 lines
5.5 KiB
Python
"""Unit tests for the abuse.ch ThreatFox provider."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from decnet.intel.threatfox import ThreatFoxProvider
|
|
|
|
|
|
def _install_transport(handler) -> list[httpx.Request]:
|
|
captured: list[httpx.Request] = []
|
|
|
|
async def _wrapped(request: httpx.Request) -> httpx.Response:
|
|
captured.append(request)
|
|
return await handler(request)
|
|
|
|
transport = httpx.MockTransport(_wrapped)
|
|
from decnet.intel import threatfox as mod
|
|
|
|
def _factory():
|
|
return httpx.AsyncClient(
|
|
transport=transport,
|
|
headers={"User-Agent": "curl/7.88.1"},
|
|
)
|
|
|
|
mod.stealth_client = _factory # type: ignore[assignment]
|
|
return captured
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_match_returns_malicious(monkeypatch):
|
|
monkeypatch.delenv("DECNET_THREATFOX_API_KEY", raising=False)
|
|
|
|
async def handler(request: httpx.Request) -> httpx.Response:
|
|
body = json.loads(request.content.decode())
|
|
assert body == {"query": "search_ioc", "search_term": "1.2.3.4"}
|
|
return httpx.Response(
|
|
200,
|
|
json={
|
|
"query_status": "ok",
|
|
"data": [
|
|
{
|
|
"ioc": "1.2.3.4",
|
|
"ioc_type": "ip:port",
|
|
"malware": "Cobalt Strike",
|
|
"confidence_level": 80,
|
|
}
|
|
],
|
|
},
|
|
)
|
|
|
|
captured = _install_transport(handler)
|
|
provider = ThreatFoxProvider()
|
|
result = await provider.lookup("1.2.3.4")
|
|
assert result.verdict == "malicious"
|
|
assert result.column_updates["threatfox_listed"] is True
|
|
raw = json.loads(result.column_updates["threatfox_raw"])
|
|
assert raw[0]["malware"] == "Cobalt Strike"
|
|
# No Auth-Key when none configured.
|
|
assert "auth-key" not in {h.lower() for h in captured[0].headers}
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_auth_key_sent_when_configured(monkeypatch):
|
|
monkeypatch.setenv("DECNET_THREATFOX_API_KEY", "tfx-key")
|
|
|
|
async def handler(request: httpx.Request) -> httpx.Response:
|
|
return httpx.Response(200, json={"query_status": "no_result"})
|
|
|
|
captured = _install_transport(handler)
|
|
provider = ThreatFoxProvider()
|
|
await provider.lookup("8.8.8.8")
|
|
assert captured[0].headers["auth-key"] == "tfx-key"
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_no_result_caches_unlisted():
|
|
async def handler(request: httpx.Request) -> httpx.Response:
|
|
return httpx.Response(200, json={"query_status": "no_result"})
|
|
|
|
_install_transport(handler)
|
|
provider = ThreatFoxProvider()
|
|
result = await provider.lookup("8.8.8.8")
|
|
assert result.verdict is None
|
|
assert result.column_updates["threatfox_listed"] is False
|
|
assert result.error is None
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_unexpected_status_is_error():
|
|
async def handler(request: httpx.Request) -> httpx.Response:
|
|
return httpx.Response(200, json={"query_status": "illegal_search"})
|
|
|
|
_install_transport(handler)
|
|
provider = ThreatFoxProvider()
|
|
result = await provider.lookup("oops")
|
|
assert result.error and "illegal_search" in result.error
|
|
assert result.column_updates == {}
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_threat_types_and_ioc_types_flattened(monkeypatch):
|
|
"""Post-2026-05-02 audit: provider must extract the union of
|
|
``threat_type`` / ``ioc_type`` / ``malware`` across all matches.
|
|
The IntelLifter dispatches ATT&CK on threat_type."""
|
|
monkeypatch.delenv("DECNET_THREATFOX_API_KEY", raising=False)
|
|
|
|
async def handler(request: httpx.Request) -> httpx.Response:
|
|
return httpx.Response(
|
|
200,
|
|
json={"query_status": "ok", "data": [
|
|
{
|
|
"ioc_type": "ip:port",
|
|
"threat_type": "botnet_cc",
|
|
"malware": "Sliver",
|
|
},
|
|
{
|
|
"ioc_type": "url",
|
|
"threat_type": "payload_delivery",
|
|
"malware_printable": "Emotet",
|
|
},
|
|
{
|
|
"ioc_type": "ip:port", # duplicate, dedup'd
|
|
"threat_type": "botnet_cc", # duplicate
|
|
"malware": "Sliver", # duplicate
|
|
},
|
|
"not a dict — silently skipped",
|
|
]},
|
|
)
|
|
|
|
_install_transport(handler)
|
|
provider = ThreatFoxProvider()
|
|
result = await provider.lookup("1.2.3.4")
|
|
cu = result.column_updates
|
|
assert json.loads(cu["threatfox_threat_types"]) == [
|
|
"botnet_cc", "payload_delivery",
|
|
]
|
|
assert json.loads(cu["threatfox_ioc_types"]) == ["ip:port", "url"]
|
|
assert json.loads(cu["threatfox_malware_families"]) == ["Emotet", "Sliver"]
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_no_result_clears_taxonomy_columns():
|
|
async def handler(request: httpx.Request) -> httpx.Response:
|
|
return httpx.Response(200, json={"query_status": "no_result"})
|
|
|
|
_install_transport(handler)
|
|
provider = ThreatFoxProvider()
|
|
result = await provider.lookup("8.8.8.8")
|
|
cu = result.column_updates
|
|
assert cu["threatfox_threat_types"] == "[]"
|
|
assert cu["threatfox_ioc_types"] == "[]"
|
|
assert cu["threatfox_malware_families"] == "[]"
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_http_error_surfaces():
|
|
async def handler(request: httpx.Request) -> httpx.Response:
|
|
return httpx.Response(502)
|
|
|
|
_install_transport(handler)
|
|
provider = ThreatFoxProvider()
|
|
result = await provider.lookup("1.1.1.1")
|
|
assert result.error == "HTTP 502"
|