feat(profiler): persist raw SSH KEX algorithm ordering

Prober already emits kex_algorithms in hassh_fingerprint syslog events, but
the raw ordered list was only queryable via the generic bounty store. Add a
dedicated AttackerBehavior.kex_order_raw column (TEXT, JSON list) so
post-v1 KEX-order fingerprinting has a typed, indexable home.

Pipeline:
  - sniffer_rollup() now consumes hassh_fingerprint events and collects
    distinct kex_algorithms strings across ports.
  - build_behavior_record() JSON-encodes the list (NULL when empty).
  - sqlmodel_repo._deserialize_behavior() parses it back into a list.

Closes pre-v1 gap #1 from SIGNAL_CAPTURE_AUDIT.md.
This commit is contained in:
2026-04-22 21:29:46 -04:00
parent 25838eb9f3
commit 8181f39ae2
5 changed files with 71 additions and 0 deletions

View File

@@ -462,6 +462,30 @@ class TestSnifferRollup:
assert fp["has_timestamps"] is True
assert fp["options_sig"] == "M,N,W,N,N,T,S,E"
def test_hassh_kex_order_raw_collected(self):
    # hassh_fingerprint events feed their raw kex_algorithms string into the
    # rollup's "kex_order_raw" list; a repeated string must appear only once.
    first_kex = "curve25519-sha256,ecdh-sha2-nistp256,diffie-hellman-group14-sha1"
    second_kex = "curve25519-sha256@libssh.org,diffie-hellman-group-exchange-sha256"
    # Rows are (first positional arg to _mk, kex_algorithms, hassh_server_hash).
    # NOTE(review): the positional arg is presumably a time offset — confirm
    # against the _mk helper. The middle row deliberately duplicates the first.
    samples = [
        (0, first_kex, "x"),
        (5, first_kex, "x"),
        (10, second_kex, "y"),
    ]
    events = [
        _mk(offset, event_type="hassh_fingerprint",
            fields={"kex_algorithms": kex, "hassh_server_hash": digest})
        for offset, kex, digest in samples
    ]
    rollup = sniffer_rollup(events)
    assert rollup["kex_order_raw"] == [first_kex, second_kex]
def test_kex_order_raw_empty_when_no_hassh(self):
    # With only a tcp_syn_fingerprint event (no hassh_fingerprint) in the
    # input, the rollup must report "kex_order_raw" as an empty list.
    syn_fields = {"os_guess": "linux", "hop_distance": "3"}
    only_syn = [_mk(0, event_type="tcp_syn_fingerprint", fields=syn_fields)]
    rollup = sniffer_rollup(only_syn)
    assert rollup["kex_order_raw"] == []
# ─── build_behavior_record (composite) ──────────────────────────────────────
@@ -527,6 +551,20 @@ class TestBuildBehaviorRecord:
r = build_behavior_record(events)
assert json.loads(r["tool_guesses"]) == []
def test_kex_order_raw_persisted_as_json(self):
    # build_behavior_record must expose "kex_order_raw" as a JSON string
    # that decodes to the list of collected kex_algorithms strings.
    expected_kex = "curve25519-sha256,ecdh-sha2-nistp256"
    hassh_fields = {"kex_algorithms": expected_kex, "hassh_server_hash": "abc"}
    record = build_behavior_record(
        [_mk(0, event_type="hassh_fingerprint", fields=hassh_fields)]
    )
    encoded = record["kex_order_raw"]
    # Stored form is text, not a live list.
    assert isinstance(encoded, str)
    assert json.loads(encoded) == [expected_kex]
def test_kex_order_raw_null_when_no_hassh(self):
    # NOTE(review): presumably _regular_beacon yields no hassh_fingerprint
    # events — confirm against the helper. For that input the record's
    # "kex_order_raw" must be None rather than an encoded empty list.
    beacon_events = _regular_beacon(count=5, interval_s=60.0)
    record = build_behavior_record(beacon_events)
    assert record["kex_order_raw"] is None
def test_nmap_promoted_from_tcp_fingerprint(self):
# p0f identifies nmap from TCP handshake → must appear in tool_guesses
# even when no HTTP request events are present.