From ffc275f05174867c88ee777dd174613aafbbdd52 Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 23 Apr 2026 21:12:38 -0400 Subject: [PATCH] feat(geoip): country-code enrichment via RIR delegated-stats Populates Attacker.country_code + country_source (MVP) using the five RIR delegated-stats files (ARIN/RIPE/APNIC/LACNIC/AFRINIC). Offline, license-free, no outbound traffic that could burn honeypot stealth. - decnet.geoip package with factory/base/lookup + rir/ subpackage (fetch/parse/provider) mirroring the db + bus factory convention - Profiler._build_record calls enrich_ip on every upsert - Idempotent ALTER TABLE migrations for both SQLite and MySQL - decnet geoip refresh/lookup CLI (master-only) - /var/lib/decnet/geoip seeded by decnet init - DECNET_GEOIP_ENABLED=false kill-switch; set in tests/conftest.py so unit tests never trigger the first-access fetch --- decnet/cli/__init__.py | 3 +- decnet/cli/gating.py | 2 +- decnet/cli/geoip.py | 59 +++++++++++ decnet/cli/init.py | 1 + decnet/geoip/__init__.py | 95 ++++++++++++++++++ decnet/geoip/base.py | 34 +++++++ decnet/geoip/factory.py | 47 +++++++++ decnet/geoip/lookup.py | 121 +++++++++++++++++++++++ decnet/geoip/paths.py | 19 ++++ decnet/geoip/rir/__init__.py | 9 ++ decnet/geoip/rir/fetch.py | 62 ++++++++++++ decnet/geoip/rir/parse.py | 70 +++++++++++++ decnet/geoip/rir/provider.py | 74 ++++++++++++++ decnet/profiler/worker.py | 4 + decnet/web/db/models/attackers.py | 4 + decnet/web/db/mysql/repository.py | 22 ++++- decnet/web/db/sqlite/repository.py | 24 ++++- tests/conftest.py | 16 +++ tests/geoip/__init__.py | 0 tests/geoip/conftest.py | 25 +++++ tests/geoip/test_lookup.py | 76 ++++++++++++++ tests/geoip/test_parse.py | 66 +++++++++++++ tests/geoip/test_profiler_integration.py | 39 ++++++++ tests/geoip/test_provider.py | 103 +++++++++++++++++++ 24 files changed, 969 insertions(+), 6 deletions(-) create mode 100644 decnet/cli/geoip.py create mode 100644 decnet/geoip/__init__.py create mode 100644 
decnet/geoip/base.py create mode 100644 decnet/geoip/factory.py create mode 100644 decnet/geoip/lookup.py create mode 100644 decnet/geoip/paths.py create mode 100644 decnet/geoip/rir/__init__.py create mode 100644 decnet/geoip/rir/fetch.py create mode 100644 decnet/geoip/rir/parse.py create mode 100644 decnet/geoip/rir/provider.py create mode 100644 tests/geoip/__init__.py create mode 100644 tests/geoip/conftest.py create mode 100644 tests/geoip/test_lookup.py create mode 100644 tests/geoip/test_parse.py create mode 100644 tests/geoip/test_profiler_integration.py create mode 100644 tests/geoip/test_provider.py diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index 3576daa4..c51e7c25 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -25,6 +25,7 @@ from . import ( db, deploy, forwarder, + geoip, init, inventory, lifecycle, @@ -53,7 +54,7 @@ for _mod in ( swarm, deploy, lifecycle, workers, inventory, web, profiler, sniffer, db, - topology, bus, init, + topology, bus, geoip, init, ): _mod.register(app) diff --git a/decnet/cli/gating.py b/decnet/cli/gating.py index 3da191a9..a685cc6b 100644 --- a/decnet/cli/gating.py +++ b/decnet/cli/gating.py @@ -31,7 +31,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ "services", "distros", "correlate", "archetypes", "web", "db-reset", "init", }) -MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology"}) +MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology", "geoip"}) def _agent_mode_active() -> bool: diff --git a/decnet/cli/geoip.py b/decnet/cli/geoip.py new file mode 100644 index 00000000..7ff90a3e --- /dev/null +++ b/decnet/cli/geoip.py @@ -0,0 +1,59 @@ +"""GeoIP CLI — refresh and lookup subcommands (master-only). 
+ +Usage:: + + decnet geoip refresh # re-download RIR files and rebuild the index + decnet geoip lookup 8.8.8.8 # one-shot IP -> country dump +""" +from __future__ import annotations + +import typer + +from .gating import _require_master_mode +from .utils import console, log + +_group = typer.Typer( + name="geoip", + help="GeoIP provider management (master only).", + no_args_is_help=True, +) + + +@_group.command("refresh") +def _refresh() -> None: + """Force re-download of the GeoIP provider data and rebuild the index.""" + _require_master_mode("geoip refresh") + from decnet.geoip import get_lookup + from decnet.geoip.factory import get_provider + + provider = get_provider() + log.info("geoip: forcing refresh via %s provider", provider.name) + console.print(f"[bold cyan]Refreshing {provider.name} GeoIP data…[/]") + try: + lookup = get_lookup(force_refresh=True) + except Exception as exc: # noqa: BLE001 + console.print(f"[red]refresh failed: {exc}[/]") + raise typer.Exit(1) from exc + console.print( + f"[green]OK[/] {provider.name} index rebuilt " + f"({len(lookup)} ranges)." 
+    )
+
+
+@_group.command("lookup")
+def _lookup(
+    ip: str = typer.Argument(..., help="IP address to resolve."),
+) -> None:
+    """Print the country code for an IP (or 'unknown')."""
+    _require_master_mode("geoip lookup")
+    from decnet.geoip import enrich_ip
+
+    cc, source = enrich_ip(ip)
+    if cc is None:
+        console.print(f"{ip} [yellow]unknown[/]")
+        raise typer.Exit(0)
+    console.print(f"{ip} [green]cc={cc}[/] source={source}")
+
+
+def register(app: typer.Typer) -> None:
+    app.add_typer(_group, name="geoip")
diff --git a/decnet/cli/init.py b/decnet/cli/init.py
index b128b9eb..323b3eca 100644
--- a/decnet/cli/init.py
+++ b/decnet/cli/init.py
@@ -607,6 +607,7 @@ def register(app: typer.Typer) -> None:
         dirs = [
             (pfx / _install_rel, 0o755, user, group),
             (pfx / "var/lib/decnet", 0o750, user, group),
+            (pfx / "var/lib/decnet/geoip", 0o755, user, group),
             (pfx / "var/log/decnet", 0o750, user, group),
             (etc_decnet, 0o755, "root", group),
             (pfx / "run/decnet", 0o755, "root", group),
diff --git a/decnet/geoip/__init__.py b/decnet/geoip/__init__.py
new file mode 100644
index 00000000..39704e20
--- /dev/null
+++ b/decnet/geoip/__init__.py
@@ -0,0 +1,120 @@
+"""
+GeoIP enrichment — maps attacker IPs to country codes for attacker intelligence.
+
+Public surface:
+
+* :func:`get_lookup` — returns the singleton :class:`~decnet.geoip.lookup.Lookup`.
+  Builds / loads the index on first call. Refreshes the underlying data files
+  if they're missing or older than :data:`REFRESH_INTERVAL_S`.
+* :func:`enrich_ip` — convenience wrapper used by the profiler: takes an IP
+  string, returns ``(country_code, provider_name)`` or ``(None, None)``.
+
+Provider selection goes through :func:`~decnet.geoip.factory.get_provider`
+(env ``DECNET_GEOIP_PROVIDER``, default ``rir``). Direct imports of concrete
+providers are forbidden — mirrors the ``get_bus`` / ``get_repository`` rule.
+"""
+from __future__ import annotations
+
+import logging
+import os
+import time
+from typing import Optional, Tuple
+
+from decnet.geoip.factory import get_provider
+from decnet.geoip.lookup import Lookup
+from decnet.geoip.paths import GEOIP_ROOT
+
+logger = logging.getLogger("decnet.geoip")
+
+# 24 h — delegated-stats files are refreshed daily by the RIRs.
+REFRESH_INTERVAL_S = 86_400
+
+# Back-off between automatic refresh attempts once an index is loaded.
+# A refresh that fails leaves the data files stale, so without this
+# every get_lookup() call would re-run provider.refresh() (a blocking
+# download for the RIR provider) from inside the caller.
+REFRESH_RETRY_INTERVAL_S = 3_600
+
+_lookup: Optional[Lookup] = None
+_provider_name: Optional[str] = None
+# time.monotonic() value of the most recent refresh attempt.
+_last_refresh_attempt: float = float("-inf")
+
+
+def get_lookup(*, force_refresh: bool = False) -> Lookup:
+    """Return the cached :class:`Lookup`, building it on first use.
+
+    If the provider's data files are missing or older than
+    ``REFRESH_INTERVAL_S`` seconds, refresh before building. Pass
+    ``force_refresh=True`` to bypass the age check (used by
+    ``decnet geoip refresh``).
+
+    While an index is already loaded, automatic refreshes are attempted
+    at most once per ``REFRESH_RETRY_INTERVAL_S`` seconds, so a refresh
+    that fails to produce fresh files is not repeated on every call.
+    A process that has no index yet, and ``force_refresh=True``, are
+    never throttled.
+    """
+    global _lookup, _provider_name, _last_refresh_attempt
+    provider = get_provider()
+    _provider_name = provider.name
+
+    now = time.monotonic()
+    backoff_over = (
+        _lookup is None
+        or now - _last_refresh_attempt >= REFRESH_RETRY_INTERVAL_S
+    )
+    if force_refresh or (backoff_over and _files_stale(provider)):
+        _last_refresh_attempt = now
+        provider.refresh()
+        _lookup = None  # rebuild on next access
+
+    if _lookup is None:
+        _lookup = provider.build_lookup()
+    return _lookup
+
+
+def enrich_ip(ip: str) -> Tuple[Optional[str], Optional[str]]:
+    """Return ``(country_code, provider_name)`` or ``(None, None)``.
+
+    Never raises — any lookup failure collapses to ``(None, None)`` so the
+    caller (profiler) can upsert the attacker row regardless.
+    The swallowed exception is logged at DEBUG so it stays diagnosable.
+
+    ``DECNET_GEOIP_ENABLED=false`` short-circuits the whole path, useful
+    for tests / agent hosts / ops wanting to disable enrichment without
+    touching provider config.
+    """
+    if os.environ.get("DECNET_GEOIP_ENABLED", "true").lower() == "false":
+        return (None, None)
+    try:
+        lookup = get_lookup()
+        cc = lookup.country(ip)
+        if cc is None:
+            return (None, None)
+        return (cc, _provider_name or "unknown")
+    except Exception:
+        logger.debug("geoip: enrichment failed for %r", ip, exc_info=True)
+        return (None, None)
+
+
+def _files_stale(provider) -> bool:
+    """True when the provider has no fresh data on disk.
+
+    "Fresh" = at least one data file exists whose mtime is within the
+    refresh window. We don't demand every RIR file be present: a
+    partial cache still produces correct answers for the ranges it
+    covers, and demanding all-or-nothing would trigger a network
+    refresh every time one RIR endpoint was transiently unreachable.
+    """
+    paths = provider.data_paths()
+    if not paths:
+        return True
+    now = time.time()
+    for p in paths:
+        if p.exists() and now - p.stat().st_mtime <= REFRESH_INTERVAL_S:
+            return False
+    return True
+
+
+__all__ = ["get_lookup", "enrich_ip", "GEOIP_ROOT", "REFRESH_INTERVAL_S"]
diff --git a/decnet/geoip/base.py b/decnet/geoip/base.py
new file mode 100644
index 00000000..8cef8b68
--- /dev/null
+++ b/decnet/geoip/base.py
@@ -0,0 +1,34 @@
+"""GeoIP provider protocol.
+
+Concrete providers (:mod:`decnet.geoip.rir`, future ``dbip``, ``maxmind``)
+implement this. Callers must go through
+:func:`~decnet.geoip.factory.get_provider`; never import a concrete
+provider class directly.
+"""
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Sequence
+
+from decnet.geoip.lookup import Lookup
+
+
+class Provider(ABC):
+    """Abstract GeoIP data provider."""
+
+    #: Short tag written to ``Attacker.country_source`` (e.g. ``'rir'``).
+    name: str
+
+    @abstractmethod
+    def refresh(self) -> None:
+        """Download / regenerate the provider's raw data files."""
+
+    @abstractmethod
+    def build_lookup(self) -> Lookup:
+        """Parse the on-disk data files and return a ready-to-query Lookup."""
+
+    @abstractmethod
+    def data_paths(self) -> Sequence[Path]:
+        """Return the list of files this provider manages — used for staleness
+        detection. Order is not significant."""
diff --git a/decnet/geoip/factory.py b/decnet/geoip/factory.py
new file mode 100644
index 00000000..0de93e21
--- /dev/null
+++ b/decnet/geoip/factory.py
@@ -0,0 +1,47 @@
+"""GeoIP provider factory.
+
+Dispatch key: ``DECNET_GEOIP_PROVIDER`` (default ``rir``). Lazy singleton,
+same shape as :func:`decnet.bus.factory.get_bus`.
+
+MVP wires only the RIR provider. ``dbip`` and ``maxmind`` slots are
+reserved and raise :class:`NotImplementedError` until their subpackages
+land.
+"""
+from __future__ import annotations
+
+import os
+from typing import Optional
+
+from decnet.geoip.base import Provider
+
+_cached: Optional[Provider] = None
+_cached_key: Optional[str] = None
+
+
+def get_provider() -> Provider:
+    """Return the configured :class:`Provider` singleton."""
+    global _cached, _cached_key
+    key = os.environ.get("DECNET_GEOIP_PROVIDER", "rir").lower()
+    if _cached is not None and _cached_key == key:
+        return _cached
+
+    if key == "rir":
+        from decnet.geoip.rir.provider import RirProvider
+        provider: Provider = RirProvider()
+    elif key in {"dbip", "maxmind"}:
+        raise NotImplementedError(
+            f"GeoIP provider {key!r} is not wired yet; only 'rir' ships in MVP."
+        )
+    else:
+        raise ValueError(f"Unsupported GeoIP provider: {key!r}")
+
+    _cached = provider
+    _cached_key = key
+    return provider
+
+
+def reset_cache() -> None:
+    """Forget the singleton — tests swap providers via the env var."""
+    global _cached, _cached_key
+    _cached = None
+    _cached_key = None
diff --git a/decnet/geoip/lookup.py b/decnet/geoip/lookup.py
new file mode 100644
index 00000000..a04eb6c7
--- /dev/null
+++ b/decnet/geoip/lookup.py
@@ -0,0 +1,144 @@
+"""Provider-agnostic country lookup.
+
+A :class:`Lookup` is a sorted array of (start_ip, end_ip, cc) ranges
+queried via :mod:`bisect`. O(log n) on ~200k ranges.
+
+Private/loopback/invalid IPv4 and native IPv6 addresses resolve to
+``None`` — honeypots hit plenty of RFC1918 traffic from our own probes,
+and IPv6 country-mapping is explicitly out of MVP scope. IPv4-mapped
+IPv6 (``::ffff:a.b.c.d``) is resolved as the embedded IPv4 address.
+"""
+from __future__ import annotations
+
+import bisect
+import ipaddress
+import pickle  # nosec B403 — self-produced cache under /var/lib/decnet, never deserialized from untrusted input
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Iterable, Iterator, List, Optional, Tuple
+
+Range = Tuple[int, int, str]
+
+
+@dataclass
+class Lookup:
+    """Indexed country lookup over IPv4 ranges."""
+
+    # Parallel arrays for bisect: _starts[i] is the start-IP of the i-th
+    # range, _ends[i] its inclusive end, _ccs[i] its country code.
+    _starts: List[int]
+    _ends: List[int]
+    _ccs: List[str]
+
+    @classmethod
+    def from_ranges(cls, ranges: Iterable[Range]) -> "Lookup":
+        """Build a Lookup from (start, end_inclusive, cc) triples.
+
+        Ranges are sorted by start; overlapping ranges are resolved
+        last-writer-wins when both starts collide. Non-overlapping
+        adjacency is preserved.
+
+        Ranges are assumed not to nest: a range that starts inside an
+        earlier, wider one hides the tail of the wider range, so lookups
+        past the inner range's end are not matched to the wider range.
+        """
+        sorted_ranges = sorted(ranges, key=lambda r: (r[0], r[1]))
+        starts: List[int] = []
+        ends: List[int] = []
+        ccs: List[str] = []
+        for start, end, cc in sorted_ranges:
+            if starts and starts[-1] == start:
+                ends[-1] = end
+                ccs[-1] = cc
+                continue
+            starts.append(start)
+            ends.append(end)
+            ccs.append(cc)
+        return cls(starts, ends, ccs)
+
+    def country(self, ip: str) -> Optional[str]:
+        """Return the 2-letter ISO country code for ``ip`` or ``None``.
+
+        ``None`` on: native IPv6, private/loopback/link-local/multicast/
+        reserved addresses, malformed strings, and IPs outside every
+        known range. IPv4-mapped IPv6 (``::ffff:a.b.c.d``) is resolved
+        as the embedded IPv4 address.
+        """
+        try:
+            addr = ipaddress.ip_address(ip)
+        except ValueError:
+            return None
+        if isinstance(addr, ipaddress.IPv6Address):
+            # Dual-stack (AF_INET6) listeners report IPv4 peers as
+            # ::ffff:a.b.c.d; unwrap those, drop every other IPv6.
+            mapped = addr.ipv4_mapped
+            if mapped is None:
+                return None
+            addr = mapped
+        if (
+            addr.is_private
+            or addr.is_loopback
+            or addr.is_link_local
+            or addr.is_multicast
+            or addr.is_reserved
+            or addr.is_unspecified
+        ):
+            return None
+
+        n = int(addr)
+        # bisect_right gives the first start > n; the candidate range is
+        # the one immediately before it.
+        idx = bisect.bisect_right(self._starts, n) - 1
+        if idx < 0:
+            return None
+        if n <= self._ends[idx]:
+            return self._ccs[idx]
+        return None
+
+    def __len__(self) -> int:
+        return len(self._starts)
+
+    # ---------- persistence ----------
+
+    def save(self, path: Path) -> None:
+        """Pickle the lookup to *path* (atomic rename)."""
+        tmp = path.with_suffix(path.suffix + ".tmp")
+        tmp.parent.mkdir(parents=True, exist_ok=True)
+        with tmp.open("wb") as fh:
+            pickle.dump(
+                {
+                    "version": 1,
+                    "starts": self._starts,
+                    "ends": self._ends,
+                    "ccs": self._ccs,
+                },
+                fh,
+                protocol=pickle.HIGHEST_PROTOCOL,
+            )
+        tmp.replace(path)
+
+    @classmethod
+    def load(cls, path: Path) -> "Lookup":
+        """Load a pickled lookup from *path*.
+
+        Raises :class:`ValueError` when the payload is not a version-1
+        index dict or its parallel arrays differ in length.
+        """
+        with path.open("rb") as fh:
+            data = pickle.load(fh)  # nosec B301 — self-produced file under /var/lib/decnet
+        if not isinstance(data, dict):
+            raise ValueError(f"unexpected lookup index payload: {type(data).__name__}")
+        if data.get("version") != 1:
+            raise ValueError(f"unsupported lookup index version: {data.get('version')!r}")
+        starts, ends, ccs = data["starts"], data["ends"], data["ccs"]
+        # Arrays of different length would only surface later, as an
+        # IndexError inside country(); reject the file here instead.
+        if not (len(starts) == len(ends) == len(ccs)):
+            raise ValueError("corrupt lookup index: array lengths differ")
+        return cls(starts, ends, ccs)
+
+
+def iter_ranges(items: Iterable[Range]) -> Iterator[Range]:
+    """Passthrough helper — kept so providers can compose iterators without
+    importing private symbols."""
+    yield from items
diff --git a/decnet/geoip/paths.py b/decnet/geoip/paths.py
new file mode 100644
index 00000000..c6616ec1
--- /dev/null
+++ b/decnet/geoip/paths.py
@@ -0,0 +1,19 @@
+"""Filesystem layout for GeoIP data.
+
+``GEOIP_ROOT`` is where providers drop their raw files and cache indexes.
+Default ``/var/lib/decnet/geoip`` — ``decnet init`` seeds the directory
+with ``decnet:decnet`` ownership, mode 0755. Override with
+``DECNET_GEOIP_ROOT`` for test harnesses.
+"""
+from __future__ import annotations
+
+import os
+from pathlib import Path
+
+GEOIP_ROOT = Path(os.environ.get("DECNET_GEOIP_ROOT", "/var/lib/decnet/geoip"))
+
+
+def ensure_root() -> Path:
+    """Create ``GEOIP_ROOT`` if absent and return it.
No-op if present.""" + GEOIP_ROOT.mkdir(parents=True, exist_ok=True) + return GEOIP_ROOT diff --git a/decnet/geoip/rir/__init__.py b/decnet/geoip/rir/__init__.py new file mode 100644 index 00000000..3c2324e4 --- /dev/null +++ b/decnet/geoip/rir/__init__.py @@ -0,0 +1,9 @@ +"""RIR delegated-stats provider. + +Free, offline, no license: each Regional Internet Registry publishes a +daily plaintext file mapping IPv4 allocations to countries. Together the +five RIR files cover the entire assigned IPv4 space. + +Direct imports of :class:`RirProvider` are discouraged — go through +:func:`decnet.geoip.factory.get_provider`. +""" diff --git a/decnet/geoip/rir/fetch.py b/decnet/geoip/rir/fetch.py new file mode 100644 index 00000000..063a2c7f --- /dev/null +++ b/decnet/geoip/rir/fetch.py @@ -0,0 +1,62 @@ +"""RIR delegated-stats download. + +Five public files, ~5 MB total. Pulled over HTTPS with a generic +User-Agent (stealth: never identify as DECNET — a RIR log scraper could +otherwise correlate our egress to a honeypot operator). +""" +from __future__ import annotations + +import logging +import shutil +import urllib.request +from pathlib import Path +from typing import Tuple + +logger = logging.getLogger("decnet.geoip.rir.fetch") + +# (registry_name, url). Extended delegated-stats include the opaque +# registration ID we don't use, but they are what the RIRs recommend +# consumers pull. +RIR_SOURCES: Tuple[Tuple[str, str], ...] = ( + ("arin", "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"), + ("ripe", "https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest"), + ("apnic", "https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest"), + ("lacnic", "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"), + ("afrinic", "https://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-extended-latest"), +) + +# Generic UA — no DECNET/honeypot token. 
Matches what a stock requests/
+# urllib script would send if someone forgot to set one.
+_USER_AGENT = "Mozilla/5.0 (compatible; fetch/1.0)"
+_TIMEOUT_S = 60
+
+
+def fetch_all(dest: Path) -> list[Path]:
+    """Download every RIR file into *dest*. Returns the written paths.
+
+    Atomic per file: we download to ``{name}.txt.tmp`` then rename. A
+    partial failure leaves the previous generation intact.
+    """
+    dest.mkdir(parents=True, exist_ok=True)
+    written: list[Path] = []
+    for name, url in RIR_SOURCES:
+        target = dest / f"{name}.txt"
+        tmp = target.with_suffix(".txt.tmp")
+        try:
+            _download(url, tmp)
+            tmp.replace(target)
+            written.append(target)
+            logger.info("geoip.rir: fetched %s (%d bytes)", name, target.stat().st_size)
+        except Exception as exc:
+            logger.error("geoip.rir: fetch failed for %s (%s): %s", name, url, exc)
+            if tmp.exists():
+                tmp.unlink(missing_ok=True)
+            # Keep any stale previous file — better outdated than empty.
+    return written
+
+
+def _download(url: str, dest: Path) -> None:
+    req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT})
+    # `with` closes the response + dest file on any path.
+    with urllib.request.urlopen(req, timeout=_TIMEOUT_S) as resp, dest.open("wb") as fh:  # nosec B310 — fixed https RIR URLs
+        shutil.copyfileobj(resp, fh)
diff --git a/decnet/geoip/rir/parse.py b/decnet/geoip/rir/parse.py
new file mode 100644
index 00000000..ad078775
--- /dev/null
+++ b/decnet/geoip/rir/parse.py
@@ -0,0 +1,86 @@
+"""Parser for RIR ``delegated-*-extended`` files.
+
+Line shape (the bits we care about)::
+
+    ripencc|DE|ipv4|85.214.0.0|65536|20060814|allocated|
+
+Fields: registry, country, type (ipv4/ipv6/asn), start, count, date,
+status, id. We emit one ``(start_int, end_int_inclusive, country)``
+tuple per ``ipv4`` row whose status is ``allocated`` or ``assigned``.
+
+Rows skipped:
+
+* ``ipv6`` and ``asn`` types — IPv6 is out of MVP scope, ASN is a
+  different table.
+* ``summary`` / ``version`` header lines (registry|*|*|*|*|summary).
+* Rows with status ``reserved`` / ``available`` — no country assigned.
+* Rows with country ``*`` or ``ZZ`` (any letter case) — sentinel for
+  unassigned space.
+* Rows whose start is not a valid IPv4 address, whose count is not a
+  positive integer, or whose range would run past 255.255.255.255
+  (the RIR files are usually tidy, but defensive).
+"""
+from __future__ import annotations
+
+import ipaddress
+import logging
+from pathlib import Path
+from typing import Iterator, Tuple
+
+Range = Tuple[int, int, str]
+
+logger = logging.getLogger("decnet.geoip.rir.parse")
+
+_VALID_STATUSES = frozenset({"allocated", "assigned"})
+_SENTINEL_CCS = frozenset({"*", "ZZ", ""})
+# Highest IPv4 address as an integer (255.255.255.255).
+_IPV4_MAX = 0xFFFFFFFF
+
+
+def parse_file(path: Path) -> Iterator[Range]:
+    """Yield ``(start_int, end_int_inclusive, cc)`` for every usable ipv4 row.
+
+    ``cc`` is stripped and upper-cased. Unusable rows are dropped;
+    malformed or out-of-range ones are logged at DEBUG.
+    """
+    with path.open("r", encoding="utf-8", errors="replace") as fh:
+        for lineno, raw in enumerate(fh, 1):
+            line = raw.strip()
+            if not line or line.startswith("#"):
+                continue
+            parts = line.split("|")
+            if len(parts) < 7:
+                continue
+            _registry, cc, rtype, start, count, _date, status = parts[:7]
+
+            if rtype != "ipv4":
+                continue
+            if status not in _VALID_STATUSES:
+                continue
+            # Normalise before the sentinel test so a lower-case ``zz``
+            # is not emitted as a real country.
+            cc = cc.strip().upper()
+            if cc in _SENTINEL_CCS:
+                continue
+            # summary header carries type=ipv4 but start=='*' and status
+            # =='summary' — already filtered by _VALID_STATUSES, but
+            # keep the guard for defensiveness.
+            if start in ("*", ""):
+                continue
+
+            try:
+                start_int = int(ipaddress.IPv4Address(start))
+                n = int(count)
+            except (ValueError, ipaddress.AddressValueError):
+                logger.debug("geoip.rir: skipping malformed line %d in %s", lineno, path.name)
+                continue
+            if n <= 0:
+                continue
+
+            end_int = start_int + n - 1
+            if end_int > _IPV4_MAX:
+                # A count that runs past the end of IPv4 space can only
+                # come from a corrupt row; do not index it.
+                logger.debug("geoip.rir: skipping out-of-range line %d in %s", lineno, path.name)
+                continue
+            yield (start_int, end_int, cc)
diff --git a/decnet/geoip/rir/provider.py b/decnet/geoip/rir/provider.py
new file mode 100644
index 00000000..87ee53d4
--- /dev/null
+++ b/decnet/geoip/rir/provider.py
@@ -0,0 +1,74 @@
+"""RIR provider — orchestrates fetch + parse into a :class:`Lookup`."""
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+from typing import Sequence
+
+from decnet.geoip.base import Provider
+from decnet.geoip.lookup import Lookup
+from decnet.geoip.paths import ensure_root
+from decnet.geoip.rir.fetch import RIR_SOURCES, fetch_all
+from decnet.geoip.rir.parse import parse_file
+
+logger = logging.getLogger("decnet.geoip.rir.provider")
+
+# Pickled lookup cache — skips re-parsing ~5 MB of RIR text on every
+# profiler restart. Rebuilt whenever any raw file is newer than the
+# cache, see ``_cache_fresh``.
+_CACHE_NAME = ".rir_index.pkl"
+
+
+class RirProvider(Provider):
+    name = "rir"
+
+    def __init__(self) -> None:
+        self._root = ensure_root()
+
+    # ---------- Provider interface ----------
+
+    def refresh(self) -> None:
+        logger.info("geoip.rir: refreshing delegated-stats files into %s", self._root)
+        fetch_all(self._root)
+        # Invalidate the cache — next build_lookup regenerates it.
+ cache = self._root / _CACHE_NAME + if cache.exists(): + cache.unlink(missing_ok=True) + + def build_lookup(self) -> Lookup: + cache = self._root / _CACHE_NAME + if self._cache_fresh(cache): + try: + lookup = Lookup.load(cache) + logger.debug("geoip.rir: loaded cached index (%d ranges)", len(lookup)) + return lookup + except Exception as exc: + logger.warning("geoip.rir: cache load failed, rebuilding: %s", exc) + + ranges = [] + for path in self.data_paths(): + if not path.exists(): + continue + ranges.extend(parse_file(path)) + lookup = Lookup.from_ranges(ranges) + try: + lookup.save(cache) + except Exception as exc: + logger.warning("geoip.rir: cache save failed: %s", exc) + logger.info("geoip.rir: built index with %d ranges", len(lookup)) + return lookup + + def data_paths(self) -> Sequence[Path]: + return [self._root / f"{name}.txt" for name, _url in RIR_SOURCES] + + # ---------- internals ---------- + + def _cache_fresh(self, cache: Path) -> bool: + """True when the pickle exists and is at least as new as every raw file.""" + if not cache.exists(): + return False + cache_mtime = cache.stat().st_mtime + for path in self.data_paths(): + if path.exists() and path.stat().st_mtime > cache_mtime: + return False + return True diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 99de0a0b..509a91d7 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -29,6 +29,7 @@ from decnet.bus.publish import ( ) from decnet.correlation.engine import CorrelationEngine from decnet.correlation.parser import LogEvent +from decnet.geoip import enrich_ip from decnet.logging import get_logger from decnet.profiler.behavioral import build_behavior_record from decnet.telemetry import traced as _traced, get_tracer as _get_tracer @@ -251,6 +252,7 @@ def _build_record( ) fingerprints = [b for b in bounties if b.get("bounty_type") == "fingerprint"] credential_count = sum(1 for b in bounties if b.get("bounty_type") == "credential") + country_code, 
country_source = enrich_ip(ip) return { "ip": ip, @@ -267,6 +269,8 @@ def _build_record( "credential_count": credential_count, "fingerprints": json.dumps(fingerprints), "commands": json.dumps(commands), + "country_code": country_code, + "country_source": country_source, "updated_at": datetime.now(timezone.utc), } diff --git a/decnet/web/db/models/attackers.py b/decnet/web/db/models/attackers.py index c612b077..9f2a4274 100644 --- a/decnet/web/db/models/attackers.py +++ b/decnet/web/db/models/attackers.py @@ -39,6 +39,10 @@ class Attacker(SQLModel, table=True): commands: str = Field( default="[]", sa_column=Column("commands", _BIG_TEXT, nullable=False, default="[]") ) # JSON list[dict] — commands per service/decky + # GeoIP enrichment (populated by the profiler from decnet.geoip.enrich_ip). + # Nullable because private / loopback / IPv6 sources never resolve. + country_code: Optional[str] = Field(default=None, max_length=2, index=True) + country_source: Optional[str] = Field(default=None, max_length=16) updated_at: datetime = Field( default_factory=lambda: datetime.now(timezone.utc), index=True ) diff --git a/decnet/web/db/mysql/repository.py b/decnet/web/db/mysql/repository.py index f83b4bf9..8547061a 100644 --- a/decnet/web/db/mysql/repository.py +++ b/decnet/web/db/mysql/repository.py @@ -35,16 +35,32 @@ class MySQLRepository(SQLModelRepository): async def _migrate_attackers_table(self) -> None: """Drop the legacy (pre-UUID) ``attackers`` table if it exists without a ``uuid`` column. - MySQL exposes column metadata via ``information_schema.COLUMNS``. - ``DATABASE()`` scopes the lookup to the currently connected schema. + Also adds the GeoIP columns (``country_code``, ``country_source``) + to existing tables that predate them. MySQL exposes column + metadata via ``information_schema.COLUMNS``; ``DATABASE()`` scopes + the lookup to the currently connected schema. 
""" async with self.engine.begin() as conn: rows = (await conn.execute(text( "SELECT COLUMN_NAME FROM information_schema.COLUMNS " "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'attackers'" ))).fetchall() - if rows and not any(r[0] == "uuid" for r in rows): + if not rows: + return # table absent; create_all() handles it. + if not any(r[0] == "uuid" for r in rows): await conn.execute(text("DROP TABLE attackers")) + return + existing_cols = {r[0] for r in rows} + if "country_code" not in existing_cols: + await conn.execute(text( + "ALTER TABLE attackers " + "ADD COLUMN country_code VARCHAR(2) NULL, " + "ADD INDEX ix_attackers_country_code (country_code)" + )) + if "country_source" not in existing_cols: + await conn.execute(text( + "ALTER TABLE attackers ADD COLUMN country_source VARCHAR(16) NULL" + )) async def _migrate_column_types(self) -> None: """Upgrade TEXT → MEDIUMTEXT for columns that accumulate large JSON blobs. diff --git a/decnet/web/db/sqlite/repository.py b/decnet/web/db/sqlite/repository.py index 5965d0be..e920e94f 100644 --- a/decnet/web/db/sqlite/repository.py +++ b/decnet/web/db/sqlite/repository.py @@ -26,11 +26,33 @@ class SQLiteRepository(SQLModelRepository): ) async def _migrate_attackers_table(self) -> None: - """Drop the old attackers table if it lacks the uuid column (pre-UUID schema).""" + """Drop the old attackers table if it lacks the uuid column (pre-UUID schema). + + Also adds the GeoIP columns (``country_code``, ``country_source``) + to existing tables that predate them. SQLite's + ``ALTER TABLE ADD COLUMN`` is idempotent only if we gate on + ``PRAGMA table_info`` first — re-adding raises. + """ async with self.engine.begin() as conn: rows = (await conn.execute(text("PRAGMA table_info(attackers)"))).fetchall() if rows and not any(r[1] == "uuid" for r in rows): await conn.execute(text("DROP TABLE attackers")) + return # create_all() rebuilds fresh — no need to patch columns. 
+ if not rows: + return # table absent; create_all() handles it. + existing_cols = {r[1] for r in rows} + if "country_code" not in existing_cols: + await conn.execute(text( + "ALTER TABLE attackers ADD COLUMN country_code VARCHAR(2)" + )) + await conn.execute(text( + "CREATE INDEX IF NOT EXISTS ix_attackers_country_code " + "ON attackers (country_code)" + )) + if "country_source" not in existing_cols: + await conn.execute(text( + "ALTER TABLE attackers ADD COLUMN country_source VARCHAR(16)" + )) def _json_field_equals(self, key: str): # SQLite stores JSON as text; json_extract is the canonical accessor. diff --git a/tests/conftest.py b/tests/conftest.py index 3fa86289..19628810 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,15 @@ Env vars required by decnet.env must be set here, at module level, before any test file imports decnet.* — pytest loads conftest.py first. """ import os +import tempfile + +# Redirect log paths to a user-writable tempdir so unprivileged test runs +# (CI, local sans-sudo) don't try to mkdir /var/log/decnet. +_TEST_LOG_DIR = os.path.join(tempfile.gettempdir(), "decnet-tests-logs") +os.makedirs(_TEST_LOG_DIR, exist_ok=True) +os.environ.setdefault("DECNET_LOG_FILE", os.path.join(_TEST_LOG_DIR, "decnet.log")) +os.environ.setdefault("DECNET_INGEST_LOG_FILE", os.path.join(_TEST_LOG_DIR, "decnet.log")) +os.environ.setdefault("DECNET_AGENT_LOG_FILE", os.path.join(_TEST_LOG_DIR, "agent.log")) os.environ["DECNET_JWT_SECRET"] = "stable-test-secret-key-at-least-32-chars-long" os.environ["DECNET_ADMIN_PASSWORD"] = "test-password-123" @@ -12,6 +21,13 @@ os.environ["DECNET_DEVELOPER"] = "true" os.environ["DECNET_DEVELOPER_TRACING"] = "false" os.environ["DECNET_DB_TYPE"] = "sqlite" +# GeoIP enrichment is offline-by-design (RIR delegated-stats) but the +# first access triggers a background file fetch. Unit tests must never +# hit the network and don't care about country codes — disable +# enrichment globally. 
The geoip-specific tests re-enable it via +# monkeypatch + a temp DECNET_GEOIP_ROOT. +os.environ["DECNET_GEOIP_ENABLED"] = "false" + import pytest from typing import Any diff --git a/tests/geoip/__init__.py b/tests/geoip/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/geoip/conftest.py b/tests/geoip/conftest.py new file mode 100644 index 00000000..e40ec016 --- /dev/null +++ b/tests/geoip/conftest.py @@ -0,0 +1,25 @@ +"""Per-package fixtures — flip DECNET_GEOIP_ENABLED back on for geoip tests +and point the provider at a tmp dir so no real /var/lib/decnet paths get +touched and no real RIR URL gets fetched. +""" +from __future__ import annotations + +import os +from pathlib import Path + +import pytest + + +@pytest.fixture(autouse=True) +def _geoip_sandbox(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + monkeypatch.setenv("DECNET_GEOIP_ENABLED", "true") + monkeypatch.setenv("DECNET_GEOIP_ROOT", str(tmp_path)) + # Reset module-level caches so the env swap takes effect. 
+ import decnet.geoip as _g + import decnet.geoip.factory as _f + import decnet.geoip.paths as _p + monkeypatch.setattr(_p, "GEOIP_ROOT", tmp_path) + _g._lookup = None + _g._provider_name = None + _f.reset_cache() + return tmp_path diff --git a/tests/geoip/test_lookup.py b/tests/geoip/test_lookup.py new file mode 100644 index 00000000..258cae45 --- /dev/null +++ b/tests/geoip/test_lookup.py @@ -0,0 +1,76 @@ +"""Lookup index tests.""" +from __future__ import annotations + +import ipaddress +from pathlib import Path + +from decnet.geoip.lookup import Lookup + + +def _ip(s: str) -> int: + return int(ipaddress.IPv4Address(s)) + + +def _fixture_lookup() -> Lookup: + return Lookup.from_ranges([ + (_ip("8.8.8.0"), _ip("8.8.8.255"), "US"), + (_ip("85.214.0.0"), _ip("85.214.255.255"), "DE"), + (_ip("46.101.0.0"), _ip("46.101.255.255"), "GB"), + ]) + + +def test_country_hits_known_ranges() -> None: + lookup = _fixture_lookup() + assert lookup.country("8.8.8.8") == "US" + assert lookup.country("85.214.128.1") == "DE" + assert lookup.country("46.101.10.20") == "GB" + + +def test_country_misses_gap() -> None: + lookup = _fixture_lookup() + # 9.0.0.0 sits between our fixtures — not in any range. 
+ assert lookup.country("9.0.0.0") is None + + +def test_country_private_loopback_returns_none() -> None: + lookup = _fixture_lookup() + for ip in ("10.0.0.1", "192.168.1.1", "172.16.0.1", "127.0.0.1", "0.0.0.0"): + assert lookup.country(ip) is None, ip + + +def test_country_ipv6_returns_none() -> None: + lookup = _fixture_lookup() + assert lookup.country("2001:db8::1") is None + assert lookup.country("::1") is None + + +def test_country_invalid_returns_none() -> None: + lookup = _fixture_lookup() + assert lookup.country("not-an-ip") is None + assert lookup.country("") is None + assert lookup.country("999.1.1.1") is None + + +def test_lookup_roundtrips_through_pickle(tmp_path: Path) -> None: + lookup = _fixture_lookup() + cache = tmp_path / "idx.pkl" + lookup.save(cache) + loaded = Lookup.load(cache) + assert len(loaded) == len(lookup) + assert loaded.country("8.8.8.8") == "US" + + +def test_from_ranges_last_writer_wins_on_collision() -> None: + lookup = Lookup.from_ranges([ + (_ip("1.0.0.0"), _ip("1.0.0.255"), "AU"), + (_ip("1.0.0.0"), _ip("1.0.0.255"), "CN"), + ]) + # Sorted by (start, end); last wins. 
+ assert lookup.country("1.0.0.5") == "CN" + + +def test_boundary_inclusive() -> None: + lookup = _fixture_lookup() + assert lookup.country("8.8.8.0") == "US" + assert lookup.country("8.8.8.255") == "US" + assert lookup.country("8.8.9.0") is None diff --git a/tests/geoip/test_parse.py b/tests/geoip/test_parse.py new file mode 100644 index 00000000..ec1c523a --- /dev/null +++ b/tests/geoip/test_parse.py @@ -0,0 +1,66 @@ +"""Parser tests for RIR delegated-stats files.""" +from __future__ import annotations + +import ipaddress +from pathlib import Path + +from decnet.geoip.rir.parse import parse_file + + +_FIXTURE = """\ +2|ripencc|20260420|230000|19830101|20260419|+0000 +ripencc|*|asn|*|35000|summary +ripencc|*|ipv4|*|25000|summary +ripencc|DE|ipv4|85.214.0.0|65536|20060814|allocated|abc +ripencc|GB|ipv4|46.101.0.0|65536|20120101|assigned|def +ripencc|FR|ipv6|2001:db8::|32|20100101|allocated|ghi +ripencc|*|ipv4|5.0.0.0|256|20200101|reserved|jkl +ripencc|ZZ|ipv4|6.0.0.0|256|20200101|allocated|mno +ripencc|ES|ipv4|*|0|20200101|allocated|pqr +# comment line +ripencc|IT|asn|12345|1|20100101|allocated|stu +arin|US|ipv4|8.8.8.0|256|20000101|allocated|xyz +""" + + +def test_parse_skips_non_ipv4_and_sentinels(tmp_path: Path) -> None: + fixture = tmp_path / "ripe.txt" + fixture.write_text(_FIXTURE) + ranges = list(parse_file(fixture)) + ccs = {r[2] for r in ranges} + # v4 allocated/assigned with real country codes only. 
+ assert ccs == {"DE", "GB", "US"} + + +def test_parse_range_boundaries(tmp_path: Path) -> None: + fixture = tmp_path / "arin.txt" + fixture.write_text(_FIXTURE) + ranges = [r for r in parse_file(fixture) if r[2] == "US"] + assert len(ranges) == 1 + start, end, cc = ranges[0] + assert start == int(ipaddress.IPv4Address("8.8.8.0")) + assert end == int(ipaddress.IPv4Address("8.8.8.255")) + assert cc == "US" + + +def test_parse_lowercase_cc_is_uppercased(tmp_path: Path) -> None: + fixture = tmp_path / "apnic.txt" + fixture.write_text("apnic|jp|ipv4|1.0.0.0|256|19990101|allocated|abc\n") + ranges = list(parse_file(fixture)) + assert ranges == [(int(ipaddress.IPv4Address("1.0.0.0")), + int(ipaddress.IPv4Address("1.0.0.255")), + "JP")] + + +def test_parse_malformed_lines_are_skipped(tmp_path: Path) -> None: + fixture = tmp_path / "broken.txt" + fixture.write_text( + "garbage\n" + "a|b|c\n" + "ripencc|DE|ipv4|not-an-ip|65536|20060814|allocated|abc\n" + "ripencc|DE|ipv4|85.214.0.0|not-a-count|20060814|allocated|abc\n" + "ripencc|DE|ipv4|85.214.0.0|65536|20060814|allocated|ok\n" + ) + ranges = list(parse_file(fixture)) + assert len(ranges) == 1 + assert ranges[0][2] == "DE" diff --git a/tests/geoip/test_profiler_integration.py b/tests/geoip/test_profiler_integration.py new file mode 100644 index 00000000..bfa3b4de --- /dev/null +++ b/tests/geoip/test_profiler_integration.py @@ -0,0 +1,39 @@ +"""_build_record must thread country fields through to the upsert payload.""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path + +from decnet.correlation.parser import LogEvent +from decnet.geoip.rir.fetch import RIR_SOURCES +from decnet.profiler.worker import _build_record + + +def _evt(ip: str) -> LogEvent: + return LogEvent( + timestamp=datetime(2026, 4, 23, tzinfo=timezone.utc), + attacker_ip=ip, + decky="decky-01", + service="ssh", + event_type="conn", + fields={}, + raw="", + ) + + +def 
test_build_record_includes_country_when_resolved(tmp_path: Path) -> None: + (tmp_path / f"{RIR_SOURCES[0][0]}.txt").write_text( + "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n" + ) + record = _build_record("8.8.8.8", [_evt("8.8.8.8")], None, [], []) + assert record["country_code"] == "US" + assert record["country_source"] == "rir" + + +def test_build_record_country_none_for_private(tmp_path: Path) -> None: + (tmp_path / f"{RIR_SOURCES[0][0]}.txt").write_text( + "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n" + ) + record = _build_record("10.0.0.1", [_evt("10.0.0.1")], None, [], []) + assert record["country_code"] is None + assert record["country_source"] is None diff --git a/tests/geoip/test_provider.py b/tests/geoip/test_provider.py new file mode 100644 index 00000000..885e46f5 --- /dev/null +++ b/tests/geoip/test_provider.py @@ -0,0 +1,103 @@ +"""RirProvider + factory + public API tests.""" +from __future__ import annotations + +from pathlib import Path + +import pytest + + +def test_factory_returns_rir_by_default() -> None: + from decnet.geoip.factory import get_provider + + provider = get_provider() + assert provider.name == "rir" + + +def test_factory_rejects_unknown_provider(monkeypatch: pytest.MonkeyPatch) -> None: + from decnet.geoip import factory + + monkeypatch.setenv("DECNET_GEOIP_PROVIDER", "nope") + factory.reset_cache() + with pytest.raises(ValueError): + factory.get_provider() + + +def test_factory_reserved_providers_raise(monkeypatch: pytest.MonkeyPatch) -> None: + from decnet.geoip import factory + + for reserved in ("dbip", "maxmind"): + monkeypatch.setenv("DECNET_GEOIP_PROVIDER", reserved) + factory.reset_cache() + with pytest.raises(NotImplementedError): + factory.get_provider() + + +def test_provider_build_lookup_empty_when_no_files(tmp_path: Path) -> None: + from decnet.geoip.rir.provider import RirProvider + + p = RirProvider() + lookup = p.build_lookup() + assert len(lookup) == 0 + assert lookup.country("8.8.8.8") is None + + +def 
test_provider_build_lookup_reads_present_files(tmp_path: Path) -> None: + from decnet.geoip.rir.fetch import RIR_SOURCES + from decnet.geoip.rir.provider import RirProvider + + # Drop one fake ARIN file — provider should pick it up. + arin_name = RIR_SOURCES[0][0] + (tmp_path / f"{arin_name}.txt").write_text( + "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n" + ) + p = RirProvider() + lookup = p.build_lookup() + assert lookup.country("8.8.8.8") == "US" + + +def test_provider_uses_cache_when_fresh(tmp_path: Path) -> None: + from decnet.geoip.rir.fetch import RIR_SOURCES + from decnet.geoip.rir.provider import RirProvider + + arin_name = RIR_SOURCES[0][0] + src = tmp_path / f"{arin_name}.txt" + src.write_text("arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n") + p = RirProvider() + lookup_a = p.build_lookup() + assert (tmp_path / ".rir_index.pkl").exists() + + # The source file is NOT rewritten here, so its mtime stays older than the + # cache. We only test the fast path by rebuilding a new provider instance + # without mutating the source — cache should be used. 
+ p2 = RirProvider() + lookup_b = p2.build_lookup() + assert len(lookup_b) == len(lookup_a) + + +def test_enrich_ip_short_circuits_when_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + import decnet.geoip as geoip + + monkeypatch.setenv("DECNET_GEOIP_ENABLED", "false") + assert geoip.enrich_ip("8.8.8.8") == (None, None) + + +def test_enrich_ip_returns_country_and_source(tmp_path: Path) -> None: + from decnet.geoip import enrich_ip + from decnet.geoip.rir.fetch import RIR_SOURCES + + (tmp_path / f"{RIR_SOURCES[0][0]}.txt").write_text( + "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n" + ) + cc, src = enrich_ip("8.8.8.8") + assert cc == "US" + assert src == "rir" + + +def test_enrich_ip_private_returns_none(tmp_path: Path) -> None: + from decnet.geoip import enrich_ip + from decnet.geoip.rir.fetch import RIR_SOURCES + + (tmp_path / f"{RIR_SOURCES[0][0]}.txt").write_text( + "arin|US|ipv4|8.8.8.0|256|20000101|allocated|abc\n" + ) + assert enrich_ip("192.168.1.1") == (None, None)