diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index f95508ca..9cdf848d 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -115,12 +115,16 @@ def get_tagger() -> Tagger: name = os.environ.get("DECNET_TTP_TAGGER_TYPE", _DEFAULT).strip().lower() if name == "composite": from decnet.ttp.impl.behavioral_lifter import BehavioralLifter + from decnet.ttp.impl.canary_fingerprint_lifter import ( + CanaryFingerprintLifter, + ) from decnet.ttp.impl.intel_lifter import IntelLifter from decnet.ttp.store.factory import get_rule_store store = get_rule_store() return CompositeTagger(lifters=[ BehavioralLifter(store), IntelLifter(store), + CanaryFingerprintLifter(store), ]) raise ValueError( f"Unknown tagger: {name!r}. Known: {_KNOWN}" diff --git a/decnet/ttp/impl/canary_fingerprint_lifter.py b/decnet/ttp/impl/canary_fingerprint_lifter.py index bf38fd8c..8cd03e86 100644 --- a/decnet/ttp/impl/canary_fingerprint_lifter.py +++ b/decnet/ttp/impl/canary_fingerprint_lifter.py @@ -1,25 +1,159 @@ -"""Canary fingerprint lifter — browser-payload derived technique tagger. +"""Canary fingerprint lifter — browser-payload derived technique tagger (E.3.11). -Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. -Implementation phase reads canary-payload fingerprints (navigator -properties, canvas hashes, proxy/VPN leakage signatures) and emits -Discovery / Defense-Evasion techniques. The evidence shape is pinned -to :class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence` +Reads canary-payload fingerprints (navigator properties, canvas hashes, +proxy/VPN leakage signatures) per Appendix A.9 and emits Discovery / +Defense-Evasion techniques. Evidence shape is pinned to +:class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence` (``metric`` + ``matched_signature``) — raw fingerprint blobs never -land in evidence. +land in the tag payload. The composite identity hash matching across +IPs is explicitly NOT a TTP (TTP_TAGGING.md §"Identity-merge guard +rail"); the lifter does not emit on it. """ from __future__ import annotations +from collections.abc import Callable +from typing import Any, Final + from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import is_active +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleStore from decnet.web.db.models.ttp import TTPTag +Predicate = Callable[ + [dict[str, Any], dict[str, Any]], + "dict[str, Any] | None", +] + + +def _p_webdriver( + _spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + if payload.get("navigator_webdriver") is True: + return { + "metric": "navigator_webdriver", + "matched_signature": "navigator.webdriver_true", + } + return None + + +def _p_automation_hash( + spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + catalogues_raw = spec.get("catalogues", []) + catalogues = ( + {c for c in catalogues_raw if isinstance(c, str)} + if isinstance(catalogues_raw, list) + else set() + ) + matched = payload.get("canvas_audio_hash_match") or payload.get("matched_tool") + if isinstance(matched, str) and (not catalogues or matched in catalogues): + return { + "metric": "canvas_audio_hash", + "matched_signature": matched, + "matched_tool": matched, + } + return None + + +def _p_webrtc_leak( + _spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + if payload.get("webrtc_geo_mismatch") is True: + return { + "metric": "webrtc_geo_mismatch", + "matched_signature": "webrtc_private_vs_source_ip", + } + return None + + +def _p_tz_lang_mismatch( + _spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + tz_zones = payload.get("tz_mismatch_zones") + lang_mismatch = payload.get("lang_country_mismatch") is True + tz_hit = isinstance(tz_zones, int) and tz_zones >= 3 + if tz_hit: + return { + "metric": "timezone_geo_mismatch", + "matched_signature": f"tz_zones>={tz_zones}", + } + if lang_mismatch: + return { + "metric": "language_country_mismatch", + "matched_signature": "lang_vs_source_country", + } + return None + + +def _p_platform_inconsistency( + _spec: dict[str, Any], payload: dict[str, Any], +) -> dict[str, Any] | None: + if payload.get("platform_ua_inconsistent") is True: + return { + "metric": "platform_ua_mismatch", + "matched_signature": "navigator.platform_vs_userAgent", + } + if payload.get("ua_webgl_mismatch") is True: + return { + "metric": "ua_webgl_mismatch", + "matched_signature": "userAgent_vs_webgl_renderer", + } + return None + + +_PREDICATES: Final[dict[str, Predicate]] = { + "lifter:canary_webdriver": _p_webdriver, + "lifter:canary_automation_hash": _p_automation_hash, + "lifter:canary_webrtc_leak": _p_webrtc_leak, + "lifter:canary_tz_lang_mismatch": _p_tz_lang_mismatch, + "lifter:canary_platform_inconsistency": _p_platform_inconsistency, +} + + class CanaryFingerprintLifter(TolerantTagger): name = "canary_fingerprint" HANDLES = frozenset({"canary_fingerprint"}) + OWNED_PREFIX: Final[str] = "lifter:canary_" + + def __init__(self, store: RuleStore) -> None: + self._store = store + self._index = RuleIndex() + + @classmethod + def _owns(cls, rule: CompiledRule) -> bool: + kind = rule.match_spec.get("kind", "") + return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX) + + async def watch_store(self) -> None: + await self._index.watch(self._store, predicate=self._owns) async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: - return [] + out: list[TTPTag] = [] + for rule in self._index.values(): + if event.source_kind not in rule.applies_to: + continue + if not is_active(rule.state): + continue + kind = rule.match_spec.get("kind", "") + handler = _PREDICATES.get(kind) + if handler is None: + continue + extra = handler(rule.match_spec, event.payload) + if extra is None: + continue + # Evidence shape is pinned by CanaryFingerprintEvidence — + # only metric + matched_signature land in the tag. Raw + # fingerprint blobs explicitly NOT carried. + evidence: dict[str, Any] = { + "metric": extra.get("metric", ""), + "matched_signature": extra.get("matched_signature", ""), + } + out.extend(emit_tags(rule, event, evidence)) + return out __all__ = ["CanaryFingerprintLifter"] diff --git a/tests/ttp/rule_precision/corpus/seed_canary.jsonl b/tests/ttp/rule_precision/corpus/seed_canary.jsonl index 42ed8da3..82f4d368 100644 --- a/tests/ttp/rule_precision/corpus/seed_canary.jsonl +++ b/tests/ttp/rule_precision/corpus/seed_canary.jsonl @@ -1,2 +1,7 @@ {"source_kind": "canary_fingerprint", "payload": {"ua_signature": "HeadlessChrome/119", "navigator_webdriver": true}, "expected_rule_ids": ["R0049"], "label": "webdriver_flag"} +{"source_kind": "canary_fingerprint", "payload": {"canvas_audio_hash_match": "puppeteer"}, "expected_rule_ids": ["R0050"], "label": "puppeteer_canvas"} +{"source_kind": "canary_fingerprint", "payload": {"canvas_audio_hash_match": "selenium"}, "expected_rule_ids": ["R0050"], "label": "selenium_canvas"} +{"source_kind": "canary_fingerprint", "payload": {"webrtc_geo_mismatch": true}, "expected_rule_ids": ["R0051"], "label": "webrtc_leak"} +{"source_kind": "canary_fingerprint", "payload": {"tz_mismatch_zones": 7}, "expected_rule_ids": ["R0052"], "label": "tz_mismatch"} +{"source_kind": "canary_fingerprint", "payload": {"platform_ua_inconsistent": true}, "expected_rule_ids": ["R0053"], "label": "platform_inconsistent"} {"source_kind": "canary_fingerprint", "payload": {"ua_signature": "Mozilla/5.0", "navigator_webdriver": false}, "expected_rule_ids": [], "label": "negative_browser"} diff --git a/tests/ttp/rule_precision/test_canary_rules.py b/tests/ttp/rule_precision/test_canary_rules.py index e533b14a..764aed5e 100644 --- a/tests/ttp/rule_precision/test_canary_rules.py +++ b/tests/ttp/rule_precision/test_canary_rules.py @@ -44,9 +44,45 @@ async def test_lifter_bound_inert_in_v0( assert rule_id not in fired +def _build_lifter() -> "CanaryFingerprintLifter": + from decnet.ttp.impl.canary_fingerprint_lifter import ( + CanaryFingerprintLifter, + ) + from tests.ttp._stub_store import StubRuleStore + + rules = [ + _parse_and_compile(Path("rules/ttp") / f"{rid}.yaml", RuleState()) + for rid in _RULE_IDS + ] + lifter = CanaryFingerprintLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + @pytest.mark.parametrize("rule_id", _RULE_IDS) -@pytest.mark.xfail( - strict=True, reason="impl phase E.3.11 (CanaryFingerprintLifter)", -) -def test_canary_rule_precision(rule_id: str) -> None: - pytest.fail(f"{rule_id}: CanaryFingerprintLifter not yet shipped (E.3.11)") +def test_canary_rule_precision( + rule_id: str, + corpus_loader: CohortLoader, +) -> None: + """E.3.11 — drive CanaryFingerprintLifter over the labelled corpus + and assert per-rule precision (H-band rules → ≥95%, M-band → ≥80%). + R0052 is M-band (0.7 confidence); the rest are H-band. + """ + import asyncio + + from tests.ttp.rule_precision.conftest import precision_for + + rows = corpus_loader("canary") + if not rows: + pytest.skip("no canary corpus available") + lifter = _build_lifter() + fired: dict[str, list[str]] = {} + for row in rows: + tags = asyncio.run(lifter.tag(make_event(row))) + fired[row.label] = [tag.rule_id for tag in tags] + precision, _tp, _fp = precision_for(rule_id, rows, fired) + threshold = 0.80 if rule_id == "R0052" else 0.95 + assert precision >= threshold, ( + f"{rule_id} precision {precision:.2f} < {threshold} on canary corpus" + ) diff --git a/tests/ttp/test_canary_fingerprint_lifter.py b/tests/ttp/test_canary_fingerprint_lifter.py new file mode 100644 index 00000000..1be9544c --- /dev/null +++ b/tests/ttp/test_canary_fingerprint_lifter.py @@ -0,0 +1,146 @@ +"""Per-rule unit tests for :class:`CanaryFingerprintLifter` (E.3.11). + +Pins the predicates for R0049–R0053 and the +:class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence` shape +contract — raw fingerprint blobs MUST NOT leak into emitted tags. +""" +from __future__ import annotations + +import asyncio +import logging +from pathlib import Path +from typing import Any + +import pytest + +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState +from decnet.ttp.store.impl.filesystem import _parse_and_compile +from tests.ttp._stub_store import StubRuleStore + + +_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp" + + +def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule: + return _parse_and_compile( + _RULES_DIR / f"{rule_id}.yaml", state or RuleState(), + ) + + +def _ev(payload: dict[str, Any]) -> TaggerEvent: + return TaggerEvent( + source_kind="canary_fingerprint", + source_id="src-canary", + attacker_uuid="att1", + identity_uuid=None, + session_id=None, + decky_id=None, + payload=payload, + ) + + +def _make_lifter(rule_ids: list[str]) -> CanaryFingerprintLifter: + rules = [_compile(rid) for rid in rule_ids] + lifter = CanaryFingerprintLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + +@pytest.mark.parametrize( + "rule_id,payload,techniques", + [ + ("R0049", {"navigator_webdriver": True}, {"T1059"}), + ( + "R0050", + {"canvas_audio_hash_match": "puppeteer"}, + {"T1059", "T1588"}, + ), + ("R0051", {"webrtc_geo_mismatch": True}, {"T1090"}), + ("R0052", {"tz_mismatch_zones": 5}, {"T1090"}), + ("R0052", {"lang_country_mismatch": True}, {"T1090"}), + ("R0053", {"platform_ua_inconsistent": True}, {"T1036"}), + ], +) +def test_rule_fires_on_positive( + rule_id: str, + payload: dict[str, Any], + techniques: set[str], +) -> None: + lifter = _make_lifter([rule_id]) + out = asyncio.run(lifter.tag(_ev(payload))) + assert out, f"{rule_id} did not fire on positive payload" + fired = {tag.technique_id for tag in out} + assert fired == techniques + + +def test_evidence_shape_only_metric_and_signature() -> None: + """PII / blob-leak guard: emitted evidence keys ⊆ {metric, matched_signature}. + + Raw canvas hashes, navigator props, full UA strings must NEVER make + it into TTPTag.evidence — they live on Attacker.fingerprints + (enrichment), not on the tag (TTP_TAGGING.md §"Hard parts §7"). + """ + lifter = _make_lifter(["R0049"]) + out = asyncio.run(lifter.tag(_ev({ + "navigator_webdriver": True, + "canvas_hash": "should-not-appear-in-evidence", + "user_agent": "should-not-appear-in-evidence", + }))) + assert out + for tag in out: + assert set(tag.evidence) <= {"metric", "matched_signature"}, ( + f"unexpected evidence keys: {tag.evidence!r}" + ) + + +def test_webdriver_false_no_fire() -> None: + lifter = _make_lifter(["R0049"]) + out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": False}))) + assert out == [] + + +def test_automation_hash_unknown_tool_no_fire() -> None: + lifter = _make_lifter(["R0050"]) + out = asyncio.run(lifter.tag(_ev({ + "canvas_audio_hash_match": "some_random_browser", + }))) + assert out == [] + + +def test_tz_below_threshold_no_fire() -> None: + lifter = _make_lifter(["R0052"]) + out = asyncio.run(lifter.tag(_ev({"tz_mismatch_zones": 1}))) + assert out == [] + + +def test_disabled_state_no_emit() -> None: + rule = _compile("R0049", RuleState(state="disabled")) + lifter = CanaryFingerprintLifter(StubRuleStore()) + lifter._index.install(rule) + out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": True}))) + assert out == [] + + +def test_empty_payload_no_errors(caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.DEBUG) + lifter = _make_lifter(["R0049", "R0050", "R0051", "R0052", "R0053"]) + out = asyncio.run(lifter.tag(_ev({}))) + assert out == [] + assert not [r for r in caplog.records if r.levelno >= logging.ERROR] + + +def test_owns_only_canary_prefix() -> None: + behavioral = _compile("R0031") + canary = _compile("R0049") + lifter = CanaryFingerprintLifter( + StubRuleStore(compiled=[behavioral, canary]), + ) + asyncio.run(lifter._index.hydrate_from( + lifter._store, predicate=lifter._owns, # type: ignore[arg-type] + )) + assert lifter._index.get("R0049") is not None + assert lifter._index.get("R0031") is None diff --git a/tests/ttp/test_lifter_absence.py b/tests/ttp/test_lifter_absence.py index 2c9cb4a3..eb87fba6 100644 --- a/tests/ttp/test_lifter_absence.py +++ b/tests/ttp/test_lifter_absence.py @@ -42,7 +42,7 @@ def _make_lifter(cls: type[TolerantTagger]) -> TolerantTagger: Implemented lifters (E.3.9–E.3.12) take a :class:`RuleStore`; the still-empty IdentityLifter / CredentialLifter (E.3.13) take no args. """ - if cls in {BehavioralLifter, IntelLifter}: + if cls in {BehavioralLifter, IntelLifter, CanaryFingerprintLifter}: return cls(StubRuleStore()) # type: ignore[call-arg] return cls() diff --git a/tests/ttp/test_lifters.py b/tests/ttp/test_lifters.py index 17a4020d..fbf5c858 100644 --- a/tests/ttp/test_lifters.py +++ b/tests/ttp/test_lifters.py @@ -24,7 +24,7 @@ from tests.ttp._stub_store import StubRuleStore def _instantiate(cls: type[TolerantTagger]) -> TolerantTagger: - if cls in {BehavioralLifter, IntelLifter}: + if cls in {BehavioralLifter, IntelLifter, CanaryFingerprintLifter}: return cls(StubRuleStore()) # type: ignore[call-arg] return cls()