diff --git a/decnet/cloak/mangler.py b/decnet/cloak/mangler.py index 3d6b7000..bce731c8 100644 --- a/decnet/cloak/mangler.py +++ b/decnet/cloak/mangler.py @@ -1,11 +1,17 @@ # SPDX-License-Identifier: AGPL-3.0-or-later """ -Egress SYN-ACK mangler — rewrites the TCP/IP option shape sysctl can't reach. +Egress mangler — rewrites the TCP/IP shape & behaviours sysctl can't reach. + +Touches only the fingerprint-relevant egress packets: + - SYN-ACK : window, TCP option order, IP-ID (nmap OPS/WIN/TI) + - RST : IP-ID + a nonzero ack on bare RSTs (nmap CI, T4/T6 A=O) + - ICMP echo-reply : code=0 + IP-ID (nmap IE.CD, II) +A single shared IP-ID counter across all three reads as a shared sequence (SS=S). Split so the packet-shaping logic is pure and unit-testable without scapy, root, or a live NFQUEUE: - - build_synack_options() / next_ipid() : pure, tested offline. + - build_synack_options() / next_ipid() / _rst_needs_ack() : pure, tested offline. - _rewrite() : mutates a scapy packet (lazy import). - run() : the NFQUEUE loop (needs CAP_NET_ADMIN). @@ -27,14 +33,28 @@ from decnet.os_fingerprint import MangleProfile, get_os_mangle log = get_logger("cloak.mangler") _QUEUE = 0 -# Queue every egress packet carrying SYN (covers SYN-ACK incl. ECN/CWR variants); -# --queue-bypass means a dead handler never blackholes the decky. -_RULE = [ - "OUTPUT", "-p", "tcp", "--tcp-flags", "SYN", "SYN", - "-j", "NFQUEUE", "--queue-num", str(_QUEUE), "--queue-bypass", +# Only the fingerprint-relevant egress packets are queued (never bulk data): +# SYN-bearing → SYN-ACK (OPS/WIN/options/TI) +# RST-bearing → T4-T7 RST shape (CI IP-ID, T4/T6 ack) +# ICMP echo-reply → IE.CD code + II IP-ID +# --queue-bypass: a dead handler never blackholes the decky. +def _nfq_rule(match: list[str]) -> list[str]: + return ["OUTPUT", *match, "-j", "NFQUEUE", "--queue-num", str(_QUEUE), "--queue-bypass"] + + +_RULES = [ + _nfq_rule(["-p", "tcp", "--tcp-flags", "SYN", "SYN"]), + _nfq_rule(["-p", "tcp", "--tcp-flags", "RST", "RST"]), + _nfq_rule(["-p", "icmp", "--icmp-type", "echo-reply"]), ] +def _rst_needs_ack(flags: int) -> bool: + """A bare RST (RST set, ACK clear) — the T4/T6 case. Windows fills a nonzero + ack (nmap A=O); Linux leaves it 0 (A=Z). R+ACK RSTs (T5/T7) already match.""" + return bool(flags & 0x04) and not (flags & 0x10) + + def next_ipid(prev: int, mode: str) -> int: """Next IP-ID for *mode*: 'incr' (TI=I), 'random' (TI=RD), 'keep' (unchanged). @@ -78,34 +98,58 @@ def _is_synack(flags: int) -> bool: def _iptables(action: str) -> None: - subprocess.run(["iptables", action, *_RULE], check=True) # nosec B603 B607 + for rule in _RULES: + subprocess.run(["iptables", action, *rule], check=True) # nosec B603 B607 def run(nmap_os: str) -> int: - """Install the NFQUEUE rule and rewrite egress SYN-ACK for *nmap_os*.""" + """Install the NFQUEUE rules and rewrite egress SYN-ACK / RST / ICMP for *nmap_os*.""" profile = get_os_mangle(nmap_os) if profile is None: log.info("cloak.mangler: no profile for %r — nothing to do", nmap_os) return 0 from netfilterqueue import NetfilterQueue # type: ignore - from scapy.all import IP, TCP # type: ignore + from scapy.all import ICMP, IP, TCP # type: ignore + # ONE shared IP-ID counter across SYN-ACK / RST / ICMP — keeps TCP and ICMP + # IDs close, which is what nmap reads as a shared sequence (SS=S, Windows). ipid = [0x0400] + def _bump_ipid(p: Any) -> None: + nid = next_ipid(ipid[0], profile.ipid) + if nid >= 0: + ipid[0] = nid + p[IP].id = nid + def _rewrite(pkt: Any) -> None: try: p = IP(pkt.get_payload()) - if p.haslayer(TCP) and _is_synack(int(p[TCP].flags)): - p[TCP].window = profile.window - p[TCP].options = build_synack_options(p[TCP].options, profile) - nid = next_ipid(ipid[0], profile.ipid) - if nid >= 0: - ipid[0] = nid - p[IP].id = nid - # options length changed → dataofs MUST be recomputed, else the - # kernel emits a malformed segment that breaks real connections. - del p[IP].chksum, p[TCP].chksum, p[IP].len, p[TCP].dataofs + touched = False + tcp_synack = False + if p.haslayer(TCP): + f = int(p[TCP].flags) + if _is_synack(f): + p[TCP].window = profile.window + p[TCP].options = build_synack_options(p[TCP].options, profile) + _bump_ipid(p) + touched = tcp_synack = True + elif f & 0x04: # RST (T4-T7) + _bump_ipid(p) + if _rst_needs_ack(f): + p[TCP].ack = (int(p[TCP].seq) + 1) & 0xFFFFFFFF # A=O + touched = True + if touched: + del p[TCP].chksum + if tcp_synack: # options length changed → recompute offset + del p[TCP].dataofs + elif p.haslayer(ICMP) and int(p[ICMP].type) == 0: # echo-reply + p[ICMP].code = 0 # IE.CD=Z (Windows); Linux echoes the code + _bump_ipid(p) + del p[ICMP].chksum + touched = True + if touched: + del p[IP].chksum, p[IP].len pkt.set_payload(bytes(p)) except Exception: # nosec B110 — never drop a packet on a rewrite bug log.exception("cloak.mangler: rewrite failed; passing packet through") @@ -127,7 +171,7 @@ def run(nmap_os: str) -> int: if threading.current_thread() is threading.main_thread(): signal.signal(signal.SIGTERM, _cleanup) signal.signal(signal.SIGINT, _cleanup) - log.info("cloak.mangler: rewriting SYN-ACK -> %s (window=%#x ipid=%s)", + log.info("cloak.mangler: rewriting SYN-ACK/RST/ICMP -> %s (window=%#x ipid=%s)", nmap_os, profile.window, profile.ipid) try: nfq.run() diff --git a/decnet/cloak/responder.py b/decnet/cloak/responder.py index 60b98645..7e46b52f 100644 --- a/decnet/cloak/responder.py +++ b/decnet/cloak/responder.py @@ -44,12 +44,14 @@ def classify_probe(flags: int, dport: int, open_ports: frozenset[int]) -> ProbeK return None -def build_reply_fields(probe_seq: int) -> dict[str, Any]: - """Windows T2/T3 reply fields: seq 0, ack=probe seq, RST+ACK, window 0. +def build_reply_fields(probe_seq: int, kind: ProbeKind) -> dict[str, Any]: + """Windows T2/T3 reply fields: seq 0, RST+ACK, window 0, DF=1. - (nmap T2/T3 for Windows: S=Z, A=S, F=AR, W=0, DF=1.) + ack differs by probe (nmap): T2 A=S (ack == probe seq); T3 A=O (other — we + use probe seq + 1 so it reads as 'other', never zero or the probe seq). """ - return {"seq": 0, "ack": probe_seq, "flags": "RA", "window": 0, "df": True} + ack = probe_seq if kind is ProbeKind.T2 else (probe_seq + 1) & 0xFFFFFFFF + return {"seq": 0, "ack": ack, "flags": "RA", "window": 0, "df": True} def run(nmap_os: str, open_ports: frozenset[int], decky_ip: str | None = None) -> int: @@ -59,25 +61,29 @@ def run(nmap_os: str, open_ports: frozenset[int], decky_ip: str | None = None) - log.info("cloak.responder: nothing to do for %r", nmap_os) return 0 - from scapy.all import IP, TCP, send, sniff # type: ignore + from scapy.all import IP, TCP, Ether, sendp, sniff # type: ignore ip = decky_ip or os.environ.get("DECKY_IP", "") ipid = [0x0800] def _on(pkt: Any) -> None: - if not pkt.haslayer(TCP): + if not pkt.haslayer(TCP) or not pkt.haslayer(Ether): return kind = classify_probe(int(pkt[TCP].flags), int(pkt[TCP].dport), open_ports) if kind is None: return - f = build_reply_fields(int(pkt[TCP].seq)) + f = build_reply_fields(int(pkt[TCP].seq), kind) ipid[0] = (ipid[0] + 1) & 0xFFFF + # Inject at L2 (reflecting the probe's MACs) so the reply BYPASSES the + # OUTPUT chain — otherwise the mangler's RST rule would re-process and + # drop our own RST. The reply is already in final Windows shape. reply = ( - IP(src=pkt[IP].dst, dst=pkt[IP].src, id=ipid[0], flags="DF", ttl=128) + Ether(src=pkt[Ether].dst, dst=pkt[Ether].src) + / IP(src=pkt[IP].dst, dst=pkt[IP].src, id=ipid[0], flags="DF", ttl=128) / TCP(sport=int(pkt[TCP].dport), dport=int(pkt[TCP].sport), seq=f["seq"], ack=f["ack"], flags=f["flags"], window=f["window"]) ) - send(reply, verbose=0) + sendp(reply, iface=pkt.sniffed_on, verbose=0) bpf = f"tcp and dst host {ip}" if ip else "tcp" log.info("cloak.responder: answering T2/T3 on %d ports (filter=%r)", diff --git a/tests/cloak/test_cloak.py b/tests/cloak/test_cloak.py index 674de3a4..4a44a4a1 100644 --- a/tests/cloak/test_cloak.py +++ b/tests/cloak/test_cloak.py @@ -15,7 +15,7 @@ from decnet.cloak import ( classify_probe, next_ipid, ) -from decnet.cloak.mangler import _is_synack +from decnet.cloak.mangler import _is_synack, _rst_needs_ack from decnet.os_fingerprint import OS_MANGLE, MangleProfile, get_os_mangle WIN = OS_MANGLE["windows"] @@ -101,6 +101,15 @@ def test_is_synack(flags, expected): assert _is_synack(flags) is expected +@pytest.mark.parametrize("flags,expected", [ + (0x04, True), # bare RST (T4/T6 ACK-probe response) → fill ack (A=O) + (0x14, False), # RST+ACK (T5/T7) → already A=S+, leave + (0x12, False), # SYN+ACK +]) +def test_rst_needs_ack(flags, expected): + assert _rst_needs_ack(flags) is expected + + # ── probe classification ──────────────────────────────────────────────────── OPEN = frozenset({22, 80, 443}) @@ -125,6 +134,14 @@ def test_classify_ignores_normal_traffic(): # ── reply field shaping ───────────────────────────────────────────────────── -def test_reply_fields_windows_shape(): - f = build_reply_fields(probe_seq=0xDEAD) +def test_reply_fields_t2_ack_equals_probe_seq(): + # T2: A=S (ack == probe seq) + f = build_reply_fields(0xDEAD, ProbeKind.T2) assert f == {"seq": 0, "ack": 0xDEAD, "flags": "RA", "window": 0, "df": True} + + +def test_reply_fields_t3_ack_is_other(): + # T3: A=O (other — not zero, not the probe seq) + f = build_reply_fields(0xDEAD, ProbeKind.T3) + assert f["ack"] not in (0, 0xDEAD) + assert f["seq"] == 0 and f["flags"] == "RA"