From 010568e55882baa2604eb40bf510f900618531bd Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 25 Apr 2026 03:58:58 -0400 Subject: [PATCH] =?UTF-8?q?feat(asn):=20IP=E2=86=92ASN=20enrichment=20via?= =?UTF-8?q?=20iptoasn.com=20bulk=20dump?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors decnet/geoip/ end-to-end: paths/base/factory/lookup at the package level, iptoasn/ subpackage holds the data-source-specific fetch+parse+provider. AsnLookup is bisect-indexed over (start, end, AsnInfo) ranges with a pickled cache invalidated on raw-file mtime bump. Why iptoasn (and not bgp.tools / Team Cymru): public-domain dump, zero attribution, no UA mandate, daily refresh — keeps DECNET stealth intact (the geoip/rir module's "never identify as DECNET" comment applies the same way here). bgp.tools' ToS would have required an identifying UA, conflicting with feedback_stealth. Public surface: decnet.asn.enrich_ip(ip) -> (asn, name, source) or all-None on miss/disabled. Same shape as decnet.geoip.enrich_ip so the profiler can compose them in one call site. 
--- decnet/asn/__init__.py | 92 ++++++++++++++++++++++++ decnet/asn/base.py | 33 +++++++++ decnet/asn/factory.py | 39 ++++++++++ decnet/asn/iptoasn/__init__.py | 9 +++ decnet/asn/iptoasn/fetch.py | 63 +++++++++++++++++ decnet/asn/iptoasn/parse.py | 78 ++++++++++++++++++++ decnet/asn/iptoasn/provider.py | 83 ++++++++++++++++++++++ decnet/asn/lookup.py | 126 +++++++++++++++++++++++++++++++++ decnet/asn/paths.py | 18 +++++ tests/asn/__init__.py | 0 tests/asn/conftest.py | 22 ++++++ tests/asn/test_lookup.py | 74 +++++++++++++++++++ tests/asn/test_parse.py | 57 +++++++++++++++ tests/asn/test_provider.py | 95 +++++++++++++++++++++++++ 14 files changed, 789 insertions(+) create mode 100644 decnet/asn/__init__.py create mode 100644 decnet/asn/base.py create mode 100644 decnet/asn/factory.py create mode 100644 decnet/asn/iptoasn/__init__.py create mode 100644 decnet/asn/iptoasn/fetch.py create mode 100644 decnet/asn/iptoasn/parse.py create mode 100644 decnet/asn/iptoasn/provider.py create mode 100644 decnet/asn/lookup.py create mode 100644 decnet/asn/paths.py create mode 100644 tests/asn/__init__.py create mode 100644 tests/asn/conftest.py create mode 100644 tests/asn/test_lookup.py create mode 100644 tests/asn/test_parse.py create mode 100644 tests/asn/test_provider.py diff --git a/decnet/asn/__init__.py b/decnet/asn/__init__.py new file mode 100644 index 00000000..64224b0a --- /dev/null +++ b/decnet/asn/__init__.py @@ -0,0 +1,92 @@ +""" +IP-to-ASN enrichment — maps attacker IPs to BGP-announced AS numbers and +org names for attacker intelligence. + +Public surface mirrors :mod:`decnet.geoip` so callers can compose them: + +* :func:`get_lookup` — returns the singleton :class:`AsnLookup`. +* :func:`enrich_ip` — takes an IP string, returns + ``(asn_int, asn_name, provider_name)`` or ``(None, None, None)``. + +Provider selection goes through :func:`~decnet.asn.factory.get_provider` +(env ``DECNET_ASN_PROVIDER``, default ``iptoasn``). 
# 24 h — iptoasn refreshes daily.
REFRESH_INTERVAL_S = 86_400

# Minimum spacing between two refresh attempts.  Without it, a host that
# cannot reach the upstream (or whose download keeps failing) would see
# "stale" on every call and re-run the network fetch plus a full index
# rebuild for every single enriched IP.
REFRESH_RETRY_INTERVAL_S = 3_600

_lookup: Optional[AsnLookup] = None
_provider_name: Optional[str] = None
# ``time.monotonic()`` stamp of the most recent refresh attempt, successful
# or not.  ``None`` means no attempt yet for the current provider.
_last_refresh_attempt: Optional[float] = None


def get_lookup(*, force_refresh: bool = False) -> AsnLookup:
    """Return the cached :class:`AsnLookup`, building it on first use.

    If the provider's data files are missing or older than
    ``REFRESH_INTERVAL_S`` seconds, the provider is refreshed before the
    index is built.  Automatic refreshes are attempted at most once per
    ``REFRESH_RETRY_INTERVAL_S`` seconds, so a failing download cannot
    turn every lookup into a network round-trip and a full re-parse.

    Args:
        force_refresh: Bypass both the age check and the retry throttle
            (used by a future ``decnet asn refresh`` CLI command).

    Returns:
        The process-wide lookup for the currently configured provider.

    Raises:
        Whatever the provider factory, ``refresh()`` or ``build_lookup()``
        raise; :func:`enrich_ip` is the never-raises wrapper.
    """
    global _lookup, _provider_name, _last_refresh_attempt
    provider = get_provider()
    if provider.name != _provider_name:
        # Different provider than last time (or very first call): an index
        # built from another source must not be served under the new name,
        # and the previous provider's throttle stamp does not apply.
        _lookup = None
        _last_refresh_attempt = None
    _provider_name = provider.name

    now = time.monotonic()
    may_retry = (
        _last_refresh_attempt is None
        or now - _last_refresh_attempt >= REFRESH_RETRY_INTERVAL_S
    )
    if force_refresh or (may_retry and _files_stale(provider)):
        # Stamp *before* refreshing so an exception still counts as an attempt.
        _last_refresh_attempt = now
        provider.refresh()
        _lookup = None  # on-disk data may have changed; rebuild below

    if _lookup is None:
        _lookup = provider.build_lookup()
    return _lookup


def enrich_ip(ip: str) -> Tuple[Optional[int], Optional[str], Optional[str]]:
    """Return ``(asn, as_name, provider_name)`` or ``(None, None, None)``.

    Never raises — any lookup failure collapses to all-None so the
    caller (profiler) can upsert the attacker row regardless.

    ``DECNET_ASN_ENABLED=false`` short-circuits the whole path, useful
    for tests / agent hosts / ops wanting to disable enrichment without
    touching provider config.

    Args:
        ip: Textual IP address of the attacker.

    Returns:
        ``(asn, name, provider_name)`` on a hit; an empty AS description
        is reported as ``None``.  All-``None`` on a miss, when disabled,
        or on any error.
    """
    if os.environ.get("DECNET_ASN_ENABLED", "true").lower() == "false":
        return (None, None, None)
    try:
        info = get_lookup().asn(ip)
        if info is None:
            return (None, None, None)
        return (info.asn, info.name or None, _provider_name or "unknown")
    except Exception:
        # Deliberate best-effort boundary: enrichment must never block the
        # attacker upsert, so every failure degrades to "no data".
        return (None, None, None)


def _files_stale(provider) -> bool:
    """True when the provider has no fresh data on disk.

    A single data file younger than ``REFRESH_INTERVAL_S`` is enough to
    count as fresh: a partial cache still produces correct answers for
    the ranges it covers.  An empty path list counts as stale.
    """
    now = time.time()
    for path in provider.data_paths():
        try:
            age = now - path.stat().st_mtime
        except OSError:
            # Missing or unreadable file (including one removed between
            # listing and stat) — it simply does not count as fresh.
            continue
        if age <= REFRESH_INTERVAL_S:
            return False
    return True
# Lazily-created provider instance and the env key it was created for.
_cached: Optional[Provider] = None
_cached_key: Optional[str] = None


def get_provider() -> Provider:
    """Return the :class:`Provider` selected by ``DECNET_ASN_PROVIDER``.

    The instance is memoised per (lower-cased) key, so flipping the env
    var yields a new provider on the next call.  Unknown keys raise
    :class:`ValueError`.
    """
    global _cached, _cached_key
    wanted = os.environ.get("DECNET_ASN_PROVIDER", "iptoasn").lower()

    # Fast path: same key as last time and an instance already exists.
    if _cached_key == wanted and _cached is not None:
        return _cached

    if wanted != "iptoasn":
        raise ValueError(f"Unsupported ASN provider: {wanted!r}")

    # Imported here so the concrete provider is only loaded when selected.
    from decnet.asn.iptoasn.provider import IptoasnProvider

    instance: Provider = IptoasnProvider()
    _cached, _cached_key = instance, wanted
    return instance


def reset_cache() -> None:
    """Drop the memoised provider — tests swap providers via the env var."""
    global _cached, _cached_key
    _cached = _cached_key = None
logger = logging.getLogger("decnet.asn.iptoasn.fetch")

# Mirror the (name, url) tuple shape of geoip.rir.fetch so test
# harnesses can swap one for the other.
IPTOASN_SOURCES: Tuple[Tuple[str, str], ...] = (
    ("ip2asn-v4", "https://iptoasn.com/data/ip2asn-v4.tsv.gz"),
)

# Generic UA — matches geoip.rir.fetch.  iptoasn.com explicitly releases
# the data into the public domain and does NOT require an identifying UA,
# so we keep DECNET stealth instead of advertising.
_USER_AGENT = "Mozilla/5.0 (compatible; fetch/1.0)"
_TIMEOUT_S = 60


def fetch_all(dest: Path) -> list[Path]:
    """Download every iptoasn file into *dest*.  Returns the written paths.

    Atomic per file: the payload is downloaded to ``{name}.tsv.gz.tmp``,
    verified to be a complete, non-empty gzip stream, and only then
    renamed over the target.  A failed or corrupt download (HTML error
    page served with status 200, truncated transfer, empty body) leaves
    the previous generation intact and is reported via the log only.

    Args:
        dest: Directory to write into; created if absent.

    Returns:
        Paths of the files that were successfully replaced this run.
    """
    dest.mkdir(parents=True, exist_ok=True)
    written: list[Path] = []
    for name, url in IPTOASN_SOURCES:
        target = dest / f"{name}.tsv.gz"
        tmp = target.with_name(target.name + ".tmp")
        try:
            _download(url, tmp)
            # Validate BEFORE the rename — once replaced, the old good
            # dump is gone and the parser would index garbage.
            _verify_gzip(tmp)
            tmp.replace(target)
            written.append(target)
            logger.info(
                "asn.iptoasn: fetched %s (%d bytes)",
                name, target.stat().st_size,
            )
        except Exception as exc:
            logger.error(
                "asn.iptoasn: fetch failed for %s (%s): %s", name, url, exc
            )
            tmp.unlink(missing_ok=True)
            # Keep any stale previous file — better outdated than empty.
    return written


def _verify_gzip(path: Path) -> None:
    """Raise unless *path* is a complete gzip stream with some content.

    Decompresses the whole file in 1 MiB chunks so that a bad magic
    number, a CRC mismatch, or a truncated stream surfaces as an
    exception; a stream that decompresses to zero bytes raises
    :class:`ValueError`.
    """
    import gzip  # function-scope: not part of this module's top-level imports

    total = 0
    with gzip.open(path, "rb") as fh:
        while chunk := fh.read(1 << 20):
            total += len(chunk)
    if total == 0:
        raise ValueError("downloaded dump decompresses to zero bytes")


def _download(url: str, dest: Path) -> None:
    """Stream *url* into *dest* using the generic, non-identifying UA."""
    req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT})
    with urllib.request.urlopen(req, timeout=_TIMEOUT_S) as resp, dest.open("wb") as fh:  # nosec B310 — fixed https iptoasn URL
        shutil.copyfileobj(resp, fh)
+ """ + opener = gzip.open if path.suffix == ".gz" else open + with opener(path, "rt", encoding="utf-8", errors="replace") as fh: + for lineno, raw in enumerate(fh, 1): + line = raw.rstrip("\n") + if not line: + continue + parts = line.split("\t") + if len(parts) < 3: + continue + start_s, end_s, asn_s = parts[0], parts[1], parts[2] + # Description is the 5th column; iptoasn quotes nothing, + # but the field can contain stray whitespace. ``""`` when + # missing or unknown. + name = parts[4].strip() if len(parts) >= 5 else "" + + try: + asn = int(asn_s) + except ValueError: + logger.debug( + "asn.iptoasn: skipping malformed asn line %d in %s", + lineno, path.name, + ) + continue + # ASN 0 is iptoasn's sentinel for unannounced / sentinel + # space. Skip — there's no useful enrichment to attach. + if asn == 0: + continue + + try: + start_int = int(ipaddress.IPv4Address(start_s)) + end_int = int(ipaddress.IPv4Address(end_s)) + except (ValueError, ipaddress.AddressValueError): + logger.debug( + "asn.iptoasn: skipping malformed addr line %d in %s", + lineno, path.name, + ) + continue + if end_int < start_int: + continue + + yield (start_int, end_int, AsnInfo(asn=asn, name=name)) diff --git a/decnet/asn/iptoasn/provider.py b/decnet/asn/iptoasn/provider.py new file mode 100644 index 00000000..fbd243b5 --- /dev/null +++ b/decnet/asn/iptoasn/provider.py @@ -0,0 +1,83 @@ +"""iptoasn provider — orchestrates fetch + parse into an :class:`AsnLookup`. + +Mirrors :class:`decnet.geoip.rir.provider.RirProvider` exactly: fetch, +build a pickled cache, invalidate when raw files are newer than the +cache. 
+""" +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Sequence + +from decnet.asn.base import Provider +from decnet.asn.iptoasn.fetch import IPTOASN_SOURCES, fetch_all +from decnet.asn.iptoasn.parse import parse_file +from decnet.asn.lookup import AsnLookup +from decnet.asn.paths import ensure_root + +logger = logging.getLogger("decnet.asn.iptoasn.provider") + +# Pickled lookup cache — skips re-parsing the ~580k-row gz dump on every +# profiler restart. Rebuilt whenever any raw file is newer than the +# cache, see ``_cache_fresh``. +_CACHE_NAME = ".iptoasn_index.pkl" + + +class IptoasnProvider(Provider): + name = "iptoasn" + + def __init__(self) -> None: + self._root = ensure_root() + + # ---------- Provider interface ---------- + + def refresh(self) -> None: + logger.info("asn.iptoasn: refreshing dump into %s", self._root) + fetch_all(self._root) + cache = self._root / _CACHE_NAME + if cache.exists(): + cache.unlink(missing_ok=True) + + def build_lookup(self) -> AsnLookup: + cache = self._root / _CACHE_NAME + if self._cache_fresh(cache): + try: + lookup = AsnLookup.load(cache) + logger.debug( + "asn.iptoasn: loaded cached index (%d ranges)", + len(lookup), + ) + return lookup + except Exception as exc: + logger.warning( + "asn.iptoasn: cache load failed, rebuilding: %s", exc + ) + + ranges = [] + for path in self.data_paths(): + if not path.exists(): + continue + ranges.extend(parse_file(path)) + lookup = AsnLookup.from_ranges(ranges) + try: + lookup.save(cache) + except Exception as exc: + logger.warning("asn.iptoasn: cache save failed: %s", exc) + logger.info("asn.iptoasn: built index with %d ranges", len(lookup)) + return lookup + + def data_paths(self) -> Sequence[Path]: + return [self._root / f"{name}.tsv.gz" for name, _url in IPTOASN_SOURCES] + + # ---------- internals ---------- + + def _cache_fresh(self, cache: Path) -> bool: + """True when the pickle exists and is at least as new as every raw file.""" + 
@dataclass(frozen=True)
class AsnInfo:
    """One BGP-announced prefix's origin metadata."""

    asn: int
    name: str  # AS description / org name; "" if absent in the source data


Range = Tuple[int, int, AsnInfo]


@dataclass
class AsnLookup:
    """Indexed AS lookup over IPv4 ranges.

    Holds three parallel lists sorted by range start and answers point
    queries with one :func:`bisect.bisect_right` over the starts.
    """

    # Parallel arrays for bisect: _starts[i] is the start-IP of the i-th
    # range, _ends[i] its inclusive end, _infos[i] its AsnInfo.
    _starts: List[int]
    _ends: List[int]
    _infos: List[AsnInfo]

    @classmethod
    def from_ranges(cls, ranges: Iterable[Range]) -> "AsnLookup":
        """Build a lookup from ``(start, end_inclusive, AsnInfo)`` triples.

        Ranges are ordered by start address.  When several triples share
        the same start, the one that appears **last in the input** wins —
        both its end and its info replace the earlier entry.
        Non-overlapping adjacency is preserved.
        """
        # Sort on the start only: ``sorted`` is stable, so equal starts
        # keep input order and the final one really is the last writer.
        # (Including the end in the key would let the widest range win.)
        ordered = sorted(ranges, key=lambda r: r[0])
        starts: List[int] = []
        ends: List[int] = []
        infos: List[AsnInfo] = []
        for start, end, info in ordered:
            if starts and starts[-1] == start:
                ends[-1] = end
                infos[-1] = info
                continue
            starts.append(start)
            ends.append(end)
            infos.append(info)
        return cls(starts, ends, infos)

    def asn(self, ip: str) -> Optional[AsnInfo]:
        """Return the :class:`AsnInfo` for ``ip`` or ``None``.

        ``None`` on: IPv6, private/loopback/link-local/multicast/reserved/
        unspecified addresses, malformed strings, and IPs outside every
        range held by this lookup.
        """
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            return None
        if isinstance(addr, ipaddress.IPv6Address):
            return None
        if (
            addr.is_private
            or addr.is_loopback
            or addr.is_link_local
            or addr.is_multicast
            or addr.is_reserved
            or addr.is_unspecified
        ):
            return None

        n = int(addr)
        # Rightmost range whose start is <= n; -1 when n precedes them all.
        idx = bisect.bisect_right(self._starts, n) - 1
        if idx < 0:
            return None
        if n <= self._ends[idx]:
            return self._infos[idx]
        return None

    def __len__(self) -> int:
        """Number of indexed ranges."""
        return len(self._starts)

    # ---------- persistence ----------

    def save(self, path: Path) -> None:
        """Pickle the lookup to *path* (atomic rename).

        The payload is written to ``<path>.tmp`` first; if writing or the
        rename fails, the temp file is removed and the error re-raised, so
        an existing *path* is never left half-written.
        """
        tmp = path.with_suffix(path.suffix + ".tmp")
        tmp.parent.mkdir(parents=True, exist_ok=True)
        try:
            with tmp.open("wb") as fh:
                pickle.dump(
                    {
                        "version": 1,
                        "starts": self._starts,
                        "ends": self._ends,
                        "infos": [(i.asn, i.name) for i in self._infos],
                    },
                    fh,
                    protocol=pickle.HIGHEST_PROTOCOL,
                )
            tmp.replace(path)
        except BaseException:
            tmp.unlink(missing_ok=True)
            raise

    @classmethod
    def load(cls, path: Path) -> "AsnLookup":
        """Load a pickled lookup from *path*.

        Raises:
            ValueError: the payload is not a version-1 index, lacks a
                required key, or its three arrays differ in length.  (An
                inconsistent index would otherwise surface much later as
                an ``IndexError`` inside :meth:`asn`.)
            OSError / pickle.UnpicklingError / EOFError: the file cannot
                be read or is not a valid pickle.
        """
        # NOTE(security): pickle executes code on load — only ever point
        # this at the self-produced cache file, never at external data.
        with path.open("rb") as fh:
            data = pickle.load(fh)  # nosec B301 — self-produced file under /var/lib/decnet
        version = data.get("version") if isinstance(data, dict) else None
        if version != 1:
            raise ValueError(
                f"unsupported asn-lookup index version: {version!r}"
            )
        try:
            starts, ends = data["starts"], data["ends"]
            infos = [AsnInfo(asn=a, name=n) for a, n in data["infos"]]
        except (KeyError, TypeError, ValueError) as exc:
            raise ValueError(f"malformed asn-lookup index: {exc!r}") from exc
        if not (len(starts) == len(ends) == len(infos)):
            raise ValueError("malformed asn-lookup index: array lengths differ")
        return cls(starts, ends, infos)
def _ip(s: str) -> int:
    """Dotted-quad string -> integer address."""
    return int(ipaddress.IPv4Address(s))


def _fixture_lookup() -> AsnLookup:
    """Three disjoint public ranges, intentionally supplied out of order."""
    rows = (
        ("8.8.8.0", "8.8.8.255", 15169, "GOOGLE"),
        ("1.0.0.0", "1.0.0.255", 13335, "CLOUDFLARENET"),
        ("46.101.0.0", "46.101.255.255", 14061, "DIGITALOCEAN"),
    )
    return AsnLookup.from_ranges(
        [(_ip(lo), _ip(hi), AsnInfo(number, label)) for lo, hi, number, label in rows]
    )


def test_asn_hits_known_ranges() -> None:
    # One address from the interior of each fixture range.
    index = _fixture_lookup()
    assert index.asn("8.8.8.8").asn == 15169
    assert index.asn("1.0.0.5").name == "CLOUDFLARENET"
    assert index.asn("46.101.10.20").asn == 14061


def test_asn_misses_gap() -> None:
    # Public address that no fixture range covers.
    assert _fixture_lookup().asn("9.0.0.0") is None


def test_asn_private_returns_none() -> None:
    # Non-routable space is rejected regardless of what the index holds.
    index = _fixture_lookup()
    non_routable = ("10.0.0.1", "192.168.1.1", "172.16.0.1", "127.0.0.1", "0.0.0.0")
    for ip in non_routable:
        assert index.asn(ip) is None, ip


def test_asn_ipv6_returns_none() -> None:
    # The index is IPv4-only.
    index = _fixture_lookup()
    assert index.asn("2001:db8::1") is None
    assert index.asn("::1") is None


def test_asn_invalid_returns_none() -> None:
    # Unparseable input is a miss, not an exception.
    index = _fixture_lookup()
    assert index.asn("not-an-ip") is None
    assert index.asn("") is None


def test_lookup_roundtrips_through_pickle(tmp_path: Path) -> None:
    # save() followed by load() yields an equivalent index.
    original = _fixture_lookup()
    cache_file = tmp_path / "idx.pkl"
    original.save(cache_file)
    restored = AsnLookup.load(cache_file)
    assert len(restored) == len(original)
    assert restored.asn("8.8.8.8").asn == 15169
    assert restored.asn("8.8.8.8").name == "GOOGLE"


def test_from_ranges_last_writer_wins_on_collision() -> None:
    # Two identical ranges: the later triple replaces the earlier one.
    lo, hi = _ip("1.0.0.0"), _ip("1.0.0.255")
    index = AsnLookup.from_ranges([
        (lo, hi, AsnInfo(1, "first")),
        (lo, hi, AsnInfo(2, "second")),
    ])
    assert index.asn("1.0.0.5").asn == 2


def test_boundary_inclusive() -> None:
    # First and last address of a range hit; the next address misses.
    index = _fixture_lookup()
    assert index.asn("8.8.8.0").asn == 15169
    assert index.asn("8.8.8.255").asn == 15169
    assert index.asn("8.8.9.0") is None
def _seed_fixture(root: Path, content: str = "8.8.8.0\t8.8.8.255\t15169\tUS\tGOOGLE\n") -> None:
    """Write *content* as the gzipped dump file under *root*."""
    with gzip.open(root / "ip2asn-v4.tsv.gz", "wt", encoding="utf-8") as sink:
        sink.write(content)


def test_factory_returns_iptoasn_by_default() -> None:
    # No DECNET_ASN_PROVIDER set -> the iptoasn provider.
    from decnet.asn.factory import get_provider

    assert get_provider().name == "iptoasn"


def test_factory_rejects_unknown_provider(monkeypatch: pytest.MonkeyPatch) -> None:
    # An unrecognised key is a ValueError, not a silent fallback.
    from decnet.asn import factory

    monkeypatch.setenv("DECNET_ASN_PROVIDER", "nope")
    factory.reset_cache()
    with pytest.raises(ValueError):
        factory.get_provider()


def test_provider_build_lookup_empty_when_no_files(tmp_path: Path) -> None:
    # Nothing on disk -> an empty index that misses everything.
    from decnet.asn.iptoasn.provider import IptoasnProvider

    index = IptoasnProvider().build_lookup()
    assert len(index) == 0
    assert index.asn("8.8.8.8") is None


def test_provider_build_lookup_reads_present_file(tmp_path: Path) -> None:
    # A seeded dump is parsed into a queryable index.
    from decnet.asn.iptoasn.provider import IptoasnProvider

    _seed_fixture(tmp_path)
    found = IptoasnProvider().build_lookup().asn("8.8.8.8")
    assert found is not None
    assert found.asn == 15169
    assert found.name == "GOOGLE"


def test_provider_uses_cache_when_fresh(tmp_path: Path) -> None:
    # First build writes the pickle; a second provider yields the same size.
    from decnet.asn.iptoasn.provider import IptoasnProvider

    _seed_fixture(tmp_path)
    first = IptoasnProvider().build_lookup()
    assert (tmp_path / ".iptoasn_index.pkl").exists()

    second = IptoasnProvider().build_lookup()
    assert len(second) == len(first)


def test_enrich_ip_short_circuits_when_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
    # DECNET_ASN_ENABLED=false returns all-None.
    import decnet.asn as asn

    monkeypatch.setenv("DECNET_ASN_ENABLED", "false")
    assert asn.enrich_ip("8.8.8.8") == (None, None, None)


def test_enrich_ip_returns_asn_and_source(tmp_path: Path) -> None:
    # Happy path: number, description and provider tag all populated.
    from decnet.asn import enrich_ip

    _seed_fixture(tmp_path)
    number, label, source = enrich_ip("8.8.8.8")
    assert number == 15169
    assert label == "GOOGLE"
    assert source == "iptoasn"


def test_enrich_ip_private_returns_none(tmp_path: Path) -> None:
    # RFC 1918 space never resolves.
    from decnet.asn import enrich_ip

    _seed_fixture(tmp_path)
    assert enrich_ip("192.168.1.1") == (None, None, None)


def test_enrich_ip_unannounced_returns_none(tmp_path: Path) -> None:
    from decnet.asn import enrich_ip

    _seed_fixture(tmp_path)
    # 9.0.0.0 isn't in our fixture range — no BGP announcement we know of.
    assert enrich_ip("9.0.0.0") == (None, None, None)