feat(fingerprint): ToS/DSCP/ECN extraction in active + passive TCP fingerprint
Active prober now reads ip.tos from the SYN-ACK and emits tos/dscp/ecn alongside the existing TTL/window/options fields. dscp is folded into the fingerprint hash so different DSCP markings produce distinct signatures. Passive sniffer logs the same three fields on tcp_syn_fingerprint events; profiler rollup carries them into the attacker tcp_fingerprint snapshot; AttackerDetail's TCP STACK panel now surfaces DSCP and ECN cells.
This commit is contained in:
@@ -27,6 +27,7 @@ def _make_synack(
|
||||
ttl: int = 64,
|
||||
flags: int = 0x02, # IP flags (DF = 0x02)
|
||||
ip_id: int = 0,
|
||||
tos: int = 0,
|
||||
window: int = 65535,
|
||||
tcp_flags: int = 0x12, # SYN-ACK
|
||||
options: list | None = None,
|
||||
@@ -56,6 +57,7 @@ def _make_synack(
|
||||
ttl=ttl,
|
||||
flags=flags,
|
||||
id=ip_id,
|
||||
tos=tos,
|
||||
)
|
||||
|
||||
class FakePacket:
|
||||
@@ -181,6 +183,28 @@ class TestParseSynack:
|
||||
result = _parse_synack(resp)
|
||||
assert result["ip_id"] == 12345
|
||||
|
||||
def test_tos_default_zero(self):
|
||||
resp = _make_synack()
|
||||
result = _parse_synack(resp)
|
||||
assert result["tos"] == 0
|
||||
assert result["dscp"] == 0
|
||||
assert result["ecn"] == 0
|
||||
|
||||
def test_tos_dscp_af11_ecn_ect0(self):
|
||||
# AF11 = DSCP 10 (0b001010); ECT(0) = 0b10 → ToS byte 0b00101010 = 0x2A
|
||||
resp = _make_synack(tos=0x2A)
|
||||
result = _parse_synack(resp)
|
||||
assert result["tos"] == 0x2A
|
||||
assert result["dscp"] == 10
|
||||
assert result["ecn"] == 2
|
||||
|
||||
def test_tos_ce_marked(self):
|
||||
# ECN CE bit set, no DSCP marking → ToS = 0x03
|
||||
resp = _make_synack(tos=0x03)
|
||||
result = _parse_synack(resp)
|
||||
assert result["dscp"] == 0
|
||||
assert result["ecn"] == 3
|
||||
|
||||
def test_empty_options(self):
|
||||
resp = _make_synack(options=[])
|
||||
result = _parse_synack(resp)
|
||||
@@ -254,9 +278,22 @@ class TestComputeFingerprint:
|
||||
"ttl": 64, "window_size": 65535, "df_bit": 1,
|
||||
"mss": 1460, "window_scale": 7, "sack_ok": 1,
|
||||
"timestamp": 1, "options_order": "M,N,W",
|
||||
"dscp": 0, "ecn": 0,
|
||||
}
|
||||
raw, _ = _compute_fingerprint(fields)
|
||||
assert raw == "64:65535:1:1460:7:1:1:M,N,W"
|
||||
assert raw == "64:65535:1:1460:7:1:1:M,N,W:0:0"
|
||||
|
||||
def test_dscp_changes_hash(self):
|
||||
base = {
|
||||
"ttl": 64, "window_size": 65535, "df_bit": 1,
|
||||
"mss": 1460, "window_scale": 7, "sack_ok": 1,
|
||||
"timestamp": 1, "options_order": "M,N,W",
|
||||
"dscp": 0, "ecn": 0,
|
||||
}
|
||||
marked = dict(base, dscp=46) # EF
|
||||
_, h_base = _compute_fingerprint(base)
|
||||
_, h_marked = _compute_fingerprint(marked)
|
||||
assert h_base != h_marked
|
||||
|
||||
def test_sha256_correctness(self):
|
||||
fields = {
|
||||
|
||||
@@ -147,6 +147,22 @@ class TestSynFingerprintEmission:
|
||||
fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"]
|
||||
assert len(fp_lines) == 2
|
||||
|
||||
def test_tos_dscp_ecn_emitted(self):
|
||||
engine, captured = _make_engine()
|
||||
# ToS 0x2A → DSCP 10 (AF11), ECN 2 (ECT(0))
|
||||
pkt = IP(src=_ATTACKER_IP, dst=_DECKY_IP, ttl=64, tos=0x2A) / TCP(
|
||||
sport=45100, dport=22, flags="S", window=29200,
|
||||
options=[("MSS", 1460), ("SAckOK", b""), ("Timestamp", (0, 0)),
|
||||
("NOP", None), ("WScale", 7)],
|
||||
)
|
||||
engine.on_packet(pkt)
|
||||
fp_lines = [ln for ln in captured if _msgid(ln) == "tcp_syn_fingerprint"]
|
||||
assert len(fp_lines) == 1
|
||||
f = _fields_from_line(fp_lines[0])
|
||||
assert f["tos"] == "42"
|
||||
assert f["dscp"] == "10"
|
||||
assert f["ecn"] == "2"
|
||||
|
||||
def test_decky_source_does_not_emit(self):
|
||||
"""Packets originating from a decky (outbound reply) should NOT
|
||||
be classified as an attacker fingerprint."""
|
||||
|
||||
Reference in New Issue
Block a user