refactor(intel): migrate AttackerIntel JSON-string columns to native SQLAlchemy JSON
Five list columns (greynoise_tags, abuseipdb_categories, threatfox_threat_types, threatfox_ioc_types, threatfox_malware_families) and four dict columns (*_raw) are now Column(JSON) with list/dict type annotations and default_factory=list/dict. Providers return native Python objects; the application-layer json.dumps/json.loads round-trip and _decode_json_list helpers are gone. to_intel_event_payload() reads columns directly. Also caps pytest xdist at -n 4 and excludes tests/api from norecursedirs to prevent schemathesis workers from OOM-killing the dev loop.
This commit is contained in:
@@ -56,7 +56,7 @@ async def test_match_returns_malicious(monkeypatch):
|
||||
result = await provider.lookup("1.2.3.4")
|
||||
assert result.verdict == "malicious"
|
||||
assert result.column_updates["threatfox_listed"] is True
|
||||
raw = json.loads(result.column_updates["threatfox_raw"])
|
||||
raw = result.column_updates["threatfox_raw"]
|
||||
assert raw[0]["malware"] == "Cobalt Strike"
|
||||
# No Auth-Key when none configured.
|
||||
assert "auth-key" not in {h.lower() for h in captured[0].headers}
|
||||
@@ -134,11 +134,9 @@ async def test_threat_types_and_ioc_types_flattened(monkeypatch):
|
||||
provider = ThreatFoxProvider()
|
||||
result = await provider.lookup("1.2.3.4")
|
||||
cu = result.column_updates
|
||||
assert json.loads(cu["threatfox_threat_types"]) == [
|
||||
"botnet_cc", "payload_delivery",
|
||||
]
|
||||
assert json.loads(cu["threatfox_ioc_types"]) == ["ip:port", "url"]
|
||||
assert json.loads(cu["threatfox_malware_families"]) == ["Emotet", "Sliver"]
|
||||
assert cu["threatfox_threat_types"] == ["botnet_cc", "payload_delivery"]
|
||||
assert cu["threatfox_ioc_types"] == ["ip:port", "url"]
|
||||
assert cu["threatfox_malware_families"] == ["Emotet", "Sliver"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
@@ -150,9 +148,9 @@ async def test_no_result_clears_taxonomy_columns():
|
||||
provider = ThreatFoxProvider()
|
||||
result = await provider.lookup("8.8.8.8")
|
||||
cu = result.column_updates
|
||||
assert cu["threatfox_threat_types"] == "[]"
|
||||
assert cu["threatfox_ioc_types"] == "[]"
|
||||
assert cu["threatfox_malware_families"] == "[]"
|
||||
assert cu["threatfox_threat_types"] == []
|
||||
assert cu["threatfox_ioc_types"] == []
|
||||
assert cu["threatfox_malware_families"] == []
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
|
||||
Reference in New Issue
Block a user