"""Tests for decnet/templates/dns/server.py and decnet/services/dns.py.""" import collections import importlib.util import struct import sys from types import ModuleType from unittest.mock import MagicMock, patch import pytest _SERVER_PATH = "decnet/templates/dns/server.py" # ── Test helpers ────────────────────────────────────────────────────────────── def _make_fake_syslog_bridge() -> ModuleType: mod = ModuleType("syslog_bridge") events: list[tuple[str, dict]] = [] def syslog_line(service, hostname, event_type, severity=6, **fields): events.append((event_type, fields)) return f"LOG {event_type}" mod.syslog_line = syslog_line mod.write_syslog_file = MagicMock() mod.forward_syslog = MagicMock() mod.SEVERITY_INFO = 6 mod.SEVERITY_WARNING = 4 mod.encode_secret = MagicMock(return_value={"secret_printable": "", "secret_b64": ""}) mod._events = events return mod def _make_fake_instance_seed() -> ModuleType: import random as _random mod = ModuleType("instance_seed") mod.rng = _random.Random(42) mod.pick = lambda choices: list(choices)[0] mod.instance_uuid = lambda ns="": f"aaaabbbb-cccc-dddd-eeee-{ns[:12].ljust(12, '0')}" mod.instance_hex = lambda nbytes, ns="": ("deadbeef" * 4)[:nbytes * 2] mod.hostname = lambda: "testhost" mod.jitter = MagicMock() return mod def _load_dns(extra_env: dict | None = None): """Load server.py in isolation with mocked syslog_bridge and instance_seed.""" env = { "NODE_NAME": "testhost", "DNS_ZONE_MODE": "auth", "DNS_DOMAIN": "test.local", "DNS_BIND_VERSION": "9.11.4-TEST", "DNS_NSID": "testnsid", "DNS_EXTRA_RECORDS": "", **(extra_env or {}), } for key in list(sys.modules): if key in ("dns_server", "syslog_bridge", "instance_seed"): del sys.modules[key] bridge = _make_fake_syslog_bridge() seed = _make_fake_instance_seed() sys.modules["syslog_bridge"] = bridge sys.modules["instance_seed"] = seed spec = importlib.util.spec_from_file_location("dns_server", _SERVER_PATH) mod = importlib.util.module_from_spec(spec) # type: ignore[arg-type] with patch.dict("os.environ", env, clear=False): spec.loader.exec_module(mod) # type: ignore[union-attr] # Reset tunneling state between tests mod._txt_times.clear() return mod, bridge._events def _build_query( qname: str, qtype: int, qclass: int = 1, qid: int = 0x1234, rd: bool = True, ) -> bytes: """Minimal DNS query wire packet.""" flags = 0x0100 if rd else 0x0000 header = struct.pack(">HHHHHH", qid, flags, 1, 0, 0, 0) wire = b"" for label in qname.rstrip(".").split("."): enc = label.encode("ascii") wire += bytes([len(enc)]) + enc wire += b"\x00" return header + wire + struct.pack(">HH", qtype, qclass) def _rcode(data: bytes) -> int: return struct.unpack_from(">H", data, 2)[0] & 0x0F def _counts(data: bytes) -> tuple[int, int, int, int]: _, _, qd, an, ns, ar = struct.unpack_from(">HHHHHH", data, 0) return qd, an, ns, ar def _events_of(events: list, kind: str) -> list[dict]: return [fields for etype, fields in events if etype == kind] # ── Auth zone ───────────────────────────────────────────────────────────────── class TestAuthZone: def test_a_record_apex(self): mod, events = _load_dns() resp = mod._handle(_build_query("test.local", mod.TYPE_A), "1.2.3.4", 1234, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_NOERROR _, ancount, _, _ = _counts(resp) assert ancount >= 1 assert _events_of(events, "query") def test_a_record_www(self): mod, events = _load_dns() resp = mod._handle(_build_query("www.test.local", mod.TYPE_A), "1.2.3.4", 1234, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_NOERROR _, ancount, _, _ = _counts(resp) assert ancount >= 1 def test_nxdomain_unknown_name(self): mod, _ = _load_dns() resp = mod._handle(_build_query("nobody.test.local", mod.TYPE_A), "1.2.3.4", 1234, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_NXDOMAIN def test_out_of_zone_refused_in_auth_mode(self): mod, _ = _load_dns({"DNS_ZONE_MODE": "auth"}) resp = mod._handle(_build_query("google.com", mod.TYPE_A), "1.2.3.4", 1234, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_REFUSED def test_soa_record(self): mod, events = _load_dns() resp = mod._handle(_build_query("test.local", mod.TYPE_SOA), "1.2.3.4", 1234, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_NOERROR _, ancount, _, _ = _counts(resp) assert ancount >= 1 def test_mx_record(self): mod, events = _load_dns() resp = mod._handle(_build_query("test.local", mod.TYPE_MX), "1.2.3.4", 1234, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_NOERROR def test_extra_records_parsed(self): mod, events = _load_dns({"DNS_EXTRA_RECORDS": "extra A 192.168.0.50"}) resp = mod._handle(_build_query("extra.test.local", mod.TYPE_A), "1.2.3.4", 1234, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_NOERROR # ── Fingerprint probes ──────────────────────────────────────────────────────── class TestFingerprintProbe: def test_version_bind_returns_configured_banner(self): mod, events = _load_dns() query = _build_query("version.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH) resp = mod._handle(query, "10.0.0.1", 12345, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_NOERROR _, ancount, _, _ = _counts(resp) assert ancount == 1 probes = _events_of(events, "fingerprint_probe") assert probes assert probes[0]["probe"] == "version.bind" assert probes[0]["response"] == "9.11.4-TEST" def test_hostname_bind_emits_fingerprint_probe(self): mod, events = _load_dns() query = _build_query("hostname.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH) resp = mod._handle(query, "10.0.0.1", 12345, "udp") assert resp is not None assert _events_of(events, "fingerprint_probe") def test_id_server_emits_fingerprint_probe(self): mod, events = _load_dns() query = _build_query("id.server", mod.TYPE_TXT, qclass=mod.CLASS_CH) resp = mod._handle(query, "10.0.0.1", 12345, "udp") assert resp is not None assert _events_of(events, "fingerprint_probe") def test_unknown_chaos_is_refused_still_logged(self): mod, events = _load_dns() query = _build_query("something.chaos", mod.TYPE_TXT, qclass=mod.CLASS_CH) resp = mod._handle(query, "10.0.0.1", 12345, "udp") assert resp is not None assert _rcode(resp) == mod.RCODE_REFUSED assert _events_of(events, "fingerprint_probe") def test_no_query_event_for_fingerprint(self): mod, events = _load_dns() query = _build_query("version.bind", mod.TYPE_TXT, qclass=mod.CLASS_CH) mod._handle(query, "10.0.0.1", 12345, "udp") assert not _events_of(events, "query") # ── Zone transfer ───────────────────────────────────────────────────────────── class TestZoneTransfer: def test_axfr_refused_and_logged(self): mod, events = _load_dns() query = _build_query("test.local", mod.TYPE_AXFR) resp = mod._handle(query, "5.5.5.5", 9999, "tcp") assert resp is not None assert _rcode(resp) == mod.RCODE_REFUSED xfers = _events_of(events, "zone_transfer") assert xfers assert xfers[0]["qtype"] == "AXFR" assert xfers[0]["transport"] == "tcp" def test_ixfr_refused_and_logged(self): mod, events = _load_dns() query = _build_query("test.local", mod.TYPE_IXFR) resp = mod._handle(query, "5.5.5.5", 9999, "tcp") assert resp is not None assert _rcode(resp) == mod.RCODE_REFUSED xfers = _events_of(events, "zone_transfer") assert xfers assert xfers[0]["qtype"] == "IXFR" # ── Amp probes ──────────────────────────────────────────────────────────────── class TestAmpProbe: def test_qtype_any_emits_amp_probe(self): mod, events = _load_dns() query = _build_query("test.local", mod.TYPE_ANY) resp = mod._handle(query, "2.2.2.2", 5353, "udp") assert resp is not None assert _events_of(events, "amp_probe") def test_amp_probe_suppresses_plain_query_event(self): mod, events = _load_dns() query = _build_query("test.local", mod.TYPE_ANY) mod._handle(query, "2.2.2.2", 5353, "udp") assert not _events_of(events, "query") # ── Tunneling heuristic ─────────────────────────────────────────────────────── class TestTunnelingHeuristic: def test_long_high_entropy_label(self): mod, events = _load_dns() # 40-char high-entropy label (mix of alpha + digits) label = "abcdefghijklmnopqrstuvwxyz0123456789abcd" assert len(label) >= mod._LABEL_LEN_THRESHOLD query = _build_query(f"{label}.test.local", mod.TYPE_A) resp = mod._handle(query, "9.9.9.9", 1234, "udp") assert resp is not None assert _events_of(events, "tunneling_suspect") def test_rapid_txt_burst_triggers_tunneling(self): mod, events = _load_dns() src = "3.3.3.3" # 5 TXT queries in rapid succession triggers the burst heuristic for i in range(5): query = _build_query(f"chunk{i}.test.local", mod.TYPE_TXT) mod._handle(query, src, 1234, "udp") assert _events_of(events, "tunneling_suspect") def test_tunneling_suppresses_plain_query_event(self): mod, events = _load_dns() label = "abcdefghijklmnopqrstuvwxyz0123456789abcd" query = _build_query(f"{label}.test.local", mod.TYPE_A) mod._handle(query, "9.9.9.9", 1234, "udp") assert not _events_of(events, "query") # ── Zone mode: open ─────────────────────────────────────────────────────────── class TestZoneModeOpen: def test_open_mode_resolves_any_name(self): mod, _ = _load_dns({"DNS_ZONE_MODE": "open"}) for qname in ("evil.example.com", "c2.attacker.net", "random.io"): query = _build_query(qname, mod.TYPE_A) resp = mod._handle(query, "4.4.4.4", 1234, "udp") assert resp is not None, f"no response for {qname}" assert _rcode(resp) == mod.RCODE_NOERROR _, ancount, _, _ = _counts(resp) assert ancount >= 1 def test_open_mode_returns_loopback_sinkhole(self): mod, _ = _load_dns({"DNS_ZONE_MODE": "open"}) # The sinkhole A record must be in 127.0.0.0/8 query = _build_query("anything.com", mod.TYPE_A) resp = mod._handle(query, "4.4.4.4", 1234, "udp") assert resp is not None # Find the A RDATA — walk past header(12) + question + answer name # Just verify the response contains 127 somewhere in a 4-byte window assert b"\x7f" in resp # 0x7f = 127 # ── Zone mode: recursive ────────────────────────────────────────────────────── class TestZoneModeRecursive: def test_recursive_mode_sets_ra_flag(self): mod, _ = _load_dns({"DNS_ZONE_MODE": "recursive"}) query = _build_query("out-of-zone.example.com", mod.TYPE_A) resp = mod._handle(query, "1.1.1.1", 1234, "udp") assert resp is not None flags = struct.unpack_from(">H", resp, 2)[0] ra = bool(flags & 0x0080) assert ra # ── Service registration ────────────────────────────────────────────────────── class TestServiceRegistration: def test_dns_registered_by_name(self): from decnet.services.registry import get_service svc = get_service("dns") assert svc is not None assert svc.name == "dns" def test_dns_port_53(self): from decnet.services.registry import get_service svc = get_service("dns") assert 53 in svc.ports def test_dns_udp_ports(self): from decnet.services.registry import get_service svc = get_service("dns") assert 53 in svc.udp_ports() def test_compose_fragment_structure(self): from decnet.services.registry import get_service svc = get_service("dns") frag = svc.compose_fragment("decky-01", log_target="127.0.0.1:514") assert "build" in frag assert frag["container_name"] == "decky-01-dns" assert frag["environment"]["NODE_NAME"] == "decky-01" assert frag["environment"]["LOG_TARGET"] == "127.0.0.1:514" assert "DNS_ZONE_MODE" in frag["environment"] assert "DNS_BIND_VERSION" in frag["environment"] def test_compose_fragment_no_log_target(self): from decnet.services.registry import get_service svc = get_service("dns") frag = svc.compose_fragment("decky-02") assert "LOG_TARGET" not in frag["environment"] def test_dockerfile_context_points_to_template(self): from decnet.services.registry import get_service svc = get_service("dns") ctx = svc.dockerfile_context() assert ctx is not None assert ctx.name == "dns" assert (ctx / "Dockerfile").exists()