diff --git a/decnet/geoip/ptr.py b/decnet/geoip/ptr.py new file mode 100644 index 00000000..b5af6c28 --- /dev/null +++ b/decnet/geoip/ptr.py @@ -0,0 +1,87 @@ +"""Reverse DNS (PTR record) lookup for attacker IPs. + +Colocated with ``decnet.geoip`` because the shape matches: take an IP, +return a piece of supplementary metadata, never raise. Same operator +posture as ``enrich_ip`` — a missing PTR must never break profile +building. + +The profiler calls this once per attacker IP at first sighting. Never +re-resolves — the profiler tracks already-attempted IPs in-memory +(``_WorkerState.ptr_attempted``) so a persistent NXDOMAIN doesn't burn +2 seconds of tick time on every cycle. +""" +from __future__ import annotations + +import asyncio +import ipaddress +import os +import socket +from typing import Optional + +from decnet.logging import get_logger + +log = get_logger("geoip.ptr") + + +_DEFAULT_TIMEOUT = 2.0 + + +def _is_resolvable(ip: str) -> bool: + """True iff ``ip`` is a parseable public address worth querying. + + Private / loopback / link-local / multicast / reserved addresses + have no meaningful PTR at the public resolver level, so short- + circuit before spending a DNS round-trip on them. + """ + try: + addr = ipaddress.ip_address(ip) + except (ValueError, TypeError): + return False + if addr.is_loopback or addr.is_private or addr.is_link_local: + return False + if addr.is_multicast or addr.is_reserved or addr.is_unspecified: + return False + return True + + +def _blocking_lookup(ip: str) -> Optional[str]: + """Synchronous PTR lookup — runs in the executor thread.""" + try: + hostname, _aliases, _addrs = socket.gethostbyaddr(ip) + return hostname or None + except (socket.herror, socket.gaierror, OSError): + return None + + +async def resolve_ptr_record( + ip: str, + *, + timeout: float = _DEFAULT_TIMEOUT, +) -> Optional[str]: + """Resolve *ip* to a PTR / rDNS hostname. 
+ + Returns the canonical hostname on success, ``None`` on any failure + (NXDOMAIN, timeout, malformed input, env kill-switch). Never raises + — PTR is supplementary attacker metadata; a missing lookup must not + break profile building. + + Honours ``DECNET_PTR_ENABLED=false`` for locked-down environments + where egress DNS is forbidden. + """ + if os.environ.get("DECNET_PTR_ENABLED", "true").lower() == "false": + return None + if not _is_resolvable(ip): + return None + + loop = asyncio.get_running_loop() + try: + return await asyncio.wait_for( + loop.run_in_executor(None, _blocking_lookup, ip), + timeout=timeout, + ) + except asyncio.TimeoutError: + log.debug("ptr: timeout resolving %s after %.1fs", ip, timeout) + return None + except Exception as exc: # noqa: BLE001 — supplementary metadata + log.debug("ptr: resolver crashed for %s: %s", ip, exc) + return None diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 4da37aa7..89517073 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -30,6 +30,7 @@ from decnet.bus.publish import ( from decnet.correlation.engine import CorrelationEngine from decnet.correlation.parser import LogEvent from decnet.geoip import enrich_ip +from decnet.geoip.ptr import resolve_ptr_record from decnet.logging import get_logger from decnet.profiler.behavioral import build_behavior_record from decnet.telemetry import traced as _traced, get_tracer as _get_tracer @@ -76,6 +77,10 @@ class _WorkerState: # Optional bus hook — fires ``("scored", payload)`` per profile upsert. # None when the bus is disabled or unreachable. publish_attacker: Callable[[str, dict[str, Any]], None] | None = None + # Set of IPs we've already tried to PTR-resolve in this worker's + # lifetime. Bounds retry to once per worker boot so a persistently + # NXDOMAIN-returning IP doesn't burn 2s of tick time on every cycle. 
+ ptr_attempted: set[str] = field(default_factory=set) async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None: @@ -178,6 +183,28 @@ async def _incremental_update(repo: BaseRepository, state: _WorkerState) -> None logger.info("attacker worker: updated %d profiles (incremental)", len(affected_ips)) +_PTR_CONCURRENCY = 10 + + +async def _resolve_ptrs_for(ips: list[str]) -> dict[str, Any]: + """Resolve PTR for each *ip* concurrently, bounded. + + Returns ``{ip: ptr_or_None}`` for every input. Uses an asyncio + semaphore to cap parallel lookups — cold-start could see hundreds + of fresh IPs and we don't want to hammer the OS resolver. + """ + if not ips: + return {} + sem = asyncio.Semaphore(_PTR_CONCURRENCY) + + async def _one(ip: str) -> tuple[str, Any]: + async with sem: + return ip, await resolve_ptr_record(ip) + + results = await asyncio.gather(*(_one(ip) for ip in ips)) + return dict(results) + + @_traced("profiler.update_profiles") async def _update_profiles( repo: BaseRepository, @@ -187,6 +214,14 @@ async def _update_profiles( traversal_map = {t.attacker_ip: t for t in state.engine.traversals(min_deckies=2)} bounties_map = await repo.get_bounties_for_ips(ips) + # PTR resolution: one shot per IP per worker lifetime. If the host + # runs a caching resolver, re-runs on worker restart are cheap for IPs + # resolved recently; otherwise every address can pay the 2s ceiling.
+ fresh = [ip for ip in ips if ip not in state.ptr_attempted] + for ip in fresh: + state.ptr_attempted.add(ip) + ptrs = await _resolve_ptrs_for(fresh) + _tracer = _get_tracer("profiler") for ip in ips: events = state.engine._events.get(ip, []) @@ -201,7 +236,15 @@ async def _update_profiles( bounties = bounties_map.get(ip, []) commands = _extract_commands_from_events(events) - record = _build_record(ip, events, traversal, bounties, commands) + if ip in ptrs: + record = _build_record( + ip, events, traversal, bounties, commands, + ptr_record=ptrs[ip], + ) + else: + # Not in ptrs → already attempted in a prior cycle → skip + # kwarg so upsert preserves whatever's stored. + record = _build_record(ip, events, traversal, bounties, commands) attacker_uuid = await repo.upsert_attacker(record) _span.set_attribute("is_traversal", traversal is not None) @@ -243,12 +286,17 @@ async def _update_profiles( logger.error("attacker worker: smtp target upsert failed for %s: %s", ip, exc) +_UNSET = object() # sentinel — distinguishes "not passed" from "None" + + def _build_record( ip: str, events: list[LogEvent], traversal: Any, bounties: list[dict[str, Any]], commands: list[dict[str, Any]], + *, + ptr_record: Any = _UNSET, ) -> dict[str, Any]: services = sorted({e.service for e in events}) deckies = ( @@ -260,7 +308,7 @@ def _build_record( credential_count = sum(1 for b in bounties if b.get("bounty_type") == "credential") country_code, country_source = enrich_ip(ip) - return { + record: dict[str, Any] = { "ip": ip, "first_seen": min(e.timestamp for e in events), "last_seen": max(e.timestamp for e in events), @@ -279,6 +327,13 @@ def _build_record( "country_source": country_source, "updated_at": datetime.now(timezone.utc), } + # ptr_record is omitted from the dict entirely when the caller didn't + # supply one — lets the upsert's attribute-merge preserve any value + # already stored on the row without us having to think about "None + # means preserve vs. overwrite". 
+ if ptr_record is not _UNSET: + record["ptr_record"] = ptr_record + return record def _first_contact_deckies(events: list[LogEvent]) -> list[str]: diff --git a/decnet/web/db/models/attackers.py b/decnet/web/db/models/attackers.py index 5c1ad913..a6583d29 100644 --- a/decnet/web/db/models/attackers.py +++ b/decnet/web/db/models/attackers.py @@ -63,6 +63,11 @@ class Attacker(SQLModel, table=True): # Nullable because private / loopback / IPv6 sources never resolve. country_code: Optional[str] = Field(default=None, max_length=2, index=True) country_source: Optional[str] = Field(default=None, max_length=16) + # Reverse-DNS (PTR) name, one-shot resolved by the profiler at first + # sighting. Nullable — many attackers run infra with no rDNS, and + # private/loopback addresses never resolve. 256 chars covers + # RFC 1035's 255-octet maximum name length. + ptr_record: Optional[str] = Field(default=None, max_length=256) updated_at: datetime = Field( default_factory=lambda: datetime.now(timezone.utc), index=True ) diff --git a/decnet_web/src/components/AttackerDetail.tsx b/decnet_web/src/components/AttackerDetail.tsx index 70867eed..33253d55 100644 --- a/decnet_web/src/components/AttackerDetail.tsx +++ b/decnet_web/src/components/AttackerDetail.tsx @@ -61,6 +61,7 @@ interface AttackerData { commands: { service: string; decky: string; command: string; timestamp: string }[]; country_code: string | null; country_source: string | null; + ptr_record: string | null; updated_at: string; behavior: AttackerBehavior | null; service_activity?: { @@ -1012,6 +1013,20 @@ const AttackerDetail: React.FC = () => { unknown )} +
+ REVERSE DNS: + {attacker.ptr_record ? ( + + {attacker.ptr_record} + + ) : ( + + )} +
diff --git a/tests/conftest.py b/tests/conftest.py index 19628810..467f5fbd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,6 +27,10 @@ os.environ["DECNET_DB_TYPE"] = "sqlite" # enrichment globally. The geoip-specific tests re-enable it via # monkeypatch + a temp DECNET_GEOIP_ROOT. os.environ["DECNET_GEOIP_ENABLED"] = "false" +# Same posture for PTR resolution — tests that cover the resolver +# re-enable it explicitly via monkeypatch; everyone else gets the +# short-circuit (returns None without touching socket.gethostbyaddr). +os.environ["DECNET_PTR_ENABLED"] = "false" import pytest from typing import Any diff --git a/tests/geoip/test_ptr.py b/tests/geoip/test_ptr.py new file mode 100644 index 00000000..fcffe1d8 --- /dev/null +++ b/tests/geoip/test_ptr.py @@ -0,0 +1,119 @@ +"""Unit tests for decnet.geoip.ptr — reverse-DNS resolver.""" +from __future__ import annotations + +import asyncio +import socket +from unittest.mock import patch + +import pytest + +from decnet.geoip.ptr import _is_resolvable, resolve_ptr_record + + +@pytest.fixture(autouse=True) +def _enable_ptr(monkeypatch): + """This module covers the resolver directly — re-enable the env + switch that tests/conftest.py disables globally.""" + monkeypatch.setenv("DECNET_PTR_ENABLED", "true") + + +# ─── pure predicate ───────────────────────────────────────────────────────── + +@pytest.mark.parametrize("ip", [ + "127.0.0.1", + "10.0.0.1", + "192.168.1.5", + "172.16.0.1", + "169.254.1.1", # link-local + "224.0.0.1", # multicast + "::1", + "fe80::1", # IPv6 link-local + "not-an-ip", + "", +]) +def test_not_resolvable(ip: str): + assert _is_resolvable(ip) is False + + +@pytest.mark.parametrize("ip", [ + "8.8.8.8", + "1.1.1.1", + "2606:4700:4700::1111", +]) +def test_resolvable_public(ip: str): + assert _is_resolvable(ip) is True + + +# ─── resolver ─────────────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_resolves_public_ip(): + with patch( + 
"decnet.geoip.ptr.socket.gethostbyaddr", + return_value=("dns.google", [], ["8.8.8.8"]), + ): + name = await resolve_ptr_record("8.8.8.8") + assert name == "dns.google" + + +@pytest.mark.asyncio +async def test_private_ip_short_circuits(): + """Private IPs never touch the resolver.""" + with patch("decnet.geoip.ptr.socket.gethostbyaddr") as mock_lookup: + assert await resolve_ptr_record("127.0.0.1") is None + assert await resolve_ptr_record("10.0.0.1") is None + assert await resolve_ptr_record("::1") is None + assert mock_lookup.call_count == 0 + + +@pytest.mark.asyncio +async def test_gethostbyaddr_herror_returns_none(): + with patch( + "decnet.geoip.ptr.socket.gethostbyaddr", + side_effect=socket.herror("no rDNS"), + ): + assert await resolve_ptr_record("8.8.8.8") is None + + +@pytest.mark.asyncio +async def test_gethostbyaddr_gaierror_returns_none(): + with patch( + "decnet.geoip.ptr.socket.gethostbyaddr", + side_effect=socket.gaierror("dns broken"), + ): + assert await resolve_ptr_record("8.8.8.8") is None + + +@pytest.mark.asyncio +async def test_timeout_returns_none(): + """A slow resolver should not block the caller past timeout.""" + def slow(ip: str): # noqa: ARG001 + import time + time.sleep(3.0) + return ("slow.example", [], []) + + with patch("decnet.geoip.ptr.socket.gethostbyaddr", side_effect=slow): + # Tight timeout — must return quickly. 
+ result = await asyncio.wait_for( + resolve_ptr_record("8.8.8.8", timeout=0.1), + timeout=1.0, + ) + assert result is None + + +@pytest.mark.asyncio +async def test_env_disabled(monkeypatch): + monkeypatch.setenv("DECNET_PTR_ENABLED", "false") + with patch("decnet.geoip.ptr.socket.gethostbyaddr") as mock_lookup: + assert await resolve_ptr_record("8.8.8.8") is None + assert mock_lookup.call_count == 0 + + +@pytest.mark.asyncio +async def test_empty_hostname_returned_as_none(): + """gethostbyaddr can return '' on some platforms; normalize to None.""" + with patch( + "decnet.geoip.ptr.socket.gethostbyaddr", + return_value=("", [], ["8.8.8.8"]), + ): + assert await resolve_ptr_record("8.8.8.8") is None diff --git a/tests/profiler/test_attacker_worker.py b/tests/profiler/test_attacker_worker.py index e6f4757f..1c144a30 100644 --- a/tests/profiler/test_attacker_worker.py +++ b/tests/profiler/test_attacker_worker.py @@ -314,6 +314,33 @@ class TestBuildRecord: assert isinstance(record["updated_at"], datetime) assert record["updated_at"].tzinfo is not None + def test_ptr_record_absent_when_not_passed(self): + """Omitting the kwarg means the key isn't in the record dict. + + This lets upsert_attacker's attribute-merge loop preserve any + PTR already stored on the row — we never null by accident. 
+ """ + events = self._events() + record = _build_record("1.1.1.1", events, None, [], []) + assert "ptr_record" not in record + + def test_ptr_record_included_when_resolved(self): + events = self._events() + record = _build_record( + "1.1.1.1", events, None, [], [], + ptr_record="dns.google", + ) + assert record["ptr_record"] == "dns.google" + + def test_ptr_record_included_when_explicit_none(self): + """Explicit None is a fresh-attempt-failed signal, still written.""" + events = self._events() + record = _build_record( + "1.1.1.1", events, None, [], [], + ptr_record=None, + ) + assert record["ptr_record"] is None + # ─── cold start via _incremental_update (uninitialized state) ────────────────