feat(canary): worker (HTTP + stdlib DNS callback receivers) + tests
decnet canary worker hosts both callback surfaces in one process:
- HTTP: a tiny FastAPI app on its own port (default 8088). The only
meaningful route is GET /c/{slug} which looks up the slug, persists
a CanaryTrigger, publishes canary.<id>.triggered, and returns a 1x1
transparent GIF. Unknown slugs return the same response (stealth);
no decnet strings leak in headers/banners; docs/openapi/redoc are
disabled. X-Forwarded-For is honored.
- DNS: an authoritative UDP server for *.<canary_zone> using
asyncio.DatagramProtocol with stdlib-only DNS wire-format parsing
(no dnslib dep). Same lookup -> persist -> publish flow, plus a
sinkhole A record (192.0.2.1) so the attacker's resolver doesn't
loop on NXDOMAIN. Single-label slugs only; multi-label probes
return NXDOMAIN. Pointer loops in malformed queries are caught
(10-hop cap) so an adversarial packet can't wedge the parser.
Tests cover both surfaces without privileged sockets:
- HTTP via Starlette TestClient: known/unknown slug, headers, XFF,
stealth-string assertions.
- DNS via direct DatagramProtocol drive: known slug -> ANSWER,
unknown -> NXDOMAIN, pointer-loop -> ValueError, malformed
packet -> silent drop.
This commit is contained in:
207
decnet/canary/dns_server.py
Normal file
207
decnet/canary/dns_server.py
Normal file
@@ -0,0 +1,207 @@
|
||||
"""Minimal authoritative DNS server for canary tokens (stdlib only).
|
||||
|
||||
We don't need a full resolver — only enough to:
|
||||
|
||||
1. Decode an inbound query's qname.
|
||||
2. If the qname matches ``<slug>.<canary_zone>``, log the callback,
|
||||
publish ``canary.<token_id>.triggered`` on the bus, and return a
|
||||
plausible A record (any RFC-5737 reserved address would do; we
|
||||
use 192.0.2.1) so the attacker's resolver doesn't loop on
|
||||
NXDOMAIN.
|
||||
3. For unknown qnames return NXDOMAIN.
|
||||
|
||||
DNS-over-UDP wire format is well-trodden: 12-byte header + name
|
||||
labels + qtype + qclass. We implement just the bits we need.
|
||||
|
||||
This module deliberately avoids the ``dnslib`` PyPI package so the
|
||||
canary worker has no extra dependency surface. If we ever need
|
||||
EDNS0, DNSSEC, or other niceties we'll swap to dnslib then.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from typing import Awaitable, Callable, Optional, Tuple
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DNSQuery:
    """Immutable record of one parsed DNS question.

    Carries only the header/question fields the canary worker reads.
    """

    # Transaction ID taken from the query header.
    txid: int
    # Queried name: lowercase, no trailing dot.
    qname: str
    # Question type and class exactly as they appeared on the wire.
    qtype: int
    qclass: int
    # Raw 16-bit flags word from the query header.
    flags: int
|
||||
|
||||
|
||||
def _decode_name(buf: bytes, offset: int) -> Tuple[str, int]:
|
||||
"""Return ``(qname_lowercase_no_dot, bytes_consumed)``.
|
||||
|
||||
Supports compressed pointers (RFC 1035 §4.1.4). Doesn't recurse —
|
||||
we walk the pointer chain iteratively with a hop cap to avoid
|
||||
pointer-loop DoS.
|
||||
"""
|
||||
labels: list[str] = []
|
||||
pos = offset
|
||||
consumed = 0
|
||||
jumped = False
|
||||
hops = 0
|
||||
while True:
|
||||
if pos >= len(buf):
|
||||
raise ValueError("truncated DNS name")
|
||||
length = buf[pos]
|
||||
if length == 0:
|
||||
pos += 1
|
||||
if not jumped:
|
||||
consumed = pos - offset
|
||||
break
|
||||
if (length & 0xC0) == 0xC0:
|
||||
# Compression pointer.
|
||||
if pos + 1 >= len(buf):
|
||||
raise ValueError("truncated DNS pointer")
|
||||
ptr = ((length & 0x3F) << 8) | buf[pos + 1]
|
||||
if not jumped:
|
||||
consumed = (pos + 2) - offset
|
||||
pos = ptr
|
||||
jumped = True
|
||||
hops += 1
|
||||
if hops > 10:
|
||||
raise ValueError("DNS pointer loop")
|
||||
continue
|
||||
pos += 1
|
||||
if pos + length > len(buf):
|
||||
raise ValueError("truncated DNS label")
|
||||
labels.append(buf[pos:pos + length].decode("ascii", "replace"))
|
||||
pos += length
|
||||
return ".".join(labels).lower(), consumed
|
||||
|
||||
|
||||
def parse_query(packet: bytes) -> DNSQuery:
    """Decode the header and the single question of a DNS query.

    Raises:
        ValueError: the packet is shorter than a DNS header, does not
            carry exactly one question, or is truncated inside the
            question section.
    """
    header = packet[:12]
    if len(header) < 12:
        raise ValueError("DNS packet too short")
    # The answer/authority/additional counts are read but not used.
    fields = struct.unpack("!HHHHHH", header)
    txid, flags, qdcount = fields[0], fields[1], fields[2]
    if qdcount != 1:
        raise ValueError(f"expected 1 question, got {qdcount}")

    qname, name_len = _decode_name(packet, 12)
    tail_start = 12 + name_len
    tail = packet[tail_start:tail_start + 4]
    if tail_start + 4 > len(packet):
        raise ValueError("truncated DNS qtype/qclass")
    qtype, qclass = struct.unpack("!HH", tail)
    return DNSQuery(
        txid=txid,
        qname=qname,
        qtype=qtype,
        qclass=qclass,
        flags=flags,
    )
|
||||
|
||||
|
||||
def _encode_name(name: str) -> bytes:
|
||||
out = bytearray()
|
||||
for label in name.split("."):
|
||||
if not label:
|
||||
continue
|
||||
b = label.encode("ascii", "replace")
|
||||
out.append(len(b))
|
||||
out.extend(b)
|
||||
out.append(0)
|
||||
return bytes(out)
|
||||
|
||||
|
||||
def _build_response(
    query: DNSQuery,
    *,
    rcode: int = 0,
    answer_ip: Optional[str] = None,
) -> bytes:
    """Encode a DNS response packet for *query*.

    *rcode* 0 = NOERROR, 3 = NXDOMAIN. The question is echoed back.
    When *answer_ip* is supplied, the query was for an A record
    (qtype 1) and *rcode* is 0, exactly one answer is included
    (type A, class IN, TTL 60); otherwise the answer section is empty.

    Raises:
        ValueError: *answer_ip* is needed for the answer but is not a
            dotted quad of four octets in 0..255.
    """
    include_answer = bool(answer_ip) and query.qtype == 1 and rcode == 0
    # QR (response) + AA (authoritative), RA clear. RD is copied from
    # the query as RFC 1035 §4.1.1 requires; rcode is masked to its
    # 4-bit field so an out-of-range value cannot flip other flags.
    flags = 0x8400 | (query.flags & 0x0100) | (rcode & 0x000F)
    header = struct.pack(
        "!HHHHHH", query.txid, flags, 1, 1 if include_answer else 0, 0, 0,
    )
    question = _encode_name(query.qname) + struct.pack(
        "!HH", query.qtype, query.qclass,
    )

    answer = b""
    if include_answer:
        rdata = bytes(int(octet) for octet in answer_ip.split("."))
        if len(rdata) != 4:
            # RDLENGTH below is fixed at 4; anything else would emit
            # a malformed resource record.
            raise ValueError(f"answer_ip is not an IPv4 address: {answer_ip!r}")
        # Owner name is a compression pointer back to the question
        # name, which always starts at offset 12 (right after the header).
        owner = struct.pack("!H", 0xC000 | 12)
        answer = owner + struct.pack("!HHIH", 1, 1, 60, 4) + rdata

    return header + question + answer
|
||||
|
||||
|
||||
# Hook signature: receives the matched slug, the decoded query, and
# the sender's source IP as a string; returns nothing. The worker
# uses it to persist a CanaryTrigger row and publish the bus event.
TriggerHook = Callable[[str, DNSQuery, str], Awaitable[None]]
|
||||
|
||||
|
||||
class CanaryDNSProtocol(asyncio.DatagramProtocol):
    """asyncio UDP server endpoint for canary DNS callbacks.

    Constructor takes the canary zone (``"canary.example.test"``) and
    a coroutine function called when a query matches
    ``<slug>.<zone>``. The hook is scheduled as its own task, so the
    receive path never waits on it.

    Malformed packets and packets with the QR bit set (responses) are
    dropped without a reply. Every other query gets either a sinkhole
    answer (known single-label slug) or NXDOMAIN.
    """

    def __init__(
        self,
        zone: str,
        hook: TriggerHook,
        *,
        answer_ip: str = "192.0.2.1",
    ) -> None:
        # Normalise: lowercase, no leading/trailing dot.
        self._zone = zone.lower().strip(".")
        self._suffix = "." + self._zone if self._zone else ""
        self._hook = hook
        self._answer_ip = answer_ip
        self._transport: Optional[asyncio.DatagramTransport] = None
        # Strong references to in-flight hook tasks. The event loop
        # only keeps weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._tasks: set[asyncio.Task] = set()

    def connection_made(self, transport) -> None:  # type: ignore[override]
        self._transport = transport  # type: ignore[assignment]

    def datagram_received(  # type: ignore[override]
        self, data: bytes, addr: Tuple[str, int],
    ) -> None:
        try:
            query = parse_query(data)
        except ValueError:
            # Malformed query — drop silently. Returning a FORMERR
            # would tip off the attacker that *something* is listening
            # on this port; the stealth posture (feedback_stealth)
            # prefers radio silence on parse errors.
            return
        if query.flags & 0x8000:
            # QR set: this is a response, not a query. Answering it
            # would let a spoofed packet bounce us against another
            # DNS server indefinitely, so stay silent.
            return
        slug = self._slug_for(query.qname)
        if slug is None:
            # Unknown name — NXDOMAIN.
            self._send(addr, _build_response(query, rcode=3))
            return
        # Known name — answer with our sinkhole IP, then fire the hook.
        self._send(addr, _build_response(query, answer_ip=self._answer_ip))
        task = asyncio.create_task(self._hook(slug, query, addr[0]))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _slug_for(self, qname: str) -> Optional[str]:
        """Return the slug label of ``<slug>.<zone>``, or ``None``."""
        if not self._zone or not qname.endswith(self._suffix):
            return None
        slug = qname[: -len(self._suffix)]
        # Single-label slug only; multi-label means the attacker is
        # querying a sub-resource we don't model.
        if not slug or "." in slug:
            return None
        return slug

    def _send(self, addr: Tuple[str, int], packet: bytes) -> None:
        # No transport before connection_made(): nothing to send on.
        if self._transport is not None:
            self._transport.sendto(packet, addr)
|
||||
254
decnet/canary/worker.py
Normal file
254
decnet/canary/worker.py
Normal file
@@ -0,0 +1,254 @@
|
||||
"""``decnet canary`` worker — HTTP + DNS callback receivers.
|
||||
|
||||
Two surfaces, one process:
|
||||
|
||||
* **HTTP** — a tiny FastAPI app on its own port (default 8088). The
|
||||
only useful route is ``GET /c/{slug}`` which looks up the slug in
|
||||
the canary token table, persists a :class:`CanaryTrigger` row,
|
||||
publishes ``canary.<token_id>.triggered`` on the bus, and returns
|
||||
a 1×1 transparent GIF with status 200, whatever the client's
``Accept`` header says and whether or not the slug is known.
|
||||
* **DNS** — an authoritative UDP server (port 5353 by default,
overridable via ``DECNET_CANARY_DNS_PORT``) for ``*.<canary_zone>``. Same lookup + persist +
|
||||
publish flow, plus a sinkhole A record so the attacker's resolver
|
||||
doesn't loop on NXDOMAIN.
|
||||
|
||||
Both surfaces are **stealth** by policy
|
||||
(:mod:`feedback_stealth`): no DECNET strings in headers / banners /
|
||||
error pages. The HTTP app strips the default ``Server: uvicorn``
|
||||
header in middleware; FastAPI's docs/openapi UI is disabled because
|
||||
discovering them would tip off the attacker that this is a honeypot.
|
||||
|
||||
The worker is supervised by its own systemd unit
|
||||
(``decnet-canary.service``); like every other DECNET worker, it
|
||||
crashes loudly rather than masking failures.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, Request, Response
|
||||
|
||||
from decnet.bus import topics
|
||||
from decnet.bus.base import BaseBus
|
||||
from decnet.bus.factory import get_bus
|
||||
from decnet.canary.dns_server import CanaryDNSProtocol, DNSQuery
|
||||
from decnet.logging import get_logger
|
||||
from decnet.web.db.factory import get_repository
|
||||
from decnet.web.db.repository import BaseRepository
|
||||
|
||||
log = get_logger("canary.worker")
|
||||
|
||||
# 1×1 transparent GIF — public-domain canonical bytes. Returning the
|
||||
# same image every time is fine: the body has no information the
|
||||
# attacker shouldn't see, and image clients cache it.
|
||||
_TRANSPARENT_GIF = bytes.fromhex(
|
||||
"47494638396101000100800100000000ffffff21f90401000001002c00000000010001000002024401003b"
|
||||
)
|
||||
|
||||
|
||||
def _http_base() -> str:
|
||||
return os.environ.get("DECNET_CANARY_HTTP_BASE", "http://localhost:8088").rstrip("/")
|
||||
|
||||
|
||||
def _dns_zone() -> str:
|
||||
return os.environ.get("DECNET_CANARY_DNS_ZONE", "").strip(".").lower()
|
||||
|
||||
|
||||
def _http_port() -> int:
|
||||
return int(os.environ.get("DECNET_CANARY_HTTP_PORT", "8088"))
|
||||
|
||||
|
||||
def _dns_port() -> int:
|
||||
# Default 5353 (mDNS-ish, non-privileged) — operators pin :53 via
|
||||
# NAT or a CAP_NET_BIND_SERVICE-enabled unit.
|
||||
return int(os.environ.get("DECNET_CANARY_DNS_PORT", "5353"))
|
||||
|
||||
|
||||
def _dns_bind() -> str:
|
||||
return os.environ.get("DECNET_CANARY_DNS_BIND", "0.0.0.0") # nosec B104 — attacker-facing decoy listener, internet exposure is the design
|
||||
|
||||
|
||||
def _http_bind() -> str:
|
||||
return os.environ.get("DECNET_CANARY_HTTP_BIND", "0.0.0.0") # nosec B104 — same rationale
|
||||
|
||||
|
||||
# ---------------------------- HTTP surface --------------------------------
|
||||
|
||||
|
||||
def _build_app(repo: BaseRepository, bus: BaseBus) -> FastAPI:
    """Construct the FastAPI app for the HTTP callback surface.

    Disables docs / openapi / redoc — operators query the canary
    surface via the *main* DECNET API, never directly. Anyone hitting
    these paths is either misconfigured or scanning for a honeypot.

    Stealth measures applied here:

    * every response carries ``Server: nginx``;
    * 404 / 405 responses have an empty body instead of FastAPI's
      default ``{"detail": ...}`` JSON, which would identify the
      framework and differ from the bare 404 served at ``/``;
    * ``GET /c/{slug}`` returns the same GIF even when recording the
      hit fails, so a backend error cannot tell known slugs from
      unknown ones.
    """
    app = FastAPI(
        title="",  # don't leak "DECNET" in OpenAPI
        docs_url=None, redoc_url=None, openapi_url=None,
    )

    @app.middleware("http")
    async def _stealth_headers(request: Request, call_next):
        response: Response = await call_next(request)
        # Replace any framework banner with a generic Server line
        # that matches what most CDNs return.
        response.headers["Server"] = "nginx"
        # Don't leak request id / process id headers.
        if "x-process-time" in response.headers:
            del response.headers["x-process-time"]
        return response

    async def _bare_error(request: Request, exc: Exception) -> Response:
        # Keep the status (and e.g. the Allow header on 405) but send
        # no body, matching the empty 404 served at "/".
        return Response(
            status_code=getattr(exc, "status_code", 404),
            headers=getattr(exc, "headers", None),
        )

    for status in (404, 405):
        app.add_exception_handler(status, _bare_error)

    @app.get("/c/{slug}")
    async def callback(slug: str, request: Request) -> Response:
        try:
            await _record_hit(
                repo, bus,
                slug=slug,
                src_ip=_client_ip(request),
                user_agent=request.headers.get("user-agent"),
                request_path=str(request.url.path),
                dns_qname=None,
                raw_headers=dict(request.headers),
            )
        except Exception:  # noqa: BLE001 — must not change the response
            # The trigger write only happens for known slugs, so
            # letting a failure surface as a 500 would reveal which
            # slugs exist. Log it loudly and answer as usual.
            log.exception("canary.http record failed slug=%r", slug)
        # Always 200 with a tiny image so the attacker's client sees
        # a "success" — same return regardless of whether the slug is
        # known. Stealth: do NOT distinguish unknown vs known via
        # status code or response body.
        return Response(content=_TRANSPARENT_GIF, media_type="image/gif")

    @app.get("/")
    async def root() -> Response:
        # Bare root returns a generic 404. The decoy posture: pretend
        # to be an empty static-file host that just happens to resolve
        # /c/<slug> when it matches.
        return Response(status_code=404)

    return app
|
||||
|
||||
|
||||
def _client_ip(request: Request) -> str:
|
||||
# Honor X-Forwarded-For if the operator deployed behind a reverse
|
||||
# proxy. Take the leftmost address in the chain; everything after
|
||||
# is upstream-proxy noise.
|
||||
fwd = request.headers.get("x-forwarded-for")
|
||||
if fwd:
|
||||
return fwd.split(",", 1)[0].strip()
|
||||
if request.client:
|
||||
return request.client.host
|
||||
return "0.0.0.0" # nosec B104 — sentinel for "unknown remote"
|
||||
|
||||
|
||||
# ---------------------------- shared persistence -------------------------
|
||||
|
||||
|
||||
async def _record_hit(
    repo: BaseRepository,
    bus: BaseBus,
    *,
    slug: str,
    src_ip: str,
    user_agent: Optional[str],
    request_path: Optional[str],
    dns_qname: Optional[str],
    raw_headers: Optional[dict],
) -> None:
    """Look up *slug*, store a trigger row, then announce it on the bus.

    A slug with no matching token is ignored: callers answer known and
    unknown slugs identically (the stealth posture), and persisting
    every random scan would clutter the DB. Publishing is best effort —
    a bus failure is logged as a warning and does not undo the stored
    trigger.
    """
    token = await repo.get_canary_token_by_slug(slug)
    if token is None:
        return

    # Fields shared by the stored row and the bus payload.
    observed = {
        "src_ip": src_ip,
        "user_agent": user_agent,
        "request_path": request_path,
        "dns_qname": dns_qname,
    }
    row = {
        "token_uuid": token["uuid"],
        "occurred_at": datetime.now(timezone.utc),
        **observed,
        "raw_headers": raw_headers or {},
    }
    trigger_id = await repo.record_canary_trigger(row)

    try:
        # Topic and payload are built inside the try so that any
        # failure while assembling them is also treated as best effort.
        topic = topics.canary(token["uuid"], topics.CANARY_TRIGGERED)
        payload = {
            "token_id": token["uuid"],
            "trigger_id": trigger_id,
            "decky_name": token["decky_name"],
            **observed,
        }
        await bus.publish(topic, payload)
    except Exception as e:  # noqa: BLE001 — best effort
        log.warning("canary.triggered publish failed slug=%s err=%s", slug, e)
|
||||
|
||||
|
||||
# ---------------------------- DNS surface --------------------------------
|
||||
|
||||
|
||||
async def _start_dns_server(
    repo: BaseRepository, bus: BaseBus, *, loop: asyncio.AbstractEventLoop,
) -> Optional[asyncio.DatagramTransport]:
    """Bind the canary DNS listener on *loop* and return its transport.

    Returns ``None`` without binding anything when no DNS zone is
    configured. Matched queries are forwarded to :func:`_record_hit`
    with the HTTP-only fields left as ``None``.
    """
    zone = _dns_zone()
    if not zone:
        log.info("canary.dns disabled (DECNET_CANARY_DNS_ZONE unset)")
        return None

    async def _hook(slug: str, query: DNSQuery, src_ip: str) -> None:
        await _record_hit(
            repo,
            bus,
            slug=slug,
            src_ip=src_ip,
            user_agent=None,
            request_path=None,
            dns_qname=query.qname,
            raw_headers=None,
        )

    def _protocol_factory() -> CanaryDNSProtocol:
        return CanaryDNSProtocol(zone, _hook)

    bind_address = (_dns_bind(), _dns_port())
    transport, _unused_protocol = await loop.create_datagram_endpoint(
        _protocol_factory,
        local_addr=bind_address,
    )
    log.info("canary.dns listening zone=%s port=%d", zone, _dns_port())
    return transport  # type: ignore[return-value]
|
||||
|
||||
|
||||
# ---------------------------- entry point --------------------------------
|
||||
|
||||
|
||||
async def run() -> None:
    """Worker entry point — kicked off by ``decnet canary``.

    Initialises the repository and bus, starts the DNS listener (when
    a zone is configured) and serves the HTTP app until uvicorn
    returns. Once the bus is connected, it is closed on every exit
    path — including a failure to build the app or bind the DNS
    socket — and the DNS transport is closed if it was opened.
    """
    import uvicorn

    repo = get_repository()
    await repo.initialize()
    bus = get_bus()
    await bus.connect()

    dns_transport: Optional[asyncio.DatagramTransport] = None
    try:
        app = _build_app(repo, bus)
        config = uvicorn.Config(
            app,
            host=_http_bind(),
            port=_http_port(),
            log_level="warning",
            access_log=False,  # stealth: no per-request lines
            server_header=False,  # we set Server: nginx in middleware
        )
        server = uvicorn.Server(config)
        loop = asyncio.get_running_loop()
        # Started inside the try: if the bind fails (port in use, no
        # permission) the bus connection is still released below.
        dns_transport = await _start_dns_server(repo, bus, loop=loop)
        await server.serve()
    finally:
        if dns_transport is not None:
            dns_transport.close()
        await bus.close()
|
||||
|
||||
|
||||
def main() -> None:
    """Synchronous CLI entry point: drive :func:`run` to completion."""
    worker = run()
    asyncio.run(worker)
|
||||
Reference in New Issue
Block a user