63 lines
2.5 KiB
Python
63 lines
2.5 KiB
Python
"""RIR delegated-stats download.
|
|
|
|
Five public files, ~5 MB total. Pulled over HTTPS with a generic
|
|
User-Agent (stealth: never identify as DECNET — a RIR log scraper could
|
|
otherwise correlate our egress to a honeypot operator).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import shutil
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Tuple
|
|
|
|
logger = logging.getLogger("decnet.geoip.rir.fetch")
|
|
|
|
# (registry_name, url). Extended delegated-stats include the opaque
|
|
# registration ID we don't use, but they are what the RIRs recommend
|
|
# consumers pull.
|
|
RIR_SOURCES: Tuple[Tuple[str, str], ...] = (
|
|
("arin", "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"),
|
|
("ripe", "https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest"),
|
|
("apnic", "https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest"),
|
|
("lacnic", "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"),
|
|
("afrinic", "https://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-extended-latest"),
|
|
)
|
|
|
|
# Generic UA — no DECNET/honeypot token. Matches what a stock requests/
|
|
# urllib script would send if someone forgot to set one.
|
|
_USER_AGENT = "Mozilla/5.0 (compatible; fetch/1.0)"
|
|
_TIMEOUT_S = 60
|
|
|
|
|
|
def fetch_all(dest: Path) -> list[Path]:
|
|
"""Download every RIR file into *dest*. Returns the written paths.
|
|
|
|
Atomic per file: we download to ``{name}.txt.tmp`` then rename. A
|
|
partial failure leaves the previous generation intact.
|
|
"""
|
|
dest.mkdir(parents=True, exist_ok=True)
|
|
written: list[Path] = []
|
|
for name, url in RIR_SOURCES:
|
|
target = dest / f"{name}.txt"
|
|
tmp = target.with_suffix(".txt.tmp")
|
|
try:
|
|
_download(url, tmp)
|
|
tmp.replace(target)
|
|
written.append(target)
|
|
logger.info("geoip.rir: fetched %s (%d bytes)", name, target.stat().st_size)
|
|
except Exception as exc:
|
|
logger.error("geoip.rir: fetch failed for %s (%s): %s", name, url, exc)
|
|
if tmp.exists():
|
|
tmp.unlink(missing_ok=True)
|
|
# Keep any stale previous file — better outdated than empty.
|
|
return written
|
|
|
|
|
|
def _download(url: str, dest: Path) -> None:
|
|
req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT})
|
|
# `with` closes the response + dest file on any path.
|
|
with urllib.request.urlopen(req, timeout=_TIMEOUT_S) as resp, dest.open("wb") as fh: # nosec B310 — fixed https RIR URLs
|
|
shutil.copyfileobj(resp, fh)
|