From 9056e339625c14923a9b37eb7edb11d3220ce64c Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 17 May 2026 20:22:26 -0400 Subject: [PATCH] feat(ttp): Ipv6LeakLifter + R0059 rule for IPv6 link-local opsec failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ipv6LeakLifter subscribes to source_kind="ipv6_leak" events from both the passive sniffer and active prober. Emits T1090 (Proxy) under TA0011 (C2) when fe80:: source address is observed — the attacker's VPN only tunnels IPv4 so their link-local IID leaks their NIC identity. Rule R0059 sets base confidence 0.85; iid_kind in the evidence carries the per-observation strength (eui64 = MAC-derived, deterministic; stable_privacy = RFC 7217; temporary = RFC 4941). --- decnet/ttp/impl/ipv6_leak_lifter.py | 74 +++++++++++++++++++++++++++++ rules/ttp/R0059.yaml | 27 +++++++++++ tests/ttp/test_evidence_shape.py | 18 ++++++- 3 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 decnet/ttp/impl/ipv6_leak_lifter.py create mode 100644 rules/ttp/R0059.yaml diff --git a/decnet/ttp/impl/ipv6_leak_lifter.py b/decnet/ttp/impl/ipv6_leak_lifter.py new file mode 100644 index 00000000..e8d5b0a2 --- /dev/null +++ b/decnet/ttp/impl/ipv6_leak_lifter.py @@ -0,0 +1,74 @@ +"""IPv6 link-local leak lifter — opsec-failure tagger (R0059). + +Reads ``ipv6_leak`` source-kind events emitted by the passive sniffer +(SnifferEngine._on_ipv6_packet) and the active prober (_ipv6_leak_phase) +and emits a Command-and-Control / Proxy technique tag (T1090) when a +fe80:: address is observed for an attacker known to be behind an IPv4 VPN. + +Evidence is pinned to :class:`~decnet.web.db.models.ttp.Ipv6LinkLocalLeakEvidence`. +The ``iid_kind`` field carries classification confidence context so analysts +can filter EUI-64 (strongest, MAC-derived) from stable-privacy or temporary IIDs. +""" +from __future__ import annotations + +from typing import Any, Final + +from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import is_active +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleStore +from decnet.web.db.models.ttp import TTPTag + +_OWNED_PREFIX: Final[str] = "lifter:ipv6_link_local_leak" + + +def _p_ipv6_leak( + _spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + addr: str = payload.get("addr", "") or payload.get("src_ip", "") + if not addr.lower().startswith("fe80:"): + return None + return { + "addr": addr, + "mac_oui": payload.get("mac_oui", ""), + "iid_kind": payload.get("iid_kind", "unknown"), + "vector": payload.get("vector", ""), + "on_iface": payload.get("on_iface", ""), + "attacker_v4": payload.get("attacker_v4", "") or payload.get("attacker_ip", ""), + "observed_at": payload.get("observed_at", ""), + } + + +class Ipv6LeakLifter(TolerantTagger): + name = "ipv6_leak" + HANDLES = frozenset({"ipv6_leak"}) + + def __init__(self, store: RuleStore) -> None: + self._store = store + self._index = RuleIndex() + + @classmethod + def _owns(cls, rule: CompiledRule) -> bool: + kind = rule.match_spec.get("kind", "") + return isinstance(kind, str) and kind == _OWNED_PREFIX + + async def watch_store(self) -> None: + await self._index.watch(self._store, predicate=self._owns) + + async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: + out: list[TTPTag] = [] + for rule in self._index.values(): + if event.source_kind not in rule.applies_to: + continue + if not is_active(rule.state): + continue + evidence = _p_ipv6_leak(rule.match_spec, event.payload) + if evidence is None: + continue + out.extend(emit_tags(rule, event, evidence)) + return out + + +__all__ = ["Ipv6LeakLifter"] diff --git a/rules/ttp/R0059.yaml b/rules/ttp/R0059.yaml new file mode 100644 index 00000000..66e78cb3 --- /dev/null +++ b/rules/ttp/R0059.yaml @@ -0,0 +1,27 @@ +rule_id: R0059 +rule_version: 1 +last_reviewed: "2026-05-17" +next_review: "2026-08-17" +name: ipv6_link_local_leak +description: | + Attacker's IPv6 link-local address (fe80::/10) observed despite operating + behind an IPv4-only VPN. The IID is derived from the NIC MAC address + (EUI-64) or a stable per-host value (RFC 7217 stable-privacy), either of + which survives VPN/IP rotation and constitutes a persistent host fingerprint. + Passive sniffer and active ICMPv6 solicitation both feed this rule. +applies_to: + - ipv6_leak +match: + kind: lifter:ipv6_link_local_leak +emits: + - tactic: TA0011 + technique_id: T1090 + confidence: 0.85 +evidence_fields: + - addr + - mac_oui + - iid_kind + - vector + - on_iface + - attacker_v4 + - observed_at diff --git a/tests/ttp/test_evidence_shape.py b/tests/ttp/test_evidence_shape.py index e8e46f6e..a31d83cf 100644 --- a/tests/ttp/test_evidence_shape.py +++ b/tests/ttp/test_evidence_shape.py @@ -21,6 +21,7 @@ from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter from decnet.ttp.impl.email_lifter import EmailLifter from decnet.ttp.impl.http_fingerprint_lifter import HttpFingerprintLifter from decnet.ttp.impl.intel_lifter import IntelLifter +from decnet.ttp.impl.ipv6_leak_lifter import Ipv6LeakLifter from decnet.ttp.impl.rule_engine import CompiledRule from decnet.ttp.store.base import RuleState from decnet.ttp.store.impl.filesystem import _parse_and_compile @@ -171,13 +172,28 @@ _LIFTER_CASES: list[tuple[str, Any, Any, Any, dict[str, Any]]] = [ lambda: _compile_yaml("R0049"), {"navigator_webdriver": True}, ), + ( + "ipv6_leak", + Ipv6LeakLifter, + Ipv6LinkLocalLeakEvidence, + lambda: _compile_yaml("R0059"), + { + "addr": "fe80::aabb:ccff:fedd:eeff", + "mac_oui": "a8:bb:cc", + "iid_kind": "eui64", + "vector": "passive_ndp", + "on_iface": "eth0", + "attacker_v4": "10.0.0.9", + "observed_at": "2026-01-01T00:00:00+00:00", + }, + ), ] @pytest.mark.parametrize( "source_kind, lifter_cls, td_cls, rule_factory, payload", _LIFTER_CASES, - ids=["http_fingerprint", "intel", "email", "canary_fingerprint"], + ids=["http_fingerprint", "intel", "email", "canary_fingerprint", "ipv6_leak"], ) def test_lifter_emits_evidence_matching_typeddict( source_kind: str,