From c595d039bd2bf386a978a3140d58e28b4d71c6fb Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 20:30:24 -0400 Subject: [PATCH] feat(sniffer): ISN sequence classifier (reuses seq_class helper) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the IP-ID classifier for TCP ISN values: per-source-IP rolling deque (maxlen=8) populated from each inbound SYN's tcp.seq, classified on every emission. A 'random' verdict is the modern norm; 'incremental', 'zero', or 'constant' indicates legacy stacks or hand-rolled raw-socket tooling — a strong fingerprint signal. Active prober now also captures server_isn (single sample, not classified in-flight; downstream consumers correlating multi-probe results can apply seq_class.classify_sequence themselves). Profiler rollup carries the latest non-'unknown' label into attacker.tcp_fingerprint. Dedup key already covers isn_class from the previous commit, so transitions emit cleanly. UI surfaces ISN class as a colour-coded tag with a ⚠ glyph for non-random verdicts, since they're the genuinely interesting case. --- decnet/prober/tcpfp.py | 5 +++ decnet/prober/worker.py | 1 + decnet/profiler/fingerprint.py | 5 +++ decnet/sniffer/fingerprint.py | 9 +++++ decnet_web/src/components/AttackerDetail.tsx | 7 ++++ tests/prober/test_prober_tcpfp.py | 7 ++++ tests/sniffer/test_sniffer_tcp_fingerprint.py | 33 +++++++++++++++++++ 7 files changed, 67 insertions(+) diff --git a/decnet/prober/tcpfp.py b/decnet/prober/tcpfp.py index 62284634..3495230d 100644 --- a/decnet/prober/tcpfp.py +++ b/decnet/prober/tcpfp.py @@ -139,6 +139,10 @@ def _parse_synack(resp: Any) -> dict[str, Any]: # TCP fields window_size = tcp_layer.window + # Server ISN: single sample from one probe — not classified here, but + # exported so a downstream consumer correlating multiple probes against + # the same target can apply seq_class.classify_sequence(). + server_isn = int(getattr(tcp_layer, "seq", 0)) # Parse TCP options mss = 0 @@ -165,6 +169,7 @@ def _parse_synack(resp: Any) -> dict[str, Any]: "tos": tos, "dscp": dscp, "ecn": ecn, + "server_isn": server_isn, "mss": mss, "window_scale": window_scale, "sack_ok": sack_ok, diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 5f53c2ac..6c37608f 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -415,6 +415,7 @@ def _tcpfp_phase( tos=str(result["tos"]), dscp=str(result["dscp"]), ecn=str(result["ecn"]), + server_isn=str(result["server_isn"]), msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}", ) logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"]) diff --git a/decnet/profiler/fingerprint.py b/decnet/profiler/fingerprint.py index 6d46181e..c2721125 100644 --- a/decnet/profiler/fingerprint.py +++ b/decnet/profiler/fingerprint.py @@ -144,6 +144,7 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: hops: list[int] = [] tcp_fp: dict[str, Any] | None = None ipid_latest: str | None = None + isn_latest: str | None = None # Tracks which event set tcp_fp last — picks the provider "context" # (syn vs synack) when we feed the p0f-v2 matcher below. tcp_fp_context: str = "syn" @@ -193,6 +194,10 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: if ipid_class and ipid_class != "unknown": ipid_latest = ipid_class tcp_fp["ipid_class"] = ipid_latest + isn_class = e.fields.get("isn_class") + if isn_class and isn_class != "unknown": + isn_latest = isn_class + tcp_fp["isn_class"] = isn_latest tcp_fp_context = "syn" elif e.event_type == _SNIFFER_FLOW_EVENT: diff --git a/decnet/sniffer/fingerprint.py b/decnet/sniffer/fingerprint.py index c94e1ac2..cd9bd31e 100644 --- a/decnet/sniffer/fingerprint.py +++ b/decnet/sniffer/fingerprint.py @@ -752,6 +752,7 @@ class SnifferEngine: # we can label them random/incremental/zero/constant. self._SEQ_SAMPLE_SIZE = 8 self._ipid_samples: dict[str, deque[int]] = {} + self._isn_samples: dict[str, deque[int]] = {} # Per-flow timing aggregator. Key: (src_ip, src_port, dst_ip, dst_port). # Flow direction is client→decky; reverse packets are associated back @@ -1052,6 +1053,12 @@ class SnifferEngine: ) ipid_buf.append(int(ip.id)) ipid_class = classify_sequence(list(ipid_buf)) + + isn_buf = self._isn_samples.setdefault( + src_ip, deque(maxlen=self._SEQ_SAMPLE_SIZE) + ) + isn_buf.append(int(tcp.seq)) + isn_class = classify_sequence(list(isn_buf)) os_label = guess_os( ttl=ip.ttl, window=int(tcp.window), @@ -1082,6 +1089,8 @@ class SnifferEngine: ecn=str(int(getattr(ip, "tos", 0)) & 0x3), ipid_class=ipid_class, ipid_samples=str(len(ipid_buf)), + isn_class=isn_class, + isn_samples=str(len(isn_buf)), os_guess=os_label, ) diff --git a/decnet_web/src/components/AttackerDetail.tsx b/decnet_web/src/components/AttackerDetail.tsx index 478ac807..b37e1626 100644 --- a/decnet_web/src/components/AttackerDetail.tsx +++ b/decnet_web/src/components/AttackerDetail.tsx @@ -23,6 +23,7 @@ interface AttackerBehavior { dscp?: number | null; ecn?: number | null; ipid_class?: string | null; + isn_class?: string | null; } | null; retransmit_count: number; behavior_class: string | null; @@ -771,6 +772,12 @@ const TcpStackBlock: React.FC<{ b: AttackerBehavior }> = ({ b }) => { {fp.ipid_class && fp.ipid_class !== 'unknown' && ( IPID:{fp.ipid_class.toUpperCase()} )} + {fp.isn_class && fp.isn_class !== 'unknown' && ( + + {fp.isn_class !== 'random' && '⚠ '} + ISN:{fp.isn_class.toUpperCase()} + + )} {fp.options_sig && (
diff --git a/tests/prober/test_prober_tcpfp.py b/tests/prober/test_prober_tcpfp.py index 000dac4a..b951be42 100644 --- a/tests/prober/test_prober_tcpfp.py +++ b/tests/prober/test_prober_tcpfp.py @@ -32,6 +32,7 @@ def _make_synack( tcp_flags: int = 0x12, # SYN-ACK options: list | None = None, ack: int = 1, + seq: int = 0, ) -> SimpleNamespace: """Build a fake scapy-like SYN-ACK packet for testing.""" if options is None: @@ -52,6 +53,7 @@ def _make_synack( options=options, dport=12345, ack=ack, + seq=seq, ) ip_layer = SimpleNamespace( ttl=ttl, @@ -198,6 +200,11 @@ class TestParseSynack: assert result["dscp"] == 10 assert result["ecn"] == 2 + def test_server_isn_captured(self): + resp = _make_synack(seq=0xDEADBEEF) + result = _parse_synack(resp) + assert result["server_isn"] == 0xDEADBEEF + def test_tos_ce_marked(self): # ECN CE bit set, no DSCP marking → ToS = 0x03 resp = _make_synack(tos=0x03) diff --git a/tests/sniffer/test_sniffer_tcp_fingerprint.py b/tests/sniffer/test_sniffer_tcp_fingerprint.py index 37cb566b..338a1cb1 100644 --- a/tests/sniffer/test_sniffer_tcp_fingerprint.py +++ b/tests/sniffer/test_sniffer_tcp_fingerprint.py @@ -188,6 +188,39 @@ class TestSynFingerprintEmission: assert last["ipid_class"] == "incremental" assert int(last["ipid_samples"]) >= 4 + def test_isn_classified_random_with_high_variance_seqs(self): + """SYNs with widely varying ISNs should classify as random.""" + engine, captured = _make_engine() + # Spread ISNs across the 32-bit space; randomised initial sequence. + seqs = [0x12345678, 0xABCDEF01, 0x0F0F0F0F, 0xDEADBEEF, + 0x11223344, 0x99887766, 0x44556677, 0xCAFEBABE] + for i, seq in enumerate(seqs): + pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64, id=2000 + i) / TCP( + sport=47000 + i, dport=22, flags="S", seq=seq, window=29200, + options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)), + ("NOP", None), ("WScale", 7)], + ) + engine.on_packet(pkt) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + last = _fields_from_line(fp_lines[-1]) + assert last["isn_class"] == "random" + assert int(last["isn_samples"]) >= 4 + + def test_isn_classified_incremental_with_monotonic_seqs(self): + """SYNs whose ISNs march upward in small steps should classify + as incremental — a strong fingerprint signal vs. modern stacks.""" + engine, captured = _make_engine() + for i in range(8): + pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64, id=3000 + i) / TCP( + sport=48000 + i, dport=22, flags="S", seq=10_000 + i, window=29200, + options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)), + ("NOP", None), ("WScale", 7)], + ) + engine.on_packet(pkt) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + last = _fields_from_line(fp_lines[-1]) + assert last["isn_class"] == "incremental" + def test_decky_source_does_not_emit(self): """Packets originating from a decky (outbound reply) should NOT be classified as an attacker fingerprint."""