Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
148 lines
4.7 KiB
Python
148 lines
4.7 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||
"""Per-rule unit tests for :class:`CanaryFingerprintLifter` (E.3.11).
|
||
|
||
Pins the predicates for R0049–R0053 and the
|
||
:class:`~decnet.web.db.models.ttp.CanaryFingerprintEvidence` shape
|
||
contract — raw fingerprint blobs MUST NOT leak into emitted tags.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import pytest
|
||
|
||
from decnet.ttp.base import TaggerEvent
|
||
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
|
||
from decnet.ttp.impl.rule_engine import CompiledRule
|
||
from decnet.ttp.store.base import RuleState
|
||
from decnet.ttp.store.impl.filesystem import _parse_and_compile
|
||
from tests.ttp._stub_store import StubRuleStore
|
||
|
||
|
||
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
|
||
|
||
|
||
def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule:
|
||
return _parse_and_compile(
|
||
_RULES_DIR / f"{rule_id}.yaml", state or RuleState(),
|
||
)
|
||
|
||
|
||
def _ev(payload: dict[str, Any]) -> TaggerEvent:
|
||
return TaggerEvent(
|
||
source_kind="canary_fingerprint",
|
||
source_id="src-canary",
|
||
attacker_uuid="att1",
|
||
identity_uuid=None,
|
||
session_id=None,
|
||
decky_id=None,
|
||
payload=payload,
|
||
)
|
||
|
||
|
||
def _make_lifter(rule_ids: list[str]) -> CanaryFingerprintLifter:
|
||
rules = [_compile(rid) for rid in rule_ids]
|
||
lifter = CanaryFingerprintLifter(StubRuleStore(compiled=rules))
|
||
for rule in rules:
|
||
lifter._index.install(rule)
|
||
return lifter
|
||
|
||
|
||
@pytest.mark.parametrize(
|
||
"rule_id,payload,techniques",
|
||
[
|
||
("R0049", {"navigator_webdriver": True}, {"T1059"}),
|
||
(
|
||
"R0050",
|
||
{"canvas_audio_hash_match": "puppeteer"},
|
||
{"T1059", "T1588"},
|
||
),
|
||
("R0051", {"webrtc_geo_mismatch": True}, {"T1090"}),
|
||
("R0052", {"tz_mismatch_zones": 5}, {"T1090"}),
|
||
("R0052", {"lang_country_mismatch": True}, {"T1090"}),
|
||
("R0053", {"platform_ua_inconsistent": True}, {"T1036"}),
|
||
],
|
||
)
|
||
def test_rule_fires_on_positive(
|
||
rule_id: str,
|
||
payload: dict[str, Any],
|
||
techniques: set[str],
|
||
) -> None:
|
||
lifter = _make_lifter([rule_id])
|
||
out = asyncio.run(lifter.tag(_ev(payload)))
|
||
assert out, f"{rule_id} did not fire on positive payload"
|
||
fired = {tag.technique_id for tag in out}
|
||
assert fired == techniques
|
||
|
||
|
||
def test_evidence_shape_only_metric_and_signature() -> None:
|
||
"""PII / blob-leak guard: emitted evidence keys ⊆ {metric, matched_signature}.
|
||
|
||
Raw canvas hashes, navigator props, full UA strings must NEVER make
|
||
it into TTPTag.evidence — they live on Attacker.fingerprints
|
||
(enrichment), not on the tag (TTP_TAGGING.md §"Hard parts §7").
|
||
"""
|
||
lifter = _make_lifter(["R0049"])
|
||
out = asyncio.run(lifter.tag(_ev({
|
||
"navigator_webdriver": True,
|
||
"canvas_hash": "should-not-appear-in-evidence",
|
||
"user_agent": "should-not-appear-in-evidence",
|
||
})))
|
||
assert out
|
||
for tag in out:
|
||
assert set(tag.evidence) <= {"metric", "matched_signature"}, (
|
||
f"unexpected evidence keys: {tag.evidence!r}"
|
||
)
|
||
|
||
|
||
def test_webdriver_false_no_fire() -> None:
|
||
lifter = _make_lifter(["R0049"])
|
||
out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": False})))
|
||
assert out == []
|
||
|
||
|
||
def test_automation_hash_unknown_tool_no_fire() -> None:
|
||
lifter = _make_lifter(["R0050"])
|
||
out = asyncio.run(lifter.tag(_ev({
|
||
"canvas_audio_hash_match": "some_random_browser",
|
||
})))
|
||
assert out == []
|
||
|
||
|
||
def test_tz_below_threshold_no_fire() -> None:
|
||
lifter = _make_lifter(["R0052"])
|
||
out = asyncio.run(lifter.tag(_ev({"tz_mismatch_zones": 1})))
|
||
assert out == []
|
||
|
||
|
||
def test_disabled_state_no_emit() -> None:
|
||
rule = _compile("R0049", RuleState(state="disabled"))
|
||
lifter = CanaryFingerprintLifter(StubRuleStore())
|
||
lifter._index.install(rule)
|
||
out = asyncio.run(lifter.tag(_ev({"navigator_webdriver": True})))
|
||
assert out == []
|
||
|
||
|
||
def test_empty_payload_no_errors(caplog: pytest.LogCaptureFixture) -> None:
|
||
caplog.set_level(logging.DEBUG)
|
||
lifter = _make_lifter(["R0049", "R0050", "R0051", "R0052", "R0053"])
|
||
out = asyncio.run(lifter.tag(_ev({})))
|
||
assert out == []
|
||
assert not [r for r in caplog.records if r.levelno >= logging.ERROR]
|
||
|
||
|
||
def test_owns_only_canary_prefix() -> None:
|
||
behavioral = _compile("R0031")
|
||
canary = _compile("R0049")
|
||
lifter = CanaryFingerprintLifter(
|
||
StubRuleStore(compiled=[behavioral, canary]),
|
||
)
|
||
asyncio.run(lifter._index.hydrate_from(
|
||
lifter._store, predicate=lifter._owns, # type: ignore[arg-type]
|
||
))
|
||
assert lifter._index.get("R0049") is not None
|
||
assert lifter._index.get("R0031") is None
|