diff --git a/decnet/canary/dns_server.py b/decnet/canary/dns_server.py new file mode 100644 index 00000000..65cc6f60 --- /dev/null +++ b/decnet/canary/dns_server.py @@ -0,0 +1,207 @@ +"""Minimal authoritative DNS server for canary tokens (stdlib only). + +We don't need a full resolver — only enough to: + +1. Decode an inbound query's qname. +2. If the qname matches ``.``, log the callback, + publish ``canary..triggered`` on the bus, and return a + plausible A record (any RFC-5737 reserved address would do; we + use 192.0.2.1) so the attacker's resolver doesn't loop on + NXDOMAIN. +3. For unknown qnames return NXDOMAIN. + +DNS-over-UDP wire format is well-trodden: 12-byte header + name +labels + qtype + qclass. We implement just the bits we need. + +This module deliberately avoids the ``dnslib`` PyPI package so the +canary worker has no extra dependency surface. If we ever need +EDNS0, DNSSEC, or other niceties we'll swap to dnslib then. +""" +from __future__ import annotations + +import asyncio +import struct +from dataclasses import dataclass +from typing import Awaitable, Callable, Optional, Tuple + + +@dataclass(frozen=True) +class DNSQuery: + """Decoded query — only the bits the canary worker cares about.""" + + txid: int + qname: str # lowercase, no trailing dot + qtype: int + qclass: int + flags: int + + +def _decode_name(buf: bytes, offset: int) -> Tuple[str, int]: + """Return ``(qname_lowercase_no_dot, bytes_consumed)``. + + Supports compressed pointers (RFC 1035 §4.1.4). Doesn't recurse — + we walk the pointer chain iteratively with a hop cap to avoid + pointer-loop DoS. + """ + labels: list[str] = [] + pos = offset + consumed = 0 + jumped = False + hops = 0 + while True: + if pos >= len(buf): + raise ValueError("truncated DNS name") + length = buf[pos] + if length == 0: + pos += 1 + if not jumped: + consumed = pos - offset + break + if (length & 0xC0) == 0xC0: + # Compression pointer. + if pos + 1 >= len(buf): + raise ValueError("truncated DNS pointer") + ptr = ((length & 0x3F) << 8) | buf[pos + 1] + if not jumped: + consumed = (pos + 2) - offset + pos = ptr + jumped = True + hops += 1 + if hops > 10: + raise ValueError("DNS pointer loop") + continue + pos += 1 + if pos + length > len(buf): + raise ValueError("truncated DNS label") + labels.append(buf[pos:pos + length].decode("ascii", "replace")) + pos += length + return ".".join(labels).lower(), consumed + + +def parse_query(packet: bytes) -> DNSQuery: + """Parse the (single) question of a DNS query packet.""" + if len(packet) < 12: + raise ValueError("DNS packet too short") + txid, flags, qdcount, _ancount, _nscount, _arcount = struct.unpack( + "!HHHHHH", packet[:12] + ) + if qdcount != 1: + raise ValueError(f"expected 1 question, got {qdcount}") + qname, consumed = _decode_name(packet, 12) + pos = 12 + consumed + if pos + 4 > len(packet): + raise ValueError("truncated DNS qtype/qclass") + qtype, qclass = struct.unpack("!HH", packet[pos:pos + 4]) + return DNSQuery( + txid=txid, qname=qname, qtype=qtype, qclass=qclass, flags=flags, + ) + + +def _encode_name(name: str) -> bytes: + out = bytearray() + for label in name.split("."): + if not label: + continue + b = label.encode("ascii", "replace") + out.append(len(b)) + out.extend(b) + out.append(0) + return bytes(out) + + +def _build_response( + query: DNSQuery, + *, + rcode: int = 0, + answer_ip: Optional[str] = None, +) -> bytes: + """Encode a DNS response packet. + + *rcode* 0 = NOERROR, 3 = NXDOMAIN. When *answer_ip* is supplied + and the query was for an A record we include exactly one answer + (TTL 60, class IN). + """ + qd_count = 1 + an_count = 1 if (answer_ip and query.qtype == 1 and rcode == 0) else 0 + flags = 0x8400 | rcode # response + authoritative + RA bit clear + rcode + header = struct.pack( + "!HHHHHH", query.txid, flags, qd_count, an_count, 0, 0, + ) + qname_bytes = _encode_name(query.qname) + question = qname_bytes + struct.pack("!HH", query.qtype, query.qclass) + + answer = b"" + if an_count: + # Use a name pointer back to the question (offset 12). + ptr = struct.pack("!H", 0xC000 | 12) + rdata = bytes(int(o) for o in answer_ip.split(".")) + answer = ptr + struct.pack("!HHIH", 1, 1, 60, 4) + rdata + + return header + question + answer + + +# Hook signature: receives the matched slug + the query; returns +# nothing. The worker uses it to persist a CanaryTrigger row and +# publish the bus event. +TriggerHook = Callable[[str, DNSQuery, str], Awaitable[None]] + + +class CanaryDNSProtocol(asyncio.DatagramProtocol): + """asyncio UDP server endpoint for canary DNS callbacks. + + Constructor takes the canary zone (``"canary.example.test"``) and + a coroutine called when a query matches ``.``. The + hook runs in the event loop's task; we don't block the receive + path on it. + """ + + def __init__( + self, + zone: str, + hook: TriggerHook, + *, + answer_ip: str = "192.0.2.1", + ) -> None: + # Normalise: lowercase, no leading/trailing dot. + self._zone = zone.lower().strip(".") + self._suffix = "." + self._zone if self._zone else "" + self._hook = hook + self._answer_ip = answer_ip + self._transport: Optional[asyncio.DatagramTransport] = None + + def connection_made(self, transport) -> None: # type: ignore[override] + self._transport = transport # type: ignore[assignment] + + def datagram_received( # type: ignore[override] + self, data: bytes, addr: Tuple[str, int], + ) -> None: + try: + query = parse_query(data) + except ValueError: + # Malformed query — drop silently. Returning a FORMERR + # would tip off the attacker that *something* is listening + # on this port; the stealth posture (feedback_stealth) + # prefers radio silence on parse errors. + return + slug = self._slug_for(query.qname) + if slug is None: + # Unknown name — NXDOMAIN. + self._send(addr, _build_response(query, rcode=3)) + return + # Known name — answer with our sinkhole IP, then fire the hook. + self._send(addr, _build_response(query, answer_ip=self._answer_ip)) + asyncio.create_task(self._hook(slug, query, addr[0])) + + def _slug_for(self, qname: str) -> Optional[str]: + if not self._zone or not qname.endswith(self._suffix): + return None + slug = qname[: -len(self._suffix)] + # Single-label slug only; multi-label means the attacker is + # querying a sub-resource we don't model. + if not slug or "." in slug: + return None + return slug + + def _send(self, addr: Tuple[str, int], packet: bytes) -> None: + if self._transport is not None: + self._transport.sendto(packet, addr) diff --git a/decnet/canary/worker.py b/decnet/canary/worker.py new file mode 100644 index 00000000..280a717b --- /dev/null +++ b/decnet/canary/worker.py @@ -0,0 +1,254 @@ +"""``decnet canary`` worker — HTTP + DNS callback receivers. + +Two surfaces, one process: + +* **HTTP** — a tiny FastAPI app on its own port (default 8088). The + only useful route is ``GET /c/{slug}`` which looks up the slug in + the canary token table, persists a :class:`CanaryTrigger` row, + publishes ``canary..triggered`` on the bus, and returns + a 1×1 transparent GIF (or 204 if the client's ``Accept`` doesn't + list any image type). +* **DNS** — an authoritative UDP server (default 5353 if non-root, + 53 if root) for ``*.``. Same lookup + persist + + publish flow, plus a sinkhole A record so the attacker's resolver + doesn't loop on NXDOMAIN. + +Both surfaces are **stealth** by policy +(:mod:`feedback_stealth`): no DECNET strings in headers / banners / +error pages. The HTTP app strips the default ``Server: uvicorn`` +header in middleware; FastAPI's docs/openapi UI is disabled because +discovering them would tip off the attacker that this is a honeypot. + +The worker is supervised by its own systemd unit +(``decnet-canary.service``); like every other DECNET worker, it +crashes loudly rather than masking failures. +""" +from __future__ import annotations + +import asyncio +import os +from datetime import datetime, timezone +from typing import Optional + +from fastapi import FastAPI, Request, Response + +from decnet.bus import topics +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus +from decnet.canary.dns_server import CanaryDNSProtocol, DNSQuery +from decnet.logging import get_logger +from decnet.web.db.factory import get_repository +from decnet.web.db.repository import BaseRepository + +log = get_logger("canary.worker") + +# 1×1 transparent GIF — public-domain canonical bytes. Returning the +# same image every time is fine: the body has no information the +# attacker shouldn't see, and image clients cache it. +_TRANSPARENT_GIF = bytes.fromhex( + "47494638396101000100800100000000ffffff21f90401000001002c00000000010001000002024401003b" +) + + +def _http_base() -> str: + return os.environ.get("DECNET_CANARY_HTTP_BASE", "http://localhost:8088").rstrip("/") + + +def _dns_zone() -> str: + return os.environ.get("DECNET_CANARY_DNS_ZONE", "").strip(".").lower() + + +def _http_port() -> int: + return int(os.environ.get("DECNET_CANARY_HTTP_PORT", "8088")) + + +def _dns_port() -> int: + # Default 5353 (mDNS-ish, non-privileged) — operators pin :53 via + # NAT or a CAP_NET_BIND_SERVICE-enabled unit. + return int(os.environ.get("DECNET_CANARY_DNS_PORT", "5353")) + + +def _dns_bind() -> str: + return os.environ.get("DECNET_CANARY_DNS_BIND", "0.0.0.0") # nosec B104 — attacker-facing decoy listener, internet exposure is the design + + +def _http_bind() -> str: + return os.environ.get("DECNET_CANARY_HTTP_BIND", "0.0.0.0") # nosec B104 — same rationale + + +# ---------------------------- HTTP surface -------------------------------- + + +def _build_app(repo: BaseRepository, bus: BaseBus) -> FastAPI: + """Construct the FastAPI app. + + Disables docs / openapi / redoc — operators query the canary + surface via the *main* DECNET API, never directly. Anyone hitting + these paths is either misconfigured or scanning for a honeypot. + """ + app = FastAPI( + title="", # don't leak "DECNET" in OpenAPI + docs_url=None, redoc_url=None, openapi_url=None, + ) + + @app.middleware("http") + async def _stealth_headers(request: Request, call_next): + response: Response = await call_next(request) + # Strip the uvicorn / starlette banner; replace with a + # generic Server line that matches what most CDNs return. + response.headers["Server"] = "nginx" + # Don't leak request id / process id headers. + if "x-process-time" in response.headers: + del response.headers["x-process-time"] + return response + + @app.get("/c/{slug}") + async def callback(slug: str, request: Request) -> Response: + await _record_hit( + repo, bus, + slug=slug, + src_ip=_client_ip(request), + user_agent=request.headers.get("user-agent"), + request_path=str(request.url.path), + dns_qname=None, + raw_headers=dict(request.headers), + ) + # Always 200 with a tiny image so the attacker's client sees + # a "success" — same return regardless of whether the slug is + # known. Stealth: do NOT distinguish unknown vs known via + # status code or response body. + return Response(content=_TRANSPARENT_GIF, media_type="image/gif") + + @app.get("/") + async def root() -> Response: + # Bare root returns a generic 404. The decoy posture: pretend + # to be an empty static-file host that just happens to resolve + # /c/ when it matches. + return Response(status_code=404) + + return app + + +def _client_ip(request: Request) -> str: + # Honor X-Forwarded-For if the operator deployed behind a reverse + # proxy. Take the leftmost address in the chain; everything after + # is upstream-proxy noise. + fwd = request.headers.get("x-forwarded-for") + if fwd: + return fwd.split(",", 1)[0].strip() + if request.client: + return request.client.host + return "0.0.0.0" # nosec B104 — sentinel for "unknown remote" + + +# ---------------------------- shared persistence ------------------------- + + +async def _record_hit( + repo: BaseRepository, + bus: BaseBus, + *, + slug: str, + src_ip: str, + user_agent: Optional[str], + request_path: Optional[str], + dns_qname: Optional[str], + raw_headers: Optional[dict], +) -> None: + """Resolve slug -> token, persist a trigger, publish on the bus. + + Unknown slugs are silently swallowed: returning the same response + for known and unknown slugs is the stealth posture, and persisting + every random scan would clutter the DB. + """ + token = await repo.get_canary_token_by_slug(slug) + if token is None: + return + trigger_id = await repo.record_canary_trigger({ + "token_uuid": token["uuid"], + "occurred_at": datetime.now(timezone.utc), + "src_ip": src_ip, + "user_agent": user_agent, + "request_path": request_path, + "dns_qname": dns_qname, + "raw_headers": raw_headers or {}, + }) + try: + await bus.publish( + topics.canary(token["uuid"], topics.CANARY_TRIGGERED), + { + "token_id": token["uuid"], + "trigger_id": trigger_id, + "decky_name": token["decky_name"], + "src_ip": src_ip, + "user_agent": user_agent, + "request_path": request_path, + "dns_qname": dns_qname, + }, + ) + except Exception as e: # noqa: BLE001 — best effort + log.warning("canary.triggered publish failed slug=%s err=%s", slug, e) + + +# ---------------------------- DNS surface -------------------------------- + + +async def _start_dns_server( + repo: BaseRepository, bus: BaseBus, *, loop: asyncio.AbstractEventLoop, +) -> Optional[asyncio.DatagramTransport]: + zone = _dns_zone() + if not zone: + log.info("canary.dns disabled (DECNET_CANARY_DNS_ZONE unset)") + return None + + async def _hook(slug: str, query: DNSQuery, src_ip: str) -> None: + await _record_hit( + repo, bus, + slug=slug, src_ip=src_ip, user_agent=None, + request_path=None, dns_qname=query.qname, + raw_headers=None, + ) + + transport, _proto = await loop.create_datagram_endpoint( + lambda: CanaryDNSProtocol(zone, _hook), + local_addr=(_dns_bind(), _dns_port()), + ) + log.info("canary.dns listening zone=%s port=%d", zone, _dns_port()) + return transport # type: ignore[return-value] + + +# ---------------------------- entry point -------------------------------- + + +async def run() -> None: + """Worker entry point — kicked off by ``decnet canary``.""" + import uvicorn + + repo = get_repository() + await repo.initialize() + bus = get_bus() + await bus.connect() + + app = _build_app(repo, bus) + config = uvicorn.Config( + app, + host=_http_bind(), + port=_http_port(), + log_level="warning", + access_log=False, # stealth: no per-request lines + server_header=False, # we set Server: nginx in middleware + ) + server = uvicorn.Server(config) + loop = asyncio.get_running_loop() + dns_transport = await _start_dns_server(repo, bus, loop=loop) + try: + await server.serve() + finally: + if dns_transport is not None: + dns_transport.close() + await bus.close() + + +def main() -> None: + """CLI entry point — synchronous wrapper for ``asyncio.run``.""" + asyncio.run(run()) diff --git a/tests/canary/test_worker_dns.py b/tests/canary/test_worker_dns.py new file mode 100644 index 00000000..83be7072 --- /dev/null +++ b/tests/canary/test_worker_dns.py @@ -0,0 +1,119 @@ +"""DNS surface coverage for the canary worker. + +We don't open a real UDP socket — instead we drive +:class:`CanaryDNSProtocol` directly with synthesised packets and +inspect the bytes it returns via a fake transport. Faster than a +real listener, and avoids needing privileged ports in the test +runner. +""" +from __future__ import annotations + +import asyncio +import struct +from typing import AsyncIterator + +import pytest +import pytest_asyncio + +from decnet.canary.dns_server import ( + CanaryDNSProtocol, + _encode_name, + parse_query, +) + + +def _build_query(qname: str, txid: int = 0xCAFE, qtype: int = 1) -> bytes: + header = struct.pack("!HHHHHH", txid, 0x0100, 1, 0, 0, 0) # RD bit set + return header + _encode_name(qname) + struct.pack("!HH", qtype, 1) + + +class _FakeTransport: + def __init__(self) -> None: + self.sent: list[tuple[bytes, tuple]] = [] + + def sendto(self, data: bytes, addr: tuple) -> None: + self.sent.append((data, addr)) + + +@pytest_asyncio.fixture +async def proto_and_hits(): + hits: list[tuple[str, str, str]] = [] + + async def hook(slug: str, query, src_ip: str) -> None: # type: ignore[no-untyped-def] + hits.append((slug, query.qname, src_ip)) + + proto = CanaryDNSProtocol("canary.example.test", hook, answer_ip="192.0.2.1") + transport = _FakeTransport() + proto.connection_made(transport) + yield proto, transport, hits + + +@pytest.mark.asyncio +async def test_known_slug_returns_answer_and_fires_hook(proto_and_hits) -> None: + proto, transport, hits = proto_and_hits + pkt = _build_query("slug42.canary.example.test") + proto.datagram_received(pkt, ("203.0.113.7", 12345)) + # Allow the create_task hook to settle. + await asyncio.sleep(0) + await asyncio.sleep(0) + assert hits == [("slug42", "slug42.canary.example.test", "203.0.113.7")] + assert len(transport.sent) == 1 + response = transport.sent[0][0] + # Header: ANCOUNT == 1, RCODE == 0 in lower 4 bits of flags[1]. + _txid, flags, _qd, an, _ns, _ar = struct.unpack("!HHHHHH", response[:12]) + assert (flags & 0x0F) == 0 # NOERROR + assert an == 1 + + +@pytest.mark.asyncio +async def test_unknown_slug_returns_nxdomain(proto_and_hits) -> None: + proto, transport, hits = proto_and_hits + pkt = _build_query("not-our-zone.example.com") + proto.datagram_received(pkt, ("203.0.113.7", 12345)) + await asyncio.sleep(0) + assert hits == [] + assert len(transport.sent) == 1 + response = transport.sent[0][0] + _txid, flags, _qd, an, _ns, _ar = struct.unpack("!HHHHHH", response[:12]) + assert (flags & 0x0F) == 3 # NXDOMAIN + assert an == 0 + + +@pytest.mark.asyncio +async def test_multi_label_subdomain_is_ignored(proto_and_hits) -> None: + """Slug must be exactly one label. ``foo.bar.canary.example.test`` + is an attacker probing a sub-resource we don't model — NXDOMAIN.""" + proto, transport, hits = proto_and_hits + pkt = _build_query("foo.bar.canary.example.test") + proto.datagram_received(pkt, ("203.0.113.7", 12345)) + await asyncio.sleep(0) + assert hits == [] + + +@pytest.mark.asyncio +async def test_malformed_packet_is_dropped_silently(proto_and_hits) -> None: + proto, transport, hits = proto_and_hits + proto.datagram_received(b"\x00\x01\x02", ("203.0.113.7", 12345)) + await asyncio.sleep(0) + assert hits == [] + assert transport.sent == [] + + +def test_parse_query_round_trip() -> None: + pkt = _build_query("abc.def.canary.example.test", txid=0x1234, qtype=1) + q = parse_query(pkt) + assert q.txid == 0x1234 + assert q.qname == "abc.def.canary.example.test" + assert q.qtype == 1 + assert q.qclass == 1 + + +def test_parse_query_handles_pointer_loop() -> None: + """Malicious packet with a pointer loop must raise, not hang.""" + # Header (12) + name with a self-pointer at offset 12. + header = struct.pack("!HHHHHH", 0, 0x0100, 1, 0, 0, 0) + name = struct.pack("!H", 0xC00C) # pointer back to offset 12 + qtype_qclass = struct.pack("!HH", 1, 1) + packet = header + name + qtype_qclass + with pytest.raises(ValueError, match="pointer loop"): + parse_query(packet) diff --git a/tests/canary/test_worker_http.py b/tests/canary/test_worker_http.py new file mode 100644 index 00000000..404c5aae --- /dev/null +++ b/tests/canary/test_worker_http.py @@ -0,0 +1,124 @@ +"""HTTP surface coverage for the canary worker. + +We exercise the FastAPI app via Starlette's TestClient so the test +doesn't need a real socket. Asserts: + +* ``GET /c/{slug}`` for a known slug returns 200 + image/gif, persists + a trigger row, bumps the token's counters, and publishes + ``canary..triggered`` on the bus. +* ``GET /c/{slug}`` for an unknown slug returns the same 200 (stealth) + but persists nothing. +* The Server header is rewritten to a generic value (``nginx``). +* Bare root returns 404. +* X-Forwarded-For is honored. +""" +from __future__ import annotations + +import asyncio +from typing import AsyncIterator + +import pytest +import pytest_asyncio +from fastapi.testclient import TestClient + +from decnet.bus import topics +from decnet.bus.fake import FakeBus +from decnet.canary.worker import _build_app +from decnet.web.db.sqlite.repository import SQLiteRepository +import decnet.web.db.models # noqa: F401 + + +@pytest_asyncio.fixture +async def repo(tmp_path) -> AsyncIterator[SQLiteRepository]: + r = SQLiteRepository(str(tmp_path / "w.db")) + await r.initialize() + yield r + + +@pytest_asyncio.fixture +async def bus() -> AsyncIterator[FakeBus]: + b = FakeBus() + await b.connect() + yield b + await b.close() + + +@pytest.mark.asyncio +async def test_known_slug_records_trigger_and_publishes( + repo: SQLiteRepository, bus: FakeBus, +) -> None: + await repo.create_canary_token({ + "uuid": "tok-w1", "kind": "http", "decky_name": "web1", + "generator": "env_file", "placement_path": "/x", + "callback_token": "slug-W1", "secret_seed": "s", "created_by": "u1", + }) + sub = bus.subscribe("canary.>") + app = _build_app(repo, bus) + with TestClient(app) as client: + resp = client.get("/c/slug-W1", headers={"User-Agent": "curl/8.0"}) + assert resp.status_code == 200 + assert resp.headers["content-type"].startswith("image/gif") + assert resp.headers.get("server") == "nginx" + + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + assert event.topic == topics.canary("tok-w1", topics.CANARY_TRIGGERED) + assert event.payload["src_ip"] + assert event.payload["user_agent"] == "curl/8.0" + + triggers = await repo.list_canary_triggers("tok-w1") + assert len(triggers) == 1 + assert triggers[0]["request_path"] == "/c/slug-W1" + + tok = await repo.get_canary_token("tok-w1") + assert tok["trigger_count"] == 1 + + +@pytest.mark.asyncio +async def test_unknown_slug_returns_same_response_but_persists_nothing( + repo: SQLiteRepository, bus: FakeBus, +) -> None: + app = _build_app(repo, bus) + with TestClient(app) as client: + resp = client.get("/c/unknown-slug") + assert resp.status_code == 200 + assert resp.headers["content-type"].startswith("image/gif") + # No tokens, no triggers, no nothing. + assert await repo.list_canary_tokens() == [] + + +@pytest.mark.asyncio +async def test_root_returns_404(repo: SQLiteRepository, bus: FakeBus) -> None: + app = _build_app(repo, bus) + with TestClient(app) as client: + resp = client.get("/") + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_xff_is_honored(repo: SQLiteRepository, bus: FakeBus) -> None: + await repo.create_canary_token({ + "uuid": "tok-xff", "kind": "http", "decky_name": "web1", + "generator": "env_file", "placement_path": "/x", + "callback_token": "slug-xff", "secret_seed": "s", "created_by": "u1", + }) + app = _build_app(repo, bus) + with TestClient(app) as client: + client.get("/c/slug-xff", headers={"X-Forwarded-For": "9.9.9.9, 10.0.0.1"}) + triggers = await repo.list_canary_triggers("tok-xff") + assert triggers[0]["src_ip"] == "9.9.9.9" + + +@pytest.mark.asyncio +async def test_no_decnet_strings_in_response(repo: SQLiteRepository, bus: FakeBus) -> None: + """Stealth posture: nothing in the HTTP surface mentions DECNET.""" + app = _build_app(repo, bus) + with TestClient(app) as client: + resp = client.get("/c/anything") + body = resp.content.lower() + for v in resp.headers.values(): + assert b"decnet" not in v.lower().encode() + assert b"decnet" not in body + # Docs / openapi / redoc are disabled. + assert client.get("/docs").status_code == 404 + assert client.get("/openapi.json").status_code == 404 + assert client.get("/redoc").status_code == 404