"""SQLite-backed RPKI result cache.

Schema: ``ip -> (asn, rpki_status, rpki_prefix, fetched_at)``.
Key is ``ip`` only — for a given IP the announcing ASN is stable
within the cache TTL, and ASN-change events are rare enough that
letting the entry expire naturally is sufficient. Callers that cannot
accept that trade-off may pass ``asn=`` to :func:`get`, which then treats
a row recorded for a different origin ASN as a miss.

TTL: 12 hours. On :func:`open_db` the caller should call
:func:`prune` once to evict stale rows.
"""
from __future__ import annotations

import sqlite3
import time
from pathlib import Path
from typing import Optional, Tuple

TTL_S = 43_200  # 12 hours

_CREATE = """
CREATE TABLE IF NOT EXISTS rpki_cache (
    ip          TEXT    NOT NULL PRIMARY KEY,
    asn         INTEGER NOT NULL,
    rpki_status TEXT    NOT NULL,
    rpki_prefix TEXT,
    fetched_at  REAL    NOT NULL
)
"""


def open_db(path: Path) -> sqlite3.Connection:
    """Open (or create) the cache database at *path* and return the connection.

    The ``rpki_cache`` table is created when missing. If schema creation
    fails the half-initialised connection is closed before the error is
    re-raised, so a failed open does not leak a file handle.
    """
    con = sqlite3.connect(str(path), check_same_thread=False, timeout=5)
    try:
        # ``with con`` commits on success and rolls back on error.
        with con:
            con.execute(_CREATE)
    except Exception:
        con.close()
        raise
    return con


def get(
    con: sqlite3.Connection,
    ip: str,
    *,
    asn: Optional[int] = None,
) -> Optional[Tuple[str, Optional[str]]]:
    """Return ``(rpki_status, rpki_prefix)`` if a fresh entry exists, else ``None``.

    An entry is fresh while its age does not exceed :data:`TTL_S` seconds.

    When *asn* is given, an entry stored for a different ASN is reported as
    a miss. With the default ``asn=None`` the lookup is by IP alone.
    """
    row = con.execute(
        "SELECT asn, rpki_status, rpki_prefix, fetched_at "
        "FROM rpki_cache WHERE ip = ?",
        (ip,),
    ).fetchone()
    if row is None:
        return None
    cached_asn, status, prefix, fetched_at = row
    if time.time() - fetched_at > TTL_S:
        return None
    if asn is not None and cached_asn != asn:
        return None
    return (status, prefix)


def put(
    con: sqlite3.Connection,
    ip: str,
    asn: int,
    status: str,
    prefix: Optional[str],
) -> None:
    """Insert or replace the cache entry for *ip*, stamped with the current time."""
    # Transaction scope: commit on success, roll back if the write fails.
    with con:
        con.execute(
            "INSERT OR REPLACE INTO rpki_cache "
            "(ip, asn, rpki_status, rpki_prefix, fetched_at) VALUES (?, ?, ?, ?, ?)",
            (ip, asn, status, prefix, time.time()),
        )


def prune(con: sqlite3.Connection) -> int:
    """Delete all entries older than :data:`TTL_S`. Returns the count deleted."""
    cutoff = time.time() - TTL_S
    with con:
        cur = con.execute("DELETE FROM rpki_cache WHERE fetched_at < ?", (cutoff,))
    return cur.rowcount
"""RPKI validation backed by the RIPE STAT data API.

Resolves the most-specific announced prefix covering ``ip`` via the RIPE
STAT ``network-info`` endpoint, then validates ``(asn, prefix)`` via
``rpki-validation``. Results are cached in a SQLite database under
:data:`~decnet.rpki.paths.RPKI_ROOT`.

Two HTTP calls per uncached IP (``network-info`` + ``rpki-validation``),
each with a 2-second timeout. Any network failure collapses to
``status="unknown"`` — the caller upserts the attacker row regardless.
"""
from __future__ import annotations

import json
import logging
import sqlite3
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Optional

from decnet.rpki import cache as _cache
from decnet.rpki.base import RpkiResult, RpkiStatus, Validator
from decnet.rpki.paths import ensure_root

logger = logging.getLogger("decnet.rpki.ripestat")

_TIMEOUT_S = 2
_STAT_BASE = "https://stat.ripe.net/data"
_UA = "Mozilla/5.0 (compatible; fetch/1.0)"

# Upstream status string -> pipeline status. Anything not listed becomes
# "unknown".
# NOTE(review): RIPE STAT appears to report invalid announcements as
# "invalid_asn" / "invalid_length" rather than a bare "invalid"; both are
# folded into "invalid" here — confirm against the rpki-validation docs.
_STATUS_MAP: dict[str, RpkiStatus] = {
    "valid": "valid",
    "invalid": "invalid",
    "invalid_asn": "invalid",
    "invalid_length": "invalid",
    "not-found": "not-found",
}


def _endpoint_url(name: str, **params: object) -> str:
    """Build a RIPE STAT data-call URL with a properly escaped query string."""
    # urlencode escapes ":" and "/" so IPv6 addresses and CIDR prefixes
    # cannot alter the shape of the query string.
    return f"{_STAT_BASE}/{name}/data.json?{urllib.parse.urlencode(params)}"


class RipeStatValidator(Validator):
    """Validator that answers from the SQLite cache, falling back to RIPE STAT."""

    name = "ripestat"

    def __init__(self) -> None:
        """Open the cache database under the RPKI root and evict stale rows."""
        db_path = ensure_root() / "cache.db"
        self._con: sqlite3.Connection = _cache.open_db(db_path)
        _cache.prune(self._con)

    def validate(self, ip: str, asn: int) -> RpkiResult:
        """Return the RPKI result for *ip* as announced by *asn*.

        A fresh cache entry for *ip* is returned without any HTTP call.
        Otherwise the covering prefix is resolved and validated, and the
        outcome is written to the cache. Any exception raised during the
        lookup is logged at debug level and yields ``status="unknown"``,
        which is not cached.
        """
        cached = _cache.get(self._con, ip)
        if cached is not None:
            status, prefix = cached
            return RpkiResult(status=status, prefix=prefix)  # type: ignore[arg-type]

        try:
            prefix = self._network_info(ip)
            if prefix is None:
                return self._store(ip, asn, "not-found", None)
            status = self._rpki_validation(asn, prefix)
            return self._store(ip, asn, status, prefix)
        except Exception as exc:
            # Deliberate best-effort: a failed lookup must never break the caller.
            logger.debug("rpki.ripestat: lookup failed for %s / AS%s: %s", ip, asn, exc)
            return RpkiResult(status="unknown")

    # ---------- internal ----------

    def _network_info(self, ip: str) -> Optional[str]:
        """Return the most-specific announced prefix containing *ip*, or None."""
        data = self._fetch(_endpoint_url("network-info", resource=ip))
        return data.get("data", {}).get("prefix") or None

    def _rpki_validation(self, asn: int, prefix: str) -> RpkiStatus:
        """Return RPKI state for (asn, prefix).

        Statuses missing from ``_STATUS_MAP`` — including a missing or
        non-string status — are normalised to ``"unknown"``.
        """
        data = self._fetch(
            _endpoint_url("rpki-validation", resource=asn, prefix=prefix)
        )
        raw = data.get("data", {}).get("status", "unknown")
        if isinstance(raw, str):
            return _STATUS_MAP.get(raw, "unknown")
        return "unknown"

    def _fetch(self, url: str) -> dict:
        """GET *url* and return the decoded JSON object.

        Raises:
            ValueError: if the response body is not a JSON object.
        """
        req = urllib.request.Request(url, headers={"User-Agent": _UA})
        with urllib.request.urlopen(req, timeout=_TIMEOUT_S) as resp:  # nosec B310 — HTTPS RIPE STAT base URL only; IP/ASN components are validated upstream
            payload = json.loads(resp.read())
        if not isinstance(payload, dict):
            raise ValueError(
                f"unexpected JSON payload type: {type(payload).__name__}"
            )
        return payload

    def _store(
        self, ip: str, asn: int, status: str, prefix: Optional[str]
    ) -> RpkiResult:
        """Write the outcome to the cache (best-effort) and return it as a result."""
        try:
            _cache.put(self._con, ip, asn, status, prefix)
        except Exception as exc:
            # A cache write failure must not discard a freshly fetched answer.
            logger.debug("rpki.ripestat: cache write failed: %s", exc)
        return RpkiResult(
            status=status,  # type: ignore[arg-type]
            prefix=prefix,
            validated_at=datetime.now(timezone.utc),
        )
"""SQLite RPKI cache tests."""
from __future__ import annotations

import time
from pathlib import Path

import pytest

from decnet.rpki.cache import TTL_S, get, open_db, prune, put


@pytest.fixture()
def db(tmp_path: Path):
    """Yield a cache connection on a per-test database file, closed afterwards."""
    con = open_db(tmp_path / "rpki.db")
    yield con
    # Close explicitly so no connection outlives its test.
    con.close()


def test_miss_returns_none(db) -> None:
    assert get(db, "8.8.8.8") is None


def test_put_then_get_returns_entry(db) -> None:
    put(db, "8.8.8.8", 15169, "valid", "8.8.8.0/24")
    result = get(db, "8.8.8.8")
    assert result == ("valid", "8.8.8.0/24")


def test_get_returns_none_prefix_when_stored_null(db) -> None:
    put(db, "1.2.3.4", 64496, "not-found", None)
    entry = get(db, "1.2.3.4")
    # Check for a hit first so a miss fails as an assertion, not a TypeError.
    assert entry is not None
    status, prefix = entry
    assert status == "not-found"
    assert prefix is None


def test_expired_entry_returns_none(db, monkeypatch: pytest.MonkeyPatch) -> None:
    put(db, "8.8.8.8", 15169, "valid", "8.8.8.0/24")
    # Move the clock just past the TTL instead of sleeping.
    future = time.time() + TTL_S + 1
    monkeypatch.setattr("decnet.rpki.cache.time.time", lambda: future)
    assert get(db, "8.8.8.8") is None


def test_replace_updates_entry(db) -> None:
    put(db, "8.8.8.8", 15169, "valid", "8.8.8.0/24")
    put(db, "8.8.8.8", 15169, "invalid", "8.8.8.0/24")
    entry = get(db, "8.8.8.8")
    assert entry is not None
    status, _ = entry
    assert status == "invalid"


def test_prune_removes_stale_rows(db, monkeypatch: pytest.MonkeyPatch) -> None:
    put(db, "1.1.1.1", 13335, "valid", "1.1.1.0/24")
    put(db, "2.2.2.2", 3215, "invalid", "2.0.0.0/8")
    future = time.time() + TTL_S + 1
    monkeypatch.setattr("decnet.rpki.cache.time.time", lambda: future)
    count = prune(db)
    assert count == 2
    # After prune, both gone
    assert get(db, "1.1.1.1") is None
    assert get(db, "2.2.2.2") is None


def test_prune_keeps_fresh_rows(db) -> None:
    put(db, "8.8.8.8", 15169, "valid", "8.8.8.0/24")
    count = prune(db)
    assert count == 0
    assert get(db, "8.8.8.8") is not None
"""RipeStatValidator HTTP + cache integration tests.

All network calls are intercepted via monkeypatching
``urllib.request.urlopen`` so no real HTTP leaves the test runner.
"""
from __future__ import annotations

import io
import json
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch

import pytest


def _mock_urlopen(responses: dict[str, Any]):
    """Return a urlopen stand-in that dispatches on a URL fragment.

    *responses* maps a substring of the request URL to the JSON payload to
    serve. The first matching fragment wins; a URL matching none of them
    raises ``ValueError``.
    """

    def _urlopen(req, timeout=None):
        url = req.full_url if hasattr(req, "full_url") else str(req)
        for fragment, payload in responses.items():
            if fragment in url:
                body = json.dumps(payload).encode()
                # Context manager whose __enter__ hands back a readable body.
                mock = MagicMock()
                mock.__enter__ = lambda s: io.BytesIO(body)
                mock.__exit__ = MagicMock(return_value=False)
                return mock
        raise ValueError(f"Unexpected URL in test: {url}")

    return _urlopen


_NETWORK_INFO_VALID = {
    "data": {"prefix": "8.8.8.0/24", "asns": ["15169"]}
}
_RPKI_VALID = {
    "data": {"status": "valid", "validating_roas": []}
}
_RPKI_INVALID = {
    "data": {"status": "invalid", "validating_roas": []}
}
_RPKI_NOT_FOUND = {
    "data": {"status": "not-found", "validating_roas": []}
}
_NETWORK_INFO_EMPTY = {"data": {"prefix": None}}


@pytest.fixture()
def validator(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """Build a RipeStatValidator whose cache database lives under *tmp_path*."""
    # Redirect the RPKI root through both the environment variable and the
    # already-imported module constant.
    # NOTE(review): presumably ensure_root() honours one of these — verify
    # against decnet.rpki.paths.
    monkeypatch.setenv("DECNET_RPKI_ROOT", str(tmp_path))
    import decnet.rpki.paths as rpki_paths
    monkeypatch.setattr(rpki_paths, "RPKI_ROOT", tmp_path)

    from decnet.rpki.ripestat.validator import RipeStatValidator
    return RipeStatValidator()


def test_valid_result(validator) -> None:
    responses = {
        "network-info": _NETWORK_INFO_VALID,
        "rpki-validation": _RPKI_VALID,
    }
    with patch("urllib.request.urlopen", side_effect=_mock_urlopen(responses)):
        result = validator.validate("8.8.8.8", 15169)
    assert result.status == "valid"
    assert result.prefix == "8.8.8.0/24"


def test_invalid_result(validator) -> None:
    responses = {
        "network-info": _NETWORK_INFO_VALID,
        "rpki-validation": _RPKI_INVALID,
    }
    with patch("urllib.request.urlopen", side_effect=_mock_urlopen(responses)):
        result = validator.validate("8.8.8.8", 64496)
    assert result.status == "invalid"


def test_not_found_when_no_prefix(validator) -> None:
    responses = {"network-info": _NETWORK_INFO_EMPTY}
    with patch("urllib.request.urlopen", side_effect=_mock_urlopen(responses)):
        result = validator.validate("192.0.2.1", 64496)
    assert result.status == "not-found"


def test_unknown_on_network_error(validator) -> None:
    with patch("urllib.request.urlopen", side_effect=OSError("timeout")):
        result = validator.validate("8.8.8.8", 15169)
    assert result.status == "unknown"


def test_cache_hit_skips_http(validator) -> None:
    responses = {
        "network-info": _NETWORK_INFO_VALID,
        "rpki-validation": _RPKI_VALID,
    }
    with patch("urllib.request.urlopen", side_effect=_mock_urlopen(responses)) as mock:
        first = validator.validate("8.8.8.8", 15169)
        second = validator.validate("8.8.8.8", 15169)  # second call — should hit cache
    # urlopen called exactly twice (once per endpoint on the first call)
    assert mock.call_count == 2
    # The cached answer must carry the same verdict as the fetched one.
    assert (second.status, second.prefix) == (first.status, first.prefix)
    assert second.status == "valid"
    assert second.prefix == "8.8.8.0/24"


def test_rpki_not_found_status_stored(validator) -> None:
    responses = {
        "network-info": _NETWORK_INFO_VALID,
        "rpki-validation": _RPKI_NOT_FOUND,
    }
    with patch("urllib.request.urlopen", side_effect=_mock_urlopen(responses)):
        result = validator.validate("8.8.8.8", 99999)
    assert result.status == "not-found"


def test_unknown_status_normalised(validator) -> None:
    """Any unrecognised status string from RIPE STAT collapses to 'unknown'."""
    responses = {
        "network-info": _NETWORK_INFO_VALID,
        "rpki-validation": {"data": {"status": "something-new"}},
    }
    with patch("urllib.request.urlopen", side_effect=_mock_urlopen(responses)):
        result = validator.validate("8.8.8.8", 15169)
    assert result.status == "unknown"