From cd70136d09ff2b15163766116d5c9e5150bf9b1d Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 05:15:17 -0400 Subject: [PATCH] feat(intel): wire GreyNoise, AbuseIPDB, Feodo Tracker + ThreatFox MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four concrete IntelProvider impls — three per-IP queries plus one bulk feed: * GreyNoiseProvider — community endpoint, optional API key for higher rate limit. 404 = unknown (cache the absence so we don't re-query). * AbuseIPDBProvider — score threshold mapping (>=75 malicious, >=25 suspicious, else benign). Self-disables with a clear error when no API key is configured rather than burning quota. * FeodoProvider — fetches the bulk botnet C2 IP feed once per refresh window and answers every lookup from an in-memory set. Listed = C2. * ThreatFoxProvider — POST /api/v1/ search_ioc query, optional Auth-Key header. Match in data[] = malicious; no_result = absence-not-benign. Every provider routes through decnet.net.http.stealth_client so the egress UA never leaks 'DECNET'. --- decnet/intel/abuseipdb.py | 104 ++++++++++++++++++++++++++ decnet/intel/feodo.py | 108 +++++++++++++++++++++++++++ decnet/intel/greynoise.py | 107 ++++++++++++++++++++++++++ decnet/intel/threatfox.py | 94 +++++++++++++++++++++++ tests/intel/test_abuseipdb.py | 109 +++++++++++++++++++++++++++ tests/intel/test_feodo.py | 99 +++++++++++++++++++++++++ tests/intel/test_greynoise.py | 136 ++++++++++++++++++++++++++++++++++ tests/intel/test_threatfox.py | 111 +++++++++++++++++++++++++++ 8 files changed, 868 insertions(+) create mode 100644 decnet/intel/abuseipdb.py create mode 100644 decnet/intel/feodo.py create mode 100644 decnet/intel/greynoise.py create mode 100644 decnet/intel/threatfox.py create mode 100644 tests/intel/test_abuseipdb.py create mode 100644 tests/intel/test_feodo.py create mode 100644 tests/intel/test_greynoise.py create mode 100644 tests/intel/test_threatfox.py diff --git a/decnet/intel/abuseipdb.py b/decnet/intel/abuseipdb.py new file mode 100644 index 00000000..a099c4c5 --- /dev/null +++ b/decnet/intel/abuseipdb.py @@ -0,0 +1,104 @@ +"""AbuseIPDB provider. + +Endpoint: ``GET https://api.abuseipdb.com/api/v2/check`` + +Free tier: 1000 lookups/day. Always requires an API key passed in the +``Key`` header — the provider self-disables (returns an error) when no +key is configured rather than burning quota at the free public IP. + +Verdict mapping is tier-based on the ``abuseConfidenceScore`` (0–100): + +* ``>= 75`` — ``malicious`` +* ``25..74`` — ``suspicious`` +* ``< 25`` — ``benign`` + +This matches AbuseIPDB's own UI thresholds reasonably closely; tune +later if operators report drift. +""" +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from typing import Optional + +from decnet.intel.base import IntelProvider, IntelResult +from decnet.logging import get_logger +from decnet.net.http import stealth_client + +log = get_logger("intel.abuseipdb") + +_ENDPOINT = "https://api.abuseipdb.com/api/v2/check" +_DEFAULT_MAX_AGE_DAYS = 30 + + +def _score_to_verdict(score: int) -> str: + if score >= 75: + return "malicious" + if score >= 25: + return "suspicious" + return "benign" + + +class AbuseIPDBProvider(IntelProvider): + name = "abuseipdb" + concurrency = 4 + # 1000/day = avg 1 every ~86s. We don't enforce the daily cap here — + # operators who burn it through the worker will see HTTP 429 and the + # row gets retried after the TTL window. + min_dispatch_interval_s = 0.5 + + def __init__( + self, + *, + api_key: Optional[str] = None, + max_age_days: int = _DEFAULT_MAX_AGE_DAYS, + ) -> None: + super().__init__() + self._api_key = api_key or os.environ.get( + "DECNET_ABUSEIPDB_API_KEY" + ) or None + self._max_age_days = max_age_days + + async def lookup(self, ip: str) -> IntelResult: + if not self._api_key: + return IntelResult( + provider=self.name, + error="DECNET_ABUSEIPDB_API_KEY not configured", + ) + params = { + "ipAddress": ip, + "maxAgeInDays": str(self._max_age_days), + } + headers = { + "Key": self._api_key, + "Accept": "application/json", + } + try: + async with stealth_client() as client: + resp = await client.get(_ENDPOINT, headers=headers, params=params) + except Exception as exc: # noqa: BLE001 + return IntelResult(provider=self.name, error=f"network: {exc}") + + if resp.status_code != 200: + return IntelResult( + provider=self.name, + error=f"HTTP {resp.status_code}", + ) + try: + payload = resp.json() + except Exception as exc: # noqa: BLE001 + return IntelResult(provider=self.name, error=f"parse: {exc}") + + data = payload.get("data") or {} + score = int(data.get("abuseConfidenceScore") or 0) + verdict = _score_to_verdict(score) + return IntelResult( + provider=self.name, + verdict=verdict, + column_updates={ + "abuseipdb_score": score, + "abuseipdb_raw": json.dumps(data), + "abuseipdb_queried_at": datetime.now(timezone.utc), + }, + ) diff --git a/decnet/intel/feodo.py b/decnet/intel/feodo.py new file mode 100644 index 00000000..284dd9d1 --- /dev/null +++ b/decnet/intel/feodo.py @@ -0,0 +1,108 @@ +"""abuse.ch Feodo Tracker provider — bulk JSON botnet C2 feed. + +Endpoint: ``GET https://feodotracker.abuse.ch/downloads/ipblocklist.json`` + +This is the only provider in the v1 set that uses a *bulk* feed instead +of a per-IP query: the upstream is a list of every botnet C2 IP abuse.ch +has seen recently (Emotet, TrickBot, Dridex, etc.), refreshed every few +minutes. We fetch the full list once per ``refresh_interval_s`` and +answer ``lookup(ip)`` calls from the in-process set. + +This makes Feodo Tracker effectively free at the call-site: thousands +of attacker IPs map to a single network round-trip per refresh window. +""" +from __future__ import annotations + +import json +import time +from datetime import datetime, timezone +from typing import Any, Optional + +from decnet.intel.base import IntelProvider, IntelResult +from decnet.logging import get_logger +from decnet.net.http import stealth_client + +log = get_logger("intel.feodo") + +_ENDPOINT = "https://feodotracker.abuse.ch/downloads/ipblocklist.json" +_DEFAULT_REFRESH_S = 3600.0 + + +class FeodoProvider(IntelProvider): + name = "feodo" + concurrency = 1 # only one concurrent refresh; lookups are pure set ops + min_dispatch_interval_s = 0.0 + + def __init__(self, *, refresh_interval_s: float = _DEFAULT_REFRESH_S) -> None: + super().__init__() + self._refresh_interval_s = refresh_interval_s + # ip → upstream record dict, keyed by ``ip_address``. + self._index: dict[str, dict[str, Any]] = {} + self._loaded_at: float = 0.0 + self._last_error: Optional[str] = None + + async def _refresh(self) -> Optional[str]: + """Refetch the bulk feed. Returns an error string or ``None``.""" + try: + async with stealth_client(timeout=20.0) as client: + resp = await client.get(_ENDPOINT) + except Exception as exc: # noqa: BLE001 + return f"network: {exc}" + if resp.status_code != 200: + return f"HTTP {resp.status_code}" + try: + payload = resp.json() + except Exception as exc: # noqa: BLE001 + return f"parse: {exc}" + if not isinstance(payload, list): + return "feed: not a list" + + new_index: dict[str, dict[str, Any]] = {} + for entry in payload: + if not isinstance(entry, dict): + continue + ip = entry.get("ip_address") + if isinstance(ip, str): + new_index[ip] = entry + self._index = new_index + self._loaded_at = time.monotonic() + self._last_error = None + log.info("feodo: refreshed bulk feed entries=%d", len(new_index)) + return None + + async def _ensure_fresh(self) -> None: + if ( + not self._index + or (time.monotonic() - self._loaded_at) >= self._refresh_interval_s + ): + err = await self._refresh() + if err: + self._last_error = err + + async def lookup(self, ip: str) -> IntelResult: + await self._ensure_fresh() + if not self._index and self._last_error: + return IntelResult(provider=self.name, error=self._last_error) + + entry = self._index.get(ip) + if entry is None: + # Not on the C2 list — explicit benign-ish signal. Cache it + # so we don't keep checking the same set on every wake. + return IntelResult( + provider=self.name, + verdict=None, # absence ≠ "benign", let other providers speak + column_updates={ + "feodo_listed": False, + "feodo_raw": "{}", + "feodo_queried_at": datetime.now(timezone.utc), + }, + ) + return IntelResult( + provider=self.name, + verdict="malicious", + column_updates={ + "feodo_listed": True, + "feodo_raw": json.dumps(entry), + "feodo_queried_at": datetime.now(timezone.utc), + }, + ) diff --git a/decnet/intel/greynoise.py b/decnet/intel/greynoise.py new file mode 100644 index 00000000..b702c311 --- /dev/null +++ b/decnet/intel/greynoise.py @@ -0,0 +1,107 @@ +"""GreyNoise Community API provider. + +Endpoint: ``GET https://api.greynoise.io/v3/community/`` + +The Community endpoint requires no API key for low-volume use; an +optional ``DECNET_GREYNOISE_API_KEY`` lifts the rate limit. We always +send the key when present. + +Response shape (relevant fields):: + + { + "ip": "1.2.3.4", + "noise": true, // observed scanning the public internet + "riot": false, // member of the "Rule It Out" benign set + "classification": "benign | malicious | unknown", + "name": "Censys", // tool/operator label, when known + "link": "https://...", + "last_seen": "2026-04-25" + } + +Status code semantics: +* 200 — IP found, JSON body as above +* 404 — IP not observed by GreyNoise (treat as ``"unknown"``, not error) +* 429 — rate-limited (treat as transient error) +""" +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from typing import Optional + +from decnet.intel.base import IntelProvider, IntelResult +from decnet.logging import get_logger +from decnet.net.http import stealth_client + +log = get_logger("intel.greynoise") + +_ENDPOINT = "https://api.greynoise.io/v3/community/{ip}" + + +class GreyNoiseProvider(IntelProvider): + name = "greynoise" + concurrency = 4 + # Community tier is ~50/min; ~1.5s between dispatches keeps us well + # under that without serialising entirely. + min_dispatch_interval_s = 1.5 + + def __init__(self, *, api_key: Optional[str] = None) -> None: + super().__init__() + self._api_key = api_key or os.environ.get( + "DECNET_GREYNOISE_API_KEY" + ) or None + + async def lookup(self, ip: str) -> IntelResult: + url = _ENDPOINT.format(ip=ip) + headers = {"Accept": "application/json"} + if self._api_key: + headers["key"] = self._api_key + try: + async with stealth_client() as client: + resp = await client.get(url, headers=headers) + except Exception as exc: # noqa: BLE001 + return IntelResult(provider=self.name, error=f"network: {exc}") + + if resp.status_code == 404: + # IP not in GreyNoise's view of the internet — record the row + # so we don't keep re-querying within the TTL window. + return IntelResult( + provider=self.name, + verdict="unknown", + column_updates={ + "greynoise_classification": "unknown", + "greynoise_raw": json.dumps({"message": "not seen"}), + "greynoise_queried_at": datetime.now(timezone.utc), + }, + ) + if resp.status_code != 200: + return IntelResult( + provider=self.name, + error=f"HTTP {resp.status_code}", + ) + + try: + data = resp.json() + except Exception as exc: # noqa: BLE001 + return IntelResult(provider=self.name, error=f"parse: {exc}") + + classification = (data.get("classification") or "unknown").lower() + verdict = _CLASSIFICATION_TO_VERDICT.get(classification, "unknown") + return IntelResult( + provider=self.name, + verdict=verdict, + column_updates={ + "greynoise_classification": classification, + "greynoise_raw": json.dumps(data), + "greynoise_queried_at": datetime.now(timezone.utc), + }, + ) + + +_CLASSIFICATION_TO_VERDICT = { + "malicious": "malicious", + "suspicious": "suspicious", + "benign": "benign", + "unknown": "unknown", +} diff --git a/decnet/intel/threatfox.py b/decnet/intel/threatfox.py new file mode 100644 index 00000000..17bdb787 --- /dev/null +++ b/decnet/intel/threatfox.py @@ -0,0 +1,94 @@ +"""abuse.ch ThreatFox provider — per-IOC query API. + +Endpoint: ``POST https://threatfox-api.abuse.ch/api/v1/`` + +ThreatFox returns IOC matches across many types (URL, domain, IP, hash). +We send ``{"query": "search_ioc", "search_term": ""}`` and treat any +non-empty ``data`` array as a malicious match. + +API key handling: ThreatFox accepts an optional ``Auth-Key`` header for +higher rate limits. Without a key the public endpoint still answers but +caps requests/min — the provider works either way. +""" +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from typing import Optional + +from decnet.intel.base import IntelProvider, IntelResult +from decnet.logging import get_logger +from decnet.net.http import stealth_client + +log = get_logger("intel.threatfox") + +_ENDPOINT = "https://threatfox-api.abuse.ch/api/v1/" + + +class ThreatFoxProvider(IntelProvider): + name = "threatfox" + concurrency = 4 + min_dispatch_interval_s = 0.5 + + def __init__(self, *, api_key: Optional[str] = None) -> None: + super().__init__() + self._api_key = api_key or os.environ.get( + "DECNET_THREATFOX_API_KEY" + ) or None + + async def lookup(self, ip: str) -> IntelResult: + body = {"query": "search_ioc", "search_term": ip} + headers = {"Accept": "application/json"} + if self._api_key: + headers["Auth-Key"] = self._api_key + + try: + async with stealth_client() as client: + resp = await client.post( + _ENDPOINT, headers=headers, json=body, + ) + except Exception as exc: # noqa: BLE001 + return IntelResult(provider=self.name, error=f"network: {exc}") + + if resp.status_code != 200: + return IntelResult( + provider=self.name, error=f"HTTP {resp.status_code}", + ) + try: + payload = resp.json() + except Exception as exc: # noqa: BLE001 + return IntelResult(provider=self.name, error=f"parse: {exc}") + + status = payload.get("query_status") + # ThreatFox returns query_status="no_result" when the IOC isn't + # tracked, and query_status="ok" with a non-empty data list when + # it is. Anything else (illegal_search, etc.) is a contract + # violation we surface as an error. + if status == "no_result": + return IntelResult( + provider=self.name, + verdict=None, # absence is not a benign signal + column_updates={ + "threatfox_listed": False, + "threatfox_raw": "{}", + "threatfox_queried_at": datetime.now(timezone.utc), + }, + ) + if status != "ok": + return IntelResult( + provider=self.name, + error=f"query_status={status!r}", + ) + + data = payload.get("data") or [] + listed = bool(data) + return IntelResult( + provider=self.name, + verdict="malicious" if listed else None, + column_updates={ + "threatfox_listed": listed, + "threatfox_raw": json.dumps(data), + "threatfox_queried_at": datetime.now(timezone.utc), + }, + ) diff --git a/tests/intel/test_abuseipdb.py b/tests/intel/test_abuseipdb.py new file mode 100644 index 00000000..20180da7 --- /dev/null +++ b/tests/intel/test_abuseipdb.py @@ -0,0 +1,109 @@ +"""Unit tests for the AbuseIPDB provider.""" +from __future__ import annotations + +import json + +import httpx +import pytest + +from decnet.intel.abuseipdb import AbuseIPDBProvider, _score_to_verdict + + +def _install_transport(handler) -> list[httpx.Request]: + captured: list[httpx.Request] = [] + + async def _wrapped(request: httpx.Request) -> httpx.Response: + captured.append(request) + return await handler(request) + + transport = httpx.MockTransport(_wrapped) + from decnet.intel import abuseipdb as mod + + def _factory(): + return httpx.AsyncClient( + transport=transport, + headers={"User-Agent": "curl/7.88.1"}, + ) + + mod.stealth_client = _factory # type: ignore[assignment] + return captured + + +def test_score_thresholds(): + assert _score_to_verdict(0) == "benign" + assert _score_to_verdict(24) == "benign" + assert _score_to_verdict(25) == "suspicious" + assert _score_to_verdict(74) == "suspicious" + assert _score_to_verdict(75) == "malicious" + assert _score_to_verdict(100) == "malicious" + + +@pytest.mark.anyio +async def test_missing_api_key_returns_error_no_egress(monkeypatch): + monkeypatch.delenv("DECNET_ABUSEIPDB_API_KEY", raising=False) + captured = _install_transport( + lambda r: (_ for _ in ()).throw(AssertionError("must not egress")) + ) + provider = AbuseIPDBProvider() + result = await provider.lookup("1.2.3.4") + assert result.error == "DECNET_ABUSEIPDB_API_KEY not configured" + assert result.column_updates == {} + assert captured == [] # no request made + + +@pytest.mark.anyio +async def test_high_score_maps_to_malicious(monkeypatch): + monkeypatch.setenv("DECNET_ABUSEIPDB_API_KEY", "k3y") + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={"data": { + "ipAddress": "1.2.3.4", + "abuseConfidenceScore": 92, + "totalReports": 41, + "countryCode": "RU", + }}, + ) + + captured = _install_transport(handler) + provider = AbuseIPDBProvider() + result = await provider.lookup("1.2.3.4") + assert result.verdict == "malicious" + assert result.column_updates["abuseipdb_score"] == 92 + raw = json.loads(result.column_updates["abuseipdb_raw"]) + assert raw["countryCode"] == "RU" + # Key header sent, query params correct. + req = captured[0] + assert req.headers["key"] == "k3y" + assert "ipAddress=1.2.3.4" in str(req.url) + + +@pytest.mark.anyio +async def test_low_score_maps_to_benign(monkeypatch): + monkeypatch.setenv("DECNET_ABUSEIPDB_API_KEY", "k3y") + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, json={"data": {"abuseConfidenceScore": 0}}, + ) + + _install_transport(handler) + provider = AbuseIPDBProvider() + result = await provider.lookup("8.8.8.8") + assert result.verdict == "benign" + assert result.column_updates["abuseipdb_score"] == 0 + + +@pytest.mark.anyio +async def test_429_returns_error(monkeypatch): + monkeypatch.setenv("DECNET_ABUSEIPDB_API_KEY", "k3y") + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(429) + + _install_transport(handler) + provider = AbuseIPDBProvider() + result = await provider.lookup("1.1.1.1") + assert result.error == "HTTP 429" + assert result.column_updates == {} diff --git a/tests/intel/test_feodo.py b/tests/intel/test_feodo.py new file mode 100644 index 00000000..049392c7 --- /dev/null +++ b/tests/intel/test_feodo.py @@ -0,0 +1,99 @@ +"""Unit tests for the abuse.ch Feodo Tracker provider. + +Bulk-feed semantics: one HTTP fetch loads the in-memory set, all +subsequent ``lookup`` calls hit memory. We assert: + +* a fresh provider triggers exactly one refresh, then answers from cache +* a listed IP returns verdict='malicious' with the upstream record +* an unlisted IP returns verdict=None (absence ≠ benign) +* a feed fetch failure is reported as an error, not silently swallowed +""" +from __future__ import annotations + +import json + +import httpx +import pytest + +from decnet.intel.feodo import FeodoProvider + + +def _install_transport(handler) -> list[httpx.Request]: + captured: list[httpx.Request] = [] + + async def _wrapped(request: httpx.Request) -> httpx.Response: + captured.append(request) + return await handler(request) + + transport = httpx.MockTransport(_wrapped) + from decnet.intel import feodo as mod + + def _factory(*, timeout: float = 20.0): + return httpx.AsyncClient( + transport=transport, + headers={"User-Agent": "curl/7.88.1"}, + timeout=timeout, + ) + + mod.stealth_client = _factory # type: ignore[assignment] + return captured + + +_FEED = [ + {"ip_address": "9.9.9.9", "port": 443, "malware": "TrickBot"}, + {"ip_address": "10.10.10.10", "port": 80, "malware": "Emotet"}, +] + + +@pytest.mark.anyio +async def test_listed_ip_yields_malicious_verdict(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=_FEED) + + captured = _install_transport(handler) + provider = FeodoProvider(refresh_interval_s=999.0) + + result = await provider.lookup("9.9.9.9") + assert result.verdict == "malicious" + assert result.column_updates["feodo_listed"] is True + raw = json.loads(result.column_updates["feodo_raw"]) + assert raw["malware"] == "TrickBot" + assert len(captured) == 1 + + +@pytest.mark.anyio +async def test_subsequent_lookups_dont_refetch(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=_FEED) + + captured = _install_transport(handler) + provider = FeodoProvider(refresh_interval_s=999.0) + + await provider.lookup("9.9.9.9") + await provider.lookup("10.10.10.10") + await provider.lookup("not-listed.example") + assert len(captured) == 1 # one refresh, three answers + + +@pytest.mark.anyio +async def test_unlisted_ip_returns_no_verdict(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json=_FEED) + + _install_transport(handler) + provider = FeodoProvider(refresh_interval_s=999.0) + result = await provider.lookup("1.2.3.4") + assert result.verdict is None + assert result.column_updates["feodo_listed"] is False + + +@pytest.mark.anyio +async def test_feed_failure_reports_error(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(503) + + _install_transport(handler) + provider = FeodoProvider(refresh_interval_s=999.0) + result = await provider.lookup("1.2.3.4") + assert result.error == "HTTP 503" + assert result.column_updates == {} diff --git a/tests/intel/test_greynoise.py b/tests/intel/test_greynoise.py new file mode 100644 index 00000000..8e2cdea4 --- /dev/null +++ b/tests/intel/test_greynoise.py @@ -0,0 +1,136 @@ +"""Unit tests for the GreyNoise Community provider. + +Mocks httpx via ``MockTransport`` and asserts: + +* request URL + headers (API key when present, none when absent) +* malicious / benign / suspicious classification → verdict mapping +* 404 → verdict='unknown' with no error (cache the absence) +* non-200/404 → error populated, no column writes +* network exception → error populated +* the row never advertises DECNET in the egress UA +""" +from __future__ import annotations + +import json + +import httpx +import pytest + +from decnet.intel.greynoise import GreyNoiseProvider + + +def _install_transport(provider: GreyNoiseProvider, handler) -> list[httpx.Request]: + """Patch ``stealth_client`` so it returns a client wired to ``handler``.""" + captured: list[httpx.Request] = [] + + async def _wrapped(request: httpx.Request) -> httpx.Response: + captured.append(request) + return await handler(request) + + transport = httpx.MockTransport(_wrapped) + + from decnet.intel import greynoise as gn_mod + + def _factory(): + return httpx.AsyncClient( + transport=transport, + headers={"User-Agent": "curl/7.88.1"}, + ) + + gn_mod.stealth_client = _factory # type: ignore[assignment] + return captured + + +@pytest.mark.anyio +async def test_malicious_classification_maps_to_verdict(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "ip": "1.2.3.4", + "noise": True, + "classification": "malicious", + "name": "Mirai-like", + }, + ) + + provider = GreyNoiseProvider() + captured = _install_transport(provider, handler) + + result = await provider.lookup("1.2.3.4") + assert result.error is None + assert result.verdict == "malicious" + assert result.column_updates["greynoise_classification"] == "malicious" + raw = json.loads(result.column_updates["greynoise_raw"]) + assert raw["name"] == "Mirai-like" + assert "1.2.3.4" in str(captured[0].url) + # No DECNET label leaks in the UA. + assert "decnet" not in captured[0].headers["user-agent"].lower() + + +@pytest.mark.anyio +async def test_api_key_is_sent_when_configured(monkeypatch): + monkeypatch.setenv("DECNET_GREYNOISE_API_KEY", "k3y-abc") + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"classification": "benign"}) + + provider = GreyNoiseProvider() + captured = _install_transport(provider, handler) + + await provider.lookup("8.8.8.8") + assert captured[0].headers.get("key") == "k3y-abc" + + +@pytest.mark.anyio +async def test_no_api_key_means_no_header(monkeypatch): + monkeypatch.delenv("DECNET_GREYNOISE_API_KEY", raising=False) + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"classification": "benign"}) + + provider = GreyNoiseProvider() + captured = _install_transport(provider, handler) + + await provider.lookup("8.8.8.8") + assert "key" not in captured[0].headers + + +@pytest.mark.anyio +async def test_404_caches_unknown_without_error(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(404, json={"message": "IP not observed"}) + + provider = GreyNoiseProvider() + _install_transport(provider, handler) + + result = await provider.lookup("10.0.0.5") + assert result.error is None + assert result.verdict == "unknown" + assert result.column_updates["greynoise_classification"] == "unknown" + + +@pytest.mark.anyio +async def test_429_returns_error_no_writes(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(429) + + provider = GreyNoiseProvider() + _install_transport(provider, handler) + + result = await provider.lookup("1.1.1.1") + assert result.error == "HTTP 429" + assert result.column_updates == {} + + +@pytest.mark.anyio +async def test_network_failure_becomes_error(): + async def handler(request: httpx.Request) -> httpx.Response: + raise httpx.ConnectError("upstream unreachable") + + provider = GreyNoiseProvider() + _install_transport(provider, handler) + + result = await provider.lookup("1.1.1.1") + assert result.error and result.error.startswith("network:") + assert result.column_updates == {} diff --git a/tests/intel/test_threatfox.py b/tests/intel/test_threatfox.py new file mode 100644 index 00000000..45b29e61 --- /dev/null +++ b/tests/intel/test_threatfox.py @@ -0,0 +1,111 @@ +"""Unit tests for the abuse.ch ThreatFox provider.""" +from __future__ import annotations + +import json + +import httpx +import pytest + +from decnet.intel.threatfox import ThreatFoxProvider + + +def _install_transport(handler) -> list[httpx.Request]: + captured: list[httpx.Request] = [] + + async def _wrapped(request: httpx.Request) -> httpx.Response: + captured.append(request) + return await handler(request) + + transport = httpx.MockTransport(_wrapped) + from decnet.intel import threatfox as mod + + def _factory(): + return httpx.AsyncClient( + transport=transport, + headers={"User-Agent": "curl/7.88.1"}, + ) + + mod.stealth_client = _factory # type: ignore[assignment] + return captured + + +@pytest.mark.anyio +async def test_match_returns_malicious(monkeypatch): + monkeypatch.delenv("DECNET_THREATFOX_API_KEY", raising=False) + + async def handler(request: httpx.Request) -> httpx.Response: + body = json.loads(request.content.decode()) + assert body == {"query": "search_ioc", "search_term": "1.2.3.4"} + return httpx.Response( + 200, + json={ + "query_status": "ok", + "data": [ + { + "ioc": "1.2.3.4", + "ioc_type": "ip:port", + "malware": "Cobalt Strike", + "confidence_level": 80, + } + ], + }, + ) + + captured = _install_transport(handler) + provider = ThreatFoxProvider() + result = await provider.lookup("1.2.3.4") + assert result.verdict == "malicious" + assert result.column_updates["threatfox_listed"] is True + raw = json.loads(result.column_updates["threatfox_raw"]) + assert raw[0]["malware"] == "Cobalt Strike" + # No Auth-Key when none configured. + assert "auth-key" not in {h.lower() for h in captured[0].headers} + + +@pytest.mark.anyio +async def test_auth_key_sent_when_configured(monkeypatch): + monkeypatch.setenv("DECNET_THREATFOX_API_KEY", "tfx-key") + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"query_status": "no_result"}) + + captured = _install_transport(handler) + provider = ThreatFoxProvider() + await provider.lookup("8.8.8.8") + assert captured[0].headers["auth-key"] == "tfx-key" + + +@pytest.mark.anyio +async def test_no_result_caches_unlisted(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"query_status": "no_result"}) + + _install_transport(handler) + provider = ThreatFoxProvider() + result = await provider.lookup("8.8.8.8") + assert result.verdict is None + assert result.column_updates["threatfox_listed"] is False + assert result.error is None + + +@pytest.mark.anyio +async def test_unexpected_status_is_error(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"query_status": "illegal_search"}) + + _install_transport(handler) + provider = ThreatFoxProvider() + result = await provider.lookup("oops") + assert result.error and "illegal_search" in result.error + assert result.column_updates == {} + + +@pytest.mark.anyio +async def test_http_error_surfaces(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(502) + + _install_transport(handler) + provider = ThreatFoxProvider() + result = await provider.lookup("1.1.1.1") + assert result.error == "HTTP 502"