diff --git a/decnet/profiler/behavioral.py b/decnet/profiler/behavioral.py index 1992bcc2..37b59c9f 100644 --- a/decnet/profiler/behavioral.py +++ b/decnet/profiler/behavioral.py @@ -89,10 +89,12 @@ def build_behavior_record(events: list[LogEvent]) -> dict[str, Any]: if all_tools: _span.set_attribute("tools", ",".join(all_tools)) + kex_list = rollup.get("kex_order_raw") or [] return { "os_guess": rollup["os_guess"], "hop_distance": rollup["hop_distance"], "tcp_fingerprint": json.dumps(rollup["tcp_fingerprint"]), + "kex_order_raw": json.dumps(kex_list) if kex_list else None, "retransmit_count": rollup["retransmit_count"], "behavior_class": behavior, "beacon_interval_s": beacon_interval_s, diff --git a/decnet/profiler/fingerprint.py b/decnet/profiler/fingerprint.py index 19adac33..155d70f4 100644 --- a/decnet/profiler/fingerprint.py +++ b/decnet/profiler/fingerprint.py @@ -19,6 +19,8 @@ _SNIFFER_SYN_EVENT: str = "tcp_syn_fingerprint" _SNIFFER_FLOW_EVENT: str = "tcp_flow_timing" # Prober-emitted active-probe result (SYN-ACK fingerprint of attacker machine). _PROBER_TCPFP_EVENT: str = "tcpfp_fingerprint" +# Prober-emitted HASSHServer fingerprint; carries the raw kex_algorithms string. +_PROBER_HASSH_EVENT: str = "hassh_fingerprint" # Canonical initial TTL for each coarse OS bucket. Used to derive hop # distance when only the observed TTL is available (prober path). @@ -71,6 +73,8 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: hops: list[int] = [] tcp_fp: dict[str, Any] | None = None retransmits = 0 + kex_order_raw: list[str] = [] + _kex_seen: set[str] = set() for e in events: if e.event_type == _SNIFFER_SYN_EVENT: @@ -109,6 +113,15 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: except (TypeError, ValueError): pass + elif e.event_type == _PROBER_HASSH_EVENT: + # Prober HASSHServer probe: preserve the raw kex_algorithms list + # for post-hoc ordering analysis. 
Dedup because a single attacker + # SSH service will emit the same list per port/probe cycle. + kex = e.fields.get("kex_algorithms") + if kex and kex not in _kex_seen: + kex_order_raw.append(kex) + _kex_seen.add(kex) + elif e.event_type == _PROBER_TCPFP_EVENT: # Active-probe result: prober sent SYN to attacker, got SYN-ACK back. # Field names differ from the passive sniffer (different emitter). @@ -159,4 +172,5 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: "hop_distance": hop_distance, "tcp_fingerprint": tcp_fp or {}, "retransmit_count": retransmits, + "kex_order_raw": kex_order_raw, } diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index 85684949..b8e372bb 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -175,6 +175,13 @@ class AttackerBehavior(SQLModel, table=True): default="{}", sa_column=Column("tcp_fingerprint", Text, nullable=False, default="{}"), ) # JSON: window, wscale, mss, options_sig + # Raw SSH KEX algorithm preference strings observed across HASSH probes + # (one entry per distinct kex_algorithms string). Keeping the raw ordered list + # enables post-hoc KEX-order fingerprinting beyond the HASSH hash. + kex_order_raw: Optional[str] = Field( + default=None, + sa_column=Column("kex_order_raw", Text, nullable=True), + ) # JSON list[str] — kex_algorithms comma-separated strings retransmit_count: int = Field(default=0) # Behavioral (derived by the profiler from log-event timing) behavior_class: Optional[str] = None # beaconing | interactive | scanning | brute_force | slow_scan | mixed | unknown diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 1efb544e..a66c5b45 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -673,6 +673,16 @@ class SQLModelRepository(BaseRepository): d["tool_guesses"] = [] elif raw is None: d["tool_guesses"] = [] + # Same list-or-None pattern for kex_order_raw. 
+ raw_kex = d.get("kex_order_raw") + if isinstance(raw_kex, str): + try: + parsed_kex = json.loads(raw_kex) + d["kex_order_raw"] = parsed_kex if isinstance(parsed_kex, list) else [parsed_kex] + except (json.JSONDecodeError, TypeError): + d["kex_order_raw"] = [] + elif raw_kex is None: + d["kex_order_raw"] = [] return d @staticmethod diff --git a/tests/test_profiler_behavioral.py b/tests/test_profiler_behavioral.py index 5599bd69..08f3cc87 100644 --- a/tests/test_profiler_behavioral.py +++ b/tests/test_profiler_behavioral.py @@ -462,6 +462,30 @@ class TestSnifferRollup: assert fp["has_timestamps"] is True assert fp["options_sig"] == "M,N,W,N,N,T,S,E" + def test_hassh_kex_order_raw_collected(self): + # Prober hassh_fingerprint events contribute their raw kex_algorithms + # list (one entry per distinct string, deduplicated). + kex_a = "curve25519-sha256,ecdh-sha2-nistp256,diffie-hellman-group14-sha1" + kex_b = "curve25519-sha256@libssh.org,diffie-hellman-group-exchange-sha256" + events = [ + _mk(0, event_type="hassh_fingerprint", + fields={"kex_algorithms": kex_a, "hassh_server_hash": "x"}), + _mk(5, event_type="hassh_fingerprint", + fields={"kex_algorithms": kex_a, "hassh_server_hash": "x"}), # dup + _mk(10, event_type="hassh_fingerprint", + fields={"kex_algorithms": kex_b, "hassh_server_hash": "y"}), + ] + r = sniffer_rollup(events) + assert r["kex_order_raw"] == [kex_a, kex_b] + + def test_kex_order_raw_empty_when_no_hassh(self): + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "linux", "hop_distance": "3"}), + ] + r = sniffer_rollup(events) + assert r["kex_order_raw"] == [] + # ─── build_behavior_record (composite) ────────────────────────────────────── @@ -527,6 +551,20 @@ class TestBuildBehaviorRecord: r = build_behavior_record(events) assert json.loads(r["tool_guesses"]) == [] + def test_kex_order_raw_persisted_as_json(self): + kex = "curve25519-sha256,ecdh-sha2-nistp256" + events = [ + _mk(0, event_type="hassh_fingerprint", + 
fields={"kex_algorithms": kex, "hassh_server_hash": "abc"}), + ] + r = build_behavior_record(events) + assert isinstance(r["kex_order_raw"], str) + assert json.loads(r["kex_order_raw"]) == [kex] + + def test_kex_order_raw_null_when_no_hassh(self): + r = build_behavior_record(_regular_beacon(count=5, interval_s=60.0)) + assert r["kex_order_raw"] is None + def test_nmap_promoted_from_tcp_fingerprint(self): # p0f identifies nmap from TCP handshake → must appear in tool_guesses # even when no HTTP request events are present.