From bbb126e435ef00eb88ba64858d7d1a21ebda6d47 Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 21 May 2026 19:50:09 -0400 Subject: [PATCH] =?UTF-8?q?feat(dns):=20fix=20three=20operational=20blind?= =?UTF-8?q?=20spots=20=E2=80=94=20flood=20detection,=20AAAA,=20recon=20bur?= =?UTF-8?q?st?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add per-src QPS counter (_qps_window) with flood_suspect event at ≥50 queries per 10s window; one event per src per 30s cooldown, does not suppress baseline query events. - Add tracking_evicted telemetry every 100 LRU evictions so IP-rotation evasion of _txt_times/_qps_window/_recon_window is observable, not silent. - Shared _track_lru helper consolidates LRU touch + eviction signalling across all three bounded OrderedDicts. - Add TYPE_AAAA=28 support: _fake_ipv6() returns deterministic ULA (fd00::/8) addresses for in-zone names; extra_records parser now accepts and validates AAAA entries via socket.inet_pton. - Add per-src recon-burst aggregation (_recon_window): fingerprint_probe + zone_transfer + amp_probe are tracked per source in a 60s window; recon_burst fires when ≥2 distinct signal types are seen, once per src per 120s cooldown. - 47 tests passing (19 new across TestAAAARecords, TestFloodDetection, TestReconBurst). 
--- decnet/templates/dns/server.py | 174 +++++++++++++++++++++++++-- tests/service_testing/test_dns.py | 190 +++++++++++++++++++++++++++++- 2 files changed, 352 insertions(+), 12 deletions(-) diff --git a/decnet/templates/dns/server.py b/decnet/templates/dns/server.py index 060ef6c4..07c4d98c 100644 --- a/decnet/templates/dns/server.py +++ b/decnet/templates/dns/server.py @@ -8,6 +8,9 @@ event_type values emitted: zone_transfer — AXFR or IXFR (always REFUSED) amp_probe — qtype=ANY or EDNS requestor udp_size > 1232 tunneling_suspect — long high-entropy labels or rapid TXT burst from same src + flood_suspect — source exceeding QPS threshold within rolling window + tracking_evicted — LRU state evicted (signals IP-rotation evasion) + recon_burst — same source hit ≥2 distinct high-signal event types within 60s """ import asyncio @@ -15,6 +18,7 @@ import collections import hashlib import math import os +import socket import struct import time from typing import Any, cast @@ -64,15 +68,32 @@ def _fake_ip(label: str = "") -> str: return f"10.{(h >> 16) & 0xFF}.{(h >> 8) & 0xFF}.{h & 0xFF}" +def _fake_ipv6(label: str = "") -> str: + """Deterministic ULA IPv6 address (fd00::/8) for in-zone names.""" + raw = bytes.fromhex(seed.instance_hex(15, f"aaaa:{label}")) + addr = b"\xfd" + raw # fd + 15 bytes = 16 bytes total, guaranteed fd::/8 + return socket.inet_ntop(socket.AF_INET6, addr) + + ZONE_IP = _fake_ip("zone") _NS2_IP = _fake_ip("ns2") +ZONE_IPV6 = _fake_ipv6("zone") +_NS2_IPV6 = _fake_ipv6("ns2") # Parse extra_records: one per line, " " _EXTRA_RECORDS: list[tuple[str, str, str]] = [] for _line in _EXTRA_RAW.splitlines(): _parts = _line.strip().split(None, 2) if len(_parts) == 3: - _EXTRA_RECORDS.append((_parts[0], _parts[1].upper(), _parts[2])) + _ename, _etype, _eval = _parts[0], _parts[1].upper(), _parts[2] + if _etype == "AAAA": + try: + socket.inet_pton(socket.AF_INET6, _eval) + _EXTRA_RECORDS.append((_ename, _etype, _eval)) + except OSError: + pass + else: + 
_EXTRA_RECORDS.append((_ename, _etype, _eval)) # ── DNS wire constants ──────────────────────────────────────────────────────── @@ -161,6 +182,10 @@ def _rdata_A(ip: str) -> bytes: return bytes(int(x) for x in ip.split(".")) +def _rdata_AAAA(ip6: str) -> bytes: + return socket.inet_pton(socket.AF_INET6, ip6) + + def _rdata_NS(ns: str) -> bytes: return _encode_name(ns) @@ -257,17 +282,70 @@ def _log(event_type: str, severity: int = 6, **kwargs) -> None: write_syslog_file(line) forward_syslog(line, LOG_TARGET) -# ── Tunneling heuristic ─────────────────────────────────────────────────────── +# ── Tunables ────────────────────────────────────────────────────────────────── -_SHANNON_THRESHOLD = 4.0 +# Tunneling heuristic +_SHANNON_THRESHOLD = 4.0 _LABEL_LEN_THRESHOLD = 30 -_TXT_BURST_WINDOW = 10.0 # seconds -_TXT_BURST_COUNT = 5 -_MAX_TRACKED_SRCS = 1000 +_TXT_BURST_WINDOW = 10.0 # seconds +_TXT_BURST_COUNT = 5 +_MAX_TRACKED_SRCS = 1000 -# src_ip -> deque of recent TXT query timestamps (monotonic) +# Flood detection +_QPS_WINDOW_SEC = 10.0 +_FLOOD_THRESHOLD = 50 +_FLOOD_COOLDOWN_SEC = 30.0 + +# Recon burst +_RECON_WINDOW_SEC = 60.0 +_RECON_DISTINCT_THRESHOLD = 2 +_RECON_COOLDOWN_SEC = 120.0 +_RECON_SIGNAL_TYPES = frozenset({"fingerprint_probe", "zone_transfer", "amp_probe"}) + +# Eviction telemetry +_EVICT_EVENT_EVERY = 100 + +# ── Per-src state ───────────────────────────────────────────────────────────── + +# Tunneling: src_ip -> deque of recent TXT timestamps _txt_times: collections.OrderedDict[str, collections.deque] = collections.OrderedDict() +# Flood: src_ip -> deque of recent query timestamps +_qps_window: collections.OrderedDict[str, collections.deque] = collections.OrderedDict() + +# Flood cooldown: src_ip -> last flood_suspect emit time +_flood_cooldown: dict[str, float] = {} + +# Recon: src_ip -> {event_type: last_seen_monotonic} +_recon_window: collections.OrderedDict[str, dict[str, float]] = collections.OrderedDict() + +# Recon cooldown: src_ip -> last 
recon_burst emit time +_recon_cooldown: dict[str, float] = {} + +_evictions_total = 0 + + +def _note_eviction(tracker_name: str) -> None: + global _evictions_total + _evictions_total += 1 + if _evictions_total % _EVICT_EVENT_EVERY == 0: + _log( + "tracking_evicted", + evictions_total=_evictions_total, + capacity=_MAX_TRACKED_SRCS, + tracker_name=tracker_name, + ) + + +def _track_lru(table: collections.OrderedDict, key: str, tracker_name: str) -> None: + """Touch key to MRU end; evict LRU entries if over capacity.""" + if key in table: + table.move_to_end(key) + while len(table) > _MAX_TRACKED_SRCS: + table.popitem(last=False) + _note_eviction(tracker_name) + +# ── Tunneling heuristic ─────────────────────────────────────────────────────── def _shannon_entropy(s: str) -> float: if not s: @@ -286,9 +364,8 @@ def _is_tunneling(qname: str, qtype: int, src: str) -> bool: if qtype == TYPE_TXT: now = time.monotonic() if src not in _txt_times: - if len(_txt_times) >= _MAX_TRACKED_SRCS: - _txt_times.popitem(last=False) _txt_times[src] = collections.deque() + _track_lru(_txt_times, src, "txt_times") q = _txt_times[src] q.append(now) while q and now - q[0] > _TXT_BURST_WINDOW: @@ -297,6 +374,64 @@ def _is_tunneling(qname: str, qtype: int, src: str) -> bool: return True return False +# ── Flood detection ─────────────────────────────────────────────────────────── + +def _check_flood(src: str, qtype_name: str) -> bool: + """Return True (and emit flood_suspect once per cooldown) if src is flooding.""" + now = time.monotonic() + if src not in _qps_window: + _qps_window[src] = collections.deque() + _track_lru(_qps_window, src, "qps_window") + q = _qps_window[src] + q.append(now) + while q and now - q[0] > _QPS_WINDOW_SEC: + q.popleft() + if len(q) >= _FLOOD_THRESHOLD: + last = _flood_cooldown.get(src, 0.0) + if now - last >= _FLOOD_COOLDOWN_SEC: + _flood_cooldown[src] = now + _log( + "flood_suspect", + src=src, + qps=len(q), + window_sec=_QPS_WINDOW_SEC, + sample_qtype=qtype_name, 
+ ) + return True + return False + +# ── Recon burst aggregation ─────────────────────────────────────────────────── + +def _note_recon_event(src: str, event_type: str) -> None: + """Record a high-signal event; emit recon_burst if threshold met.""" + if event_type not in _RECON_SIGNAL_TYPES: + return + now = time.monotonic() + if src not in _recon_window: + _recon_window[src] = {} + _track_lru(_recon_window, src, "recon_window") + _recon_window[src][event_type] = now + # Prune events older than window + stale = [k for k, t in _recon_window[src].items() if now - t > _RECON_WINDOW_SEC] + for k in stale: + del _recon_window[src][k] + active = _recon_window[src] + if len(active) >= _RECON_DISTINCT_THRESHOLD: + last = _recon_cooldown.get(src, 0.0) + if now - last >= _RECON_COOLDOWN_SEC: + _recon_cooldown[src] = now + seq = sorted( + [(et, round(now - t, 1)) for et, t in active.items()], + key=lambda x: x[1], + ) + _log( + "recon_burst", + src=src, + distinct_types=len(active), + window_sec=_RECON_WINDOW_SEC, + sequence=str(seq), + ) + # ── Response builders ───────────────────────────────────────────────────────── def _refused_response(qid: int, rd: bool, qname: str, qtype: int, qclass: int) -> bytes: @@ -367,7 +502,7 @@ def _auth_response(qid: int, rd: bool, qname: str, qtype: int) -> bytes: if qtype in (TYPE_A, TYPE_ANY): ip_map = { - DOMAIN_BARE: ZONE_IP, + DOMAIN_BARE: ZONE_IP, f"www.{DOMAIN_BARE}": ZONE_IP, f"mail.{DOMAIN_BARE}": _fake_ip("mail"), f"ns1.{DOMAIN_BARE}": ZONE_IP, @@ -376,6 +511,17 @@ def _auth_response(qid: int, rd: bool, qname: str, qtype: int) -> bytes: if qname_bare in ip_map: answers.append(_rr(qname, TYPE_A, CLASS_IN, 300, _rdata_A(ip_map[qname_bare]))) + if qtype in (TYPE_AAAA, TYPE_ANY): + ipv6_map = { + DOMAIN_BARE: ZONE_IPV6, + f"www.{DOMAIN_BARE}": ZONE_IPV6, + f"mail.{DOMAIN_BARE}": _fake_ipv6("mail"), + f"ns1.{DOMAIN_BARE}": ZONE_IPV6, + f"ns2.{DOMAIN_BARE}": _NS2_IPV6, + } + if qname_bare in ipv6_map: + answers.append(_rr(qname, 
TYPE_AAAA, CLASS_IN, 300, _rdata_AAAA(ipv6_map[qname_bare]))) + if qtype in (TYPE_NS, TYPE_ANY) and qname_bare == DOMAIN_BARE: answers.append(_rr(DOMAIN, TYPE_NS, CLASS_IN, 3600, _rdata_NS(NS1))) answers.append(_rr(DOMAIN, TYPE_NS, CLASS_IN, 3600, _rdata_NS(NS2))) @@ -397,6 +543,8 @@ def _auth_response(qid: int, rd: bool, qname: str, qtype: int) -> bytes: continue if ertype == "A" and qtype in (TYPE_A, TYPE_ANY): answers.append(_rr(er_fqdn, TYPE_A, CLASS_IN, 300, _rdata_A(erval))) + elif ertype == "AAAA" and qtype in (TYPE_AAAA, TYPE_ANY): + answers.append(_rr(er_fqdn, TYPE_AAAA, CLASS_IN, 300, _rdata_AAAA(erval))) elif ertype == "TXT" and qtype in (TYPE_TXT, TYPE_ANY): answers.append(_rr(er_fqdn, TYPE_TXT, CLASS_IN, 300, _rdata_TXT(erval))) elif ertype == "CNAME" and qtype in (TYPE_A, TYPE_ANY): @@ -436,6 +584,9 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | qtype_name = _TYPE_NAMES.get(qtype, str(qtype)) qclass_name = _CLASS_NAMES.get(qclass, str(qclass)) + # Flood check runs on every packet (including CHAOS / transfer probes) + _check_flood(src_ip, qtype_name) + # ── Zone transfer ────────────────────────────────────────────────────── if qtype in (TYPE_AXFR, TYPE_IXFR): _log( @@ -444,6 +595,7 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | qname=qname.rstrip("."), qtype=qtype_name, qclass=qclass_name, zone=DOMAIN, ) + _note_recon_event(src_ip, "zone_transfer") return _refused_response(qid, rd, qname, qtype, qclass) # ── CHAOS fingerprinting ─────────────────────────────────────────────── @@ -459,6 +611,7 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | src=src_ip, src_port=src_port, transport=transport, probe=qname.rstrip("."), response=answer_text, ) + _note_recon_event(src_ip, "fingerprint_probe") if answer_text: return _chaos_txt_response(qid, rd, qname, answer_text) return _refused_response(qid, rd, qname, qtype, qclass) @@ -480,6 +633,7 @@ def 
_handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | _log("tunneling_suspect", **base) if is_amp: _log("amp_probe", **base) + _note_recon_event(src_ip, "amp_probe") if not is_tunnel and not is_amp: _log("query", **base) diff --git a/tests/service_testing/test_dns.py b/tests/service_testing/test_dns.py index 0a1f78ae..ce4999dc 100644 --- a/tests/service_testing/test_dns.py +++ b/tests/service_testing/test_dns.py @@ -1,7 +1,9 @@ """Tests for decnet/templates/dns/server.py and decnet/services/dns.py.""" import collections +import hashlib import importlib.util +import socket import struct import sys from types import ModuleType @@ -37,7 +39,7 @@ def _make_fake_instance_seed() -> ModuleType: mod.rng = _random.Random(42) mod.pick = lambda choices: list(choices)[0] mod.instance_uuid = lambda ns="": f"aaaabbbb-cccc-dddd-eeee-{ns[:12].ljust(12, '0')}" - mod.instance_hex = lambda nbytes, ns="": ("deadbeef" * 4)[:nbytes * 2] + mod.instance_hex = lambda nbytes, ns="": (hashlib.sha256(ns.encode()).hexdigest() * 4)[:nbytes * 2] mod.hostname = lambda: "testhost" mod.jitter = MagicMock() return mod @@ -68,8 +70,12 @@ def _load_dns(extra_env: dict | None = None): with patch.dict("os.environ", env, clear=False): spec.loader.exec_module(mod) # type: ignore[union-attr] - # Reset tunneling state between tests + # Reset per-src state between tests mod._txt_times.clear() + mod._qps_window.clear() + mod._flood_cooldown.clear() + mod._recon_window.clear() + mod._recon_cooldown.clear() return mod, bridge._events @@ -156,6 +162,71 @@ class TestAuthZone: assert resp is not None assert _rcode(resp) == mod.RCODE_NOERROR +# ── AAAA / IPv6 ─────────────────────────────────────────────────────────────── + +class TestAAAARecords: + def test_aaaa_apex(self): + mod, _ = _load_dns() + resp = mod._handle(_build_query("test.local", mod.TYPE_AAAA), "1.2.3.4", 1234, "udp") + assert resp is not None + assert _rcode(resp) == mod.RCODE_NOERROR + _, ancount, _, _ = _counts(resp) + 
assert ancount >= 1 + + def test_aaaa_rdata_is_16_bytes_and_ula(self): + mod, _ = _load_dns() + resp = mod._handle(_build_query("test.local", mod.TYPE_AAAA), "1.2.3.4", 1234, "udp") + assert resp is not None + # Walk past header(12) + question to reach answer RDATA + # Question: encoded "test.local" + 4 bytes type/class + # We just need to find a 16-byte block starting with 0xfd somewhere + # The AAAA RDATA is 16 bytes; first byte must be 0xfd (ULA) + assert b"\xfd" in resp # ULA fd::/8 + + def test_aaaa_www(self): + mod, _ = _load_dns() + resp = mod._handle(_build_query("www.test.local", mod.TYPE_AAAA), "1.2.3.4", 1234, "udp") + assert resp is not None + assert _rcode(resp) == mod.RCODE_NOERROR + _, ancount, _, _ = _counts(resp) + assert ancount >= 1 + + def test_aaaa_out_of_zone_refused(self): + mod, _ = _load_dns({"DNS_ZONE_MODE": "auth"}) + resp = mod._handle(_build_query("google.com", mod.TYPE_AAAA), "1.2.3.4", 1234, "udp") + assert resp is not None + assert _rcode(resp) == mod.RCODE_REFUSED + + def test_extra_record_aaaa(self): + mod, _ = _load_dns({"DNS_EXTRA_RECORDS": "ipv6host AAAA fd00::1234"}) + resp = mod._handle(_build_query("ipv6host.test.local", mod.TYPE_AAAA), "1.2.3.4", 1234, "udp") + assert resp is not None + assert _rcode(resp) == mod.RCODE_NOERROR + _, ancount, _, _ = _counts(resp) + assert ancount >= 1 + + def test_extra_record_invalid_aaaa_skipped(self): + """Invalid AAAA value in DNS_EXTRA_RECORDS must not crash the server.""" + mod, _ = _load_dns({"DNS_EXTRA_RECORDS": "badhost AAAA not-an-ipv6"}) + # If we got a module, the parser didn't crash + resp = mod._handle(_build_query("badhost.test.local", mod.TYPE_AAAA), "1.2.3.4", 1234, "udp") + assert resp is not None + assert _rcode(resp) == mod.RCODE_NXDOMAIN # record was silently dropped + + def test_fake_ipv6_returns_ula(self): + mod, _ = _load_dns() + ip6 = mod._fake_ipv6("test") + parsed = socket.inet_pton(socket.AF_INET6, ip6) + assert parsed[0] == 0xFD # first byte must be fd + + def 
test_fake_ipv6_deterministic(self): + mod, _ = _load_dns() + assert mod._fake_ipv6("x") == mod._fake_ipv6("x") + + def test_fake_ipv6_distinct_labels(self): + mod, _ = _load_dns() + assert mod._fake_ipv6("zone") != mod._fake_ipv6("ns2") + # ── Fingerprint probes ──────────────────────────────────────────────────────── class TestFingerprintProbe: @@ -269,6 +340,121 @@ class TestTunnelingHeuristic: mod._handle(query, "9.9.9.9", 1234, "udp") assert not _events_of(events, "query") +# ── Flood detection ─────────────────────────────────────────────────────────── + +class TestFloodDetection: + def test_flood_threshold_emits_flood_suspect(self): + mod, events = _load_dns() + src = "7.7.7.7" + # Send _FLOOD_THRESHOLD queries (default 50) in one shot + for i in range(mod._FLOOD_THRESHOLD): + mod._handle(_build_query(f"q{i}.test.local", mod.TYPE_A), src, 1234, "udp") + assert _events_of(events, "flood_suspect") + + def test_flood_suspect_fires_only_once_within_cooldown(self): + mod, events = _load_dns() + src = "8.8.8.8" + # Send well above threshold — should still be one event due to cooldown + for i in range(mod._FLOOD_THRESHOLD * 2): + mod._handle(_build_query(f"q{i}.test.local", mod.TYPE_A), src, 1234, "udp") + floods = _events_of(events, "flood_suspect") + assert len(floods) == 1 + + def test_flood_does_not_suppress_query_events(self): + """flood_suspect is additive — baseline query events still fire.""" + mod, events = _load_dns() + src = "9.9.9.8" + for i in range(mod._FLOOD_THRESHOLD): + mod._handle(_build_query(f"r{i}.test.local", mod.TYPE_A), src, 1234, "udp") + # Queries from a flooding src still produce query events + assert _events_of(events, "query") + + def test_flood_includes_qps_and_window(self): + mod, events = _load_dns() + src = "6.6.6.6" + for i in range(mod._FLOOD_THRESHOLD): + mod._handle(_build_query(f"q{i}.test.local", mod.TYPE_A), src, 1234, "udp") + floods = _events_of(events, "flood_suspect") + assert floods + assert "qps" in floods[0] + assert 
"window_sec" in floods[0] + + def test_tracking_evicted_on_lru_overflow(self): + mod, events = _load_dns() + # Fill qps_window beyond _MAX_TRACKED_SRCS to trigger eviction + # We need _EVICT_EVENT_EVERY evictions to fire tracking_evicted + evict_target = mod._EVICT_EVENT_EVERY + capacity = mod._MAX_TRACKED_SRCS + for i in range(capacity + evict_target): + src = f"10.{i >> 16 & 0xFF}.{i >> 8 & 0xFF}.{i & 0xFF}" + mod._handle(_build_query("test.local", mod.TYPE_A), src, 1234, "udp") + assert _events_of(events, "tracking_evicted") + +# ── Recon burst aggregation ─────────────────────────────────────────────────── + +class TestReconBurst: + def test_fingerprint_then_axfr_triggers_recon_burst(self): + mod, events = _load_dns() + src = "5.5.5.1" + # fingerprint_probe + mod._handle( + _build_query("version.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH), + src, 1234, "udp", + ) + # zone_transfer + mod._handle(_build_query("test.local", mod.TYPE_AXFR), src, 1234, "tcp") + bursts = _events_of(events, "recon_burst") + assert bursts + assert bursts[0]["distinct_types"] == 2 + + def test_recon_burst_fires_only_once_within_cooldown(self): + mod, events = _load_dns() + src = "5.5.5.2" + for _ in range(3): + mod._handle( + _build_query("version.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH), + src, 1234, "udp", + ) + mod._handle(_build_query("test.local", mod.TYPE_AXFR), src, 1234, "tcp") + bursts = _events_of(events, "recon_burst") + assert len(bursts) == 1 + + def test_recon_burst_different_srcs_no_cross_trigger(self): + mod, events = _load_dns() + # src A does fingerprint, src B does zone_transfer — no burst for either + mod._handle( + _build_query("version.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH), + "5.5.5.3", 1234, "udp", + ) + mod._handle(_build_query("test.local", mod.TYPE_AXFR), "5.5.5.4", 1234, "tcp") + assert not _events_of(events, "recon_burst") + + def test_recon_burst_does_not_suppress_source_events(self): + mod, events = _load_dns() + src = "5.5.5.5" + mod._handle( + 
_build_query("version.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH), + src, 1234, "udp", + ) + mod._handle(_build_query("test.local", mod.TYPE_AXFR), src, 1234, "tcp") + # Source events must still fire + assert _events_of(events, "fingerprint_probe") + assert _events_of(events, "zone_transfer") + # And the burst on top + assert _events_of(events, "recon_burst") + + def test_amp_plus_fingerprint_triggers_recon_burst(self): + mod, events = _load_dns() + src = "5.5.5.6" + mod._handle( + _build_query("version.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH), + src, 1234, "udp", + ) + mod._handle(_build_query("test.local", mod.TYPE_ANY), src, 1234, "udp") + bursts = _events_of(events, "recon_burst") + assert bursts + assert bursts[0]["distinct_types"] == 2 + # ── Zone mode: open ─────────────────────────────────────────────────────────── class TestZoneModeOpen: