feat(ttp): E.3.11 CanaryFingerprintLifter (R0049-R0053)

Browser-payload derivations per Appendix A.9: navigator.webdriver flag,
canvas/audio/WebGL automation hash matches (Puppeteer/Playwright/
Selenium/curl-impersonate), WebRTC IP leak, TZ/language vs source-IP
geo mismatch, navigator.platform vs userAgent vs WebGL renderer
inconsistency.

Evidence shape pinned to CanaryFingerprintEvidence (metric +
matched_signature) — raw fingerprint blobs (canvas hashes, full UAs,
navigator.platform values) explicitly NOT carried into TTPTag.evidence
per TTP_TAGGING.md §'Hard parts §7' (enrichment vs tag boundary). The
identity-merge guard rail is preserved: composite fp.id matches across
IPs are NOT a TTP, so no rule fires on the bare hash.

Tests: tests/ttp/test_canary_fingerprint_lifter.py per-rule positive +
negative + evidence-shape guard + state modulation.
tests/ttp/rule_precision/test_canary_rules.py xfail flipped to real
precision (R0049/R0050/R0051/R0053 H-band ≥95%; R0052 M-band ≥80%).
This commit is contained in:
2026-05-01 20:25:57 -04:00
parent 7865e71aa9
commit f211d394e6
7 changed files with 340 additions and 15 deletions

View File

@@ -115,12 +115,16 @@ def get_tagger() -> Tagger:
name = os.environ.get("DECNET_TTP_TAGGER_TYPE", _DEFAULT).strip().lower()
if name == "composite":
from decnet.ttp.impl.behavioral_lifter import BehavioralLifter
from decnet.ttp.impl.canary_fingerprint_lifter import (
CanaryFingerprintLifter,
)
from decnet.ttp.impl.intel_lifter import IntelLifter
from decnet.ttp.store.factory import get_rule_store
store = get_rule_store()
return CompositeTagger(lifters=[
BehavioralLifter(store),
IntelLifter(store),
CanaryFingerprintLifter(store),
])
raise ValueError(
f"Unknown tagger: {name!r}. Known: {_KNOWN}"

View File

@@ -1,25 +1,159 @@
"""Canary fingerprint lifter — browser-payload derived technique tagger.
"""Canary fingerprint lifter — browser-payload derived technique tagger (E.3.11).
Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body.
Implementation phase reads canary-payload fingerprints (navigator
properties, canvas hashes, proxy/VPN leakage signatures) and emits
Discovery / Defense-Evasion techniques. The evidence shape is pinned
to :class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence`
Reads canary-payload fingerprints (navigator properties, canvas hashes,
proxy/VPN leakage signatures) per Appendix A.9 and emits Discovery /
Defense-Evasion techniques. Evidence shape is pinned to
:class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence`
(``metric`` + ``matched_signature``) — raw fingerprint blobs never
land in evidence.
land in the tag payload. The composite identity hash matching across
IPs is explicitly NOT a TTP (TTP_TAGGING.md §"Identity-merge guard
rail"); the lifter does not emit on it.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Any, Final
from decnet.ttp.base import TaggerEvent, TolerantTagger
from decnet.ttp.impl._emit import emit_tags
from decnet.ttp.impl._rule_index import RuleIndex
from decnet.ttp.impl._state import is_active
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleStore
from decnet.web.db.models.ttp import TTPTag
Predicate = Callable[
[dict[str, Any], dict[str, Any]],
"dict[str, Any] | None",
]
def _p_webdriver(
_spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
if payload.get("navigator_webdriver") is True:
return {
"metric": "navigator_webdriver",
"matched_signature": "navigator.webdriver_true",
}
return None
def _p_automation_hash(
spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
catalogues_raw = spec.get("catalogues", [])
catalogues = (
{c for c in catalogues_raw if isinstance(c, str)}
if isinstance(catalogues_raw, list)
else set()
)
matched = payload.get("canvas_audio_hash_match") or payload.get("matched_tool")
if isinstance(matched, str) and (not catalogues or matched in catalogues):
return {
"metric": "canvas_audio_hash",
"matched_signature": matched,
"matched_tool": matched,
}
return None
def _p_webrtc_leak(
_spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
if payload.get("webrtc_geo_mismatch") is True:
return {
"metric": "webrtc_geo_mismatch",
"matched_signature": "webrtc_private_vs_source_ip",
}
return None
def _p_tz_lang_mismatch(
_spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
tz_zones = payload.get("tz_mismatch_zones")
lang_mismatch = payload.get("lang_country_mismatch") is True
tz_hit = isinstance(tz_zones, int) and tz_zones >= 3
if tz_hit:
return {
"metric": "timezone_geo_mismatch",
"matched_signature": f"tz_zones>={tz_zones}",
}
if lang_mismatch:
return {
"metric": "language_country_mismatch",
"matched_signature": "lang_vs_source_country",
}
return None
def _p_platform_inconsistency(
_spec: dict[str, Any], payload: dict[str, Any],
) -> dict[str, Any] | None:
if payload.get("platform_ua_inconsistent") is True:
return {
"metric": "platform_ua_mismatch",
"matched_signature": "navigator.platform_vs_userAgent",
}
if payload.get("ua_webgl_mismatch") is True:
return {
"metric": "ua_webgl_mismatch",
"matched_signature": "userAgent_vs_webgl_renderer",
}
return None
_PREDICATES: Final[dict[str, Predicate]] = {
"lifter:canary_webdriver": _p_webdriver,
"lifter:canary_automation_hash": _p_automation_hash,
"lifter:canary_webrtc_leak": _p_webrtc_leak,
"lifter:canary_tz_lang_mismatch": _p_tz_lang_mismatch,
"lifter:canary_platform_inconsistency": _p_platform_inconsistency,
}
class CanaryFingerprintLifter(TolerantTagger):
name = "canary_fingerprint"
HANDLES = frozenset({"canary_fingerprint"})
OWNED_PREFIX: Final[str] = "lifter:canary_"
def __init__(self, store: RuleStore) -> None:
self._store = store
self._index = RuleIndex()
@classmethod
def _owns(cls, rule: CompiledRule) -> bool:
kind = rule.match_spec.get("kind", "")
return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX)
async def watch_store(self) -> None:
await self._index.watch(self._store, predicate=self._owns)
async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
return []
out: list[TTPTag] = []
for rule in self._index.values():
if event.source_kind not in rule.applies_to:
continue
if not is_active(rule.state):
continue
kind = rule.match_spec.get("kind", "")
handler = _PREDICATES.get(kind)
if handler is None:
continue
extra = handler(rule.match_spec, event.payload)
if extra is None:
continue
# Evidence shape is pinned by CanaryFingerprintEvidence —
# only metric + matched_signature land in the tag. Raw
# fingerprint blobs explicitly NOT carried.
evidence: dict[str, Any] = {
"metric": extra.get("metric", ""),
"matched_signature": extra.get("matched_signature", ""),
}
out.extend(emit_tags(rule, event, evidence))
return out
__all__ = ["CanaryFingerprintLifter"]