diff --git a/decnet/templates/dns/server.py b/decnet/templates/dns/server.py index 22d766c2..c216b404 100644 --- a/decnet/templates/dns/server.py +++ b/decnet/templates/dns/server.py @@ -265,8 +265,16 @@ def _parse_question(data: bytes, offset: int) -> tuple[str, int, int, int]: return qname, qtype, qclass, offset + 4 -def _parse_edns_size(data: bytes, qdcount: int, ancount: int, nscount: int, arcount: int) -> int | None: - """Walk to the additional section; return requestor UDP size if OPT found.""" +def _parse_opt_record( + data: bytes, qdcount: int, ancount: int, nscount: int, arcount: int +) -> dict | None: + """Walk to the additional section; parse the OPT record if present. + + Returns a dict with: + udp_size (int), ext_rcode (int), version (int), do_bit (bool), z (int), + options (list of (code, length, data_bytes) tuples) + or None if no OPT record is found or the packet is malformed. + """ if arcount == 0: return None offset = 12 @@ -289,8 +297,35 @@ def _parse_edns_size(data: bytes, qdcount: int, ancount: int, nscount: int, arco return None rtype = struct.unpack_from(">H", data, offset + 1)[0] if rtype == TYPE_OPT: - udp_size = struct.unpack_from(">H", data, offset + 3)[0] - return udp_size + udp_size = struct.unpack_from(">H", data, offset + 3)[0] + # TTL field encodes: ext_rcode(8) | version(8) | DO+Z(16) + ttl_raw = struct.unpack_from(">I", data, offset + 5)[0] + ext_rcode = (ttl_raw >> 24) & 0xFF + version = (ttl_raw >> 16) & 0xFF + do_bit = bool(ttl_raw & 0x8000) + z_bits = ttl_raw & 0x7FFF + rdlen = struct.unpack_from(">H", data, offset + 9)[0] + rdata_start = offset + 11 + rdata_end = rdata_start + rdlen + if rdata_end > len(data): + rdata_end = len(data) + rdata = data[rdata_start:rdata_end] + options: list[tuple[int, int, bytes]] = [] + pos = 0 + while pos + 4 <= len(rdata): + opt_code = struct.unpack_from(">H", rdata, pos)[0] + opt_len = struct.unpack_from(">H", rdata, pos + 2)[0] + opt_data = rdata[pos + 4 : pos + 4 + opt_len] + options.append((opt_code, opt_len, opt_data)) + pos += 4 + opt_len + return { + "udp_size": udp_size, + "ext_rcode": ext_rcode, + "version": version, + "do_bit": do_bit, + "z": z_bits, + "options": options, + } _, offset = _decode_name(data, offset) if offset + 10 > len(data): return None @@ -705,7 +740,18 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | transport=transport, reason=str(exc)[:64]) return None - edns_size = _parse_edns_size(data, qdcount, ancount, nscount, arcount) + opt = _parse_opt_record(data, qdcount, ancount, nscount, arcount) + edns_size = opt["udp_size"] if opt else None + + # ── EDNS NSID request (option code 3) ───────────────────────────────── + if opt and any(code == 3 for code, _len, _data in opt["options"]): + _log( + "fingerprint_probe", severity=4, + src=src_ip, src_port=src_port, transport=transport, + probe="edns_nsid", qname=qname.rstrip("."), + qtype=_TYPE_NAMES.get(qtype, str(qtype)), + ) + _note_recon_event(src_ip, "fingerprint_probe") qtype_name = _TYPE_NAMES.get(qtype, str(qtype)) qclass_name = _CLASS_NAMES.get(qclass, str(qclass)) diff --git a/tests/service_testing/test_dns.py b/tests/service_testing/test_dns.py index b1c234f7..daf93513 100644 --- a/tests/service_testing/test_dns.py +++ b/tests/service_testing/test_dns.py @@ -111,6 +111,37 @@ def _counts(data: bytes) -> tuple[int, int, int, int]: def _events_of(events: list, kind: str) -> list[dict]: return [fields for etype, fields in events if etype == kind] + +def _build_opt_rr(udp_size: int = 4096, options: list[tuple[int, bytes]] = []) -> bytes: + """Build an OPT additional record (owner=root, TYPE=41).""" + rdata = b"" + for code, opt_data in options: + rdata += struct.pack(">HH", code, len(opt_data)) + opt_data + # Root label (1 byte) + TYPE(2) + CLASS=udp_size(2) + TTL(4) + RDLEN(2) + RDATA + return b"\x00" + struct.pack(">HHIH", 41, udp_size, 0, len(rdata)) + rdata + + +def _build_query_with_opt( + qname: str, + qtype: int, + qclass: int = 1, + qid: int = 0x1234, + rd: bool = True, + udp_size: int = 4096, + opt_options: list[tuple[int, bytes]] | None = None, +) -> bytes: + """DNS query with an OPT additional record, optionally carrying sub-options.""" + flags = 0x0100 if rd else 0x0000 + wire = b"" + for label in qname.rstrip(".").split("."): + enc = label.encode("ascii") + wire += bytes([len(enc)]) + enc + wire += b"\x00" + question = wire + struct.pack(">HH", qtype, qclass) + opt = _build_opt_rr(udp_size, opt_options or []) + header = struct.pack(">HHHHHH", qid, flags, 1, 0, 0, 1) # arcount=1 + return header + question + opt + # ── Auth zone ───────────────────────────────────────────────────────────────── class TestAuthZone: @@ -934,3 +965,77 @@ class TestParseHygiene: pkt = struct.pack(">HHHHHH", 0x0001, 0x0100, 1, 0, 0, 0) mod._handle(pkt, "3.3.3.3", 1053, "udp") assert len(_events_of(events, "malformed_packet")) == 0 + + +# ── EDNS sub-option parsing ─────────────────────────────────────────────────── + +class TestEDNSOptions: + def test_nsid_option_emits_fingerprint_probe(self): + mod, events = _load_dns() + # NSID option code is 3; client sends empty data to request NSID + pkt = _build_query_with_opt( + "test.local", mod.TYPE_A, opt_options=[(3, b"")] + ) + resp = mod._handle(pkt, "10.0.0.2", 53, "udp") + assert resp is not None + probes = _events_of(events, "fingerprint_probe") + assert len(probes) == 1 + assert probes[0]["probe"] == "edns_nsid" + assert probes[0]["qname"] == "test.local" + + def test_nsid_option_still_answers_query(self): + """NSID probe still gets a response — we answer normally.""" + mod, events = _load_dns() + pkt = _build_query_with_opt( + "test.local", mod.TYPE_A, opt_options=[(3, b"")] + ) + resp = mod._handle(pkt, "10.0.0.2", 53, "udp") + assert resp is not None + assert _rcode(resp) in (mod.RCODE_NOERROR, mod.RCODE_REFUSED) + + def test_cookie_option_does_not_emit_probe(self): + """COOKIE (code=10) is not a fingerprint signal — no probe event.""" + mod, events = _load_dns() + # 8-byte client cookie + pkt = _build_query_with_opt( + "test.local", mod.TYPE_A, opt_options=[(10, b"\x01\x02\x03\x04\x05\x06\x07\x08")] + ) + mod._handle(pkt, "11.0.0.1", 53, "udp") + assert not _events_of(events, "fingerprint_probe") + + def test_do_bit_alone_does_not_emit_probe(self): + """DO bit set in EDNS is normal DNSSEC behaviour — not a probe signal.""" + mod, events = _load_dns() + # TTL with DO=0x8000 in high half + wire = b"" + for label in "test.local".split("."): + enc = label.encode("ascii") + wire += bytes([len(enc)]) + enc + wire += b"\x00" + question = wire + struct.pack(">HH", mod.TYPE_A, 1) + # OPT with DO bit set in TTL + opt = b"\x00" + struct.pack(">HHIH", 41, 4096, 0x00008000, 0) + header = struct.pack(">HHHHHH", 0x1234, 0x0100, 1, 0, 0, 1) + pkt = header + question + opt + mod._handle(pkt, "12.0.0.1", 53, "udp") + assert not _events_of(events, "fingerprint_probe") + + def test_edns_size_still_drives_amp_probe(self): + """udp_size from OPT must still feed the amp_probe classifier.""" + mod, events = _load_dns() + pkt = _build_query_with_opt("test.local", mod.TYPE_A, udp_size=4096) + mod._handle(pkt, "13.0.0.1", 53, "udp") + # udp_size=4096 > 1232 → amp_probe + assert len(_events_of(events, "amp_probe")) == 1 + + def test_parse_opt_record_returns_dict(self): + """Direct unit test for _parse_opt_record with NSID option.""" + mod, _ = _load_dns() + pkt = _build_query_with_opt( + "test.local", mod.TYPE_A, udp_size=512, opt_options=[(3, b"\xde\xad")] + ) + qid, flags, qdcount, ancount, nscount, arcount = struct.unpack_from(">HHHHHH", pkt, 0) + result = mod._parse_opt_record(pkt, qdcount, ancount, nscount, arcount) + assert result is not None + assert result["udp_size"] == 512 + assert any(code == 3 for code, _l, _d in result["options"])