def _extract_tcp_fingerprint(tcp_options: list) -> dict[str, Any]:
    """
    Summarize a scapy TCP options list for passive OS fingerprinting.

    Args:
        tcp_options: sequence of ``(name, value)`` pairs as found in a scapy
            ``TCP.options`` field. ``None`` or an empty list means
            "no options".

    Returns:
        A dict with the keys:
          * ``mss``            – last well-formed MSS value, ``0`` if absent
          * ``wscale``         – last well-formed window-scale shift, or ``None``
          * ``sack_ok``        – True if a SAckOK or SAck option is present
          * ``has_timestamps`` – True if a Timestamp option is present
          * ``options_sig``    – option-order signature as returned by
            ``_extract_options_order``
    """
    options = tcp_options or []

    mss = 0
    wscale: int | None = None
    sack_ok = False
    has_ts = False

    for opt_name, opt_value in options:
        if opt_name == "MSS":
            # Packets come from untrusted peers. A malformed option may reach
            # us with a non-integer value (NOTE(review): scapy appears to keep
            # the raw bytes when the option length is wrong — confirm), and
            # that must not leak into the numeric fingerprint fields.
            if isinstance(opt_value, int):
                mss = opt_value
        elif opt_name == "WScale":
            if isinstance(opt_value, int):
                wscale = opt_value
        elif opt_name in ("SAckOK", "SAck"):
            sack_ok = True
        elif opt_name == "Timestamp":
            has_ts = True

    return {
        "mss": mss,
        "wscale": wscale,
        "sack_ok": sack_ok,
        "has_timestamps": has_ts,
        "options_sig": _extract_options_order(options),
    }
# ── Flow tracking (per-TCP-4-tuple timing + retransmits) ────────────────────
# NOTE(review): the functions below are SnifferEngine methods (the patch hunk
# headers name the class). They are written at column 0 because the patch
# text lost its line structure; re-indent them under the class when applying.

def _flow_key(
    self,
    src_ip: str,
    src_port: int,
    dst_ip: str,
    dst_port: int,
) -> tuple[str, int, str, int]:
    """
    Canonicalize a packet's 4-tuple to the *client→decky* direction.

    Forward and reverse packets of one connection map to the same key, so
    they share a single flow record. The key layout is
    ``(client_ip, client_port, decky_ip, decky_port)``.
    """
    if dst_ip in self._ip_to_decky:
        return (src_ip, src_port, dst_ip, dst_port)
    # Destination is not a decky, so the source is treated as the decky side
    # and the tuple is flipped.
    return (dst_ip, dst_port, src_ip, src_port)


def _update_flow(
    self,
    flow_key: tuple[str, int, str, int],
    now: float,
    seq: int,
    payload_len: int,
    direction_forward: bool,
    max_tracked_seqs: int = 4096,
) -> None:
    """
    Record one packet into the per-flow aggregator.

    Args:
        flow_key: canonical key as returned by ``_flow_key``.
        now: timestamp of the packet (same clock for every call).
        seq: TCP sequence number of the packet.
        payload_len: number of TCP payload bytes carried by the packet.
        direction_forward: True when the packet travels client→decky.
        max_tracked_seqs: upper bound on remembered forward sequence
            numbers per flow. Once reached, new sequence numbers are no
            longer stored (already-stored ones still count as retransmits),
            which keeps a long bulk transfer from growing the set forever.
    """
    flow = self._flows.get(flow_key)
    if flow is None:
        flow = {
            "start": now,
            "last": now,
            "packets": 0,
            "bytes": 0,
            "iat_sum": 0.0,
            "iat_min": float("inf"),
            "iat_max": 0.0,
            "iat_count": 0,
            "forward_seqs": set(),
            "retransmits": 0,
        }
        self._flows[flow_key] = flow
    else:
        # Inter-arrival time is measured between consecutive packets of the
        # flow regardless of direction; negative deltas are ignored.
        iat = now - flow["last"]
        if iat >= 0:
            flow["iat_sum"] += iat
            flow["iat_count"] += 1
            flow["iat_min"] = min(flow["iat_min"], iat)
            flow["iat_max"] = max(flow["iat_max"], iat)

    flow["last"] = now
    flow["packets"] += 1
    flow["bytes"] += payload_len

    # Retransmit detection: a forward packet *with payload* whose sequence
    # number was already seen. Empty segments are excluded because they can
    # legitimately share a sequence number.
    if direction_forward and payload_len > 0:
        seen = flow["forward_seqs"]
        if seq in seen:
            flow["retransmits"] += 1
        elif len(seen) < max_tracked_seqs:
            seen.add(seq)


def _flush_flow(
    self,
    flow_key: tuple[str, int, str, int],
    node_name: str,
) -> None:
    """
    Emit at most one ``tcp_flow_timing`` event for *flow_key* and drop its state.

    The record is popped before anything else, so a flow can only ever be
    reported once. Trivial flows are dropped silently: fewer than 4 packets,
    no retransmits, and a duration under one second. Such probes add noise
    without carrying a usable timing signal.
    """
    flow = self._flows.pop(flow_key, None)
    if flow is None:
        return

    duration = flow["last"] - flow["start"]
    if flow["packets"] < 4 and flow["retransmits"] == 0 and duration < 1.0:
        return

    src_ip, src_port, dst_ip, dst_port = flow_key
    iat_count = flow["iat_count"]
    if iat_count:
        mean_iat_ms = round((flow["iat_sum"] / iat_count) * 1000, 2)
        min_iat_ms = round(flow["iat_min"] * 1000, 2)
        max_iat_ms = round(flow["iat_max"] * 1000, 2)
    else:
        # Single-packet flow: no interval was ever measured.
        mean_iat_ms = min_iat_ms = max_iat_ms = 0.0

    self._log(
        node_name,
        "tcp_flow_timing",
        src_ip=src_ip,
        src_port=str(src_port),
        dst_ip=dst_ip,
        dst_port=str(dst_port),
        packets=str(flow["packets"]),
        bytes=str(flow["bytes"]),
        duration_s=str(round(duration, 3)),
        mean_iat_ms=str(mean_iat_ms),
        min_iat_ms=str(min_iat_ms),
        max_iat_ms=str(max_iat_ms),
        retransmits=str(flow["retransmits"]),
    )


def _flush_or_drop(self, flow_key: tuple[str, int, str, int]) -> None:
    """Flush *flow_key* if its destination is a known decky, else discard it."""
    decky = self._ip_to_decky.get(flow_key[2])
    if decky:
        self._flush_flow(flow_key, decky)
    else:
        self._flows.pop(flow_key, None)


def flush_all_flows(self) -> None:
    """
    Flush every tracked flow (emitting ``tcp_flow_timing`` events) and drop
    all flow state. Intended for shutdown and tests.

    NOTE(review): no locking is visible here; if this is called from a
    thread other than the sniff thread, confirm that packet processing has
    already stopped.
    """
    # Snapshot the keys: flushing mutates the dict.
    for key in list(self._flows):
        self._flush_or_drop(key)


def _flush_idle_flows(self) -> None:
    """
    Flush flows whose last packet is older than ``_FLOW_IDLE_TIMEOUT``.

    The scan is rate-limited: it runs at most once per
    ``self._FLOW_CLEANUP_INTERVAL`` seconds and returns immediately otherwise.
    """
    now = time.monotonic()
    if now - self._flow_last_cleanup < self._FLOW_CLEANUP_INTERVAL:
        return
    self._flow_last_cleanup = now

    stale: list[tuple[str, int, str, int]] = [
        key for key, flow in self._flows.items()
        if now - flow["last"] > _FLOW_IDLE_TIMEOUT
    ]
    for key in stale:
        self._flush_or_drop(key)
+ if dst_ip in self._ip_to_decky: + tcp_fp = _extract_tcp_fingerprint(list(tcp.options or [])) + os_label = guess_os( + ttl=ip.ttl, + window=int(tcp.window), + mss=tcp_fp["mss"], + wscale=tcp_fp["wscale"], + options_sig=tcp_fp["options_sig"], + ) + target_node = self._ip_to_decky[dst_ip] + self._log( + target_node, + "tcp_syn_fingerprint", + src_ip=src_ip, + src_port=str(src_port), + dst_ip=dst_ip, + dst_port=str(dst_port), + ttl=str(ip.ttl), + initial_ttl=str(initial_ttl(ip.ttl)), + hop_distance=str(hop_distance(ip.ttl)), + window=str(int(tcp.window)), + mss=str(tcp_fp["mss"]), + wscale=("" if tcp_fp["wscale"] is None else str(tcp_fp["wscale"])), + options_sig=tcp_fp["options_sig"], + has_sack=str(tcp_fp["sack_ok"]).lower(), + has_timestamps=str(tcp_fp["has_timestamps"]).lower(), + os_guess=os_label, + ) elif flags & _TCP_SYN and flags & _TCP_ACK: rev_key = (dst_ip, dst_port, src_ip, src_port) syn_data = self._tcp_syn.pop(rev_key, None) if syn_data: - rtt_ms = round((time.monotonic() - syn_data["time"]) * 1000, 2) + rtt_ms = round((now - syn_data["time"]) * 1000, 2) self._tcp_rtt[rev_key] = { "rtt_ms": rtt_ms, "client_ttl": syn_data["ttl"], } + # Flush flow on FIN/RST (terminal packets). + if flags & (_TCP_FIN | _TCP_RST): + decky = self._ip_to_decky.get(flow_key[2]) + if decky: + self._flush_flow(flow_key, decky) + payload = bytes(tcp.payload) if not payload: return diff --git a/decnet/sniffer/p0f.py b/decnet/sniffer/p0f.py new file mode 100644 index 0000000..41ae41e --- /dev/null +++ b/decnet/sniffer/p0f.py @@ -0,0 +1,235 @@ +""" +Passive OS fingerprinting (p0f-lite) for the DECNET sniffer. + +Pure-Python lookup module. Given the values of an incoming TCP SYN packet +(TTL, window, MSS, window-scale, and TCP option ordering), returns a coarse +OS bucket (linux / windows / macos_ios / freebsd / openbsd / nmap / unknown) +plus derived hop distance and inferred initial TTL. 
+ +Rationale +--------- +Full p0f v3 distinguishes several dozen OS/tool profiles by combining dozens +of low-level quirks (OLEN, WSIZE, EOL padding, PCLASS, quirks, payload class). +For DECNET we only need a coarse bucket — enough to tag an attacker as +"linux beacon" vs "windows interactive" vs "active scan". The curated +table below covers default stacks that dominate real-world attacker traffic. + +References (public p0f v3 DB, nmap-os-db, and Mozilla OS Fingerprint table): + https://github.com/p0f/p0f/blob/master/p0f.fp + +No external dependencies. +""" + +from __future__ import annotations + +# ─── TTL → initial TTL bucket ─────────────────────────────────────────────── + +# Common "hop 0" TTLs. Packets decrement TTL once per hop, so we round up +# the observed TTL to the nearest known starting value. +_TTL_BUCKETS: tuple[int, ...] = (32, 64, 128, 255) + + +def initial_ttl(ttl: int) -> int: + """ + Round *ttl* up to the nearest known initial-TTL bucket. + + A SYN with TTL=59 was almost certainly emitted by a Linux/BSD host + (initial 64) five hops away; TTL=120 by a Windows host (initial 128) + eight hops away. + """ + for bucket in _TTL_BUCKETS: + if ttl <= bucket: + return bucket + return 255 + + +def hop_distance(ttl: int) -> int: + """ + Estimate hops between the attacker and the sniffer based on TTL. + + Upper-bounded at 64 (anything further has most likely been mangled + by a misconfigured firewall or a TTL-spoofing NAT). + """ + dist = initial_ttl(ttl) - ttl + if dist < 0: + return 0 + if dist > 64: + return 64 + return dist + + +# ─── OS signature table (TTL bucket, window, MSS, wscale, option-order) ───── + +# Each entry is a set of loose predicates. If all predicates match, the +# OS label is returned. First-match wins. `None` means "don't care". +# +# The option signatures use the short-code alphabet from +# decnet/prober/tcpfp.py :: _OPT_CODES (M=MSS, N=NOP, W=WScale, +# T=Timestamp, S=SAckOK, E=EOL). + +_SIGNATURES: tuple[tuple[dict, str], ...] 
= ( + # ── nmap -sS / -sT default probe ─────────────────────────────────────── + # nmap crafts very distinctive SYNs: tiny window (1024/4096/etc.), full + # option set including WScale=10 and SAckOK. Match these first so they + # don't get misclassified as Linux. + ( + { + "ttl_bucket": 64, + "window_in": {1024, 2048, 3072, 4096, 31337, 32768, 65535}, + "mss": 1460, + "wscale": 10, + "options": "M,W,T,S,S", + }, + "nmap", + ), + ( + { + "ttl_bucket": 64, + "window_in": {1024, 2048, 3072, 4096, 31337, 32768, 65535}, + "options_starts_with": "M,W,T,S", + }, + "nmap", + ), + # ── macOS / iOS default SYN (match before Linux — shares TTL 64) ────── + # TTL 64, window 65535, MSS 1460, WScale 6, specific option order + # M,N,W,N,N,T,S,E (Darwin signature with EOL padding). + ( + { + "ttl_bucket": 64, + "window": 65535, + "wscale": 6, + "options": "M,N,W,N,N,T,S,E", + }, + "macos_ios", + ), + ( + { + "ttl_bucket": 64, + "window_in": {65535}, + "wscale_in": {5, 6}, + "has_timestamps": True, + "options_ends_with": "E", + }, + "macos_ios", + ), + # ── FreeBSD default SYN (TTL 64, no EOL) ─────────────────────────────── + ( + { + "ttl_bucket": 64, + "window": 65535, + "wscale": 6, + "has_sack": True, + "has_timestamps": True, + "options_no_eol": True, + }, + "freebsd", + ), + # ── Linux (kernel 3.x – 6.x) default SYN ─────────────────────────────── + # TTL 64, window 29200 / 64240 / 65535, MSS 1460, WScale 7, full options. 
+ ( + { + "ttl_bucket": 64, + "window_min": 5000, + "wscale_in": {6, 7, 8, 9, 10, 11, 12, 13, 14}, + "has_sack": True, + "has_timestamps": True, + }, + "linux", + ), + # ── OpenBSD default SYN ───────────────────────────────────────────────── + # TTL 64, window 16384, WScale 3-6, MSS 1460 + ( + { + "ttl_bucket": 64, + "window_in": {16384, 16960}, + "wscale_in": {3, 4, 5, 6}, + }, + "openbsd", + ), + # ── Windows 10/11/Server default SYN ──────────────────────────────────── + # TTL 128, window 64240/65535, MSS 1460, WScale 8, SACK+TS + ( + { + "ttl_bucket": 128, + "window_min": 8192, + "wscale_in": {2, 6, 7, 8}, + "has_sack": True, + }, + "windows", + ), + # ── Windows 7/XP (legacy) ─────────────────────────────────────────────── + ( + { + "ttl_bucket": 128, + "window_in": {8192, 16384, 65535}, + }, + "windows", + ), + # ── Embedded / Cisco / network gear ───────────────────────────────────── + ( + { + "ttl_bucket": 255, + }, + "embedded", + ), +) + + +def _match_signature( + sig: dict, + ttl: int, + window: int, + mss: int, + wscale: int | None, + options_sig: str, +) -> bool: + """Evaluate every predicate in *sig* against the observed values.""" + tb = initial_ttl(ttl) + if "ttl_bucket" in sig and sig["ttl_bucket"] != tb: + return False + if "window" in sig and sig["window"] != window: + return False + if "window_in" in sig and window not in sig["window_in"]: + return False + if "window_min" in sig and window < sig["window_min"]: + return False + if "mss" in sig and sig["mss"] != mss: + return False + if "wscale" in sig and sig["wscale"] != wscale: + return False + if "wscale_in" in sig and wscale not in sig["wscale_in"]: + return False + if "has_sack" in sig: + if sig["has_sack"] != ("S" in options_sig): + return False + if "has_timestamps" in sig: + if sig["has_timestamps"] != ("T" in options_sig): + return False + if "options" in sig and sig["options"] != options_sig: + return False + if "options_starts_with" in sig and not 
options_sig.startswith(sig["options_starts_with"]): + return False + if "options_ends_with" in sig and not options_sig.endswith(sig["options_ends_with"]): + return False + if "options_no_eol" in sig and sig["options_no_eol"] and "E" in options_sig: + return False + return True + + +def guess_os( + ttl: int, + window: int, + mss: int = 0, + wscale: int | None = None, + options_sig: str = "", +) -> str: + """ + Return a coarse OS bucket for the given SYN characteristics. + + One of: "linux", "windows", "macos_ios", "freebsd", "openbsd", + "embedded", "nmap", "unknown". + """ + for sig, label in _SIGNATURES: + if _match_signature(sig, ttl, window, mss, wscale, options_sig): + return label + return "unknown"