From 4749c972e5a64922148dca2989b5517b8e1a4708 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 28 Apr 2026 11:09:25 -0400 Subject: [PATCH] feat(prober-cert): schema for active TLS cert capture Adds storage for TLS certificate details collected from attacker-run servers by the active prober (sibling to the existing JARM probe). - AttackerIdentity.tls_cert_sha256 / Campaign.tls_cert_sha256: JSON list[str] columns mirroring ja3_hashes / hassh_hashes for federation gossip. - ingester clause 9b: emits a 'tls_certificate' fingerprint bounty when a prober event carries subject_cn (disjoint from the existing sniffer-gated clause). - Prober-side capture (ssl.wrap_socket follow-up after JARM) and profiler rollup land in sibling commits. --- decnet/web/db/models/attackers.py | 8 ++ decnet/web/db/models/campaigns.py | 3 + decnet/web/ingester.py | 24 ++++++ tests/prober/test_prober_bounty.py | 113 +++++++++++++++++++++++++++++ 4 files changed, 148 insertions(+) diff --git a/decnet/web/db/models/attackers.py b/decnet/web/db/models/attackers.py index 38f6124a..9ac3ed62 100644 --- a/decnet/web/db/models/attackers.py +++ b/decnet/web/db/models/attackers.py @@ -154,6 +154,14 @@ class AttackerIdentity(SQLModel, table=True): hassh_hashes: Optional[str] = Field( default=None, sa_column=Column("hassh_hashes", Text, nullable=True) ) + # JSON list[str] — SHA-256 fingerprints of leaf certs presented by + # attacker-run TLS servers, captured by the active prober alongside + # JARM. Same federation-gossip rationale as ja3_hashes/hassh_hashes: + # a self-signed cert reused across C2 nodes is an instant cluster-link + # signal, and TEXT keeps MySQL indexable via prefix length. + tls_cert_sha256: Optional[str] = Field( + default=None, sa_column=Column("tls_cert_sha256", Text, nullable=True) + ) # Payload SimHash list — 64-bit ints serialized as hex strings. # SimHashes are Hamming-comparable, which is the entire reason # they're a list (not a set). diff --git a/decnet/web/db/models/campaigns.py b/decnet/web/db/models/campaigns.py index 478588e1..b7afc482 100644 --- a/decnet/web/db/models/campaigns.py +++ b/decnet/web/db/models/campaigns.py @@ -53,6 +53,9 @@ class Campaign(SQLModel, table=True): hassh_hashes: Optional[str] = Field( default=None, sa_column=Column("hassh_hashes", Text, nullable=True) ) + tls_cert_sha256: Optional[str] = Field( + default=None, sa_column=Column("tls_cert_sha256", Text, nullable=True) + ) payload_simhashes: Optional[str] = Field( default=None, sa_column=Column("payload_simhashes", Text, nullable=True) ) diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index b4623d88..0e9d1207 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -499,6 +499,30 @@ async def _extract_bounty( }, }) + # 9b. TLS certificate from active prober (sibling of the JARM probe; + # captured by a follow-up ssl.wrap_socket() against attacker-run TLS + # servers). Disjoint from clause 8 above which is the sniffer path. + _prober_subject_cn = _fields.get("subject_cn") + if _prober_subject_cn and log_data.get("service") == "prober": + await repo.add_bounty({ + "decky": log_data.get("decky"), + "service": "prober", + "attacker_ip": _fields.get("target_ip", "Unknown"), + "bounty_type": "fingerprint", + "payload": { + "fingerprint_type": "tls_certificate", + "subject_cn": _prober_subject_cn, + "issuer": _fields.get("issuer"), + "self_signed": _fields.get("self_signed"), + "not_before": _fields.get("not_before"), + "not_after": _fields.get("not_after"), + "sans": _fields.get("sans"), + "cert_sha256": _fields.get("cert_sha256"), + "target_ip": _fields.get("target_ip"), + "target_port": _fields.get("target_port"), + }, + }) + # 10. HASSHServer fingerprint from active prober _hassh = _fields.get("hassh_server_hash") if _hassh and log_data.get("service") == "prober": diff --git a/tests/prober/test_prober_bounty.py b/tests/prober/test_prober_bounty.py index e09a46a2..e84141d5 100644 --- a/tests/prober/test_prober_bounty.py +++ b/tests/prober/test_prober_bounty.py @@ -241,3 +241,116 @@ async def test_tcpfp_bounty_not_extracted_from_other_services(): for call in repo.add_bounty.call_args_list: payload = call[0][0].get("payload", {}) assert payload.get("fingerprint_type") != "tcpfp" + + +# ─── TLS certificate bounty extraction (active prober) ───────────────────── + +@pytest.mark.asyncio +async def test_tls_certificate_bounty_extracted_from_prober(): + """Prober event with subject_cn should create a tls_certificate bounty + against the probe target IP (not the prober's own attacker_ip).""" + repo = _make_repo() + log_data = { + "decky": "decnet-prober", + "service": "prober", + "event_type": "jarm_fingerprint", + "attacker_ip": "Unknown", + "fields": { + "target_ip": "10.0.0.1", + "target_port": "443", + "jarm_hash": "c0cc0cc0cc0cc0cc0cc0cc0cc0cc0cabcdef1234567890abcdef1234567890ab", + "subject_cn": "evil.example.com", + "issuer": "CN=evil.example.com", + "self_signed": True, + "not_before": "2026-01-01T00:00:00Z", + "not_after": "2027-01-01T00:00:00Z", + "sans": ["evil.example.com", "c2.example.com"], + "cert_sha256": "ab" * 32, + }, + "msg": "JARM+cert 10.0.0.1:443", + } + + await _extract_bounty(repo, log_data) + + cert_calls = [ + c for c in repo.add_bounty.call_args_list + if c[0][0].get("payload", {}).get("fingerprint_type") == "tls_certificate" + ] + assert len(cert_calls) == 1 + bounty = cert_calls[0][0][0] + assert bounty["service"] == "prober" + assert bounty["attacker_ip"] == "10.0.0.1" + payload = bounty["payload"] + assert payload["subject_cn"] == "evil.example.com" + assert payload["issuer"] == "CN=evil.example.com" + assert payload["self_signed"] is True + assert payload["not_before"] == "2026-01-01T00:00:00Z" + assert payload["not_after"] == "2027-01-01T00:00:00Z" + assert payload["sans"] == ["evil.example.com", "c2.example.com"] + assert payload["cert_sha256"] == "ab" * 32 + assert payload["target_ip"] == "10.0.0.1" + assert payload["target_port"] == "443" + + +@pytest.mark.asyncio +async def test_tls_certificate_bounty_not_extracted_without_subject_cn(): + """Prober event without subject_cn should not produce a tls_certificate + bounty (e.g. JARM-only run on a non-TLS port or handshake failure).""" + repo = _make_repo() + log_data = { + "decky": "decnet-prober", + "service": "prober", + "event_type": "jarm_fingerprint", + "attacker_ip": "Unknown", + "fields": { + "target_ip": "10.0.0.1", + "target_port": "443", + "jarm_hash": "c" * 62, + }, + "msg": "JARM only", + } + + await _extract_bounty(repo, log_data) + + for call in repo.add_bounty.call_args_list: + payload = call[0][0].get("payload", {}) + assert payload.get("fingerprint_type") != "tls_certificate" + + +@pytest.mark.asyncio +async def test_tls_certificate_prober_clause_disjoint_from_sniffer(): + """The prober clause must not steal sniffer-side cert events: a sniffer + log carrying subject_cn must still be attributed to the sniffer + (attacker_ip from the top-level field, not target_ip).""" + repo = _make_repo() + log_data = { + "decky": "decky-01", + "service": "sniffer", + "event_type": "tls_certificate", + "attacker_ip": "192.168.1.50", + "fields": { + "subject_cn": "real-attacker-cert.example", + "issuer": "Self", + "self_signed": True, + "not_before": "2026-01-01T00:00:00Z", + "not_after": "2027-01-01T00:00:00Z", + "sans": [], + "sni": "victim.local", + }, + "msg": "", + } + + await _extract_bounty(repo, log_data) + + cert_calls = [ + c for c in repo.add_bounty.call_args_list + if c[0][0].get("payload", {}).get("fingerprint_type") == "tls_certificate" + ] + # Exactly one — the sniffer clause, not duplicated by the prober clause. + assert len(cert_calls) == 1 + bounty = cert_calls[0][0][0] + assert bounty["service"] == "sniffer" + assert bounty["attacker_ip"] == "192.168.1.50" + # Sniffer payload carries `sni`; prober payload does not. + assert bounty["payload"].get("sni") == "victim.local" + assert "cert_sha256" not in bounty["payload"]