diff --git a/decnet/templates/dns/server.py b/decnet/templates/dns/server.py index d3572a39..d6381943 100644 --- a/decnet/templates/dns/server.py +++ b/decnet/templates/dns/server.py @@ -715,6 +715,16 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes | return _chaos_txt_response(qid, rd, qname, answer_text) return _refused_response(qid, rd, qname, qtype, qclass) + # ── CLASS=ANY fingerprint probe ──────────────────────────────────────── + if qclass == CLASS_ANY: + _log( + "fingerprint_probe", severity=4, + src=src_ip, src_port=src_port, transport=transport, + probe="qclass_any", qname=qname.rstrip("."), qtype=qtype_name, + ) + _note_recon_event(src_ip, "fingerprint_probe") + return _refused_response(qid, rd, qname, qtype, qclass) + # ── Classify amp / tunneling ─────────────────────────────────────────── is_amp = qtype == TYPE_ANY or (edns_size is not None and edns_size > 1232) is_tunnel = _is_tunneling(qname, qtype, src_ip) diff --git a/tests/service_testing/test_dns.py b/tests/service_testing/test_dns.py index 835d8308..2ee67045 100644 --- a/tests/service_testing/test_dns.py +++ b/tests/service_testing/test_dns.py @@ -476,6 +476,37 @@ class TestReconBurst: assert bursts assert bursts[0]["distinct_types"] == 2 +# ── CLASS=ANY fingerprint probe ─────────────────────────────────────────────── + +class TestClassAnyProbe: + def test_class_any_emits_fingerprint_probe(self): + mod, events = _load_dns() + pkt = _build_query("example.test.local", mod.TYPE_A, qclass=mod.CLASS_ANY) + resp = mod._handle(pkt, "9.9.9.9", 53, "udp") + assert resp is not None + assert _rcode(resp) == mod.RCODE_REFUSED + probes = _events_of(events, "fingerprint_probe") + assert len(probes) == 1 + assert probes[0]["probe"] == "qclass_any" + assert probes[0]["qname"] == "example.test.local" + + def test_class_any_counts_toward_recon_burst(self): + mod, events = _load_dns() + pkt = _build_query("x.test.local", mod.TYPE_A, qclass=mod.CLASS_ANY) + for _ in range(3): + mod._handle(pkt, "6.6.6.6", 53, "udp") + # Should accumulate; also trigger a second distinct probe type to fire burst + zxfr = _build_query("test.local", mod.TYPE_AXFR) + mod._handle(zxfr, "6.6.6.6", 53, "tcp") + assert len(_events_of(events, "recon_burst")) >= 1 + + def test_class_in_is_not_affected(self): + """Regular CLASS_IN queries must NOT trigger qclass_any.""" + mod, events = _load_dns() + pkt = _build_query("test.local", mod.TYPE_A, qclass=mod.CLASS_IN) + mod._handle(pkt, "1.1.1.1", 53, "udp") + assert not _events_of(events, "fingerprint_probe") + # ── Zone mode: open ─────────────────────────────────────────────────────────── class TestZoneModeOpen: