Files
DECNET/tests/ttp/test_canary_fingerprint_lifter.py
anti f211d394e6 feat(ttp): E.3.11 CanaryFingerprintLifter (R0049-R0053)
Browser-payload derivations per Appendix A.9: navigator.webdriver flag,
canvas/audio/WebGL automation hash matches (Puppeteer/Playwright/
Selenium/curl-impersonate), WebRTC IP leak, TZ/language vs source-IP
geo mismatch, navigator.platform vs userAgent vs WebGL renderer
inconsistency.

Evidence shape pinned to CanaryFingerprintEvidence (metric +
matched_signature) — raw fingerprint blobs (canvas hashes, full UAs,
navigator.platform values) explicitly NOT carried into TTPTag.evidence
per TTP_TAGGING.md §'Hard parts §7' (enrichment vs tag boundary). The
identity-merge guard rail is preserved: composite fp.id matches across
IPs are NOT a TTP, so no rule fires on the bare hash.

Tests: tests/ttp/test_canary_fingerprint_lifter.py per-rule positive +
negative + evidence-shape guard + state modulation.
tests/ttp/rule_precision/test_canary_rules.py xfail flipped to real
precision (R0049/R0050/R0051/R0053 H-band ≥95%; R0052 M-band ≥80%).
2026-05-01 20:25:57 -04:00

147 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Per-rule unit tests for :class:`CanaryFingerprintLifter` (E.3.11).
Pins the predicates for R0049R0053 and the
:class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence` shape
contract — raw fingerprint blobs MUST NOT leak into emitted tags.
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import Any
import pytest
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
from tests.ttp._stub_store import StubRuleStore
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule:
return _parse_and_compile(
_RULES_DIR / f"{rule_id}.yaml", state or RuleState(),
)
def _ev(payload: dict[str, Any]) -> TaggerEvent:
return TaggerEvent(
source_kind="canary_fingerprint",
source_id="src-canary",
attacker_uuid="att1",
identity_uuid=None,
session_id=None,
decky_id=None,
payload=payload,
)
def _make_lifter(rule_ids: list[str]) -> CanaryFingerprintLifter:
rules = [_compile(rid) for rid in rule_ids]
lifter = CanaryFingerprintLifter(StubRuleStore(compiled=rules))
for rule in rules:
lifter._index.install(rule)
return lifter
@pytest.mark.parametrize(
"rule_id,payload,techniques",
[
("R0049", {"navigator_webdriver": True}, {"T1059"}),
(
"R0050",
{"canvas_audio_hash_match": "puppeteer"},
{"T1059", "T1588"},
),
("R0051", {"webrtc_geo_mismatch": True}, {"T1090"}),
("R0052", {"tz_mismatch_zones": 5}, {"T1090"}),
("R0052", {"lang_country_mismatch": True}, {"T1090"}),
("R0053", {"platform_ua_inconsistent": True}, {"T1036"}),
],
)
def test_rule_fires_on_positive(
rule_id: str,
payload: dict[str, Any],
techniques: set[str],
) -> None:
lifter = _make_lifter([rule_id])
out = asyncio.run(lifter.tag(_ev(payload)))
assert out, f"{rule_id} did not fire on positive payload"
fired = {tag.technique_id for tag in out}
assert fired == techniques
def test_evidence_shape_only_metric_and_signature() -> None:
"""PII / blob-leak guard: emitted evidence keys ⊆ {metric, matched_signature}.
Raw canvas hashes, navigator props, full UA strings must NEVER make
it into TTPTag.evidence — they live on Attacker.fingerprints
(enrichment), not on the tag (TTP_TAGGING.md §"Hard parts §7").
"""
lifter = _make_lifter(["R0049"])
out = asyncio.run(lifter.tag(_ev({
"navigator_webdriver": True,
"canvas_hash": "should-not-appear-in-evidence",
"user_agent": "should-not-appear-in-evidence",
})))
assert out
for tag in out:
assert set(tag.evidence) <= {"metric", "matched_signature"}, (
f"unexpected evidence keys: {tag.evidence!r}"
)
def test_webdriver_false_no_fire() -> None:
lifter = _make_lifter(["R0049"])
out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": False})))
assert out == []
def test_automation_hash_unknown_tool_no_fire() -> None:
lifter = _make_lifter(["R0050"])
out = asyncio.run(lifter.tag(_ev({
"canvas_audio_hash_match": "some_random_browser",
})))
assert out == []
def test_tz_below_threshold_no_fire() -> None:
lifter = _make_lifter(["R0052"])
out = asyncio.run(lifter.tag(_ev({"tz_mismatch_zones": 1})))
assert out == []
def test_disabled_state_no_emit() -> None:
rule = _compile("R0049", RuleState(state="disabled"))
lifter = CanaryFingerprintLifter(StubRuleStore())
lifter._index.install(rule)
out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": True})))
assert out == []
def test_empty_payload_no_errors(caplog: pytest.LogCaptureFixture) -> None:
caplog.set_level(logging.DEBUG)
lifter = _make_lifter(["R0049", "R0050", "R0051", "R0052", "R0053"])
out = asyncio.run(lifter.tag(_ev({})))
assert out == []
assert not [r for r in caplog.records if r.levelno >= logging.ERROR]
def test_owns_only_canary_prefix() -> None:
behavioral = _compile("R0031")
canary = _compile("R0049")
lifter = CanaryFingerprintLifter(
StubRuleStore(compiled=[behavioral, canary]),
)
asyncio.run(lifter._index.hydrate_from(
lifter._store, predicate=lifter._owns, # type: ignore[arg-type]
))
assert lifter._index.get("R0049") is not None
assert lifter._index.get("R0031") is None