feat(sniffer): capture SSH client banner from TCP stream

Parse RFC 4253 §4.2 identification strings from the first attacker→decky
data segment on TCP/22; emit ssh_client_banner syslog events and bus
fan-out. Profiler's sniffer_rollup dedupes observed banners into a new
AttackerBehavior.ssh_client_banners JSON column.

Closes gap #3 from SIGNAL_CAPTURE_AUDIT.md.
This commit is contained in:
2026-04-22 21:37:01 -04:00
parent 8181f39ae2
commit d3321324eb
7 changed files with 148 additions and 0 deletions

View File

@@ -486,6 +486,30 @@ class TestSnifferRollup:
r = sniffer_rollup(events)
assert r["kex_order_raw"] == []
def test_ssh_client_banners_collected(self):
# Sniffer ssh_client_banner events accumulate the attacker's observed
# SSH identification strings, deduplicated in observation order.
ban_a = "SSH-2.0-OpenSSH_9.2p1 Debian-2"
ban_b = "SSH-2.0-libssh2_1.10.0"
events = [
_mk(0, event_type="ssh_client_banner",
fields={"ssh_version": ban_a}),
_mk(1, event_type="ssh_client_banner",
fields={"ssh_version": ban_a}), # dup
_mk(2, event_type="ssh_client_banner",
fields={"ssh_version": ban_b}),
]
r = sniffer_rollup(events)
assert r["ssh_client_banners"] == [ban_a, ban_b]
def test_ssh_client_banners_empty_when_none(self):
events = [
_mk(0, event_type="tcp_syn_fingerprint",
fields={"os_guess": "linux"}),
]
r = sniffer_rollup(events)
assert r["ssh_client_banners"] == []
# ─── build_behavior_record (composite) ──────────────────────────────────────
@@ -565,6 +589,20 @@ class TestBuildBehaviorRecord:
r = build_behavior_record(_regular_beacon(count=5, interval_s=60.0))
assert r["kex_order_raw"] is None
def test_ssh_client_banners_persisted_as_json(self):
banner = "SSH-2.0-OpenSSH_9.2p1"
events = [
_mk(0, event_type="ssh_client_banner",
fields={"ssh_version": banner}),
]
r = build_behavior_record(events)
assert isinstance(r["ssh_client_banners"], str)
assert json.loads(r["ssh_client_banners"]) == [banner]
def test_ssh_client_banners_null_when_none(self):
r = build_behavior_record(_regular_beacon(count=5, interval_s=60.0))
assert r["ssh_client_banners"] is None
def test_nmap_promoted_from_tcp_fingerprint(self):
# p0f identifies nmap from TCP handshake → must appear in tool_guesses
# even when no HTTP request events are present.