# Source: DECNET/tests/canary/test_worker_dns.py (120 lines, 4.0 KiB, Python)

"""DNS surface coverage for the canary worker.
We don't open a real UDP socket — instead we drive
:class:`CanaryDNSProtocol` directly with synthesised packets and
inspect the bytes it returns via a fake transport. Faster than a
real listener, and avoids needing privileged ports in the test
runner.
"""
from __future__ import annotations
import asyncio
import struct
from typing import AsyncIterator
import pytest
import pytest_asyncio
from decnet.canary.dns_server import (
CanaryDNSProtocol,
_encode_name,
parse_query,
)
def _build_query(qname: str, txid: int = 0xCAFE, qtype: int = 1) -> bytes:
    """Assemble a minimal single-question DNS query packet.

    The 12-byte header carries ``txid``, flags ``0x0100`` (RD bit set),
    a question count of 1 and zero for the three remaining counts. It is
    followed by ``qname`` as encoded by ``_encode_name`` and then the
    question's ``qtype`` with class 1.
    """
    flags = 0x0100  # RD bit set
    sections = (
        struct.pack("!HHHHHH", txid, flags, 1, 0, 0, 0),
        _encode_name(qname),
        struct.pack("!HH", qtype, 1),
    )
    return b"".join(sections)
class _FakeTransport:
def __init__(self) -> None:
self.sent: list[tuple[bytes, tuple]] = []
def sendto(self, data: bytes, addr: tuple) -> None:
self.sent.append((data, addr))
@pytest_asyncio.fixture
async def proto_and_hits():
    """Yield ``(proto, transport, hits)`` wired together for one test.

    ``proto`` is a ``CanaryDNSProtocol`` built with
    ``"canary.example.test"`` and ``answer_ip="192.0.2.1"``; ``transport``
    is the ``_FakeTransport`` passed to its ``connection_made``; ``hits``
    gains one ``(slug, qname, src_ip)`` tuple each time the hook is called.
    """
    recorded: list[tuple[str, str, str]] = []

    async def _record_hit(slug: str, query, src_ip: str) -> None:  # type: ignore[no-untyped-def]
        recorded.append((slug, query.qname, src_ip))

    fake_transport = _FakeTransport()
    protocol = CanaryDNSProtocol(
        "canary.example.test", _record_hit, answer_ip="192.0.2.1"
    )
    protocol.connection_made(fake_transport)
    yield protocol, fake_transport, recorded
@pytest.mark.asyncio
async def test_known_slug_returns_answer_and_fires_hook(proto_and_hits) -> None:
    """A one-label slug query gets a NOERROR answer and is reported to the hook."""
    proto, transport, hits = proto_and_hits
    source = ("203.0.113.7", 12345)
    proto.datagram_received(_build_query("slug42.canary.example.test"), source)

    # Yield to the event loop twice so the create_task hook can settle.
    for _ in range(2):
        await asyncio.sleep(0)

    assert hits == [("slug42", "slug42.canary.example.test", "203.0.113.7")]
    assert len(transport.sent) == 1

    reply, _addr = transport.sent[0]
    header = struct.unpack("!HHHHHH", reply[:12])
    flags, ancount = header[1], header[3]
    # RCODE sits in the low 4 bits of the flags word; 0 means NOERROR.
    assert (flags & 0x0F) == 0
    assert ancount == 1
@pytest.mark.asyncio
async def test_unknown_slug_returns_nxdomain(proto_and_hits) -> None:
    """A query for ``not-our-zone.example.com`` gets NXDOMAIN and no hook call."""
    proto, transport, hits = proto_and_hits
    source = ("203.0.113.7", 12345)
    proto.datagram_received(_build_query("not-our-zone.example.com"), source)
    await asyncio.sleep(0)

    assert not hits
    assert len(transport.sent) == 1

    reply, _addr = transport.sent[0]
    header = struct.unpack("!HHHHHH", reply[:12])
    flags, ancount = header[1], header[3]
    # RCODE 3 in the low 4 bits of the flags word is NXDOMAIN.
    assert (flags & 0x0F) == 3
    assert ancount == 0
@pytest.mark.asyncio
async def test_multi_label_subdomain_is_ignored(proto_and_hits) -> None:
    """Slug must be exactly one label. ``foo.bar.canary.example.test``
    is an attacker probing a sub-resource we don't model — NXDOMAIN."""
    proto, transport, hits = proto_and_hits
    pkt = _build_query("foo.bar.canary.example.test")
    proto.datagram_received(pkt, ("203.0.113.7", 12345))
    await asyncio.sleep(0)
    assert hits == []
    # The docstring promises NXDOMAIN, so pin the reply too instead of only
    # checking that the hook stayed quiet (same checks as the
    # unknown-slug test).
    assert len(transport.sent) == 1
    response = transport.sent[0][0]
    _txid, flags, _qd, an, _ns, _ar = struct.unpack("!HHHHHH", response[:12])
    assert (flags & 0x0F) == 3  # NXDOMAIN
    assert an == 0
@pytest.mark.asyncio
async def test_malformed_packet_is_dropped_silently(proto_and_hits) -> None:
    """A 3-byte datagram produces neither a hook call nor a reply."""
    proto, transport, hits = proto_and_hits
    garbage = b"\x00\x01\x02"
    proto.datagram_received(garbage, ("203.0.113.7", 12345))
    await asyncio.sleep(0)
    assert not hits
    assert not transport.sent
def test_parse_query_round_trip() -> None:
    """``parse_query`` recovers the fields that ``_build_query`` encoded."""
    name = "abc.def.canary.example.test"
    parsed = parse_query(_build_query(name, txid=0x1234, qtype=1))
    observed = (parsed.txid, parsed.qname, parsed.qtype, parsed.qclass)
    assert observed == (0x1234, name, 1, 1)
def test_parse_query_handles_pointer_loop() -> None:
    """A self-referential name pointer must raise ``ValueError``, not hang."""
    # 12-byte header, then a name consisting solely of the pointer 0xC00C,
    # which targets offset 12 (the pointer's own position), then
    # qtype/qclass.
    looping_packet = b"".join(
        (
            struct.pack("!HHHHHH", 0, 0x0100, 1, 0, 0, 0),
            struct.pack("!H", 0xC00C),
            struct.pack("!HH", 1, 1),
        )
    )
    with pytest.raises(ValueError, match="pointer loop"):
        parse_query(looping_packet)