Files
DECNET/decnet/intel/abuseipdb.py
anti cd70136d09 feat(intel): wire GreyNoise, AbuseIPDB, Feodo Tracker + ThreatFox
Four concrete IntelProvider impls — three per-IP queries plus one bulk
feed:

* GreyNoiseProvider — community endpoint, optional API key for higher
  rate limit. 404 = unknown (cache the absence so we don't re-query).
* AbuseIPDBProvider — score threshold mapping (>=75 malicious, >=25
  suspicious, else benign). Self-disables with a clear error when no
  API key is configured rather than burning quota.
* FeodoProvider — fetches the bulk botnet C2 IP feed once per refresh
  window and answers every lookup from an in-memory set. Listed = C2.
* ThreatFoxProvider — POST /api/v1/ search_ioc query, optional Auth-Key
  header. Match in data[] = malicious; no_result = absence-not-benign.

Every provider routes through decnet.net.http.stealth_client so the
egress UA never leaks 'DECNET'.
2026-04-26 05:15:17 -04:00

105 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AbuseIPDB provider.
Endpoint: ``GET https://api.abuseipdb.com/api/v2/check``
Free tier: 1000 lookups/day. Always requires an API key passed in the
``Key`` header — the provider self-disables (returns an error) when no
key is configured rather than burning quota at the free public IP.
Verdict mapping is tier-based on the ``abuseConfidenceScore`` (0100):
* ``>= 75`` — ``malicious``
* ``25..74`` — ``suspicious``
* ``< 25`` — ``benign``
This matches AbuseIPDB's own UI thresholds reasonably closely; tune
later if operators report drift.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from typing import Optional
from decnet.intel.base import IntelProvider, IntelResult
from decnet.logging import get_logger
from decnet.net.http import stealth_client
log = get_logger("intel.abuseipdb")
_ENDPOINT = "https://api.abuseipdb.com/api/v2/check"
_DEFAULT_MAX_AGE_DAYS = 30
def _score_to_verdict(score: int) -> str:
if score >= 75:
return "malicious"
if score >= 25:
return "suspicious"
return "benign"
class AbuseIPDBProvider(IntelProvider):
name = "abuseipdb"
concurrency = 4
# 1000/day = avg 1 every ~86s. We don't enforce the daily cap here —
# operators who burn it through the worker will see HTTP 429 and the
# row gets retried after the TTL window.
min_dispatch_interval_s = 0.5
def __init__(
self,
*,
api_key: Optional[str] = None,
max_age_days: int = _DEFAULT_MAX_AGE_DAYS,
) -> None:
super().__init__()
self._api_key = api_key or os.environ.get(
"DECNET_ABUSEIPDB_API_KEY"
) or None
self._max_age_days = max_age_days
async def lookup(self, ip: str) -> IntelResult:
if not self._api_key:
return IntelResult(
provider=self.name,
error="DECNET_ABUSEIPDB_API_KEY not configured",
)
params = {
"ipAddress": ip,
"maxAgeInDays": str(self._max_age_days),
}
headers = {
"Key": self._api_key,
"Accept": "application/json",
}
try:
async with stealth_client() as client:
resp = await client.get(_ENDPOINT, headers=headers, params=params)
except Exception as exc: # noqa: BLE001
return IntelResult(provider=self.name, error=f"network: {exc}")
if resp.status_code != 200:
return IntelResult(
provider=self.name,
error=f"HTTP {resp.status_code}",
)
try:
payload = resp.json()
except Exception as exc: # noqa: BLE001
return IntelResult(provider=self.name, error=f"parse: {exc}")
data = payload.get("data") or {}
score = int(data.get("abuseConfidenceScore") or 0)
verdict = _score_to_verdict(score)
return IntelResult(
provider=self.name,
verdict=verdict,
column_updates={
"abuseipdb_score": score,
"abuseipdb_raw": json.dumps(data),
"abuseipdb_queried_at": datetime.now(timezone.utc),
},
)