Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
132 lines
4.1 KiB
Python
132 lines
4.1 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Active IPv6 link-local solicitation prober.
|
|
|
|
Sends ICMPv6 Neighbor Solicitation and Echo Request packets to the
|
|
link-local multicast group (ff02::1) on the attacker's reachable iface
|
|
to elicit a fe80:: response that reveals the attacker's IID/MAC.
|
|
|
|
Only useful when the prober shares layer-2 with the attacker (on-link).
|
|
The phase function in worker.py gates on this before sending.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import subprocess # nosec B404
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from decnet.logging import get_logger
|
|
from decnet.sniffer.fingerprint import _ipv6_iid_classify
|
|
|
|
_log = get_logger("prober.ipv6_leak")
|
|
|
|
|
|
def _is_link_local(addr: str) -> bool:
|
|
try:
|
|
return ipaddress.ip_address(addr).is_link_local
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def _ip_route_get(attacker_v4: str) -> str:
|
|
"""Return stdout of `ip route get <attacker_v4>`, or "" on failure."""
|
|
try:
|
|
out = subprocess.run( # nosec B603 B607
|
|
["ip", "route", "get", attacker_v4],
|
|
capture_output=True, text=True, timeout=2,
|
|
)
|
|
return out.stdout
|
|
except Exception as exc:
|
|
_log.debug("ipv6_leak: ip route get failed for %s: %s", attacker_v4, exc)
|
|
return ""
|
|
|
|
|
|
def _route_info(attacker_v4: str) -> tuple[bool, str | None]:
|
|
"""Return (on_link, iface) with a single `ip route get` invocation.
|
|
|
|
on_link is True when there is no intermediate gateway ("via" absent).
|
|
iface is the local interface name, or None if not parseable.
|
|
"""
|
|
stdout = _ip_route_get(attacker_v4)
|
|
parts = stdout.split()
|
|
on_link = "via" not in parts
|
|
iface: str | None = None
|
|
if "dev" in parts:
|
|
idx = parts.index("dev")
|
|
iface = parts[idx + 1] if idx + 1 < len(parts) else None
|
|
return on_link, iface
|
|
|
|
|
|
def _resolve_iface_for_ip(attacker_v4: str) -> str | None:
|
|
"""Return the local interface name that would route to attacker_v4."""
|
|
_, iface = _route_info(attacker_v4)
|
|
return iface
|
|
|
|
|
|
def _is_on_link(attacker_v4: str) -> bool:
|
|
"""Return True only when the attacker is directly reachable on L2."""
|
|
on_link, _ = _route_info(attacker_v4)
|
|
return on_link
|
|
|
|
|
|
def solicit_ipv6_leak(
|
|
attacker_v4: str,
|
|
iface: str,
|
|
timeout: float = 3.0,
|
|
) -> dict[str, Any] | None:
|
|
"""Send ICMPv6 solicitations on *iface* and return evidence if a
|
|
fe80:: response arrives.
|
|
|
|
Returns an ``Ipv6LinkLocalLeakEvidence``-shaped dict on success,
|
|
or None when scapy is unavailable, the iface has no link-local addr,
|
|
or no fe80:: response is seen within *timeout* seconds.
|
|
"""
|
|
try:
|
|
from scapy.layers.inet6 import ICMPv6EchoRequest, IPv6
|
|
from scapy.sendrecv import sr1
|
|
except ImportError:
|
|
_log.debug("scapy not available — ipv6_leak active probe skipped")
|
|
return None
|
|
|
|
from decnet.network import list_v6_addrs
|
|
v6_addrs = list_v6_addrs(iface)
|
|
link_local_src = next(
|
|
(addr for addr, scope in v6_addrs if scope == "link"), None
|
|
)
|
|
if link_local_src is None:
|
|
_log.debug("ipv6_leak: no link-local addr on %s — skip active probe", iface)
|
|
return None
|
|
|
|
# ICMPv6 Echo to ff02::1 (all-nodes multicast) elicits responses from
|
|
# all on-link hosts; the first fe80:: reply is the attacker's IID.
|
|
pkt = IPv6(src=link_local_src, dst="ff02::1") / ICMPv6EchoRequest()
|
|
|
|
try:
|
|
resp = sr1(pkt, iface=iface, timeout=timeout, verbose=0)
|
|
except Exception as exc:
|
|
_log.debug("ipv6_leak: sr1 failed on %s: %s", iface, exc)
|
|
return None
|
|
|
|
if resp is None:
|
|
return None
|
|
|
|
try:
|
|
src_addr: str = resp[IPv6].src
|
|
except Exception as exc:
|
|
_log.debug("ipv6_leak: response parse failed on %s: %s", attacker_v4, exc)
|
|
return None
|
|
|
|
if not _is_link_local(src_addr):
|
|
return None
|
|
|
|
iid_kind, mac_oui = _ipv6_iid_classify(src_addr)
|
|
return {
|
|
"addr": src_addr,
|
|
"mac_oui": mac_oui,
|
|
"iid_kind": iid_kind,
|
|
"vector": "active_echo",
|
|
"on_iface": iface,
|
|
"attacker_v4": attacker_v4,
|
|
"observed_at": datetime.now(timezone.utc).isoformat(),
|
|
}
|