Mangler now also rewrites egress RST (IP-ID + nonzero ack on bare RSTs → nmap CI, T4/T6 A=O) and ICMP echo-reply (code=0 → IE.CD=Z), sharing one IP-ID counter across SYN-ACK/RST/ICMP (reads as a shared sequence, SS=S). Responder injects at L2 (reflecting probe MACs) so its own RST replies bypass the OUTPUT/NFQUEUE chain — otherwise the new RST rule re-processed and dropped them. T3 reply ack now A=O. Live: windows_server decky reads Microsoft Windows Server 2012 (94%, up from 89%); T2/T3 R=Y, IE.CD=Z, T4/T6 A=O all confirmed coexisting.
93 lines
3.3 KiB
Python
93 lines
3.3 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""
|
|
Probe-response synthesizer — answers the nmap probes the Linux kernel drops.
|
|
|
|
nmap's T2 (null-flags) and T3 (SYN+FIN+PSH+URG) to an OPEN port get no reply
|
|
from Linux (R=N), but Windows replies RST+ACK. We sniff the probe and inject the
|
|
target-OS-shaped reply ourselves; the kernel stays silent, so nothing races us.
|
|
|
|
Pure classification/reply logic is separated from the scapy sniff/send loop so it
|
|
is unit-testable without root or a live capture.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import enum
|
|
import os
|
|
from typing import Any
|
|
|
|
from decnet.logging import get_logger
|
|
from decnet.os_fingerprint import get_os_mangle
|
|
|
|
log = get_logger("cloak.responder")
|
|
|
|
_NULL = 0x00
|
|
_T3 = 0x2B # SYN+FIN+PSH+URG
|
|
|
|
|
|
class ProbeKind(enum.Enum):
    """nmap OS-detection probes this module synthesizes replies for."""

    # Null-flags probe (TCP flags == 0x00).
    T2 = "T2"
    # SYN+FIN+PSH+URG probe (TCP flags == 0x2B).
    T3 = "T3"
|
|
|
|
|
|
def classify_probe(flags: int, dport: int, open_ports: frozenset[int]) -> ProbeKind | None:
    """Map a TCP flag combination aimed at an open port to an nmap probe kind.

    Args:
        flags: TCP flags of the incoming segment as an integer bitmask.
        dport: Destination port of the segment.
        open_ports: Ports considered open; segments to any other port are ignored.

    Returns:
        ``ProbeKind.T2`` for a null-flags segment, ``ProbeKind.T3`` for
        SYN+FIN+PSH+URG, and ``None`` for everything else (ordinary traffic,
        probes to closed ports, and any other flag combination).
    """
    # Only probes aimed at an open port are of interest.
    if dport not in open_ports:
        return None

    # Exact-match lookup: any flag combination not listed yields None.
    kind_by_flags = {_NULL: ProbeKind.T2, _T3: ProbeKind.T3}
    return kind_by_flags.get(flags)
|
|
|
|
|
|
def build_reply_fields(probe_seq: int, kind: ProbeKind) -> dict[str, Any]:
    """Windows T2/T3 reply fields: seq 0, RST+ACK, window 0, DF=1.

    The ack differs by probe, following nmap's A test, which buckets the reply
    ack as Z (zero), S (== probe seq), S+ (== probe seq + 1) or O (anything
    else):

    * T2 -> A=S: ack equals the probe's sequence number.
    * T3 -> A=O: ack must avoid zero, the probe seq, and probe seq + 1.

    Args:
        probe_seq: Sequence number carried by the probe segment.
        kind: Which probe is being answered.

    Returns:
        A dict with keys ``seq``, ``ack``, ``flags``, ``window`` and ``df``.
    """
    if kind is ProbeKind.T2:
        ack = probe_seq
    else:
        # probe seq + 1 would be classified as S+, so step by two instead.
        ack = (probe_seq + 2) & 0xFFFFFFFF
        if ack == 0:
            # Wrapped onto zero (probe seq 0xFFFFFFFE), which reads as Z;
            # one more step gives 1, still clear of seq and seq + 1.
            ack = (probe_seq + 3) & 0xFFFFFFFF
    return {"seq": 0, "ack": ack, "flags": "RA", "window": 0, "df": True}
|
|
|
|
|
|
def run(nmap_os: str, open_ports: frozenset[int], decky_ip: str | None = None) -> int:
    """Sniff for T2/T3 probes to *open_ports* and inject Windows-shaped replies.

    Args:
        nmap_os: OS profile name, looked up with ``get_os_mangle``.
        open_ports: Destination ports on which probes are answered.
        decky_ip: Address used to narrow the capture filter. Falls back to the
            ``DECKY_IP`` environment variable; when neither is set the capture
            filter is plain ``tcp``.

    Returns:
        0, both when the profile asks for no T2/T3 responder and when the
        sniff loop ends.
    """
    profile = get_os_mangle(nmap_os)
    if profile is None or not profile.respond_t2t3:
        log.info("cloak.responder: nothing to do for %r", nmap_os)
        return 0

    # scapy is imported here rather than at module level, so the pure helpers
    # in this module can be imported without it.
    from scapy.all import IP, TCP, Ether, sendp, sniff  # type: ignore

    ip = decky_ip or os.environ.get("DECKY_IP", "")
    ipid = 0x0800

    def _on(pkt: Any) -> None:
        nonlocal ipid
        # The reply is built from the Ethernet, IPv4 and TCP headers of the
        # probe. The bare "tcp" fallback filter also matches TCP over IPv6,
        # which has no IP layer; skip it instead of raising inside the sniff
        # callback.
        if not (pkt.haslayer(TCP) and pkt.haslayer(Ether) and pkt.haslayer(IP)):
            return
        kind = classify_probe(int(pkt[TCP].flags), int(pkt[TCP].dport), open_ports)
        if kind is None:
            return
        f = build_reply_fields(int(pkt[TCP].seq), kind)
        ipid = (ipid + 1) & 0xFFFF
        # Inject at L2 (reflecting the probe's MACs) so the reply BYPASSES the
        # OUTPUT chain — otherwise the mangler's RST rule would re-process and
        # drop our own RST. The reply is already in final Windows shape.
        reply = (
            Ether(src=pkt[Ether].dst, dst=pkt[Ether].src)
            / IP(src=pkt[IP].dst, dst=pkt[IP].src, id=ipid,
                 flags="DF" if f["df"] else 0, ttl=128)
            / TCP(sport=int(pkt[TCP].dport), dport=int(pkt[TCP].sport),
                  seq=f["seq"], ack=f["ack"], flags=f["flags"], window=f["window"])
        )
        sendp(reply, iface=pkt.sniffed_on, verbose=0)

    bpf = f"tcp and dst host {ip}" if ip else "tcp"
    log.info("cloak.responder: answering T2/T3 on %d ports (filter=%r)",
             len(open_ports), bpf)
    sniff(filter=bpf, prn=_on, store=0)
    return 0
|