Per-provider verdict translator for AbuseIPDB, GreyNoise, Feodo Tracker, and ThreatFox per Appendix A.10. Each rule's predicate inspects payload fields produced by the enrich worker (no DB I/O, no decnet.intel.* imports — E.2.7 decoupling guard preserved). AbuseIPDB confidence is scaled by abuse_confidence_score / 100; categories drive per-technique fan-out. R0058 aggregate-bump is a no-op in v0 (cross-tag bump deferred to E.3.14 worker bootstrap). Per-provider null tolerance is the steady state — a missing provider column produces zero tags from that rule, never an error.

Tests:
- tests/ttp/test_intel_lifter.py — per-provider positive + negative + state modulation + decoupling source-import guard.
- tests/ttp/rule_precision/test_intel_rules.py — xfail flipped, real precision driven over seed_intel.jsonl (R0054-R0057 H-band ≥95%; R0058 skipped as bump-only).
- tests/ttp/test_lifter_absence.py — IntelLifter all-populated test flipped from xfail-strict to real assertion with realistic payload.
- tests/ttp/test_lifters.py — partial-null xfail flipped to real assertion.
112 lines
3.8 KiB
Python
112 lines
3.8 KiB
Python
"""R0054-R0058 — intel verdict cohort.
|
|
|
|
IntelLifter (E.3.10) reads ``AttackerIntel`` provider columns
(AbuseIPDB, GreyNoise, Feodo, ThreatFox) and emits tags per the
per-provider mapping tables in Appendix A.10. Per Appendix B every
intel rule tolerates absence silently — a null provider column means
"no tag from this rule", never an error. R0058 is the
confidence-bump-only meta-rule (no fresh tag emission); the
lifter inspects rule_id and bumps existing tags.
|
|
|
|
The v0 :class:`RuleEngine` cannot navigate the intel envelope —
under its regex matching these rules are inert.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from decnet.ttp.impl.rule_engine import RuleEngine
|
|
from decnet.ttp.store.base import RuleState
|
|
from decnet.ttp.store.impl.filesystem import _parse_and_compile
|
|
from tests.ttp.rule_precision.conftest import CorpusRow, make_event
|
|
|
|
# Type of the ``corpus_loader`` argument the tests below receive: it is
# called with a cohort name (here always "intel") and yields corpus rows.
CohortLoader = Callable[[str], list[CorpusRow]]

# Rule ids of the intel verdict cohort: R0054 through R0058 inclusive.
_RULE_IDS = ["R{:04d}".format(number) for number in range(54, 59)]
|
|
|
|
|
|
@pytest.mark.parametrize("rule_id", _RULE_IDS)
def test_rule_yaml_present(rule_id: str) -> None:
    """Check that the rule's YAML file exists and compiles under its own id.

    The file is looked up as ``rules/ttp/<rule_id>.yaml`` relative to the
    current working directory.
    """
    yaml_path = Path("rules/ttp").joinpath(f"{rule_id}.yaml")
    assert yaml_path.exists(), f"missing YAML: {yaml_path}"

    # Compile against a default RuleState and confirm the id round-trips.
    rule = _parse_and_compile(yaml_path, RuleState())
    assert rule.rule_id == rule_id
|
|
|
|
|
|
@pytest.mark.parametrize("rule_id", _RULE_IDS)
async def test_lifter_bound_inert_in_v0(
    rule_id: str,
    precision_engine: RuleEngine,
    corpus_loader: CohortLoader,
) -> None:
    """Assert the rule never fires when the engine evaluates the intel corpus.

    Every row of the ``intel`` corpus is turned into an event and evaluated
    by ``precision_engine``; the parametrized ``rule_id`` must not appear on
    any tag the engine returns.
    """
    seen_rule_ids: set[str] = set()
    for corpus_row in corpus_loader("intel"):
        event = make_event(corpus_row)
        for tag in await precision_engine.evaluate(event):
            seen_rule_ids.add(tag.rule_id)
    assert rule_id not in seen_rule_ids
|
|
|
|
|
|
def test_r0058_is_bump_only() -> None:
    """Pin every R0058 emit at zero confidence.

    Per Appendix B the aggregate-malicious rule bumps the confidence of
    existing tags instead of emitting a fresh one. The repository drops
    tags below 0.3 confidence, so a zero-confidence emit would never
    persist even if the lifter accidentally drove the engine fanout.
    This test guards that defense-in-depth property: any future edit
    that raises an R0058 emit confidence above 0 fails here.

    The check passes vacuously if the compiled rule has no emits at all.
    """
    r0058 = _parse_and_compile(Path("rules/ttp/R0058.yaml"), RuleState())

    # Index 3 of each emit entry is the value the assertion message calls
    # the emit confidence.
    has_nonzero_confidence = any(emit[3] != 0.0 for emit in r0058.emits)
    assert not has_nonzero_confidence, (
        "R0058 must keep all emit confidences at 0.0 (bump-only rule)"
    )
|
|
|
|
|
|
def _build_lifter() -> "IntelLifter":
    """Return an ``IntelLifter`` loaded with the compiled R0054-R0058 rules.

    Each rule YAML under ``rules/ttp`` is compiled against a default
    ``RuleState``. The compiled rules are handed to a ``StubRuleStore``
    backing the lifter and are also installed one by one into the
    lifter's private ``_index``.
    """
    # Imported lazily, inside the helper rather than at module import time.
    from decnet.ttp.impl.intel_lifter import IntelLifter
    from tests.ttp._stub_store import StubRuleStore

    rules_dir = Path("rules/ttp")
    compiled_rules = []
    for rid in _RULE_IDS:
        compiled_rules.append(
            _parse_and_compile(rules_dir / f"{rid}.yaml", RuleState())
        )

    intel_lifter = IntelLifter(StubRuleStore(compiled=compiled_rules))
    for compiled_rule in compiled_rules:
        intel_lifter._index.install(compiled_rule)
    return intel_lifter
|
|
|
|
|
|
@pytest.mark.parametrize("rule_id", _RULE_IDS)
def test_intel_rule_precision(
    rule_id: str,
    corpus_loader: CohortLoader,
) -> None:
    """E.3.10: drive IntelLifter over the labelled corpus and assert
    per-rule precision.

    R0058 (bump-only) is excluded — it intentionally never emits a tag,
    so vacuous precision is irrelevant. The test is also skipped when the
    ``intel`` corpus yields no rows.

    Args:
        rule_id: The rule whose precision is asserted.
        corpus_loader: Callable returning the corpus rows for a cohort name.
    """
    import asyncio

    from tests.ttp.rule_precision.conftest import precision_for

    if rule_id == "R0058":
        pytest.skip("R0058 is bump-only; no precision target")
    rows = corpus_loader("intel")
    if not rows:
        pytest.skip("no intel corpus available")
    lifter = _build_lifter()

    async def _tag_corpus() -> dict[str, list[str]]:
        """Tag every corpus row, mapping each row label to the rule ids fired."""
        fired_by_label: dict[str, list[str]] = {}
        for row in rows:
            tags = await lifter.tag(make_event(row))
            # NOTE(review): keyed by row.label, so rows sharing a label
            # overwrite each other — presumably labels are unique; confirm
            # against the corpus and precision_for.
            fired_by_label[row.label] = [tag.rule_id for tag in tags]
        return fired_by_label

    # asyncio.run() creates and closes a fresh event loop on every call, so
    # the whole corpus is tagged under one loop instead of one loop per row.
    fired = asyncio.run(_tag_corpus())
    precision, _tp, _fp = precision_for(rule_id, rows, fired)
    # R0054/R0055/R0056/R0057 are H-band per Appendix C → ≥95%.
    assert precision >= 0.95, (
        f"{rule_id} precision {precision:.2f} < 0.95 on intel corpus"
    )
|