diff --git a/decnet/templates/dns/server.py b/decnet/templates/dns/server.py index d6381943..22d766c2 100644 --- a/decnet/templates/dns/server.py +++ b/decnet/templates/dns/server.py @@ -149,6 +149,7 @@ _TYPE_NAMES = { TYPE_AXFR: "AXFR", TYPE_OPT: "OPT", TYPE_ANY: "ANY", } _CLASS_NAMES = {CLASS_IN: "IN", CLASS_CH: "CH", CLASS_ANY: "ANY"} +_OPCODE_NAMES = {0: "query", 1: "iquery", 2: "status", 4: "notify", 5: "update"} # ── Wire codec ──────────────────────────────────────────────────────────────── @@ -480,6 +481,13 @@ def _refused_response(qid: int, rd: bool, qname: str, qtype: int, qclass: int) - return _build_header(qid, flags, 1, 0, 0, 0) + q +def _notimp_response(qid: int, opcode: int) -> bytes: + # QR=1, opcode echoed, RCODE=NOTIMP, no question/answer sections. + # Matches real BIND behaviour for unimplemented opcodes. + flags = (1 << 15) | ((opcode & 0x0F) << 11) | RCODE_NOTIMP + return _build_header(qid, flags, 0, 0, 0, 0) + + def _soa_rr(ttl: int = 300) -> bytes: rdata = _rdata_SOA( NS1, f"hostmaster.{DOMAIN_BARE}.", @@ -666,11 +674,29 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | transport=transport, length=len(data)) return None qid, flags_in, qdcount, ancount, nscount, arcount = struct.unpack_from(">HHHHHH", data, 0) + opcode = (flags_in >> 11) & 0x0F + rd = bool(flags_in & 0x0100) + tc = bool(flags_in & 0x0200) + ad = bool(flags_in & 0x0020) + cd = bool(flags_in & 0x0010) + z = bool(flags_in & 0x0040) + + # ── Unsupported opcode ───────────────────────────────────────────────── + # Before qdcount check: BIND returns NOTIMP regardless of question count. + if opcode != 0: + probe = f"opcode_{_OPCODE_NAMES.get(opcode, str(opcode))}" + _log( + "fingerprint_probe", severity=4, + src=src_ip, src_port=src_port, transport=transport, + probe=probe, opcode=opcode, + ) + _note_recon_event(src_ip, "fingerprint_probe") + return _notimp_response(qid, opcode) + if qdcount == 0: _log("empty_question_section", severity=5, src=src_ip, src_port=src_port, transport=transport, qid=qid) return None - rd = bool(flags_in & 0x0100) try: qname, qtype, qclass, _ = _parse_question(data, 12) @@ -688,6 +714,17 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | _log("multi_question", severity=5, src=src_ip, src_port=src_port, transport=transport, qdcount=qdcount, qname=qname.rstrip(".")) + # ── Header flag fingerprinting ───────────────────────────────────────── + # Z bit must-be-zero per RFC; AD+CD without RD is operationally nonsensical. + if z or (ad and cd and not rd): + _log( + "fingerprint_probe", severity=4, + src=src_ip, src_port=src_port, transport=transport, + probe="header_flags", qname=qname.rstrip("."), + opcode=opcode, ad=ad, cd=cd, z=z, tc=tc, + ) + _note_recon_event(src_ip, "fingerprint_probe") + # Flood check runs on every packet (including CHAOS / transfer probes) _check_flood(src_ip, qtype_name) diff --git a/tests/service_testing/test_dns.py b/tests/service_testing/test_dns.py index 2ee67045..b1c234f7 100644 --- a/tests/service_testing/test_dns.py +++ b/tests/service_testing/test_dns.py @@ -86,9 +86,10 @@ def _build_query( qclass: int = 1, qid: int = 0x1234, rd: bool = True, + extra_flags: int = 0, ) -> bytes: """Minimal DNS query wire packet.""" - flags = 0x0100 if rd else 0x0000 + flags = (0x0100 if rd else 0x0000) | extra_flags header = struct.pack(">HHHHHH", qid, flags, 1, 0, 0, 0) wire = b"" for label in qname.rstrip(".").split("."): @@ -507,6 +508,83 @@ class TestClassAnyProbe: mod._handle(pkt, "1.1.1.1", 53, "udp") assert not _events_of(events, "fingerprint_probe") +# ── Header flag fingerprinting ──────────────────────────────────────────────── + +class TestHeaderFlagFingerprint: + def _opcode_pkt(self, mod, qname: str, opcode: int) -> bytes: + # Build a raw 12-byte header with no question section — opcode block + # fires before question parse, so we don't need a valid question. + flags = (opcode & 0x0F) << 11 + return struct.pack(">HHHHHH", 0x1234, flags, 0, 0, 0, 0) + b"\x00" * 4 + + def test_opcode_update_emits_fingerprint_probe_notimp(self): + mod, events = _load_dns() + # UPDATE opcode=5; pad to 12 bytes minimum + flags = (5 << 11) + pkt = struct.pack(">HHHHHH", 0xABCD, flags, 0, 0, 0, 0) + resp = mod._handle(pkt, "7.7.7.7", 53, "udp") + assert resp is not None + # RCODE must be NOTIMP (4) + assert struct.unpack_from(">H", resp, 2)[0] & 0x0F == mod.RCODE_NOTIMP + # opcode in response header echoes the request opcode + assert (struct.unpack_from(">H", resp, 2)[0] >> 11) & 0x0F == 5 + probes = _events_of(events, "fingerprint_probe") + assert len(probes) == 1 + assert probes[0]["probe"] == "opcode_update" + assert probes[0]["opcode"] == 5 + + def test_opcode_iquery_emits_fingerprint_probe(self): + mod, events = _load_dns() + flags = (1 << 11) + pkt = struct.pack(">HHHHHH", 0x0001, flags, 0, 0, 0, 0) + resp = mod._handle(pkt, "8.8.8.8", 53, "udp") + assert resp is not None + assert struct.unpack_from(">H", resp, 2)[0] & 0x0F == mod.RCODE_NOTIMP + probes = _events_of(events, "fingerprint_probe") + assert probes[0]["probe"] == "opcode_iquery" + + def test_opcode_notify_emits_opcode_notify(self): + mod, events = _load_dns() + flags = (4 << 11) + pkt = struct.pack(">HHHHHH", 0x0002, flags, 0, 0, 0, 0) + mod._handle(pkt, "9.9.9.8", 53, "udp") + probes = _events_of(events, "fingerprint_probe") + assert probes[0]["probe"] == "opcode_notify" + + def test_z_bit_emits_header_flags_probe(self): + # Z=0x0040 in the flags word + mod, events = _load_dns() + pkt = _build_query("test.local", mod.TYPE_A, extra_flags=0x0040) + resp = mod._handle(pkt, "2.2.2.2", 53, "udp") + assert resp is not None + probes = _events_of(events, "fingerprint_probe") + assert any(p["probe"] == "header_flags" and p["z"] for p in probes) + + def test_ad_cd_without_rd_emits_header_flags_probe(self): + # AD=0x0020, CD=0x0010, RD=0 (rd=False) + mod, events = _load_dns() + pkt = _build_query("test.local", mod.TYPE_A, rd=False, extra_flags=0x0030) + mod._handle(pkt, "3.3.3.3", 53, "udp") + probes = _events_of(events, "fingerprint_probe") + assert any(p["probe"] == "header_flags" and p["ad"] and p["cd"] for p in probes) + + def test_ad_with_rd_is_not_a_probe(self): + """AD set with RD=1 is a legitimate DNSSEC-aware stub — should not escalate.""" + mod, events = _load_dns() + pkt = _build_query("test.local", mod.TYPE_A, rd=True, extra_flags=0x0020) + mod._handle(pkt, "4.4.4.4", 53, "udp") + assert not any(p["probe"] == "header_flags" for p in _events_of(events, "fingerprint_probe")) + + def test_opcode_fires_before_qclass_any_no_double_count(self): + """A packet with opcode=update AND qclass=ANY must emit exactly one probe (opcode).""" + mod, events = _load_dns() + flags = (5 << 11) + pkt = struct.pack(">HHHHHH", 0xBEEF, flags, 0, 0, 0, 0) + mod._handle(pkt, "5.5.5.5", 53, "udp") + probes = _events_of(events, "fingerprint_probe") + assert len(probes) == 1 + assert probes[0]["probe"] == "opcode_update" + # ── Zone mode: open ─────────────────────────────────────────────────────────── class TestZoneModeOpen: