diff --git a/decnet/prober/icmp_error.py b/decnet/prober/icmp_error.py new file mode 100644 index 00000000..1be3f3e4 --- /dev/null +++ b/decnet/prober/icmp_error.py @@ -0,0 +1,304 @@ +"""ICMP error-elicitation prober. + +Sends four crafted stimuli to a target and records which ICMP error +classes are returned, the per-error RTT, and the bytes echoed back inside +each ICMP error body. Silence is as informative as a reply — Linux emits +at most 1 ICMP error/sec by default, so rate-limited absences are +fingerprint-worthy too. + +Requires root / CAP_NET_RAW. Scapy is lazy-imported so non-root callers +of other prober modules are not broken. +""" + +from __future__ import annotations + +import hashlib +import random +from datetime import datetime, timezone +from typing import Any + +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced + +_log = get_logger("prober.icmp_error") + +# ─── per-primitive result shape ────────────────────────────────────────────── +# { +# "returned": bool, +# "rtt_ms": float | None, +# "src_ip": str | None, # sender of the ICMP error (may differ from target) +# "icmp_code": int | None, +# "echo_len": int | None, # bytes the ICMP body echoed back from our probe +# "echo_bytes_hex": str | None, +# } + +_SILENT: dict[str, Any] = { + "sent": False, # False when PermissionError blocked sr1 (no CAP_NET_RAW) + "returned": False, + "rtt_ms": None, + "src_ip": None, + "icmp_code": None, + "echo_len": None, + "echo_bytes_hex": None, +} + +# Matrix letter codes: uppercase if returned, '.' if silent, '~' if wrong type +_MATRIX_LETTERS = { + "port_unreachable": "P", + "time_exceeded": "T", + "frag_needed": "F", + "param_problem": "X", +} + + +# ─── helpers ────────────────────────────────────────────────────────────────── + +def _ephemeral() -> int: + return random.randint(49152, 65535) # nosec B311 — ephemeral port, not crypto + + +def _closed_udp_port() -> int: + # traceroute range — blends with normal network probing traffic + return random.randint(33434, 33534) # nosec B311 + + +def _parse_reply(resp: Any, expected_type: int, expected_code: int | None, start_ns: int) -> dict[str, Any]: + """Extract a per-primitive result dict from a scapy ICMP reply.""" + from scapy.all import ICMP, IP # type: ignore[attr-defined] + + rtt_ms: float | None = None + try: + rtt_ms = round((resp.time - start_ns) * 1000, 3) + except Exception as exc: + _log.debug("icmp_error: rtt extraction failed: %s", exc) + + result: dict[str, Any] = {"sent": True} # we made it past sr1 + src_ip: str | None = None + try: + src_ip = resp[IP].src + except Exception as exc: + _log.debug("icmp_error: src_ip extraction failed: %s", exc) + + icmp_code: int | None = None + echo_len: int | None = None + echo_bytes_hex: str | None = None + + try: + icmp_layer = resp[ICMP] + icmp_code = int(icmp_layer.code) + + # ICMP error bodies echo the original IP header + first 8 bytes of transport. + payload = bytes(icmp_layer.payload) + if payload: + echo_len = len(payload) + echo_bytes_hex = payload[:32].hex() + except Exception as exc: + _log.debug("icmp_error: ICMP field extraction failed: %s", exc) + + wrong_type = False + try: + icmp_type = int(resp[ICMP].type) + wrong_type = icmp_type != expected_type or ( + expected_code is not None and icmp_code != expected_code + ) + except Exception: + wrong_type = True + + result.update({ + "returned": not wrong_type, + "rtt_ms": rtt_ms, + "src_ip": src_ip, + "icmp_code": icmp_code, + "echo_len": echo_len, + "echo_bytes_hex": echo_bytes_hex, + }) + return result + + +# ─── four stimulus primitives ───────────────────────────────────────────────── + +@_traced("prober.icmp_error.port_unreachable") +def _probe_port_unreachable(target_ip: str, timeout: float) -> dict[str, Any]: + """UDP to a closed port → expect ICMP type=3 code=3 (Port Unreachable).""" + try: + from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined] + import time as _time + + conf.verb = 0 + dport = _closed_udp_port() + pkt = IP(dst=target_ip) / UDP(sport=_ephemeral(), dport=dport) / Raw(b"\x00" * 32) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply(resp, expected_type=3, expected_code=3, start_ns=0) + except (OSError, PermissionError) as exc: + _log.debug("icmp_error: port_unreachable probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +@_traced("prober.icmp_error.time_exceeded") +def _probe_time_exceeded( + target_ip: str, + timeout: float, + on_link: bool = False, +) -> dict[str, Any]: + """UDP with ttl=1 → expect ICMP type=11 code=0 (Time Exceeded) from next-hop. + + Skipped when the attacker is on-link (no intermediate hop to probe). + `on_link` is pre-computed by the caller to avoid a redundant route lookup. + """ + if on_link: + return dict(_SILENT) + try: + from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined] + import time as _time + + conf.verb = 0 + dport = _closed_udp_port() + pkt = IP(dst=target_ip, ttl=1) / UDP(sport=_ephemeral(), dport=dport) / Raw(b"\x00" * 32) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply(resp, expected_type=11, expected_code=0, start_ns=0) + except (OSError, PermissionError) as exc: + _log.debug("icmp_error: time_exceeded probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +@_traced("prober.icmp_error.frag_needed") +def _probe_frag_needed(target_ip: str, timeout: float) -> dict[str, Any]: + """Oversized UDP with DF=1 → expect ICMP type=3 code=4 (Fragmentation Needed).""" + try: + from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined] + import time as _time + + conf.verb = 0 + # 1500 bytes payload forces fragmentation when DF is set + pkt = ( + IP(dst=target_ip, flags="DF") + / UDP(sport=_ephemeral(), dport=_closed_udp_port()) + / Raw(b"\x00" * 1500) + ) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply(resp, expected_type=3, expected_code=4, start_ns=0) + except (OSError, PermissionError) as exc: + _log.debug("icmp_error: frag_needed probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +@_traced("prober.icmp_error.param_problem") +def _probe_param_problem(target_ip: str, timeout: float) -> dict[str, Any]: + """IP packet with malformed option → expect ICMP type=12 (Parameter Problem). + + Most stacks silently drop malformed options; absence is still a fingerprint. + """ + try: + from scapy.all import IP, Raw, UDP, conf, sr1 # type: ignore[attr-defined] + import time as _time + + conf.verb = 0 + # Option class=0 (control), number=17 (MTU probe, often unrecognised), + # length byte deliberately wrong (2 instead of a valid even value). + bad_opt = b"\x91\x02" # type=0x91 (copied flag + class 0 + number 17), len=2 + pkt = ( + IP(dst=target_ip, options=bad_opt) + / UDP(sport=_ephemeral(), dport=_closed_udp_port()) + / Raw(b"\x00" * 8) + ) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply(resp, expected_type=12, expected_code=None, start_ns=0) + except (OSError, PermissionError) as exc: + _log.debug("icmp_error: param_problem probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +# ─── matrix + hash ──────────────────────────────────────────────────────────── + +def _build_matrix(errors: dict[str, dict[str, Any]]) -> str: + parts = [] + for key, letter in _MATRIX_LETTERS.items(): + e = errors.get(key, _SILENT) + if not e.get("returned", False): + parts.append(".") + elif e.get("icmp_code") is not None: + parts.append(letter) + else: + parts.append("~") + return "".join(parts) + + +def _compute_hash(matrix: str, errors: dict[str, dict[str, Any]]) -> str: + echo_lens = tuple( + errors.get(k, _SILENT).get("echo_len") or 0 + for k in _MATRIX_LETTERS + ) + icmp_codes = tuple( + errors.get(k, _SILENT).get("icmp_code") or -1 + for k in _MATRIX_LETTERS + ) + raw = f"{matrix}:{echo_lens}:{icmp_codes}" + return hashlib.sha256(raw.encode()).hexdigest()[:32] + + +# ─── public API ─────────────────────────────────────────────────────────────── + +@_traced("prober.icmp_error.elicit") +def elicit_icmp_errors( + target_ip: str, + timeout: float = 2.0, +) -> dict[str, Any] | None: + """Send four ICMP-eliciting probes and return a fingerprint result dict. + + Returns None when scapy is unavailable or all four primitives produced + no information (all silent with no exception — pure silence is still + fingerprint-worthy, but we need at least one probe to have executed + without a hard error before returning a result). + + Requires root / CAP_NET_RAW. + """ + try: + import scapy.all # noqa: F401 — presence check + except ImportError: + _log.debug("scapy not available — icmp_error active probe skipped") + return None + + # Pre-compute on-link status once for the time_exceeded gate. + on_link = False + try: + from decnet.prober.ipv6_leak import _route_info + on_link, _ = _route_info(target_ip) + except Exception as exc: + _log.debug("icmp_error: _route_info failed for %s: %s", target_ip, exc) + + errors: dict[str, dict[str, Any]] = { + "port_unreachable": _probe_port_unreachable(target_ip, timeout), + "time_exceeded": _probe_time_exceeded(target_ip, timeout, on_link=on_link), + "frag_needed": _probe_frag_needed(target_ip, timeout), + "param_problem": _probe_param_problem(target_ip, timeout), + } + + if not any(e.get("sent", False) for e in errors.values()): + _log.debug("icmp_error: no packets sent to %s (no CAP_NET_RAW?)", target_ip) + return None + + matrix = _build_matrix(errors) + fp_hash = _compute_hash(matrix, errors) + + return { + "matrix": matrix, + "fingerprint_hash": fp_hash, + "errors": errors, + "observed_at": datetime.now(timezone.utc).isoformat(), + } diff --git a/decnet/prober/probes/__init__.py b/decnet/prober/probes/__init__.py index 55dc7f4b..c11c48f9 100644 --- a/decnet/prober/probes/__init__.py +++ b/decnet/prober/probes/__init__.py @@ -1,5 +1,6 @@ # Import all probe modules to trigger ActiveProbeMeta registration. from decnet.prober.probes.hassh import HasshProbe as HasshProbe +from decnet.prober.probes.icmp_error_probe import IcmpErrorProbe as IcmpErrorProbe from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe as Ipv6LeakProbe from decnet.prober.probes.jarm import JarmProbe as JarmProbe from decnet.prober.probes.tcpfp import TcpfpProbe as TcpfpProbe diff --git a/decnet/prober/probes/icmp_error_probe.py b/decnet/prober/probes/icmp_error_probe.py new file mode 100644 index 00000000..39504c84 --- /dev/null +++ b/decnet/prober/probes/icmp_error_probe.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from typing import Any + +from decnet.logging import get_logger +from decnet.prober.base import ActiveProbe + +_log = get_logger("prober.icmp_error_probe") + + +class IcmpErrorProbe(ActiveProbe): + """Port-free probe that elicits ICMP error replies from the attacker. + + Sends four crafted stimuli (UDP/closed-port, TTL=1, DF+oversized, bad + IP option) and records which ICMP error classes the target emits, the + per-error RTT, and bytes echoed back in each ICMP error body. + + Silent responses are as fingerprint-worthy as replies: Linux emits at + most 1 ICMP error/sec, so rate-limited absences reveal OS behaviour. + + Requires root / CAP_NET_RAW. Scapy is lazy-imported inside the helper. + """ + + probe_name = "icmp_error" + default_ports: list[int | None] = [None] + event_type = "icmp_error_leak" + priority = 850 # after TCP/TLS (100-200), before ipv6_leak (999) + + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + from decnet.prober.icmp_error import elicit_icmp_errors + return elicit_icmp_errors(ip, timeout=timeout) + + def syslog_fields( + self, ip: str, port: int | None, result: dict[str, Any] + ) -> tuple[dict[str, Any], str]: + matrix = result.get("matrix", "") + fp_hash = result.get("fingerprint_hash", "") + errors = result.get("errors", {}) + + def _flag(key: str) -> str: + return "1" if errors.get(key, {}).get("returned", False) else "0" + + def _rtt(key: str) -> str: + v = errors.get(key, {}).get("rtt_ms") + return str(v) if v is not None else "" + + fields: dict[str, Any] = { + "icmp_matrix": matrix, + "icmp_fp_hash": fp_hash, + "icmp_port_unreach": _flag("port_unreachable"), + "icmp_time_exceeded": _flag("time_exceeded"), + "icmp_frag_needed": _flag("frag_needed"), + "icmp_param_problem": _flag("param_problem"), + "icmp_port_unreach_rtt_ms": _rtt("port_unreachable"), + "icmp_time_exceeded_rtt_ms": _rtt("time_exceeded"), + "icmp_frag_needed_rtt_ms": _rtt("frag_needed"), + "icmp_param_problem_rtt_ms": _rtt("param_problem"), + "icmp_time_exceeded_hop": errors.get("time_exceeded", {}).get("src_ip") or "", + } + msg = f"ICMP leak {ip} → matrix={matrix} fp={fp_hash[:8]}" + return fields, msg + + def publish_payload( + self, ip: str, port: int | None, result: dict[str, Any] + ) -> dict[str, Any]: + return { + "attacker_ip": ip, + "icmp_matrix": result.get("matrix", ""), + "icmp_fp_hash": result.get("fingerprint_hash", ""), + "errors": result.get("errors", {}), + "observed_at": result.get("observed_at", ""), + } diff --git a/tests/prober/test_active_probe_registry.py b/tests/prober/test_active_probe_registry.py index 44388a5e..fe283bda 100644 --- a/tests/prober/test_active_probe_registry.py +++ b/tests/prober/test_active_probe_registry.py @@ -23,12 +23,15 @@ class TestRegistryContents: def test_all_probes_registered(self): names = {cls.probe_name for cls in ActiveProbeMeta.all()} - assert names == {"jarm", "hassh", "tcpfp", "ipv6_leak"} + assert names == {"jarm", "hassh", "tcpfp", "ipv6_leak", "tls_certificate", "icmp_error"} def test_sorted_by_priority_then_name(self): order = [cls.probe_name for cls in ActiveProbeMeta.all()] - # hassh/jarm/tcpfp all priority=100 (alphabetical), ipv6_leak priority=999 last - assert order == ["hassh", "jarm", "tcpfp", "ipv6_leak"] + # priority=100: hassh/jarm/tcpfp (alphabetical) + # priority=200: tls_certificate + # priority=850: icmp_error + # priority=999: ipv6_leak + assert order == ["hassh", "jarm", "tcpfp", "tls_certificate", "icmp_error", "ipv6_leak"] def test_priority10_probe_sorts_first(self): class _FastProbe(ActiveProbe): @@ -48,7 +51,7 @@ class TestRegistryContents: order = [cls.probe_name for cls in ActiveProbeMeta.all()] assert order[0] == "_fast_test_probe" - assert set(order[1:]) == {"hassh", "jarm", "tcpfp", "ipv6_leak"} + assert set(order[1:]) == {"hassh", "jarm", "tcpfp", "ipv6_leak", "tls_certificate", "icmp_error"} def test_port_none_probe_dispatched_with_none_port(self): """_run_probe must call run(ip, None, timeout) for a port-free probe.""" diff --git a/tests/prober/test_prober_icmp_error.py b/tests/prober/test_prober_icmp_error.py new file mode 100644 index 00000000..cfe412fe --- /dev/null +++ b/tests/prober/test_prober_icmp_error.py @@ -0,0 +1,319 @@ +"""Tests for IcmpErrorProbe and the underlying icmp_error helpers. + +Covers: +- IcmpErrorProbe.run() returns helper result verbatim. +- IcmpErrorProbe.run() returns None when helper returns None. +- IcmpErrorProbe.syslog_fields() — stable key set, correct flag encoding, human msg. +- IcmpErrorProbe.publish_payload() — correct bus payload shape. +- _probe_port_unreachable / _probe_time_exceeded / _probe_frag_needed / + _probe_param_problem — returned-reply and silent-timeout cases. +- _probe_time_exceeded skipped when on-link. +- elicit_icmp_errors returns None when scapy is unavailable. +- Fingerprint hash is deterministic for identical inputs. +- Matrix encoding table-driven across all four present/absent combinations. +""" +from __future__ import annotations + +import importlib +from typing import Any +from unittest.mock import MagicMock, call, patch + +# ─── fixtures ──────────────────────────────────────────────────────────────── + +_SILENT: dict[str, Any] = { + "returned": False, + "rtt_ms": None, + "src_ip": None, + "icmp_code": None, + "echo_len": None, + "echo_bytes_hex": None, +} + +_EVIDENCE: dict[str, Any] = { + "matrix": "PT..", + "fingerprint_hash": "abcdef1234567890abcdef1234567890", + "errors": { + "port_unreachable": { + "sent": True, "returned": True, "rtt_ms": 1.5, "src_ip": "10.0.0.9", + "icmp_code": 3, "echo_len": 28, "echo_bytes_hex": "aabbcc", + }, + "time_exceeded": { + "sent": True, "returned": True, "rtt_ms": 0.8, "src_ip": "192.168.1.1", + "icmp_code": 0, "echo_len": 28, "echo_bytes_hex": "ddeeff", + }, + "frag_needed": dict(_SILENT), + "param_problem": dict(_SILENT), + }, + "observed_at": "2026-01-01T00:00:00+00:00", +} + + +def _make_probe(): + from decnet.prober.probes.icmp_error_probe import IcmpErrorProbe + return IcmpErrorProbe() + + +# ─── IcmpErrorProbe.run() ───────────────────────────────────────────────────── + +def test_run_returns_evidence() -> None: + probe = _make_probe() + with patch("decnet.prober.icmp_error.elicit_icmp_errors", return_value=_EVIDENCE) as mock_fn: + result = probe.run("10.0.0.9", None, 2.0) + assert result == _EVIDENCE + mock_fn.assert_called_once_with("10.0.0.9", timeout=2.0) + + +def test_run_returns_none_when_helper_returns_none() -> None: + probe = _make_probe() + with patch("decnet.prober.icmp_error.elicit_icmp_errors", return_value=None): + result = probe.run("10.0.0.9", None, 2.0) + assert result is None + + +# ─── IcmpErrorProbe.syslog_fields() ────────────────────────────────────────── + +_EXPECTED_SD_KEYS = { + "icmp_matrix", + "icmp_fp_hash", + "icmp_port_unreach", + "icmp_time_exceeded", + "icmp_frag_needed", + "icmp_param_problem", + "icmp_port_unreach_rtt_ms", + "icmp_time_exceeded_rtt_ms", + "icmp_frag_needed_rtt_ms", + "icmp_param_problem_rtt_ms", + "icmp_time_exceeded_hop", +} + + +def test_syslog_fields_byte_stable() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert set(fields.keys()) == _EXPECTED_SD_KEYS + + +def test_syslog_fields_flag_encoding() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert fields["icmp_port_unreach"] == "1" + assert fields["icmp_time_exceeded"] == "1" + assert fields["icmp_frag_needed"] == "0" + assert fields["icmp_param_problem"] == "0" + + +def test_syslog_fields_rtt_populated() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert fields["icmp_port_unreach_rtt_ms"] == "1.5" + assert fields["icmp_time_exceeded_rtt_ms"] == "0.8" + assert fields["icmp_frag_needed_rtt_ms"] == "" + assert fields["icmp_param_problem_rtt_ms"] == "" + + +def test_syslog_fields_time_exceeded_hop() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert fields["icmp_time_exceeded_hop"] == "192.168.1.1" + + +def test_syslog_fields_human_msg_contains_ip_and_matrix() -> None: + probe = _make_probe() + _, msg = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert "10.0.0.9" in msg + assert "PT.." in msg + + +def test_syslog_fields_matrix_and_hash_present() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields("10.0.0.9", None, _EVIDENCE) + assert fields["icmp_matrix"] == "PT.." + assert fields["icmp_fp_hash"] == _EVIDENCE["fingerprint_hash"] + + +# ─── IcmpErrorProbe.publish_payload() ──────────────────────────────────────── + +def test_publish_payload_structure() -> None: + probe = _make_probe() + payload = probe.publish_payload("10.0.0.9", None, _EVIDENCE) + assert payload["attacker_ip"] == "10.0.0.9" + assert payload["icmp_matrix"] == "PT.." + assert payload["icmp_fp_hash"] == _EVIDENCE["fingerprint_hash"] + assert payload["errors"] is _EVIDENCE["errors"] + assert payload["observed_at"] == _EVIDENCE["observed_at"] + + +# ─── helper: _parse_reply ──────────────────────────────────────────────────── + +def _make_mock_resp(icmp_type: int, icmp_code: int, src_ip: str, payload_bytes: bytes = b"\x00" * 20) -> MagicMock: + """Build a minimal scapy-shaped response mock.""" + resp = MagicMock() + resp.time = 0.0 + + icmp_layer = MagicMock() + icmp_layer.type = icmp_type + icmp_layer.code = icmp_code + icmp_layer.payload = MagicMock() + icmp_layer.payload.__bytes__ = lambda self: payload_bytes + # make bytes(icmp_layer.payload) work + type(icmp_layer.payload).__bytes__ = lambda self: payload_bytes + + ip_layer = MagicMock() + ip_layer.src = src_ip + + def getitem(key): + from scapy.all import ICMP, IP # type: ignore[attr-defined] + if key is IP or (isinstance(key, type) and key.__name__ == "IP"): + return ip_layer + if key is ICMP or (isinstance(key, type) and key.__name__ == "ICMP"): + return icmp_layer + raise KeyError(key) + + resp.__getitem__ = getitem + return resp + + +# ─── helper: primitive probe unit tests ────────────────────────────────────── + +def test_probe_port_unreachable_silent_on_none_response() -> None: + from decnet.prober.icmp_error import _probe_port_unreachable + with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \ + patch("decnet.prober.icmp_error._ephemeral", return_value=50000): + with patch("scapy.all.sr1", return_value=None): + result = _probe_port_unreachable("10.0.0.9", 0.1) + assert result["returned"] is False + assert result["rtt_ms"] is None + + +def test_probe_frag_needed_silent_on_none_response() -> None: + from decnet.prober.icmp_error import _probe_frag_needed + with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \ + patch("decnet.prober.icmp_error._ephemeral", return_value=50000): + with patch("scapy.all.sr1", return_value=None): + result = _probe_frag_needed("10.0.0.9", 0.1) + assert result["returned"] is False + + +def test_probe_param_problem_silent_on_none_response() -> None: + from decnet.prober.icmp_error import _probe_param_problem + with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \ + patch("decnet.prober.icmp_error._ephemeral", return_value=50000): + with patch("scapy.all.sr1", return_value=None): + result = _probe_param_problem("10.0.0.9", 0.1) + assert result["returned"] is False + + +def test_probe_time_exceeded_skipped_when_on_link() -> None: + from decnet.prober.icmp_error import _probe_time_exceeded + with patch("scapy.all.sr1") as mock_sr1: + result = _probe_time_exceeded("10.0.0.9", 0.1, on_link=True) + assert result["returned"] is False + mock_sr1.assert_not_called() + + +def test_probe_time_exceeded_silent_when_not_on_link() -> None: + from decnet.prober.icmp_error import _probe_time_exceeded + with patch("decnet.prober.icmp_error._closed_udp_port", return_value=33434), \ + patch("decnet.prober.icmp_error._ephemeral", return_value=50000): + with patch("scapy.all.sr1", return_value=None): + result = _probe_time_exceeded("10.0.0.9", 0.1, on_link=False) + assert result["returned"] is False + + +# ─── helper: elicit_icmp_errors ────────────────────────────────────────────── + +def test_elicit_returns_none_when_scapy_unavailable() -> None: + from decnet.prober.icmp_error import elicit_icmp_errors + + real_import = __builtins__.__import__ if hasattr(__builtins__, "__import__") else __import__ + + def _import_blocker(name, *args, **kwargs): + if name.startswith("scapy"): + raise ImportError(f"mocked: {name}") + return real_import(name, *args, **kwargs) + + import builtins + with patch.object(builtins, "__import__", side_effect=_import_blocker): + result = elicit_icmp_errors("10.0.0.9", 0.1) + assert result is None + + +def test_elicit_returns_dict_with_all_keys() -> None: + from decnet.prober.icmp_error import elicit_icmp_errors + + silent = dict(_SILENT) + # At least one primitive must have sent=True or elicit returns None. + sent_silent = {**silent, "sent": True} + with ( + patch("decnet.prober.icmp_error._probe_port_unreachable", return_value=sent_silent), + patch("decnet.prober.icmp_error._probe_time_exceeded", return_value=silent), + patch("decnet.prober.icmp_error._probe_frag_needed", return_value=silent), + patch("decnet.prober.icmp_error._probe_param_problem", return_value=silent), + patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")), + ): + result = elicit_icmp_errors("10.0.0.9", 0.1) + + assert result is not None + assert set(result.keys()) == {"matrix", "fingerprint_hash", "errors", "observed_at"} + assert set(result["errors"].keys()) == { + "port_unreachable", "time_exceeded", "frag_needed", "param_problem" + } + + +def test_elicit_returns_none_when_all_silent_no_caps() -> None: + from decnet.prober.icmp_error import elicit_icmp_errors + + silent = dict(_SILENT) # all sent=False (PermissionError path) + with ( + patch("decnet.prober.icmp_error._probe_port_unreachable", return_value=silent), + patch("decnet.prober.icmp_error._probe_time_exceeded", return_value=silent), + patch("decnet.prober.icmp_error._probe_frag_needed", return_value=silent), + patch("decnet.prober.icmp_error._probe_param_problem", return_value=silent), + patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")), + ): + result = elicit_icmp_errors("10.0.0.9", 0.1) + + assert result is None + + +def test_fingerprint_hash_stable() -> None: + from decnet.prober.icmp_error import _build_matrix, _compute_hash + + errors = { + "port_unreachable": {"returned": True, "icmp_code": 3, "echo_len": 28}, + "time_exceeded": {"returned": False, "icmp_code": None, "echo_len": None}, + "frag_needed": {"returned": False, "icmp_code": None, "echo_len": None}, + "param_problem": {"returned": False, "icmp_code": None, "echo_len": None}, + } + matrix = _build_matrix(errors) # type: ignore[arg-type] + h1 = _compute_hash(matrix, errors) # type: ignore[arg-type] + h2 = _compute_hash(matrix, errors) # type: ignore[arg-type] + assert h1 == h2 + assert len(h1) == 32 + + +def test_matrix_encoding_table() -> None: + """Matrix encodes presence/absence for all four primitives correctly.""" + from decnet.prober.icmp_error import _build_matrix + + def _ret(code: int | None) -> dict[str, Any]: + return {"returned": True, "icmp_code": code, "echo_len": 8} + + def _sil() -> dict[str, Any]: + return {"returned": False, "icmp_code": None, "echo_len": None} + + # All silent + m = _build_matrix({"port_unreachable": _sil(), "time_exceeded": _sil(), "frag_needed": _sil(), "param_problem": _sil()}) # type: ignore[arg-type] + assert m == "...." + + # All returned with codes + m = _build_matrix({"port_unreachable": _ret(3), "time_exceeded": _ret(0), "frag_needed": _ret(4), "param_problem": _ret(0)}) # type: ignore[arg-type] + assert m == "PTFX" + + # Mixed — first and third returned + m = _build_matrix({"port_unreachable": _ret(3), "time_exceeded": _sil(), "frag_needed": _ret(4), "param_problem": _sil()}) # type: ignore[arg-type] + assert m == "P.F." + + # Returned but code is None → '~' (wrong type / parse failure) + m = _build_matrix({"port_unreachable": _ret(None), "time_exceeded": _sil(), "frag_needed": _sil(), "param_problem": _sil()}) # type: ignore[arg-type] + assert m == "~..." diff --git a/tests/prober/test_prober_rotation.py b/tests/prober/test_prober_rotation.py index f00aa69d..d0e2ba03 100644 --- a/tests/prober/test_prober_rotation.py +++ b/tests/prober/test_prober_rotation.py @@ -28,10 +28,7 @@ def test_jarm_phase_calls_recorder(tmp_path: Path) -> None: probe = JarmProbe() probe._ports = [443] - with ( - patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32), - patch("decnet.prober.worker.fetch_leaf_cert", return_value=None), - ): + with patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32): _run_probe( probe, "10.0.0.5", {}, tmp_path / "decnet.log", tmp_path / "decnet.json", @@ -118,10 +115,7 @@ def test_recorder_optional_no_crash_when_none(tmp_path: Path) -> None: probe = JarmProbe() probe._ports = [443] - with ( - patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32), - patch("decnet.prober.worker.fetch_leaf_cert", return_value=None), - ): + with patch("decnet.prober.probes.jarm.jarm_hash", return_value="c0c" * 10 + "a" * 32): _run_probe( probe, "10.0.0.5", {}, tmp_path / "decnet.log", tmp_path / "decnet.json", diff --git a/tests/prober/test_prober_worker.py b/tests/prober/test_prober_worker.py index c8b0d92f..7a5687e6 100644 --- a/tests/prober/test_prober_worker.py +++ b/tests/prober/test_prober_worker.py @@ -110,13 +110,11 @@ class TestDiscoverAttackers: class TestProbeCycleJARM: @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) - @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") def test_probes_new_ips(self, mock_jarm: MagicMock, mock_hassh: MagicMock, - mock_tcpfp: MagicMock, mock_cert: MagicMock, - mock_ipv6: MagicMock, + mock_tcpfp: MagicMock, mock_ipv6: MagicMock, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443]) monkeypatch.setattr(HasshProbe, "default_ports", []) @@ -137,13 +135,11 @@ class TestProbeCycleJARM: assert 8443 in probed["10.0.0.1"]["jarm"] @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) - @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") def test_skips_already_probed_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock, - mock_tcpfp: MagicMock, mock_cert: MagicMock, - mock_ipv6: MagicMock, + mock_tcpfp: MagicMock, mock_ipv6: MagicMock, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443]) monkeypatch.setattr(HasshProbe, "default_ports", []) @@ -560,16 +556,29 @@ class TestWriteEvent: assert record["fields"]["target_ip"] == "10.0.0.1" -# ─── _probe_cycle: TLS certificate capture (after JARM) ─────────────────── +# ─── _probe_cycle: TLS certificate capture ──────────────────────────────── +# TlsCertProbe is now an independent registered probe (priority=200). +# It calls fetch_leaf_cert directly — not coupled to JARM outcome. + +_CERT_STUB = { + "subject_cn": "evil.example.com", + "issuer": "CN=evil.example.com", + "self_signed": True, + "not_before": "2026-01-01T00:00:00Z", + "not_after": "2027-01-01T00:00:00Z", + "sans": ["evil.example.com", "c2.example.com"], + "cert_sha256": "ab" * 32, +} + class TestProbeCycleTLSCert: @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) - @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") - def test_cert_event_emitted_after_successful_jarm( + def test_cert_event_emitted_for_tls_port( self, mock_jarm: MagicMock, mock_hassh: MagicMock, @@ -579,52 +588,39 @@ class TestProbeCycleTLSCert: tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ): - """A non-empty JARM hash should trigger a follow-up cert fetch and - write a tls_certificate event with all parsed fields.""" - monkeypatch.setattr(JarmProbe, "default_ports", [443]) + """TlsCertProbe runs independently; a successful fetch writes a tls_certificate event.""" + from decnet.prober.probes.tlscert_probe import TlsCertProbe + monkeypatch.setattr(JarmProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(TcpfpProbe, "default_ports", []) - mock_jarm.return_value = "c0c" * 10 + "a" * 32 + monkeypatch.setattr(TlsCertProbe, "default_ports", [443]) + mock_jarm.return_value = JARM_EMPTY_HASH mock_hassh.return_value = None mock_tcpfp.return_value = None - mock_cert.return_value = { - "subject_cn": "evil.example.com", - "issuer": "CN=evil.example.com", - "self_signed": True, - "not_before": "2026-01-01T00:00:00Z", - "not_after": "2027-01-01T00:00:00Z", - "sans": ["evil.example.com", "c2.example.com"], - "cert_sha256": "ab" * 32, - } + mock_cert.return_value = _CERT_STUB log_path = tmp_path / "decnet.log" json_path = tmp_path / "decnet.json" _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0) - records = [ - json.loads(line) - for line in json_path.read_text().splitlines() if line - ] + records = [json.loads(line) for line in json_path.read_text().splitlines() if line] cert_records = [r for r in records if r["event_type"] == "tls_certificate"] assert len(cert_records) == 1 f = cert_records[0]["fields"] assert f["target_ip"] == "10.0.0.1" assert f["target_port"] == "443" assert f["subject_cn"] == "evil.example.com" - assert f["issuer"] == "CN=evil.example.com" assert f["self_signed"] == "true" - assert f["not_before"] == "2026-01-01T00:00:00Z" - assert f["not_after"] == "2027-01-01T00:00:00Z" assert f["sans"] == "evil.example.com,c2.example.com" assert f["cert_sha256"] == "ab" * 32 @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) - @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert", return_value=None) @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") - def test_cert_fetch_skipped_on_empty_jarm( + def test_cert_skipped_when_fetch_returns_none( self, mock_jarm: MagicMock, mock_hassh: MagicMock, @@ -634,10 +630,12 @@ class TestProbeCycleTLSCert: tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ): - """JARM_EMPTY_HASH means the port doesn't speak TLS; skip cert fetch.""" - monkeypatch.setattr(JarmProbe, "default_ports", [443]) + """fetch_leaf_cert returning None → no tls_certificate event.""" + from decnet.prober.probes.tlscert_probe import TlsCertProbe + monkeypatch.setattr(JarmProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(TcpfpProbe, "default_ports", []) + monkeypatch.setattr(TlsCertProbe, "default_ports", [443]) mock_jarm.return_value = JARM_EMPTY_HASH mock_hassh.return_value = None mock_tcpfp.return_value = None @@ -646,42 +644,13 @@ class TestProbeCycleTLSCert: _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) - mock_cert.assert_not_called() - - @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) - @patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) - @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") - @patch("decnet.prober.probes.hassh.hassh_server") - @patch("decnet.prober.probes.jarm.jarm_hash") - def test_cert_fetch_failure_silent( - self, - mock_jarm: MagicMock, - mock_hassh: MagicMock, - mock_tcpfp: MagicMock, - mock_cert: MagicMock, - mock_ipv6: MagicMock, - tmp_path: Path, - monkeypatch: pytest.MonkeyPatch, - ): - """fetch_leaf_cert returning None must not write a cert event.""" - monkeypatch.setattr(JarmProbe, "default_ports", [443]) - monkeypatch.setattr(HasshProbe, "default_ports", []) - monkeypatch.setattr(TcpfpProbe, "default_ports", []) - mock_jarm.return_value = "c0c" * 10 + "a" * 32 - mock_hassh.return_value = None - mock_tcpfp.return_value = None - log_path = tmp_path / "decnet.log" - json_path = tmp_path / "decnet.json" - - _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) - mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0) if json_path.exists(): content = json_path.read_text() assert "tls_certificate" not in content @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) - @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -695,12 +664,13 @@ class TestProbeCycleTLSCert: tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ): - """If fetch_leaf_cert throws despite its contract, the JARM phase - must keep moving to the next port without crashing.""" - monkeypatch.setattr(JarmProbe, "default_ports", [443, 8443]) + """fetch_leaf_cert crash is caught by _run_probe; both ports still marked probed.""" + from decnet.prober.probes.tlscert_probe import TlsCertProbe + monkeypatch.setattr(JarmProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(TcpfpProbe, "default_ports", []) - mock_jarm.return_value = "c0c" * 10 + "a" * 32 + monkeypatch.setattr(TlsCertProbe, "default_ports", [443, 8443]) + mock_jarm.return_value = JARM_EMPTY_HASH mock_hassh.return_value = None mock_tcpfp.return_value = None mock_cert.side_effect = RuntimeError("unexpected") @@ -709,11 +679,10 @@ class TestProbeCycleTLSCert: _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) - # Both ports still marked probed despite the cert-side crash. assert mock_cert.call_count == 2 @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) - @patch("decnet.prober.worker.fetch_leaf_cert") + @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert") @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") @patch("decnet.prober.probes.hassh.hassh_server") @patch("decnet.prober.probes.jarm.jarm_hash") @@ -727,11 +696,13 @@ class TestProbeCycleTLSCert: tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ): - """publish_fn must receive a 'tls_certificate' event when capture succeeds.""" - monkeypatch.setattr(JarmProbe, "default_ports", [443]) + """publish_fn receives a 'tls_certificate' event on successful cert capture.""" + from decnet.prober.probes.tlscert_probe import TlsCertProbe + monkeypatch.setattr(JarmProbe, "default_ports", []) monkeypatch.setattr(HasshProbe, "default_ports", []) monkeypatch.setattr(TcpfpProbe, "default_ports", []) - mock_jarm.return_value = "c0c" * 10 + "a" * 32 + monkeypatch.setattr(TlsCertProbe, "default_ports", [443]) + mock_jarm.return_value = JARM_EMPTY_HASH mock_hassh.return_value = None mock_tcpfp.return_value = None mock_cert.return_value = { @@ -760,3 +731,36 @@ class TestProbeCycleTLSCert: assert cert_pubs[0][1]["port"] == 443 assert cert_pubs[0][1]["cert_sha256"] == "cd" * 32 assert cert_pubs[0][1]["self_signed"] is True + + @patch("decnet.prober.ipv6_leak._route_info", return_value=(False, None)) + @patch("decnet.prober.probes.tlscert_probe.fetch_leaf_cert") + @patch("decnet.prober.probes.tcpfp.tcp_fingerprint") + @patch("decnet.prober.probes.hassh.hassh_server") + @patch("decnet.prober.probes.jarm.jarm_hash") + def test_cert_independent_of_jarm_result( + self, + mock_jarm: MagicMock, + mock_hassh: MagicMock, + mock_tcpfp: MagicMock, + mock_cert: MagicMock, + mock_ipv6: MagicMock, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + ): + """TlsCertProbe runs regardless of JARM outcome (independent registry probe).""" + from decnet.prober.probes.tlscert_probe import TlsCertProbe + monkeypatch.setattr(JarmProbe, "default_ports", [443]) + monkeypatch.setattr(HasshProbe, "default_ports", []) + monkeypatch.setattr(TcpfpProbe, "default_ports", []) + monkeypatch.setattr(TlsCertProbe, "default_ports", [443]) + mock_jarm.return_value = JARM_EMPTY_HASH # port doesn't speak TLS per JARM + mock_hassh.return_value = None + mock_tcpfp.return_value = None + mock_cert.return_value = _CERT_STUB + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + _probe_cycle({"10.0.0.1"}, {}, log_path, json_path, timeout=1.0) + + # TlsCertProbe still called despite empty JARM hash + mock_cert.assert_called_once_with("10.0.0.1", 443, timeout=1.0)