feat(prober-cert): schema for active TLS cert capture

Adds storage for TLS certificate details collected from attacker-run
servers by the active prober (sibling to the existing JARM probe).

- AttackerIdentity.tls_cert_sha256 / Campaign.tls_cert_sha256:
  JSON list[str] columns mirroring ja3_hashes / hassh_hashes for
  federation gossip.
- ingester clause 9b: emits a 'tls_certificate' fingerprint bounty
  when a prober event carries subject_cn (disjoint from the existing
  sniffer-gated clause).
- Prober-side capture (ssl.wrap_socket follow-up after JARM) and
  profiler rollup land in sibling commits.
This commit is contained in:
2026-04-28 11:09:25 -04:00
parent e986e81421
commit 4749c972e5
4 changed files with 148 additions and 0 deletions

View File

@@ -241,3 +241,116 @@ async def test_tcpfp_bounty_not_extracted_from_other_services():
for call in repo.add_bounty.call_args_list:
payload = call[0][0].get("payload", {})
assert payload.get("fingerprint_type") != "tcpfp"
# ─── TLS certificate bounty extraction (active prober) ─────────────────────
@pytest.mark.asyncio
async def test_tls_certificate_bounty_extracted_from_prober():
"""Prober event with subject_cn should create a tls_certificate bounty
against the probe target IP (not the prober's own attacker_ip)."""
repo = _make_repo()
log_data = {
"decky": "decnet-prober",
"service": "prober",
"event_type": "jarm_fingerprint",
"attacker_ip": "Unknown",
"fields": {
"target_ip": "10.0.0.1",
"target_port": "443",
"jarm_hash": "c0cc0cc0cc0cc0cc0cc0cc0cc0cc0cabcdef1234567890abcdef1234567890ab",
"subject_cn": "evil.example.com",
"issuer": "CN=evil.example.com",
"self_signed": True,
"not_before": "2026-01-01T00:00:00Z",
"not_after": "2027-01-01T00:00:00Z",
"sans": ["evil.example.com", "c2.example.com"],
"cert_sha256": "ab" * 32,
},
"msg": "JARM+cert 10.0.0.1:443",
}
await _extract_bounty(repo, log_data)
cert_calls = [
c for c in repo.add_bounty.call_args_list
if c[0][0].get("payload", {}).get("fingerprint_type") == "tls_certificate"
]
assert len(cert_calls) == 1
bounty = cert_calls[0][0][0]
assert bounty["service"] == "prober"
assert bounty["attacker_ip"] == "10.0.0.1"
payload = bounty["payload"]
assert payload["subject_cn"] == "evil.example.com"
assert payload["issuer"] == "CN=evil.example.com"
assert payload["self_signed"] is True
assert payload["not_before"] == "2026-01-01T00:00:00Z"
assert payload["not_after"] == "2027-01-01T00:00:00Z"
assert payload["sans"] == ["evil.example.com", "c2.example.com"]
assert payload["cert_sha256"] == "ab" * 32
assert payload["target_ip"] == "10.0.0.1"
assert payload["target_port"] == "443"
@pytest.mark.asyncio
async def test_tls_certificate_bounty_not_extracted_without_subject_cn():
"""Prober event without subject_cn should not produce a tls_certificate
bounty (e.g. JARM-only run on a non-TLS port or handshake failure)."""
repo = _make_repo()
log_data = {
"decky": "decnet-prober",
"service": "prober",
"event_type": "jarm_fingerprint",
"attacker_ip": "Unknown",
"fields": {
"target_ip": "10.0.0.1",
"target_port": "443",
"jarm_hash": "c" * 62,
},
"msg": "JARM only",
}
await _extract_bounty(repo, log_data)
for call in repo.add_bounty.call_args_list:
payload = call[0][0].get("payload", {})
assert payload.get("fingerprint_type") != "tls_certificate"
@pytest.mark.asyncio
async def test_tls_certificate_prober_clause_disjoint_from_sniffer():
"""The prober clause must not steal sniffer-side cert events: a sniffer
log carrying subject_cn must still be attributed to the sniffer
(attacker_ip from the top-level field, not target_ip)."""
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "sniffer",
"event_type": "tls_certificate",
"attacker_ip": "192.168.1.50",
"fields": {
"subject_cn": "real-attacker-cert.example",
"issuer": "Self",
"self_signed": True,
"not_before": "2026-01-01T00:00:00Z",
"not_after": "2027-01-01T00:00:00Z",
"sans": [],
"sni": "victim.local",
},
"msg": "",
}
await _extract_bounty(repo, log_data)
cert_calls = [
c for c in repo.add_bounty.call_args_list
if c[0][0].get("payload", {}).get("fingerprint_type") == "tls_certificate"
]
# Exactly one — the sniffer clause, not duplicated by the prober clause.
assert len(cert_calls) == 1
bounty = cert_calls[0][0][0]
assert bounty["service"] == "sniffer"
assert bounty["attacker_ip"] == "192.168.1.50"
# Sniffer payload carries `sni`; prober payload does not.
assert bounty["payload"].get("sni") == "victim.local"
assert "cert_sha256" not in bounty["payload"]