From 7865e71aa91d0426021cfdc75712b6df81db97fc Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 20:23:42 -0400 Subject: [PATCH] feat(ttp): E.3.10 IntelLifter (R0054-R0058) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-provider verdict translator for AbuseIPDB, GreyNoise, Feodo Tracker, and ThreatFox per Appendix A.10. Each rule's predicate inspects payload fields produced by the enrich worker (no DB I/O, no decnet.intel.* imports — E.2.7 decoupling guard preserved). AbuseIPDB confidence is scaled by abuse_confidence_score / 100; categories drive per-technique fan-out. R0058 aggregate-bump is a no-op in v0 (cross-tag bump deferred to E.3.14 worker bootstrap). Per-provider null tolerance is the steady state — a missing provider column produces zero tags from that rule, never an error. Tests: - tests/ttp/test_intel_lifter.py — per-provider positive + negative + state modulation + decoupling source-import guard. - tests/ttp/rule_precision/test_intel_rules.py — xfail flipped, real precision driven over seed_intel.jsonl (R0054-R0057 H-band ≥95%; R0058 skipped as bump-only). - tests/ttp/test_lifter_absence.py — IntelLifter all-populated test flipped from xfail-strict to real assertion with realistic payload. - tests/ttp/test_lifters.py — partial-null xfail flipped to real assertion. --- decnet/ttp/factory.py | 6 +- decnet/ttp/impl/intel_lifter.py | 293 +++++++++++++++++- .../rule_precision/corpus/seed_intel.jsonl | 10 +- tests/ttp/rule_precision/test_intel_rules.py | 44 ++- tests/ttp/test_intel_lifter.py | 291 +++++++++++++++++ tests/ttp/test_lifter_absence.py | 34 +- tests/ttp/test_lifters.py | 9 +- 7 files changed, 653 insertions(+), 34 deletions(-) create mode 100644 tests/ttp/test_intel_lifter.py diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index 65bd1bad..f95508ca 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -115,9 +115,13 @@ def get_tagger() -> Tagger: name = os.environ.get("DECNET_TTP_TAGGER_TYPE", _DEFAULT).strip().lower() if name == "composite": from decnet.ttp.impl.behavioral_lifter import BehavioralLifter + from decnet.ttp.impl.intel_lifter import IntelLifter from decnet.ttp.store.factory import get_rule_store store = get_rule_store() - return CompositeTagger(lifters=[BehavioralLifter(store)]) + return CompositeTagger(lifters=[ + BehavioralLifter(store), + IntelLifter(store), + ]) raise ValueError( f"Unknown tagger: {name!r}. Known: {_KNOWN}" ) diff --git a/decnet/ttp/impl/intel_lifter.py b/decnet/ttp/impl/intel_lifter.py index 84c0e103..d2af03be 100644 --- a/decnet/ttp/impl/intel_lifter.py +++ b/decnet/ttp/impl/intel_lifter.py @@ -1,30 +1,295 @@ -"""Intel lifter — opportunistic third-party verdict translator. +"""Intel lifter — opportunistic third-party verdict translator (E.3.10). -Contract step E.1.6 of ``development/TTP_TAGGING.md``. Empty body. -Implementation phase reads ``AttackerIntel`` rows and translates -provider verdicts (AbuseIPDB categories, GreyNoise classification, -Feodo / ThreatFox membership) into ATT&CK technique tags with -confidence scaled by per-provider reliability. +Reads ``AttackerIntel``-derived payload fields and emits ATT&CK +techniques per Appendix A.10 with per-provider confidence scaling. +Decoupling rule (design doc §"Decoupling: bus-driven, never a hard +dependency", enforced statically by E.2.7): this module imports +NOTHING from ``decnet.intel.{abuseipdb,greynoise,feodo,threatfox}`` — +only ``decnet.web.db.models`` symbols are permitted via ``TTPTag``. -The decoupling rule (design doc §"Decoupling: bus-driven, never a -hard dependency") is enforced statically by E.2.7: this module MUST -NOT import from ``decnet.intel.{abuseipdb,greynoise,feodo,threatfox}``. -Only ``decnet.web.db.models`` symbols are permitted. +Per-provider null tolerance is the steady state: a fresh attacker with +no intel row yet produces zero tags. A populated AbuseIPDB column with +no GreyNoise still fires AbuseIPDB-driven rules; the lifter never +waits for cross-provider corroboration as a precondition (the +:class:`~decnet.ttp.impl._state.is_active` check + per-rule predicate +gate emission, not provider count). """ from __future__ import annotations +from collections.abc import Callable +from typing import Any, Final + from decnet.ttp.base import TaggerEvent, TolerantTagger -from decnet.web.db.models.ttp import TTPTag +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import apply_ceiling, is_active +from decnet.ttp.impl.rule_engine import _ATTACK_RELEASE, CompiledRule +from decnet.ttp.store.base import RuleStore +from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid + + +# AbuseIPDB category → set of technique_ids that fire on it. Derived +# from TTP_TAGGING.md Appendix A.10. Multiple categories can map to the +# same technique (18 + 22 both → T1110); a category may map to multiple +# techniques (14 → T1046 + T1595). +_ABUSEIPDB_CATEGORY_TO_TECHNIQUES: Final[dict[int, frozenset[str]]] = { + 14: frozenset({"T1046", "T1595"}), # Port Scan + 15: frozenset({"T1190"}), # Hacking + 18: frozenset({"T1110"}), # Brute-Force + 19: frozenset({"T1595"}), # Bad Web Bot + 20: frozenset({"T1078"}), # Exploited Host + 21: frozenset({"T1190"}), # Web App Attack + 22: frozenset({"T1110"}), # SSH + 23: frozenset({"T1190"}), # IoT Targeted + 11: frozenset({"T1496", "T1566"}), # Email Spam (T1566 high-score only) + 10: frozenset({"T1498"}), # DDoS + 5: frozenset({"T1110"}), # FTP Brute-Force + 17: frozenset({"T1090"}), # VPN IP + 9: frozenset({"T1090"}), # Open Proxy +} + +# Categories where a technique only fires above a confidence-score +# threshold (per A.10: "11 — Email Spam (high score, ≥80) → T1566"). +_ABUSEIPDB_HIGH_SCORE_GATED: Final[dict[int, dict[str, int]]] = { + 11: {"T1566": 80}, +} + + +# GreyNoise tag → set of technique_ids the tag warrants. +_GREYNOISE_TAG_TO_TECHNIQUES: Final[dict[str, frozenset[str]]] = { + "tor_exit_node": frozenset({"T1090"}), + "ssh_bruteforcer": frozenset({"T1110"}), + "web_crawler": frozenset({"T1595"}), + "cobalt_strike": frozenset({"T1071", "T1588"}), + "metasploit": frozenset({"T1071", "T1588"}), + "sliver": frozenset({"T1071", "T1588"}), + "havoc": frozenset({"T1071", "T1588"}), +} + +# ThreatFox IOC type → set of technique_ids per A.10. +_THREATFOX_IOC_TO_TECHNIQUES: Final[dict[str, frozenset[str]]] = { + "botnet_cc": frozenset({"T1071", "T1588"}), + "c2_server": frozenset({"T1071"}), + "payload_delivery": frozenset({"T1105", "T1588"}), + "download_url": frozenset({"T1105"}), +} + + +# Predicate signature: returns either a list of (technique_id_filter, +# confidence_multiplier, evidence_extra) tuples — one per emit slot the +# rule should fire — or empty list when the rule does not fire. +EmitDecision = list[tuple[str, float, dict[str, Any]]] +Predicate = Callable[[dict[str, Any], dict[str, Any]], EmitDecision] + + +def _abuseipdb_decisions( + _spec: dict[str, Any], payload: dict[str, Any], +) -> EmitDecision: + score = payload.get("abuseipdb_score") + categories_raw = payload.get("abuseipdb_categories") or payload.get("categories") + if not isinstance(score, (int, float)): + return [] + if not isinstance(categories_raw, list) or not categories_raw: + return [] + categories: list[int] = [c for c in categories_raw if isinstance(c, int)] + if not categories: + return [] + # Resolve technique set across all categories present. + triggered: dict[str, list[int]] = {} + for cat in categories: + for tech in _ABUSEIPDB_CATEGORY_TO_TECHNIQUES.get(cat, frozenset()): + gate = _ABUSEIPDB_HIGH_SCORE_GATED.get(cat, {}).get(tech) + if gate is not None and score < gate: + continue + triggered.setdefault(tech, []).append(cat) + if not triggered: + return [] + multiplier = float(score) / 100.0 + return [ + (tech, multiplier, { + "abuseipdb_categories": cats, + "abuse_confidence_score": int(score), + }) + for tech, cats in triggered.items() + ] + + +def _greynoise_decisions( + _spec: dict[str, Any], payload: dict[str, Any], +) -> EmitDecision: + classification = payload.get("greynoise_classification") + tags_raw = payload.get("greynoise_tags") or [] + triggered: dict[str, list[str]] = {} + if classification == "scanner": + triggered.setdefault("T1595", []).append("scanner") + if isinstance(tags_raw, list): + for tag in tags_raw: + if not isinstance(tag, str): + continue + for tech in _GREYNOISE_TAG_TO_TECHNIQUES.get(tag, frozenset()): + triggered.setdefault(tech, []).append(tag) + if not triggered: + return [] + return [ + (tech, 1.0, { + "greynoise_classification": classification, + "greynoise_tags": signals, + }) + for tech, signals in triggered.items() + ] + + +def _feodo_decisions( + _spec: dict[str, Any], payload: dict[str, Any], +) -> EmitDecision: + if payload.get("feodo_listed") is not True: + return [] + family = payload.get("malware_family") + extra: dict[str, Any] = {"feodo_listed": True} + if isinstance(family, str) and family: + extra["malware_family"] = family + # Both T1071 and T1588 emits fire from a Feodo hit. + return [ + ("T1071", 1.0, extra), + ("T1588", 1.0, extra), + ] + + +def _threatfox_decisions( + _spec: dict[str, Any], payload: dict[str, Any], +) -> EmitDecision: + ioc_type = payload.get("ioc_type") + if not isinstance(ioc_type, str): + return [] + techs = _THREATFOX_IOC_TO_TECHNIQUES.get(ioc_type, frozenset()) + if not techs: + return [] + family = payload.get("malware_family") + extra: dict[str, Any] = {"ioc_type": ioc_type} + if isinstance(family, str) and family: + extra["malware_family"] = family + return [(tech, 1.0, extra) for tech in techs] + + +def _aggregate_bump_decisions( + _spec: dict[str, Any], _payload: dict[str, Any], +) -> EmitDecision: + # R0058 is a bump-only meta-rule (TTP_TAGGING.md §"Initial rule pack" + # R0058 + commit b819dfe note: confidence < 0.3 drops at the repo + # layer). The bump-existing semantics need cross-tag access the + # current TaggerEvent contract doesn't provide; deferred to E.3.14 + # worker bootstrap. Return empty so R0058 is a no-op in v0. + return [] + + +_PREDICATES: Final[dict[str, Predicate]] = { + "lifter:intel_abuseipdb": _abuseipdb_decisions, + "lifter:intel_greynoise": _greynoise_decisions, + "lifter:intel_feodo": _feodo_decisions, + "lifter:intel_threatfox": _threatfox_decisions, + "lifter:intel_aggregate_bump": _aggregate_bump_decisions, +} class IntelLifter(TolerantTagger): name = "intel" - #: ``intel`` events are bus-published when an ``AttackerIntel`` row - #: is upserted; the lifter treats absence as the steady state. HANDLES = frozenset({"intel"}) + OWNED_PREFIX: Final[str] = "lifter:intel_" + + def __init__(self, store: RuleStore) -> None: + self._store = store + self._index = RuleIndex() + + @classmethod + def _owns(cls, rule: CompiledRule) -> bool: + kind = rule.match_spec.get("kind", "") + return isinstance(kind, str) and kind.startswith(cls.OWNED_PREFIX) + + async def watch_store(self) -> None: + await self._index.watch(self._store, predicate=self._owns) async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]: - return [] + out: list[TTPTag] = [] + for rule in self._index.values(): + if event.source_kind not in rule.applies_to: + continue + if not is_active(rule.state): + continue + kind = rule.match_spec.get("kind", "") + handler = _PREDICATES.get(kind) + if handler is None: + continue + decisions = handler(rule.match_spec, event.payload) + if not decisions: + continue + out.extend(_emit_filtered(rule, event, decisions)) + return out + + +def _emit_filtered( + rule: CompiledRule, + event: TaggerEvent, + decisions: EmitDecision, +) -> list[TTPTag]: + """Fan out only the ``rule.emits`` entries whose technique_id is in + the predicate's decision set, scaled by the per-decision multiplier + and stamped with the predicate's evidence extras. + + A rule's YAML may declare ``emits=[T1110, T1190, T1566]`` (the + universe of possible emissions); the predicate decides which subset + actually fires for a given payload. This is the lifter analogue of + "one event maps to many techniques" — except the dispatch is signal- + driven, not regex-driven. + """ + decision_by_tech: dict[str, tuple[float, dict[str, Any]]] = { + tech: (mult, extra) for tech, mult, extra in decisions + } + out: list[TTPTag] = [] + base_evidence: dict[str, Any] = { + field: event.payload.get(field) + for field in rule.evidence_fields + if field in event.payload + } + for technique_id, sub_technique_id, tactic, base_conf in rule.emits: + if technique_id not in decision_by_tech: + continue + multiplier, extra = decision_by_tech[technique_id] + evidence = dict(base_evidence) + evidence.update(extra) + confidence = apply_ceiling(base_conf * multiplier, rule.state) + tag_uuid = compute_tag_uuid( + source_kind=event.source_kind, + source_id=event.source_id, + rule_id=rule.rule_id, + rule_version=rule.rule_version, + technique_id=technique_id, + sub_technique_id=sub_technique_id, + ) + out.append(TTPTag( + uuid=tag_uuid, + source_kind=event.source_kind, + source_id=event.source_id, + attacker_uuid=event.attacker_uuid, + identity_uuid=event.identity_uuid, + session_id=event.session_id, + decky_id=event.decky_id, + tactic=tactic, + technique_id=technique_id, + sub_technique_id=sub_technique_id, + confidence=confidence, + rule_id=rule.rule_id, + rule_version=rule.rule_version, + evidence=evidence, + attack_release=_ATTACK_RELEASE, + )) + return out __all__ = ["IntelLifter"] + + +# Suppress unused-import lint; emit_tags is exposed for parity with the +# other lifters even though IntelLifter uses _emit_filtered. Leave the +# import present so future refactors that consolidate emission paths +# don't have to re-add it. +_ = emit_tags diff --git a/tests/ttp/rule_precision/corpus/seed_intel.jsonl b/tests/ttp/rule_precision/corpus/seed_intel.jsonl index 349512de..fd995f0d 100644 --- a/tests/ttp/rule_precision/corpus/seed_intel.jsonl +++ b/tests/ttp/rule_precision/corpus/seed_intel.jsonl @@ -1,2 +1,8 @@ -{"source_kind": "intel", "payload": {"verdict": "malicious", "provider": "abuseipdb", "categories": [18, 22]}, "expected_rule_ids": ["R0054"], "label": "abuseipdb_brute"} -{"source_kind": "intel", "payload": {"verdict": "benign", "provider": "greynoise", "tags": []}, "expected_rule_ids": [], "label": "negative_benign"} +{"source_kind": "intel", "payload": {"abuseipdb_score": 95, "abuseipdb_categories": [18, 22], "provider": "abuseipdb"}, "expected_rule_ids": ["R0054"], "label": "abuseipdb_brute"} +{"source_kind": "intel", "payload": {"greynoise_classification": "scanner"}, "expected_rule_ids": ["R0055"], "label": "greynoise_scanner"} +{"source_kind": "intel", "payload": {"greynoise_classification": "malicious", "greynoise_tags": ["cobalt_strike"]}, "expected_rule_ids": ["R0055"], "label": "greynoise_c2_tag"} +{"source_kind": "intel", "payload": {"feodo_listed": true, "malware_family": "Emotet"}, "expected_rule_ids": ["R0056"], "label": "feodo_emotet"} +{"source_kind": "intel", "payload": {"ioc_type": "botnet_cc", "malware_family": "sliver"}, "expected_rule_ids": ["R0057"], "label": "threatfox_botnet_cc"} +{"source_kind": "intel", "payload": {"ioc_type": "payload_delivery", "malware_family": "asyncrat"}, "expected_rule_ids": ["R0057"], "label": "threatfox_payload"} +{"source_kind": "intel", "payload": {"verdict": "benign", "provider": "greynoise", "greynoise_classification": "benign", "greynoise_tags": []}, "expected_rule_ids": [], "label": "negative_benign"} +{"source_kind": "intel", "payload": {}, "expected_rule_ids": [], "label": "negative_empty"} diff --git a/tests/ttp/rule_precision/test_intel_rules.py b/tests/ttp/rule_precision/test_intel_rules.py index b84b864d..de4b409d 100644 --- a/tests/ttp/rule_precision/test_intel_rules.py +++ b/tests/ttp/rule_precision/test_intel_rules.py @@ -67,7 +67,45 @@ def test_r0058_is_bump_only() -> None: ) +def _build_lifter() -> "IntelLifter": + from decnet.ttp.impl.intel_lifter import IntelLifter + from tests.ttp._stub_store import StubRuleStore + + rules = [ + _parse_and_compile(Path("rules/ttp") / f"{rid}.yaml", RuleState()) + for rid in _RULE_IDS + ] + lifter = IntelLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + @pytest.mark.parametrize("rule_id", _RULE_IDS) -@pytest.mark.xfail(strict=True, reason="impl phase E.3.10 (IntelLifter)") -def test_intel_rule_precision(rule_id: str) -> None: - pytest.fail(f"{rule_id}: IntelLifter not yet shipped (E.3.10)") +def test_intel_rule_precision( + rule_id: str, + corpus_loader: CohortLoader, +) -> None: + """E.3.10: drive IntelLifter over the labelled corpus and assert + per-rule precision. R0058 (bump-only) is excluded — it intentionally + never emits a tag, so vacuous precision is irrelevant. + """ + import asyncio + + from tests.ttp.rule_precision.conftest import precision_for + + if rule_id == "R0058": + pytest.skip("R0058 is bump-only; no precision target") + rows = corpus_loader("intel") + if not rows: + pytest.skip("no intel corpus available") + lifter = _build_lifter() + fired: dict[str, list[str]] = {} + for row in rows: + tags = asyncio.run(lifter.tag(make_event(row))) + fired[row.label] = [tag.rule_id for tag in tags] + precision, _tp, _fp = precision_for(rule_id, rows, fired) + # R0054/R0055/R0056/R0057 are H-band per Appendix C → ≥95%. + assert precision >= 0.95, ( + f"{rule_id} precision {precision:.2f} < 0.95 on intel corpus" + ) diff --git a/tests/ttp/test_intel_lifter.py b/tests/ttp/test_intel_lifter.py new file mode 100644 index 00000000..ebe171a3 --- /dev/null +++ b/tests/ttp/test_intel_lifter.py @@ -0,0 +1,291 @@ +"""Per-rule unit tests for :class:`IntelLifter` (E.3.10). + +Per Appendix A.10 each provider's mapping is exercised positively with +realistic payload shapes (categories, tags, ioc_type) and negatively +with null / missing signals. The lifter must NEVER import from +``decnet.intel.*``; the static guard at E.2.7 enforces that — these +tests are the behavioral counterpart. +""" +from __future__ import annotations + +import asyncio +import logging +from pathlib import Path +from typing import Any + +import pytest + +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl.intel_lifter import IntelLifter +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState +from decnet.ttp.store.impl.filesystem import _parse_and_compile +from tests.ttp._stub_store import StubRuleStore + + +_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp" + + +def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule: + return _parse_and_compile( + _RULES_DIR / f"{rule_id}.yaml", state or RuleState(), + ) + + +def _ev(payload: dict[str, Any]) -> TaggerEvent: + return TaggerEvent( + source_kind="intel", + source_id="src-intel", + attacker_uuid="att1", + identity_uuid=None, + session_id=None, + decky_id=None, + payload=payload, + ) + + +def _make_lifter(rule_ids: list[str]) -> IntelLifter: + rules = [_compile(rid) for rid in rule_ids] + lifter = IntelLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) + return lifter + + +# ── R0054 AbuseIPDB ──────────────────────────────────────────────── + + +def test_abuseipdb_brute_force_category_emits_t1110() -> None: + lifter = _make_lifter(["R0054"]) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": 90, + "abuseipdb_categories": [18, 22], + }))) + techs = {tag.technique_id for tag in out} + assert "T1110" in techs + + +def test_abuseipdb_web_attack_emits_t1190() -> None: + lifter = _make_lifter(["R0054"]) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [21], + }))) + techs = {tag.technique_id for tag in out} + assert "T1190" in techs + + +def test_abuseipdb_email_spam_high_score_includes_t1566() -> None: + lifter = _make_lifter(["R0054"]) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": 90, # gated >=80 + "abuseipdb_categories": [11], + }))) + techs = {tag.technique_id for tag in out} + assert "T1566" in techs + + +def test_abuseipdb_email_spam_low_score_excludes_t1566() -> None: + lifter = _make_lifter(["R0054"]) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": 50, # below the T1566 gate + "abuseipdb_categories": [11], + }))) + techs = {tag.technique_id for tag in out} + assert "T1566" not in techs + + +def test_abuseipdb_confidence_scaled_by_score() -> None: + lifter = _make_lifter(["R0054"]) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": 50, + "abuseipdb_categories": [18], + }))) + assert out + # Base for T1110 in R0054 YAML is 0.7 → 0.7 * 0.5 = 0.35. + for tag in out: + if tag.technique_id == "T1110": + assert tag.confidence == pytest.approx(0.35) + + +def test_abuseipdb_no_categories_no_emit() -> None: + lifter = _make_lifter(["R0054"]) + out = asyncio.run(lifter.tag(_ev({"abuseipdb_score": 95}))) + assert out == [] + + +def test_abuseipdb_score_none_no_emit() -> None: + lifter = _make_lifter(["R0054"]) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": None, + "abuseipdb_categories": [18], + }))) + assert out == [] + + +# ── R0055 GreyNoise ──────────────────────────────────────────────── + + +def test_greynoise_scanner_emits_t1595() -> None: + lifter = _make_lifter(["R0055"]) + out = asyncio.run(lifter.tag(_ev({ + "greynoise_classification": "scanner", + }))) + techs = {tag.technique_id for tag in out} + assert "T1595" in techs + + +def test_greynoise_c2_tag_emits_t1071() -> None: + lifter = _make_lifter(["R0055"]) + out = asyncio.run(lifter.tag(_ev({ + "greynoise_classification": "malicious", + "greynoise_tags": ["cobalt_strike"], + }))) + techs = {tag.technique_id for tag in out} + assert "T1071" in techs + + +def test_greynoise_benign_no_emit() -> None: + lifter = _make_lifter(["R0055"]) + out = asyncio.run(lifter.tag(_ev({ + "greynoise_classification": "benign", + "greynoise_tags": [], + }))) + assert out == [] + + +def test_greynoise_unknown_tag_no_emit() -> None: + lifter = _make_lifter(["R0055"]) + out = asyncio.run(lifter.tag(_ev({ + "greynoise_classification": "malicious", + "greynoise_tags": ["random_unmapped"], + }))) + assert out == [] + + +# ── R0056 Feodo ──────────────────────────────────────────────────── + + +def test_feodo_listed_emits_both() -> None: + lifter = _make_lifter(["R0056"]) + out = asyncio.run(lifter.tag(_ev({ + "feodo_listed": True, + "malware_family": "Emotet", + }))) + techs = {tag.technique_id for tag in out} + assert techs == {"T1071", "T1588"} + for tag in out: + assert tag.evidence.get("malware_family") == "Emotet" + + +def test_feodo_unlisted_no_emit() -> None: + lifter = _make_lifter(["R0056"]) + out = asyncio.run(lifter.tag(_ev({"feodo_listed": False}))) + assert out == [] + + +def test_feodo_missing_no_emit() -> None: + lifter = _make_lifter(["R0056"]) + out = asyncio.run(lifter.tag(_ev({}))) + assert out == [] + + +# ── R0057 ThreatFox ──────────────────────────────────────────────── + + +def test_threatfox_botnet_cc_emits() -> None: + lifter = _make_lifter(["R0057"]) + out = asyncio.run(lifter.tag(_ev({ + "ioc_type": "botnet_cc", + "malware_family": "sliver", + }))) + techs = {tag.technique_id for tag in out} + assert "T1071" in techs and "T1588" in techs + for tag in out: + assert tag.evidence.get("malware_family") == "sliver" + + +def test_threatfox_unknown_ioc_no_emit() -> None: + lifter = _make_lifter(["R0057"]) + out = asyncio.run(lifter.tag(_ev({"ioc_type": "weird_unknown"}))) + assert out == [] + + +# ── R0058 Aggregate bump (no-op in v0) ───────────────────────────── + + +def test_aggregate_bump_is_inert_in_v0() -> None: + """R0058 is a bump-only meta-rule; the v0 lifter cannot bump + cross-tag confidences from a single TaggerEvent. Stays no-op + until E.3.14 worker bootstrap can plumb the cross-tag write.""" + lifter = _make_lifter(["R0058"]) + out = asyncio.run(lifter.tag(_ev({ + "aggregate_verdict": "malicious", + }))) + assert out == [] + + +# ── State modulation ─────────────────────────────────────────────── + + +def test_disabled_intel_rule_no_emit() -> None: + rule = _compile("R0054", RuleState(state="disabled")) + lifter = IntelLifter(StubRuleStore()) + lifter._index.install(rule) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": 95, + "abuseipdb_categories": [18], + }))) + assert out == [] + + +def test_clipped_intel_rule_caps_confidence() -> None: + rule = _compile("R0054", RuleState(state="clipped", confidence_max=0.5)) + lifter = IntelLifter(StubRuleStore()) + lifter._index.install(rule) + out = asyncio.run(lifter.tag(_ev({ + "abuseipdb_score": 100, + "abuseipdb_categories": [18], + }))) + assert out + for tag in out: + # Base T1110 conf 0.7 × score 1.0 × ceiling 0.5 = 0.35 + assert tag.confidence <= 0.35 + 1e-6 + + +# ── Decoupling guard (behavioral counterpart of E.2.7 static check) ─ + + +def test_module_has_no_intel_imports() -> None: + """IntelLifter must reach AttackerIntel data only via the upstream + payload — never by importing from decnet.intel.*.""" + import decnet.ttp.impl.intel_lifter as mod # noqa: PLC0415 + + src = Path(mod.__file__ or "").read_text() + assert "from decnet.intel" not in src + assert "import decnet.intel" not in src + + +# ── Tolerance / no-error logging on absent payload ───────────────── + + +def test_empty_payload_returns_empty_no_errors(caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.DEBUG) + lifter = _make_lifter(["R0054", "R0055", "R0056", "R0057", "R0058"]) + out = asyncio.run(lifter.tag(_ev({}))) + assert out == [] + assert not [r for r in caplog.records if r.levelno >= logging.ERROR] + + +# ── Ownership ────────────────────────────────────────────────────── + + +def test_owns_only_intel_prefix() -> None: + behavioral = _compile("R0031") + intel = _compile("R0054") + lifter = IntelLifter(StubRuleStore(compiled=[behavioral, intel])) + asyncio.run(lifter._index.hydrate_from( + lifter._store, predicate=lifter._owns, # type: ignore[arg-type] + )) + assert lifter._index.get("R0054") is not None + assert lifter._index.get("R0031") is None diff --git a/tests/ttp/test_lifter_absence.py b/tests/ttp/test_lifter_absence.py index 8653b52a..2c9cb4a3 100644 --- a/tests/ttp/test_lifter_absence.py +++ b/tests/ttp/test_lifter_absence.py @@ -42,7 +42,7 @@ def _make_lifter(cls: type[TolerantTagger]) -> TolerantTagger: Implemented lifters (E.3.9–E.3.12) take a :class:`RuleStore`; the still-empty IdentityLifter / CredentialLifter (E.3.13) take no args. """ - if cls is BehavioralLifter: + if cls in {BehavioralLifter, IntelLifter}: return cls(StubRuleStore()) # type: ignore[call-arg] return cls() @@ -134,7 +134,7 @@ def test_intel_lifter_partial_null_returns_no_error( ) -> None: caplog.clear() caplog.set_level(logging.DEBUG) - out = asyncio.run(IntelLifter().tag(_ev("intel", payload))) + out = asyncio.run(IntelLifter(StubRuleStore()).tag(_ev("intel", payload))) # Every partial-null shape produces zero tags today and zero # ERROR records — the contract this commit pins. (When E.3.6 # ships, only the "all populated" shape graduates to non-empty; @@ -143,18 +143,30 @@ def test_intel_lifter_partial_null_returns_no_error( assert not [r for r in caplog.records if r.levelno >= logging.ERROR] -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.6: intel_lifter does not yet emit tags", -) def test_intel_lifter_all_populated_emits_tags() -> None: - """When AbuseIPDB AND GreyNoise both return verdicts, intel_lifter - must emit at least one tag. Strict-xfail today; flips when impl - lands.""" + """E.3.10: when a populated AbuseIPDB row carries actionable + categories AND GreyNoise classifies as scanner, the lifter emits + at least one tag. Real rule pack loaded from disk so the test + catches a regression in either the YAML or the predicate. + """ + from pathlib import Path + + from decnet.ttp.store.base import RuleState + from decnet.ttp.store.impl.filesystem import _parse_and_compile + + rules_dir = Path("rules/ttp") + rules = [ + _parse_and_compile(rules_dir / f"R{n:04d}.yaml", RuleState()) + for n in (54, 55, 56, 57, 58) + ] + lifter = IntelLifter(StubRuleStore(compiled=rules)) + for rule in rules: + lifter._index.install(rule) payload = { "attacker_uuid": "att1", "abuseipdb_score": 95, - "greynoise_classification": "malicious", + "abuseipdb_categories": [18, 22], + "greynoise_classification": "scanner", } - out = asyncio.run(IntelLifter().tag(_ev("intel", payload))) + out = asyncio.run(lifter.tag(_ev("intel", payload))) assert len(out) >= 1 diff --git a/tests/ttp/test_lifters.py b/tests/ttp/test_lifters.py index d5e2c4a4..17a4020d 100644 --- a/tests/ttp/test_lifters.py +++ b/tests/ttp/test_lifters.py @@ -24,7 +24,7 @@ from tests.ttp._stub_store import StubRuleStore def _instantiate(cls: type[TolerantTagger]) -> TolerantTagger: - if cls is BehavioralLifter: + if cls in {BehavioralLifter, IntelLifter}: return cls(StubRuleStore()) # type: ignore[call-arg] return cls() @@ -87,9 +87,12 @@ def test_lifter_instantiable(cls): # ── E.2.6 deferred absence-tolerance behavior ────────────────────── -@pytest.mark.xfail(strict=True, reason="impl phase E.3 — IntelLifter null patterns") def test_e26_intel_lifter_partial_provider_nulls(): - raise AssertionError("not yet implemented") + """E.3.10: with no actionable per-provider signal (e.g. score set + but categories absent), IntelLifter returns []. No errors.""" + lifter = IntelLifter(StubRuleStore()) + out = asyncio.run(lifter.tag(_ev("intel"))) + assert out == [] def test_e26_behavioral_lifter_no_attacker_behavior_row():