From 504340745ed969b819305bb589aa5c01f335d7af Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 17 May 2026 20:20:19 -0400 Subject: [PATCH] feat(prober): active IPv6 link-local solicitation phase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ipv6_leak.py with solicit_ipv6_leak() — sends ICMPv6 Echo to ff02::1 on the attacker's iface and returns fe80:: evidence when a link-local response arrives. Gated on _is_on_link(): skips when attacker is behind a router (no L2 adjacency). Add _ipv6_leak_phase() to worker.py (Phase 4 in _probe_cycle). Phase runs once per attacker IP per cycle (sentinel at port 0 in ip_probed["ipv6_leak"]) and publishes kind="ipv6_leak" via publish_fn. Add list_v6_addrs(iface) to network.py: returns [(addr, scope)] for all IPv6 addresses on an interface, required for source-routing ICMPv6 from the correct link-local address. --- decnet/network.py | 28 ++++++++ decnet/prober/ipv6_leak.py | 118 ++++++++++++++++++++++++++++++++ decnet/prober/worker.py | 72 ++++++++++++++++++++ tests/prober/test_ipv6_leak.py | 120 +++++++++++++++++++++++++++++++++ 4 files changed, 338 insertions(+) create mode 100644 decnet/prober/ipv6_leak.py create mode 100644 tests/prober/test_ipv6_leak.py diff --git a/decnet/network.py b/decnet/network.py index b0378dfa..ddde02e5 100644 --- a/decnet/network.py +++ b/decnet/network.py @@ -78,6 +78,34 @@ def get_host_ip(interface: str) -> str: raise RuntimeError(f"Could not determine host IP for interface {interface}.") +def list_v6_addrs(interface: str) -> list[tuple[str, str]]: + """Return [(addr, scope)] for all IPv6 addresses on *interface*. + + addr — the IPv6 address without prefix length (e.g. "fe80::1") + scope — the kernel scope label (e.g. "link", "global", "host") + + Returns an empty list when the interface has no IPv6 addresses or + when `ip addr show` fails. + """ + try: + result = _run(["ip", "-6", "addr", "show", interface]) + except Exception: + return [] + out: list[tuple[str, str]] = [] + for line in result.stdout.splitlines(): + line = line.strip() + if not line.startswith("inet6 "): + continue + # "inet6 fe80::1/64 scope link" + parts = line.split() + addr = parts[1].split("/")[0] + scope = "" + if "scope" in parts: + scope = parts[parts.index("scope") + 1] + out.append((addr, scope)) + return out + + # --------------------------------------------------------------------------- # IP allocation # --------------------------------------------------------------------------- diff --git a/decnet/prober/ipv6_leak.py b/decnet/prober/ipv6_leak.py new file mode 100644 index 00000000..e9c5cf61 --- /dev/null +++ b/decnet/prober/ipv6_leak.py @@ -0,0 +1,118 @@ +"""Active IPv6 link-local solicitation prober. + +Sends ICMPv6 Neighbor Solicitation and Echo Request packets to the +link-local multicast group (ff02::1) on the attacker's reachable iface +to elicit a fe80:: response that reveals the attacker's IID/MAC. + +Only useful when the prober shares layer-2 with the attacker (on-link). +The phase function in worker.py gates on this before sending. +""" +from __future__ import annotations + +import ipaddress +import subprocess # nosec B404 +from datetime import datetime, timezone +from typing import Any + +from decnet.logging import get_logger +from decnet.sniffer.fingerprint import _ipv6_iid_classify + +_log = get_logger("prober.ipv6_leak") + + +def _is_link_local(addr: str) -> bool: + try: + return ipaddress.ip_address(addr).is_link_local + except ValueError: + return False + + +def _ip_route_get(attacker_v4: str) -> str: + """Return stdout of `ip route get `, or "" on failure.""" + try: + out = subprocess.run( # nosec B603 B607 + ["ip", "route", "get", attacker_v4], + capture_output=True, text=True, timeout=2, + ) + return out.stdout + except Exception: + return "" + + +def _resolve_iface_for_ip(attacker_v4: str) -> str | None: + """Return the local interface name that would route to attacker_v4.""" + stdout = _ip_route_get(attacker_v4) + parts = stdout.split() + if "dev" in parts: + idx = parts.index("dev") + return parts[idx + 1] if idx + 1 < len(parts) else None + return None + + +def _is_on_link(attacker_v4: str) -> bool: + """Return True only when the attacker is directly reachable on L2. + + Checks that `ip route get` shows no intermediate gateway (no "via"). + """ + return "via" not in _ip_route_get(attacker_v4) + + +def solicit_ipv6_leak( + attacker_v4: str, + iface: str, + timeout: float = 3.0, +) -> dict[str, Any] | None: + """Send ICMPv6 solicitations on *iface* and return evidence if a + fe80:: response arrives. + + Returns an ``Ipv6LinkLocalLeakEvidence``-shaped dict on success, + or None when scapy is unavailable, the iface has no link-local addr, + or no fe80:: response is seen within *timeout* seconds. + """ + try: + from scapy.layers.inet6 import ICMPv6EchoRequest, IPv6 + from scapy.sendrecv import sr1 + except ImportError: + _log.debug("scapy not available — ipv6_leak active probe skipped") + return None + + from decnet.network import list_v6_addrs + v6_addrs = list_v6_addrs(iface) + link_local_src = next( + (addr for addr, scope in v6_addrs if scope == "link"), None + ) + if link_local_src is None: + _log.debug("ipv6_leak: no link-local addr on %s — skip active probe", iface) + return None + + # ICMPv6 Echo to ff02::1 (all-nodes multicast) elicits responses from + # all on-link hosts; the first fe80:: reply is the attacker's IID. + pkt = IPv6(src=link_local_src, dst="ff02::1") / ICMPv6EchoRequest() + + try: + resp = sr1(pkt, iface=iface, timeout=timeout, verbose=0) + except Exception as exc: + _log.debug("ipv6_leak: sr1 failed on %s: %s", iface, exc) + return None + + if resp is None: + return None + + try: + src_addr: str = resp[IPv6].src + except Exception: + return None + + if not _is_link_local(src_addr): + return None + + iid_kind, mac_oui = _ipv6_iid_classify(src_addr) + return { + "addr": src_addr, + "mac_oui": mac_oui, + "iid_kind": iid_kind, + "vector": "active_echo", + "on_iface": iface, + "attacker_v4": attacker_v4, + "observed_at": datetime.now(timezone.utc).isoformat(), + } diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 8013f74e..d070748b 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -302,6 +302,9 @@ def _probe_cycle( # Phase 3: TCP/IP stack fingerprinting _tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn, record_rotation) + # Phase 4: IPv6 link-local leak (active ICMPv6 solicitation; on-link only) + _ipv6_leak_phase(ip, ip_probed, log_path, json_path, timeout, publish_fn) + @_traced("prober.jarm_phase") def _jarm_phase( @@ -543,6 +546,75 @@ def _tcpfp_phase( logger.warning("prober: TCPFP probe failed %s:%d: %s", ip, port, exc) +@_traced("prober.ipv6_leak_phase") +def _ipv6_leak_phase( + ip: str, + ip_probed: dict[str, set[int]], + log_path: Path, + json_path: Path, + timeout: float, + publish_fn: ProbePublishFn | None = None, +) -> None: + """Attempt active ICMPv6 solicitation to elicit a fe80:: response. + + Skipped when: + - already attempted for this attacker in this cycle + - attacker is not on a directly connected (link-local reachable) L2 + - scapy unavailable or the local iface has no fe80:: address + """ + done = ip_probed.setdefault("ipv6_leak", set()) + # Use port 0 as a sentinel (no port concept for ICMPv6 probes). + if 0 in done: + return + done.add(0) + + from decnet.prober.ipv6_leak import _is_on_link, _resolve_iface_for_ip, solicit_ipv6_leak + + if not _is_on_link(ip): + logger.debug("prober: ipv6_leak: %s is not on-link — skip active probe", ip) + return + + iface = _resolve_iface_for_ip(ip) + if iface is None: + logger.debug("prober: ipv6_leak: cannot determine iface for %s", ip) + return + + try: + evidence = solicit_ipv6_leak(ip, iface, timeout=timeout) + except Exception as exc: + logger.warning("prober: ipv6_leak active probe failed %s: %s", ip, exc) + return + + if evidence is None: + return + + _write_event( + log_path, json_path, + "ipv6_link_local_leak", + target_ip=ip, + ipv6_addr=evidence.get("addr", ""), + iid_kind=evidence.get("iid_kind", ""), + mac_oui=evidence.get("mac_oui", ""), + on_iface=evidence.get("on_iface", ""), + vector=evidence.get("vector", ""), + msg=f"IPv6 leak {ip} → {evidence.get('addr', '')} ({evidence.get('iid_kind', '')})", + ) + logger.info( + "prober: ipv6_leak %s → %s kind=%s oui=%s", + ip, evidence.get("addr"), evidence.get("iid_kind"), evidence.get("mac_oui"), + ) + if publish_fn is not None: + publish_fn("ipv6_leak", { + "attacker_ip": ip, + "addr": evidence.get("addr", ""), + "iid_kind": evidence.get("iid_kind", ""), + "mac_oui": evidence.get("mac_oui", ""), + "vector": evidence.get("vector", ""), + "on_iface": evidence.get("on_iface", ""), + "observed_at": evidence.get("observed_at", ""), + }) + + # ─── Main worker ───────────────────────────────────────────────────────────── @_traced("prober.worker") diff --git a/tests/prober/test_ipv6_leak.py b/tests/prober/test_ipv6_leak.py new file mode 100644 index 00000000..79de990c --- /dev/null +++ b/tests/prober/test_ipv6_leak.py @@ -0,0 +1,120 @@ +"""Active IPv6 link-local solicitation prober tests. + +Tests _ipv6_leak_phase() via monkeypatching — no actual scapy send/receive, +no sniff threads. Validates: +- Phase skips when attacker is not on-link. +- Phase skips on second call (dedup via ip_probed sentinel). +- Phase emits log + publish_fn when solicit_ipv6_leak returns evidence. +- Phase is silent when solicit_ipv6_leak returns None. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + + +def _phase( + ip: str = "10.0.0.9", + ip_probed: dict | None = None, + log_path: Path | None = None, + json_path: Path | None = None, + timeout: float = 1.0, + publish_fn=None, +): + from decnet.prober.worker import _ipv6_leak_phase + if ip_probed is None: + ip_probed = {} + if log_path is None: + log_path = Path("/dev/null") + if json_path is None: + json_path = Path("/dev/null") + _ipv6_leak_phase(ip, ip_probed, log_path, json_path, timeout, publish_fn) + + +_EVIDENCE = { + "addr": "fe80::aabb:ccff:fedd:eeff", + "mac_oui": "a8:bb:cc", + "iid_kind": "eui64", + "vector": "active_echo", + "on_iface": "eth0", + "attacker_v4": "10.0.0.9", + "observed_at": "2026-01-01T00:00:00+00:00", +} + + +def test_phase_skips_when_not_on_link() -> None: + published: list[Any] = [] + with ( + patch("decnet.prober.ipv6_leak._is_on_link", return_value=False), + patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, + ): + _phase(publish_fn=lambda k, p: published.append((k, p))) + mock_sol.assert_not_called() + assert published == [] + + +def test_phase_skips_when_no_iface() -> None: + published: list[Any] = [] + with ( + patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), + patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value=None), + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, + ): + _phase(publish_fn=lambda k, p: published.append((k, p))) + mock_sol.assert_not_called() + assert published == [] + + +def test_phase_emits_on_evidence() -> None: + published: list[Any] = [] + with ( + patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), + patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE), + ): + _phase(publish_fn=lambda k, p: published.append((k, p))) + assert len(published) == 1 + kind, payload = published[0] + assert kind == "ipv6_leak" + assert payload["addr"] == _EVIDENCE["addr"] + assert payload["iid_kind"] == "eui64" + assert payload["mac_oui"] == "a8:bb:cc" + + +def test_phase_silent_when_solicit_returns_none() -> None: + published: list[Any] = [] + with ( + patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), + patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=None), + ): + _phase(publish_fn=lambda k, p: published.append((k, p))) + assert published == [] + + +def test_phase_dedup_skips_on_second_call() -> None: + published: list[Any] = [] + ip_probed: dict = {} + with ( + patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), + patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", return_value=_EVIDENCE) as mock_sol, + ): + _phase(ip_probed=ip_probed, publish_fn=lambda k, p: published.append((k, p))) + _phase(ip_probed=ip_probed, publish_fn=lambda k, p: published.append((k, p))) + # solicit called only once despite two phase invocations + mock_sol.assert_called_once() + assert len(published) == 1 + + +def test_phase_handles_solicit_exception_silently() -> None: + published: list[Any] = [] + with ( + patch("decnet.prober.ipv6_leak._is_on_link", return_value=True), + patch("decnet.prober.ipv6_leak._resolve_iface_for_ip", return_value="eth0"), + patch("decnet.prober.ipv6_leak.solicit_ipv6_leak", side_effect=RuntimeError("boom")), + ): + _phase(publish_fn=lambda k, p: published.append((k, p))) + assert published == []