diff --git a/decnet/templates/dns/server.py b/decnet/templates/dns/server.py index c216b404..378b32d8 100644 --- a/decnet/templates/dns/server.py +++ b/decnet/templates/dns/server.py @@ -7,7 +7,7 @@ event_type values emitted: fingerprint_probe — version.bind / hostname.bind / id.server / opcode / flag / qclass probes zone_transfer — AXFR or IXFR (always REFUSED) amp_probe — qtype=ANY or EDNS requestor udp_size > 1232 - tunneling_suspect — long high-entropy labels or rapid TXT burst from same src + tunneling_suspect — long high-entropy labels, high-entropy subdomain, or rapid burst from same src flood_suspect — source exceeding QPS threshold within rolling window tracking_evicted — LRU state evicted (signals IP-rotation evasion) recon_burst — same source hit ≥2 distinct high-signal event types within 60s @@ -345,11 +345,13 @@ def _log(event_type: str, severity: int = 6, **kwargs) -> None: # ── Tunables ────────────────────────────────────────────────────────────────── # Tunneling heuristic -_SHANNON_THRESHOLD = 4.0 -_LABEL_LEN_THRESHOLD = 30 -_TXT_BURST_WINDOW = 10.0 # seconds -_TXT_BURST_COUNT = 5 -_MAX_TRACKED_SRCS = 1000 +_SHANNON_THRESHOLD = 4.0 +_LABEL_LEN_THRESHOLD = 30 +_QNAME_TOTAL_LEN_THRESHOLD = 50 +_QNAME_ENTROPY_THRESHOLD = 3.5 +_TXT_BURST_WINDOW = 10.0 # seconds +_TXT_BURST_COUNT = 5 +_MAX_TRACKED_SRCS = 1000 # Flood detection _QPS_WINDOW_SEC = 10.0 @@ -433,10 +435,23 @@ def _shannon_entropy(s: str) -> float: return -sum((v / n) * math.log2(v / n) for v in freq.values()) -def _is_tunneling(qname: str, qtype: int, src: str) -> bool: - for label in qname.rstrip(".").split("."): +def _is_tunneling(qname: str, qtype: int, src: str) -> str | None: + labels_all = qname.rstrip(".").split(".") + # Per-label check: any single label that is long AND high-entropy. + for label in labels_all: if len(label) >= _LABEL_LEN_THRESHOLD and _shannon_entropy(label) > _SHANNON_THRESHOLD: - return True + return "label_entropy" + # Full-subdomain check: strip zone suffix, concatenate remaining labels, test combined entropy. + # Catches split-label exfil where each label is short but the encoded payload spans many. + zone_label_count = len(DOMAIN_BARE.split(".")) + subdomain_labels = labels_all[:-zone_label_count] if len(labels_all) > zone_label_count else [] + if subdomain_labels: + subdomain_str = "".join(subdomain_labels) + if ( + len(subdomain_str) >= _QNAME_TOTAL_LEN_THRESHOLD + and _shannon_entropy(subdomain_str) >= _QNAME_ENTROPY_THRESHOLD + ): + return "qname_entropy" if qtype == TYPE_TXT: now = time.monotonic() if src not in _txt_times: @@ -447,8 +462,8 @@ def _is_tunneling(qname: str, qtype: int, src: str) -> bool: while q and now - q[0] > _TXT_BURST_WINDOW: q.popleft() if len(q) >= _TXT_BURST_COUNT: - return True - return False + return "burst" + return None # ── Flood detection ─────────────────────────────────────────────────────────── @@ -822,7 +837,7 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | edns_size=edns_size or 0, recursion_desired=rd, ) if is_tunnel: - _log("tunneling_suspect", **base) + _log("tunneling_suspect", tunnel_method=is_tunnel, **base) if is_amp: _log("amp_probe", **base) _note_recon_event(src_ip, "amp_probe") diff --git a/tests/service_testing/test_dns.py b/tests/service_testing/test_dns.py index daf93513..67ba6f1e 100644 --- a/tests/service_testing/test_dns.py +++ b/tests/service_testing/test_dns.py @@ -393,6 +393,51 @@ class TestTunnelingHeuristic: mod._handle(query, "9.9.9.9", 1234, "udp") assert not _events_of(events, "query") + def test_tunnel_method_label_entropy(self): + mod, events = _load_dns() + label = "abcdefghijklmnopqrstuvwxyz0123456789abcd" + mod._handle(_build_query(f"{label}.test.local", mod.TYPE_A), "9.9.9.9", 1234, "udp") + suspects = _events_of(events, "tunneling_suspect") + assert suspects and suspects[0]["tunnel_method"] == "label_entropy" + + def test_tunnel_method_burst(self): + mod, events = _load_dns() + src = "3.3.3.4" + for i in range(5): + mod._handle(_build_query(f"chunk{i}.test.local", mod.TYPE_TXT), src, 1234, "udp") + suspects = _events_of(events, "tunneling_suspect") + assert suspects and suspects[0]["tunnel_method"] == "burst" + + def test_short_label_high_entropy_qname_triggers_tunneling(self): + """Five 14-char high-entropy labels, each under the per-label threshold, + but combined subdomain length (70) and entropy exceed the qname thresholds.""" + mod, events = _load_dns() + # Each label: 14 chars, 14 distinct chars → entropy ≈ 3.8 per label + # Combined 70 chars → entropy ≈ 3.8 > _QNAME_ENTROPY_THRESHOLD (3.5) + # Individual label len = 14 < _LABEL_LEN_THRESHOLD (30) so per-label check is silent + label = "a1b2c3d4e5f6g7" + assert len(label) < mod._LABEL_LEN_THRESHOLD + qname = f"{label}.{label}.{label}.{label}.{label}.test.local" + query = _build_query(qname, mod.TYPE_A) + mod._handle(query, "4.4.4.4", 1234, "udp") + suspects = _events_of(events, "tunneling_suspect") + assert suspects, "expected tunneling_suspect from qname_entropy path" + assert suspects[0]["tunnel_method"] == "qname_entropy" + + def test_short_labels_low_entropy_no_tunneling(self): + """Short labels that individually and collectively have low entropy must not trigger.""" + mod, events = _load_dns() + # "aaaaaaaaaa" * 5 = 50 chars but entropy is 0 + qname = "aaaaaaaaaa.aaaaaaaaaa.aaaaaaaaaa.aaaaaaaaaa.aaaaaaaaaa.test.local" + mod._handle(_build_query(qname, mod.TYPE_A), "4.4.4.5", 1234, "udp") + assert not _events_of(events, "tunneling_suspect") + + def test_qname_entropy_check_ignores_zone_suffix(self): + """If the qname IS the zone apex (no subdomain labels), no qname_entropy check fires.""" + mod, events = _load_dns() + mod._handle(_build_query("test.local", mod.TYPE_A), "4.4.4.6", 1234, "udp") + assert not _events_of(events, "tunneling_suspect") + # ── Flood detection ─────────────────────────────────────────────────────────── class TestFloodDetection: