From f715ac6bcd6780f6c52d168ecc4dfbd18b1d02eb Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 19 Jun 2026 21:32:50 -0400 Subject: [PATCH] feat(cloak): egress SYN-ACK mangler + T2/T3 probe-response synthesizer In-decky-netns NFQUEUE rewriter (window/option-order/IP-ID) and raw-socket synthesizer for nmap probes Linux drops but the target OS answers (T2/T3), driven by os_fingerprint.OS_MANGLE. Packet-shaping logic is pure and unit-tested offline; scapy/netfilterqueue import lazily in the runtime loops. Entry: python -m decnet.cloak (run by the base container; CAP_NET_ADMIN). --- decnet/cloak/__init__.py | 30 +++++++++ decnet/cloak/__main__.py | 53 +++++++++++++++ decnet/cloak/mangler.py | 131 ++++++++++++++++++++++++++++++++++++++ decnet/cloak/responder.py | 86 +++++++++++++++++++++++++ tests/cloak/__init__.py | 0 tests/cloak/test_cloak.py | 130 +++++++++++++++++++++++++++++++++++++ 6 files changed, 430 insertions(+) create mode 100644 decnet/cloak/__init__.py create mode 100644 decnet/cloak/__main__.py create mode 100644 decnet/cloak/mangler.py create mode 100644 decnet/cloak/responder.py create mode 100644 tests/cloak/__init__.py create mode 100644 tests/cloak/test_cloak.py diff --git a/decnet/cloak/__init__.py b/decnet/cloak/__init__.py new file mode 100644 index 00000000..c836266f --- /dev/null +++ b/decnet/cloak/__init__.py @@ -0,0 +1,30 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +""" +DECNET cloak — egress TCP/IP fingerprint masquerading for deckies. + +sysctls (decnet/os_fingerprint.py) own GLOBAL packet fields. The cloak owns the +SYN-ACK *shape* and stack *behaviours* sysctl can't reach, so a decky reads as +its claimed nmap_os under active fingerprinting (nmap -O): + + - mangler : NFQUEUE rewrite of egress SYN-ACK (window, TCP option order, + IP-ID generation) to match the MangleProfile for the slug. + - responder : raw-socket synthesis of replies to probes the Linux kernel + drops but the target OS answers (nmap T2/T3). + +Both run INSIDE the decky's network namespace (CAP_NET_ADMIN), launched by the +base container — never a sidecar (that would double container count per decky). +Driven by os_fingerprint.get_os_mangle(nmap_os); a slug with no profile is a +no-op (the real Linux stack already approximates it). +""" +from __future__ import annotations + +from decnet.cloak.mangler import build_synack_options, next_ipid +from decnet.cloak.responder import ProbeKind, build_reply_fields, classify_probe + +__all__ = [ + "build_synack_options", + "next_ipid", + "classify_probe", + "build_reply_fields", + "ProbeKind", +] diff --git a/decnet/cloak/__main__.py b/decnet/cloak/__main__.py new file mode 100644 index 00000000..00fd509d --- /dev/null +++ b/decnet/cloak/__main__.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +""" +Cloak entrypoint — run inside the decky base container (CAP_NET_ADMIN). + + python -m decnet.cloak + +Config via env (set by the composer when nmap_os has a mangle profile): + DECNET_NMAP_OS nmap_os slug (e.g. "windows", "windows_server") + DECNET_OPEN_PORTS comma-separated TCP ports the decky serves (for T2/T3) + DECKY_IP this decky's IP (BPF scope for the responder) + +Starts the mangler and responder, each in its own thread. A slug with no mangle +profile exits 0 immediately — harmless to launch unconditionally. +""" +from __future__ import annotations + +import os +import threading + +from decnet.cloak import mangler, responder +from decnet.logging import get_logger +from decnet.os_fingerprint import get_os_mangle + +log = get_logger("cloak") + + +def _open_ports() -> frozenset[int]: + raw = os.environ.get("DECNET_OPEN_PORTS", "") + return frozenset(int(p) for p in raw.split(",") if p.strip().isdigit()) + + +def main() -> int: + nmap_os = os.environ.get("DECNET_NMAP_OS", "linux") + if get_os_mangle(nmap_os) is None: + log.info("cloak: no mangle profile for %r — exiting", nmap_os) + return 0 + + threads = [ + threading.Thread(target=mangler.run, args=(nmap_os,), + name="cloak-mangler", daemon=True), + threading.Thread(target=responder.run, args=(nmap_os, _open_ports()), + name="cloak-responder", daemon=True), + ] + for t in threads: + t.start() + log.info("cloak: started for nmap_os=%r", nmap_os) + for t in threads: + t.join() + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/decnet/cloak/mangler.py b/decnet/cloak/mangler.py new file mode 100644 index 00000000..50bb2b62 --- /dev/null +++ b/decnet/cloak/mangler.py @@ -0,0 +1,131 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +""" +Egress SYN-ACK mangler — rewrites the TCP/IP option shape sysctl can't reach. + +Split so the packet-shaping logic is pure and unit-testable without scapy, root, +or a live NFQUEUE: + + - build_synack_options() / next_ipid() : pure, tested offline. + - _rewrite() : mutates a scapy packet (lazy import). + - run() : the NFQUEUE loop (needs CAP_NET_ADMIN). + +scapy/netfilterqueue are imported lazily inside the runtime functions, mirroring +decnet/prober/tcpfp.py, so importing this module is cheap and side-effect-free. +""" +from __future__ import annotations + +import os +import signal +import subprocess # nosec B404 — fixed-arg iptables, no shell +import sys +from typing import Any + +from decnet.logging import get_logger +from decnet.os_fingerprint import MangleProfile, get_os_mangle + +log = get_logger("cloak.mangler") + +_QUEUE = 0 +# Queue every egress packet carrying SYN (covers SYN-ACK incl. ECN/CWR variants); +# --queue-bypass means a dead handler never blackholes the decky. +_RULE = [ + "OUTPUT", "-p", "tcp", "--tcp-flags", "SYN", "SYN", + "-j", "NFQUEUE", "--queue-num", str(_QUEUE), "--queue-bypass", +] + + +def next_ipid(prev: int, mode: str) -> int: + """Next IP-ID for *mode*: 'incr' (TI=I), 'random' (TI=RD), 'keep' (unchanged). + + 'keep' returns -1 as a sentinel meaning "do not touch the kernel's value". + """ + if mode == "incr": + return (prev + 1) & 0xFFFF + if mode == "random": + # Not for security — only to read as randomized to nmap (TI=RD). + return int.from_bytes(os.urandom(2), "big") or 1 + return -1 + + +def build_synack_options( + orig_options: list[tuple[str, Any]], profile: MangleProfile +) -> list[tuple[str, Any]]: + """Build the SYN-ACK TCP option list for *profile*, preserving the kernel's + live Timestamp value (so nmap's SEQ.TS increment-rate test still passes). + + *orig_options* is a scapy-style ``[(name, value), ...]`` list. + """ + ts = next((v for n, v in orig_options if n == "Timestamp"), None) + out: list[tuple[str, Any]] = [] + for code in profile.option_order: + if code == "MSS": + out.append(("MSS", profile.mss)) + elif code == "WScale": + out.append(("WScale", profile.wscale)) + elif code == "SAckOK": + out.append(("SAckOK", b"")) + elif code == "NOP": + out.append(("NOP", None)) + elif code == "TS": + if ts is not None: # only if sysctl kept timestamps on + out.append(("Timestamp", ts)) + return out + + +def _is_synack(flags: int) -> bool: + return bool(flags & 0x02) and bool(flags & 0x10) # SYN & ACK + + +def _iptables(action: str) -> None: + subprocess.run(["iptables", action, *_RULE], check=True) # nosec B603 B607 + + +def run(nmap_os: str) -> int: + """Install the NFQUEUE rule and rewrite egress SYN-ACK for *nmap_os*.""" + profile = get_os_mangle(nmap_os) + if profile is None: + log.info("cloak.mangler: no profile for %r — nothing to do", nmap_os) + return 0 + + from netfilterqueue import NetfilterQueue # type: ignore + from scapy.all import IP, TCP # type: ignore + + ipid = [0x0400] + + def _rewrite(pkt: Any) -> None: + try: + p = IP(pkt.get_payload()) + if p.haslayer(TCP) and _is_synack(int(p[TCP].flags)): + p[TCP].window = profile.window + p[TCP].options = build_synack_options(p[TCP].options, profile) + nid = next_ipid(ipid[0], profile.ipid) + if nid >= 0: + ipid[0] = nid + p[IP].id = nid + # options length changed → dataofs MUST be recomputed, else the + # kernel emits a malformed segment that breaks real connections. + del p[IP].chksum, p[TCP].chksum, p[IP].len, p[TCP].dataofs + pkt.set_payload(bytes(p)) + except Exception: # nosec B110 — never drop a packet on a rewrite bug + log.exception("cloak.mangler: rewrite failed; passing packet through") + pkt.accept() + + _iptables("-A") + nfq = NetfilterQueue() + nfq.bind(_QUEUE, _rewrite) + + def _cleanup(*_: Any) -> None: + try: + _iptables("-D") + finally: + sys.exit(0) + + signal.signal(signal.SIGTERM, _cleanup) + signal.signal(signal.SIGINT, _cleanup) + log.info("cloak.mangler: rewriting SYN-ACK -> %s (window=%#x ipid=%s)", + nmap_os, profile.window, profile.ipid) + try: + nfq.run() + finally: + _iptables("-D") + return 0 diff --git a/decnet/cloak/responder.py b/decnet/cloak/responder.py new file mode 100644 index 00000000..60b98645 --- /dev/null +++ b/decnet/cloak/responder.py @@ -0,0 +1,86 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +""" +Probe-response synthesizer — answers the nmap probes the Linux kernel drops. + +nmap's T2 (null-flags) and T3 (SYN+FIN+PSH+URG) to an OPEN port get no reply +from Linux (R=N), but Windows replies RST+ACK. We sniff the probe and inject the +target-OS-shaped reply ourselves; the kernel stays silent, so nothing races us. + +Pure classification/reply logic is separated from the scapy sniff/send loop so it +is unit-testable without root or a live capture. +""" +from __future__ import annotations + +import enum +import os +from typing import Any + +from decnet.logging import get_logger +from decnet.os_fingerprint import get_os_mangle + +log = get_logger("cloak.responder") + +_NULL = 0x00 +_T3 = 0x2B # SYN+FIN+PSH+URG + + +class ProbeKind(enum.Enum): + T2 = "T2" + T3 = "T3" + + +def classify_probe(flags: int, dport: int, open_ports: frozenset[int]) -> ProbeKind | None: + """Identify an nmap T2/T3 probe by flag combo + open destination port. + + Returns None for anything else (legit traffic, probes to closed ports, and + T1/T4-T7 which the real stack already answers). + """ + if dport not in open_ports: + return None + if flags == _NULL: + return ProbeKind.T2 + if flags == _T3: + return ProbeKind.T3 + return None + + +def build_reply_fields(probe_seq: int) -> dict[str, Any]: + """Windows T2/T3 reply fields: seq 0, ack=probe seq, RST+ACK, window 0. + + (nmap T2/T3 for Windows: S=Z, A=S, F=AR, W=0, DF=1.) + """ + return {"seq": 0, "ack": probe_seq, "flags": "RA", "window": 0, "df": True} + + +def run(nmap_os: str, open_ports: frozenset[int], decky_ip: str | None = None) -> int: + """Sniff for T2/T3 probes to *open_ports* and inject Windows-shaped replies.""" + profile = get_os_mangle(nmap_os) + if profile is None or not profile.respond_t2t3: + log.info("cloak.responder: nothing to do for %r", nmap_os) + return 0 + + from scapy.all import IP, TCP, send, sniff # type: ignore + + ip = decky_ip or os.environ.get("DECKY_IP", "") + ipid = [0x0800] + + def _on(pkt: Any) -> None: + if not pkt.haslayer(TCP): + return + kind = classify_probe(int(pkt[TCP].flags), int(pkt[TCP].dport), open_ports) + if kind is None: + return + f = build_reply_fields(int(pkt[TCP].seq)) + ipid[0] = (ipid[0] + 1) & 0xFFFF + reply = ( + IP(src=pkt[IP].dst, dst=pkt[IP].src, id=ipid[0], flags="DF", ttl=128) + / TCP(sport=int(pkt[TCP].dport), dport=int(pkt[TCP].sport), + seq=f["seq"], ack=f["ack"], flags=f["flags"], window=f["window"]) + ) + send(reply, verbose=0) + + bpf = f"tcp and dst host {ip}" if ip else "tcp" + log.info("cloak.responder: answering T2/T3 on %d ports (filter=%r)", + len(open_ports), bpf) + sniff(filter=bpf, prn=_on, store=0) + return 0 diff --git a/tests/cloak/__init__.py b/tests/cloak/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cloak/test_cloak.py b/tests/cloak/test_cloak.py new file mode 100644 index 00000000..674de3a4 --- /dev/null +++ b/tests/cloak/test_cloak.py @@ -0,0 +1,130 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +""" +Tests for the cloak mangler/responder PURE logic — option layout, IP-ID policy, +probe classification, reply fields. No scapy, root, or live NFQUEUE involved +(the runtime loops are exercised only on real deckies, not in CI). +""" +from __future__ import annotations + +import pytest + +from decnet.cloak import ( + ProbeKind, + build_reply_fields, + build_synack_options, + classify_probe, + next_ipid, +) +from decnet.cloak.mangler import _is_synack +from decnet.os_fingerprint import OS_MANGLE, MangleProfile, get_os_mangle + +WIN = OS_MANGLE["windows"] +SRV = OS_MANGLE["windows_server"] + + +# ── profile wiring ────────────────────────────────────────────────────────── + +def test_get_os_mangle_known(): + assert isinstance(get_os_mangle("windows"), MangleProfile) + assert get_os_mangle("windows_server").ipid == "random" + + +def test_get_os_mangle_none_for_linux(): + assert get_os_mangle("linux") is None + assert get_os_mangle("nonexistent") is None + + +def test_windows_workstation_ipid_is_incr(): + # Win10 workstation = incremental IP-ID (nmap TI=I); server = randomized (RD). + assert WIN.ipid == "incr" + assert SRV.ipid == "random" + + +# ── SYN-ACK option building ───────────────────────────────────────────────── + +def test_options_layout_with_timestamp_preserved(): + orig = [("MSS", 1460), ("SAckOK", b""), ("Timestamp", (111, 222)), + ("NOP", None), ("WScale", 7)] + out = build_synack_options(orig, WIN) + names = [n for n, _ in out] + assert names == ["MSS", "NOP", "WScale", "SAckOK", "Timestamp"] + # the kernel's live timestamp value must survive (SEQ.TS rate test) + assert ("Timestamp", (111, 222)) in out + # our chosen mss/wscale override whatever the kernel emitted + assert ("MSS", WIN.mss) in out + assert ("WScale", WIN.wscale) in out + + +def test_options_drop_timestamp_when_kernel_had_none(): + """If timestamps are off (no kernel TS option), emit none — never a fake one.""" + orig = [("MSS", 1460), ("SAckOK", b""), ("NOP", None), ("WScale", 7)] + out = build_synack_options(orig, WIN) + assert all(n != "Timestamp" for n, _ in out) + + +def test_options_length_is_4byte_aligned(): + """Sanity: the windows option layout encodes to a multiple of 4 bytes.""" + from scapy.all import TCP # type: ignore # noqa + pytest.importorskip("scapy") + orig = [("MSS", 1460), ("Timestamp", (1, 2))] + out = build_synack_options(orig, WIN) + raw = bytes(TCP(options=out))[20:] # options after the 20-byte base header + assert len(raw) % 4 == 0 + + +# ── IP-ID policy ──────────────────────────────────────────────────────────── + +def test_next_ipid_incr_wraps(): + assert next_ipid(5, "incr") == 6 + assert next_ipid(0xFFFF, "incr") == 0 + + +def test_next_ipid_random_in_range_nonzero(): + for _ in range(50): + v = next_ipid(0, "random") + assert 1 <= v <= 0xFFFF + + +def test_next_ipid_keep_sentinel(): + assert next_ipid(123, "keep") == -1 + + +# ── SYN-ACK detection ─────────────────────────────────────────────────────── + +@pytest.mark.parametrize("flags,expected", [ + (0x12, True), # SYN+ACK + (0x52, True), # SYN+ACK+ECE (ECN SYN-ACK) + (0x02, False), # bare SYN + (0x10, False), # bare ACK +]) +def test_is_synack(flags, expected): + assert _is_synack(flags) is expected + + +# ── probe classification ──────────────────────────────────────────────────── + +OPEN = frozenset({22, 80, 443}) + + +def test_classify_t2_null_flags_open_port(): + assert classify_probe(0x00, 80, OPEN) is ProbeKind.T2 + + +def test_classify_t3_synfinpshurg_open_port(): + assert classify_probe(0x2B, 80, OPEN) is ProbeKind.T3 + + +def test_classify_ignores_closed_port(): + assert classify_probe(0x00, 9999, OPEN) is None + + +def test_classify_ignores_normal_traffic(): + assert classify_probe(0x02, 80, OPEN) is None # SYN — real stack handles + assert classify_probe(0x10, 80, OPEN) is None # ACK + + +# ── reply field shaping ───────────────────────────────────────────────────── + +def test_reply_fields_windows_shape(): + f = build_reply_fields(probe_seq=0xDEAD) + assert f == {"seq": 0, "ack": 0xDEAD, "flags": "RA", "window": 0, "df": True}