feat(sniffer): IP-ID sequence classifier (random/incremental/zero/constant)

Adds a per-source-IP rolling sample buffer (deque, maxlen=8) for IP-ID
values seen on attacker SYNs and a stdlib-only classifier in
decnet/sniffer/seq_class.py. Each new SYN appends ip.id and re-classifies
the buffer; the result is logged on tcp_syn_fingerprint events alongside
sample count.

The dedup key now folds in ipid_class so a transition from 'unknown' to
a definitive verdict emits exactly one fresh event instead of being
suppressed by the old (os|options) key. Profiler rollup carries the
latest non-'unknown' label into attacker.tcp_fingerprint.

UI surfaces it as a colour-coded tag in the TCP STACK panel: random
neutral, incremental amber, zero/constant green (the strong signal).
This commit is contained in:
2026-04-26 20:28:32 -04:00
parent b0b08754d0
commit 0e40cc8ae1
6 changed files with 204 additions and 3 deletions

View File

@@ -0,0 +1,66 @@
"""
Unit tests for decnet.sniffer.seq_class.classify_sequence.
Verifies the four classification branches plus the "unknown" fallback
when fewer than the minimum number of samples is supplied.
"""
from __future__ import annotations
from decnet.sniffer.seq_class import classify_sequence
class TestUnknown:
def test_empty(self):
assert classify_sequence([]) == "unknown"
def test_below_min_samples(self):
# _MIN_SAMPLES is 4 — three samples should not commit.
assert classify_sequence([10, 20, 30]) == "unknown"
class TestZero:
def test_all_zero(self):
assert classify_sequence([0, 0, 0, 0, 0]) == "zero"
def test_zero_long(self):
assert classify_sequence([0] * 8) == "zero"
class TestConstant:
def test_all_same_nonzero(self):
assert classify_sequence([42, 42, 42, 42]) == "constant"
def test_mixed_breaks_constant(self):
assert classify_sequence([42, 42, 43, 42]) != "constant"
class TestIncremental:
def test_strict_increment_one(self):
assert classify_sequence([100, 101, 102, 103, 104]) == "incremental"
def test_increment_with_small_jumps(self):
# Some kernels skip a few IDs but stay monotonic.
assert classify_sequence([1000, 1003, 1010, 1012, 1015]) == "incremental"
def test_decreasing_is_not_incremental(self):
# Reverse-monotonic could happen on wrap; we treat it as random
# (callers care about a counter-like signal, not "any monotonic").
assert classify_sequence([500, 400, 300, 200]) != "incremental"
def test_huge_jump_breaks_incremental(self):
# 0x1000 = 4096 is the cutoff; 0x2000 between samples is "random".
result = classify_sequence([0, 0x2000, 0x4000, 0x6000])
assert result == "random"
class TestRandom:
def test_high_variance(self):
samples = [12345, 0xABCD, 0x1234, 0xFFFF, 0x00FF, 0x7F7F]
assert classify_sequence(samples) == "random"
def test_repeated_value_with_one_outlier(self):
# Not constant (one outlier), not monotonic, not high-variance —
# still classified as random per the fallthrough rule.
result = classify_sequence([42, 42, 42, 99])
assert result == "random"

View File

@@ -163,6 +163,31 @@ class TestSynFingerprintEmission:
assert f["dscp"] == "10"
assert f["ecn"] == "2"
def test_ipid_classified_after_enough_samples(self):
"""Eight SYNs from one source with monotonic IP-IDs should yield
ipid_class=incremental on the final emission. Each transition of
ipid_class is part of the dedup key, so we expect exactly one
emission per distinct class as samples accumulate."""
engine, captured = _make_engine()
for i in range(8):
pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64, id=1000 + i) / TCP(
sport=46000 + i, dport=22, flags="S", seq=10_000 + i,
window=29200,
options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)),
("NOP", None), ("WScale", 7)],
)
engine.on_packet(pkt)
fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"]
# First emission has only 1 sample → ipid_class=unknown.
# Once samples reach _MIN_SAMPLES (4) classification flips →
# second emission has ipid_class=incremental.
assert len(fp_lines) == 2
first = _fields_from_line(fp_lines[0])
last = _fields_from_line(fp_lines[1])
assert first["ipid_class"] == "unknown"
assert last["ipid_class"] == "incremental"
assert int(last["ipid_samples"]) >= 4
def test_decky_source_does_not_emit(self):
"""Packets originating from a decky (outbound reply) should NOT
be classified as an attacker fingerprint."""