Files
DECNET/decnet/geoip/ptr.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

89 lines
2.7 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Reverse DNS (PTR record) lookup for attacker IPs.
Colocated with ``decnet.geoip`` because the shape matches: take an IP,
return a piece of supplementary metadata, never raise. Same operator
posture as ``enrich_ip`` — a missing PTR must never break profile
building.
The profiler calls this once per attacker IP at first sighting. Never
re-resolves — the profiler tracks already-attempted IPs in-memory
(``_WorkerState.ptr_attempted``) so a persistent NXDOMAIN doesn't burn
2 seconds of tick time on every cycle.
"""
from __future__ import annotations
import asyncio
import ipaddress
import os
import socket
from typing import Optional
from decnet.logging import get_logger
log = get_logger("geoip.ptr")
_DEFAULT_TIMEOUT = 2.0
def _is_resolvable(ip: str) -> bool:
"""True iff ``ip`` is a parseable public address worth querying.
Private / loopback / link-local / multicast / reserved addresses
have no meaningful PTR at the public resolver level, so short-
circuit before spending a DNS round-trip on them.
"""
try:
addr = ipaddress.ip_address(ip)
except (ValueError, TypeError):
return False
if addr.is_loopback or addr.is_private or addr.is_link_local:
return False
if addr.is_multicast or addr.is_reserved or addr.is_unspecified:
return False
return True
def _blocking_lookup(ip: str) -> Optional[str]:
"""Synchronous PTR lookup — runs in the executor thread."""
try:
hostname, _aliases, _addrs = socket.gethostbyaddr(ip)
return hostname or None
except (socket.herror, socket.gaierror, OSError):
return None
async def resolve_ptr_record(
ip: str,
*,
timeout: float = _DEFAULT_TIMEOUT,
) -> Optional[str]:
"""Resolve *ip* to a PTR / rDNS hostname.
Returns the canonical hostname on success, ``None`` on any failure
(NXDOMAIN, timeout, malformed input, env kill-switch). Never raises
— PTR is supplementary attacker metadata; a missing lookup must not
break profile building.
Honours ``DECNET_PTR_ENABLED=false`` for locked-down environments
where egress DNS is forbidden.
"""
if os.environ.get("DECNET_PTR_ENABLED", "true").lower() == "false":
return None
if not _is_resolvable(ip):
return None
loop = asyncio.get_running_loop()
try:
return await asyncio.wait_for(
loop.run_in_executor(None, _blocking_lookup, ip),
timeout=timeout,
)
except asyncio.TimeoutError:
log.debug("ptr: timeout resolving %s after %.1fs", ip, timeout)
return None
except Exception as exc: # noqa: BLE001 — supplementary metadata
log.debug("ptr: resolver crashed for %s: %s", ip, exc)
return None