feat(sniffer): ISN sequence classifier (reuses seq_class helper)

Mirrors the IP-ID classifier for TCP ISN values: per-source-IP rolling
deque (maxlen=8) populated from each inbound SYN's tcp.seq, classified
on every emission. A 'random' verdict is the modern norm; 'incremental',
'zero', or 'constant' indicates legacy stacks or hand-rolled raw-socket
tooling — a strong fingerprint signal.

Active prober now also captures server_isn (single sample, not classified
in-flight; downstream consumers correlating multi-probe results can apply
seq_class.classify_sequence themselves).

Profiler rollup carries the latest non-'unknown' label into
attacker.tcp_fingerprint. Dedup key already covers isn_class from
the previous commit, so transitions emit cleanly.

UI surfaces ISN class as a colour-coded tag with a ⚠ glyph for
non-random verdicts, since they're the genuinely interesting case.
This commit is contained in:
2026-04-26 20:30:24 -04:00
parent 0e40cc8ae1
commit c595d039bd
7 changed files with 67 additions and 0 deletions

View File

@@ -139,6 +139,10 @@ def _parse_synack(resp: Any) -> dict[str, Any]:
# TCP fields
window_size = tcp_layer.window
# Server ISN: single sample from one probe — not classified here, but
# exported so a downstream consumer correlating multiple probes against
# the same target can apply seq_class.classify_sequence().
server_isn = int(getattr(tcp_layer, "seq", 0))
# Parse TCP options
mss = 0
@@ -165,6 +169,7 @@ def _parse_synack(resp: Any) -> dict[str, Any]:
"tos": tos,
"dscp": dscp,
"ecn": ecn,
"server_isn": server_isn,
"mss": mss,
"window_scale": window_scale,
"sack_ok": sack_ok,

View File

@@ -415,6 +415,7 @@ def _tcpfp_phase(
tos=str(result["tos"]),
dscp=str(result["dscp"]),
ecn=str(result["ecn"]),
server_isn=str(result["server_isn"]),
msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}",
)
logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"])

View File

@@ -144,6 +144,7 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]:
hops: list[int] = []
tcp_fp: dict[str, Any] | None = None
ipid_latest: str | None = None
isn_latest: str | None = None
# Tracks which event set tcp_fp last — picks the provider "context"
# (syn vs synack) when we feed the p0f-v2 matcher below.
tcp_fp_context: str = "syn"
@@ -193,6 +194,10 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]:
if ipid_class and ipid_class != "unknown":
ipid_latest = ipid_class
tcp_fp["ipid_class"] = ipid_latest
isn_class = e.fields.get("isn_class")
if isn_class and isn_class != "unknown":
isn_latest = isn_class
tcp_fp["isn_class"] = isn_latest
tcp_fp_context = "syn"
elif e.event_type == _SNIFFER_FLOW_EVENT:

View File

@@ -752,6 +752,7 @@ class SnifferEngine:
# we can label them random/incremental/zero/constant.
self._SEQ_SAMPLE_SIZE = 8
self._ipid_samples: dict[str, deque[int]] = {}
self._isn_samples: dict[str, deque[int]] = {}
# Per-flow timing aggregator. Key: (src_ip, src_port, dst_ip, dst_port).
# Flow direction is client→decky; reverse packets are associated back
@@ -1052,6 +1053,12 @@ class SnifferEngine:
)
ipid_buf.append(int(ip.id))
ipid_class = classify_sequence(list(ipid_buf))
isn_buf = self._isn_samples.setdefault(
src_ip, deque(maxlen=self._SEQ_SAMPLE_SIZE)
)
isn_buf.append(int(tcp.seq))
isn_class = classify_sequence(list(isn_buf))
os_label = guess_os(
ttl=ip.ttl,
window=int(tcp.window),
@@ -1082,6 +1089,8 @@ class SnifferEngine:
ecn=str(int(getattr(ip, "tos", 0)) & 0x3),
ipid_class=ipid_class,
ipid_samples=str(len(ipid_buf)),
isn_class=isn_class,
isn_samples=str(len(isn_buf)),
os_guess=os_label,
)

View File

@@ -23,6 +23,7 @@ interface AttackerBehavior {
dscp?: number | null;
ecn?: number | null;
ipid_class?: string | null;
isn_class?: string | null;
} | null;
retransmit_count: number;
behavior_class: string | null;
@@ -771,6 +772,12 @@ const TcpStackBlock: React.FC<{ b: AttackerBehavior }> = ({ b }) => {
{fp.ipid_class && fp.ipid_class !== 'unknown' && (
<Tag color={seqClassColor(fp.ipid_class)}>IPID:{fp.ipid_class.toUpperCase()}</Tag>
)}
{fp.isn_class && fp.isn_class !== 'unknown' && (
<Tag color={seqClassColor(fp.isn_class)}>
{fp.isn_class !== 'random' && '⚠ '}
ISN:{fp.isn_class.toUpperCase()}
</Tag>
)}
</div>
{fp.options_sig && (
<div>

View File

@@ -32,6 +32,7 @@ def _make_synack(
tcp_flags: int = 0x12, # SYN-ACK
options: list | None = None,
ack: int = 1,
seq: int = 0,
) -> SimpleNamespace:
"""Build a fake scapy-like SYN-ACK packet for testing."""
if options is None:
@@ -52,6 +53,7 @@ def _make_synack(
options=options,
dport=12345,
ack=ack,
seq=seq,
)
ip_layer = SimpleNamespace(
ttl=ttl,
@@ -198,6 +200,11 @@ class TestParseSynack:
assert result["dscp"] == 10
assert result["ecn"] == 2
def test_server_isn_captured(self):
resp = _make_synack(seq=0xDEADBEEF)
result = _parse_synack(resp)
assert result["server_isn"] == 0xDEADBEEF
def test_tos_ce_marked(self):
# ECN CE bit set, no DSCP marking → ToS = 0x03
resp = _make_synack(tos=0x03)

View File

@@ -188,6 +188,39 @@ class TestSynFingerprintEmission:
assert last["ipid_class"] == "incremental"
assert int(last["ipid_samples"]) >= 4
def test_isn_classified_random_with_high_variance_seqs(self):
"""SYNs with widely varying ISNs should classify as random."""
engine, captured = _make_engine()
# Spread ISNs across the 32-bit space; randomised initial sequence.
seqs = [0x12345678, 0xABCDEF01, 0x0F0F0F0F, 0xDEADBEEF,
0x11223344, 0x99887766, 0x44556677, 0xCAFEBABE]
for i, seq in enumerate(seqs):
pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64, id=2000 + i) / TCP(
sport=47000 + i, dport=22, flags="S", seq=seq, window=29200,
options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)),
("NOP", None), ("WScale", 7)],
)
engine.on_packet(pkt)
fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"]
last = _fields_from_line(fp_lines[-1])
assert last["isn_class"] == "random"
assert int(last["isn_samples"]) >= 4
def test_isn_classified_incremental_with_monotonic_seqs(self):
"""SYNs whose ISNs march upward in small steps should classify
as incremental — a strong fingerprint signal vs. modern stacks."""
engine, captured = _make_engine()
for i in range(8):
pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64, id=3000 + i) / TCP(
sport=48000 + i, dport=22, flags="S", seq=10_000 + i, window=29200,
options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)),
("NOP", None), ("WScale", 7)],
)
engine.on_packet(pkt)
fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"]
last = _fields_from_line(fp_lines[-1])
assert last["isn_class"] == "incremental"
def test_decky_source_does_not_emit(self):
"""Packets originating from a decky (outbound reply) should NOT
be classified as an attacker fingerprint."""