feat(dns): hoist CHAOS probe map to module level, add authors.bind. entry

The inline probe_map dict inside _handle made tests blind to the probe
catalogue and couldn't be extended without touching the hot path.  It is now
module-level _CHAOS_PROBE_MAP.  authors.bind. joins the three existing entries
so it gets named correctly instead of carrying the raw qname.
This commit is contained in:
2026-05-21 21:15:58 -04:00
parent 629f969eb6
commit 521d77b28f
2 changed files with 34 additions and 6 deletions

View File

@@ -37,6 +37,7 @@ SERVICE_NAME = "dns"
LOG_TARGET = os.environ.get("LOG_TARGET", "")
ZONE_MODE = os.environ.get("DNS_ZONE_MODE", "auth")
BIND_VERSION = os.environ.get("DNS_BIND_VERSION", "9.11.4-P2-RedHat-9.11.4-26.P2.el7_9.10")
_AUTHORS = os.environ.get("DNS_AUTHORS", "BIND9 Developers")
_NSID_RAW = os.environ.get("DNS_NSID", "")
_EXTRA_RAW = os.environ.get("DNS_EXTRA_RECORDS", "")
REAL_RECURSIVE = os.environ.get("DNS_REAL_RECURSIVE", "").lower() in ("1", "true", "yes")
@@ -70,10 +71,21 @@ DOMAIN = _generate_domain()
DOMAIN_BARE = DOMAIN.rstrip(".")
NSID = _NSID_RAW if _NSID_RAW else seed.instance_uuid("nsid")[:16]
# Module-level map so tests can introspect probe names without re-parsing.
# Keys are the normalised qname (with trailing dot) as returned by _decode_name.
_CHAOS_PROBE_MAP: dict[str, str] = {} # populated after BIND_VERSION/NODE_NAME/NSID/_AUTHORS
_SOA_SERIAL = int(seed.instance_hex(4, "soa-serial"), 16) % 99 + 2020010101
NS1 = f"ns1.{DOMAIN_BARE}."
NS2 = f"ns2.{DOMAIN_BARE}."
_CHAOS_PROBE_MAP.update({
"version.bind.": BIND_VERSION,
"hostname.bind.": NODE_NAME,
"id.server.": NSID,
"authors.bind.": _AUTHORS,
})
def _fake_ip(label: str = "") -> str:
h = int(seed.instance_hex(3, f"ip:{label}"), 16)
@@ -692,12 +704,7 @@ def _handle(data: bytes, src_ip: str, src_port: int, transport: str) -> bytes |
# ── CHAOS fingerprinting ───────────────────────────────────────────────
if qclass == CLASS_CH and qtype == TYPE_TXT:
probe_map = {
"version.bind.": BIND_VERSION,
"hostname.bind.": NODE_NAME,
"id.server.": NSID,
}
answer_text = probe_map.get(qname, "")
answer_text = _CHAOS_PROBE_MAP.get(qname, "")
_log(
"fingerprint_probe",
src=src_ip, src_port=src_port, transport=transport,

View File

@@ -271,6 +271,27 @@ class TestFingerprintProbe:
mod._handle(query, "10.0.0.1", 12345, "udp")
assert not _events_of(events, "query")
def test_authors_bind_identified_by_name(self):
mod, events = _load_dns()
query = _build_query("authors.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH)
resp = mod._handle(query, "10.0.0.1", 12345, "udp")
assert resp is not None
assert _rcode(resp) == mod.RCODE_NOERROR
probes = _events_of(events, "fingerprint_probe")
assert probes
assert probes[0]["probe"] == "authors.bind"
assert probes[0]["response"] != ""
def test_authors_bind_in_probe_map(self):
mod, _ = _load_dns()
assert "authors.bind." in mod._CHAOS_PROBE_MAP
def test_chaos_probe_map_introspectable(self):
mod, _ = _load_dns()
assert "version.bind." in mod._CHAOS_PROBE_MAP
assert "hostname.bind." in mod._CHAOS_PROBE_MAP
assert "id.server." in mod._CHAOS_PROBE_MAP
# ── Zone transfer ─────────────────────────────────────────────────────────────
class TestZoneTransfer: