feat(cloak): broaden mangler to RST/ICMP + L2 responder injection

Mangler now also rewrites egress RST (IP-ID + nonzero ack on bare RSTs → nmap
CI, T4/T6 A=O) and ICMP echo-reply (code=0 → IE.CD=Z), sharing one IP-ID counter
across SYN-ACK/RST/ICMP (reads as a shared sequence, SS=S). Responder injects at
L2 (reflecting probe MACs) so its own RST replies bypass the OUTPUT/NFQUEUE chain
— otherwise the new RST rule re-processed and dropped them. T3 reply ack now A=O.

Live: windows_server decky reads Microsoft Windows Server 2012 (94%, up from 89%);
T2/T3 R=Y, IE.CD=Z, T4/T6 A=O all confirmed coexisting.
This commit is contained in:
2026-06-20 00:35:51 -04:00
parent 65d33bc611
commit 4798a9eb9c
3 changed files with 100 additions and 33 deletions

View File

@@ -44,12 +44,14 @@ def classify_probe(flags: int, dport: int, open_ports: frozenset[int]) -> ProbeK
return None
def build_reply_fields(probe_seq: int) -> dict[str, Any]:
"""Windows T2/T3 reply fields: seq 0, ack=probe seq, RST+ACK, window 0.
def build_reply_fields(probe_seq: int, kind: ProbeKind) -> dict[str, Any]:
"""Windows T2/T3 reply fields: seq 0, RST+ACK, window 0, DF=1.
(nmap T2/T3 for Windows: S=Z, A=S, F=AR, W=0, DF=1.)
ack differs by probe (nmap): T2 A=S (ack == probe seq); T3 A=O (other — we
use probe seq + 1 so it reads as 'other', never zero or the probe seq).
"""
return {"seq": 0, "ack": probe_seq, "flags": "RA", "window": 0, "df": True}
ack = probe_seq if kind is ProbeKind.T2 else (probe_seq + 1) & 0xFFFFFFFF
return {"seq": 0, "ack": ack, "flags": "RA", "window": 0, "df": True}
def run(nmap_os: str, open_ports: frozenset[int], decky_ip: str | None = None) -> int:
@@ -59,25 +61,29 @@ def run(nmap_os: str, open_ports: frozenset[int], decky_ip: str | None = None) -
log.info("cloak.responder: nothing to do for %r", nmap_os)
return 0
from scapy.all import IP, TCP, send, sniff # type: ignore
from scapy.all import IP, TCP, Ether, sendp, sniff # type: ignore
ip = decky_ip or os.environ.get("DECKY_IP", "")
ipid = [0x0800]
def _on(pkt: Any) -> None:
if not pkt.haslayer(TCP):
if not pkt.haslayer(TCP) or not pkt.haslayer(Ether):
return
kind = classify_probe(int(pkt[TCP].flags), int(pkt[TCP].dport), open_ports)
if kind is None:
return
f = build_reply_fields(int(pkt[TCP].seq))
f = build_reply_fields(int(pkt[TCP].seq), kind)
ipid[0] = (ipid[0] + 1) & 0xFFFF
# Inject at L2 (reflecting the probe's MACs) so the reply BYPASSES the
# OUTPUT chain — otherwise the mangler's RST rule would re-process and
# drop our own RST. The reply is already in final Windows shape.
reply = (
IP(src=pkt[IP].dst, dst=pkt[IP].src, id=ipid[0], flags="DF", ttl=128)
Ether(src=pkt[Ether].dst, dst=pkt[Ether].src)
/ IP(src=pkt[IP].dst, dst=pkt[IP].src, id=ipid[0], flags="DF", ttl=128)
/ TCP(sport=int(pkt[TCP].dport), dport=int(pkt[TCP].sport),
seq=f["seq"], ack=f["ack"], flags=f["flags"], window=f["window"])
)
send(reply, verbose=0)
sendp(reply, iface=pkt.sniffed_on, verbose=0)
bpf = f"tcp and dst host {ip}" if ip else "tcp"
log.info("cloak.responder: answering T2/T3 on %d ports (filter=%r)",