Files
DECNET/decnet/ttp/impl/canary_fingerprint_lifter.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

161 lines
5.3 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Canary fingerprint lifter — browser-payload derived technique tagger (E.3.11).
Reads canary-payload fingerprints (navigator properties, canvas hashes,
proxy/VPN leakage signatures) per Appendix A.9 and emits Discovery /
Defense-Evasion techniques. Evidence shape is pinned to
:class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence`
(``metric`` + ``matched_signature``) — raw fingerprint blobs never
land in the tag payload. The composite identity hash matching across
IPs is explicitly NOT a TTP (TTP_TAGGING.md §"Identity-merge guard
rail"); the lifter does not emit on it.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Any, Final
from decnet.ttp.base import TaggerEvent, TolerantTagger
from decnet.ttp.impl._emit import emit_tags
from decnet.ttp.impl._rule_index import RuleIndex
from decnet.ttp.impl._state import is_active
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleStore
from decnet.web.db.models.ttp import TTPTag
# Signature shared by every ``_p_*`` predicate in this module: called as
# ``predicate(rule_match_spec, event_payload)`` and returning a dict of
# evidence fields on a hit, or ``None`` when the payload does not match.
# The return type is kept as a string so the ``|`` union is never evaluated
# when this alias is built at import time.
Predicate = Callable[
    [dict[str, Any], dict[str, Any]],
    "dict[str, Any] | None",
]
def _p_webdriver(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Report a hit when ``payload["navigator_webdriver"]`` is exactly ``True``.

    Args:
        _spec: Rule match spec; not consulted by this predicate.
        payload: Event payload to inspect.

    Returns:
        The ``metric`` / ``matched_signature`` evidence fields on a hit,
        otherwise ``None``.
    """
    flag = payload.get("navigator_webdriver")
    # Identity check on purpose: truthy stand-ins such as 1 or "true" do not count.
    if flag is not True:
        return None
    return {
        "metric": "navigator_webdriver",
        "matched_signature": "navigator.webdriver_true",
    }
def _p_automation_hash(
    spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Match a canvas/audio hash hit against the rule's optional tool catalogue.

    The matched name is the first *non-empty string* found under
    ``payload["canvas_audio_hash_match"]`` and then ``payload["matched_tool"]``.
    Non-string values (for example a bare ``True`` flag) are skipped rather
    than masking a usable fallback, and an empty string is never treated as
    a match, so no evidence with an empty signature is produced.

    Args:
        spec: Rule match spec. ``spec["catalogues"]`` may be a list of
            accepted names; non-string entries are ignored. If the key is
            missing, is not a list, or yields no string entries, any name
            is accepted.
        payload: Event payload to inspect.

    Returns:
        A dict with ``metric``, ``matched_signature`` and ``matched_tool``
        on a hit, otherwise ``None``.
    """
    catalogues_raw = spec.get("catalogues", [])
    catalogues = (
        {c for c in catalogues_raw if isinstance(c, str)}
        if isinstance(catalogues_raw, list)
        else set()
    )

    # Preference order matters: the hash-match field wins over matched_tool.
    candidates = (
        payload.get("canvas_audio_hash_match"),
        payload.get("matched_tool"),
    )
    matched = next(
        (value for value in candidates if isinstance(value, str) and value),
        None,
    )
    if matched is None:
        return None

    # An empty catalogue means "no restriction".
    if catalogues and matched not in catalogues:
        return None

    return {
        "metric": "canvas_audio_hash",
        "matched_signature": matched,
        "matched_tool": matched,
    }
def _p_webrtc_leak(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Report a hit when ``payload["webrtc_geo_mismatch"]`` is exactly ``True``.

    Args:
        _spec: Rule match spec; not consulted by this predicate.
        payload: Event payload to inspect.

    Returns:
        The ``metric`` / ``matched_signature`` evidence fields on a hit,
        otherwise ``None``.
    """
    leaked = payload.get("webrtc_geo_mismatch") is True
    return (
        {
            "metric": "webrtc_geo_mismatch",
            "matched_signature": "webrtc_private_vs_source_ip",
        }
        if leaked
        else None
    )
def _p_tz_lang_mismatch(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Detect a timezone or language mismatch recorded in the payload.

    A timezone hit (``payload["tz_mismatch_zones"]`` is an ``int`` of at
    least 3) takes precedence over a language hit
    (``payload["lang_country_mismatch"]`` is exactly ``True``).

    Args:
        _spec: Rule match spec; not consulted by this predicate.
        payload: Event payload to inspect.

    Returns:
        The ``metric`` / ``matched_signature`` evidence fields for the first
        check that hits, otherwise ``None``.
    """
    tz_zones = payload.get("tz_mismatch_zones")
    if isinstance(tz_zones, int) and tz_zones >= 3:
        return {
            "metric": "timezone_geo_mismatch",
            "matched_signature": f"tz_zones>={tz_zones}",
        }

    if payload.get("lang_country_mismatch") is True:
        return {
            "metric": "language_country_mismatch",
            "matched_signature": "lang_vs_source_country",
        }

    return None
def _p_platform_inconsistency(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
    """Detect a platform/user-agent or user-agent/WebGL inconsistency flag.

    The flags are checked in a fixed order and the first one that is exactly
    ``True`` decides the evidence: ``platform_ua_inconsistent`` first, then
    ``ua_webgl_mismatch``.

    Args:
        _spec: Rule match spec; not consulted by this predicate.
        payload: Event payload to inspect.

    Returns:
        The ``metric`` / ``matched_signature`` evidence fields for the first
        flag that is set, otherwise ``None``.
    """
    # (payload flag, emitted metric, emitted signature), in priority order.
    checks = (
        (
            "platform_ua_inconsistent",
            "platform_ua_mismatch",
            "navigator.platform_vs_userAgent",
        ),
        (
            "ua_webgl_mismatch",
            "ua_webgl_mismatch",
            "userAgent_vs_webgl_renderer",
        ),
    )
    for flag, metric, signature in checks:
        if payload.get(flag) is True:
            return {"metric": metric, "matched_signature": signature}
    return None
# Dispatch table from a rule's ``match_spec["kind"]`` string to the predicate
# that evaluates it. ``CanaryFingerprintLifter._tag_impl`` looks kinds up
# here and skips any rule whose kind has no entry.
_PREDICATES: Final[dict[str, Predicate]] = {
    "lifter:canary_webdriver": _p_webdriver,
    "lifter:canary_automation_hash": _p_automation_hash,
    "lifter:canary_webrtc_leak": _p_webrtc_leak,
    "lifter:canary_tz_lang_mismatch": _p_tz_lang_mismatch,
    "lifter:canary_platform_inconsistency": _p_platform_inconsistency,
}
class CanaryFingerprintLifter(TolerantTagger):
    """Tagger that evaluates ``lifter:canary_*`` rules against event payloads.

    Rules come from a ``RuleIndex`` fed by the injected ``RuleStore``. For
    each indexed rule that applies to the event and is active, the predicate
    registered for the rule's ``kind`` is run; on a hit, tags are emitted
    whose evidence carries only ``metric`` and ``matched_signature``.
    """

    name = "canary_fingerprint"
    # NOTE(review): presumably the source kinds routed to this tagger by the
    # framework — confirm against TolerantTagger.
    HANDLES = frozenset({"canary_fingerprint"})
    # Rule kinds starting with this prefix are the ones this lifter indexes.
    OWNED_PREFIX: Final[str] = "lifter:canary_"

    def __init__(self, store: RuleStore) -> None:
        """Remember *store* and create an empty rule index."""
        self._store = store
        self._index = RuleIndex()

    @classmethod
    def _owns(cls, rule: CompiledRule) -> bool:
        """Return ``True`` when the rule's ``kind`` is a string with the owned prefix."""
        declared = rule.match_spec.get("kind", "")
        if not isinstance(declared, str):
            return False
        return declared.startswith(cls.OWNED_PREFIX)

    async def watch_store(self) -> None:
        """Delegate to the index's ``watch`` on the store, filtered by ``_owns``."""
        await self._index.watch(self._store, predicate=self._owns)

    async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
        """Run every applicable, active indexed rule against *event*.

        Returns:
            All tags produced by ``emit_tags`` for the rules whose predicate
            hit, in index iteration order; empty when nothing matched.
        """
        tags: list[TTPTag] = []
        for rule in self._index.values():
            # Skip rules that target another source kind or are not active.
            if event.source_kind not in rule.applies_to or not is_active(rule.state):
                continue

            predicate = _PREDICATES.get(rule.match_spec.get("kind", ""))
            if predicate is None:
                continue

            hit = predicate(rule.match_spec, event.payload)
            if hit is None:
                continue

            # Evidence shape is pinned by CanaryFingerprintEvidence: only
            # metric + matched_signature are forwarded. Anything else the
            # predicate returned, and the raw fingerprint data, is dropped.
            tags.extend(
                emit_tags(
                    rule,
                    event,
                    {
                        "metric": hit.get("metric", ""),
                        "matched_signature": hit.get("matched_signature", ""),
                    },
                )
            )
        return tags
# Public API of this module: only the lifter class is exported.
__all__ = ["CanaryFingerprintLifter"]