Files
DECNET/tests/ttp/test_canary_fingerprint_lifter.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

148 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Per-rule unit tests for :class:`CanaryFingerprintLifter` (E.3.11).
Pins the predicates for R0049R0053 and the
:class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence` shape
contract — raw fingerprint blobs MUST NOT leak into emitted tags.
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import Any
import pytest
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
from tests.ttp._stub_store import StubRuleStore
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule:
return _parse_and_compile(
_RULES_DIR / f"{rule_id}.yaml", state or RuleState(),
)
def _ev(payload: dict[str, Any]) -> TaggerEvent:
return TaggerEvent(
source_kind="canary_fingerprint",
source_id="src-canary",
attacker_uuid="att1",
identity_uuid=None,
session_id=None,
decky_id=None,
payload=payload,
)
def _make_lifter(rule_ids: list[str]) -> CanaryFingerprintLifter:
rules = [_compile(rid) for rid in rule_ids]
lifter = CanaryFingerprintLifter(StubRuleStore(compiled=rules))
for rule in rules:
lifter._index.install(rule)
return lifter
@pytest.mark.parametrize(
"rule_id,payload,techniques",
[
("R0049", {"navigator_webdriver": True}, {"T1059"}),
(
"R0050",
{"canvas_audio_hash_match": "puppeteer"},
{"T1059", "T1588"},
),
("R0051", {"webrtc_geo_mismatch": True}, {"T1090"}),
("R0052", {"tz_mismatch_zones": 5}, {"T1090"}),
("R0052", {"lang_country_mismatch": True}, {"T1090"}),
("R0053", {"platform_ua_inconsistent": True}, {"T1036"}),
],
)
def test_rule_fires_on_positive(
rule_id: str,
payload: dict[str, Any],
techniques: set[str],
) -> None:
lifter = _make_lifter([rule_id])
out = asyncio.run(lifter.tag(_ev(payload)))
assert out, f"{rule_id} did not fire on positive payload"
fired = {tag.technique_id for tag in out}
assert fired == techniques
def test_evidence_shape_only_metric_and_signature() -> None:
"""PII / blob-leak guard: emitted evidence keys ⊆ {metric, matched_signature}.
Raw canvas hashes, navigator props, full UA strings must NEVER make
it into TTPTag.evidence — they live on Attacker.fingerprints
(enrichment), not on the tag (TTP_TAGGING.md §"Hard parts §7").
"""
lifter = _make_lifter(["R0049"])
out = asyncio.run(lifter.tag(_ev({
"navigator_webdriver": True,
"canvas_hash": "should-not-appear-in-evidence",
"user_agent": "should-not-appear-in-evidence",
})))
assert out
for tag in out:
assert set(tag.evidence) <= {"metric", "matched_signature"}, (
f"unexpected evidence keys: {tag.evidence!r}"
)
def test_webdriver_false_no_fire() -> None:
lifter = _make_lifter(["R0049"])
out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": False})))
assert out == []
def test_automation_hash_unknown_tool_no_fire() -> None:
lifter = _make_lifter(["R0050"])
out = asyncio.run(lifter.tag(_ev({
"canvas_audio_hash_match": "some_random_browser",
})))
assert out == []
def test_tz_below_threshold_no_fire() -> None:
lifter = _make_lifter(["R0052"])
out = asyncio.run(lifter.tag(_ev({"tz_mismatch_zones": 1})))
assert out == []
def test_disabled_state_no_emit() -> None:
rule = _compile("R0049", RuleState(state="disabled"))
lifter = CanaryFingerprintLifter(StubRuleStore())
lifter._index.install(rule)
out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": True})))
assert out == []
def test_empty_payload_no_errors(caplog: pytest.LogCaptureFixture) -> None:
caplog.set_level(logging.DEBUG)
lifter = _make_lifter(["R0049", "R0050", "R0051", "R0052", "R0053"])
out = asyncio.run(lifter.tag(_ev({})))
assert out == []
assert not [r for r in caplog.records if r.levelno >= logging.ERROR]
def test_owns_only_canary_prefix() -> None:
behavioral = _compile("R0031")
canary = _compile("R0049")
lifter = CanaryFingerprintLifter(
StubRuleStore(compiled=[behavioral, canary]),
)
asyncio.run(lifter._index.hydrate_from(
lifter._store, predicate=lifter._owns, # type: ignore[arg-type]
))
assert lifter._index.get("R0049") is not None
assert lifter._index.get("R0031") is None