feat(prober): add Icmp6ErrorProbe — ICMPv6 error-leakage fingerprint

Four RFC 4443 stimuli (port-unreach, hop-limit-exceeded, unknown-NH,
bad-dest-option) produce a 4-char matrix + sha256 fingerprint for IPv6
attackers. Auto-registers via ActiveProbeMeta at priority=860 (after v4
icmp_error=850, before ipv6_leak=999). IPv4 targets fast-return None.
This commit is contained in:
2026-05-21 15:03:10 -04:00
parent 56229a272b
commit 3f8170be10
4 changed files with 702 additions and 0 deletions

View File

@@ -0,0 +1,313 @@
"""ICMPv6 error-elicitation prober.
Sends four crafted stimuli to a target and records which ICMPv6 error
classes are returned, the per-error RTT, and the bytes echoed back inside
each ICMPv6 error body. Silence is as informative as a reply.
Requires root / CAP_NET_RAW. Scapy is lazy-imported so non-root callers
of other prober modules are not broken.
"""
from __future__ import annotations
import hashlib
import random
from datetime import datetime, timezone
from typing import Any
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
_log = get_logger("prober.icmp6_error")
# ─── per-primitive result shape ──────────────────────────────────────────────
# {
# "returned": bool,
# "rtt_ms": float | None,
# "src_ip": str | None, # sender of the ICMPv6 error (may differ from target)
# "icmp_code": int | None,
# "echo_len": int | None, # bytes the ICMPv6 body echoed back from our probe
# "echo_bytes_hex": str | None,
# }
_SILENT: dict[str, Any] = {
"sent": False, # False when PermissionError blocked sr1 (no CAP_NET_RAW)
"returned": False,
"rtt_ms": None,
"src_ip": None,
"icmp_code": None,
"echo_len": None,
"echo_bytes_hex": None,
}
# Matrix letter codes: uppercase if returned, '.' if silent, '~' if wrong type
_MATRIX_LETTERS = {
"port_unreachable_v6": "U",
"hop_limit_exceeded": "H",
"unknown_next_header": "N",
"bad_dest_option": "B",
}
# ─── helpers ──────────────────────────────────────────────────────────────────
def _ephemeral() -> int:
return random.randint(49152, 65535) # nosec B311 — ephemeral port, not crypto
def _closed_udp_port() -> int:
return random.randint(33434, 33534) # nosec B311
def _parse_reply_v6(
resp: Any,
expected_type: int,
expected_code: int | None,
) -> dict[str, Any]:
"""Extract a per-primitive result dict from a scapy ICMPv6 error reply."""
from scapy.layers.inet6 import (
ICMPv6DestUnreach,
ICMPv6ParamProblem,
ICMPv6TimeExceeded,
IPv6,
)
rtt_ms: float | None = None
try:
rtt_ms = round(resp.time * 1000, 3)
except Exception as exc:
_log.debug("icmp6_error: rtt extraction failed: %s", exc)
result: dict[str, Any] = {"sent": True}
src_ip: str | None = None
try:
src_ip = resp[IPv6].src
except Exception as exc:
_log.debug("icmp6_error: src_ip extraction failed: %s", exc)
icmp_code: int | None = None
echo_len: int | None = None
echo_bytes_hex: str | None = None
actual_type: int | None = None
for t, cls in (
(1, ICMPv6DestUnreach),
(3, ICMPv6TimeExceeded),
(4, ICMPv6ParamProblem),
):
if resp.haslayer(cls):
actual_type = t
try:
layer = resp[cls]
icmp_code = int(layer.code)
payload = bytes(layer.payload)
if payload:
echo_len = len(payload)
echo_bytes_hex = payload[:32].hex()
except Exception as exc:
_log.debug("icmp6_error: ICMPv6 field extraction failed: %s", exc)
break
wrong_type = (
actual_type is None
or actual_type != expected_type
or (expected_code is not None and icmp_code != expected_code)
)
result.update({
"returned": not wrong_type,
"rtt_ms": rtt_ms,
"src_ip": src_ip,
"icmp_code": icmp_code,
"echo_len": echo_len,
"echo_bytes_hex": echo_bytes_hex,
})
return result
# ─── four stimulus primitives ─────────────────────────────────────────────────
@_traced("prober.icmp6_error.port_unreachable_v6")
def _probe_port_unreachable_v6(target_ip: str, timeout: float) -> dict[str, Any]:
"""UDP to a closed port → expect ICMPv6 type=1 code=4 (Port Unreachable)."""
try:
from scapy.layers.inet6 import IPv6, UDP
from scapy.packet import Raw
from scapy.sendrecv import sr1
import time as _time
pkt = IPv6(dst=target_ip) / UDP(sport=_ephemeral(), dport=_closed_udp_port()) / Raw(b"\x00" * 32)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply_v6(resp, expected_type=1, expected_code=4)
except (OSError, PermissionError) as exc:
_log.debug("icmp6_error: port_unreachable_v6 probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
@_traced("prober.icmp6_error.hop_limit_exceeded")
def _probe_hop_limit_exceeded(
target_ip: str,
timeout: float,
on_link: bool = False,
) -> dict[str, Any]:
"""UDP with hlim=1 → expect ICMPv6 type=3 code=0 (Hop Limit Exceeded) from next-hop.
Skipped when the attacker is on-link (no intermediate hop to probe).
`on_link` is pre-computed by the caller to avoid a redundant route lookup.
"""
if on_link:
return dict(_SILENT)
try:
from scapy.layers.inet6 import IPv6, UDP
from scapy.packet import Raw
from scapy.sendrecv import sr1
import time as _time
pkt = IPv6(dst=target_ip, hlim=1) / UDP(sport=_ephemeral(), dport=_closed_udp_port()) / Raw(b"\x00" * 32)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply_v6(resp, expected_type=3, expected_code=0)
except (OSError, PermissionError) as exc:
_log.debug("icmp6_error: hop_limit_exceeded probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
@_traced("prober.icmp6_error.unknown_next_header")
def _probe_unknown_next_header(target_ip: str, timeout: float) -> dict[str, Any]:
"""IPv6 with unrecognised Next Header → expect ICMPv6 type=4 code=1.
NH=253 (RFC 3692 experimental) forces the target to send Parameter Problem
code=1 (Unrecognized Next Header type encountered).
"""
try:
from scapy.layers.inet6 import IPv6
from scapy.packet import Raw
from scapy.sendrecv import sr1
import time as _time
pkt = IPv6(dst=target_ip, nh=253) / Raw(b"\x00" * 8)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply_v6(resp, expected_type=4, expected_code=1)
except (OSError, PermissionError) as exc:
_log.debug("icmp6_error: unknown_next_header probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
@_traced("prober.icmp6_error.bad_dest_option")
def _probe_bad_dest_option(target_ip: str, timeout: float) -> dict[str, Any]:
"""Destination option type=0x80 → expect ICMPv6 type=4 code=2.
Option type high-bits 10xxxxxx = discard + send ICMPv6 error (RFC 2460 §4.2).
Most stacks silently drop unknown options; absence is still a fingerprint.
"""
try:
from scapy.layers.inet6 import HBHOptUnknown, IPv6, IPv6ExtHdrDestOpt, UDP
from scapy.packet import Raw
from scapy.sendrecv import sr1
import time as _time
# 0x80 = 0b10000000: bits 10 → discard packet + send ICMPv6 to source
bad_opt = HBHOptUnknown(otype=0x80, optdata=b"\x00\x00\x00\x00")
pkt = (
IPv6(dst=target_ip)
/ IPv6ExtHdrDestOpt(options=[bad_opt])
/ UDP(sport=_ephemeral(), dport=_closed_udp_port())
/ Raw(b"\x00" * 8)
)
t0 = _time.monotonic()
resp = sr1(pkt, timeout=timeout, verbose=0)
if resp is None:
return dict(_SILENT)
resp.time = _time.monotonic() - t0
return _parse_reply_v6(resp, expected_type=4, expected_code=2)
except (OSError, PermissionError) as exc:
_log.debug("icmp6_error: bad_dest_option probe failed for %s: %s", target_ip, exc)
return dict(_SILENT)
# ─── matrix + hash ────────────────────────────────────────────────────────────
def _build_matrix(errors: dict[str, dict[str, Any]]) -> str:
parts = []
for key, letter in _MATRIX_LETTERS.items():
e = errors.get(key, _SILENT)
if not e.get("returned", False):
parts.append(".")
elif e.get("icmp_code") is not None:
parts.append(letter)
else:
parts.append("~")
return "".join(parts)
def _compute_hash(matrix: str, errors: dict[str, dict[str, Any]]) -> str:
echo_lens = tuple(
errors.get(k, _SILENT).get("echo_len") or 0
for k in _MATRIX_LETTERS
)
icmp_codes = tuple(
errors.get(k, _SILENT).get("icmp_code") or -1
for k in _MATRIX_LETTERS
)
raw = f"{matrix}:{echo_lens}:{icmp_codes}"
return hashlib.sha256(raw.encode()).hexdigest()[:32]
# ─── public API ───────────────────────────────────────────────────────────────
@_traced("prober.icmp6_error.elicit")
def elicit_icmp6_errors(
target_ip: str,
timeout: float = 2.0,
) -> dict[str, Any] | None:
"""Send four ICMPv6-eliciting probes and return a fingerprint result dict.
Returns None when scapy inet6 is unavailable or all four primitives
produced no information (all sent=False, meaning no CAP_NET_RAW).
Requires root / CAP_NET_RAW. target_ip must be a valid IPv6 address.
"""
try:
import scapy.layers.inet6 # noqa: F401 — presence check
except ImportError:
_log.debug("scapy inet6 not available — icmp6_error active probe skipped")
return None
on_link = False
try:
from decnet.prober.ipv6_leak import _route_info
on_link, _ = _route_info(target_ip)
except Exception as exc:
_log.debug("icmp6_error: _route_info failed for %s: %s", target_ip, exc)
errors: dict[str, dict[str, Any]] = {
"port_unreachable_v6": _probe_port_unreachable_v6(target_ip, timeout),
"hop_limit_exceeded": _probe_hop_limit_exceeded(target_ip, timeout, on_link=on_link),
"unknown_next_header": _probe_unknown_next_header(target_ip, timeout),
"bad_dest_option": _probe_bad_dest_option(target_ip, timeout),
}
if not any(e.get("sent", False) for e in errors.values()):
_log.debug("icmp6_error: no packets sent to %s (no CAP_NET_RAW?)", target_ip)
return None
matrix = _build_matrix(errors)
fp_hash = _compute_hash(matrix, errors)
return {
"matrix": matrix,
"fingerprint_hash": fp_hash,
"errors": errors,
"observed_at": datetime.now(timezone.utc).isoformat(),
}

View File

@@ -1,5 +1,6 @@
# Import all probe modules to trigger ActiveProbeMeta registration.
from decnet.prober.probes.hassh import HasshProbe as HasshProbe
from decnet.prober.probes.icmp6_error_probe import Icmp6ErrorProbe as Icmp6ErrorProbe
from decnet.prober.probes.icmp_error_probe import IcmpErrorProbe as IcmpErrorProbe
from decnet.prober.probes.ipv6_leak_probe import Ipv6LeakProbe as Ipv6LeakProbe
from decnet.prober.probes.jarm import JarmProbe as JarmProbe

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
import ipaddress
from typing import Any
from decnet.logging import get_logger
from decnet.prober.base import ActiveProbe
_log = get_logger("prober.icmp6_error_probe")
class Icmp6ErrorProbe(ActiveProbe):
"""Port-free probe that elicits ICMPv6 error replies from an IPv6 attacker.
Sends four crafted stimuli (UDP/closed-port, hlim=1, unknown NH=253,
bad destination option type=0x80) and records which ICMPv6 error classes
the target emits, per-error RTT, and bytes echoed back in each error body.
Returns None immediately for IPv4 attacker IPs — those are handled by
IcmpErrorProbe.
Requires root / CAP_NET_RAW. Scapy is lazy-imported inside the helper.
"""
probe_name = "icmp6_error"
default_ports: list[int | None] = [None]
event_type = "icmp6_error_leak"
priority = 860 # after icmp_error (850), before ipv6_leak (999)
def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None:
try:
if ipaddress.ip_address(ip).version != 6:
return None
except ValueError:
return None
from decnet.prober.icmp6_error import elicit_icmp6_errors
return elicit_icmp6_errors(ip, timeout=timeout)
def syslog_fields(
self, ip: str, port: int | None, result: dict[str, Any]
) -> tuple[dict[str, Any], str]:
matrix = result.get("matrix", "")
fp_hash = result.get("fingerprint_hash", "")
errors = result.get("errors", {})
def _flag(key: str) -> str:
return "1" if errors.get(key, {}).get("returned", False) else "0"
def _rtt(key: str) -> str:
v = errors.get(key, {}).get("rtt_ms")
return str(v) if v is not None else ""
fields: dict[str, Any] = {
"icmp6_matrix": matrix,
"icmp6_fp_hash": fp_hash,
"icmp6_port_unreach": _flag("port_unreachable_v6"),
"icmp6_hop_limit_exceeded": _flag("hop_limit_exceeded"),
"icmp6_unknown_next_header": _flag("unknown_next_header"),
"icmp6_bad_dest_option": _flag("bad_dest_option"),
"icmp6_port_unreach_rtt_ms": _rtt("port_unreachable_v6"),
"icmp6_hop_limit_exceeded_rtt_ms": _rtt("hop_limit_exceeded"),
"icmp6_unknown_next_header_rtt_ms": _rtt("unknown_next_header"),
"icmp6_bad_dest_option_rtt_ms": _rtt("bad_dest_option"),
"icmp6_hop_limit_exceeded_hop": errors.get("hop_limit_exceeded", {}).get("src_ip") or "",
}
msg = f"ICMPv6 leak {ip} → matrix={matrix} fp={fp_hash[:8]}"
return fields, msg
def publish_payload(
self, ip: str, port: int | None, result: dict[str, Any]
) -> dict[str, Any]:
return {
"attacker_ip": ip,
"icmp6_matrix": result.get("matrix", ""),
"icmp6_fp_hash": result.get("fingerprint_hash", ""),
"errors": result.get("errors", {}),
"observed_at": result.get("observed_at", ""),
}