Files
DECNET/decnet/prober/ipv6_leak.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

132 lines
4.1 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Active IPv6 link-local solicitation prober.
Sends an ICMPv6 Echo Request to the link-local all-nodes multicast group
(ff02::1) on the local interface that routes to the attacker, to elicit a
fe80:: response that reveals the responder's IID/MAC. (Neighbor
Solicitation is not currently sent by this module.)
Only useful when the prober shares layer-2 with the attacker (on-link).
The phase function in worker.py gates on this before sending.
"""
from __future__ import annotations
import ipaddress
import subprocess # nosec B404
from datetime import datetime, timezone
from typing import Any
from decnet.logging import get_logger
from decnet.sniffer.fingerprint import _ipv6_iid_classify
_log = get_logger("prober.ipv6_leak")
def _is_link_local(addr: str) -> bool:
    """Report whether *addr* parses as a link-local IP address.

    Input that ``ipaddress.ip_address`` rejects yields False instead of
    raising.
    """
    try:
        parsed = ipaddress.ip_address(addr)
    except ValueError:
        # Not a valid IP literal at all, so it cannot be link-local.
        return False
    return parsed.is_link_local
def _ip_route_get(attacker_v4: str) -> str:
    """Run ``ip route get <attacker_v4>`` and hand back its stdout.

    Any exception raised while running the command is logged at debug
    level and reported as an empty string. The exit status is not checked:
    whatever the command wrote to stdout is returned as-is.
    """
    argv = ["ip", "route", "get", attacker_v4]
    try:
        completed = subprocess.run(  # nosec B603 B607
            argv,
            capture_output=True,
            text=True,
            timeout=2,
        )
    except Exception as exc:
        # Best-effort lookup: callers treat "" as "no route information".
        _log.debug("ipv6_leak: ip route get failed for %s: %s", attacker_v4, exc)
        return ""
    return completed.stdout
def _route_info(attacker_v4: str) -> tuple[bool, str | None]:
    """Return ``(on_link, iface)`` using a single ``ip route get`` invocation.

    on_link is True when the lookup produced a route and that route has no
    intermediate gateway (no ``via`` token in the output).
    iface is the token that follows ``dev`` in the output, or None when it
    cannot be parsed.

    When the lookup yields no output at all (the command failed or printed
    nothing to stdout), the result is ``(False, None)``: an unknown route
    must not be reported as on-link.
    """
    tokens = _ip_route_get(attacker_v4).split()
    if not tokens:
        # No route information: absence of "via" here means "unknown",
        # not "directly reachable".
        return False, None

    on_link = "via" not in tokens
    iface: str | None = None
    if "dev" in tokens:
        dev_pos = tokens.index("dev")
        # Guard against a truncated line that ends right after "dev".
        if dev_pos + 1 < len(tokens):
            iface = tokens[dev_pos + 1]
    return on_link, iface
def _resolve_iface_for_ip(attacker_v4: str) -> str | None:
    """Return the interface name reported by ``_route_info`` for attacker_v4.

    The result is None when ``_route_info`` could not determine an interface.
    """
    # Only the second element (the interface) of the pair is of interest.
    return _route_info(attacker_v4)[1]
def _is_on_link(attacker_v4: str) -> bool:
    """Return the on-link flag that ``_route_info`` computes for attacker_v4.

    The flag reflects whether the route lookup showed no intermediate
    gateway between this host and the attacker.
    """
    # Only the first element (the on-link flag) of the pair is of interest.
    return _route_info(attacker_v4)[0]
def solicit_ipv6_leak(
    attacker_v4: str,
    iface: str,
    timeout: float = 3.0,
) -> dict[str, Any] | None:
    """Probe *iface* with an ICMPv6 Echo Request and report a fe80:: reply.

    A single Echo Request is sent from this host's link-local address on
    *iface* to the all-nodes multicast group (ff02::1), and the first
    answer returned by scapy's ``sr1`` is inspected.

    Returns a dict with the keys ``addr``, ``mac_oui``, ``iid_kind``,
    ``vector``, ``on_iface``, ``attacker_v4`` and ``observed_at`` when the
    answer's source address is link-local. Returns None when scapy cannot
    be imported, *iface* has no address with scope ``"link"``, sending
    fails, no answer arrives within *timeout* seconds, the answer cannot
    be parsed, or its source address is not link-local.
    """
    # scapy is optional; without it the active probe is silently skipped.
    try:
        from scapy.layers.inet6 import ICMPv6EchoRequest, IPv6
        from scapy.sendrecv import sr1
    except ImportError:
        _log.debug("scapy not available — ipv6_leak active probe skipped")
        return None

    from decnet.network import list_v6_addrs

    # Use the first address on the interface whose scope is "link" as source.
    source_ll = None
    for candidate, scope in list_v6_addrs(iface):
        if scope == "link":
            source_ll = candidate
            break
    if source_ll is None:
        _log.debug("ipv6_leak: no link-local addr on %s — skip active probe", iface)
        return None

    # Echo Request to the all-nodes multicast group; sr1 hands back the
    # first matching answer only.
    # NOTE(review): the answer is never correlated with attacker_v4, so
    # any on-link host that answers first would be reported. Confirm the
    # caller accounts for this.
    probe = IPv6(src=source_ll, dst="ff02::1") / ICMPv6EchoRequest()
    try:
        reply = sr1(probe, iface=iface, timeout=timeout, verbose=0)
    except Exception as exc:
        _log.debug("ipv6_leak: sr1 failed on %s: %s", iface, exc)
        return None
    if reply is None:
        # Nothing answered before the timeout elapsed.
        return None

    try:
        src_addr: str = reply[IPv6].src
    except Exception as exc:
        _log.debug("ipv6_leak: response parse failed on %s: %s", attacker_v4, exc)
        return None

    # Only a link-local source address counts as evidence.
    if not _is_link_local(src_addr):
        return None

    iid_kind, mac_oui = _ipv6_iid_classify(src_addr)
    evidence: dict[str, Any] = {
        "addr": src_addr,
        "mac_oui": mac_oui,
        "iid_kind": iid_kind,
        "vector": "active_echo",
        "on_iface": iface,
        "attacker_v4": attacker_v4,
        "observed_at": datetime.now(timezone.utc).isoformat(),
    }
    return evidence