From 3f8170be1097aecc3217399cbc229a11ac2b3e62 Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 21 May 2026 15:03:10 -0400 Subject: [PATCH] =?UTF-8?q?feat(prober):=20add=20Icmp6ErrorProbe=20?= =?UTF-8?q?=E2=80=94=20ICMPv6=20error-leakage=20fingerprint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four RFC 4443 stimuli (port-unreach, hop-limit-exceeded, unknown-NH, bad-dest-option) produce a 4-char matrix + sha256 fingerprint for IPv6 attackers. Auto-registers via ActiveProbeMeta at priority=860 (after v4 icmp_error=850, before ipv6_leak=999). IPv4 targets fast-return None. --- decnet/prober/icmp6_error.py | 313 ++++++++++++++++++++++ decnet/prober/probes/__init__.py | 1 + decnet/prober/probes/icmp6_error_probe.py | 78 ++++++ tests/prober/test_prober_icmp6_error.py | 310 +++++++++++++++++++++ 4 files changed, 702 insertions(+) create mode 100644 decnet/prober/icmp6_error.py create mode 100644 decnet/prober/probes/icmp6_error_probe.py create mode 100644 tests/prober/test_prober_icmp6_error.py diff --git a/decnet/prober/icmp6_error.py b/decnet/prober/icmp6_error.py new file mode 100644 index 00000000..e04ec078 --- /dev/null +++ b/decnet/prober/icmp6_error.py @@ -0,0 +1,313 @@ +"""ICMPv6 error-elicitation prober. + +Sends four crafted stimuli to a target and records which ICMPv6 error +classes are returned, the per-error RTT, and the bytes echoed back inside +each ICMPv6 error body. Silence is as informative as a reply. + +Requires root / CAP_NET_RAW. Scapy is lazy-imported so non-root callers +of other prober modules are not broken. +""" + +from __future__ import annotations + +import hashlib +import random +from datetime import datetime, timezone +from typing import Any + +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced + +_log = get_logger("prober.icmp6_error") + +# ─── per-primitive result shape ────────────────────────────────────────────── +# { +# "returned": bool, +# "rtt_ms": float | None, +# "src_ip": str | None, # sender of the ICMPv6 error (may differ from target) +# "icmp_code": int | None, +# "echo_len": int | None, # bytes the ICMPv6 body echoed back from our probe +# "echo_bytes_hex": str | None, +# } + +_SILENT: dict[str, Any] = { + "sent": False, # False when PermissionError blocked sr1 (no CAP_NET_RAW) + "returned": False, + "rtt_ms": None, + "src_ip": None, + "icmp_code": None, + "echo_len": None, + "echo_bytes_hex": None, +} + +# Matrix letter codes: uppercase if returned, '.' if silent, '~' if wrong type +_MATRIX_LETTERS = { + "port_unreachable_v6": "U", + "hop_limit_exceeded": "H", + "unknown_next_header": "N", + "bad_dest_option": "B", +} + + +# ─── helpers ────────────────────────────────────────────────────────────────── + +def _ephemeral() -> int: + return random.randint(49152, 65535) # nosec B311 — ephemeral port, not crypto + + +def _closed_udp_port() -> int: + return random.randint(33434, 33534) # nosec B311 + + +def _parse_reply_v6( + resp: Any, + expected_type: int, + expected_code: int | None, +) -> dict[str, Any]: + """Extract a per-primitive result dict from a scapy ICMPv6 error reply.""" + from scapy.layers.inet6 import ( + ICMPv6DestUnreach, + ICMPv6ParamProblem, + ICMPv6TimeExceeded, + IPv6, + ) + + rtt_ms: float | None = None + try: + rtt_ms = round(resp.time * 1000, 3) + except Exception as exc: + _log.debug("icmp6_error: rtt extraction failed: %s", exc) + + result: dict[str, Any] = {"sent": True} + src_ip: str | None = None + try: + src_ip = resp[IPv6].src + except Exception as exc: + _log.debug("icmp6_error: src_ip extraction failed: %s", exc) + + icmp_code: int | None = None + echo_len: int | None = None + echo_bytes_hex: str | None = None + actual_type: int | None = None + + for t, cls in ( + (1, ICMPv6DestUnreach), + (3, ICMPv6TimeExceeded), + (4, ICMPv6ParamProblem), + ): + if resp.haslayer(cls): + actual_type = t + try: + layer = resp[cls] + icmp_code = int(layer.code) + payload = bytes(layer.payload) + if payload: + echo_len = len(payload) + echo_bytes_hex = payload[:32].hex() + except Exception as exc: + _log.debug("icmp6_error: ICMPv6 field extraction failed: %s", exc) + break + + wrong_type = ( + actual_type is None + or actual_type != expected_type + or (expected_code is not None and icmp_code != expected_code) + ) + + result.update({ + "returned": not wrong_type, + "rtt_ms": rtt_ms, + "src_ip": src_ip, + "icmp_code": icmp_code, + "echo_len": echo_len, + "echo_bytes_hex": echo_bytes_hex, + }) + return result + + +# ─── four stimulus primitives ───────────────────────────────────────────────── + +@_traced("prober.icmp6_error.port_unreachable_v6") +def _probe_port_unreachable_v6(target_ip: str, timeout: float) -> dict[str, Any]: + """UDP to a closed port → expect ICMPv6 type=1 code=4 (Port Unreachable).""" + try: + from scapy.layers.inet6 import IPv6, UDP + from scapy.packet import Raw + from scapy.sendrecv import sr1 + import time as _time + + pkt = IPv6(dst=target_ip) / UDP(sport=_ephemeral(), dport=_closed_udp_port()) / Raw(b"\x00" * 32) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply_v6(resp, expected_type=1, expected_code=4) + except (OSError, PermissionError) as exc: + _log.debug("icmp6_error: port_unreachable_v6 probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +@_traced("prober.icmp6_error.hop_limit_exceeded") +def _probe_hop_limit_exceeded( + target_ip: str, + timeout: float, + on_link: bool = False, +) -> dict[str, Any]: + """UDP with hlim=1 → expect ICMPv6 type=3 code=0 (Hop Limit Exceeded) from next-hop. + + Skipped when the attacker is on-link (no intermediate hop to probe). + `on_link` is pre-computed by the caller to avoid a redundant route lookup. + """ + if on_link: + return dict(_SILENT) + try: + from scapy.layers.inet6 import IPv6, UDP + from scapy.packet import Raw + from scapy.sendrecv import sr1 + import time as _time + + pkt = IPv6(dst=target_ip, hlim=1) / UDP(sport=_ephemeral(), dport=_closed_udp_port()) / Raw(b"\x00" * 32) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply_v6(resp, expected_type=3, expected_code=0) + except (OSError, PermissionError) as exc: + _log.debug("icmp6_error: hop_limit_exceeded probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +@_traced("prober.icmp6_error.unknown_next_header") +def _probe_unknown_next_header(target_ip: str, timeout: float) -> dict[str, Any]: + """IPv6 with unrecognised Next Header → expect ICMPv6 type=4 code=1. + + NH=253 (RFC 3692 experimental) forces the target to send Parameter Problem + code=1 (Unrecognized Next Header type encountered). + """ + try: + from scapy.layers.inet6 import IPv6 + from scapy.packet import Raw + from scapy.sendrecv import sr1 + import time as _time + + pkt = IPv6(dst=target_ip, nh=253) / Raw(b"\x00" * 8) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply_v6(resp, expected_type=4, expected_code=1) + except (OSError, PermissionError) as exc: + _log.debug("icmp6_error: unknown_next_header probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +@_traced("prober.icmp6_error.bad_dest_option") +def _probe_bad_dest_option(target_ip: str, timeout: float) -> dict[str, Any]: + """Destination option type=0x80 → expect ICMPv6 type=4 code=2. + + Option type high-bits 10xxxxxx = discard + send ICMPv6 error (RFC 2460 §4.2). + Most stacks silently drop unknown options; absence is still a fingerprint. + """ + try: + from scapy.layers.inet6 import HBHOptUnknown, IPv6, IPv6ExtHdrDestOpt, UDP + from scapy.packet import Raw + from scapy.sendrecv import sr1 + import time as _time + + # 0x80 = 0b10000000: bits 10 → discard packet + send ICMPv6 to source + bad_opt = HBHOptUnknown(otype=0x80, optdata=b"\x00\x00\x00\x00") + pkt = ( + IPv6(dst=target_ip) + / IPv6ExtHdrDestOpt(options=[bad_opt]) + / UDP(sport=_ephemeral(), dport=_closed_udp_port()) + / Raw(b"\x00" * 8) + ) + t0 = _time.monotonic() + resp = sr1(pkt, timeout=timeout, verbose=0) + if resp is None: + return dict(_SILENT) + resp.time = _time.monotonic() - t0 + return _parse_reply_v6(resp, expected_type=4, expected_code=2) + except (OSError, PermissionError) as exc: + _log.debug("icmp6_error: bad_dest_option probe failed for %s: %s", target_ip, exc) + return dict(_SILENT) + + +# ─── matrix + hash ──────────────────────────────────────────────────────────── + +def _build_matrix(errors: dict[str, dict[str, Any]]) -> str: + parts = [] + for key, letter in _MATRIX_LETTERS.items(): + e = errors.get(key, _SILENT) + if not e.get("returned", False): + parts.append(".") + elif e.get("icmp_code") is not None: + parts.append(letter) + else: + parts.append("~") + return "".join(parts) + + +def _compute_hash(matrix: str, errors: dict[str, dict[str, Any]]) -> str: + echo_lens = tuple( + errors.get(k, _SILENT).get("echo_len") or 0 + for k in _MATRIX_LETTERS + ) + icmp_codes = tuple( + errors.get(k, _SILENT).get("icmp_code") or -1 + for k in _MATRIX_LETTERS + ) + raw = f"{matrix}:{echo_lens}:{icmp_codes}" + return hashlib.sha256(raw.encode()).hexdigest()[:32] + + +# ─── public API ─────────────────────────────────────────────────────────────── + +@_traced("prober.icmp6_error.elicit") +def elicit_icmp6_errors( + target_ip: str, + timeout: float = 2.0, +) -> dict[str, Any] | None: + """Send four ICMPv6-eliciting probes and return a fingerprint result dict. + + Returns None when scapy inet6 is unavailable or all four primitives + produced no information (all sent=False, meaning no CAP_NET_RAW). + + Requires root / CAP_NET_RAW. target_ip must be a valid IPv6 address. + """ + try: + import scapy.layers.inet6 # noqa: F401 — presence check + except ImportError: + _log.debug("scapy inet6 not available — icmp6_error active probe skipped") + return None + + on_link = False + try: + from decnet.prober.ipv6_leak import _route_info + on_link, _ = _route_info(target_ip) + except Exception as exc: + _log.debug("icmp6_error: _route_info failed for %s: %s", target_ip, exc) + + errors: dict[str, dict[str, Any]] = { + "port_unreachable_v6": _probe_port_unreachable_v6(target_ip, timeout), + "hop_limit_exceeded": _probe_hop_limit_exceeded(target_ip, timeout, on_link=on_link), + "unknown_next_header": _probe_unknown_next_header(target_ip, timeout), + "bad_dest_option": _probe_bad_dest_option(target_ip, timeout), + } + + if not any(e.get("sent", False) for e in errors.values()): + _log.debug("icmp6_error: no packets sent to %s (no CAP_NET_RAW?)", target_ip) + return None + + matrix = _build_matrix(errors) + fp_hash = _compute_hash(matrix, errors) + + return { + "matrix": matrix, + "fingerprint_hash": fp_hash, + "errors": errors, + "observed_at": datetime.now(timezone.utc).isoformat(), + } diff --git a/decnet/prober/probes/__init__.py b/decnet/prober/probes/__init__.py index c11c48f9..3d4eecd2 100644 --- a/decnet/prober/probes/__init__.py +++ b/decnet/prober/probes/__init__.py @@ -1,5 +1,6 @@ # Import all probe modules to trigger ActiveProbeMeta registration. from decnet.prober.probes.hassh import HasshProbe as HasshProbe +from decnet.prober.probes.icmp6_error_probe import Icmp6ErrorProbe as Icmp6ErrorProbe from decnet.prober.probes.icmp_error_probe import IcmpErrorProbe as IcmpErrorProbe from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe as Ipv6LeakProbe from decnet.prober.probes.jarm import JarmProbe as JarmProbe diff --git a/decnet/prober/probes/icmp6_error_probe.py b/decnet/prober/probes/icmp6_error_probe.py new file mode 100644 index 00000000..1a49a565 --- /dev/null +++ b/decnet/prober/probes/icmp6_error_probe.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import ipaddress +from typing import Any + +from decnet.logging import get_logger +from decnet.prober.base import ActiveProbe + +_log = get_logger("prober.icmp6_error_probe") + + +class Icmp6ErrorProbe(ActiveProbe): + """Port-free probe that elicits ICMPv6 error replies from an IPv6 attacker. + + Sends four crafted stimuli (UDP/closed-port, hlim=1, unknown NH=253, + bad destination option type=0x80) and records which ICMPv6 error classes + the target emits, per-error RTT, and bytes echoed back in each error body. + + Returns None immediately for IPv4 attacker IPs — those are handled by + IcmpErrorProbe. + + Requires root / CAP_NET_RAW. Scapy is lazy-imported inside the helper. + """ + + probe_name = "icmp6_error" + default_ports: list[int | None] = [None] + event_type = "icmp6_error_leak" + priority = 860 # after icmp_error (850), before ipv6_leak (999) + + def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None: + try: + if ipaddress.ip_address(ip).version != 6: + return None + except ValueError: + return None + from decnet.prober.icmp6_error import elicit_icmp6_errors + return elicit_icmp6_errors(ip, timeout=timeout) + + def syslog_fields( + self, ip: str, port: int | None, result: dict[str, Any] + ) -> tuple[dict[str, Any], str]: + matrix = result.get("matrix", "") + fp_hash = result.get("fingerprint_hash", "") + errors = result.get("errors", {}) + + def _flag(key: str) -> str: + return "1" if errors.get(key, {}).get("returned", False) else "0" + + def _rtt(key: str) -> str: + v = errors.get(key, {}).get("rtt_ms") + return str(v) if v is not None else "" + + fields: dict[str, Any] = { + "icmp6_matrix": matrix, + "icmp6_fp_hash": fp_hash, + "icmp6_port_unreach": _flag("port_unreachable_v6"), + "icmp6_hop_limit_exceeded": _flag("hop_limit_exceeded"), + "icmp6_unknown_next_header": _flag("unknown_next_header"), + "icmp6_bad_dest_option": _flag("bad_dest_option"), + "icmp6_port_unreach_rtt_ms": _rtt("port_unreachable_v6"), + "icmp6_hop_limit_exceeded_rtt_ms": _rtt("hop_limit_exceeded"), + "icmp6_unknown_next_header_rtt_ms": _rtt("unknown_next_header"), + "icmp6_bad_dest_option_rtt_ms": _rtt("bad_dest_option"), + "icmp6_hop_limit_exceeded_hop": errors.get("hop_limit_exceeded", {}).get("src_ip") or "", + } + msg = f"ICMPv6 leak {ip} → matrix={matrix} fp={fp_hash[:8]}" + return fields, msg + + def publish_payload( + self, ip: str, port: int | None, result: dict[str, Any] + ) -> dict[str, Any]: + return { + "attacker_ip": ip, + "icmp6_matrix": result.get("matrix", ""), + "icmp6_fp_hash": result.get("fingerprint_hash", ""), + "errors": result.get("errors", {}), + "observed_at": result.get("observed_at", ""), + } diff --git a/tests/prober/test_prober_icmp6_error.py b/tests/prober/test_prober_icmp6_error.py new file mode 100644 index 00000000..7f0348e8 --- /dev/null +++ b/tests/prober/test_prober_icmp6_error.py @@ -0,0 +1,310 @@ +"""Tests for Icmp6ErrorProbe and the underlying icmp6_error helpers. + +Covers: +- Icmp6ErrorProbe.run() returns helper result verbatim for IPv6 targets. +- Icmp6ErrorProbe.run() returns None for IPv4 targets (address-family gate). +- Icmp6ErrorProbe.run() returns None when helper returns None. +- Icmp6ErrorProbe.syslog_fields() — stable key set, correct flag encoding, human msg. +- Icmp6ErrorProbe.publish_payload() — correct bus payload shape. +- _probe_port_unreachable_v6 / _probe_hop_limit_exceeded / _probe_unknown_next_header / + _probe_bad_dest_option — silent-timeout cases. +- _probe_hop_limit_exceeded skipped when on-link. +- elicit_icmp6_errors returns None when scapy.layers.inet6 is unavailable. +- elicit_icmp6_errors returns None when all primitives have sent=False. +- Fingerprint hash is deterministic for identical inputs. +- Matrix encoding table-driven across all four present/absent combinations. +- Icmp6ErrorProbe registered in ActiveProbeMeta._registry. +""" +from __future__ import annotations + +from typing import Any +from unittest.mock import patch + +# ─── fixtures ──────────────────────────────────────────────────────────────── + +_SILENT: dict[str, Any] = { + "returned": False, + "rtt_ms": None, + "src_ip": None, + "icmp_code": None, + "echo_len": None, + "echo_bytes_hex": None, +} + +_EVIDENCE: dict[str, Any] = { + "matrix": "UH..", + "fingerprint_hash": "abcdef1234567890abcdef1234567890", + "errors": { + "port_unreachable_v6": { + "sent": True, "returned": True, "rtt_ms": 1.2, "src_ip": "2001:db8::1", + "icmp_code": 4, "echo_len": 48, "echo_bytes_hex": "aabbcc", + }, + "hop_limit_exceeded": { + "sent": True, "returned": True, "rtt_ms": 0.7, "src_ip": "fe80::1", + "icmp_code": 0, "echo_len": 48, "echo_bytes_hex": "ddeeff", + }, + "unknown_next_header": dict(_SILENT), + "bad_dest_option": dict(_SILENT), + }, + "observed_at": "2026-01-01T00:00:00+00:00", +} + +_TARGET_V6 = "2001:db8::9" +_TARGET_V4 = "10.0.0.9" + + +def _make_probe(): + from decnet.prober.probes.icmp6_error_probe import Icmp6ErrorProbe + return Icmp6ErrorProbe() + + +# ─── Icmp6ErrorProbe.run() ──────────────────────────────────────────────────── + +def test_run_returns_evidence_for_v6_target() -> None: + probe = _make_probe() + with patch("decnet.prober.icmp6_error.elicit_icmp6_errors", return_value=_EVIDENCE) as mock_fn: + result = probe.run(_TARGET_V6, None, 2.0) + assert result == _EVIDENCE + mock_fn.assert_called_once_with(_TARGET_V6, timeout=2.0) + + +def test_run_returns_none_for_v4_target() -> None: + probe = _make_probe() + with patch("decnet.prober.icmp6_error.elicit_icmp6_errors") as mock_fn: + result = probe.run(_TARGET_V4, None, 2.0) + assert result is None + mock_fn.assert_not_called() + + +def test_run_returns_none_when_helper_returns_none() -> None: + probe = _make_probe() + with patch("decnet.prober.icmp6_error.elicit_icmp6_errors", return_value=None): + result = probe.run(_TARGET_V6, None, 2.0) + assert result is None + + +# ─── Icmp6ErrorProbe.syslog_fields() ───────────────────────────────────────── + +_EXPECTED_SD_KEYS = { + "icmp6_matrix", + "icmp6_fp_hash", + "icmp6_port_unreach", + "icmp6_hop_limit_exceeded", + "icmp6_unknown_next_header", + "icmp6_bad_dest_option", + "icmp6_port_unreach_rtt_ms", + "icmp6_hop_limit_exceeded_rtt_ms", + "icmp6_unknown_next_header_rtt_ms", + "icmp6_bad_dest_option_rtt_ms", + "icmp6_hop_limit_exceeded_hop", +} + + +def test_syslog_fields_key_stable() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields(_TARGET_V6, None, _EVIDENCE) + assert set(fields.keys()) == _EXPECTED_SD_KEYS + + +def test_syslog_fields_flag_encoding() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields(_TARGET_V6, None, _EVIDENCE) + assert fields["icmp6_port_unreach"] == "1" + assert fields["icmp6_hop_limit_exceeded"] == "1" + assert fields["icmp6_unknown_next_header"] == "0" + assert fields["icmp6_bad_dest_option"] == "0" + + +def test_syslog_fields_rtt_populated() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields(_TARGET_V6, None, _EVIDENCE) + assert fields["icmp6_port_unreach_rtt_ms"] == "1.2" + assert fields["icmp6_hop_limit_exceeded_rtt_ms"] == "0.7" + assert fields["icmp6_unknown_next_header_rtt_ms"] == "" + assert fields["icmp6_bad_dest_option_rtt_ms"] == "" + + +def test_syslog_fields_hop_limit_hop() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields(_TARGET_V6, None, _EVIDENCE) + assert fields["icmp6_hop_limit_exceeded_hop"] == "fe80::1" + + +def test_syslog_fields_human_msg_contains_ip_and_matrix() -> None: + probe = _make_probe() + _, msg = probe.syslog_fields(_TARGET_V6, None, _EVIDENCE) + assert _TARGET_V6 in msg + assert "UH.." in msg + + +def test_syslog_fields_matrix_and_hash_present() -> None: + probe = _make_probe() + fields, _ = probe.syslog_fields(_TARGET_V6, None, _EVIDENCE) + assert fields["icmp6_matrix"] == "UH.." + assert fields["icmp6_fp_hash"] == _EVIDENCE["fingerprint_hash"] + + +# ─── Icmp6ErrorProbe.publish_payload() ─────────────────────────────────────── + +def test_publish_payload_structure() -> None: + probe = _make_probe() + payload = probe.publish_payload(_TARGET_V6, None, _EVIDENCE) + assert payload["attacker_ip"] == _TARGET_V6 + assert payload["icmp6_matrix"] == "UH.." + assert payload["icmp6_fp_hash"] == _EVIDENCE["fingerprint_hash"] + assert payload["errors"] is _EVIDENCE["errors"] + assert payload["observed_at"] == _EVIDENCE["observed_at"] + + +# ─── primitive: silent-on-None-response cases ───────────────────────────────── + +def test_probe_port_unreachable_v6_silent_on_none_response() -> None: + from decnet.prober.icmp6_error import _probe_port_unreachable_v6 + with patch("decnet.prober.icmp6_error._closed_udp_port", return_value=33434), \ + patch("decnet.prober.icmp6_error._ephemeral", return_value=50000): + with patch("scapy.sendrecv.sr1", return_value=None): + result = _probe_port_unreachable_v6(_TARGET_V6, 0.1) + assert result["returned"] is False + assert result["rtt_ms"] is None + + +def test_probe_unknown_next_header_silent_on_none_response() -> None: + from decnet.prober.icmp6_error import _probe_unknown_next_header + with patch("scapy.sendrecv.sr1", return_value=None): + result = _probe_unknown_next_header(_TARGET_V6, 0.1) + assert result["returned"] is False + + +def test_probe_bad_dest_option_silent_on_none_response() -> None: + from decnet.prober.icmp6_error import _probe_bad_dest_option + with patch("decnet.prober.icmp6_error._closed_udp_port", return_value=33434), \ + patch("decnet.prober.icmp6_error._ephemeral", return_value=50000): + with patch("scapy.sendrecv.sr1", return_value=None): + result = _probe_bad_dest_option(_TARGET_V6, 0.1) + assert result["returned"] is False + + +def test_probe_hop_limit_exceeded_skipped_when_on_link() -> None: + from decnet.prober.icmp6_error import _probe_hop_limit_exceeded + with patch("scapy.sendrecv.sr1") as mock_sr1: + result = _probe_hop_limit_exceeded(_TARGET_V6, 0.1, on_link=True) + assert result["returned"] is False + mock_sr1.assert_not_called() + + +def test_probe_hop_limit_exceeded_silent_when_not_on_link() -> None: + from decnet.prober.icmp6_error import _probe_hop_limit_exceeded + with patch("decnet.prober.icmp6_error._closed_udp_port", return_value=33434), \ + patch("decnet.prober.icmp6_error._ephemeral", return_value=50000): + with patch("scapy.sendrecv.sr1", return_value=None): + result = _probe_hop_limit_exceeded(_TARGET_V6, 0.1, on_link=False) + assert result["returned"] is False + + +# ─── elicit_icmp6_errors ────────────────────────────────────────────────────── + +def test_elicit_returns_none_when_scapy_unavailable() -> None: + from decnet.prober.icmp6_error import elicit_icmp6_errors + import builtins + + real_import = builtins.__import__ + + def _import_blocker(name, *args, **kwargs): + if name.startswith("scapy"): + raise ImportError(f"mocked: {name}") + return real_import(name, *args, **kwargs) + + with patch.object(builtins, "__import__", side_effect=_import_blocker): + result = elicit_icmp6_errors(_TARGET_V6, 0.1) + assert result is None + + +def test_elicit_returns_dict_with_all_keys() -> None: + from decnet.prober.icmp6_error import elicit_icmp6_errors + + silent = dict(_SILENT) + sent_silent = {**silent, "sent": True} + with ( + patch("decnet.prober.icmp6_error._probe_port_unreachable_v6", return_value=sent_silent), + patch("decnet.prober.icmp6_error._probe_hop_limit_exceeded", return_value=silent), + patch("decnet.prober.icmp6_error._probe_unknown_next_header", return_value=silent), + patch("decnet.prober.icmp6_error._probe_bad_dest_option", return_value=silent), + patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")), + ): + result = elicit_icmp6_errors(_TARGET_V6, 0.1) + + assert result is not None + assert set(result.keys()) == {"matrix", "fingerprint_hash", "errors", "observed_at"} + assert set(result["errors"].keys()) == { + "port_unreachable_v6", "hop_limit_exceeded", "unknown_next_header", "bad_dest_option" + } + + +def test_elicit_returns_none_when_all_silent_no_caps() -> None: + from decnet.prober.icmp6_error import elicit_icmp6_errors + + silent = dict(_SILENT) # all sent=False + with ( + patch("decnet.prober.icmp6_error._probe_port_unreachable_v6", return_value=silent), + patch("decnet.prober.icmp6_error._probe_hop_limit_exceeded", return_value=silent), + patch("decnet.prober.icmp6_error._probe_unknown_next_header", return_value=silent), + patch("decnet.prober.icmp6_error._probe_bad_dest_option", return_value=silent), + patch("decnet.prober.ipv6_leak._route_info", return_value=(False, "eth0")), + ): + result = elicit_icmp6_errors(_TARGET_V6, 0.1) + + assert result is None + + +# ─── hash + matrix purity ───────────────────────────────────────────────────── + +def test_fingerprint_hash_stable() -> None: + from decnet.prober.icmp6_error import _build_matrix, _compute_hash + + errors = { + "port_unreachable_v6": {"returned": True, "icmp_code": 4, "echo_len": 48}, + "hop_limit_exceeded": {"returned": False, "icmp_code": None, "echo_len": None}, + "unknown_next_header": {"returned": False, "icmp_code": None, "echo_len": None}, + "bad_dest_option": {"returned": False, "icmp_code": None, "echo_len": None}, + } + matrix = _build_matrix(errors) # type: ignore[arg-type] + h1 = _compute_hash(matrix, errors) # type: ignore[arg-type] + h2 = _compute_hash(matrix, errors) # type: ignore[arg-type] + assert h1 == h2 + assert len(h1) == 32 + + +def test_matrix_encoding_table() -> None: + """Matrix encodes presence/absence for all four ICMPv6 primitives correctly.""" + from decnet.prober.icmp6_error import _build_matrix + + def _ret(code: int | None) -> dict[str, Any]: + return {"returned": True, "icmp_code": code, "echo_len": 8} + + def _sil() -> dict[str, Any]: + return {"returned": False, "icmp_code": None, "echo_len": None} + + # All silent + m = _build_matrix({"port_unreachable_v6": _sil(), "hop_limit_exceeded": _sil(), "unknown_next_header": _sil(), "bad_dest_option": _sil()}) # type: ignore[arg-type] + assert m == "...." + + # All returned with codes + m = _build_matrix({"port_unreachable_v6": _ret(4), "hop_limit_exceeded": _ret(0), "unknown_next_header": _ret(1), "bad_dest_option": _ret(2)}) # type: ignore[arg-type] + assert m == "UHNB" + + # Mixed — first and third returned + m = _build_matrix({"port_unreachable_v6": _ret(4), "hop_limit_exceeded": _sil(), "unknown_next_header": _ret(1), "bad_dest_option": _sil()}) # type: ignore[arg-type] + assert m == "U.N." + + # Returned but code is None → '~' + m = _build_matrix({"port_unreachable_v6": _ret(None), "hop_limit_exceeded": _sil(), "unknown_next_header": _sil(), "bad_dest_option": _sil()}) # type: ignore[arg-type] + assert m == "~..." + + +# ─── metaclass registration ─────────────────────────────────────────────────── + +def test_icmp6_error_probe_registered() -> None: + import decnet.prober.probes # noqa: F401 — triggers registration + from decnet.prober.base import ActiveProbeMeta + names = {cls.probe_name for cls in ActiveProbeMeta.all()} + assert "icmp6_error" in names