Merge feat/os-fingerprint-cloak: cloak egress fingerprint masquerading (v1.2.1)

OS fingerprint cloak — sysctl profile fixes (Win timestamps), windows_server
slug, NFQUEUE SYN-ACK mangler + T2/T3 probe-response synthesizer, wired into the
deploy path. Flips real nmap -O to Microsoft Windows / Windows Server.
This commit is contained in:
2026-06-20 00:24:32 -04:00
15 changed files with 770 additions and 14 deletions

3
.gitignore vendored
View File

@@ -85,3 +85,6 @@ testfail
# Internal design/dev notes — not for publication
/development/
decnet.tar
# cloak base-image build context: decnet subtree synced in at deploy time
decnet/templates/_shared/cloak/decnet/

View File

@@ -47,7 +47,7 @@ ARCHETYPES: dict[str, Archetype] = {
description="Windows domain member: SMB, RDP, and LDAP directory",
services=["smb", "rdp", "ldap"],
preferred_distros=["debian", "ubuntu22"],
nmap_os="windows",
nmap_os="windows_server",
),
"domain-controller": Archetype(
slug="domain-controller",
@@ -55,7 +55,7 @@ ARCHETYPES: dict[str, Archetype] = {
description="Active Directory DC: LDAP, SMB, RDP, LLMNR",
services=["ldap", "smb", "rdp", "llmnr"],
preferred_distros=["debian", "ubuntu22"],
nmap_os="windows",
nmap_os="windows_server",
),
"linux-server": Archetype(
slug="linux-server",

30
decnet/cloak/__init__.py Normal file
View File

@@ -0,0 +1,30 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
DECNET cloak — egress TCP/IP fingerprint masquerading for deckies.
sysctls (decnet/os_fingerprint.py) own GLOBAL packet fields. The cloak owns the
SYN-ACK *shape* and stack *behaviours* sysctl can't reach, so a decky reads as
its claimed nmap_os under active fingerprinting (nmap -O):
- mangler : NFQUEUE rewrite of egress SYN-ACK (window, TCP option order,
IP-ID generation) to match the MangleProfile for the slug.
- responder : raw-socket synthesis of replies to probes the Linux kernel
drops but the target OS answers (nmap T2/T3).
Both run INSIDE the decky's network namespace (CAP_NET_ADMIN), launched by the
base container — never a sidecar (that would double container count per decky).
Driven by os_fingerprint.get_os_mangle(nmap_os); a slug with no profile is a
no-op (the real Linux stack already approximates it).
"""
from __future__ import annotations
from decnet.cloak.mangler import build_synack_options, next_ipid
from decnet.cloak.responder import ProbeKind, build_reply_fields, classify_probe
__all__ = [
"build_synack_options",
"next_ipid",
"classify_probe",
"build_reply_fields",
"ProbeKind",
]

51
decnet/cloak/__main__.py Normal file
View File

@@ -0,0 +1,51 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Cloak entrypoint — run inside the decky base container (CAP_NET_ADMIN).
python -m decnet.cloak
Config via env (set by the composer when nmap_os has a mangle profile):
DECNET_NMAP_OS nmap_os slug (e.g. "windows", "windows_server")
DECNET_OPEN_PORTS comma-separated TCP ports the decky serves (for T2/T3)
DECKY_IP this decky's IP (BPF scope for the responder)
Starts the mangler and responder, each in its own thread. A slug with no mangle
profile exits 0 immediately — harmless to launch unconditionally.
"""
from __future__ import annotations
import os
import threading
from decnet.cloak import mangler, responder
from decnet.logging import get_logger
from decnet.os_fingerprint import get_os_mangle
log = get_logger("cloak")
def _open_ports() -> frozenset[int]:
raw = os.environ.get("DECNET_OPEN_PORTS", "")
return frozenset(int(p) for p in raw.split(",") if p.strip().isdigit())
def main() -> int:
nmap_os = os.environ.get("DECNET_NMAP_OS", "linux")
if get_os_mangle(nmap_os) is None:
log.info("cloak: no mangle profile for %r — exiting", nmap_os)
return 0
# Responder runs in a daemon thread; the mangler runs in the MAIN thread so
# its SIGTERM/SIGINT iptables-teardown handlers can be installed (signal only
# works in the main thread).
threading.Thread(
target=responder.run, args=(nmap_os, _open_ports()),
name="cloak-responder", daemon=True,
).start()
log.info("cloak: started for nmap_os=%r", nmap_os)
mangler.run(nmap_os)
return 0
if __name__ == "__main__":
raise SystemExit(main())

136
decnet/cloak/mangler.py Normal file
View File

@@ -0,0 +1,136 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Egress SYN-ACK mangler — rewrites the TCP/IP option shape sysctl can't reach.
Split so the packet-shaping logic is pure and unit-testable without scapy, root,
or a live NFQUEUE:
- build_synack_options() / next_ipid() : pure, tested offline.
- _rewrite() : mutates a scapy packet (lazy import).
- run() : the NFQUEUE loop (needs CAP_NET_ADMIN).
scapy/netfilterqueue are imported lazily inside the runtime functions, mirroring
decnet/prober/tcpfp.py, so importing this module is cheap and side-effect-free.
"""
from __future__ import annotations
import os
import signal
import subprocess # nosec B404 — fixed-arg iptables, no shell
import sys
import threading
from typing import Any
from decnet.logging import get_logger
from decnet.os_fingerprint import MangleProfile, get_os_mangle
log = get_logger("cloak.mangler")
_QUEUE = 0
# Queue every egress packet carrying SYN (covers SYN-ACK incl. ECN/CWR variants);
# --queue-bypass means a dead handler never blackholes the decky.
_RULE = [
"OUTPUT", "-p", "tcp", "--tcp-flags", "SYN", "SYN",
"-j", "NFQUEUE", "--queue-num", str(_QUEUE), "--queue-bypass",
]
def next_ipid(prev: int, mode: str) -> int:
"""Next IP-ID for *mode*: 'incr' (TI=I), 'random' (TI=RD), 'keep' (unchanged).
'keep' returns -1 as a sentinel meaning "do not touch the kernel's value".
"""
if mode == "incr":
return (prev + 1) & 0xFFFF
if mode == "random":
# Not for security — only to read as randomized to nmap (TI=RD).
return int.from_bytes(os.urandom(2), "big") or 1
return -1
def build_synack_options(
orig_options: list[tuple[str, Any]], profile: MangleProfile
) -> list[tuple[str, Any]]:
"""Build the SYN-ACK TCP option list for *profile*, preserving the kernel's
live Timestamp value (so nmap's SEQ.TS increment-rate test still passes).
*orig_options* is a scapy-style ``[(name, value), ...]`` list.
"""
ts = next((v for n, v in orig_options if n == "Timestamp"), None)
out: list[tuple[str, Any]] = []
for code in profile.option_order:
if code == "MSS":
out.append(("MSS", profile.mss))
elif code == "WScale":
out.append(("WScale", profile.wscale))
elif code == "SAckOK":
out.append(("SAckOK", b""))
elif code == "NOP":
out.append(("NOP", None))
elif code == "TS":
if ts is not None: # only if sysctl kept timestamps on
out.append(("Timestamp", ts))
return out
def _is_synack(flags: int) -> bool:
return bool(flags & 0x02) and bool(flags & 0x10) # SYN & ACK
def _iptables(action: str) -> None:
subprocess.run(["iptables", action, *_RULE], check=True) # nosec B603 B607
def run(nmap_os: str) -> int:
"""Install the NFQUEUE rule and rewrite egress SYN-ACK for *nmap_os*."""
profile = get_os_mangle(nmap_os)
if profile is None:
log.info("cloak.mangler: no profile for %r — nothing to do", nmap_os)
return 0
from netfilterqueue import NetfilterQueue # type: ignore
from scapy.all import IP, TCP # type: ignore
ipid = [0x0400]
def _rewrite(pkt: Any) -> None:
try:
p = IP(pkt.get_payload())
if p.haslayer(TCP) and _is_synack(int(p[TCP].flags)):
p[TCP].window = profile.window
p[TCP].options = build_synack_options(p[TCP].options, profile)
nid = next_ipid(ipid[0], profile.ipid)
if nid >= 0:
ipid[0] = nid
p[IP].id = nid
# options length changed → dataofs MUST be recomputed, else the
# kernel emits a malformed segment that breaks real connections.
del p[IP].chksum, p[TCP].chksum, p[IP].len, p[TCP].dataofs
pkt.set_payload(bytes(p))
except Exception: # nosec B110 — never drop a packet on a rewrite bug
log.exception("cloak.mangler: rewrite failed; passing packet through")
pkt.accept()
_iptables("-A")
nfq = NetfilterQueue()
nfq.bind(_QUEUE, _rewrite)
def _cleanup(*_: Any) -> None:
try:
_iptables("-D")
finally:
sys.exit(0)
# signal.signal() only works in the main thread; the `finally` below still
# removes the rule on a normal exit, and on container stop the netns (and
# its iptables rules) are torn down regardless.
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGTERM, _cleanup)
signal.signal(signal.SIGINT, _cleanup)
log.info("cloak.mangler: rewriting SYN-ACK -> %s (window=%#x ipid=%s)",
nmap_os, profile.window, profile.ipid)
try:
nfq.run()
finally:
_iptables("-D")
return 0

86
decnet/cloak/responder.py Normal file
View File

@@ -0,0 +1,86 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Probe-response synthesizer — answers the nmap probes the Linux kernel drops.
nmap's T2 (null-flags) and T3 (SYN+FIN+PSH+URG) to an OPEN port get no reply
from Linux (R=N), but Windows replies RST+ACK. We sniff the probe and inject the
target-OS-shaped reply ourselves; the kernel stays silent, so nothing races us.
Pure classification/reply logic is separated from the scapy sniff/send loop so it
is unit-testable without root or a live capture.
"""
from __future__ import annotations
import enum
import os
from typing import Any
from decnet.logging import get_logger
from decnet.os_fingerprint import get_os_mangle
log = get_logger("cloak.responder")
_NULL = 0x00
_T3 = 0x2B # SYN+FIN+PSH+URG
class ProbeKind(enum.Enum):
T2 = "T2"
T3 = "T3"
def classify_probe(flags: int, dport: int, open_ports: frozenset[int]) -> ProbeKind | None:
"""Identify an nmap T2/T3 probe by flag combo + open destination port.
Returns None for anything else (legit traffic, probes to closed ports, and
T1/T4-T7 which the real stack already answers).
"""
if dport not in open_ports:
return None
if flags == _NULL:
return ProbeKind.T2
if flags == _T3:
return ProbeKind.T3
return None
def build_reply_fields(probe_seq: int) -> dict[str, Any]:
"""Windows T2/T3 reply fields: seq 0, ack=probe seq, RST+ACK, window 0.
(nmap T2/T3 for Windows: S=Z, A=S, F=AR, W=0, DF=1.)
"""
return {"seq": 0, "ack": probe_seq, "flags": "RA", "window": 0, "df": True}
def run(nmap_os: str, open_ports: frozenset[int], decky_ip: str | None = None) -> int:
"""Sniff for T2/T3 probes to *open_ports* and inject Windows-shaped replies."""
profile = get_os_mangle(nmap_os)
if profile is None or not profile.respond_t2t3:
log.info("cloak.responder: nothing to do for %r", nmap_os)
return 0
from scapy.all import IP, TCP, send, sniff # type: ignore
ip = decky_ip or os.environ.get("DECKY_IP", "")
ipid = [0x0800]
def _on(pkt: Any) -> None:
if not pkt.haslayer(TCP):
return
kind = classify_probe(int(pkt[TCP].flags), int(pkt[TCP].dport), open_ports)
if kind is None:
return
f = build_reply_fields(int(pkt[TCP].seq))
ipid[0] = (ipid[0] + 1) & 0xFFFF
reply = (
IP(src=pkt[IP].dst, dst=pkt[IP].src, id=ipid[0], flags="DF", ttl=128)
/ TCP(sport=int(pkt[TCP].dport), dport=int(pkt[TCP].sport),
seq=f["seq"], ack=f["ack"], flags=f["flags"], window=f["window"])
)
send(reply, verbose=0)
bpf = f"tcp and dst host {ip}" if ip else "tcp"
log.info("cloak.responder: answering T2/T3 on %d ports (filter=%r)",
len(open_ports), bpf)
sniff(filter=bpf, prn=_on, store=0)
return 0

View File

@@ -21,7 +21,7 @@ import yaml
from decnet.config import DecnetConfig
from decnet.network import MACVLAN_NETWORK_NAME
from decnet.os_fingerprint import get_os_sysctls
from decnet.os_fingerprint import get_os_mangle, get_os_sysctls
from decnet.services.registry import get_service
_DOCKER_LOGGING = {
@@ -32,6 +32,26 @@ _DOCKER_LOGGING = {
},
}
# Build context for the cloak base image (decnet subtree synced in by
# deployer._sync_cloak_sources before build).
_CLOAK_CONTEXT = Path(__file__).parent / "templates" / "_shared" / "cloak"
# Netns-safe: run the cloak best-effort in the background, but keep `sleep
# infinity` as PID 1 in the foreground so a cloak crash never tears down the
# base container (and with it the netns every service container shares).
_CLOAK_COMMAND = ["sh", "-c", "python3 -m decnet.cloak & exec sleep infinity"]
def _decky_open_tcp_ports(services: list[str]) -> list[int]:
"""Sorted, de-duped TCP ports a decky's services listen on (for the cloak
responder's T2/T3 classification — DECNET_OPEN_PORTS)."""
ports: set[int] = set()
for svc_name in services:
svc = get_service(svc_name)
if svc is not None:
ports.update(svc.ports)
return sorted(ports)
def generate_compose(config: DecnetConfig) -> dict:
"""Build and return the full docker-compose data structure."""
@@ -60,6 +80,25 @@ def generate_compose(config: DecnetConfig) -> dict:
base["sysctls"] = get_os_sysctls(decky.nmap_os)
base["cap_add"] = ["NET_ADMIN"]
# sysctls reach only global packet fields. nmap_os families with an
# egress mangle profile (windows*) additionally run the cloak in the
# base container to rewrite SYN-ACK shape + synthesize T2/T3 replies, so
# they read as the claimed OS under active fingerprinting (nmap -O).
if get_os_mangle(decky.nmap_os) is not None:
base.pop("image", None)
base["build"] = {
"context": str(_CLOAK_CONTEXT),
"args": {"BASE_IMAGE": decky.build_base},
}
base["command"] = _CLOAK_COMMAND
base["cap_add"] = ["NET_ADMIN", "NET_RAW"] # NET_RAW: responder send/sniff
ports = _decky_open_tcp_ports(decky.services)
base["environment"] = {
"DECNET_NMAP_OS": decky.nmap_os,
"DECNET_OPEN_PORTS": ",".join(str(p) for p in ports),
"DECKY_IP": decky.ip,
}
services[base_key] = base
# --- Service containers: share base network namespace ---

View File

@@ -65,6 +65,20 @@ _CANONICAL_NTLMSSP = Path(__file__).parent.parent / "templates" / "_shared" / "n
_NTLMSSP_SERVICES = {"smb", "rdp"}
_CANONICAL_CADDY_MODULES_DIR = Path(__file__).parent.parent / "templates" / "_caddy_modules"
_CADDY_SERVICES = {"http", "https"}
# Cloak base image: the decnet package root + the 8 light files shipped into the
# cloak build context so `python -m decnet.cloak` runs in the base container.
_DECNET_SRC = Path(__file__).parent.parent
_CANONICAL_CLOAK_DIR = _DECNET_SRC / "templates" / "_shared" / "cloak"
_CLOAK_SHIP_FILES = (
"__init__.py",
"config_ini.py",
"logging/__init__.py",
"os_fingerprint.py",
"cloak/__init__.py",
"cloak/__main__.py",
"cloak/mangler.py",
"cloak/responder.py",
)
def _sync_logging_helper(config: DecnetConfig) -> None:
@@ -87,6 +101,26 @@ def _sync_logging_helper(config: DecnetConfig) -> None:
shutil.copy2(src, dest)
def _sync_cloak_sources(config: DecnetConfig) -> None:
"""Ship the light decnet subtree into the cloak base-image build context.
Only when at least one decky has an egress mangle profile (windows*). Copies
the 8 files in _CLOAK_SHIP_FILES into <cloak ctx>/decnet/ preserving package
structure so the image's `python -m decnet.cloak` resolves. The dest tree is
gitignored. Mirrors the _sync_*_sources copy-if-changed idiom.
"""
from decnet.os_fingerprint import get_os_mangle
if not any(get_os_mangle(d.nmap_os) is not None for d in config.deckies):
return
dest_root = _CANONICAL_CLOAK_DIR / "decnet"
for rel in _CLOAK_SHIP_FILES:
src = _DECNET_SRC / rel
dest = dest_root / rel
dest.parent.mkdir(parents=True, exist_ok=True)
if not dest.exists() or dest.read_bytes() != src.read_bytes():
shutil.copy2(src, dest)
def _sync_auth_helper_sources(config: DecnetConfig) -> None:
"""Copy auth-helper.c into SSH/Telnet build contexts as auth-helper/.
@@ -679,6 +713,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
_sync_auth_helper_sources(config)
_sync_ntlmssp_sources(config)
_sync_caddy_modules(config)
_sync_cloak_sources(config)
compose_path = write_compose(config, COMPOSE_FILE)
console.print(f"[bold cyan]Compose file written[/] → {compose_path}")

View File

@@ -35,6 +35,8 @@ Windows (64240) from the kernel's default tcp_rmem settings.
from __future__ import annotations
from dataclasses import dataclass
OS_SYSCTLS: dict[str, dict[str, str]] = {
"linux": {
"net.ipv4.ip_default_ttl": "64",
@@ -49,9 +51,12 @@ OS_SYSCTLS: dict[str, dict[str, str]] = {
"net.ipv4.icmp_ratemask": "6168",
},
"windows": {
# Windows 10/11 workstation. NOTE: modern Windows runs TCP timestamps
# ON (nmap SEQ.TS=A) — an earlier value of 0 here fingerprinted as an
# ancient Windows/Linux stack. ECN off → nmap ECN.CC=N (workstation).
"net.ipv4.ip_default_ttl": "128",
"net.ipv4.tcp_syn_retries": "2",
"net.ipv4.tcp_timestamps": "0",
"net.ipv4.tcp_timestamps": "1",
"net.ipv4.tcp_window_scaling": "1",
"net.ipv4.tcp_sack": "1",
"net.ipv4.tcp_ecn": "0",
@@ -60,6 +65,22 @@ OS_SYSCTLS: dict[str, dict[str, str]] = {
"net.ipv4.icmp_ratelimit": "0",
"net.ipv4.icmp_ratemask": "0",
},
"windows_server": {
# Windows Server 2016/2019. Same NT stack as the workstation; the only
# stack-visible deltas nmap reads are ECN negotiated (CC=Y → tcp_ecn=1)
# and randomized IP-ID (SEQ.TI=RD, applied by the cloak mangler, not a
# sysctl). Everything else == "windows".
"net.ipv4.ip_default_ttl": "128",
"net.ipv4.tcp_syn_retries": "2",
"net.ipv4.tcp_timestamps": "1",
"net.ipv4.tcp_window_scaling": "1",
"net.ipv4.tcp_sack": "1",
"net.ipv4.tcp_ecn": "1",
"net.ipv4.ip_no_pmtu_disc": "0",
"net.ipv4.tcp_fin_timeout": "30",
"net.ipv4.icmp_ratelimit": "0",
"net.ipv4.icmp_ratemask": "0",
},
"bsd": {
"net.ipv4.ip_default_ttl": "64",
"net.ipv4.tcp_syn_retries": "6",
@@ -112,3 +133,47 @@ def all_os_families() -> list[str]:
"""Return all registered nmap OS family slugs."""
return list(OS_SYSCTLS.keys())
# ─── Egress mangle profiles (cloak) ──────────────────────────────────────────
#
# sysctls above reach only GLOBAL fields (TTL, timestamps on/off, ECN). The
# SYN-ACK *shape* nmap also scores — exact window, TCP option order, IP-ID
# generation — cannot be set per-container by sysctl. The cloak mangler
# (decnet/cloak) rewrites those on egress, driven by these profiles, keyed by
# the SAME nmap_os slug. A slug ABSENT here needs no mangling (its real Linux
# stack already approximates the target, e.g. "linux"/"bsd").
@dataclass(frozen=True)
class MangleProfile:
"""How the cloak rewrites a decky's egress to match an nmap_os family."""
window: int # TCP advertised window on SYN-ACK
mss: int # MSS option value
wscale: int # window-scale shift
# Ordered TCP option layout to emit on SYN-ACK. "TS" is kept only if the
# kernel emitted a Timestamp (sysctl tcp_timestamps=1) so its live,
# incrementing value survives the rewrite (nmap SEQ.TS rate test).
option_order: tuple[str, ...]
ipid: str # "incr" (TI=I) | "random" (TI=RD) | "keep"
respond_t2t3: bool # synthesize Windows T2/T3 replies
_WIN_OPTS = ("MSS", "NOP", "WScale", "SAckOK", "TS")
OS_MANGLE: dict[str, MangleProfile] = {
"windows": MangleProfile(
window=0x2000, mss=1460, wscale=8,
option_order=_WIN_OPTS, ipid="incr", respond_t2t3=True,
),
"windows_server": MangleProfile(
window=0x2000, mss=1460, wscale=8,
option_order=_WIN_OPTS, ipid="random", respond_t2t3=True,
),
}
def get_os_mangle(nmap_os: str) -> MangleProfile | None:
"""Return the cloak mangle profile for *nmap_os*, or None if it needs none."""
return OS_MANGLE.get(nmap_os)

View File

@@ -0,0 +1,32 @@
# Cloak base image — the IP-holder/netns container for deckies whose nmap_os has
# an egress mangle profile (windows, windows_server). Runs `python -m decnet.cloak`
# (SYN-ACK mangler + T2/T3 responder) alongside holding the MACVLAN IP.
#
# FROM the per-decky distro so the base still varies by distro (BASE_IMAGE arg,
# set by the composer from decky.build_base — same pattern as service images).
# The decnet/ subtree is synced into this context by deployer._sync_cloak_sources
# before build (8 light, stdlib-only files; scapy/netfilterqueue are pip'd here).
ARG BASE_IMAGE=debian:bookworm-slim
FROM ${BASE_IMAGE}
# Runtime: iptables (NFQUEUE rules), python3, libpcap (scapy BPF sniff in the
# responder). Build-only: gcc + headers for the netfilterqueue C extension,
# purged after the wheel is built to keep the image lean.
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 python3-pip iptables libpcap0.8 \
libnetfilter-queue1 libnfnetlink0 \
gcc python3-dev libnetfilter-queue-dev libnfnetlink-dev \
&& pip3 install --no-cache-dir --break-system-packages \
"scapy>=2.6.1" "netfilterqueue>=1.1.0" \
&& apt-get purge -y gcc python3-dev libnetfilter-queue-dev libnfnetlink-dev \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/*
# Synced 8-file decnet subtree (decnet/__init__, config_ini, logging/, os_fingerprint,
# cloak/). PYTHONPATH=/opt makes `python3 -m decnet.cloak` importable.
COPY decnet/ /opt/decnet/
ENV PYTHONPATH=/opt
# The compose `command` drives runtime (netns-safe supervisor: cloak in background,
# sleep infinity in foreground so a cloak crash never tears down the netns holder).
CMD ["sleep", "infinity"]

View File

@@ -40,6 +40,9 @@ dependencies = [
# `alembic upgrade head` at boot for managed DBs (see db/migrate.py).
"alembic>=1.13",
"scapy>=2.6.1",
# cloak egress mangler (NFQUEUE); Linux-only, lazy-imported so absence on
# dev/CI/non-Linux is tolerated (decnet.cloak only needs it at run()).
"netfilterqueue>=1.1.0 ; sys_platform == 'linux'",
"orjson>=3.10",
"cryptography>=48.0.1",
"python-multipart>=0.0.31",

0
tests/cloak/__init__.py Normal file
View File

130
tests/cloak/test_cloak.py Normal file
View File

@@ -0,0 +1,130 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Tests for the cloak mangler/responder PURE logic — option layout, IP-ID policy,
probe classification, reply fields. No scapy, root, or live NFQUEUE involved
(the runtime loops are exercised only on real deckies, not in CI).
"""
from __future__ import annotations
import pytest
from decnet.cloak import (
ProbeKind,
build_reply_fields,
build_synack_options,
classify_probe,
next_ipid,
)
from decnet.cloak.mangler import _is_synack
from decnet.os_fingerprint import OS_MANGLE, MangleProfile, get_os_mangle
WIN = OS_MANGLE["windows"]
SRV = OS_MANGLE["windows_server"]
# ── profile wiring ──────────────────────────────────────────────────────────
def test_get_os_mangle_known():
assert isinstance(get_os_mangle("windows"), MangleProfile)
assert get_os_mangle("windows_server").ipid == "random"
def test_get_os_mangle_none_for_linux():
assert get_os_mangle("linux") is None
assert get_os_mangle("nonexistent") is None
def test_windows_workstation_ipid_is_incr():
# Win10 workstation = incremental IP-ID (nmap TI=I); server = randomized (RD).
assert WIN.ipid == "incr"
assert SRV.ipid == "random"
# ── SYN-ACK option building ─────────────────────────────────────────────────
def test_options_layout_with_timestamp_preserved():
orig = [("MSS", 1460), ("SAckOK", b""), ("Timestamp", (111, 222)),
("NOP", None), ("WScale", 7)]
out = build_synack_options(orig, WIN)
names = [n for n, _ in out]
assert names == ["MSS", "NOP", "WScale", "SAckOK", "Timestamp"]
# the kernel's live timestamp value must survive (SEQ.TS rate test)
assert ("Timestamp", (111, 222)) in out
# our chosen mss/wscale override whatever the kernel emitted
assert ("MSS", WIN.mss) in out
assert ("WScale", WIN.wscale) in out
def test_options_drop_timestamp_when_kernel_had_none():
"""If timestamps are off (no kernel TS option), emit none — never a fake one."""
orig = [("MSS", 1460), ("SAckOK", b""), ("NOP", None), ("WScale", 7)]
out = build_synack_options(orig, WIN)
assert all(n != "Timestamp" for n, _ in out)
def test_options_length_is_4byte_aligned():
"""Sanity: the windows option layout encodes to a multiple of 4 bytes."""
from scapy.all import TCP # type: ignore # noqa
pytest.importorskip("scapy")
orig = [("MSS", 1460), ("Timestamp", (1, 2))]
out = build_synack_options(orig, WIN)
raw = bytes(TCP(options=out))[20:] # options after the 20-byte base header
assert len(raw) % 4 == 0
# ── IP-ID policy ────────────────────────────────────────────────────────────
def test_next_ipid_incr_wraps():
assert next_ipid(5, "incr") == 6
assert next_ipid(0xFFFF, "incr") == 0
def test_next_ipid_random_in_range_nonzero():
for _ in range(50):
v = next_ipid(0, "random")
assert 1 <= v <= 0xFFFF
def test_next_ipid_keep_sentinel():
assert next_ipid(123, "keep") == -1
# ── SYN-ACK detection ───────────────────────────────────────────────────────
@pytest.mark.parametrize("flags,expected", [
(0x12, True), # SYN+ACK
(0x52, True), # SYN+ACK+ECE (ECN SYN-ACK)
(0x02, False), # bare SYN
(0x10, False), # bare ACK
])
def test_is_synack(flags, expected):
assert _is_synack(flags) is expected
# ── probe classification ────────────────────────────────────────────────────
OPEN = frozenset({22, 80, 443})
def test_classify_t2_null_flags_open_port():
assert classify_probe(0x00, 80, OPEN) is ProbeKind.T2
def test_classify_t3_synfinpshurg_open_port():
assert classify_probe(0x2B, 80, OPEN) is ProbeKind.T3
def test_classify_ignores_closed_port():
assert classify_probe(0x00, 9999, OPEN) is None
def test_classify_ignores_normal_traffic():
assert classify_probe(0x02, 80, OPEN) is None # SYN — real stack handles
assert classify_probe(0x10, 80, OPEN) is None # ACK
# ── reply field shaping ─────────────────────────────────────────────────────
def test_reply_fields_windows_shape():
f = build_reply_fields(probe_seq=0xDEAD)
assert f == {"seq": 0, "ack": 0xDEAD, "flags": "RA", "window": 0, "df": True}

View File

@@ -0,0 +1,130 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Tests for wiring the cloak into the deploy path:
- composer.py: windows* base containers get build+command+caps+env; non-mangled
bases stay byte-for-byte unchanged.
- composer._decky_open_tcp_ports: service-port enumeration.
- deployer._sync_cloak_sources: ships the decnet subtree only when needed.
"""
from __future__ import annotations
import pytest
from decnet.composer import _CLOAK_COMMAND, _decky_open_tcp_ports, generate_compose
from decnet.config import DeckyConfig, DecnetConfig
def _decky(nmap_os: str = "linux", services: list[str] | None = None) -> DeckyConfig:
return DeckyConfig(
name="decky-01",
ip="10.0.0.10",
services=services or ["ssh"],
distro="debian",
base_image="debian:bookworm-slim",
build_base="debian:bookworm-slim",
hostname="test-host",
nmap_os=nmap_os,
)
def _config(decky: DeckyConfig) -> DecnetConfig:
return DecnetConfig(
mode="unihost", interface="eth0", subnet="10.0.0.0/24",
gateway="10.0.0.1", deckies=[decky],
)
def _base(nmap_os: str, services: list[str] | None = None) -> dict:
return generate_compose(_config(_decky(nmap_os, services)))["services"]["decky-01"]
# ── port enumeration ────────────────────────────────────────────────────────
def test_open_ports_union_sorted_deduped():
# smb=[445,139], rdp=[3389]
assert _decky_open_tcp_ports(["smb", "rdp"]) == [139, 445, 3389]
def test_open_ports_single_and_multiport():
assert _decky_open_tcp_ports(["ssh"]) == [22]
assert _decky_open_tcp_ports(["imap"]) == [143, 993] # multi-port service
# ── non-mangled base is unchanged ───────────────────────────────────────────
def test_linux_base_uses_stock_image_and_sleep():
base = _base("linux")
assert base["image"] == "debian:bookworm-slim"
assert base["command"] == ["sleep", "infinity"]
assert "build" not in base
assert "environment" not in base
assert base["cap_add"] == ["NET_ADMIN"]
@pytest.mark.parametrize("fam", ["embedded", "bsd", "cisco"])
def test_other_families_not_cloaked(fam):
base = _base(fam)
assert "build" not in base
assert base["command"] == ["sleep", "infinity"]
assert "NET_RAW" not in base["cap_add"]
# ── windows* base gets the cloak ────────────────────────────────────────────
@pytest.mark.parametrize("fam", ["windows", "windows_server"])
def test_windows_base_is_built_cloak_image(fam):
base = _base(fam, services=["smb", "rdp"])
assert "image" not in base
assert base["build"]["args"]["BASE_IMAGE"] == "debian:bookworm-slim"
assert base["build"]["context"].endswith("templates/_shared/cloak")
@pytest.mark.parametrize("fam", ["windows", "windows_server"])
def test_windows_base_runs_cloak_netns_safe(fam):
base = _base(fam)
# supervisor keeps sleep infinity as PID1 so a cloak crash can't kill the netns
assert base["command"] == _CLOAK_COMMAND
assert "decnet.cloak" in base["command"][-1]
assert "sleep infinity" in base["command"][-1]
@pytest.mark.parametrize("fam", ["windows", "windows_server"])
def test_windows_base_caps_include_net_raw(fam):
base = _base(fam)
assert "NET_ADMIN" in base["cap_add"]
assert "NET_RAW" in base["cap_add"]
def test_windows_base_env_carries_profile_and_ports():
base = _base("windows_server", services=["smb", "rdp"])
env = base["environment"]
assert env["DECNET_NMAP_OS"] == "windows_server"
assert env["DECNET_OPEN_PORTS"] == "139,445,3389"
assert env["DECKY_IP"] == "10.0.0.10"
def test_windows_base_still_has_sysctls():
base = _base("windows")
assert base["sysctls"]["net.ipv4.ip_default_ttl"] == "128"
assert base["sysctls"]["net.ipv4.tcp_timestamps"] == "1"
# ── deployer sync gating ────────────────────────────────────────────────────
def test_sync_cloak_ships_subtree_only_when_needed(tmp_path, monkeypatch):
from decnet.engine import deployer
dest_root = tmp_path / "cloak"
monkeypatch.setattr(deployer, "_CANONICAL_CLOAK_DIR", dest_root)
# linux-only → no-op
deployer._sync_cloak_sources(_config(_decky("linux")))
assert not (dest_root / "decnet").exists()
# windows → ships the subtree, package structure preserved
deployer._sync_cloak_sources(_config(_decky("windows")))
shipped = dest_root / "decnet"
assert (shipped / "__init__.py").is_file()
assert (shipped / "os_fingerprint.py").is_file()
assert (shipped / "cloak" / "mangler.py").is_file()
assert (shipped / "logging" / "__init__.py").is_file()

View File

@@ -50,8 +50,20 @@ def test_linux_tcp_timestamps_is_1():
assert get_os_sysctls("linux")["net.ipv4.tcp_timestamps"] == "1"
def test_windows_tcp_timestamps_is_0():
assert get_os_sysctls("windows")["net.ipv4.tcp_timestamps"] == "0"
def test_windows_tcp_timestamps_is_1():
# Modern Windows 10/11 runs TCP timestamps ON (nmap SEQ.TS=A). A prior
# value of 0 here fingerprinted as an ancient stack — see os_fingerprint.py.
assert get_os_sysctls("windows")["net.ipv4.tcp_timestamps"] == "1"
def test_windows_server_tcp_timestamps_is_1():
assert get_os_sysctls("windows_server")["net.ipv4.tcp_timestamps"] == "1"
def test_windows_server_tcp_ecn_is_1():
# Server negotiates ECN (nmap ECN.CC=Y); workstation does not (CC=N).
assert get_os_sysctls("windows_server")["net.ipv4.tcp_ecn"] == "1"
assert get_os_sysctls("windows")["net.ipv4.tcp_ecn"] == "0"
def test_embedded_tcp_timestamps_is_0():
@@ -237,7 +249,7 @@ def test_all_os_families_non_empty():
assert "embedded" in families
@pytest.mark.parametrize("family", ["linux", "windows", "bsd", "embedded", "cisco"])
@pytest.mark.parametrize("family", ["linux", "windows", "windows_server", "bsd", "embedded", "cisco"])
def test_all_os_profiles_have_required_sysctls(family: str):
"""Every OS profile must define the full canonical sysctl set."""
from decnet.os_fingerprint import _REQUIRED_SYSCTLS
@@ -246,7 +258,7 @@ def test_all_os_profiles_have_required_sysctls(family: str):
assert not missing, f"OS profile '{family}' is missing sysctls: {missing}"
@pytest.mark.parametrize("family", ["linux", "windows", "bsd", "embedded", "cisco"])
@pytest.mark.parametrize("family", ["linux", "windows", "windows_server", "bsd", "embedded", "cisco"])
def test_all_os_sysctl_values_are_strings(family: str):
"""Docker Compose requires sysctl values to be strings, never ints."""
for _key, _val in get_os_sysctls(family).items():
@@ -267,9 +279,13 @@ def test_archetype_nmap_os_is_known(slug, arch):
)
@pytest.mark.parametrize("slug", ["windows-workstation", "windows-server", "domain-controller"])
def test_windows_archetypes_have_windows_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "windows"
def test_windows_workstation_archetype_nmap_os():
assert ARCHETYPES["windows-workstation"].nmap_os == "windows"
@pytest.mark.parametrize("slug", ["windows-server", "domain-controller"])
def test_windows_server_archetypes_use_server_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "windows_server"
@pytest.mark.parametrize("slug", ["printer", "iot-device", "industrial-control"])
@@ -403,11 +419,11 @@ def test_compose_linux_sysctls_include_timestamps():
assert sysctls.get("net.ipv4.tcp_timestamps") == "1"
def test_compose_windows_sysctls_no_timestamps():
"""Windows compose output must have tcp_timestamps disabled (= 0)."""
def test_compose_windows_sysctls_timestamps_on():
"""Windows compose output must have tcp_timestamps ENABLED (= 1) — Win10/11."""
compose = generate_compose(_make_config("windows"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls.get("net.ipv4.tcp_timestamps") == "0"
assert sysctls.get("net.ipv4.tcp_timestamps") == "1"
def test_compose_linux_sysctls_full_set():