diff --git a/decnet/profiler/fingerprint.py b/decnet/profiler/fingerprint.py index 37c3efbc..6d46181e 100644 --- a/decnet/profiler/fingerprint.py +++ b/decnet/profiler/fingerprint.py @@ -143,6 +143,7 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: ttl_values: list[str] = [] hops: list[int] = [] tcp_fp: dict[str, Any] | None = None + ipid_latest: str | None = None # Tracks which event set tcp_fp last — picks the provider "context" # (syn vs synack) when we feed the p0f-v2 matcher below. tcp_fp_context: str = "syn" @@ -185,6 +186,13 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: "dscp": _int_or_none(e.fields.get("dscp")), "ecn": _int_or_none(e.fields.get("ecn")), } + # Sequence classifications converge as samples accumulate; the + # most recent non-"unknown" label wins so a later "unknown" event + # (e.g. a deque reset) doesn't overwrite a confident verdict. + ipid_class = e.fields.get("ipid_class") + if ipid_class and ipid_class != "unknown": + ipid_latest = ipid_class + tcp_fp["ipid_class"] = ipid_latest tcp_fp_context = "syn" elif e.event_type == _SNIFFER_FLOW_EVENT: diff --git a/decnet/sniffer/fingerprint.py b/decnet/sniffer/fingerprint.py index f287fee7..c94e1ac2 100644 --- a/decnet/sniffer/fingerprint.py +++ b/decnet/sniffer/fingerprint.py @@ -12,10 +12,12 @@ from __future__ import annotations import hashlib import struct import time +from collections import deque from typing import Any, Callable from decnet.prober.tcpfp import _extract_options_order from decnet.sniffer.p0f import guess_os, hop_distance, initial_ttl +from decnet.sniffer.seq_class import classify_sequence from decnet.sniffer.syslog import SEVERITY_INFO, SEVERITY_WARNING, syslog_line from decnet.telemetry import traced as _traced, get_tracer as _get_tracer @@ -745,6 +747,12 @@ class SnifferEngine: self._tcp_syn: dict[tuple[str, int, str, int], dict[str, Any]] = {} self._tcp_rtt: dict[tuple[str, int, str, int], dict[str, Any]] = {} + # Per-source-IP rolling 
samples for sequence-pattern classification. + # IP-ID and TCP ISN need multiple SYNs from the same attacker before + # we can label them random/incremental/zero/constant. + self._SEQ_SAMPLE_SIZE = 8 + self._ipid_samples: dict[str, deque[int]] = {} + # Per-flow timing aggregator. Key: (src_ip, src_port, dst_ip, dst_port). # Flow direction is client→decky; reverse packets are associated back # to the forward flow so we can track retransmits and inter-arrival. @@ -791,9 +799,16 @@ class SnifferEngine: if event_type == "tls_certificate": return fields.get("subject_cn", "") + "|" + fields.get("issuer", "") if event_type == "tcp_syn_fingerprint": - # Dedupe per (OS signature, options layout). One event per unique - # stack profile from this attacker IP per dedup window. - return fields.get("os_guess", "") + "|" + fields.get("options_sig", "") + # Dedupe per (OS signature, options layout, sequence-pattern + # classification). Including ipid_class/isn_class lets each + # transition (unknown → random/incremental/zero/constant) emit + # exactly one fresh event as samples accumulate. 
+ return ( + fields.get("os_guess", "") + + "|" + fields.get("options_sig", "") + + "|" + fields.get("ipid_class", "") + + "|" + fields.get("isn_class", "") + ) if event_type == "tcp_flow_timing": # Dedup per (attacker_ip, decky_port) — src_port is deliberately # excluded so a port scanner rotating source ports only produces @@ -1031,6 +1046,12 @@ class SnifferEngine: _span.set_attribute("attacker_ip", src_ip) _span.set_attribute("dst_port", dst_port) tcp_fp = _extract_tcp_fingerprint(list(tcp.options or [])) + + ipid_buf = self._ipid_samples.setdefault( + src_ip, deque(maxlen=self._SEQ_SAMPLE_SIZE) + ) + ipid_buf.append(int(ip.id)) + ipid_class = classify_sequence(list(ipid_buf)) os_label = guess_os( ttl=ip.ttl, window=int(tcp.window), @@ -1059,6 +1080,8 @@ class SnifferEngine: tos=str(int(getattr(ip, "tos", 0))), dscp=str((int(getattr(ip, "tos", 0)) >> 2) & 0x3F), ecn=str(int(getattr(ip, "tos", 0)) & 0x3), + ipid_class=ipid_class, + ipid_samples=str(len(ipid_buf)), os_guess=os_label, ) diff --git a/decnet/sniffer/seq_class.py b/decnet/sniffer/seq_class.py new file mode 100644 index 00000000..d0d52b46 --- /dev/null +++ b/decnet/sniffer/seq_class.py @@ -0,0 +1,63 @@ +""" +Sequence-pattern classifier for TCP/IP fields that are useful as a tooling +fingerprint when sampled across multiple packets from the same source. + +Two callers today: +- IP-ID sequence per attacker (random/incremental/zero/constant). +- TCP ISN sequence per attacker; modern stacks randomise, so a non-random + result is itself a strong signal (legacy stacks, custom raw-socket tools). + +Pure stdlib so it stays trivially unit-testable. +""" + +from __future__ import annotations + +import statistics + +# Minimum samples needed for a meaningful classification. Below this we +# return "unknown" rather than guess from 1-3 noisy values. +_MIN_SAMPLES = 4 + +# Max plausible delta for an "incremental" classification. 
The IP-ID field +# is 16-bit so kernel-emitted increments wrap rapidly under load — anything +# over 4096 between consecutive SYNs from the same host is almost certainly +# random rather than a counter we just happen to be sampling sparsely. +_INCREMENTAL_MAX_DELTA = 0x1000 + +# Coefficient-of-variation threshold above which we call a sequence random. +# stddev/mean > 0.5 is well past anything a counter would produce. +_RANDOM_CV_THRESHOLD = 0.5 + + +def classify_sequence(samples: list[int]) -> str: + """ + Classify an integer sequence as one of: + - "zero": every sample is 0 + - "constant": every sample is the same non-zero value + - "incremental": all deltas in 1.._INCREMENTAL_MAX_DELTA; wraps not handled + - "random": everything else (fallthrough; the CV check never changes it) + - "unknown": fewer than _MIN_SAMPLES samples + + Order is preserved — pass the deque/list in arrival order. + """ + if len(samples) < _MIN_SAMPLES: + return "unknown" + + if all(s == 0 for s in samples): + return "zero" + + first = samples[0] + if all(s == first for s in samples): + return "constant" + + deltas = [b - a for a, b in zip(samples, samples[1:])] + if all(0 < d <= _INCREMENTAL_MAX_DELTA for d in deltas): + return "incremental" + + mean = statistics.fmean(samples) + if mean > 0: + stdev = statistics.pstdev(samples) + if stdev / mean > _RANDOM_CV_THRESHOLD: + return "random" + + return "random" diff --git a/decnet_web/src/components/AttackerDetail.tsx b/decnet_web/src/components/AttackerDetail.tsx index 28107991..478ac807 100644 --- a/decnet_web/src/components/AttackerDetail.tsx +++ b/decnet_web/src/components/AttackerDetail.tsx @@ -22,6 +22,7 @@ interface AttackerBehavior { tos?: number | null; dscp?: number | null; ecn?: number | null; + ipid_class?: string | null; } | null; retransmit_count: number; behavior_class: string | null; @@ -145,6 +146,18 @@ const HashRow: React.FC<{ label: string; value?: string | null }> = ({ label, va ); }; +// Random ISN/IP-ID is the modern default; non-random 
patterns are +// fingerprinting gold (legacy stacks, custom raw-socket tools). +const seqClassColor = (cls: string): string | undefined => { + switch (cls) { + case 'random': return undefined; // neutral, expected + case 'incremental': return '#e5c07b'; // amber — uncommon + case 'zero': + case 'constant': return '#98c379'; // green — strong signal + default: return undefined; + } +}; + const Tag: React.FC<{ children: React.ReactNode; color?: string }> = ({ children, color }) => ( = ({ b }) => {
{fp.has_sack && SACK} {fp.has_timestamps && TS} + {fp.ipid_class && fp.ipid_class !== 'unknown' && ( + IPID:{fp.ipid_class.toUpperCase()} + )}
{fp.options_sig && (
diff --git a/tests/sniffer/test_sniffer_seq_class.py b/tests/sniffer/test_sniffer_seq_class.py new file mode 100644 index 00000000..f0985792 --- /dev/null +++ b/tests/sniffer/test_sniffer_seq_class.py @@ -0,0 +1,66 @@ +""" +Unit tests for decnet.sniffer.seq_class.classify_sequence. + +Verifies the four classification branches plus the "unknown" fallback +when fewer than the minimum number of samples is supplied. +""" + +from __future__ import annotations + +from decnet.sniffer.seq_class import classify_sequence + + +class TestUnknown: + def test_empty(self): + assert classify_sequence([]) == "unknown" + + def test_below_min_samples(self): + # _MIN_SAMPLES is 4 — three samples should not commit. + assert classify_sequence([10, 20, 30]) == "unknown" + + +class TestZero: + def test_all_zero(self): + assert classify_sequence([0, 0, 0, 0, 0]) == "zero" + + def test_zero_long(self): + assert classify_sequence([0] * 8) == "zero" + + +class TestConstant: + def test_all_same_nonzero(self): + assert classify_sequence([42, 42, 42, 42]) == "constant" + + def test_mixed_breaks_constant(self): + assert classify_sequence([42, 42, 43, 42]) != "constant" + + +class TestIncremental: + def test_strict_increment_one(self): + assert classify_sequence([100, 101, 102, 103, 104]) == "incremental" + + def test_increment_with_small_jumps(self): + # Some kernels skip a few IDs but stay monotonic. + assert classify_sequence([1000, 1003, 1010, 1012, 1015]) == "incremental" + + def test_decreasing_is_not_incremental(self): + # Reverse-monotonic could happen on wrap; we treat it as random + # (callers care about a counter-like signal, not "any monotonic"). + assert classify_sequence([500, 400, 300, 200]) != "incremental" + + def test_huge_jump_breaks_incremental(self): + # 0x1000 = 4096 is the cutoff; 0x2000 between samples is "random". 
+ result = classify_sequence([0, 0x2000, 0x4000, 0x6000]) + assert result == "random" + + +class TestRandom: + def test_high_variance(self): + samples = [12345, 0xABCD, 0x1234, 0xFFFF, 0x00FF, 0x7F7F] + assert classify_sequence(samples) == "random" + + def test_repeated_value_with_one_outlier(self): + # Not constant (one outlier), not monotonic, not high-variance — + # still classified as random per the fallthrough rule. + result = classify_sequence([42, 42, 42, 99]) + assert result == "random" diff --git a/tests/sniffer/test_sniffer_tcp_fingerprint.py b/tests/sniffer/test_sniffer_tcp_fingerprint.py index f6fa221b..37cb566b 100644 --- a/tests/sniffer/test_sniffer_tcp_fingerprint.py +++ b/tests/sniffer/test_sniffer_tcp_fingerprint.py @@ -163,6 +163,31 @@ class TestSynFingerprintEmission: assert f["dscp"] == "10" assert f["ecn"] == "2" + def test_ipid_classified_after_enough_samples(self): + """Eight SYNs from one source with monotonic IP-IDs should yield + ipid_class=incremental on the final emission. Each transition of + ipid_class is part of the dedup key, so we expect exactly one + emission per distinct class as samples accumulate.""" + engine, captured = _make_engine() + for i in range(8): + pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64, id=1000 + i) / TCP( + sport=46000 + i, dport=22, flags="S", seq=10_000 + i, + window=29200, + options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)), + ("NOP", None), ("WScale", 7)], + ) + engine.on_packet(pkt) + fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"] + # First emission has only 1 sample → ipid_class=unknown. + # Once samples reach _MIN_SAMPLES (4) classification flips → + # second emission has ipid_class=incremental. 
+ assert len(fp_lines) == 2 + first = _fields_from_line(fp_lines[0]) + last = _fields_from_line(fp_lines[1]) + assert first["ipid_class"] == "unknown" + assert last["ipid_class"] == "incremental" + assert int(last["ipid_samples"]) >= 4 + def test_decky_source_does_not_emit(self): """Packets originating from a decky (outbound reply) should NOT be classified as an attacker fingerprint."""