Files
DECNET/templates/llmnr/server.py
anti 8dd4c78b33 refactor: strip DECNET tokens from container-visible surface
Rename the container-side logging module decnet_logging → syslog_bridge
(canonical at templates/syslog_bridge.py, synced into each template by
the deployer). Drop the stale per-template copies; setuptools find was
picking them up anyway. Swap useradd/USER/chown "decnet" for "logrelay"
so no obvious token appears in the rendered container image.

Apply the same cloaking pattern to the telnet template that SSH got:
syslog pipe moves to /run/systemd/journal/syslog-relay and the relay
is cat'd via exec -a "systemd-journal-fwd". rsyslog.d conf rename
99-decnet.conf → 50-journal-forward.conf. SSH capture script:
/var/decnet/captured → /var/lib/systemd/coredump (real systemd path),
logger tag decnet-capture → systemd-journal. Compose volume updated
to match the new in-container quarantine path.

SD element ID shifts decnet@55555 → relay@55555; synced across
collector, parser, sniffer, prober, formatter, tests, and docs so the
host-side pipeline still matches what containers emit.
2026-04-17 22:57:53 -04:00

114 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""
LLMNR / mDNS poisoning detector (UDP 5355 and UDP 5353).
Listens for any incoming name-resolution queries. Any traffic here is a
strong signal of an attacker running Responder or similar tools on the LAN.
Logs every packet with source IP and decoded query name where possible.
"""
import asyncio
import os
import struct
from syslog_bridge import syslog_line, write_syslog_file, forward_syslog
NODE_NAME = os.environ.get("NODE_NAME", "lan-host")
SERVICE_NAME = "llmnr"
LOG_TARGET = os.environ.get("LOG_TARGET", "")
def _log(event_type: str, severity: int = 6, **kwargs) -> None:
line = syslog_line(SERVICE_NAME, NODE_NAME, event_type, severity, **kwargs)
write_syslog_file(line)
forward_syslog(line, LOG_TARGET)
def _decode_dns_name(data: bytes, offset: int) -> str:
"""Decode a DNS-encoded label sequence starting at offset."""
labels = []
visited = set()
pos = offset
while pos < len(data):
if pos in visited:
break
visited.add(pos)
length = data[pos]
if length == 0:
break
if length & 0xc0 == 0xc0: # pointer
if pos + 1 >= len(data):
break
ptr = ((length & 0x3f) << 8) | data[pos + 1]
labels.append(_decode_dns_name(data, ptr))
break
pos += 1
labels.append(data[pos:pos + length].decode(errors="replace"))
pos += length
return ".".join(labels)
def _parse_query(data: bytes, proto: str, src_addr) -> None:
"""Parse DNS/LLMNR/mDNS query and log the queried name."""
try:
if len(data) < 12:
raise ValueError("too short")
flags = struct.unpack(">H", data[2:4])[0]
qr = (flags >> 15) & 1
qdcount = struct.unpack(">H", data[4:6])[0]
if qr != 0 or qdcount < 1:
return # not a query or no questions
name = _decode_dns_name(data, 12)
pos = 12
while pos < len(data) and data[pos] != 0:
pos += data[pos] + 1
pos += 1
qtype = struct.unpack(">H", data[pos:pos + 2])[0] if pos + 2 <= len(data) else 0
_log(
"query",
proto=proto,
src=src_addr[0],
src_port=src_addr[1],
name=name,
qtype=qtype,
)
except Exception as e:
_log("raw_packet", proto=proto, src=src_addr[0], data=data[:64].hex(), error=str(e))
class LLMNRProtocol(asyncio.DatagramProtocol):
def __init__(self, proto_label: str):
self._proto = proto_label
def datagram_received(self, data, addr):
_parse_query(data, self._proto, addr)
def error_received(self, exc):
pass
async def main():
_log("startup", msg=f"LLMNR/mDNS server starting as {NODE_NAME}")
loop = asyncio.get_running_loop()
# LLMNR: UDP 5355
llmnr_transport, _ = await loop.create_datagram_endpoint(
lambda: LLMNRProtocol("LLMNR"),
local_addr=("0.0.0.0", 5355), # nosec B104
)
# mDNS: UDP 5353
mdns_transport, _ = await loop.create_datagram_endpoint(
lambda: LLMNRProtocol("mDNS"),
local_addr=("0.0.0.0", 5353), # nosec B104
)
try:
await asyncio.sleep(float("inf"))
finally:
llmnr_transport.close()
mdns_transport.close()
if __name__ == "__main__":
asyncio.run(main())