feat(asn): IP→ASN enrichment via iptoasn.com bulk dump

Mirrors decnet/geoip/ end-to-end: paths/base/factory/lookup at the
package level, iptoasn/ subpackage holds the data-source-specific
fetch+parse+provider. AsnLookup is bisect-indexed over (start, end,
AsnInfo) ranges with a pickled cache invalidated on raw-file mtime
bump.

Why iptoasn (and not bgp.tools / Team Cymru): public-domain dump,
zero attribution, no UA mandate, daily refresh — keeps DECNET stealth
intact (the geoip/rir module's "never identify as DECNET" comment
applies the same way here). bgp.tools' ToS would have required an
identifying UA, conflicting with feedback_stealth.

Public surface: decnet.asn.enrich_ip(ip) -> (asn, name, source) or
all-None on miss/disabled. Same shape as decnet.geoip.enrich_ip so
the profiler can compose them in one call site.
This commit is contained in:
2026-04-25 03:58:58 -04:00
parent 77a19ffe9f
commit 010568e558
14 changed files with 789 additions and 0 deletions

View File

@@ -0,0 +1,78 @@
"""Parser for the iptoasn.com ``ip2asn-v4.tsv`` dump.
Line shape (gzipped, one row per BGP-announced prefix)::
1.0.0.0\\t1.0.0.255\\t13335\\tUS\\tCLOUDFLARENET
Fields: ``range_start``, ``range_end``, ``as_number``, ``country_code``,
``as_description``. Both range columns are dotted IPv4 strings (the dump
is IPv4-only — there's a separate ``ip2asn-v6.tsv.gz`` we don't pull).
Rows skipped:
* ``as_number == 0`` — iptoasn's sentinel for "unannounced" / private
/ reserved space. Country may still be present (``"None"`` / two-letter
CC) but we don't care: the geoip module owns country, ASN owns BGP.
* Rows where either range column won't parse as IPv4.
* Rows with fewer than 3 tab-separated columns.
"""
from __future__ import annotations
import gzip
import ipaddress
import logging
from pathlib import Path
from typing import Iterator
from decnet.asn.lookup import AsnInfo, Range
logger = logging.getLogger("decnet.asn.iptoasn.parse")
def parse_file(path: Path) -> Iterator[Range]:
"""Yield ``(start_int, end_int_inclusive, AsnInfo)`` for every BGP row.
Accepts a gzipped path (``*.tsv.gz``); plain TSV is also fine for
test harnesses that hand-craft small fixtures.
"""
opener = gzip.open if path.suffix == ".gz" else open
with opener(path, "rt", encoding="utf-8", errors="replace") as fh:
for lineno, raw in enumerate(fh, 1):
line = raw.rstrip("\n")
if not line:
continue
parts = line.split("\t")
if len(parts) < 3:
continue
start_s, end_s, asn_s = parts[0], parts[1], parts[2]
# Description is the 5th column; iptoasn quotes nothing,
# but the field can contain stray whitespace. ``""`` when
# missing or unknown.
name = parts[4].strip() if len(parts) >= 5 else ""
try:
asn = int(asn_s)
except ValueError:
logger.debug(
"asn.iptoasn: skipping malformed asn line %d in %s",
lineno, path.name,
)
continue
# ASN 0 is iptoasn's sentinel for unannounced / sentinel
# space. Skip — there's no useful enrichment to attach.
if asn == 0:
continue
try:
start_int = int(ipaddress.IPv4Address(start_s))
end_int = int(ipaddress.IPv4Address(end_s))
except (ValueError, ipaddress.AddressValueError):
logger.debug(
"asn.iptoasn: skipping malformed addr line %d in %s",
lineno, path.name,
)
continue
if end_int < start_int:
continue
yield (start_int, end_int, AsnInfo(asn=asn, name=name))