feat(dns): full-subdomain entropy check catches short-label exfil

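_is_tunneling now returns str | None instead of bool: the name of the detection method that fired ("label_entropy", "qname_entropy", or "burst"), or None when nothing matched.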
Two new tunables _QNAME_TOTAL_LEN_THRESHOLD=50 and _QNAME_ENTROPY_THRESHOLD=3.5
catch attackers who split a high-entropy payload across multiple short labels.
tunnel_method field added to tunneling_suspect events for downstream correlation.
This commit is contained in:
2026-05-21 22:06:14 -04:00
parent a6b5b1a7f8
commit 9e3473b370
2 changed files with 72 additions and 12 deletions

View File

@@ -7,7 +7,7 @@ event_type values emitted:
fingerprint_probe — version.bind / hostname.bind / id.server / opcode / flag / qclass probes fingerprint_probe — version.bind / hostname.bind / id.server / opcode / flag / qclass probes
zone_transfer — AXFR or IXFR (always REFUSED) zone_transfer — AXFR or IXFR (always REFUSED)
amp_probe — qtype=ANY or EDNS requestor udp_size > 1232 amp_probe — qtype=ANY or EDNS requestor udp_size > 1232
tunneling_suspect — long high-entropy labels or rapid TXT burst from same src tunneling_suspect — long high-entropy labels, high-entropy subdomain, or rapid TXT burst from same src
flood_suspect — source exceeding QPS threshold within rolling window flood_suspect — source exceeding QPS threshold within rolling window
tracking_evicted — LRU state evicted (signals IP-rotation evasion) tracking_evicted — LRU state evicted (signals IP-rotation evasion)
recon_burst — same source hit ≥2 distinct high-signal event types within 60s recon_burst — same source hit ≥2 distinct high-signal event types within 60s
@@ -345,11 +345,13 @@ def _log(event_type: str, severity: int = 6, **kwargs) -> None:
# ── Tunables ────────────────────────────────────────────────────────────────── # ── Tunables ──────────────────────────────────────────────────────────────────
# Tunneling heuristic # Tunneling heuristic
_SHANNON_THRESHOLD = 4.0 _SHANNON_THRESHOLD = 4.0
_LABEL_LEN_THRESHOLD = 30 _LABEL_LEN_THRESHOLD = 30
_TXT_BURST_WINDOW = 10.0 # seconds _QNAME_TOTAL_LEN_THRESHOLD = 50
_TXT_BURST_COUNT = 5 _QNAME_ENTROPY_THRESHOLD = 3.5
_MAX_TRACKED_SRCS = 1000 _TXT_BURST_WINDOW = 10.0 # seconds
_TXT_BURST_COUNT = 5
_MAX_TRACKED_SRCS = 1000
# Flood detection # Flood detection
_QPS_WINDOW_SEC = 10.0 _QPS_WINDOW_SEC = 10.0
@@ -433,10 +435,23 @@ def _shannon_entropy(s: str) -> float:
return -sum((v / n) * math.log2(v / n) for v in freq.values()) return -sum((v / n) * math.log2(v / n) for v in freq.values())
def _is_tunneling(qname: str, qtype: int, src: str) -> bool: def _is_tunneling(qname: str, qtype: int, src: str) -> str | None:
for label in qname.rstrip(".").split("."): labels_all = qname.rstrip(".").split(".")
# Per-label check: any single label that is long AND high-entropy.
for label in labels_all:
if len(label) >= _LABEL_LEN_THRESHOLD and _shannon_entropy(label) > _SHANNON_THRESHOLD: if len(label) >= _LABEL_LEN_THRESHOLD and _shannon_entropy(label) > _SHANNON_THRESHOLD:
return True return "label_entropy"
# Full-subdomain check: strip zone suffix, concatenate remaining labels, test combined entropy.
# Catches split-label exfil where each label is short but the encoded payload spans many.
zone_label_count = len(DOMAIN_BARE.split("."))
subdomain_labels = labels_all[:-zone_label_count] if len(labels_all) > zone_label_count else []
if subdomain_labels:
subdomain_str = "".join(subdomain_labels)
if (
len(subdomain_str) >= _QNAME_TOTAL_LEN_THRESHOLD
and _shannon_entropy(subdomain_str) >= _QNAME_ENTROPY_THRESHOLD
):
return "qname_entropy"
if qtype == TYPE_TXT: if qtype == TYPE_TXT:
now = time.monotonic() now = time.monotonic()
if src not in _txt_times: if src not in _txt_times:
@@ -447,8 +462,8 @@ def _is_tunneling(qname: str, qtype: int, src: str) -> bool:
while q and now - q[0] > _TXT_BURST_WINDOW: while q and now - q[0] > _TXT_BURST_WINDOW:
q.popleft() q.popleft()
if len(q) >= _TXT_BURST_COUNT: if len(q) >= _TXT_BURST_COUNT:
return True return "burst"
return False return None
# ── Flood detection ─────────────────────────────────────────────────────────── # ── Flood detection ───────────────────────────────────────────────────────────
@@ -822,7 +837,7 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes |
edns_size=edns_size or 0, recursion_desired=rd, edns_size=edns_size or 0, recursion_desired=rd,
) )
if is_tunnel: if is_tunnel:
_log("tunneling_suspect", **base) _log("tunneling_suspect", tunnel_method=is_tunnel, **base)
if is_amp: if is_amp:
_log("amp_probe", **base) _log("amp_probe", **base)
_note_recon_event(src_ip, "amp_probe") _note_recon_event(src_ip, "amp_probe")

View File

@@ -393,6 +393,51 @@ class TestTunnelingHeuristic:
mod._handle(query, "9.9.9.9", 1234, "udp") mod._handle(query, "9.9.9.9", 1234, "udp")
assert not _events_of(events, "query") assert not _events_of(events, "query")
def test_tunnel_method_label_entropy(self):
    """A single long, high-entropy label is tagged tunnel_method == "label_entropy"."""
    mod, events = _load_dns()
    # 40 characters drawn from 36 distinct symbols, all in one label.
    long_label = "abcdefghijklmnopqrstuvwxyz0123456789abcd"
    packet = _build_query(f"{long_label}.test.local", mod.TYPE_A)
    mod._handle(packet, "9.9.9.9", 1234, "udp")
    flagged = _events_of(events, "tunneling_suspect")
    assert flagged
    assert flagged[0]["tunnel_method"] == "label_entropy"
def test_tunnel_method_burst(self):
    """Five TXT queries from one source are tagged tunnel_method == "burst"."""
    mod, events = _load_dns()
    attacker = "3.3.3.4"
    for seq in range(5):
        # Build and submit one query at a time, in order.
        packet = _build_query(f"chunk{seq}.test.local", mod.TYPE_TXT)
        mod._handle(packet, attacker, 1234, "udp")
    flagged = _events_of(events, "tunneling_suspect")
    assert flagged
    assert flagged[0]["tunnel_method"] == "burst"
def test_short_label_high_entropy_qname_triggers_tunneling(self):
    """Split-label exfil: five short labels that only look suspicious when combined.

    Each label is 14 characters, below the per-label length threshold, but the
    concatenated subdomain is 70 characters with entropy above the qname thresholds.
    """
    mod, events = _load_dns()
    # 14 distinct characters per label → entropy = log2(14) ≈ 3.8.
    # Repeating the label five times leaves the character distribution unchanged,
    # so the combined 70-character string also scores ≈ 3.8, which is above
    # _QNAME_ENTROPY_THRESHOLD (3.5).
    label = "a1b2c3d4e5f6g7"
    # Guard: the per-label check must stay silent for this input.
    assert len(label) < mod._LABEL_LEN_THRESHOLD
    packet = _build_query(
        f"{label}.{label}.{label}.{label}.{label}.test.local",
        mod.TYPE_A,
    )
    mod._handle(packet, "4.4.4.4", 1234, "udp")
    flagged = _events_of(events, "tunneling_suspect")
    assert flagged, "expected tunneling_suspect from qname_entropy path"
    assert flagged[0]["tunnel_method"] == "qname_entropy"
def test_short_labels_low_entropy_no_tunneling(self):
    """A long subdomain made of one repeated character must not be flagged."""
    mod, events = _load_dns()
    # Five 10-character labels of "a": 50 subdomain characters in total,
    # but a single distinct symbol means the entropy is 0.
    packet = _build_query(
        "aaaaaaaaaa.aaaaaaaaaa.aaaaaaaaaa.aaaaaaaaaa.aaaaaaaaaa.test.local",
        mod.TYPE_A,
    )
    mod._handle(packet, "4.4.4.5", 1234, "udp")
    flagged = _events_of(events, "tunneling_suspect")
    assert not flagged
def test_qname_entropy_check_ignores_zone_suffix(self):
    """A query for "test.local" itself (no extra labels) yields no tunneling_suspect event."""
    mod, events = _load_dns()
    packet = _build_query("test.local", mod.TYPE_A)
    mod._handle(packet, "4.4.4.6", 1234, "udp")
    flagged = _events_of(events, "tunneling_suspect")
    assert not flagged
# ── Flood detection ─────────────────────────────────────────────────────────── # ── Flood detection ───────────────────────────────────────────────────────────
class TestFloodDetection: class TestFloodDetection: