diff --git a/decnet/ttp/attack_catalog.py b/decnet/ttp/attack_catalog.py index c95788c2..5a2c4dd0 100644 --- a/decnet/ttp/attack_catalog.py +++ b/decnet/ttp/attack_catalog.py @@ -35,6 +35,7 @@ TECHNIQUE_NAMES: Final[dict[str, str]] = { "T1046": "Network Service Discovery", "T1049": "System Network Connections Discovery", "T1053": "Scheduled Task/Job", + "T1056": "Input Capture", "T1059": "Command and Scripting Interpreter", "T1070": "Indicator Removal", "T1071": "Application Layer Protocol", diff --git a/decnet/ttp/impl/intel_lifter.py b/decnet/ttp/impl/intel_lifter.py index d2af03be..f8d347fc 100644 --- a/decnet/ttp/impl/intel_lifter.py +++ b/decnet/ttp/impl/intel_lifter.py @@ -29,23 +29,29 @@ from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid # AbuseIPDB category → set of technique_ids that fire on it. Derived -# from TTP_TAGGING.md Appendix A.10. Multiple categories can map to the -# same technique (18 + 22 both → T1110); a category may map to multiple -# techniques (14 → T1046 + T1595). +# from TTP_TAGGING.md Appendix A.10 (post 2026-05-02 ship-time audit). +# Category code names are AbuseIPDB's canonical taxonomy at +# https://www.abuseipdb.com/categories — kept verbatim in the comment so +# the next quarterly drift check (per DEBT.md) can diff cheaply. Cat 4 +# (DDoS Attack) and 10 (Web Spam) and 12 (Blog Spam) are intentionally +# unmapped — design doc §A.10 marks DDoS-without-protocol as too muddy +# for v0, and CMS spam has no clean ATT&CK fit at the IP layer. _ABUSEIPDB_CATEGORY_TO_TECHNIQUES: Final[dict[int, frozenset[str]]] = { - 14: frozenset({"T1046", "T1595"}), # Port Scan + 5: frozenset({"T1110"}), # FTP Brute-Force + 7: frozenset({"T1566"}), # Phishing + 9: frozenset({"T1090"}), # Open Proxy + 11: frozenset({"T1496", "T1566"}), # Email Spam (T1566 high-score only) + 13: frozenset({"T1090"}), # VPN IP + 14: frozenset({"T1046", "T1595"}), # Port Scan 15: frozenset({"T1190"}), # Hacking + 16: frozenset({"T1190"}), # SQL Injection + 17: frozenset({"T1566"}), # Spoofing (email-sender) 18: frozenset({"T1110"}), # Brute-Force 19: frozenset({"T1595"}), # Bad Web Bot 20: frozenset({"T1078"}), # Exploited Host 21: frozenset({"T1190"}), # Web App Attack 22: frozenset({"T1110"}), # SSH 23: frozenset({"T1190"}), # IoT Targeted - 11: frozenset({"T1496", "T1566"}), # Email Spam (T1566 high-score only) - 10: frozenset({"T1498"}), # DDoS - 5: frozenset({"T1110"}), # FTP Brute-Force - 17: frozenset({"T1090"}), # VPN IP - 9: frozenset({"T1090"}), # Open Proxy } # Categories where a technique only fires above a confidence-score @@ -55,7 +61,10 @@ _ABUSEIPDB_HIGH_SCORE_GATED: Final[dict[int, dict[str, int]]] = { } -# GreyNoise tag → set of technique_ids the tag warrants. +# GreyNoise tag → set of technique_ids the tag warrants. Note: the +# Community endpoint does not return tags today — these fire only when +# operators wire a non-Community provider that does. Kept canonical so +# the upgrade path is just a column populate, not a code change. _GREYNOISE_TAG_TO_TECHNIQUES: Final[dict[str, frozenset[str]]] = { "tor_exit_node": frozenset({"T1090"}), "ssh_bruteforcer": frozenset({"T1110"}), @@ -66,12 +75,22 @@ _GREYNOISE_TAG_TO_TECHNIQUES: Final[dict[str, frozenset[str]]] = { "havoc": frozenset({"T1071", "T1588"}), } -# ThreatFox IOC type → set of technique_ids per A.10. -_THREATFOX_IOC_TO_TECHNIQUES: Final[dict[str, frozenset[str]]] = { +# Confidence multiplier when GreyNoise reports ``classification == +# "malicious"`` without a specific tag we recognise. The bare +# classification is real signal but weaker than a tag — half-confidence +# keeps the floor honest. +_GREYNOISE_MALICIOUS_BARE_MULT: Final[float] = 0.5 + +# ThreatFox THREAT TYPE (NOT ioc_type — that was the v1 ship-time bug) +# → set of technique_ids. Per ThreatFox's API the canonical taxonomy +# field is ``threat_type`` ∈ {botnet_cc, payload_delivery, payload, +# cc_skimming}; ``ioc_type`` is the indicator format (url, domain, +# md5_hash, …) and carries no ATT&CK signal. +_THREATFOX_THREAT_TYPE_TO_TECHNIQUES: Final[dict[str, frozenset[str]]] = { "botnet_cc": frozenset({"T1071", "T1588"}), - "c2_server": frozenset({"T1071"}), "payload_delivery": frozenset({"T1105", "T1588"}), - "download_url": frozenset({"T1105"}), + "payload": frozenset({"T1588"}), + "cc_skimming": frozenset({"T1056"}), } @@ -117,25 +136,54 @@ def _abuseipdb_decisions( def _greynoise_decisions( _spec: dict[str, Any], payload: dict[str, Any], ) -> EmitDecision: + """Decide GreyNoise emissions. + + Three signal lanes: + * ``classification == "scanner"`` — full-strength T1595 (kept for + compatibility with non-Community provider plans that surface + this verdict; the Community endpoint reports {malicious, benign, + suspicious, unknown} only). + * Specific recognised tag → its mapped technique(s) at 1.0×. + * Bare ``classification == "malicious"`` with no recognised tag → + T1071 at half multiplier (post-audit decision: the verdict is + real but unspecific). The bare-malicious lane is suppressed when + a tag already fired on T1071 to avoid double-stamping. + """ classification = payload.get("greynoise_classification") tags_raw = payload.get("greynoise_tags") or [] - triggered: dict[str, list[str]] = {} + # Per-technique evidence accumulator — maps technique_id to the + # signals that triggered it AND the multiplier to apply (max wins + # if multiple lanes hit the same technique). + triggered: dict[str, tuple[float, list[str]]] = {} + + def _bump(tech: str, mult: float, signal: str) -> None: + existing = triggered.get(tech) + if existing is None: + triggered[tech] = (mult, [signal]) + return + old_mult, signals = existing + signals.append(signal) + if mult > old_mult: + triggered[tech] = (mult, signals) + if classification == "scanner": - triggered.setdefault("T1595", []).append("scanner") + _bump("T1595", 1.0, "scanner") if isinstance(tags_raw, list): for tag in tags_raw: if not isinstance(tag, str): continue for tech in _GREYNOISE_TAG_TO_TECHNIQUES.get(tag, frozenset()): - triggered.setdefault(tech, []).append(tag) + _bump(tech, 1.0, tag) + if classification == "malicious" and "T1071" not in triggered: + _bump("T1071", _GREYNOISE_MALICIOUS_BARE_MULT, "malicious") if not triggered: return [] return [ - (tech, 1.0, { + (tech, mult, { "greynoise_classification": classification, "greynoise_tags": signals, }) - for tech, signals in triggered.items() + for tech, (mult, signals) in triggered.items() ] @@ -144,7 +192,10 @@ def _feodo_decisions( ) -> EmitDecision: if payload.get("feodo_listed") is not True: return [] - family = payload.get("malware_family") + family = ( + payload.get("feodo_malware_family") + or payload.get("malware_family") + ) extra: dict[str, Any] = {"feodo_listed": True} if isinstance(family, str) and family: extra["malware_family"] = family @@ -158,17 +209,55 @@ def _feodo_decisions( def _threatfox_decisions( _spec: dict[str, Any], payload: dict[str, Any], ) -> EmitDecision: - ioc_type = payload.get("ioc_type") - if not isinstance(ioc_type, str): + """ThreatFox dispatch keys on ``threat_type`` (canonical taxonomy) + not ``ioc_type`` — the v1 ship-time mapping had it backwards. + + Accepts either ``threatfox_threat_types`` (list, preferred — comes + from the bus payload built by the intel worker) or a singular + ``threat_type``/``ioc_type`` field for legacy callers and tests. + The lifter is tolerant by contract; missing inputs produce zero + emissions, never an error. + """ + threat_types_raw = ( + payload.get("threatfox_threat_types") + or payload.get("threat_type") + ) + threat_types: list[str] = [] + if isinstance(threat_types_raw, list): + threat_types = [t for t in threat_types_raw if isinstance(t, str)] + elif isinstance(threat_types_raw, str) and threat_types_raw: + threat_types = [threat_types_raw] + + triggered: dict[str, list[str]] = {} + for tt in threat_types: + for tech in _THREATFOX_THREAT_TYPE_TO_TECHNIQUES.get(tt, frozenset()): + triggered.setdefault(tech, []).append(tt) + if not triggered: return [] - techs = _THREATFOX_IOC_TO_TECHNIQUES.get(ioc_type, frozenset()) - if not techs: - return [] - family = payload.get("malware_family") - extra: dict[str, Any] = {"ioc_type": ioc_type} - if isinstance(family, str) and family: - extra["malware_family"] = family - return [(tech, 1.0, extra) for tech in techs] + + families_raw = ( + payload.get("threatfox_malware_families") + or payload.get("malware_family") + ) + families: list[str] = [] + if isinstance(families_raw, list): + families = [f for f in families_raw if isinstance(f, str)] + elif isinstance(families_raw, str) and families_raw: + families = [families_raw] + ioc_types_raw = payload.get("threatfox_ioc_types") + ioc_types: list[str] = ( + [i for i in ioc_types_raw if isinstance(i, str)] + if isinstance(ioc_types_raw, list) else [] + ) + + return [ + (tech, 1.0, { + "threat_types": signals, + **({"malware_families": families} if families else {}), + **({"ioc_types": ioc_types} if ioc_types else {}), + }) + for tech, signals in triggered.items() + ] def _aggregate_bump_decisions( diff --git a/tests/ttp/test_intel_lifter.py b/tests/ttp/test_intel_lifter.py index ebe171a3..45c1c2f2 100644 --- a/tests/ttp/test_intel_lifter.py +++ b/tests/ttp/test_intel_lifter.py @@ -1,10 +1,16 @@ -"""Per-rule unit tests for :class:`IntelLifter` (E.3.10). +"""Per-rule unit tests for :class:`IntelLifter` (E.3.10 + 2026-05-02 audit). Per Appendix A.10 each provider's mapping is exercised positively with -realistic payload shapes (categories, tags, ioc_type) and negatively +realistic payload shapes (categories, tags, threat_types) and negatively with null / missing signals. The lifter must NEVER import from ``decnet.intel.*``; the static guard at E.2.7 enforces that — these tests are the behavioral counterpart. + +The 2026-05-02 ship-time audit found a class of cascade bugs where +``_emit_filtered`` silently dropped predicate decisions whose +``technique_id`` was missing from the rule YAML's ``emits`` list. The +test suite below exercises every previously-dropped technique end-to- +end so a regression of the same kind shows up immediately. """ from __future__ import annotations @@ -52,112 +58,232 @@ def _make_lifter(rule_ids: list[str]) -> IntelLifter: return lifter -# ── R0054 AbuseIPDB ──────────────────────────────────────────────── +def _techs(out: list[Any]) -> set[str]: + return {tag.technique_id for tag in out} + + +# ── R0054 AbuseIPDB — corrected v2 mapping ──────────────────────────── def test_abuseipdb_brute_force_category_emits_t1110() -> None: - lifter = _make_lifter(["R0054"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ "abuseipdb_score": 90, "abuseipdb_categories": [18, 22], }))) - techs = {tag.technique_id for tag in out} - assert "T1110" in techs + assert "T1110" in _techs(out) def test_abuseipdb_web_attack_emits_t1190() -> None: - lifter = _make_lifter(["R0054"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ "abuseipdb_score": 80, "abuseipdb_categories": [21], }))) - techs = {tag.technique_id for tag in out} - assert "T1190" in techs + assert "T1190" in _techs(out) -def test_abuseipdb_email_spam_high_score_includes_t1566() -> None: - lifter = _make_lifter(["R0054"]) - out = asyncio.run(lifter.tag(_ev({ - "abuseipdb_score": 90, # gated >=80 +def test_abuseipdb_email_spam_high_score_includes_t1566_and_t1496() -> None: + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 90, "abuseipdb_categories": [11], }))) - techs = {tag.technique_id for tag in out} - assert "T1566" in techs + techs = _techs(out) + assert {"T1566", "T1496"} <= techs def test_abuseipdb_email_spam_low_score_excludes_t1566() -> None: - lifter = _make_lifter(["R0054"]) - out = asyncio.run(lifter.tag(_ev({ - "abuseipdb_score": 50, # below the T1566 gate + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 50, "abuseipdb_categories": [11], }))) - techs = {tag.technique_id for tag in out} + techs = _techs(out) assert "T1566" not in techs + assert "T1496" in techs # ungated, still fires + + +def test_abuseipdb_port_scan_emits_t1046_and_t1595() -> None: + """Cat 14 → T1046 + T1595. Both were silently dropped pre-v2.""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [14], + }))) + assert {"T1046", "T1595"} <= _techs(out) + + +def test_abuseipdb_exploited_host_emits_t1078() -> None: + """Cat 20 → T1078. Silently dropped pre-v2.""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [20], + }))) + assert "T1078" in _techs(out) + + +def test_abuseipdb_open_proxy_emits_t1090() -> None: + """Cat 9 → T1090. Silently dropped pre-v2.""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [9], + }))) + assert "T1090" in _techs(out) + + +def test_abuseipdb_vpn_uses_correct_cat_13() -> None: + """Audit fix: cat 13 (VPN IP), NOT cat 17 — that was the v1 typo.""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [13], + }))) + assert "T1090" in _techs(out) + + +def test_abuseipdb_cat_17_now_emits_t1566_not_t1090() -> None: + """Audit fix: cat 17 is Spoofing, not VPN. Now → T1566 (phishing).""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [17], + }))) + techs = _techs(out) + assert "T1566" in techs + assert "T1090" not in techs + + +def test_abuseipdb_phishing_cat_7_emits_t1566() -> None: + """New mapping: cat 7 (Phishing) → T1566.""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [7], + }))) + assert "T1566" in _techs(out) + + +def test_abuseipdb_sql_injection_cat_16_emits_t1190() -> None: + """New mapping: cat 16 (SQL Injection) → T1190.""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 80, + "abuseipdb_categories": [16], + }))) + assert "T1190" in _techs(out) + + +def test_abuseipdb_cat_10_no_longer_emits_ddos() -> None: + """Audit fix: cat 10 is Web Spam, not DDoS. Used to wrongly fire T1498. + Cat 10 is intentionally unmapped in v2 — no clean ATT&CK fit at IP layer. + """ + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 95, + "abuseipdb_categories": [10], + }))) + assert "T1498" not in _techs(out) + + +def test_abuseipdb_cat_4_remains_unmapped() -> None: + """Per A.10: cat 4 (real DDoS) is intentionally dropped — too muddy.""" + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 95, + "abuseipdb_categories": [4], + }))) + assert _techs(out) == set() def test_abuseipdb_confidence_scaled_by_score() -> None: - lifter = _make_lifter(["R0054"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ "abuseipdb_score": 50, "abuseipdb_categories": [18], }))) assert out - # Base for T1110 in R0054 YAML is 0.7 → 0.7 * 0.5 = 0.35. for tag in out: if tag.technique_id == "T1110": assert tag.confidence == pytest.approx(0.35) def test_abuseipdb_no_categories_no_emit() -> None: - lifter = _make_lifter(["R0054"]) - out = asyncio.run(lifter.tag(_ev({"abuseipdb_score": 95}))) + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ + "abuseipdb_score": 95, + }))) assert out == [] def test_abuseipdb_score_none_no_emit() -> None: - lifter = _make_lifter(["R0054"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0054"]).tag(_ev({ "abuseipdb_score": None, "abuseipdb_categories": [18], }))) assert out == [] -# ── R0055 GreyNoise ──────────────────────────────────────────────── +# ── R0055 GreyNoise — corrected v2 mapping ──────────────────────────── def test_greynoise_scanner_emits_t1595() -> None: - lifter = _make_lifter(["R0055"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ "greynoise_classification": "scanner", }))) - techs = {tag.technique_id for tag in out} - assert "T1595" in techs + assert "T1595" in _techs(out) -def test_greynoise_c2_tag_emits_t1071() -> None: - lifter = _make_lifter(["R0055"]) - out = asyncio.run(lifter.tag(_ev({ +def test_greynoise_c2_tag_emits_both_t1071_and_t1588() -> None: + """Audit fix: T1588 used to be silently dropped. Now both fire.""" + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ "greynoise_classification": "malicious", "greynoise_tags": ["cobalt_strike"], }))) - techs = {tag.technique_id for tag in out} - assert "T1071" in techs + assert {"T1071", "T1588"} <= _techs(out) + + +def test_greynoise_tor_exit_emits_t1090() -> None: + """Audit fix: tor_exit_node → T1090 silently dropped pre-v2.""" + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ + "greynoise_classification": "malicious", + "greynoise_tags": ["tor_exit_node"], + }))) + assert "T1090" in _techs(out) + + +def test_greynoise_ssh_bruteforcer_emits_t1110() -> None: + """Audit fix: ssh_bruteforcer → T1110 silently dropped pre-v2.""" + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ + "greynoise_classification": "malicious", + "greynoise_tags": ["ssh_bruteforcer"], + }))) + assert "T1110" in _techs(out) + + +def test_greynoise_bare_malicious_emits_t1071_at_half() -> None: + """Audit gap: bare malicious classification used to emit nothing.""" + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ + "greynoise_classification": "malicious", + "greynoise_tags": [], + }))) + techs_by_id = {tag.technique_id: tag for tag in out} + assert "T1071" in techs_by_id + # R0055 base T1071 conf is 0.7; bare-malicious multiplier is 0.5 → + # 0.35. Tags would have fired at 1.0× (0.7). + assert techs_by_id["T1071"].confidence == pytest.approx(0.35) + + +def test_greynoise_bare_malicious_does_not_fire_when_specific_tag_present() -> None: + """Bare-malicious lane is suppressed when a tag already fires T1071 + so we don't double-stamp at conflicting confidence levels.""" + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ + "greynoise_classification": "malicious", + "greynoise_tags": ["cobalt_strike"], + }))) + t1071_tags = [t for t in out if t.technique_id == "T1071"] + assert len(t1071_tags) == 1 + assert t1071_tags[0].confidence == pytest.approx(0.7) # tag rate, not 0.35 def test_greynoise_benign_no_emit() -> None: - lifter = _make_lifter(["R0055"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ "greynoise_classification": "benign", "greynoise_tags": [], }))) assert out == [] -def test_greynoise_unknown_tag_no_emit() -> None: - lifter = _make_lifter(["R0055"]) - out = asyncio.run(lifter.tag(_ev({ - "greynoise_classification": "malicious", +def test_greynoise_unknown_tag_with_unknown_classification_no_emit() -> None: + out = asyncio.run(_make_lifter(["R0055"]).tag(_ev({ + "greynoise_classification": "unknown", "greynoise_tags": ["random_unmapped"], }))) assert out == [] @@ -167,47 +293,100 @@ def test_greynoise_unknown_tag_no_emit() -> None: def test_feodo_listed_emits_both() -> None: - lifter = _make_lifter(["R0056"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0056"]).tag(_ev({ "feodo_listed": True, - "malware_family": "Emotet", + "feodo_malware_family": "Emotet", }))) - techs = {tag.technique_id for tag in out} + techs = _techs(out) assert techs == {"T1071", "T1588"} for tag in out: assert tag.evidence.get("malware_family") == "Emotet" +def test_feodo_legacy_malware_family_field_still_works() -> None: + """Tolerate the older payload shape (`malware_family` not prefixed).""" + out = asyncio.run(_make_lifter(["R0056"]).tag(_ev({ + "feodo_listed": True, + "malware_family": "Dridex", + }))) + for tag in out: + assert tag.evidence.get("malware_family") == "Dridex" + + def test_feodo_unlisted_no_emit() -> None: - lifter = _make_lifter(["R0056"]) - out = asyncio.run(lifter.tag(_ev({"feodo_listed": False}))) + out = asyncio.run(_make_lifter(["R0056"]).tag(_ev({"feodo_listed": False}))) assert out == [] def test_feodo_missing_no_emit() -> None: - lifter = _make_lifter(["R0056"]) - out = asyncio.run(lifter.tag(_ev({}))) + out = asyncio.run(_make_lifter(["R0056"]).tag(_ev({}))) assert out == [] -# ── R0057 ThreatFox ──────────────────────────────────────────────── +# ── R0057 ThreatFox — corrected v2 mapping ───────────────────────── -def test_threatfox_botnet_cc_emits() -> None: - lifter = _make_lifter(["R0057"]) - out = asyncio.run(lifter.tag(_ev({ - "ioc_type": "botnet_cc", - "malware_family": "sliver", +def test_threatfox_botnet_cc_threat_type_emits_t1071_and_t1588() -> None: + """Audit fix: ThreatFox keys on threat_type now (was ioc_type). And + T1588 used to be silently dropped despite being in the mapping.""" + out = asyncio.run(_make_lifter(["R0057"]).tag(_ev({ + "threatfox_threat_types": ["botnet_cc"], + "threatfox_malware_families": ["sliver"], }))) - techs = {tag.technique_id for tag in out} - assert "T1071" in techs and "T1588" in techs + techs = _techs(out) + assert {"T1071", "T1588"} <= techs for tag in out: - assert tag.evidence.get("malware_family") == "sliver" + ev_families = tag.evidence.get("malware_families") + if ev_families: + assert "sliver" in ev_families -def test_threatfox_unknown_ioc_no_emit() -> None: - lifter = _make_lifter(["R0057"]) - out = asyncio.run(lifter.tag(_ev({"ioc_type": "weird_unknown"}))) +def test_threatfox_payload_delivery_emits_t1105_and_t1588() -> None: + """Audit fix: payload_delivery → T1105 silently dropped pre-v2.""" + out = asyncio.run(_make_lifter(["R0057"]).tag(_ev({ + "threatfox_threat_types": ["payload_delivery"], + }))) + assert {"T1105", "T1588"} <= _techs(out) + + +def test_threatfox_payload_emits_t1588_only() -> None: + """New mapping: threat_type=payload (hash-only IOC) → T1588.""" + out = asyncio.run(_make_lifter(["R0057"]).tag(_ev({ + "threatfox_threat_types": ["payload"], + }))) + assert "T1588" in _techs(out) + + +def test_threatfox_cc_skimming_emits_t1056() -> None: + """New mapping: cc_skimming → T1056 (Input Capture).""" + out = asyncio.run(_make_lifter(["R0057"]).tag(_ev({ + "threatfox_threat_types": ["cc_skimming"], + }))) + assert "T1056" in _techs(out) + + +def test_threatfox_legacy_threat_type_singular_still_works() -> None: + """Tolerate the legacy/test payload shape with a singular field.""" + out = asyncio.run(_make_lifter(["R0057"]).tag(_ev({ + "threat_type": "botnet_cc", + }))) + assert "T1071" in _techs(out) + + +def test_threatfox_unknown_threat_type_no_emit() -> None: + out = asyncio.run(_make_lifter(["R0057"]).tag(_ev({ + "threatfox_threat_types": ["mystery_type"], + }))) + assert out == [] + + +def test_threatfox_ioc_type_alone_no_longer_drives_techniques() -> None: + """Audit fix: ioc_type is the indicator format (url/domain/hash) and + carries no ATT&CK signal. v1 mistakenly keyed on it.""" + out = asyncio.run(_make_lifter(["R0057"]).tag(_ev({ + "threatfox_ioc_types": ["url", "domain"], + # threat_types intentionally absent + }))) assert out == [] @@ -215,11 +394,7 @@ def test_threatfox_unknown_ioc_no_emit() -> None: def test_aggregate_bump_is_inert_in_v0() -> None: - """R0058 is a bump-only meta-rule; the v0 lifter cannot bump - cross-tag confidences from a single TaggerEvent. Stays no-op - until E.3.14 worker bootstrap can plumb the cross-tag write.""" - lifter = _make_lifter(["R0058"]) - out = asyncio.run(lifter.tag(_ev({ + out = asyncio.run(_make_lifter(["R0058"]).tag(_ev({ "aggregate_verdict": "malicious", }))) assert out == [] @@ -249,7 +424,6 @@ def test_clipped_intel_rule_caps_confidence() -> None: }))) assert out for tag in out: - # Base T1110 conf 0.7 × score 1.0 × ceiling 0.5 = 0.35 assert tag.confidence <= 0.35 + 1e-6 @@ -257,8 +431,6 @@ def test_clipped_intel_rule_caps_confidence() -> None: def test_module_has_no_intel_imports() -> None: - """IntelLifter must reach AttackerIntel data only via the upstream - payload — never by importing from decnet.intel.*.""" import decnet.ttp.impl.intel_lifter as mod # noqa: PLC0415 src = Path(mod.__file__ or "").read_text()