diff --git a/decnet/prober/base.py b/decnet/prober/base.py index 73d25574..5adf3d9a 100644 --- a/decnet/prober/base.py +++ b/decnet/prober/base.py @@ -2,8 +2,6 @@ ActiveProbe ABC and metaclass registry for port-iterating active probes. Adding a new active probe = one class with three methods. -IPv6 leak and TLS cert capture are NOT part of this registry (different -call shapes); they stay as special cases in prober/worker.py. """ from __future__ import annotations diff --git a/decnet/prober/probes/__init__.py b/decnet/prober/probes/__init__.py index c017bfb3..55dc7f4b 100644 --- a/decnet/prober/probes/__init__.py +++ b/decnet/prober/probes/__init__.py @@ -3,3 +3,4 @@ from decnet.prober.probes.hassh import HasshProbe as HasshProbe from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe as Ipv6LeakProbe from decnet.prober.probes.jarm import JarmProbe as JarmProbe from decnet.prober.probes.tcpfp import TcpfpProbe as TcpfpProbe +from decnet.prober.probes.tlscert_probe import TlsCertProbe as TlsCertProbe diff --git a/decnet/prober/probes/tlscert_probe.py b/decnet/prober/probes/tlscert_probe.py new file mode 100644 index 00000000..87583523 --- /dev/null +++ b/decnet/prober/probes/tlscert_probe.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from typing import Any + +from decnet.prober.base import ActiveProbe +from decnet.prober.tlscert import fetch_leaf_cert +from decnet.telemetry import traced as _traced + +DEFAULT_PORTS: list[int | None] = [443, 8443, 8080, 4443, 50050, 2222, 993, 995, 8888, 9001] + + +class TlsCertProbe(ActiveProbe): + """Fetch the leaf TLS certificate from attacker-run servers. + + Runs after JarmProbe (priority=200 > 100) on the same port set. + Returns None when the port does not speak TLS — no event emitted. + """ + + probe_name = "tls_certificate" + default_ports: list[int | None] = DEFAULT_PORTS + event_type = "tls_certificate" + rotation_type = None + rotation_hash_key = None + priority = 200 + + @_traced("prober.tls_cert_probe") + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + if port is None: + return None + return fetch_leaf_cert(ip, port, timeout=timeout) + + def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]: + fields = { + "subject_cn": result["subject_cn"], + "issuer": result["issuer"], + "self_signed": str(result["self_signed"]).lower(), + "not_before": result["not_before"], + "not_after": result["not_after"], + "sans": ",".join(result["sans"]), + "cert_sha256": result["cert_sha256"], + } + msg = f"TLS cert {ip}:{port} CN={result['subject_cn']} sha256={result['cert_sha256'][:16]}..." + return fields, msg + + def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]: + return { + "attacker_ip": ip, + "port": port, + "subject_cn": result["subject_cn"], + "cert_sha256": result["cert_sha256"], + "self_signed": result["self_signed"], + } diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 6939bb4b..60e79a2b 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -45,7 +45,6 @@ from decnet.correlation.fingerprint_rotation import ( from decnet.logging import get_logger from decnet.prober.base import ActiveProbe, ActiveProbeMeta import decnet.prober.probes as _probes # noqa: F401 — triggers metaclass registration -from decnet.prober.tlscert import fetch_leaf_cert from decnet.telemetry import traced as _traced logger = get_logger("prober") @@ -285,9 +284,6 @@ def _run_probe( record_rotation(ip, port, probe.rotation_type, result[probe.rotation_hash_key]) if publish_fn is not None: publish_fn(probe.probe_name, probe.publish_payload(ip, port, result)) - if probe.probe_name == "jarm" and port is not None: - # A non-empty JARM hash proves TLS; attempt a real cert capture. - _capture_tls_cert(ip, port, log_path, json_path, timeout, publish_fn) except Exception as exc: done.add(port) _write_event( @@ -324,60 +320,6 @@ def _probe_cycle( timeout, publish_fn, record_rotation) -@_traced("prober.tls_cert_capture") -def _capture_tls_cert( - ip: str, - port: int, - log_path: Path, - json_path: Path, - timeout: float, - publish_fn: ProbePublishFn | None, -) -> None: - """Fetch the leaf TLS cert from ``ip:port`` and emit a tls_certificate - event. No-op when the handshake fails (silent — JARM already proved - the port responds, but the real handshake can still fail for many - reasons: cipher mismatch, SNI gating, mTLS requirement).""" - try: - cert = fetch_leaf_cert(ip, port, timeout=timeout) - except Exception as exc: - # fetch_leaf_cert is supposed to swallow errors; defense in depth. - logger.warning("prober: TLS cert fetch crashed %s:%d: %s", ip, port, exc) - return - if cert is None: - return - - sans_csv = ",".join(cert["sans"]) - _write_event( - log_path, json_path, - "tls_certificate", - target_ip=ip, - target_port=str(port), - subject_cn=cert["subject_cn"], - issuer=cert["issuer"], - self_signed=str(cert["self_signed"]).lower(), - not_before=cert["not_before"], - not_after=cert["not_after"], - sans=sans_csv, - cert_sha256=cert["cert_sha256"], - msg=f"TLS cert {ip}:{port} CN={cert['subject_cn']} sha256={cert['cert_sha256'][:16]}...", - ) - logger.info( - "prober: TLS cert %s:%d CN=%s sha256=%s", - ip, port, cert["subject_cn"], cert["cert_sha256"], - ) - if publish_fn is not None: - publish_fn( - "tls_certificate", - { - "attacker_ip": ip, - "port": port, - "subject_cn": cert["subject_cn"], - "cert_sha256": cert["cert_sha256"], - "self_signed": cert["self_signed"], - }, - ) - - # ─── Main worker ─────────────────────────────────────────────────────────────