feat(geoip): country-code enrichment via RIR delegated-stats

Populates Attacker.country_code + country_source (MVP) using the five
RIR delegated-stats files (ARIN/RIPE/APNIC/LACNIC/AFRINIC). Lookups are
offline and license-free: no per-lookup outbound traffic that could burn
honeypot stealth.

- decnet.geoip package with factory/base/lookup + rir/ subpackage
  (fetch/parse/provider) mirroring the db + bus factory convention
- Profiler._build_record calls enrich_ip on every upsert
- Idempotent ALTER TABLE migrations for both SQLite and MySQL
- decnet geoip refresh/lookup CLI (master-only)
- /var/lib/decnet/geoip seeded by decnet init
- DECNET_GEOIP_ENABLED=false kill-switch; set in tests/conftest.py so
  unit tests never trigger the first-access fetch
This commit is contained in:
2026-04-23 21:12:38 -04:00
parent 07bf3dc8cb
commit ffc275f051
24 changed files with 969 additions and 6 deletions

0
tests/geoip/__init__.py Normal file
View File

25
tests/geoip/conftest.py Normal file
View File

@@ -0,0 +1,25 @@
"""Per-package fixtures — flip DECNET_GEOIP_ENABLED back on for geoip tests
and point the provider at a tmp dir so no real /var/lib/decnet paths get
touched and no real RIR URL gets fetched.
"""
from __future__ import annotations
import os
from pathlib import Path
import pytest
@pytest.fixture(autouse=True)
def _geoip_sandbox(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
    """Enable geoip for the test and redirect its root to ``tmp_path``.

    Sets ``DECNET_GEOIP_ENABLED`` and ``DECNET_GEOIP_ROOT``, patches
    ``decnet.geoip.paths.GEOIP_ROOT``, clears the module-level lookup and
    provider caches, and returns the sandbox directory.
    """
    sandbox_env = {
        "DECNET_GEOIP_ENABLED": "true",
        "DECNET_GEOIP_ROOT": str(tmp_path),
    }
    for var_name, var_value in sandbox_env.items():
        monkeypatch.setenv(var_name, var_value)

    # The geoip modules are imported here, after the environment is patched.
    import decnet.geoip as geoip_pkg
    import decnet.geoip.factory as geoip_factory
    import decnet.geoip.paths as geoip_paths

    monkeypatch.setattr(geoip_paths, "GEOIP_ROOT", tmp_path)

    # Reset module-level caches so the env swap takes effect.
    geoip_pkg._lookup = None
    geoip_pkg._provider_name = None
    geoip_factory.reset_cache()
    return tmp_path

View File

@@ -0,0 +1,76 @@
"""Lookup index tests."""
from __future__ import annotations
import ipaddress
from pathlib import Path
from decnet.geoip.lookup import Lookup
def _ip(s: str) -> int:
return int(ipaddress.IPv4Address(s))
def _fixture_lookup() -> Lookup:
    """Build a small Lookup with one US, one DE and one GB range."""
    blocks = (
        ("8.8.8.0", "8.8.8.255", "US"),
        ("85.214.0.0", "85.214.255.255", "DE"),
        ("46.101.0.0", "46.101.255.255", "GB"),
    )
    return Lookup.from_ranges(
        [(_ip(first), _ip(last), cc) for first, last, cc in blocks]
    )
def test_country_hits_known_ranges() -> None:
    """Addresses inside each fixture range resolve to that range's country."""
    index = _fixture_lookup()
    expected = {
        "8.8.8.8": "US",
        "85.214.128.1": "DE",
        "46.101.10.20": "GB",
    }
    for address, country in expected.items():
        assert index.country(address) == country
def test_country_misses_gap() -> None:
    """An address that falls between fixture ranges resolves to None."""
    # 9.0.0.0 sits between our fixtures — not in any range.
    gap_address = "9.0.0.0"
    assert _fixture_lookup().country(gap_address) is None
def test_country_private_loopback_returns_none() -> None:
    """Private, loopback and unspecified addresses resolve to None."""
    index = _fixture_lookup()
    non_routable = (
        "10.0.0.1",
        "192.168.1.1",
        "172.16.0.1",
        "127.0.0.1",
        "0.0.0.0",
    )
    for ip in non_routable:
        assert index.country(ip) is None, ip
def test_country_ipv6_returns_none() -> None:
    """IPv6 addresses resolve to None."""
    index = _fixture_lookup()
    for address in ("2001:db8::1", "::1"):
        assert index.country(address) is None
def test_country_invalid_returns_none() -> None:
    """Strings that are not valid IP addresses resolve to None."""
    index = _fixture_lookup()
    for bogus in ("not-an-ip", "", "999.1.1.1"):
        assert index.country(bogus) is None
def test_lookup_roundtrips_through_pickle(tmp_path: Path) -> None:
    """A saved Lookup reloads with the same size and still resolves."""
    original = _fixture_lookup()
    cache_path = tmp_path / "idx.pkl"
    original.save(cache_path)

    restored = Lookup.load(cache_path)

    assert len(restored) == len(original)
    assert restored.country("8.8.8.8") == "US"
def test_from_ranges_last_writer_wins_on_collision() -> None:
    """When two identical ranges collide, the later entry's country is kept."""
    first, last = _ip("1.0.0.0"), _ip("1.0.0.255")
    colliding = [
        (first, last, "AU"),
        (first, last, "CN"),
    ]
    index = Lookup.from_ranges(colliding)
    # Sorted by (start, end); last wins.
    assert index.country("1.0.0.5") == "CN"
def test_boundary_inclusive() -> None:
    """Both ends of a range match; the address just past the end does not."""
    index = _fixture_lookup()
    for edge in ("8.8.8.0", "8.8.8.255"):
        assert index.country(edge) == "US"
    assert index.country("8.8.9.0") is None

66
tests/geoip/test_parse.py Normal file
View File

@@ -0,0 +1,66 @@
"""Parser tests for RIR delegated-stats files."""
from __future__ import annotations
import ipaddress
from pathlib import Path
from decnet.geoip.rir.parse import parse_file
# Sample delegated-stats text shared by the parser tests below: a leading
# header row, two "summary" rows, ipv4/ipv6/asn records with assorted
# country codes and statuses, and a "# comment line" row inside the data.
# test_parse_skips_non_ipv4_and_sentinels expects parse_file to yield only
# the DE, GB and US rows from this content.
_FIXTURE = """\
2|ripencc|20260420|230000|19830101|20260419|+0000
ripencc|*|asn|*|35000|summary
ripencc|*|ipv4|*|25000|summary
ripencc|DE|ipv4|85.214.0.0|65536|20060814|allocated|abc
ripencc|GB|ipv4|46.101.0.0|65536|20120101|assigned|def
ripencc|FR|ipv6|2001:db8::|32|20100101|allocated|ghi
ripencc|*|ipv4|5.0.0.0|256|20200101|reserved|jkl
ripencc|ZZ|ipv4|6.0.0.0|256|20200101|allocated|mno
ripencc|ES|ipv4|*|0|20200101|allocated|pqr
# comment line
ripencc|IT|asn|12345|1|20100101|allocated|stu
arin|US|ipv4|8.8.8.0|256|20000101|allocated|xyz
"""
def test_parse_skips_non_ipv4_and_sentinels(tmp_path: Path) -> None:
    """Only the DE, GB and US rows of the shared fixture are yielded."""
    stats_path = tmp_path / "ripe.txt"
    stats_path.write_text(_FIXTURE)

    parsed = list(parse_file(stats_path))
    country_codes = {row[2] for row in parsed}

    # v4 allocated/assigned with real country codes only.
    assert country_codes == {"DE", "GB", "US"}
def test_parse_range_boundaries(tmp_path: Path) -> None:
    """The 256-address US row parses to the span 8.8.8.0 .. 8.8.8.255."""
    stats_path = tmp_path / "arin.txt"
    stats_path.write_text(_FIXTURE)

    us_rows = [row for row in parse_file(stats_path) if row[2] == "US"]
    assert len(us_rows) == 1

    first_int, last_int, country = us_rows[0]
    expected_first = int(ipaddress.IPv4Address("8.8.8.0"))
    expected_last = int(ipaddress.IPv4Address("8.8.8.255"))
    assert first_int == expected_first
    assert last_int == expected_last
    assert country == "US"
def test_parse_lowercase_cc_is_uppercased(tmp_path: Path) -> None:
    """A lowercase country code in the file comes back uppercased."""
    stats_path = tmp_path / "apnic.txt"
    stats_path.write_text("apnic|jp|ipv4|1.0.0.0|256|19990101|allocated|abc\n")

    first_int = int(ipaddress.IPv4Address("1.0.0.0"))
    last_int = int(ipaddress.IPv4Address("1.0.0.255"))

    assert list(parse_file(stats_path)) == [(first_int, last_int, "JP")]
def test_parse_malformed_lines_are_skipped(tmp_path: Path) -> None:
    """Of four broken rows and one valid row, only the valid row is yielded."""
    rows = [
        "garbage",
        "a|b|c",
        "ripencc|DE|ipv4|not-an-ip|65536|20060814|allocated|abc",
        "ripencc|DE|ipv4|85.214.0.0|not-a-count|20060814|allocated|abc",
        "ripencc|DE|ipv4|85.214.0.0|65536|20060814|allocated|ok",
    ]
    stats_path = tmp_path / "broken.txt"
    # Every row, including the last, is newline-terminated.
    stats_path.write_text("".join(row + "\n" for row in rows))

    parsed = list(parse_file(stats_path))

    assert len(parsed) == 1
    assert parsed[0][2] == "DE"

View File

@@ -0,0 +1,39 @@
"""_build_record must thread country fields through to the upsert payload."""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from decnet.correlation.parser import LogEvent
from decnet.geoip.rir.fetch import RIR_SOURCES
from decnet.profiler.worker import _build_record
def _evt(ip: str) -> LogEvent:
    """Build a fixed-timestamp ssh "conn" LogEvent for attacker address *ip*."""
    fixed_time = datetime(2026, 4, 23, tzinfo=timezone.utc)
    event = LogEvent(
        timestamp=fixed_time,
        attacker_ip=ip,
        decky="decky-01",
        service="ssh",
        event_type="conn",
        fields={},
        raw="",
    )
    return event
def test_build_record_includes_country_when_resolved(tmp_path: Path) -> None:
    """An address covered by the stats file gets country_code and source set."""
    first_source_name = RIR_SOURCES[0][0]
    stats_path = tmp_path / f"{first_source_name}.txt"
    stats_path.write_text(
        "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n"
    )

    attacker = "8.8.8.8"
    record = _build_record(attacker, [_evt(attacker)], None, [], [])

    assert record["country_code"] == "US"
    assert record["country_source"] == "rir"
def test_build_record_country_none_for_private(tmp_path: Path) -> None:
    """A private address leaves both country fields as None."""
    first_source_name = RIR_SOURCES[0][0]
    stats_path = tmp_path / f"{first_source_name}.txt"
    stats_path.write_text(
        "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n"
    )

    attacker = "10.0.0.1"
    record = _build_record(attacker, [_evt(attacker)], None, [], [])

    assert record["country_code"] is None
    assert record["country_source"] is None

View File

@@ -0,0 +1,103 @@
"""RirProvider + factory + public API tests."""
from __future__ import annotations
from pathlib import Path
import pytest
def test_factory_returns_rir_by_default() -> None:
    """With no provider override, the factory hands back the "rir" provider."""
    from decnet.geoip import factory

    assert factory.get_provider().name == "rir"
def test_factory_rejects_unknown_provider(monkeypatch: pytest.MonkeyPatch) -> None:
    """An unrecognised DECNET_GEOIP_PROVIDER value raises ValueError."""
    from decnet.geoip import factory as geoip_factory

    monkeypatch.setenv("DECNET_GEOIP_PROVIDER", "nope")
    # Drop any cached provider so the new env value is read.
    geoip_factory.reset_cache()

    with pytest.raises(ValueError):
        geoip_factory.get_provider()
def test_factory_reserved_providers_raise(monkeypatch: pytest.MonkeyPatch) -> None:
    """The "dbip" and "maxmind" provider names raise NotImplementedError."""
    from decnet.geoip import factory as geoip_factory

    reserved_names = ("dbip", "maxmind")
    for provider_name in reserved_names:
        monkeypatch.setenv("DECNET_GEOIP_PROVIDER", provider_name)
        # Reset between iterations so each name is resolved afresh.
        geoip_factory.reset_cache()
        with pytest.raises(NotImplementedError):
            geoip_factory.get_provider()
def test_provider_build_lookup_empty_when_no_files(tmp_path: Path) -> None:
    """With no stats files written, the built lookup is empty."""
    from decnet.geoip.rir.provider import RirProvider

    index = RirProvider().build_lookup()

    assert len(index) == 0
    assert index.country("8.8.8.8") is None
def test_provider_build_lookup_reads_present_files(tmp_path: Path) -> None:
    """A single stats file in the sandbox is picked up by build_lookup."""
    from decnet.geoip.rir.fetch import RIR_SOURCES
    from decnet.geoip.rir.provider import RirProvider

    # Drop one fake ARIN file — provider should pick it up.
    first_source_name = RIR_SOURCES[0][0]
    stats_path = tmp_path / f"{first_source_name}.txt"
    stats_path.write_text(
        "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n"
    )

    index = RirProvider().build_lookup()

    assert index.country("8.8.8.8") == "US"
def test_provider_uses_cache_when_fresh(tmp_path: Path) -> None:
    """A second provider over unchanged sources yields an equivalent index.

    The first build must leave a ``.rir_index.pkl`` file in the sandbox.
    A fresh provider instance built afterwards, with the source file left
    untouched, must return a lookup of the same size that still resolves
    the fixture address.
    """
    from decnet.geoip.rir.fetch import RIR_SOURCES
    from decnet.geoip.rir.provider import RirProvider

    arin_name = RIR_SOURCES[0][0]
    src = tmp_path / f"{arin_name}.txt"
    src.write_text("arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n")

    lookup_a = RirProvider().build_lookup()
    assert (tmp_path / ".rir_index.pkl").exists()

    # The source file is deliberately NOT modified here, so the second
    # build is expected to take the cached fast path.
    # NOTE(review): how freshness is decided (presumably source mtime vs
    # cache mtime) is not visible from this test — confirm in RirProvider.
    lookup_b = RirProvider().build_lookup()

    assert len(lookup_b) == len(lookup_a)
    # Equal length alone would also pass for a same-sized but wrong index;
    # check that the second lookup actually resolves the address.
    assert lookup_b.country("8.8.8.8") == "US"
def test_enrich_ip_short_circuits_when_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
    """With DECNET_GEOIP_ENABLED=false, enrich_ip returns (None, None)."""
    from decnet import geoip

    monkeypatch.setenv("DECNET_GEOIP_ENABLED", "false")

    result = geoip.enrich_ip("8.8.8.8")
    assert result == (None, None)
def test_enrich_ip_returns_country_and_source(tmp_path: Path) -> None:
    """enrich_ip returns the country code and "rir" for a covered address."""
    from decnet.geoip import enrich_ip
    from decnet.geoip.rir.fetch import RIR_SOURCES

    first_source_name = RIR_SOURCES[0][0]
    stats_path = tmp_path / f"{first_source_name}.txt"
    stats_path.write_text(
        "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n"
    )

    country, source = enrich_ip("8.8.8.8")

    assert country == "US"
    assert source == "rir"
def test_enrich_ip_private_returns_none(tmp_path: Path) -> None:
    """enrich_ip returns (None, None) for a private address."""
    from decnet.geoip import enrich_ip
    from decnet.geoip.rir.fetch import RIR_SOURCES

    first_source_name = RIR_SOURCES[0][0]
    stats_path = tmp_path / f"{first_source_name}.txt"
    stats_path.write_text(
        "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n"
    )

    result = enrich_ip("192.168.1.1")
    assert result == (None, None)