feat(cloak): egress SYN-ACK mangler + T2/T3 probe-response synthesizer

In-decky-netns NFQUEUE rewriter (window/option-order/IP-ID) and raw-socket
synthesizer for nmap probes Linux drops but the target OS answers (T2/T3),
driven by os_fingerprint.OS_MANGLE. Packet-shaping logic is pure and unit-tested
offline; scapy/netfilterqueue import lazily in the runtime loops. Entry:
python -m decnet.cloak (run by the base container; CAP_NET_ADMIN).
This commit is contained in:
2026-06-19 21:32:50 -04:00
parent 082d3fec19
commit f715ac6bcd
6 changed files with 430 additions and 0 deletions

130
tests/cloak/test_cloak.py Normal file
View File

@@ -0,0 +1,130 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Tests for the cloak mangler/responder PURE logic — option layout, IP-ID policy,
probe classification, reply fields. No scapy, root, or live NFQUEUE involved
(the runtime loops are exercised only on real deckies, not in CI).
"""
from __future__ import annotations
import pytest
from decnet.cloak import (
ProbeKind,
build_reply_fields,
build_synack_options,
classify_probe,
next_ipid,
)
from decnet.cloak.mangler import _is_synack
from decnet.os_fingerprint import OS_MANGLE, MangleProfile, get_os_mangle
WIN = OS_MANGLE["windows"]
SRV = OS_MANGLE["windows_server"]
# ── profile wiring ──────────────────────────────────────────────────────────
def test_get_os_mangle_known():
assert isinstance(get_os_mangle("windows"), MangleProfile)
assert get_os_mangle("windows_server").ipid == "random"
def test_get_os_mangle_none_for_linux():
assert get_os_mangle("linux") is None
assert get_os_mangle("nonexistent") is None
def test_windows_workstation_ipid_is_incr():
# Win10 workstation = incremental IP-ID (nmap TI=I); server = randomized (RD).
assert WIN.ipid == "incr"
assert SRV.ipid == "random"
# ── SYN-ACK option building ─────────────────────────────────────────────────
def test_options_layout_with_timestamp_preserved():
orig = [("MSS", 1460), ("SAckOK", b""), ("Timestamp", (111, 222)),
("NOP", None), ("WScale", 7)]
out = build_synack_options(orig, WIN)
names = [n for n, _ in out]
assert names == ["MSS", "NOP", "WScale", "SAckOK", "Timestamp"]
# the kernel's live timestamp value must survive (SEQ.TS rate test)
assert ("Timestamp", (111, 222)) in out
# our chosen mss/wscale override whatever the kernel emitted
assert ("MSS", WIN.mss) in out
assert ("WScale", WIN.wscale) in out
def test_options_drop_timestamp_when_kernel_had_none():
"""If timestamps are off (no kernel TS option), emit none — never a fake one."""
orig = [("MSS", 1460), ("SAckOK", b""), ("NOP", None), ("WScale", 7)]
out = build_synack_options(orig, WIN)
assert all(n != "Timestamp" for n, _ in out)
def test_options_length_is_4byte_aligned():
"""Sanity: the windows option layout encodes to a multiple of 4 bytes."""
from scapy.all import TCP # type: ignore # noqa
pytest.importorskip("scapy")
orig = [("MSS", 1460), ("Timestamp", (1, 2))]
out = build_synack_options(orig, WIN)
raw = bytes(TCP(options=out))[20:] # options after the 20-byte base header
assert len(raw) % 4 == 0
# ── IP-ID policy ────────────────────────────────────────────────────────────
def test_next_ipid_incr_wraps():
assert next_ipid(5, "incr") == 6
assert next_ipid(0xFFFF, "incr") == 0
def test_next_ipid_random_in_range_nonzero():
for _ in range(50):
v = next_ipid(0, "random")
assert 1 <= v <= 0xFFFF
def test_next_ipid_keep_sentinel():
assert next_ipid(123, "keep") == -1
# ── SYN-ACK detection ───────────────────────────────────────────────────────
@pytest.mark.parametrize("flags,expected", [
(0x12, True), # SYN+ACK
(0x52, True), # SYN+ACK+ECE (ECN SYN-ACK)
(0x02, False), # bare SYN
(0x10, False), # bare ACK
])
def test_is_synack(flags, expected):
assert _is_synack(flags) is expected
# ── probe classification ────────────────────────────────────────────────────
OPEN = frozenset({22, 80, 443})
def test_classify_t2_null_flags_open_port():
assert classify_probe(0x00, 80, OPEN) is ProbeKind.T2
def test_classify_t3_synfinpshurg_open_port():
assert classify_probe(0x2B, 80, OPEN) is ProbeKind.T3
def test_classify_ignores_closed_port():
assert classify_probe(0x00, 9999, OPEN) is None
def test_classify_ignores_normal_traffic():
assert classify_probe(0x02, 80, OPEN) is None # SYN — real stack handles
assert classify_probe(0x10, 80, OPEN) is None # ACK
# ── reply field shaping ─────────────────────────────────────────────────────
def test_reply_fields_windows_shape():
f = build_reply_fields(probe_seq=0xDEAD)
assert f == {"seq": 0, "ack": 0xDEAD, "flags": "RA", "window": 0, "df": True}