Pins the evidence shape for IPv6 link-local leakage findings. All fields optional (total=False) so partial observation (passive sniffer vs active solicitation) fills whatever the vector provides. Lifter lands in a subsequent commit.
247 lines
7.9 KiB
Python
247 lines
7.9 KiB
Python
"""Evidence shape contract tests (E.2.1b).
|
|
|
|
Pins the per-``source_kind`` ``TypedDict`` contract on
|
|
:class:`~decnet.web.db.models.ttp.TTPTag.evidence`.
|
|
|
|
The PII property — ``EmailEvidence`` carries no field for raw rcpt
|
|
addresses or body bytes — is GREEN today: it lives in the type, not
|
|
in code paths.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import typing
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from decnet.ttp.base import TaggerEvent, TolerantTagger
|
|
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
|
|
from decnet.ttp.impl.email_lifter import EmailLifter
|
|
from decnet.ttp.impl.http_fingerprint_lifter import HttpFingerprintLifter
|
|
from decnet.ttp.impl.intel_lifter import IntelLifter
|
|
from decnet.ttp.impl.rule_engine import CompiledRule
|
|
from decnet.ttp.store.base import RuleState
|
|
from decnet.ttp.store.impl.filesystem import _parse_and_compile
|
|
from decnet.web.db.models.ttp import (
|
|
CanaryFingerprintEvidence,
|
|
CommandEvidence,
|
|
EmailEvidence,
|
|
HttpFingerprintEvidence,
|
|
IntelEvidence,
|
|
Ipv6LinkLocalLeakEvidence,
|
|
TTPTag,
|
|
compute_tag_uuid,
|
|
)
|
|
from tests.ttp._stub_store import StubRuleStore
|
|
|
|
|
|
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
|
|
|
|
|
|
# ── PII rule §6: type-level, GREEN today ────────────────────────────
|
|
|
|
|
|
def test_email_evidence_excludes_raw_rcpt_and_body() -> None:
|
|
"""``EmailEvidence`` MUST NOT carry raw recipient addresses or
|
|
body bytes. The PII discipline lives in the *type* — a lifter that
|
|
tries to leak them fails type-check before it can run.
|
|
"""
|
|
keys = (
|
|
EmailEvidence.__required_keys__ | EmailEvidence.__optional_keys__
|
|
)
|
|
assert "rcpt_to_list" not in keys
|
|
assert "body" not in keys
|
|
|
|
|
|
def test_command_evidence_keys() -> None:
|
|
keys = (
|
|
CommandEvidence.__required_keys__ | CommandEvidence.__optional_keys__
|
|
)
|
|
assert keys == {"matched_tokens", "rule_pattern"}
|
|
|
|
|
|
def test_intel_evidence_keys() -> None:
|
|
keys = (
|
|
IntelEvidence.__required_keys__ | IntelEvidence.__optional_keys__
|
|
)
|
|
assert keys == {
|
|
# AbuseIPDB
|
|
"abuseipdb_categories", "abuseipdb_score", "abuse_confidence_score",
|
|
# GreyNoise
|
|
"greynoise_classification", "greynoise_tags", "greynoise_name",
|
|
# Feodo
|
|
"feodo_listed", "feodo_malware_family", "first_seen_feodo", "malware_family",
|
|
# ThreatFox
|
|
"threatfox_threat_types", "threatfox_ioc_types", "threatfox_malware_families",
|
|
"threat_types", "malware_families", "ioc_types",
|
|
# Aggregate meta-rule
|
|
"aggregate_verdict", "bumped_rule_ids",
|
|
}
|
|
|
|
|
|
def test_canary_fingerprint_evidence_keys() -> None:
|
|
keys = (
|
|
CanaryFingerprintEvidence.__required_keys__
|
|
| CanaryFingerprintEvidence.__optional_keys__
|
|
)
|
|
assert keys == {"metric", "matched_signature"}
|
|
|
|
|
|
def test_http_fingerprint_evidence_keys() -> None:
|
|
keys = (
|
|
HttpFingerprintEvidence.__required_keys__
|
|
| HttpFingerprintEvidence.__optional_keys__
|
|
)
|
|
assert keys == {"kind", "hash", "protocol", "client_ip", "seen_at", "raw"}
|
|
|
|
|
|
def test_ipv6_link_local_leak_evidence_keys() -> None:
|
|
keys = (
|
|
Ipv6LinkLocalLeakEvidence.__required_keys__
|
|
| Ipv6LinkLocalLeakEvidence.__optional_keys__
|
|
)
|
|
assert keys == {
|
|
"addr", "mac_oui", "iid_kind", "vector",
|
|
"on_iface", "attacker_v4", "observed_at",
|
|
}
|
|
|
|
|
|
# ── Per-lifter parametrized positive case ───────────────────────────
|
|
|
|
|
|
def _ev(source_kind: str, payload: dict[str, Any]) -> TaggerEvent:
|
|
return TaggerEvent(
|
|
source_kind=source_kind,
|
|
source_id="src1",
|
|
attacker_uuid="att_1",
|
|
identity_uuid="id_1",
|
|
session_id="sess_1",
|
|
decky_id="decky_1",
|
|
payload=payload,
|
|
)
|
|
|
|
|
|
def _compile_yaml(rule_id: str) -> CompiledRule:
|
|
return _parse_and_compile(_RULES_DIR / f"{rule_id}.yaml", RuleState())
|
|
|
|
|
|
def _hfp_rule() -> CompiledRule:
|
|
"""HFP-0001 has no backing YAML — construct it directly."""
|
|
return CompiledRule(
|
|
rule_id="HFP-0001",
|
|
rule_version=1,
|
|
name="scanner_ja4h",
|
|
applies_to=frozenset({"http_fingerprint"}),
|
|
match_spec={},
|
|
emits=(("T1592.002", "T1592", "TA0043", 0.7),),
|
|
evidence_fields=("kind", "hash", "protocol", "client_ip", "seen_at", "raw"),
|
|
state=RuleState(),
|
|
)
|
|
|
|
|
|
_LIFTER_CASES: list[tuple[str, Any, Any, Any, dict[str, Any]]] = [
|
|
(
|
|
"http_fingerprint",
|
|
HttpFingerprintLifter,
|
|
HttpFingerprintEvidence,
|
|
_hfp_rule,
|
|
{"ja4h": "GE11nn0000_cafebabe", "protocol": "h1",
|
|
"client_ip": "10.0.0.1", "seen_at": "2024-01-01T00:00:00Z"},
|
|
),
|
|
(
|
|
"intel",
|
|
IntelLifter,
|
|
IntelEvidence,
|
|
lambda: _compile_yaml("R0054"),
|
|
{"abuseipdb_score": 90.0, "abuseipdb_categories": [18, 22]},
|
|
),
|
|
(
|
|
"email",
|
|
EmailLifter,
|
|
EmailEvidence,
|
|
lambda: _compile_yaml("R0042"),
|
|
{"rcpt_count": 30, "body_simhash": "abc123sha256"},
|
|
),
|
|
(
|
|
"canary_fingerprint",
|
|
CanaryFingerprintLifter,
|
|
CanaryFingerprintEvidence,
|
|
lambda: _compile_yaml("R0049"),
|
|
{"navigator_webdriver": True},
|
|
),
|
|
]
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"source_kind, lifter_cls, td_cls, rule_factory, payload",
|
|
_LIFTER_CASES,
|
|
ids=["http_fingerprint", "intel", "email", "canary_fingerprint"],
|
|
)
|
|
def test_lifter_emits_evidence_matching_typeddict(
|
|
source_kind: str,
|
|
lifter_cls: type[TolerantTagger],
|
|
td_cls: Any,
|
|
rule_factory: Any,
|
|
payload: dict[str, Any],
|
|
) -> None:
|
|
"""Each lifter's emitted ``evidence`` dict structurally matches
|
|
its ``TypedDict``: keys are a subset of the declared keys and
|
|
runtime types of the present values agree with the hints.
|
|
"""
|
|
rule = rule_factory()
|
|
lifter = lifter_cls(StubRuleStore(compiled=[rule]))
|
|
lifter._index.install(rule)
|
|
out = asyncio.run(lifter.tag(_ev(source_kind, payload)))
|
|
assert out, "lifter emitted no tags — cannot verify evidence shape"
|
|
tag = out[0]
|
|
|
|
declared = td_cls.__required_keys__ | td_cls.__optional_keys__
|
|
hints = typing.get_type_hints(td_cls)
|
|
for key, value in tag.evidence.items():
|
|
assert key in declared, f"evidence key {key!r} not in {td_cls.__name__}"
|
|
hint = hints.get(key)
|
|
if hint in (str, int, float, bool, list, dict):
|
|
assert isinstance(value, hint)
|
|
|
|
|
|
# ── Negative case: shape violation propagates (impl phase) ──────────
|
|
|
|
|
|
def test_evidence_shape_violation_propagates_as_typeerror() -> None:
|
|
"""A lifter that emits an evidence dict with a key not in its
|
|
``TypedDict`` is a programmer error — it MUST propagate past the
|
|
``TolerantTagger`` boundary as ``TypeError``, not silently land
|
|
among "absence is normal" swallowed exceptions.
|
|
"""
|
|
|
|
class BadShapeLifter(TolerantTagger):
|
|
name = "bad_shape"
|
|
HANDLES = frozenset({"command"})
|
|
|
|
async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
|
|
# ``not_in_typeddict`` is not a CommandEvidence key — the
|
|
# tolerant boundary must let this through.
|
|
return [
|
|
TTPTag(
|
|
uuid=compute_tag_uuid(
|
|
"command", "src1", "R0001", 1, "T1083", None,
|
|
),
|
|
source_kind="command",
|
|
source_id="src1",
|
|
attacker_uuid="att_1",
|
|
identity_uuid="id_1",
|
|
tactic="TA0007",
|
|
technique_id="T1083",
|
|
confidence=0.5,
|
|
rule_id="R0001",
|
|
rule_version=1,
|
|
evidence={"not_in_typeddict": True},
|
|
attack_release="enterprise-v15.1",
|
|
)
|
|
]
|
|
|
|
with pytest.raises(TypeError):
|
|
asyncio.run(BadShapeLifter().tag(_ev("command")))
|