feat(ttp): Ipv6LeakLifter + R0059 rule for IPv6 link-local opsec failures

Ipv6LeakLifter subscribes to source_kind="ipv6_leak" events from both
the passive sniffer and active prober. Emits T1090 (Proxy) under TA0011
(C2) when fe80:: source address is observed — the attacker's VPN only
tunnels IPv4 so their link-local IID leaks their NIC identity.

Rule R0059 sets base confidence 0.85; iid_kind in the evidence carries
the per-observation strength (eui64 = MAC-derived, deterministic;
stable_privacy = RFC 7217; temporary = RFC 4941).
This commit is contained in:
2026-05-17 20:22:26 -04:00
parent 504340745e
commit 9056e33962
3 changed files with 118 additions and 1 deletions

View File

@@ -0,0 +1,74 @@
"""IPv6 link-local leak lifter — opsec-failure tagger (R0059).
Reads ``ipv6_leak`` source-kind events emitted by the passive sniffer
(SnifferEngine._on_ipv6_packet) and the active prober (_ipv6_leak_phase)
and emits a Command-and-Control / Proxy technique tag (T1090) when a
fe80:: address is observed for an attacker known to be behind an IPv4 VPN.
Evidence is pinned to :class:`~decnet.web.db.models.ttp.Ipv6LinkLocalLeakEvidence`.
The ``iid_kind`` field carries classification confidence context so analysts
can filter EUI-64 (strongest, MAC-derived) from stable-privacy or temporary IIDs.
"""
from __future__ import annotations
from typing import Any, Final
from decnet.ttp.base import TaggerEvent, TolerantTagger
from decnet.ttp.impl._emit import emit_tags
from decnet.ttp.impl._rule_index import RuleIndex
from decnet.ttp.impl._state import is_active
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleStore
from decnet.web.db.models.ttp import TTPTag
_OWNED_PREFIX: Final[str] = "lifter:ipv6_link_local_leak"
def _p_ipv6_leak(
_spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
addr: str = payload.get("addr", "") or payload.get("src_ip", "")
if not addr.lower().startswith("fe80:"):
return None
return {
"addr": addr,
"mac_oui": payload.get("mac_oui", ""),
"iid_kind": payload.get("iid_kind", "unknown"),
"vector": payload.get("vector", ""),
"on_iface": payload.get("on_iface", ""),
"attacker_v4": payload.get("attacker_v4", "") or payload.get("attacker_ip", ""),
"observed_at": payload.get("observed_at", ""),
}
class Ipv6LeakLifter(TolerantTagger):
name = "ipv6_leak"
HANDLES = frozenset({"ipv6_leak"})
def __init__(self, store: RuleStore) -> None:
self._store = store
self._index = RuleIndex()
@classmethod
def _owns(cls, rule: CompiledRule) -> bool:
kind = rule.match_spec.get("kind", "")
return isinstance(kind, str) and kind == _OWNED_PREFIX
async def watch_store(self) -> None:
await self._index.watch(self._store, predicate=self._owns)
async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
out: list[TTPTag] = []
for rule in self._index.values():
if event.source_kind not in rule.applies_to:
continue
if not is_active(rule.state):
continue
evidence = _p_ipv6_leak(rule.match_spec, event.payload)
if evidence is None:
continue
out.extend(emit_tags(rule, event, evidence))
return out
__all__ = ["Ipv6LeakLifter"]

27
rules/ttp/R0059.yaml Normal file
View File

@@ -0,0 +1,27 @@
rule_id: R0059
rule_version: 1
last_reviewed: "2026-05-17"
next_review: "2026-08-17"
name: ipv6_link_local_leak
description: |
Attacker's IPv6 link-local address (fe80::/10) observed despite operating
behind an IPv4-only VPN. The IID is derived from the NIC MAC address
(EUI-64) or a stable per-host value (RFC 7217 stable-privacy), either of
which survives VPN/IP rotation and constitutes a persistent host fingerprint.
Passive sniffer and active ICMPv6 solicitation both feed this rule.
applies_to:
- ipv6_leak
match:
kind: lifter:ipv6_link_local_leak
emits:
- tactic: TA0011
technique_id: T1090
confidence: 0.85
evidence_fields:
- addr
- mac_oui
- iid_kind
- vector
- on_iface
- attacker_v4
- observed_at

View File

@@ -21,6 +21,7 @@ from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
from decnet.ttp.impl.email_lifter import EmailLifter
from decnet.ttp.impl.http_fingerprint_lifter import HttpFingerprintLifter
from decnet.ttp.impl.intel_lifter import IntelLifter
from decnet.ttp.impl.ipv6_leak_lifter import Ipv6LeakLifter
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
@@ -171,13 +172,28 @@ _LIFTER_CASES: list[tuple[str, Any, Any, Any, dict[str, Any]]] = [
lambda: _compile_yaml("R0049"),
{"navigator_webdriver": True},
),
(
"ipv6_leak",
Ipv6LeakLifter,
Ipv6LinkLocalLeakEvidence,
lambda: _compile_yaml("R0059"),
{
"addr": "fe80::aabb:ccff:fedd:eeff",
"mac_oui": "a8:bb:cc",
"iid_kind": "eui64",
"vector": "passive_ndp",
"on_iface": "eth0",
"attacker_v4": "10.0.0.9",
"observed_at": "2026-01-01T00:00:00+00:00",
},
),
]
@pytest.mark.parametrize(
"source_kind, lifter_cls, td_cls, rule_factory, payload",
_LIFTER_CASES,
ids=["http_fingerprint", "intel", "email", "canary_fingerprint"],
ids=["http_fingerprint", "intel", "email", "canary_fingerprint", "ipv6_leak"],
)
def test_lifter_emits_evidence_matching_typeddict(
source_kind: str,