Files
DECNET/decnet/intel/abuseipdb.py

105 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AbuseIPDB provider.
Endpoint: ``GET https://api.abuseipdb.com/api/v2/check``
Free tier: 1000 lookups/day. Always requires an API key passed in the
``Key`` header — the provider self-disables (returns an error) when no
key is configured rather than burning quota at the free public IP.
Verdict mapping is tier-based on the ``abuseConfidenceScore`` (0100):
* ``>= 75`` — ``malicious``
* ``25..74`` — ``suspicious``
* ``< 25`` — ``benign``
This matches AbuseIPDB's own UI thresholds reasonably closely; tune
later if operators report drift.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from typing import Optional
from decnet.intel.base import IntelProvider, IntelResult
from decnet.logging import get_logger
from decnet.net.http import stealth_client
log = get_logger("intel.abuseipdb")
_ENDPOINT = "https://api.abuseipdb.com/api/v2/check"
_DEFAULT_MAX_AGE_DAYS = 30
def _score_to_verdict(score: int) -> str:
if score >= 75:
return "malicious"
if score >= 25:
return "suspicious"
return "benign"
class AbuseIPDBProvider(IntelProvider):
name = "abuseipdb"
concurrency = 4
# 1000/day = avg 1 every ~86s. We don't enforce the daily cap here —
# operators who burn it through the worker will see HTTP 429 and the
# row gets retried after the TTL window.
min_dispatch_interval_s = 0.5
def __init__(
self,
*,
api_key: Optional[str] = None,
max_age_days: int = _DEFAULT_MAX_AGE_DAYS,
) -> None:
super().__init__()
self._api_key = api_key or os.environ.get(
"DECNET_ABUSEIPDB_API_KEY"
) or None
self._max_age_days = max_age_days
async def lookup(self, ip: str) -> IntelResult:
if not self._api_key:
return IntelResult(
provider=self.name,
error="DECNET_ABUSEIPDB_API_KEY not configured",
)
params = {
"ipAddress": ip,
"maxAgeInDays": str(self._max_age_days),
}
headers = {
"Key": self._api_key,
"Accept": "application/json",
}
try:
async with stealth_client() as client:
resp = await client.get(_ENDPOINT, headers=headers, params=params)
except Exception as exc: # noqa: BLE001
return IntelResult(provider=self.name, error=f"network: {exc}")
if resp.status_code != 200:
return IntelResult(
provider=self.name,
error=f"HTTP {resp.status_code}",
)
try:
payload = resp.json()
except Exception as exc: # noqa: BLE001
return IntelResult(provider=self.name, error=f"parse: {exc}")
data = payload.get("data") or {}
score = int(data.get("abuseConfidenceScore") or 0)
verdict = _score_to_verdict(score)
return IntelResult(
provider=self.name,
verdict=verdict,
column_updates={
"abuseipdb_score": score,
"abuseipdb_raw": json.dumps(data),
"abuseipdb_queried_at": datetime.now(timezone.utc),
},
)