diff --git a/tests/test_sniffer_p0f.py b/tests/test_sniffer_p0f.py new file mode 100644 index 0000000..a8f5715 --- /dev/null +++ b/tests/test_sniffer_p0f.py @@ -0,0 +1,117 @@ +""" +Unit tests for the passive p0f-lite OS fingerprint lookup. + +Covers: + - initial_ttl() TTL → bucket rounding + - hop_distance() upper-bound clamping + - guess_os() signature matching for Linux, Windows, macOS, nmap, + embedded, and the unknown fallback +""" + +from __future__ import annotations + +from decnet.sniffer.p0f import guess_os, hop_distance, initial_ttl + + +# ─── initial_ttl ──────────────────────────────────────────────────────────── + +class TestInitialTtl: + def test_linux_bsd(self): + assert initial_ttl(64) == 64 + assert initial_ttl(59) == 64 + assert initial_ttl(33) == 64 + + def test_windows(self): + assert initial_ttl(128) == 128 + assert initial_ttl(120) == 128 + assert initial_ttl(65) == 128 + + def test_embedded(self): + assert initial_ttl(255) == 255 + assert initial_ttl(254) == 255 + assert initial_ttl(200) == 255 + + def test_very_short(self): + # anything <= 32 rounds to 32 + assert initial_ttl(32) == 32 + assert initial_ttl(1) == 32 + + def test_out_of_range(self): + # Packets with TTL > 255 (should never happen) still bucket. + assert initial_ttl(300) == 255 + + +# ─── hop_distance ─────────────────────────────────────────────────────────── + +class TestHopDistance: + def test_zero_when_local(self): + assert hop_distance(64) == 0 + assert hop_distance(128) == 0 + assert hop_distance(255) == 0 + + def test_typical(self): + assert hop_distance(60) == 4 # 4 hops from Linux + assert hop_distance(120) == 8 # 8 hops from Windows + + def test_negative_or_weird_still_bucketed(self): + # TTL=0 is anomalous but we still return a non-negative distance. + # TTL 0 bucket is 32 → distance = 32 - 0 = 32. + assert hop_distance(0) == 32 + + +# ─── guess_os ─────────────────────────────────────────────────────────────── + +class TestGuessOs: + def test_linux_default(self): + # Modern Linux: TTL 64, window 29200+, WScale 7, full options + result = guess_os( + ttl=64, window=29200, mss=1460, wscale=7, + options_sig="M,S,T,N,W", + ) + assert result == "linux" + + def test_windows_default(self): + # Windows 10: TTL 128, window 64240, WScale 8, MSS 1460 + result = guess_os( + ttl=128, window=64240, mss=1460, wscale=8, + options_sig="M,N,W,N,N,T,S", + ) + assert result == "windows" + + def test_macos_ios(self): + # macOS default: TTL 64, window 65535, WScale 6, ends with EOL + result = guess_os( + ttl=64, window=65535, mss=1460, wscale=6, + options_sig="M,N,W,N,N,T,S,E", + ) + assert result == "macos_ios" + + def test_nmap_sYn(self): + # nmap -sS uses tiny/distinctive windows like 1024 or 4096 + result = guess_os( + ttl=64, window=1024, mss=1460, wscale=10, + options_sig="M,W,T,S,S", + ) + assert result == "nmap" + + def test_nmap_alt_window(self): + result = guess_os( + ttl=64, window=31337, mss=1460, wscale=10, + options_sig="M,W,T,S,S", + ) + assert result == "nmap" + + def test_embedded_ttl255(self): + # Any TTL bucket 255 → embedded + result = guess_os( + ttl=250, window=4128, mss=536, wscale=None, + options_sig="M", + ) + assert result == "embedded" + + def test_unknown(self): + # Bizarre combo nothing matches + result = guess_os( + ttl=50, window=100, mss=0, wscale=None, options_sig="", + ) + assert result == "unknown" diff --git a/tests/test_sniffer_retransmit.py b/tests/test_sniffer_retransmit.py new file mode 100644 index 0000000..7572886 --- /dev/null +++ b/tests/test_sniffer_retransmit.py @@ -0,0 +1,108 @@ +""" +Unit tests for TCP retransmit detection in the SnifferEngine flow aggregator. + +A retransmit is defined as a *forward-direction* (attacker → decky) TCP +segment carrying payload whose sequence number has already been seen on +this flow. Empty SYN/ACKs that share seq legitimately are excluded. +""" + +from __future__ import annotations + +from scapy.layers.inet import IP, TCP + +from decnet.sniffer.fingerprint import SnifferEngine + + +_DECKY_IP = "192.168.1.10" +_DECKY = "decky-01" +_ATTACKER_IP = "10.0.0.7" + + +def _mk_engine() -> tuple[SnifferEngine, list[str]]: + captured: list[str] = [] + engine = SnifferEngine( + ip_to_decky={_DECKY_IP: _DECKY}, + write_fn=captured.append, + dedup_ttl=0, # disable dedup for easier assertion + ) + return engine, captured + + +def _data_pkt(seq: int, payload: bytes = b"data", sport: int = 55555): + return IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64) / TCP( + sport=sport, dport=22, flags="A", seq=seq, window=29200, + ) / payload + + +def _rst(sport: int = 55555): + return IP(src=_DECKY_IP, dst=_ATTACKER_IP, ttl=64) / TCP( + sport=22, dport=sport, flags="R", + ) + + +def _extract_retransmits(lines: list[str]) -> int: + """Pull `retransmits=` from the last tcp_flow_timing line.""" + import re + for line in reversed(lines): + if "tcp_flow_timing" not in line: + continue + m = re.search(r'retransmits="(\d+)"', line) + if m: + return int(m.group(1)) + return -1 + + +class TestRetransmitDetection: + def test_no_retransmits_when_seqs_unique(self): + engine, captured = _mk_engine() + engine.on_packet(_data_pkt(seq=1000)) + engine.on_packet(_data_pkt(seq=1004)) + engine.on_packet(_data_pkt(seq=1008)) + engine.on_packet(_rst()) + assert _extract_retransmits(captured) == 0 + + def test_single_retransmit(self): + engine, captured = _mk_engine() + engine.on_packet(_data_pkt(seq=2000)) + engine.on_packet(_data_pkt(seq=2004)) + engine.on_packet(_data_pkt(seq=2000)) # retransmitted + engine.on_packet(_rst()) + assert _extract_retransmits(captured) == 1 + + def test_multiple_retransmits(self): + engine, captured = _mk_engine() + engine.on_packet(_data_pkt(seq=3000)) + engine.on_packet(_data_pkt(seq=3000)) + engine.on_packet(_data_pkt(seq=3000)) + engine.on_packet(_data_pkt(seq=3004)) + engine.on_packet(_rst()) + # Two retransmits (original + 2 dupes of seq=3000) + assert _extract_retransmits(captured) == 2 + + def test_reverse_direction_not_counted(self): + """Packets from decky → attacker sharing seq should NOT count.""" + engine, captured = _mk_engine() + # Forward data + engine.on_packet(_data_pkt(seq=4000)) + engine.on_packet(_data_pkt(seq=4004)) + engine.on_packet(_data_pkt(seq=4008)) + # Reverse response (decky → attacker) with same seq as a forward + # packet — different flow direction, must not count as retransmit. + reverse = IP(src=_DECKY_IP, dst=_ATTACKER_IP, ttl=64) / TCP( + sport=22, dport=55555, flags="A", seq=4000, window=29200, + ) / b"resp" + engine.on_packet(reverse) + engine.on_packet(_rst()) + assert _extract_retransmits(captured) == 0 + + def test_empty_segments_not_counted(self): + """Pure ACKs (no payload) are not retransmits even if seqs repeat.""" + engine, captured = _mk_engine() + # Three pure-ACKs with identical seq + for _ in range(3): + pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64) / TCP( + sport=55555, dport=22, flags="A", seq=5000, window=29200, + ) + engine.on_packet(pkt) + engine.on_packet(_rst()) + assert _extract_retransmits(captured) == 0 diff --git a/tests/test_sniffer_tcp_fingerprint.py b/tests/test_sniffer_tcp_fingerprint.py new file mode 100644 index 0000000..fc04714 --- /dev/null +++ b/tests/test_sniffer_tcp_fingerprint.py @@ -0,0 +1,232 @@ +""" +Integration tests for TCP-level passive fingerprinting in the SnifferEngine. + +Covers end-to-end flow from a scapy packet through `on_packet()` to: + - tcp_syn_fingerprint event emission (OS guess, options, hop distance) + - tcp_flow_timing event emission (packet count, duration, retransmits) + - dedup behavior (one event per unique fingerprint per window) + - flow flush on FIN/RST +""" + +from __future__ import annotations + +from scapy.layers.inet import IP, TCP + +from decnet.sniffer.fingerprint import SnifferEngine + + +# ─── Helpers ──────────────────────────────────────────────────────────────── + +_DECKY_IP = "192.168.1.10" +_DECKY = "decky-01" +_ATTACKER_IP = "10.0.0.7" + + +def _make_engine() -> tuple[SnifferEngine, list[str]]: + """Return (engine, captured_syslog_lines).""" + captured: list[str] = [] + engine = SnifferEngine( + ip_to_decky={_DECKY_IP: _DECKY}, + write_fn=captured.append, + dedup_ttl=300.0, + ) + return engine, captured + + +def _linux_syn(src_port: int = 45000, dst_port: int = 22, seq: int = 1000): + """Build a synthetic SYN that should fingerprint as Linux.""" + return IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64) / TCP( + sport=src_port, + dport=dst_port, + flags="S", + seq=seq, + window=29200, + options=[ + ("MSS", 1460), + ("SAckOK", b""), + ("Timestamp", (123, 0)), + ("NOP", None), + ("WScale", 7), + ], + ) + + +def _windows_syn(src_port: int = 45001): + """Build a synthetic SYN that should fingerprint as Windows.""" + return IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=128) / TCP( + sport=src_port, + dport=3389, + flags="S", + window=64240, + options=[ + ("MSS", 1460), + ("NOP", None), + ("WScale", 8), + ("NOP", None), + ("NOP", None), + ("SAckOK", b""), + ], + ) + + +def _fields_from_line(line: str) -> dict[str, str]: + """Parse the SD-params section of an RFC 5424 syslog line into a dict.""" + import re + m = re.search(r"\[decnet@55555 (.*?)\]", line) + if not m: + return {} + body = m.group(1) + out: dict[str, str] = {} + for k, v in re.findall(r'(\w+)="((?:[^"\\]|\\.)*)"', body): + out[k] = v + return out + + +def _msgid(line: str) -> str: + """Extract MSGID from RFC 5424 line.""" + parts = line.split(" ", 6) + return parts[5] if len(parts) > 5 else "" + + +# ─── tcp_syn_fingerprint emission ────────────────────────────────────────── + +class TestSynFingerprintEmission: + def test_linux_syn_emits_fingerprint(self): + engine, captured = _make_engine() + engine.on_packet(_linux_syn()) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + assert len(fp_lines) == 1 + f = _fields_from_line(fp_lines[0]) + assert f["src_ip"] == _ATTACKER_IP + assert f["dst_ip"] == _DECKY_IP + assert f["os_guess"] == "linux" + assert f["ttl"] == "64" + assert f["initial_ttl"] == "64" + assert f["hop_distance"] == "0" + assert f["window"] == "29200" + assert f["wscale"] == "7" + assert f["mss"] == "1460" + assert f["has_sack"] == "true" + assert f["has_timestamps"] == "true" + + def test_windows_syn_emits_windows_guess(self): + engine, captured = _make_engine() + engine.on_packet(_windows_syn()) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + assert len(fp_lines) == 1 + f = _fields_from_line(fp_lines[0]) + assert f["os_guess"] == "windows" + assert f["ttl"] == "128" + assert f["initial_ttl"] == "128" + + def test_hop_distance_inferred_from_ttl(self): + engine, captured = _make_engine() + pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=58) / TCP( + sport=40000, dport=22, flags="S", window=29200, + options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)), + ("NOP", None), ("WScale", 7)], + ) + engine.on_packet(pkt) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + f = _fields_from_line(fp_lines[0]) + assert f["initial_ttl"] == "64" + assert f["hop_distance"] == "6" + + def test_dedup_suppresses_repeated_fingerprints(self): + engine, captured = _make_engine() + engine.on_packet(_linux_syn(src_port=40001)) + engine.on_packet(_linux_syn(src_port=40002)) + engine.on_packet(_linux_syn(src_port=40003)) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + assert len(fp_lines) == 1 # same OS + options_sig deduped + + def test_different_os_not_deduped(self): + engine, captured = _make_engine() + engine.on_packet(_linux_syn(src_port=40001)) + engine.on_packet(_windows_syn(src_port=40002)) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + assert len(fp_lines) == 2 + + def test_decky_source_does_not_emit(self): + """Packets originating from a decky (outbound reply) should NOT + be classified as an attacker fingerprint.""" + engine, captured = _make_engine() + pkt = IP(src=_DECKY_IP, dst=_ATTACKER_IP, ttl=64) / TCP( + sport=22, dport=40000, flags="S", window=29200, + options=[("MSS", 1460)], + ) + engine.on_packet(pkt) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + assert fp_lines == [] + + +# ─── tcp_flow_timing emission ─────────────────────────────────────────────── + +class TestFlowTiming: + def test_flow_flushed_on_fin_if_non_trivial(self): + """A session with ≥4 packets triggers a tcp_flow_timing event on FIN.""" + engine, captured = _make_engine() + # SYN + 3 data ACKs + FIN = 5 packets → passes the trivial-flow filter + pkts = [_linux_syn(src_port=50000, seq=100)] + for i, seq in enumerate((101, 200, 300)): + pkts.append( + IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64) / TCP( + sport=50000, dport=22, flags="A", seq=seq, window=29200, + ) / b"hello-data-here" + ) + pkts.append( + IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64) / TCP( + sport=50000, dport=22, flags="FA", seq=400, window=29200, + ) + ) + for p in pkts: + engine.on_packet(p) + + flow_lines = [ln for ln in captured if _msgid(ln) == "tcp_flow_timing"] + assert len(flow_lines) == 1 + f = _fields_from_line(flow_lines[0]) + assert f["src_ip"] == _ATTACKER_IP + assert f["dst_ip"] == _DECKY_IP + assert int(f["packets"]) == 5 + assert int(f["retransmits"]) == 0 + + def test_trivial_flow_dropped(self): + """A 2-packet scan probe (SYN + RST) must NOT emit a timing event.""" + engine, captured = _make_engine() + engine.on_packet(_linux_syn(src_port=50001, seq=200)) + engine.on_packet( + IP(src=_DECKY_IP, dst=_ATTACKER_IP, ttl=64) / TCP( + sport=22, dport=50001, flags="R", window=0, + ) + ) + flow_lines = [ln for ln in captured if _msgid(ln) == "tcp_flow_timing"] + assert flow_lines == [] # trivial: packets<4, no retransmits, dur<1s + + def test_retransmit_forces_emission_on_short_flow(self): + """Even a 3-packet flow must emit if it contains a retransmit.""" + engine, captured = _make_engine() + engine.on_packet(_linux_syn(src_port=50002, seq=300)) + # Repeat a forward data seq → retransmit + for _ in range(2): + engine.on_packet( + IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64) / TCP( + sport=50002, dport=22, flags="A", seq=301, window=29200, + ) / b"payload" + ) + engine.on_packet( + IP(src=_DECKY_IP, dst=_ATTACKER_IP, ttl=64) / TCP( + sport=22, dport=50002, flags="R", window=0, + ) + ) + flow_lines = [ln for ln in captured if _msgid(ln) == "tcp_flow_timing"] + assert len(flow_lines) == 1 + f = _fields_from_line(flow_lines[0]) + assert int(f["retransmits"]) == 1 + + def test_flush_all_flows_helper_drops_trivial(self): + """flush_all_flows still filters trivial flows.""" + engine, captured = _make_engine() + engine.on_packet(_linux_syn(src_port=50003, seq=400)) + engine.flush_all_flows() + flow_lines = [ln for ln in captured if _msgid(ln) == "tcp_flow_timing"] + assert flow_lines == [] # single packet = trivial