Mirrors decnet/geoip/ end-to-end: paths/base/factory/lookup at the package level, iptoasn/ subpackage holds the data-source-specific fetch+parse+provider. AsnLookup is bisect-indexed over (start, end, AsnInfo) ranges with a pickled cache invalidated on raw-file mtime bump. Why iptoasn (and not bgp.tools / Team Cymru): public-domain dump, zero attribution, no UA mandate, daily refresh — keeps DECNET stealth intact (the geoip/rir module's "never identify as DECNET" comment applies the same way here). bgp.tools' ToS would have required an identifying UA, conflicting with feedback_stealth. Public surface: decnet.asn.enrich_ip(ip) -> (asn, name, source) or all-None on miss/disabled. Same shape as decnet.geoip.enrich_ip so the profiler can compose them in one call site.
127 lines
4.1 KiB
Python
127 lines
4.1 KiB
Python
"""Provider-agnostic IP→ASN lookup.
|
|
|
|
A :class:`AsnLookup` is a frozen, sorted array of ``(start_ip,
|
|
end_ip_inclusive, AsnInfo)`` ranges queried via :mod:`bisect`.
|
|
O(log n) on ~600k ranges (a current iptoasn dump is ~580k rows).
|
|
|
|
Private/loopback/invalid IPv4 and all IPv6 addresses resolve to
|
|
``None`` — the same policy :mod:`decnet.geoip.lookup` uses.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import bisect
|
|
import ipaddress
|
|
import pickle # nosec B403 — self-produced cache under /var/lib/decnet, never deserialized from untrusted input
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable, List, Optional, Tuple
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AsnInfo:
|
|
"""One BGP-announced prefix's origin metadata."""
|
|
|
|
asn: int
|
|
name: str # AS description / org name; "" if absent in the source data
|
|
|
|
|
|
Range = Tuple[int, int, AsnInfo]
|
|
|
|
|
|
@dataclass
|
|
class AsnLookup:
|
|
"""Indexed AS lookup over IPv4 ranges."""
|
|
|
|
# Parallel arrays for bisect: _starts[i] is the start-IP of the i-th
|
|
# range, _ends[i] its inclusive end, _infos[i] its AsnInfo.
|
|
_starts: List[int]
|
|
_ends: List[int]
|
|
_infos: List[AsnInfo]
|
|
|
|
@classmethod
|
|
def from_ranges(cls, ranges: Iterable[Range]) -> "AsnLookup":
|
|
"""Build a lookup from ``(start, end_inclusive, AsnInfo)`` triples.
|
|
|
|
Ranges are sorted by start; on identical starts, last writer
|
|
wins (matches :class:`decnet.geoip.lookup.Lookup` semantics).
|
|
Non-overlapping adjacency is preserved.
|
|
"""
|
|
sorted_ranges = sorted(ranges, key=lambda r: (r[0], r[1]))
|
|
starts: List[int] = []
|
|
ends: List[int] = []
|
|
infos: List[AsnInfo] = []
|
|
for start, end, info in sorted_ranges:
|
|
if starts and starts[-1] == start:
|
|
ends[-1] = end
|
|
infos[-1] = info
|
|
continue
|
|
starts.append(start)
|
|
ends.append(end)
|
|
infos.append(info)
|
|
return cls(starts, ends, infos)
|
|
|
|
def asn(self, ip: str) -> Optional[AsnInfo]:
|
|
"""Return the :class:`AsnInfo` for ``ip`` or ``None``.
|
|
|
|
``None`` on: IPv6, private/loopback/link-local/multicast/reserved
|
|
addresses, malformed strings, and IPs outside every BGP-announced
|
|
range in the source dump.
|
|
"""
|
|
try:
|
|
addr = ipaddress.ip_address(ip)
|
|
except ValueError:
|
|
return None
|
|
if isinstance(addr, ipaddress.IPv6Address):
|
|
return None
|
|
if (
|
|
addr.is_private
|
|
or addr.is_loopback
|
|
or addr.is_link_local
|
|
or addr.is_multicast
|
|
or addr.is_reserved
|
|
or addr.is_unspecified
|
|
):
|
|
return None
|
|
|
|
n = int(addr)
|
|
idx = bisect.bisect_right(self._starts, n) - 1
|
|
if idx < 0:
|
|
return None
|
|
if n <= self._ends[idx]:
|
|
return self._infos[idx]
|
|
return None
|
|
|
|
def __len__(self) -> int:
|
|
return len(self._starts)
|
|
|
|
# ---------- persistence ----------
|
|
|
|
def save(self, path: Path) -> None:
|
|
"""Pickle the lookup to *path* (atomic rename)."""
|
|
tmp = path.with_suffix(path.suffix + ".tmp")
|
|
tmp.parent.mkdir(parents=True, exist_ok=True)
|
|
with tmp.open("wb") as fh:
|
|
pickle.dump(
|
|
{
|
|
"version": 1,
|
|
"starts": self._starts,
|
|
"ends": self._ends,
|
|
"infos": [(i.asn, i.name) for i in self._infos],
|
|
},
|
|
fh,
|
|
protocol=pickle.HIGHEST_PROTOCOL,
|
|
)
|
|
tmp.replace(path)
|
|
|
|
@classmethod
|
|
def load(cls, path: Path) -> "AsnLookup":
|
|
"""Load a pickled lookup from *path*."""
|
|
with path.open("rb") as fh:
|
|
data = pickle.load(fh) # nosec B301 — self-produced file under /var/lib/decnet
|
|
if data.get("version") != 1:
|
|
raise ValueError(
|
|
f"unsupported asn-lookup index version: {data.get('version')!r}"
|
|
)
|
|
infos = [AsnInfo(asn=a, name=n) for a, n in data["infos"]]
|
|
return cls(data["starts"], data["ends"], infos)
|