diff --git a/decnet/sniffer/fingerprint.py b/decnet/sniffer/fingerprint.py index 322d3f0d..80dfec3f 100644 --- a/decnet/sniffer/fingerprint.py +++ b/decnet/sniffer/fingerprint.py @@ -71,9 +71,37 @@ _BUS_TRAFFIC_EVENTS: frozenset[str] = frozenset({ "http_request_fingerprint", "http2_settings", "http3_settings", + "ipv6_link_local_leak", }) +def _ipv6_iid_classify(addr: str) -> tuple[str, str]: + """Return (iid_kind, mac_oui) for a link-local IPv6 address. + + iid_kind: "eui64" | "stable_privacy" | "temporary" | "unknown" + mac_oui: "aa:bb:cc" (first 3 octets, lowercase) or "" when not derivable. + + EUI-64 IIDs embed the MAC: bytes 11-12 of the 128-bit address are 0xff 0xfe, + and bit 6 of byte 8 (the universal/local bit) is flipped. All other IIDs + from RFC 7217 (stable-privacy) or RFC 4941 (temporary) have no embedded MAC. + """ + try: + import ipaddress + packed = ipaddress.ip_address(addr).packed # 16 bytes + except Exception: + return "unknown", "" + + iid = packed[8:] # 8-byte Interface Identifier + if iid[3] == 0xff and iid[4] == 0xfe: + # EUI-64: recover 3-byte OUI from bytes 0-2, flip U/L bit (bit 1 of byte 0) + oui_bytes = bytearray([iid[0] ^ 0x02, iid[1], iid[2]]) + oui = ":".join(f"{b:02x}" for b in oui_bytes) + return "eui64", oui + # Stable-privacy (RFC 7217) and temporary (RFC 4941) are not distinguishable + # from the address alone; classify as stable_privacy by default. + return "stable_privacy", "" + + def _parse_ssh_banner(data: bytes) -> str | None: """ Return the attacker's SSH identification string (RFC 4253 §4.2) if @@ -1275,8 +1303,71 @@ class SnifferEngine: else: self._flows.pop(key, None) + def _on_ipv6_packet(self, pkt: Any) -> None: + """Handle an IPv6 packet and emit ipv6_link_local_leak if fe80::/10 src.""" + try: + from scapy.layers.inet6 import IPv6 + except ImportError: + return + + if not pkt.haslayer(IPv6): + return + + ip6 = pkt[IPv6] + src: str = ip6.src + dst: str = ip6.dst + + # Only care about link-local sources (fe80::/10). + if not src.lower().startswith("fe80:"): + return + + # Correlate to a known attacker: check if the packet destination + # is a known decky IP (attacker→decky direction). + attacker_v4 = "" + node_name = self._ip_to_decky.get(dst) + if node_name is None: + # Also accept reverse: decky→attacker (e.g. NDP response to our NS) + node_name = self._ip_to_decky.get(src) + if node_name is None: + return + else: + # dst is a decky — the attacker is the src (link-local) + # Try to find the attacker's known v4 address from ip_to_decky reverse. + # We don't have an authoritative v4 here; leave it empty and let the + # lifter correlate by decky_name + timing. + attacker_v4 = "" + + iid_kind, mac_oui = _ipv6_iid_classify(src) + # Scapy packets from sniff() carry the iface in pkt.sniffed_on when + # iface= was passed to sniff(); getattr handles absent attribute safely. + iface = getattr(pkt, "sniffed_on", None) or "" + + from datetime import datetime, timezone + observed_at = datetime.now(timezone.utc).isoformat() + + self._log( + node_name, + "ipv6_link_local_leak", + src_ip=src, + dst_ip=dst, + iid_kind=iid_kind, + mac_oui=mac_oui, + on_iface=iface, + attacker_v4=attacker_v4, + observed_at=observed_at, + ) + def on_packet(self, pkt: Any) -> None: """Process a single scapy packet. Called from the sniff thread.""" + try: + from scapy.layers.inet6 import IPv6 + except ImportError: + pass + else: + if pkt.haslayer(IPv6): + self._on_ipv6_packet(pkt) + return + try: from scapy.layers.inet import IP, TCP except ImportError: diff --git a/decnet/sniffer/worker.py b/decnet/sniffer/worker.py index 7ac44574..084306b0 100644 --- a/decnet/sniffer/worker.py +++ b/decnet/sniffer/worker.py @@ -89,7 +89,7 @@ def _sniff_loop( log_path: Path, json_path: Path, stop_event: threading.Event, - bpf_filter: str = "tcp", + bpf_filter: str = "tcp or ip6", publish_fn: Callable[[str, str, dict[str, Any]], None] | None = None, engine: "SnifferEngine | None" = None, ) -> None: diff --git a/tests/sniffer/test_ipv6_leak.py b/tests/sniffer/test_ipv6_leak.py new file mode 100644 index 00000000..15ac30cb --- /dev/null +++ b/tests/sniffer/test_ipv6_leak.py @@ -0,0 +1,135 @@ +"""Passive IPv6 link-local leak detection — sniffer unit tests. + +Tests SnifferEngine._on_ipv6_packet and _ipv6_iid_classify via direct +packet injection (no sniff thread — per project constraint on scapy sniff +threads in pytest teardown). +""" +from __future__ import annotations + +import pytest + +from scapy.layers.inet6 import ICMPv6ND_NS, IPv6 + +from decnet.sniffer.fingerprint import SnifferEngine, _ipv6_iid_classify + +_DECKY_IP6 = "fe80::1" # decky's own link-local (destination) +_DECKY_IP4 = "10.0.0.5" # corresponding v4 for ip_to_decky mapping +_DECKY = "decky-a" + +# EUI-64 derived from MAC aa:bb:cc:dd:ee:ff → +# bytes: aa^0x02, bb, cc, ff, fe, dd, ee, ff +# → IID: a8:bb:cc:ff:fe:dd:ee:ff → fe80::aabb:ccff:fedd:eeff +_EUI64_ADDR = "fe80::aabb:ccff:fedd:eeff" +_EUI64_OUI = "a8:bb:cc" # U/L bit flipped (aa XOR 0x02 = a8) + +# Stable-privacy / random IID — no fffe bytes at positions 3-4 +_STABLE_ADDR = "fe80::1234:5678:9abc:def0" + + +def _engine(extra_map: dict[str, str] | None = None) -> tuple[SnifferEngine, list[str]]: + captured: list[str] = [] + ip_map = {_DECKY_IP4: _DECKY} + if extra_map: + ip_map.update(extra_map) + engine = SnifferEngine( + ip_to_decky=ip_map, + write_fn=captured.append, + dedup_ttl=300.0, + ) + return engine, captured + + +# ── _ipv6_iid_classify ─────────────────────────────────────────────────────── + + +def test_iid_classify_eui64_returns_oui() -> None: + kind, oui = _ipv6_iid_classify(_EUI64_ADDR) + assert kind == "eui64" + assert oui == _EUI64_OUI + + +def test_iid_classify_stable_privacy() -> None: + kind, oui = _ipv6_iid_classify(_STABLE_ADDR) + assert kind == "stable_privacy" + assert oui == "" + + +def test_iid_classify_bad_addr_returns_unknown() -> None: + kind, oui = _ipv6_iid_classify("not-an-address") + assert kind == "unknown" + assert oui == "" + + +# ── _on_ipv6_packet ───────────────────────────────────────────────────────── + + +def _make_ndp_ns(src: str, dst: str) -> object: + """Craft a Neighbor Solicitation from attacker link-local to decky.""" + return IPv6(src=src, dst=dst) / ICMPv6ND_NS(tgt=dst) + + +def test_eui64_packet_emits_ipv6_leak_event() -> None: + engine, captured = _engine(extra_map={"fe80::1": _DECKY}) + pkt = _make_ndp_ns(_EUI64_ADDR, _DECKY_IP6) + engine._on_ipv6_packet(pkt) + assert len(captured) == 1 + line = captured[0] + assert "ipv6_link_local_leak" in line + assert _EUI64_ADDR in line + assert "eui64" in line + assert _EUI64_OUI in line + + +def test_stable_privacy_packet_emits_event() -> None: + engine, captured = _engine(extra_map={"fe80::1": _DECKY}) + pkt = _make_ndp_ns(_STABLE_ADDR, _DECKY_IP6) + engine._on_ipv6_packet(pkt) + assert len(captured) == 1 + assert "stable_privacy" in captured[0] + + +def test_non_link_local_src_is_ignored() -> None: + engine, captured = _engine(extra_map={"fe80::1": _DECKY}) + # GUA source — not a link-local leak + pkt = IPv6(src="2001:db8::1", dst=_DECKY_IP6) / ICMPv6ND_NS(tgt=_DECKY_IP6) + engine._on_ipv6_packet(pkt) + assert captured == [] + + +def test_packet_to_unknown_decky_is_ignored() -> None: + engine, captured = _engine() # ip_to_decky has no v6 entries + pkt = _make_ndp_ns(_EUI64_ADDR, "fe80::dead") + engine._on_ipv6_packet(pkt) + assert captured == [] + + +def test_on_packet_dispatches_ipv6_branch() -> None: + """on_packet() must route IPv6 packets to _on_ipv6_packet.""" + engine, captured = _engine(extra_map={"fe80::1": _DECKY}) + pkt = _make_ndp_ns(_EUI64_ADDR, _DECKY_IP6) + engine.on_packet(pkt) + assert any("ipv6_link_local_leak" in line for line in captured) + + +def test_dedup_suppresses_repeat_emit() -> None: + engine, captured = _engine(extra_map={"fe80::1": _DECKY}) + pkt = _make_ndp_ns(_EUI64_ADDR, _DECKY_IP6) + engine._on_ipv6_packet(pkt) + engine._on_ipv6_packet(pkt) + assert len(captured) == 1 # second identical packet deduped + + +def test_publish_fn_fires_on_leak() -> None: + published: list[tuple[str, str, dict]] = [] + engine = SnifferEngine( + ip_to_decky={"fe80::1": _DECKY}, + write_fn=lambda _: None, + publish_fn=lambda node, event, payload: published.append((node, event, payload)), + ) + pkt = _make_ndp_ns(_EUI64_ADDR, _DECKY_IP6) + engine._on_ipv6_packet(pkt) + assert len(published) == 1 + node, event, payload = published[0] + assert event == "ipv6_link_local_leak" + assert payload["iid_kind"] == "eui64" + assert payload["mac_oui"] == _EUI64_OUI