feat(prober): add IcmpErrorProbe — ICMP error-leakage fingerprint

Sends four crafted stimuli (UDP/closed-port, TTL=1, DF+oversized,
bad IP option) and records which ICMP error classes come back, the
per-error RTT, and the bytes echoed in each ICMP body. Absence is
as informative as a reply — Linux rate-limiting is a fingerprint signal.

Returns None when no packets could be sent (no CAP_NET_RAW), so the
probe is a no-op in non-root test environments. Port-free ActiveProbe
subclass (priority=850), metaclass auto-registered in the registry.

Also fixes three sets of stale tests left over from the TlsCertProbe
migration (4b2759e0):
- test_active_probe_registry: closed name/order sets updated for
  tls_certificate and icmp_error
- test_prober_rotation: dead patches on worker.fetch_leaf_cert removed
- test_prober_worker (TestProbeCycleTLSCert): rewritten to test
  TlsCertProbe as an independent registry probe, patch target updated
  from worker.fetch_leaf_cert to probes.tlscert_probe.fetch_leaf_cert
This commit is contained in:
2026-05-21 14:52:49 -04:00
parent 4b2759e0fc
commit 56229a272b
7 changed files with 781 additions and 84 deletions

304
decnet/prober/icmp_error.py Normal file
View File

@@ -0,0 +1,304 @@
"""ICMP error-elicitation prober.
Sends four crafted stimuli to a target and records which ICMP error
classes are returned, the per-error RTT, and the bytes echoed back inside
each ICMP error body. Silence is as informative as a reply — Linux emits
at most 1 ICMP error/sec by default, so rate-limited absences are
fingerprint-worthy too.
Requires root / CAP_NET_RAW. Scapy is lazy-imported so non-root callers
of other prober modules are not broken.
"""
from __future__ import annotations
import hashlib
import random
from datetime import datetime, timezone
from typing import Any
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
_log = get_logger("prober.icmp_error")
# ─── per-primitive result shape ──────────────────────────────────────────────
# {
# "returned": bool,
# "rtt_ms": float | None,
# "src_ip": str | None, # sender of the ICMP error (may differ from target)
# "icmp_code": int | None,
# "echo_len": int | None, # bytes the ICMP body echoed back from our probe
# "echo_bytes_hex": str | None,
# }
_SILENT: dict[str, Any] = {
"sent": False, # False when PermissionError blocked sr1 (no CAP_NET_RAW)
"returned": False,
"rtt_ms": None,
"src_ip": None,
"icmp_code": None,
"echo_len": None,
"echo_bytes_hex": None,
}
# Matrix letter codes: uppercase if returned, '.' if silent, '~' if wrong type
_MATRIX_LETTERS = {
"port_unreachable": "P",
"time_exceeded": "T",
"frag_needed": "F",
"param_problem": "X",
}
# ─── helpers ──────────────────────────────────────────────────────────────────
def _ephemeral() -> int:
return random.randint(49152, 65535) # nosec B311 — ephemeral port, not crypto
def _closed_udp_port() -> int:
# traceroute range — blends with normal network probing traffic
return random.randint(33434, 33534) # nosec B311
def _parse_reply(resp: Any, expected_type: int, expected_code: int | None, start_ns: int) -> dict[str, Any]:
"""Extract a per-primitive result dict from a scapy ICMP reply."""
from scapy.all import ICMP, IP # type: ignore[attr-defined]
rtt_ms: float | None = None
try:
rtt_ms = round((resp.time - start_ns) * 1000, 3)
except Exception as exc:
_log.debug("icmp_error: rtt extraction failed: %s", exc)
result: dict[str, Any] = {"sent": True} # we made it past sr1
src_ip: str | None = None
try:
src_ip = resp[IP].src
except Exception as exc:
_log.debug("icmp_error: src_ip extraction failed: %s", exc)
icmp_code: int | None = None
echo_len: int | None = None
echo_bytes_hex: str | None = None
try:
icmp_layer = resp[ICMP]
icmp_code = int(icmp_layer.code)
# ICMP error bodies echo the original IP header + first 8 bytes of transport.
payload = bytes(icmp_layer.payload)
if payload:
echo_len = len(payload)
echo_bytes_hex = payload[:32].hex()
except Exception as exc:
_log.debug("icmp_error: ICMP field extraction failed: %s", exc)
wrong_type = False
try:
icmp_type = int(resp[ICMP].type)
wrong_type = icmp_type != expected_type or (
expected_code is not None and icmp_code != expected_code
)
except Exception:
wrong_type = True
result.update({
"returned": not wrong_type,
"rtt_ms": rtt_ms,
"src_ip": src_ip,
"icmp_code": icmp_code,
"echo_len": echo_len,
"echo_bytes_hex": echo_bytes_hex,
})
return result
# ─── four stimulus primitives ─────────────────────────────────────────────────
@_traced("prober.icmp_error.port_unreachable")
def _probe_port_unreachable(target_ip: str, timeout: float) -> dict[str, Any]:
"""UDP to a closed port → expect ICMP type=3 code=3 (Port Unreachable)."""
try:
from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined]
import time as _time
conf.verb = 0
dport = _closed_udp_port()
pkt = IP(dst=target_ip) / UDP(sport=_ephemeral(), dport=dport) / Raw(b"\x00" * 32)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply(resp, expected_type=3, expected_code=3, start_ns=0)
except (OSError, PermissionError) as exc:
_log.debug("icmp_error: port_unreachable probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
@_traced("prober.icmp_error.time_exceeded")
def _probe_time_exceeded(
target_ip: str,
timeout: float,
on_link: bool = False,
) -> dict[str, Any]:
"""UDP with ttl=1 → expect ICMP type=11 code=0 (Time Exceeded) from next-hop.
Skipped when the attacker is on-link (no intermediate hop to probe).
`on_link` is pre-computed by the caller to avoid a redundant route lookup.
"""
if on_link:
return dict(_SILENT)
try:
from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined]
import time as _time
conf.verb = 0
dport = _closed_udp_port()
pkt = IP(dst=target_ip, ttl=1) / UDP(sport=_ephemeral(), dport=dport) / Raw(b"\x00" * 32)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply(resp, expected_type=11, expected_code=0, start_ns=0)
except (OSError, PermissionError) as exc:
_log.debug("icmp_error: time_exceeded probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
@_traced("prober.icmp_error.frag_needed")
def _probe_frag_needed(target_ip: str, timeout: float) -> dict[str, Any]:
"""Oversized UDP with DF=1 → expect ICMP type=3 code=4 (Fragmentation Needed)."""
try:
from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined]
import time as _time
conf.verb = 0
# 1500 bytes payload forces fragmentation when DF is set
pkt = (
IP(dst=target_ip, flags="DF")
/ UDP(sport=_ephemeral(), dport=_closed_udp_port())
/ Raw(b"\x00" * 1500)
)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply(resp, expected_type=3, expected_code=4, start_ns=0)
except (OSError, PermissionError) as exc:
_log.debug("icmp_error: frag_needed probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
@_traced("prober.icmp_error.param_problem")
def _probe_param_problem(target_ip: str, timeout: float) -> dict[str, Any]:
"""IP packet with malformed option → expect ICMP type=12 (Parameter Problem).
Most stacks silently drop malformed options; absence is still a fingerprint.
"""
try:
from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined]
import time as _time
conf.verb = 0
# Option class=0 (control), number=17 (MTU probe, often unrecognised),
# length byte deliberately wrong (2 instead of a valid even value).
bad_opt = b"\x91\x02" # type=0x91 (copied flag + class 0 + number 17), len=2
pkt = (
IP(dst=target_ip, options=bad_opt)
/ UDP(sport=_ephemeral(), dport=_closed_udp_port())
/ Raw(b"\x00" * 8)
)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply(resp, expected_type=12, expected_code=None, start_ns=0)
except (OSError, PermissionError) as exc:
_log.debug("icmp_error: param_problem probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
# ─── matrix + hash ────────────────────────────────────────────────────────────
def _build_matrix(errors: dict[str, dict[str, Any]]) -> str:
parts = []
for key, letter in _MATRIX_LETTERS.items():
e = errors.get(key, _SILENT)
if not e.get("returned", False):
parts.append(".")
elif e.get("icmp_code") is not None:
parts.append(letter)
else:
parts.append("~")
return "".join(parts)
def _compute_hash(matrix: str, errors: dict[str, dict[str, Any]]) -> str:
echo_lens = tuple(
errors.get(k, _SILENT).get("echo_len") or 0
for k in _MATRIX_LETTERS
)
icmp_codes = tuple(
errors.get(k, _SILENT).get("icmp_code") or -1
for k in _MATRIX_LETTERS
)
raw = f"{matrix}:{echo_lens}:{icmp_codes}"
return hashlib.sha256(raw.encode()).hexdigest()[:32]
# ─── public API ───────────────────────────────────────────────────────────────
@_traced("prober.icmp_error.elicit")
def elicit_icmp_errors(
target_ip: str,
timeout: float = 2.0,
) -> dict[str, Any] | None:
"""Send four ICMP-eliciting probes and return a fingerprint result dict.
Returns None when scapy is unavailable or all four primitives produced
no information (all silent with no exception — pure silence is still
fingerprint-worthy, but we need at least one probe to have executed
without a hard error before returning a result).
Requires root / CAP_NET_RAW.
"""
try:
import scapy.all # noqa: F401 — presence check
except ImportError:
_log.debug("scapy not available — icmp_error active probe skipped")
return None
# Pre-compute on-link status once for the time_exceeded gate.
on_link = False
try:
from decnet.prober.ipv6_leak import _route_info
on_link, _ = _route_info(target_ip)
except Exception as exc:
_log.debug("icmp_error: _route_info failed for %s: %s", target_ip, exc)
errors: dict[str, dict[str, Any]] = {
"port_unreachable": _probe_port_unreachable(target_ip, timeout),
"time_exceeded": _probe_time_exceeded(target_ip, timeout, on_link=on_link),
"frag_needed": _probe_frag_needed(target_ip, timeout),
"param_problem": _probe_param_problem(target_ip, timeout),
}
if not any(e.get("sent", False) for e in errors.values()):
_log.debug("icmp_error: no packets sent to %s (no CAP_NET_RAW?)", target_ip)
return None
matrix = _build_matrix(errors)
fp_hash = _compute_hash(matrix, errors)
return {
"matrix": matrix,
"fingerprint_hash": fp_hash,
"errors": errors,
"observed_at": datetime.now(timezone.utc).isoformat(),
}

View File

@@ -1,5 +1,6 @@
# Import all probe modules to trigger ActiveProbeMeta registration. # Import all probe modules to trigger ActiveProbeMeta registration.
from decnet.prober.probes.hassh import HasshProbe as HasshProbe from decnet.prober.probes.hassh import HasshProbe as HasshProbe
from decnet.prober.probes.icmp_error_probe import IcmpErrorProbe as IcmpErrorProbe
from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe as Ipv6LeakProbe from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe as Ipv6LeakProbe
from decnet.prober.probes.jarm import JarmProbe as JarmProbe from decnet.prober.probes.jarm import JarmProbe as JarmProbe
from decnet.prober.probes.tcpfp import TcpfpProbe as TcpfpProbe from decnet.prober.probes.tcpfp import TcpfpProbe as TcpfpProbe

View File

@@ -0,0 +1,72 @@
from __future__ import annotations
from typing import Any
from decnet.logging import get_logger
from decnet.prober.base import ActiveProbe
_log = get_logger("prober.icmp_error_probe")
class IcmpErrorProbe(ActiveProbe):
"""Port-free probe that elicits ICMP error replies from the attacker.
Sends four crafted stimuli (UDP/closed-port, TTL=1, DF+oversized, bad
IP option) and records which ICMP error classes the target emits, the
per-error RTT, and bytes echoed back in each ICMP error body.
Silent responses are as fingerprint-worthy as replies: Linux emits at
most 1 ICMP error/sec, so rate-limited absences reveal OS behaviour.
Requires root / CAP_NET_RAW. Scapy is lazy-imported inside the helper.
"""
probe_name = "icmp_error"
default_ports: list[int | None] = [None]
event_type = "icmp_error_leak"
priority = 850 # after TCP/TLS (100-200), before ipv6_leak (999)
def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None:
from decnet.prober.icmp_error import elicit_icmp_errors
return elicit_icmp_errors(ip, timeout=timeout)
def syslog_fields(
self, ip: str, port: int | None, result: dict[str, Any]
) -> tuple[dict[str, Any], str]:
matrix = result.get("matrix", "")
fp_hash = result.get("fingerprint_hash", "")
errors = result.get("errors", {})
def _flag(key: str) -> str:
return "1" if errors.get(key, {}).get("returned", False) else "0"
def _rtt(key: str) -> str:
v = errors.get(key, {}).get("rtt_ms")
return str(v) if v is not None else ""
fields: dict[str, Any] = {
"icmp_matrix": matrix,
"icmp_fp_hash": fp_hash,
"icmp_port_unreach": _flag("port_unreachable"),
"icmp_time_exceeded": _flag("time_exceeded"),
"icmp_frag_needed": _flag("frag_needed"),
"icmp_param_problem": _flag("param_problem"),
"icmp_port_unreach_rtt_ms": _rtt("port_unreachable"),
"icmp_time_exceeded_rtt_ms": _rtt("time_exceeded"),
"icmp_frag_needed_rtt_ms": _rtt("frag_needed"),
"icmp_param_problem_rtt_ms": _rtt("param_problem"),
"icmp_time_exceeded_hop": errors.get("time_exceeded", {}).get("src_ip") or "",
}
msg = f"ICMP leak {ip} → matrix={matrix} fp={fp_hash[:8]}"
return fields, msg
def publish_payload(
self, ip: str, port: int | None, result: dict[str, Any]
) -> dict[str, Any]:
return {
"attacker_ip": ip,
"icmp_matrix": result.get("matrix", ""),
"icmp_fp_hash": result.get("fingerprint_hash", ""),
"errors": result.get("errors", {}),
"observed_at": result.get("observed_at", ""),
}

View File

@@ -23,12 +23,15 @@ class TestRegistryContents:
def test_all_probes_registered(self): def test_all_probes_registered(self):
names = {cls.probe_name for cls in ActiveProbeMeta.all()} names = {cls.probe_name for cls in ActiveProbeMeta.all()}
assert names == {"jarm", "hassh", "tcpfp", "ipv6_leak"} assert names == {"jarm", "hassh", "tcpfp", "ipv6_leak", "tls_certificate", "icmp_error"}
def test_sorted_by_priority_then_name(self): def test_sorted_by_priority_then_name(self):
order = [cls.probe_name for cls in ActiveProbeMeta.all()] order = [cls.probe_name for cls in ActiveProbeMeta.all()]
# hassh/jarm/tcpfp all priority=100 (alphabetical), ipv6_leak priority=999 last # priority=100: hassh/jarm/tcpfp (alphabetical)
assert order == ["hassh", "jarm", "tcpfp", "ipv6_leak"] # priority=200: tls_certificate
# priority=850: icmp_error
# priority=999: ipv6_leak
assert order == ["hassh", "jarm", "tcpfp", "tls_certificate", "icmp_error", "ipv6_leak"]
def test_priority10_probe_sorts_first(self): def test_priority10_probe_sorts_first(self):
class _FastProbe(ActiveProbe): class _FastProbe(ActiveProbe):
@@ -48,7 +51,7 @@ class TestRegistryContents:
order = [cls.probe_name for cls in ActiveProbeMeta.all()] order = [cls.probe_name for cls in ActiveProbeMeta.all()]
assert order[0] == "_fast_test_probe" assert order[0] == "_fast_test_probe"
assert set(order[1:]) == {"hassh", "jarm", "tcpfp", "ipv6_leak"} assert set(order[1:]) == {"hassh", "jarm", "tcpfp", "ipv6_leak", "tls_certificate", "icmp_error"}
def test_port_none_probe_dispatched_with_none_port(self): def test_port_none_probe_dispatched_with_none_port(self):
"""_run_probe must call run(ip, None, timeout) for a port-free probe.""" """_run_probe must call run(ip, None, timeout) for a port-free probe."""

View File

@@ -0,0 +1,319 @@
"""Tests for IcmpErrorProbe and the underlying icmp_error helpers.
Covers:
- IcmpErrorProbe.run() returns helper result verbatim.
- IcmpErrorProbe.run() returns None when helper returns None.
- IcmpErrorProbe.syslog_fields() — stable key set, correct flag encoding, human msg.
- IcmpErrorProbe.publish_payload() — correct bus payload shape.
- _probe_port_unreachable / _probe_time_exceeded / _probe_frag_needed /
_probe_param_problem — returned-reply and silent-timeout cases.
- _probe_time_exceeded skipped when on-link.
- elicit_icmp_errors returns None when scapy is unavailable.
- Fingerprint hash is deterministic for identical inputs.
- Matrix encoding table-driven across all four present/absent combinations.
"""
from __future__ import annotations
import importlib
from typing import Any
from unittest.mock import MagicMock, call, patch
# ─── fixtures ────────────────────────────────────────────────────────────────
_SILENT: dict[str, Any] = {
"returned": False,
"rtt_ms": None,
"src_ip": None,
"icmp_code": None,
"echo_len": None,
"echo_bytes_hex": None,
}
_EVIDENCE: dict[str, Any] = {
"matrix": "PT..",
"fingerprint_hash": "abcdef1234567890abcdef1234567890",
"errors": {
"port_unreachable": {
"sent": True, "returned": True, "rtt_ms": 1.5, "src_ip": "10.0.0.9",
"icmp_code": 3, "echo_len": 28, "echo_bytes_hex": "aabbcc",
},
"time_exceeded": {
"sent": True, "returned": True, "rtt_ms": 0.8, "src_ip": "192.168.1.1",
"icmp_code": 0, "echo_len": 28, "echo_bytes_hex": "ddeeff",
},
"frag_needed": dict(_SILENT),
"param_problem": dict(_SILENT),
},
"observed_at": "2026-01-01T00:00:00+00:00",
}
def _make_probe():
from decnet.prober.probes.icmp_error_probe import IcmpErrorProbe
return IcmpErrorProbe()
# ─── IcmpErrorProbe.run() ─────────────────────────────────────────────────────
def test_run_returns_evidence() -> None:
probe = _make_probe()
with patch("decnet.prober.icmp_error.elicit_icmp_errors", return_value=_EVIDENCE) as mock_fn:
result = probe.run("10.0.0.9", None, 2.0)
assert result == _EVIDENCE
mock_fn.assert_called_once_with("10.0.0.9", timeout=2.0)
def test_run_returns_none_when_helper_returns_none() -> None:
probe = _make_probe()
with patch("decnet.prober.icmp_error.elicit_icmp_errors", return_value=None):
result = probe.run("10.0.0.9", None, 2.0)
assert result is None
# ─── IcmpErrorProbe.syslog_fields() ──────────────────────────────────────────
_EXPECTED_SD_KEYS = {
"icmp_matrix",
"icmp_fp_hash",
"icmp_port_unreach",
"icmp_time_exceeded",
"icmp_frag_needed",
"icmp_param_problem",
"icmp_port_unreach_rtt_ms",
"icmp_time_exceeded_rtt_ms",
"icmp_frag_needed_rtt_ms",
"icmp_param_problem_rtt_ms",
"icmp_time_exceeded_hop",
}
def test_syslog_fields_byte_stable() -> None:
probe = _make_probe()
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
assert set(fields.keys()) == _EXPECTED_SD_KEYS
def test_syslog_fields_flag_encoding() -> None:
probe = _make_probe()
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
assert fields["icmp_port_unreach"] == "1"
assert fields["icmp_time_exceeded"] == "1"
assert fields["icmp_frag_needed"] == "0"
assert fields["icmp_param_problem"] == "0"
def test_syslog_fields_rtt_populated() -> None:
probe = _make_probe()
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
assert fields["icmp_port_unreach_rtt_ms"] == "1.5"
assert fields["icmp_time_exceeded_rtt_ms"] == "0.8"
assert fields["icmp_frag_needed_rtt_ms"] == ""
assert fields["icmp_param_problem_rtt_ms"] == ""
def test_syslog_fields_time_exceeded_hop() -> None:
probe = _make_probe()
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
assert fields["icmp_time_exceeded_hop"] == "192.168.1.1"
def test_syslog_fields_human_msg_contains_ip_and_matrix() -> None:
probe = _make_probe()
_, msg = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
assert "10.0.0.9" in msg
assert "PT.." in msg
def test_syslog_fields_matrix_and_hash_present() -> None:
probe = _make_probe()
fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE)
assert fields["icmp_matrix"] == "PT.."
assert fields["icmp_fp_hash"] == _EVIDENCE["fingerprint_hash"]
# ─── IcmpErrorProbe.publish_payload() ────────────────────────────────────────
def test_publish_payload_structure() -> None:
probe = _make_probe()
payload = probe.publish_payload("10.0.0.9", None, _EVIDENCE)
assert payload["attacker_ip"] == "10.0.0.9"
assert payload["icmp_matrix"] == "PT.."
assert payload["icmp_fp_hash"] == _EVIDENCE["fingerprint_hash"]
assert payload["errors"] is _EVIDENCE["errors"]
assert payload["observed_at"] == _EVIDENCE["observed_at"]
# ─── helper: _parse_reply ────────────────────────────────────────────────────
def _make_mock_resp(icmp_type: int, icmp_code: int, src_ip: str, payload_bytes: bytes = b"\x00" * 20) -> MagicMock:
"""Build a minimal scapy-shaped response mock."""
resp = MagicMock()
resp.time = 0.0
icmp_layer = MagicMock()
icmp_layer.type = icmp_type
icmp_layer.code = icmp_code
icmp_layer.payload = MagicMock()
icmp_layer.payload.__bytes__ = lambda self: payload_bytes
# make bytes(icmp_layer.payload) work
type(icmp_layer.payload).__bytes__ = lambda self: payload_bytes
ip_layer = MagicMock()
ip_layer.src = src_ip
def getitem(key):
from scapy.all import ICMP, IP # type: ignore[attr-defined]
if key is IP or (isinstance(key, type) and key.__name__ == "IP"):
return ip_layer
if key is ICMP or (isinstance(key, type) and key.__name__ == "ICMP"):
return icmp_layer
raise KeyError(key)
resp.__getitem__ = getitem
return resp
# ─── helper: primitive probe unit tests ──────────────────────────────────────
def test_probe_port_unreachable_silent_on_none_response() -> None:
from decnet.prober.icmp_error import _probe_port_unreachable
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
with patch("scapy.all.sr1", return_value=None):
result = _probe_port_unreachable("10.0.0.9", 0.1)
assert result["returned"] is False
assert result["rtt_ms"] is None
def test_probe_frag_needed_silent_on_none_response() -> None:
from decnet.prober.icmp_error import _probe_frag_needed
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
with patch("scapy.all.sr1", return_value=None):
result = _probe_frag_needed("10.0.0.9", 0.1)
assert result["returned"] is False
def test_probe_param_problem_silent_on_none_response() -> None:
from decnet.prober.icmp_error import _probe_param_problem
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
with patch("scapy.all.sr1", return_value=None):
result = _probe_param_problem("10.0.0.9", 0.1)
assert result["returned"] is False
def test_probe_time_exceeded_skipped_when_on_link() -> None:
from decnet.prober.icmp_error import _probe_time_exceeded
with patch("scapy.all.sr1") as mock_sr1:
result = _probe_time_exceeded("10.0.0.9", 0.1, on_link=True)
assert result["returned"] is False
mock_sr1.assert_not_called()
def test_probe_time_exceeded_silent_when_not_on_link() -> None:
from decnet.prober.icmp_error import _probe_time_exceeded
with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \
patch("decnet.prober.icmp_error._ephemeral", return_value=50000):
with patch("scapy.all.sr1", return_value=None):
result = _probe_time_exceeded("10.0.0.9", 0.1, on_link=False)
assert result["returned"] is False
# ─── helper: elicit_icmp_errors ──────────────────────────────────────────────
def test_elicit_returns_none_when_scapy_unavailable() -> None:
from decnet.prober.icmp_error import elicit_icmp_errors
real_import = __builtins__.__import__ if hasattr(__builtins__, "__import__") else __import__
def _import_blocker(name, *args, **kwargs):
if name.startswith("scapy"):
raise ImportError(f"mocked: {name}")
return real_import(name, *args, **kwargs)
import builtins
with patch.object(builtins, "__import__", side_effect=_import_blocker):
result = elicit_icmp_errors("10.0.0.9", 0.1)
assert result is None
def test_elicit_returns_dict_with_all_keys() -> None:
from decnet.prober.icmp_error import elicit_icmp_errors
silent = dict(_SILENT)
# At least one primitive must have sent=True or elicit returns None.
sent_silent = {**silent, "sent": True}
with (
patch("decnet.prober.icmp_error._probe_port_unreachable", return_value=sent_silent),
patch("decnet.prober.icmp_error._probe_time_exceeded", return_value=silent),
patch("decnet.prober.icmp_error._probe_frag_needed", return_value=silent),
patch("decnet.prober.icmp_error._probe_param_problem", return_value=silent),
patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")),
):
result = elicit_icmp_errors("10.0.0.9", 0.1)
assert result is not None
assert set(result.keys()) == {"matrix", "fingerprint_hash", "errors", "observed_at"}
assert set(result["errors"].keys()) == {
"port_unreachable", "time_exceeded", "frag_needed", "param_problem"
}
def test_elicit_returns_none_when_all_silent_no_caps() -> None:
from decnet.prober.icmp_error import elicit_icmp_errors
silent = dict(_SILENT) # all sent=False (PermissionError path)
with (
patch("decnet.prober.icmp_error._probe_port_unreachable", return_value=silent),
patch("decnet.prober.icmp_error._probe_time_exceeded", return_value=silent),
patch("decnet.prober.icmp_error._probe_frag_needed", return_value=silent),
patch("decnet.prober.icmp_error._probe_param_problem", return_value=silent),
patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")),
):
result = elicit_icmp_errors("10.0.0.9", 0.1)
assert result is None
def test_fingerprint_hash_stable() -> None:
from decnet.prober.icmp_error import _build_matrix, _compute_hash
errors = {
"port_unreachable": {"returned": True, "icmp_code": 3, "echo_len": 28},
"time_exceeded": {"returned": False, "icmp_code": None, "echo_len": None},
"frag_needed": {"returned": False, "icmp_code": None, "echo_len": None},
"param_problem": {"returned": False, "icmp_code": None, "echo_len": None},
}
matrix = _build_matrix(errors) # type: ignore[arg-type]
h1 = _compute_hash(matrix, errors) # type: ignore[arg-type]
h2 = _compute_hash(matrix, errors) # type: ignore[arg-type]
assert h1 == h2
assert len(h1) == 32
def test_matrix_encoding_table() -> None:
"""Matrix encodes presence/absence for all four primitives correctly."""
from decnet.prober.icmp_error import _build_matrix
def _ret(code: int | None) -> dict[str, Any]:
return {"returned": True, "icmp_code": code, "echo_len": 8}
def _sil() -> dict[str, Any]:
return {"returned": False, "icmp_code": None, "echo_len": None}
# All silent
m = _build_matrix({"port_unreachable": _sil(), "time_exceeded": _sil(), "frag_needed": _sil(), "param_problem": _sil()}) # type: ignore[arg-type]
assert m == "...."
# All returned with codes
m = _build_matrix({"port_unreachable": _ret(3), "time_exceeded": _ret(0), "frag_needed": _ret(4), "param_problem": _ret(0)}) # type: ignore[arg-type]
assert m == "PTFX"
# Mixed — first and third returned
m = _build_matrix({"port_unreachable": _ret(3), "time_exceeded": _sil(), "frag_needed": _ret(4), "param_problem": _sil()}) # type: ignore[arg-type]
assert m == "P.F."
# Returned but code is None → '~' (wrong type / parse failure)
m = _build_matrix({"port_unreachable": _ret(None), "time_exceeded": _sil(), "frag_needed": _sil(), "param_problem": _sil()}) # type: ignore[arg-type]
assert m == "~..."

View File

@@ -28,10 +28,7 @@ def test_jarm_phase_calls_recorder(tmp_path: Path) -> None:
probe = JarmProbe() probe = JarmProbe()
probe._ports = [443] probe._ports = [443]
with ( with patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32):
patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32),
patch("decnet.prober.worker.fetch_leaf_cert", return_value=None),
):
_run_probe( _run_probe(
probe, "10.0.0.5", {}, probe, "10.0.0.5", {},
tmp_path / "decnet.log", tmp_path / "decnet.json", tmp_path / "decnet.log", tmp_path / "decnet.json",
@@ -118,10 +115,7 @@ def test_recorder_optional_no_crash_when_none(tmp_path: Path) -> None:
probe = JarmProbe() probe = JarmProbe()
probe._ports = [443] probe._ports = [443]
with ( with patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32):
patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32),
patch("decnet.prober.worker.fetch_leaf_cert", return_value=None),
):
_run_probe( _run_probe(
probe, "10.0.0.5", {}, probe, "10.0.0.5", {},
tmp_path / "decnet.log", tmp_path / "decnet.json", tmp_path / "decnet.log", tmp_path / "decnet.json",

View File

@@ -110,13 +110,11 @@ class TestDiscoverAttackers:
class TestProbeCycleJARM: class TestProbeCycleJARM:
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash") @patch("decnet.prober.probes.jarm.jarm_hash")
def test_probes_new_ips(self, mock_jarm: MagicMock, mock_hassh: MagicMock, def test_probes_new_ips(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, mock_cert: MagicMock, mock_tcpfp: MagicMock, mock_ipv6: MagicMock,
mock_ipv6: MagicMock,
tmp_path: Path, monkeypatch: pytest.MonkeyPatch): tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443]) monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443])
monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", [])
@@ -137,13 +135,11 @@ class TestProbeCycleJARM:
assert 8443 in probed["10.0.0.1"]["jarm"] assert 8443 in probed["10.0.0.1"]["jarm"]
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash") @patch("decnet.prober.probes.jarm.jarm_hash")
def test_skips_already_probed_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock, def test_skips_already_probed_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, mock_cert: MagicMock, mock_tcpfp: MagicMock, mock_ipv6: MagicMock,
mock_ipv6: MagicMock,
tmp_path: Path, monkeypatch: pytest.MonkeyPatch): tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443]) monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443])
monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", [])
@@ -560,34 +556,11 @@ class TestWriteEvent:
assert record["fields"]["target_ip"] == "10.0.0.1" assert record["fields"]["target_ip"] == "10.0.0.1"
# ─── _probe_cycle: TLS certificate capture (after JARM) ─────────────────── # ─── _probe_cycle: TLS certificate capture ────────────────────────────────
# TlsCertProbe is now an independent registered probe (priority=200).
# It calls fetch_leaf_cert directly — not coupled to JARM outcome.
class TestProbeCycleTLSCert: _CERT_STUB = {
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.worker.fetch_leaf_cert")
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash")
def test_cert_event_emitted_after_successful_jarm(
self,
mock_jarm: MagicMock,
mock_hassh: MagicMock,
mock_tcpfp: MagicMock,
mock_cert: MagicMock,
mock_ipv6: MagicMock,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
):
"""A non-empty JARM hash should trigger a follow-up cert fetch and
write a tls_certificate event with all parsed fields."""
monkeypatch.setattr(JarmProbe, "default_ports", [443])
monkeypatch.setattr(HasshProbe, "default_ports", [])
monkeypatch.setattr(TcpfpProbe, "default_ports", [])
mock_jarm.return_value = "c0c" * 10 + "a" * 32
mock_hassh.return_value = None
mock_tcpfp.return_value = None
mock_cert.return_value = {
"subject_cn": "evil.example.com", "subject_cn": "evil.example.com",
"issuer": "CN=evil.example.com", "issuer": "CN=evil.example.com",
"self_signed": True, "self_signed": True,
@@ -595,36 +568,59 @@ class TestProbeCycleTLSCert:
"not_after": "2027-01-01T00:00:00Z", "not_after": "2027-01-01T00:00:00Z",
"sans": ["evil.example.com", "c2.example.com"], "sans": ["evil.example.com", "c2.example.com"],
"cert_sha256": "ab" * 32, "cert_sha256": "ab" * 32,
} }
class TestProbeCycleTLSCert:
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert")
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash")
def test_cert_event_emitted_for_tls_port(
self,
mock_jarm: MagicMock,
mock_hassh: MagicMock,
mock_tcpfp: MagicMock,
mock_cert: MagicMock,
mock_ipv6: MagicMock,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
):
"""TlsCertProbe runs independently; a successful fetch writes a tls_certificate event."""
from decnet.prober.probes.tlscert_probe import TlsCertProbe
monkeypatch.setattr(JarmProbe, "default_ports", [])
monkeypatch.setattr(HasshProbe, "default_ports", [])
monkeypatch.setattr(TcpfpProbe, "default_ports", [])
monkeypatch.setattr(TlsCertProbe, "default_ports", [443])
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None
mock_tcpfp.return_value = None
mock_cert.return_value = _CERT_STUB
log_path = tmp_path / "decnet.log" log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json" json_path = tmp_path / "decnet.json"
_probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0)
mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0) mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0)
records = [ records = [json.loads(line) for line in json_path.read_text().splitlines() if line]
json.loads(line)
for line in json_path.read_text().splitlines() if line
]
cert_records = [r for r in records if r["event_type"] == "tls_certificate"] cert_records = [r for r in records if r["event_type"] == "tls_certificate"]
assert len(cert_records) == 1 assert len(cert_records) == 1
f = cert_records[0]["fields"] f = cert_records[0]["fields"]
assert f["target_ip"] == "10.0.0.1" assert f["target_ip"] == "10.0.0.1"
assert f["target_port"] == "443" assert f["target_port"] == "443"
assert f["subject_cn"] == "evil.example.com" assert f["subject_cn"] == "evil.example.com"
assert f["issuer"] == "CN=evil.example.com"
assert f["self_signed"] == "true" assert f["self_signed"] == "true"
assert f["not_before"] == "2026-01-01T00:00:00Z"
assert f["not_after"] == "2027-01-01T00:00:00Z"
assert f["sans"] == "evil.example.com,c2.example.com" assert f["sans"] == "evil.example.com,c2.example.com"
assert f["cert_sha256"] == "ab" * 32 assert f["cert_sha256"] == "ab" * 32
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.worker.fetch_leaf_cert") @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash") @patch("decnet.prober.probes.jarm.jarm_hash")
def test_cert_fetch_skipped_on_empty_jarm( def test_cert_skipped_when_fetch_returns_none(
self, self,
mock_jarm: MagicMock, mock_jarm: MagicMock,
mock_hassh: MagicMock, mock_hassh: MagicMock,
@@ -634,10 +630,12 @@ class TestProbeCycleTLSCert:
tmp_path: Path, tmp_path: Path,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
): ):
"""JARM_EMPTY_HASH means the port doesn't speak TLS; skip cert fetch.""" """fetch_leaf_cert returning None → no tls_certificate event."""
monkeypatch.setattr(JarmProbe, "default_ports", [443]) from decnet.prober.probes.tlscert_probe import TlsCertProbe
monkeypatch.setattr(JarmProbe, "default_ports", [])
monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", [])
monkeypatch.setattr(TcpfpProbe, "default_ports", []) monkeypatch.setattr(TcpfpProbe, "default_ports", [])
monkeypatch.setattr(TlsCertProbe, "default_ports", [443])
mock_jarm.return_value = JARM_EMPTY_HASH mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None mock_hassh.return_value = None
mock_tcpfp.return_value = None mock_tcpfp.return_value = None
@@ -646,42 +644,13 @@ class TestProbeCycleTLSCert:
_probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0)
mock_cert.assert_not_called()
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash")
def test_cert_fetch_failure_silent(
self,
mock_jarm: MagicMock,
mock_hassh: MagicMock,
mock_tcpfp: MagicMock,
mock_cert: MagicMock,
mock_ipv6: MagicMock,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
):
"""fetch_leaf_cert returning None must not write a cert event."""
monkeypatch.setattr(JarmProbe, "default_ports", [443])
monkeypatch.setattr(HasshProbe, "default_ports", [])
monkeypatch.setattr(TcpfpProbe, "default_ports", [])
mock_jarm.return_value = "c0c" * 10 + "a" * 32
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
_probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0)
mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0) mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0)
if json_path.exists(): if json_path.exists():
content = json_path.read_text() content = json_path.read_text()
assert "tls_certificate" not in content assert "tls_certificate" not in content
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.worker.fetch_leaf_cert") @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert")
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash") @patch("decnet.prober.probes.jarm.jarm_hash")
@@ -695,12 +664,13 @@ class TestProbeCycleTLSCert:
tmp_path: Path, tmp_path: Path,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
): ):
"""If fetch_leaf_cert throws despite its contract, the JARM phase """fetch_leaf_cert crash is caught by _run_probe; both ports still marked probed."""
must keep moving to the next port without crashing.""" from decnet.prober.probes.tlscert_probe import TlsCertProbe
monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443]) monkeypatch.setattr(JarmProbe, "default_ports", [])
monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", [])
monkeypatch.setattr(TcpfpProbe, "default_ports", []) monkeypatch.setattr(TcpfpProbe, "default_ports", [])
mock_jarm.return_value = "c0c" * 10 + "a" * 32 monkeypatch.setattr(TlsCertProbe, "default_ports", [443, 8443])
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None mock_hassh.return_value = None
mock_tcpfp.return_value = None mock_tcpfp.return_value = None
mock_cert.side_effect = RuntimeError("unexpected") mock_cert.side_effect = RuntimeError("unexpected")
@@ -709,11 +679,10 @@ class TestProbeCycleTLSCert:
_probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0)
# Both ports still marked probed despite the cert-side crash.
assert mock_cert.call_count == 2 assert mock_cert.call_count == 2
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.worker.fetch_leaf_cert") @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert")
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash") @patch("decnet.prober.probes.jarm.jarm_hash")
@@ -727,11 +696,13 @@ class TestProbeCycleTLSCert:
tmp_path: Path, tmp_path: Path,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
): ):
"""publish_fn must receive a 'tls_certificate' event when capture succeeds.""" """publish_fn receives a 'tls_certificate' event on successful cert capture."""
monkeypatch.setattr(JarmProbe, "default_ports", [443]) from decnet.prober.probes.tlscert_probe import TlsCertProbe
monkeypatch.setattr(JarmProbe, "default_ports", [])
monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", [])
monkeypatch.setattr(TcpfpProbe, "default_ports", []) monkeypatch.setattr(TcpfpProbe, "default_ports", [])
mock_jarm.return_value = "c0c" * 10 + "a" * 32 monkeypatch.setattr(TlsCertProbe, "default_ports", [443])
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None mock_hassh.return_value = None
mock_tcpfp.return_value = None mock_tcpfp.return_value = None
mock_cert.return_value = { mock_cert.return_value = {
@@ -760,3 +731,36 @@ class TestProbeCycleTLSCert:
assert cert_pubs[0][1]["port"] == 443 assert cert_pubs[0][1]["port"] == 443
assert cert_pubs[0][1]["cert_sha256"] == "cd" * 32 assert cert_pubs[0][1]["cert_sha256"] == "cd" * 32
assert cert_pubs[0][1]["self_signed"] is True assert cert_pubs[0][1]["self_signed"] is True
@patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None))
@patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert")
@patch("decnet.prober.probes.tcpfp.tcp_fingerprint")
@patch("decnet.prober.probes.hassh.hassh_server")
@patch("decnet.prober.probes.jarm.jarm_hash")
def test_cert_independent_of_jarm_result(
self,
mock_jarm: MagicMock,
mock_hassh: MagicMock,
mock_tcpfp: MagicMock,
mock_cert: MagicMock,
mock_ipv6: MagicMock,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
):
"""TlsCertProbe runs regardless of JARM outcome (independent registry probe)."""
from decnet.prober.probes.tlscert_probe import TlsCertProbe
monkeypatch.setattr(JarmProbe, "default_ports", [443])
monkeypatch.setattr(HasshProbe, "default_ports", [])
monkeypatch.setattr(TcpfpProbe, "default_ports", [])
monkeypatch.setattr(TlsCertProbe, "default_ports", [443])
mock_jarm.return_value = JARM_EMPTY_HASH # port doesn't speak TLS per JARM
mock_hassh.return_value = None
mock_tcpfp.return_value = None
mock_cert.return_value = _CERT_STUB
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
_probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0)
# TlsCertProbe still called despite empty JARM hash
mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0)