feat(dns): full EDNS sub-option parsing and NSID request detection

_parse_edns_size only extracted the requestor UDP size; every other field in
the OPT record (DO bit, EDNS version, extended RCODE, all sub-options) was
invisible.  Replaced with _parse_opt_record returning a full dict:
  udp_size, ext_rcode, version, do_bit, z, options[(code, len, data)]

NSID request (option code 3) is now detected as fingerprint_probe with
probe=edns_nsid and contributes to recon_burst.  DO bit, COOKIE (10), and
other options are not escalated; udp_size continues to drive amp_probe.
This commit is contained in:
2026-05-21 21:20:57 -04:00
parent 4dadeb9aba
commit a6b5b1a7f8
2 changed files with 156 additions and 5 deletions

View File

@@ -111,6 +111,37 @@ def _counts(data: bytes) -> tuple[int, int, int, int]:
def _events_of(events: list, kind: str) -> list[dict]:
return [fields for etype, fields in events if etype == kind]
def _build_opt_rr(udp_size: int = 4096, options: list[tuple[int, bytes]] = []) -> bytes:
"""Build an OPT additional record (owner=root, TYPE=41)."""
rdata = b""
for code, opt_data in options:
rdata += struct.pack(">HH", code, len(opt_data)) + opt_data
# Root label (1 byte) + TYPE(2) + CLASS=udp_size(2) + TTL(4) + RDLEN(2) + RDATA
return b"\x00" + struct.pack(">HHIH", 41, udp_size, 0, len(rdata)) + rdata
def _build_query_with_opt(
qname: str,
qtype: int,
qclass: int = 1,
qid: int = 0x1234,
rd: bool = True,
udp_size: int = 4096,
opt_options: list[tuple[int, bytes]] | None = None,
) -> bytes:
"""DNS query with an OPT additional record, optionally carrying sub-options."""
flags = 0x0100 if rd else 0x0000
wire = b""
for label in qname.rstrip(".").split("."):
enc = label.encode("ascii")
wire += bytes([len(enc)]) + enc
wire += b"\x00"
question = wire + struct.pack(">HH", qtype, qclass)
opt = _build_opt_rr(udp_size, opt_options or [])
header = struct.pack(">HHHHHH", qid, flags, 1, 0, 0, 1) # arcount=1
return header + question + opt
# ── Auth zone ─────────────────────────────────────────────────────────────────
class TestAuthZone:
@@ -934,3 +965,77 @@ class TestParseHygiene:
pkt = struct.pack(">HHHHHH", 0x0001, 0x0100, 1, 0, 0, 0)
mod._handle(pkt, "3.3.3.3", 1053, "udp")
assert len(_events_of(events, "malformed_packet")) == 0
# ── EDNS sub-option parsing ───────────────────────────────────────────────────
class TestEDNSOptions:
def test_nsid_option_emits_fingerprint_probe(self):
mod, events = _load_dns()
# NSID option code is 3; client sends empty data to request NSID
pkt = _build_query_with_opt(
"test.local", mod.TYPE_A, opt_options=[(3, b"")]
)
resp = mod._handle(pkt, "10.0.0.2", 53, "udp")
assert resp is not None
probes = _events_of(events, "fingerprint_probe")
assert len(probes) == 1
assert probes[0]["probe"] == "edns_nsid"
assert probes[0]["qname"] == "test.local"
def test_nsid_option_still_answers_query(self):
"""NSID probe still gets a response — we answer normally."""
mod, events = _load_dns()
pkt = _build_query_with_opt(
"test.local", mod.TYPE_A, opt_options=[(3, b"")]
)
resp = mod._handle(pkt, "10.0.0.2", 53, "udp")
assert resp is not None
assert _rcode(resp) in (mod.RCODE_NOERROR, mod.RCODE_REFUSED)
def test_cookie_option_does_not_emit_probe(self):
"""COOKIE (code=10) is not a fingerprint signal — no probe event."""
mod, events = _load_dns()
# 8-byte client cookie
pkt = _build_query_with_opt(
"test.local", mod.TYPE_A, opt_options=[(10, b"\x01\x02\x03\x04\x05\x06\x07\x08")]
)
mod._handle(pkt, "11.0.0.1", 53, "udp")
assert not _events_of(events, "fingerprint_probe")
def test_do_bit_alone_does_not_emit_probe(self):
"""DO bit set in EDNS is normal DNSSEC behaviour — not a probe signal."""
mod, events = _load_dns()
# TTL with DO=0x8000 in high half
wire = b""
for label in "test.local".split("."):
enc = label.encode("ascii")
wire += bytes([len(enc)]) + enc
wire += b"\x00"
question = wire + struct.pack(">HH", mod.TYPE_A, 1)
# OPT with DO bit set in TTL
opt = b"\x00" + struct.pack(">HHIH", 41, 4096, 0x00008000, 0)
header = struct.pack(">HHHHHH", 0x1234, 0x0100, 1, 0, 0, 1)
pkt = header + question + opt
mod._handle(pkt, "12.0.0.1", 53, "udp")
assert not _events_of(events, "fingerprint_probe")
def test_edns_size_still_drives_amp_probe(self):
"""udp_size from OPT must still feed the amp_probe classifier."""
mod, events = _load_dns()
pkt = _build_query_with_opt("test.local", mod.TYPE_A, udp_size=4096)
mod._handle(pkt, "13.0.0.1", 53, "udp")
# udp_size=4096 > 1232 → amp_probe
assert len(_events_of(events, "amp_probe")) == 1
def test_parse_opt_record_returns_dict(self):
"""Direct unit test for _parse_opt_record with NSID option."""
mod, _ = _load_dns()
pkt = _build_query_with_opt(
"test.local", mod.TYPE_A, udp_size=512, opt_options=[(3, b"\xde\xad")]
)
qid, flags, qdcount, ancount, nscount, arcount = struct.unpack_from(">HHHHHH", pkt, 0)
result = mod._parse_opt_record(pkt, qdcount, ancount, nscount, arcount)
assert result is not None
assert result["udp_size"] == 512
assert any(code == 3 for code, _l, _d in result["options"])