From 5f8149daeea52e70dcd779b63219ddfb262b9f4e Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 28 Apr 2026 11:14:44 -0400 Subject: [PATCH] feat(prober-cert): capture leaf TLS cert after successful JARM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JARM probes are crafted ClientHellos with weird ciphers — they never complete a real handshake, so the peer cert isn't reachable from those sockets. After a non-empty JARM hash proves the port speaks TLS, do a separate ssl.wrap_socket() against the same (ip, port) to fetch and parse the leaf cert. - decnet/prober/tlscert.py: fetch + parse via cryptography lib; swallows all connect/handshake/parse failures (returns None). - decnet/prober/worker.py::_capture_tls_cert: emits a tls_certificate event with subject_cn / issuer / SANs / validity / SHA-256 + publishes on the bus. Wired from _jarm_phase only when JARM succeeds, so non-TLS ports never trigger a second connect. - Tests cover happy path, cert-fetch failure, defense-in-depth crash, empty-JARM skip, publish_fn, and parser edge cases (garbage DER, empty bytes, missing SAN extension, non-self-signed). --- decnet/prober/tlscert.py | 131 ++++++++++++++++++++ decnet/prober/worker.py | 59 +++++++++ tests/prober/test_prober_tlscert.py | 165 +++++++++++++++++++++++++ tests/prober/test_prober_worker.py | 181 +++++++++++++++++++++++++++- 4 files changed, 534 insertions(+), 2 deletions(-) create mode 100644 decnet/prober/tlscert.py create mode 100644 tests/prober/test_prober_tlscert.py diff --git a/decnet/prober/tlscert.py b/decnet/prober/tlscert.py new file mode 100644 index 00000000..ba8f1185 --- /dev/null +++ b/decnet/prober/tlscert.py @@ -0,0 +1,131 @@ +""" +TLS leaf-certificate capture from attacker-run servers. + +Companion to ``decnet.prober.jarm``: JARM probes are crafted ClientHellos +that never complete a real handshake (raw byte parsing only), so the +peer certificate is never available from those sockets. This module does +a separate :func:`ssl.wrap_socket` against the same ``(host, port)`` +solely to fetch and parse the leaf cert. + +The cert is intentionally NOT verified — attacker-presented certs are +inherently untrusted, and rejecting self-signed ones would defeat the +whole point of the capture (most C2 infra runs self-signed certs). +""" + +from __future__ import annotations + +import hashlib +import socket +import ssl +from typing import Any + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.x509.oid import NameOID + +from decnet.telemetry import traced as _traced + + +def _cn_or_empty(name: x509.Name) -> str: + """Return the first CN attribute as a plain string, or ``""``.""" + attrs = name.get_attributes_for_oid(NameOID.COMMON_NAME) + if not attrs: + return "" + return str(attrs[0].value) + + +def _iso_utc(dt: Any) -> str: + """Cert validity timestamps as ``YYYY-MM-DDTHH:MM:SSZ``. + + ``cryptography`` exposes ``not_valid_before`` (deprecated, naive UTC) + and ``not_valid_before_utc`` (timezone-aware) — prefer the latter + when available so we always emit explicit-Z ISO strings. + """ + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _extract_sans(cert: x509.Certificate) -> list[str]: + """All DNS / IP SANs as a flat list of strings; empty when absent.""" + try: + ext = cert.extensions.get_extension_for_class( + x509.SubjectAlternativeName + ) + except x509.ExtensionNotFound: + return [] + sans: list[str] = [] + san: x509.SubjectAlternativeName = ext.value + sans.extend(str(v) for v in san.get_values_for_type(x509.DNSName)) + sans.extend(str(v) for v in san.get_values_for_type(x509.IPAddress)) + return sans + + +@_traced("prober.tls_cert_parse") +def parse_leaf_cert(der: bytes) -> dict[str, Any] | None: + """Parse a DER-encoded leaf cert into the prober's flat field shape. + + Returns ``None`` if parsing fails for any reason — the caller treats + that the same as a connect failure. + """ + try: + cert = x509.load_der_x509_certificate(der, default_backend()) + except Exception: + return None + + try: + subject_cn = _cn_or_empty(cert.subject) + issuer = cert.issuer.rfc4514_string() + issuer_cn = _cn_or_empty(cert.issuer) + try: + nb = cert.not_valid_before_utc + na = cert.not_valid_after_utc + except AttributeError: # cryptography < 42 + nb = cert.not_valid_before + na = cert.not_valid_after + not_before = _iso_utc(nb) + not_after = _iso_utc(na) + self_signed = bool(subject_cn) and subject_cn == issuer_cn + sans = _extract_sans(cert) + cert_sha256 = hashlib.sha256(der).hexdigest() + except Exception: + return None + + return { + "subject_cn": subject_cn, + "issuer": issuer, + "self_signed": self_signed, + "not_before": not_before, + "not_after": not_after, + "sans": sans, + "cert_sha256": cert_sha256, + } + + +@_traced("prober.tls_cert_fetch") +def fetch_leaf_cert( + host: str, port: int, timeout: float = 5.0 +) -> dict[str, Any] | None: + """Open a TLS connection and return the parsed leaf cert. + + Returns ``None`` on any connect / handshake / parse failure. Never + raises — failures must collapse silently so the prober's outer loop + can keep moving through targets. + """ + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + # Some attacker C2 servers gate on weak ciphers; don't constrain. + ctx.set_ciphers("ALL:@SECLEVEL=0") + + try: + with socket.create_connection((host, port), timeout=timeout) as raw: + raw.settimeout(timeout) + with ctx.wrap_socket(raw, server_hostname=None) as tls: + der = tls.getpeercert(binary_form=True) + except (OSError, ssl.SSLError, socket.timeout): + return None + except Exception: + return None + + if not der: + return None + return parse_leaf_cert(der) diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 6c37608f..52da4c17 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -39,6 +39,7 @@ from decnet.logging import get_logger from decnet.prober.hassh import hassh_server from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash from decnet.prober.tcpfp import tcp_fingerprint +from decnet.prober.tlscert import fetch_leaf_cert from decnet.telemetry import traced as _traced logger = get_logger("prober") @@ -305,6 +306,10 @@ def _jarm_phase( "jarm", {"attacker_ip": ip, "port": port, "jarm_hash": h}, ) + # Cert capture: a non-empty JARM hash proves the port speaks + # TLS, so a follow-up real handshake is worth attempting. + # Failures are silent — the next probe target must not stall. + _capture_tls_cert(ip, port, log_path, json_path, timeout, publish_fn) except Exception as exc: done.add(port) _write_event( @@ -319,6 +324,60 @@ def _jarm_phase( logger.warning("prober: JARM probe failed %s:%d: %s", ip, port, exc) +@_traced("prober.tls_cert_capture") +def _capture_tls_cert( + ip: str, + port: int, + log_path: Path, + json_path: Path, + timeout: float, + publish_fn: ProbePublishFn | None, +) -> None: + """Fetch the leaf TLS cert from ``ip:port`` and emit a tls_certificate + event. No-op when the handshake fails (silent — JARM already proved + the port responds, but the real handshake can still fail for many + reasons: cipher mismatch, SNI gating, mTLS requirement).""" + try: + cert = fetch_leaf_cert(ip, port, timeout=timeout) + except Exception as exc: + # fetch_leaf_cert is supposed to swallow errors; defense in depth. + logger.warning("prober: TLS cert fetch crashed %s:%d: %s", ip, port, exc) + return + if cert is None: + return + + sans_csv = ",".join(cert["sans"]) + _write_event( + log_path, json_path, + "tls_certificate", + target_ip=ip, + target_port=str(port), + subject_cn=cert["subject_cn"], + issuer=cert["issuer"], + self_signed=str(cert["self_signed"]).lower(), + not_before=cert["not_before"], + not_after=cert["not_after"], + sans=sans_csv, + cert_sha256=cert["cert_sha256"], + msg=f"TLS cert {ip}:{port} CN={cert['subject_cn']} sha256={cert['cert_sha256'][:16]}...", + ) + logger.info( + "prober: TLS cert %s:%d CN=%s sha256=%s", + ip, port, cert["subject_cn"], cert["cert_sha256"], + ) + if publish_fn is not None: + publish_fn( + "tls_certificate", + { + "attacker_ip": ip, + "port": port, + "subject_cn": cert["subject_cn"], + "cert_sha256": cert["cert_sha256"], + "self_signed": cert["self_signed"], + }, + ) + + @_traced("prober.hassh_phase") def _hassh_phase( ip: str, diff --git a/tests/prober/test_prober_tlscert.py b/tests/prober/test_prober_tlscert.py new file mode 100644 index 00000000..15e0776b --- /dev/null +++ b/tests/prober/test_prober_tlscert.py @@ -0,0 +1,165 @@ +"""Unit tests for ``decnet.prober.tlscert``. + +DER fixtures are synthesized at runtime via ``cryptography`` so we don't +ship a binary blob; failure modes (truncated DER, missing extensions) +are exercised against those fixtures. +""" + +from __future__ import annotations + +import datetime as dt +import hashlib +import socket +import ssl +from unittest.mock import MagicMock, patch + +import pytest +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + +from decnet.prober.tlscert import fetch_leaf_cert, parse_leaf_cert + + +def _build_self_signed_der( + cn: str = "evil.example.com", + sans: list[str] | None = None, + issuer_cn: str | None = None, +) -> bytes: + """Build a fresh self-signed DER cert. ``issuer_cn`` defaults to ``cn`` + (true self-signed); pass a different value to simulate a CA-issued cert.""" + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) + issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, issuer_cn or cn), + ]) + builder = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(1) + .not_valid_before(dt.datetime(2026, 1, 1)) + .not_valid_after(dt.datetime(2027, 1, 1)) + ) + if sans: + builder = builder.add_extension( + x509.SubjectAlternativeName([x509.DNSName(s) for s in sans]), + critical=False, + ) + cert = builder.sign(key, hashes.SHA256()) + return cert.public_bytes(serialization.Encoding.DER) + + +class TestParseLeafCert: + + def test_self_signed_cert_with_sans(self): + der = _build_self_signed_der( + cn="evil.example.com", + sans=["evil.example.com", "c2.example.com"], + ) + result = parse_leaf_cert(der) + assert result is not None + assert result["subject_cn"] == "evil.example.com" + assert "evil.example.com" in result["issuer"] + assert result["self_signed"] is True + assert result["not_before"] == "2026-01-01T00:00:00Z" + assert result["not_after"] == "2027-01-01T00:00:00Z" + assert set(result["sans"]) == {"evil.example.com", "c2.example.com"} + assert result["cert_sha256"] == hashlib.sha256(der).hexdigest() + + def test_cert_without_sans(self): + der = _build_self_signed_der(cn="single.example", sans=None) + result = parse_leaf_cert(der) + assert result is not None + assert result["sans"] == [] + + def test_ca_issued_cert_not_self_signed(self): + der = _build_self_signed_der(cn="leaf.example", issuer_cn="ca.example") + result = parse_leaf_cert(der) + assert result is not None + assert result["self_signed"] is False + + def test_garbage_der_returns_none(self): + assert parse_leaf_cert(b"\x00\x01\x02\x03 not a cert") is None + + def test_empty_bytes_returns_none(self): + assert parse_leaf_cert(b"") is None + + +class TestFetchLeafCert: + + @patch("decnet.prober.tlscert.ssl.create_default_context") + @patch("decnet.prober.tlscert.socket.create_connection") + def test_returns_parsed_cert_on_success( + self, mock_conn: MagicMock, mock_ctx_factory: MagicMock, + ): + der = _build_self_signed_der(cn="ok.example", sans=["ok.example"]) + + # Mock the socket context manager + mock_socket = MagicMock() + mock_socket.__enter__ = MagicMock(return_value=mock_socket) + mock_socket.__exit__ = MagicMock(return_value=False) + mock_conn.return_value = mock_socket + + # Mock the SSLSocket returned by wrap_socket + mock_tls = MagicMock() + mock_tls.__enter__ = MagicMock(return_value=mock_tls) + mock_tls.__exit__ = MagicMock(return_value=False) + mock_tls.getpeercert = MagicMock(return_value=der) + + mock_ctx = MagicMock() + mock_ctx.wrap_socket = MagicMock(return_value=mock_tls) + mock_ctx_factory.return_value = mock_ctx + + result = fetch_leaf_cert("10.0.0.1", 443, timeout=1.0) + assert result is not None + assert result["subject_cn"] == "ok.example" + + @patch("decnet.prober.tlscert.socket.create_connection") + def test_connect_failure_returns_none(self, mock_conn: MagicMock): + mock_conn.side_effect = OSError("Connection refused") + assert fetch_leaf_cert("10.0.0.1", 443, timeout=1.0) is None + + @patch("decnet.prober.tlscert.socket.create_connection") + def test_handshake_timeout_returns_none(self, mock_conn: MagicMock): + mock_conn.side_effect = socket.timeout() + assert fetch_leaf_cert("10.0.0.1", 443, timeout=1.0) is None + + @patch("decnet.prober.tlscert.ssl.create_default_context") + @patch("decnet.prober.tlscert.socket.create_connection") + def test_ssl_error_returns_none( + self, mock_conn: MagicMock, mock_ctx_factory: MagicMock, + ): + mock_socket = MagicMock() + mock_socket.__enter__ = MagicMock(return_value=mock_socket) + mock_socket.__exit__ = MagicMock(return_value=False) + mock_conn.return_value = mock_socket + + mock_ctx = MagicMock() + mock_ctx.wrap_socket = MagicMock(side_effect=ssl.SSLError("handshake failed")) + mock_ctx_factory.return_value = mock_ctx + + assert fetch_leaf_cert("10.0.0.1", 443, timeout=1.0) is None + + @patch("decnet.prober.tlscert.ssl.create_default_context") + @patch("decnet.prober.tlscert.socket.create_connection") + def test_empty_peer_cert_returns_none( + self, mock_conn: MagicMock, mock_ctx_factory: MagicMock, + ): + mock_socket = MagicMock() + mock_socket.__enter__ = MagicMock(return_value=mock_socket) + mock_socket.__exit__ = MagicMock(return_value=False) + mock_conn.return_value = mock_socket + + mock_tls = MagicMock() + mock_tls.__enter__ = MagicMock(return_value=mock_tls) + mock_tls.__exit__ = MagicMock(return_value=False) + mock_tls.getpeercert = MagicMock(return_value=b"") + + mock_ctx = MagicMock() + mock_ctx.wrap_socket = MagicMock(return_value=mock_tls) + mock_ctx_factory.return_value = mock_ctx + + assert fetch_leaf_cert("10.0.0.1", 443, timeout=1.0) is None diff --git a/tests/prober/test_prober_worker.py b/tests/prober/test_prober_worker.py index 76a8ef57..69b4a73b 100644 --- a/tests/prober/test_prober_worker.py +++ b/tests/prober/test_prober_worker.py @@ -109,11 +109,13 @@ class TestDiscoverAttackers: class TestProbeCycleJARM: + @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) @patch("decnet.prober.worker.tcp_fingerprint") @patch("decnet.prober.worker.hassh_server") @patch("decnet.prober.worker.jarm_hash") def test_probes_new_ips(self, mock_jarm: MagicMock, mock_hassh: MagicMock, - mock_tcpfp: MagicMock, tmp_path: Path): + mock_tcpfp: MagicMock, mock_cert: MagicMock, + tmp_path: Path): mock_jarm.return_value = "c0c" * 10 + "a" * 32 # fake 62-char hash mock_hassh.return_value = None mock_tcpfp.return_value = None @@ -129,11 +131,13 @@ class TestProbeCycleJARM: assert 443 in probed["10.0.0.1"]["jarm"] assert 8443 in probed["10.0.0.1"]["jarm"] + @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) @patch("decnet.prober.worker.tcp_fingerprint") @patch("decnet.prober.worker.hassh_server") @patch("decnet.prober.worker.jarm_hash") def test_skips_already_probed_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock, - mock_tcpfp: MagicMock, tmp_path: Path): + mock_tcpfp: MagicMock, mock_cert: MagicMock, + tmp_path: Path): mock_jarm.return_value = "c0c" * 10 + "a" * 32 mock_hassh.return_value = None mock_tcpfp.return_value = None @@ -480,3 +484,176 @@ class TestWriteEvent: assert record["event_type"] == "test_event" assert record["service"] == "prober" assert record["fields"]["target_ip"] == "10.0.0.1" + + +# ─── _probe_cycle: TLS certificate capture (after JARM) ─────────────────── + +class TestProbeCycleTLSCert: + + @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_cert_event_emitted_after_successful_jarm( + self, + mock_jarm: MagicMock, + mock_hassh: MagicMock, + mock_tcpfp: MagicMock, + mock_cert: MagicMock, + tmp_path: Path, + ): + """A non-empty JARM hash should trigger a follow-up cert fetch and + write a tls_certificate event with all parsed fields.""" + mock_jarm.return_value = "c0c" * 10 + "a" * 32 + mock_hassh.return_value = None + mock_tcpfp.return_value = None + mock_cert.return_value = { + "subject_cn": "evil.example.com", + "issuer": "CN=evil.example.com", + "self_signed": True, + "not_before": "2026-01-01T00:00:00Z", + "not_after": "2027-01-01T00:00:00Z", + "sans": ["evil.example.com", "c2.example.com"], + "cert_sha256": "ab" * 32, + } + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + _probe_cycle({"10.0.0.1"}, {}, [443], [], [], log_path, json_path, timeout=1.0) + + mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0) + records = [ + json.loads(line) + for line in json_path.read_text().splitlines() if line + ] + cert_records = [r for r in records if r["event_type"] == "tls_certificate"] + assert len(cert_records) == 1 + f = cert_records[0]["fields"] + assert f["target_ip"] == "10.0.0.1" + assert f["target_port"] == "443" + assert f["subject_cn"] == "evil.example.com" + assert f["issuer"] == "CN=evil.example.com" + assert f["self_signed"] == "true" + assert f["not_before"] == "2026-01-01T00:00:00Z" + assert f["not_after"] == "2027-01-01T00:00:00Z" + assert f["sans"] == "evil.example.com,c2.example.com" + assert f["cert_sha256"] == "ab" * 32 + + @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_cert_fetch_skipped_on_empty_jarm( + self, + mock_jarm: MagicMock, + mock_hassh: MagicMock, + mock_tcpfp: MagicMock, + mock_cert: MagicMock, + tmp_path: Path, + ): + """JARM_EMPTY_HASH means the port doesn't speak TLS; skip cert fetch.""" + mock_jarm.return_value = JARM_EMPTY_HASH + mock_hassh.return_value = None + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + _probe_cycle({"10.0.0.1"}, {}, [443], [], [], log_path, json_path, timeout=1.0) + + mock_cert.assert_not_called() + + @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_cert_fetch_failure_silent( + self, + mock_jarm: MagicMock, + mock_hassh: MagicMock, + mock_tcpfp: MagicMock, + mock_cert: MagicMock, + tmp_path: Path, + ): + """fetch_leaf_cert returning None must not write a cert event.""" + mock_jarm.return_value = "c0c" * 10 + "a" * 32 + mock_hassh.return_value = None + mock_tcpfp.return_value = None + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + _probe_cycle({"10.0.0.1"}, {}, [443], [], [], log_path, json_path, timeout=1.0) + + mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0) + if json_path.exists(): + content = json_path.read_text() + assert "tls_certificate" not in content + + @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_cert_fetch_crash_does_not_break_phase( + self, + mock_jarm: MagicMock, + mock_hassh: MagicMock, + mock_tcpfp: MagicMock, + mock_cert: MagicMock, + tmp_path: Path, + ): + """If fetch_leaf_cert throws despite its contract, the JARM phase + must keep moving to the next port without crashing.""" + mock_jarm.return_value = "c0c" * 10 + "a" * 32 + mock_hassh.return_value = None + mock_tcpfp.return_value = None + mock_cert.side_effect = RuntimeError("unexpected") + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + _probe_cycle({"10.0.0.1"}, {}, [443, 8443], [], [], log_path, json_path, timeout=1.0) + + # Both ports still marked probed despite the cert-side crash. + from decnet.prober.worker import _probe_cycle as _ # re-import safety + assert mock_cert.call_count == 2 + + @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.worker.tcp_fingerprint") + @patch("decnet.prober.worker.hassh_server") + @patch("decnet.prober.worker.jarm_hash") + def test_cert_publish_fn_called( + self, + mock_jarm: MagicMock, + mock_hassh: MagicMock, + mock_tcpfp: MagicMock, + mock_cert: MagicMock, + tmp_path: Path, + ): + """publish_fn must receive a 'tls_certificate' event when capture succeeds.""" + mock_jarm.return_value = "c0c" * 10 + "a" * 32 + mock_hassh.return_value = None + mock_tcpfp.return_value = None + mock_cert.return_value = { + "subject_cn": "cn", + "issuer": "CN=cn", + "self_signed": True, + "not_before": "2026-01-01T00:00:00Z", + "not_after": "2027-01-01T00:00:00Z", + "sans": [], + "cert_sha256": "cd" * 32, + } + published: list[tuple[str, dict]] = [] + + def publish(kind: str, payload: dict) -> None: + published.append((kind, payload)) + + _probe_cycle( + {"10.0.0.1"}, {}, [443], [], [], + tmp_path / "decnet.log", tmp_path / "decnet.json", + timeout=1.0, publish_fn=publish, + ) + + cert_pubs = [p for p in published if p[0] == "tls_certificate"] + assert len(cert_pubs) == 1 + assert cert_pubs[0][1]["attacker_ip"] == "10.0.0.1" + assert cert_pubs[0][1]["port"] == 443 + assert cert_pubs[0][1]["cert_sha256"] == "cd" * 32 + assert cert_pubs[0][1]["self_signed"] is True