Files
DECNET/decnet/ttp/impl/ipv6_leak_lifter.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

76 lines
2.7 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""IPv6 link-local leak lifter — opsec-failure tagger (R0059).
Reads ``ipv6_leak`` source-kind events emitted by the passive sniffer
(SnifferEngine._on_ipv6_packet) and the active prober (_ipv6_leak_phase)
and emits a Command-and-Control / Proxy technique tag (T1090) when a
fe80:: address is observed for an attacker known to be behind an IPv4 VPN.
Evidence is pinned to :class:`~decnet.web.db.models.ttp.Ipv6LinkLocalLeakEvidence`.
The ``iid_kind`` field carries classification confidence context so analysts
can filter EUI-64 (strongest, MAC-derived) from stable-privacy or temporary IIDs.
"""
from __future__ import annotations
from typing import Any, Final
from decnet.ttp.base import TaggerEvent, TolerantTagger
from decnet.ttp.impl._emit import emit_tags
from decnet.ttp.impl._rule_index import RuleIndex
from decnet.ttp.impl._state import is_active
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleStore
from decnet.web.db.models.ttp import TTPTag
_OWNED_PREFIX: Final[str] = "lifter:ipv6_link_local_leak"
def _p_ipv6_leak(
_spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
addr: str = payload.get("addr", "") or payload.get("src_ip", "")
if not addr.lower().startswith("fe80:"):
return None
return {
"addr": addr,
"mac_oui": payload.get("mac_oui", ""),
"iid_kind": payload.get("iid_kind", "unknown"),
"vector": payload.get("vector", ""),
"on_iface": payload.get("on_iface", ""),
"attacker_v4": payload.get("attacker_v4", "") or payload.get("attacker_ip", ""),
"observed_at": payload.get("observed_at", ""),
}
class Ipv6LeakLifter(TolerantTagger):
name = "ipv6_leak"
HANDLES = frozenset({"ipv6_leak"})
def __init__(self, store: RuleStore) -> None:
self._store = store
self._index = RuleIndex()
@classmethod
def _owns(cls, rule: CompiledRule) -> bool:
kind = rule.match_spec.get("kind", "")
return isinstance(kind, str) and kind == _OWNED_PREFIX
async def watch_store(self) -> None:
await self._index.watch(self._store, predicate=self._owns)
async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
out: list[TTPTag] = []
for rule in self._index.values():
if event.source_kind not in rule.applies_to:
continue
if not is_active(rule.state):
continue
evidence = _p_ipv6_leak(rule.match_spec, event.payload)
if evidence is None:
continue
out.extend(emit_tags(rule, event, evidence))
return out
__all__ = ["Ipv6LeakLifter"]