decnet canary worker hosts both callback surfaces in one process:
- HTTP: a tiny FastAPI app on its own port (default 8088). The only
meaningful route is GET /c/{slug} which looks up the slug, persists
a CanaryTrigger, publishes canary.<id>.triggered, and returns a 1x1
transparent GIF. Unknown slugs return the same response (stealth);
no decnet strings leak in headers/banners; docs/openapi/redoc are
disabled. X-Forwarded-For is honored.
- DNS: an authoritative UDP server for *.<canary_zone> using
asyncio.DatagramProtocol with stdlib-only DNS wire-format parsing
(no dnslib dep). Same lookup -> persist -> publish flow, plus a
sinkhole A record (192.0.2.1) so the attacker's resolver doesn't
loop on NXDOMAIN. Single-label slugs only; multi-label probes
return NXDOMAIN. Pointer loops in malformed queries are caught
(10-hop cap) so an adversarial packet can't wedge the parser.
Tests cover both surfaces without privileged sockets:
- HTTP via Starlette TestClient: known/unknown slug, headers, XFF,
stealth-string assertions.
- DNS via direct DatagramProtocol drive: known slug -> ANSWER,
unknown -> NXDOMAIN, pointer-loop -> ValueError, malformed
packet -> silent drop.
120 lines
4.0 KiB
Python
120 lines
4.0 KiB
Python
"""DNS surface coverage for the canary worker.
|
|
|
|
We don't open a real UDP socket — instead we drive
|
|
:class:`CanaryDNSProtocol` directly with synthesised packets and
|
|
inspect the bytes it returns via a fake transport. Faster than a
|
|
real listener, and avoids needing privileged ports in the test
|
|
runner.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import struct
|
|
from typing import AsyncIterator
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from decnet.canary.dns_server import (
|
|
CanaryDNSProtocol,
|
|
_encode_name,
|
|
parse_query,
|
|
)
|
|
|
|
|
|
def _build_query(qname: str, txid: int = 0xCAFE, qtype: int = 1) -> bytes:
|
|
header = struct.pack("!HHHHHH", txid, 0x0100, 1, 0, 0, 0) # RD bit set
|
|
return header + _encode_name(qname) + struct.pack("!HH", qtype, 1)
|
|
|
|
|
|
class _FakeTransport:
|
|
def __init__(self) -> None:
|
|
self.sent: list[tuple[bytes, tuple]] = []
|
|
|
|
def sendto(self, data: bytes, addr: tuple) -> None:
|
|
self.sent.append((data, addr))
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def proto_and_hits():
|
|
hits: list[tuple[str, str, str]] = []
|
|
|
|
async def hook(slug: str, query, src_ip: str) -> None: # type: ignore[no-untyped-def]
|
|
hits.append((slug, query.qname, src_ip))
|
|
|
|
proto = CanaryDNSProtocol("canary.example.test", hook, answer_ip="192.0.2.1")
|
|
transport = _FakeTransport()
|
|
proto.connection_made(transport)
|
|
yield proto, transport, hits
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_known_slug_returns_answer_and_fires_hook(proto_and_hits) -> None:
|
|
proto, transport, hits = proto_and_hits
|
|
pkt = _build_query("slug42.canary.example.test")
|
|
proto.datagram_received(pkt, ("203.0.113.7", 12345))
|
|
# Allow the create_task hook to settle.
|
|
await asyncio.sleep(0)
|
|
await asyncio.sleep(0)
|
|
assert hits == [("slug42", "slug42.canary.example.test", "203.0.113.7")]
|
|
assert len(transport.sent) == 1
|
|
response = transport.sent[0][0]
|
|
# Header: ANCOUNT == 1, RCODE == 0 in lower 4 bits of flags[1].
|
|
_txid, flags, _qd, an, _ns, _ar = struct.unpack("!HHHHHH", response[:12])
|
|
assert (flags & 0x0F) == 0 # NOERROR
|
|
assert an == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unknown_slug_returns_nxdomain(proto_and_hits) -> None:
|
|
proto, transport, hits = proto_and_hits
|
|
pkt = _build_query("not-our-zone.example.com")
|
|
proto.datagram_received(pkt, ("203.0.113.7", 12345))
|
|
await asyncio.sleep(0)
|
|
assert hits == []
|
|
assert len(transport.sent) == 1
|
|
response = transport.sent[0][0]
|
|
_txid, flags, _qd, an, _ns, _ar = struct.unpack("!HHHHHH", response[:12])
|
|
assert (flags & 0x0F) == 3 # NXDOMAIN
|
|
assert an == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multi_label_subdomain_is_ignored(proto_and_hits) -> None:
|
|
"""Slug must be exactly one label. ``foo.bar.canary.example.test``
|
|
is an attacker probing a sub-resource we don't model — NXDOMAIN."""
|
|
proto, transport, hits = proto_and_hits
|
|
pkt = _build_query("foo.bar.canary.example.test")
|
|
proto.datagram_received(pkt, ("203.0.113.7", 12345))
|
|
await asyncio.sleep(0)
|
|
assert hits == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_malformed_packet_is_dropped_silently(proto_and_hits) -> None:
|
|
proto, transport, hits = proto_and_hits
|
|
proto.datagram_received(b"\x00\x01\x02", ("203.0.113.7", 12345))
|
|
await asyncio.sleep(0)
|
|
assert hits == []
|
|
assert transport.sent == []
|
|
|
|
|
|
def test_parse_query_round_trip() -> None:
|
|
pkt = _build_query("abc.def.canary.example.test", txid=0x1234, qtype=1)
|
|
q = parse_query(pkt)
|
|
assert q.txid == 0x1234
|
|
assert q.qname == "abc.def.canary.example.test"
|
|
assert q.qtype == 1
|
|
assert q.qclass == 1
|
|
|
|
|
|
def test_parse_query_handles_pointer_loop() -> None:
|
|
"""Malicious packet with a pointer loop must raise, not hang."""
|
|
# Header (12) + name with a self-pointer at offset 12.
|
|
header = struct.pack("!HHHHHH", 0, 0x0100, 1, 0, 0, 0)
|
|
name = struct.pack("!H", 0xC00C) # pointer back to offset 12
|
|
qtype_qclass = struct.pack("!HH", 1, 1)
|
|
packet = header + name + qtype_qclass
|
|
with pytest.raises(ValueError, match="pointer loop"):
|
|
parse_query(packet)
|