93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
"""
|
|
IP-to-ASN enrichment — maps attacker IPs to BGP-announced AS numbers and
|
|
org names for attacker intelligence.
|
|
|
|
Public surface mirrors :mod:`decnet.geoip` so callers can compose them:
|
|
|
|
* :func:`get_lookup` — returns the singleton :class:`AsnLookup`.
|
|
* :func:`enrich_ip` — takes an IP string, returns
|
|
``(asn_int, asn_name, provider_name)`` or ``(None, None, None)``.
|
|
|
|
Provider selection goes through :func:`~decnet.asn.factory.get_provider`
|
|
(env ``DECNET_ASN_PROVIDER``, default ``iptoasn``). Direct imports of
|
|
concrete providers are forbidden — mirrors the ``get_bus`` /
|
|
``get_repository`` rule.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from typing import Optional, Tuple
|
|
|
|
from decnet.asn.factory import get_provider
|
|
from decnet.asn.lookup import AsnLookup
|
|
from decnet.asn.paths import ASN_ROOT
|
|
|
|
# 24 h — iptoasn refreshes daily.
|
|
REFRESH_INTERVAL_S = 86_400
|
|
|
|
_lookup: Optional[AsnLookup] = None
|
|
_provider_name: Optional[str] = None
|
|
|
|
|
|
def get_lookup(*, force_refresh: bool = False) -> AsnLookup:
|
|
"""Return the cached :class:`AsnLookup`, building it on first use.
|
|
|
|
If the provider's data files are missing or older than
|
|
``REFRESH_INTERVAL_S`` seconds, refresh before building. Pass
|
|
``force_refresh=True`` to bypass the age check (used by a future
|
|
``decnet asn refresh`` CLI command).
|
|
"""
|
|
global _lookup, _provider_name
|
|
provider = get_provider()
|
|
_provider_name = provider.name
|
|
|
|
if force_refresh or _files_stale(provider):
|
|
provider.refresh()
|
|
_lookup = None # rebuild on next access
|
|
|
|
if _lookup is None:
|
|
_lookup = provider.build_lookup()
|
|
return _lookup
|
|
|
|
|
|
def enrich_ip(ip: str) -> Tuple[Optional[int], Optional[str], Optional[str]]:
|
|
"""Return ``(asn, as_name, provider_name)`` or ``(None, None, None)``.
|
|
|
|
Never raises — any lookup failure collapses to all-None so the
|
|
caller (profiler) can upsert the attacker row regardless.
|
|
|
|
``DECNET_ASN_ENABLED=false`` short-circuits the whole path, useful
|
|
for tests / agent hosts / ops wanting to disable enrichment without
|
|
touching provider config.
|
|
"""
|
|
if os.environ.get("DECNET_ASN_ENABLED", "true").lower() == "false":
|
|
return (None, None, None)
|
|
try:
|
|
lookup = get_lookup()
|
|
info = lookup.asn(ip)
|
|
if info is None:
|
|
return (None, None, None)
|
|
return (info.asn, info.name or None, _provider_name or "unknown")
|
|
except Exception:
|
|
return (None, None, None)
|
|
|
|
|
|
def _files_stale(provider) -> bool:
|
|
"""True when the provider has no fresh data on disk.
|
|
|
|
Same semantics as :func:`decnet.geoip._files_stale`: a partial
|
|
cache still produces correct answers for the ranges it covers.
|
|
"""
|
|
paths = provider.data_paths()
|
|
if not paths:
|
|
return True
|
|
now = time.time()
|
|
for p in paths:
|
|
if p.exists() and now - p.stat().st_mtime <= REFRESH_INTERVAL_S:
|
|
return False
|
|
return True
|
|
|
|
|
|
__all__ = ["get_lookup", "enrich_ip", "ASN_ROOT", "REFRESH_INTERVAL_S"]
|