From e3d9908bed92d23e74a90b2271c8937aa63f8cbe Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 21 May 2026 16:07:57 -0400 Subject: [PATCH] feat(asn): expose BGP prefix in AsnInfo and enrich_ip Synthesize the covering CIDR at lookup time from the matched iptoasn range using ipaddress.summarize_address_range. AsnInfo.prefix is populated per-query; not persisted in the pickle cache. enrich_ip now returns (asn, as_name, bgp_prefix, provider_name). Profiler worker updated to unpack the 4-tuple and write bgp_prefix into the attacker record dict. --- decnet/asn/__init__.py | 14 +++++++------- decnet/asn/lookup.py | 18 +++++++++++++++++- decnet/profiler/worker.py | 3 ++- tests/asn/test_lookup.py | 33 +++++++++++++++++++++++++++++++++ tests/asn/test_provider.py | 11 ++++++----- 5 files changed, 65 insertions(+), 14 deletions(-) diff --git a/decnet/asn/__init__.py b/decnet/asn/__init__.py index 64224b0a..32385609 100644 --- a/decnet/asn/__init__.py +++ b/decnet/asn/__init__.py @@ -6,7 +6,7 @@ Public surface mirrors :mod:`decnet.geoip` so callers can compose them: * :func:`get_lookup` — returns the singleton :class:`AsnLookup`. * :func:`enrich_ip` — takes an IP string, returns - ``(asn_int, asn_name, provider_name)`` or ``(None, None, None)``. + ``(asn_int, asn_name, bgp_prefix, provider_name)`` or ``(None, None, None, None)``. Provider selection goes through :func:`~decnet.asn.factory.get_provider` (env ``DECNET_ASN_PROVIDER``, default ``iptoasn``). Direct imports of @@ -51,8 +51,8 @@ def get_lookup(*, force_refresh: bool = False) -> AsnLookup: return _lookup -def enrich_ip(ip: str) -> Tuple[Optional[int], Optional[str], Optional[str]]: - """Return ``(asn, as_name, provider_name)`` or ``(None, None, None)``. +def enrich_ip(ip: str) -> Tuple[Optional[int], Optional[str], Optional[str], Optional[str]]: + """Return ``(asn, as_name, bgp_prefix, provider_name)`` or ``(None, None, None, None)``. Never raises — any lookup failure collapses to all-None so the caller (profiler) can upsert the attacker row regardless. @@ -62,15 +62,15 @@ def enrich_ip(ip: str) -> Tuple[Optional[int], Optional[str], Optional[str]]: touching provider config. """ if os.environ.get("DECNET_ASN_ENABLED", "true").lower() == "false": - return (None, None, None) + return (None, None, None, None) try: lookup = get_lookup() info = lookup.asn(ip) if info is None: - return (None, None, None) - return (info.asn, info.name or None, _provider_name or "unknown") + return (None, None, None, None) + return (info.asn, info.name or None, info.prefix, _provider_name or "unknown") except Exception: - return (None, None, None) + return (None, None, None, None) def _files_stale(provider) -> bool: diff --git a/decnet/asn/lookup.py b/decnet/asn/lookup.py index e3d6272b..fdf5087d 100644 --- a/decnet/asn/lookup.py +++ b/decnet/asn/lookup.py @@ -23,11 +23,25 @@ class AsnInfo: asn: int name: str # AS description / org name; "" if absent in the source data + prefix: Optional[str] = None # synthesized covering CIDR; set at lookup time, not at rest Range = Tuple[int, int, AsnInfo] +def _synthesize_prefix(start_int: int, end_int: int, queried_int: int) -> Optional[str]: + """Return the most-specific CIDR from [start, end] that contains queried_int.""" + try: + for net in ipaddress.summarize_address_range( + ipaddress.IPv4Address(start_int), ipaddress.IPv4Address(end_int) + ): + if queried_int >= int(net.network_address) and queried_int <= int(net.broadcast_address): + return str(net) + except (ValueError, TypeError): + pass + return None + + @dataclass class AsnLookup: """Indexed AS lookup over IPv4 ranges.""" @@ -88,7 +102,9 @@ class AsnLookup: if idx < 0: return None if n <= self._ends[idx]: - return self._infos[idx] + info = self._infos[idx] + prefix = _synthesize_prefix(self._starts[idx], self._ends[idx], n) + return AsnInfo(asn=info.asn, name=info.name, prefix=prefix) return None def __len__(self) -> int: diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 501fd2d9..83510298 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -356,7 +356,7 @@ def _build_record( fingerprints = [b for b in bounties if b.get("bounty_type") == "fingerprint"] credential_count = sum(1 for b in bounties if b.get("bounty_type") == "credential") country_code, country_source = enrich_ip(ip) - asn, as_name, asn_source = enrich_ip_asn(ip) + asn, as_name, bgp_prefix, asn_source = enrich_ip_asn(ip) record: dict[str, Any] = { "ip": ip, @@ -377,6 +377,7 @@ def _build_record( "country_source": country_source, "asn": asn, "as_name": as_name, + "bgp_prefix": bgp_prefix, "asn_source": asn_source, "updated_at": datetime.now(timezone.utc), } diff --git a/tests/asn/test_lookup.py b/tests/asn/test_lookup.py index 149e04ca..54dd5833 100644 --- a/tests/asn/test_lookup.py +++ b/tests/asn/test_lookup.py @@ -26,6 +26,39 @@ def test_asn_hits_known_ranges() -> None: assert lookup.asn("46.101.10.20").asn == 14061 +def test_prefix_aligned_range() -> None: + lookup = _fixture_lookup() + assert lookup.asn("8.8.8.8").prefix == "8.8.8.0/24" + assert lookup.asn("8.8.8.0").prefix == "8.8.8.0/24" + assert lookup.asn("8.8.8.255").prefix == "8.8.8.0/24" + + +def test_prefix_aligned_16() -> None: + lookup = _fixture_lookup() + assert lookup.asn("46.101.10.20").prefix == "46.101.0.0/16" + + +def test_prefix_non_power_of_two_range() -> None: + # 1.0.0.0–1.0.0.191 spans /25 (0-127) and /26 (128-191) + lookup = AsnLookup.from_ranges([ + (_ip("1.0.0.0"), _ip("1.0.0.191"), AsnInfo(13335, "CF")), + ]) + assert lookup.asn("1.0.0.5").prefix == "1.0.0.0/25" + assert lookup.asn("1.0.0.130").prefix == "1.0.0.128/26" + + +def test_prefix_single_host_range() -> None: + lookup = AsnLookup.from_ranges([ + (_ip("1.2.3.4"), _ip("1.2.3.4"), AsnInfo(1, "X")), + ]) + assert lookup.asn("1.2.3.4").prefix == "1.2.3.4/32" + + +def test_prefix_not_set_on_miss() -> None: + lookup = _fixture_lookup() + assert lookup.asn("9.0.0.0") is None + + def test_asn_misses_gap() -> None: lookup = _fixture_lookup() assert lookup.asn("9.0.0.0") is None diff --git a/tests/asn/test_provider.py b/tests/asn/test_provider.py index 707befc7..d26ddfcf 100644 --- a/tests/asn/test_provider.py +++ b/tests/asn/test_provider.py @@ -67,16 +67,17 @@ def test_enrich_ip_short_circuits_when_disabled(monkeypatch: pytest.MonkeyPatch) import decnet.asn as asn monkeypatch.setenv("DECNET_ASN_ENABLED", "false") - assert asn.enrich_ip("8.8.8.8") == (None, None, None) + assert asn.enrich_ip("8.8.8.8") == (None, None, None, None) -def test_enrich_ip_returns_asn_and_source(tmp_path: Path) -> None: +def test_enrich_ip_returns_asn_prefix_and_source(tmp_path: Path) -> None: from decnet.asn import enrich_ip _seed_fixture(tmp_path) - asn, name, src = enrich_ip("8.8.8.8") + asn, name, prefix, src = enrich_ip("8.8.8.8") assert asn == 15169 assert name == "GOOGLE" + assert prefix == "8.8.8.0/24" assert src == "iptoasn" @@ -84,7 +85,7 @@ def test_enrich_ip_private_returns_none(tmp_path: Path) -> None: from decnet.asn import enrich_ip _seed_fixture(tmp_path) - assert enrich_ip("192.168.1.1") == (None, None, None) + assert enrich_ip("192.168.1.1") == (None, None, None, None) def test_enrich_ip_unannounced_returns_none(tmp_path: Path) -> None: @@ -92,4 +93,4 @@ def test_enrich_ip_unannounced_returns_none(tmp_path: Path) -> None: _seed_fixture(tmp_path) # 9.0.0.0 isn't in our fixture range — no BGP announcement we know of. - assert enrich_ip("9.0.0.0") == (None, None, None) + assert enrich_ip("9.0.0.0") == (None, None, None, None)