diff --git a/decnet/ttp/data/__init__.py b/decnet/ttp/data/__init__.py new file mode 100644 index 00000000..818e29e7 --- /dev/null +++ b/decnet/ttp/data/__init__.py @@ -0,0 +1,6 @@ +"""Data files used at runtime by the TTP layer. + +See ``decnet/ttp/data/intel/`` for provider-signal → ATT&CK technique +mappings consumed by :mod:`decnet.ttp.impl.intel_lifter` via +:mod:`decnet.ttp.data.intel_loader`. +""" diff --git a/decnet/ttp/data/intel/__init__.py b/decnet/ttp/data/intel/__init__.py new file mode 100644 index 00000000..3918fc4d --- /dev/null +++ b/decnet/ttp/data/intel/__init__.py @@ -0,0 +1,8 @@ +"""Per-provider intel-signal → ATT&CK technique mapping data. + +One YAML file per intel provider (abuseipdb / greynoise / feodo / +threatfox), structured per the schema in +:mod:`decnet.ttp.data.intel_loader`. Each entry carries a STIX-shaped +``external_reference`` so the future STIX/MISP exporter can emit +relationship objects without a second mapping pass. +""" diff --git a/decnet/ttp/data/intel/abuseipdb.yaml b/decnet/ttp/data/intel/abuseipdb.yaml new file mode 100644 index 00000000..979d45cf --- /dev/null +++ b/decnet/ttp/data/intel/abuseipdb.yaml @@ -0,0 +1,125 @@ +# AbuseIPDB category → ATT&CK technique mapping. +# +# Mirrors what _ABUSEIPDB_CATEGORY_TO_TECHNIQUES + _ABUSEIPDB_HIGH_SCORE_GATED +# used to encode in decnet/ttp/impl/intel_lifter.py before the data +# extraction. Source-of-truth column for which categories produce +# which ATT&CK tags, paired with rules/ttp/R0054.yaml which declares +# the full slate the predicate can emit. +# +# Cat 4 (DDoS), 10 (Web Spam), 12 (Blog Spam) are intentionally +# unmapped — design doc TTP_TAGGING.md §A.10: DDoS-without-protocol +# is too muddy for v0; CMS spam has no clean ATT&CK fit at the IP +# layer. Keep the explanatory comments here so the next quarterly +# drift check (development/DEBT.md DEBT-048) can diff cheaply. 
+provider: abuseipdb +mapping_version: "2" +attack_release: ">=15.1" +signals: + - id: cat_5 + label: "FTP Brute-Force" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#5" + techniques: + - technique_id: T1110 + - id: cat_7 + label: "Phishing" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#7" + techniques: + - technique_id: T1566 + - id: cat_9 + label: "Open Proxy" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#9" + techniques: + - technique_id: T1090 + - id: cat_11 + label: "Email Spam" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#11" + techniques: + - technique_id: T1496 + - technique_id: T1566 + high_score_threshold: 80 + - id: cat_13 + label: "VPN IP" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#13" + techniques: + - technique_id: T1090 + - id: cat_14 + label: "Port Scan" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#14" + techniques: + - technique_id: T1046 + - technique_id: T1595 + - id: cat_15 + label: "Hacking" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#15" + techniques: + - technique_id: T1190 + - id: cat_16 + label: "SQL Injection" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#16" + techniques: + - technique_id: T1190 + - id: cat_17 + label: "Spoofing" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#17" + techniques: + - technique_id: T1566 + - id: cat_18 + label: "Brute-Force" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#18" + techniques: + - technique_id: T1110 + - id: cat_19 + label: "Bad Web Bot" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#19" + 
techniques: + - technique_id: T1595 + - id: cat_20 + label: "Exploited Host" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#20" + techniques: + - technique_id: T1078 + - id: cat_21 + label: "Web App Attack" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#21" + techniques: + - technique_id: T1190 + - id: cat_22 + label: "SSH" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#22" + techniques: + - technique_id: T1110 + - id: cat_23 + label: "IoT Targeted" + external_reference: + source_name: abuseipdb + url: "https://www.abuseipdb.com/categories#23" + techniques: + - technique_id: T1190 diff --git a/decnet/ttp/data/intel/feodo.yaml b/decnet/ttp/data/intel/feodo.yaml new file mode 100644 index 00000000..0172936d --- /dev/null +++ b/decnet/ttp/data/intel/feodo.yaml @@ -0,0 +1,20 @@ +# Feodo Tracker → ATT&CK technique mapping. +# +# Feodo Tracker is a binary listed/not-listed feed; there are no +# per-signal subtypes to enumerate. Both T1071 (Application Layer +# Protocol) and T1588 (Obtain Capabilities) fire whenever an attacker +# IP is on the Feodo blocklist. Keeping this as a single ``feodo_listed`` +# signal preserves the structured-mapping shape for the future +# STIX/MISP exporter without inventing fake categories. +provider: feodo +mapping_version: "1" +attack_release: ">=15.1" +signals: + - id: feodo_listed + label: "Listed on Feodo Tracker" + external_reference: + source_name: feodo + url: "https://feodotracker.abuse.ch/about/" + techniques: + - technique_id: T1071 + - technique_id: T1588 diff --git a/decnet/ttp/data/intel/greynoise.yaml b/decnet/ttp/data/intel/greynoise.yaml new file mode 100644 index 00000000..3bca00eb --- /dev/null +++ b/decnet/ttp/data/intel/greynoise.yaml @@ -0,0 +1,74 @@ +# GreyNoise tag → ATT&CK technique mapping. 
+# +# Mirrors what _GREYNOISE_TAG_TO_TECHNIQUES used to encode in +# decnet/ttp/impl/intel_lifter.py. Note: GreyNoise's Community +# endpoint does not return tags; these fire only when operators wire +# a non-Community provider (Visualizer / Enterprise / RIOT). Kept +# canonical here so the upgrade path is a column populate, not a +# code change. Decision-flow constants for bare ``classification == +# "scanner"`` (T1595) and bare ``classification == "malicious"`` +# (T1071 at 0.5×) stay in code — they're not table rows. +provider: greynoise +mapping_version: "1" +attack_release: ">=15.1" +signals: + - id: tor_exit_node + label: "Tor exit node" + external_reference: + source_name: greynoise + url: "https://docs.greynoise.io/docs/understanding-greynoise-tags" + external_id: tor_exit_node + techniques: + - technique_id: T1090 + - id: ssh_bruteforcer + label: "SSH brute-forcer" + external_reference: + source_name: greynoise + url: "https://docs.greynoise.io/docs/understanding-greynoise-tags" + external_id: ssh_bruteforcer + techniques: + - technique_id: T1110 + - id: web_crawler + label: "Web crawler" + external_reference: + source_name: greynoise + url: "https://docs.greynoise.io/docs/understanding-greynoise-tags" + external_id: web_crawler + techniques: + - technique_id: T1595 + - id: cobalt_strike + label: "Cobalt Strike" + external_reference: + source_name: greynoise + url: "https://docs.greynoise.io/docs/understanding-greynoise-tags" + external_id: cobalt_strike + techniques: + - technique_id: T1071 + - technique_id: T1588 + - id: metasploit + label: "Metasploit" + external_reference: + source_name: greynoise + url: "https://docs.greynoise.io/docs/understanding-greynoise-tags" + external_id: metasploit + techniques: + - technique_id: T1071 + - technique_id: T1588 + - id: sliver + label: "Sliver" + external_reference: + source_name: greynoise + url: "https://docs.greynoise.io/docs/understanding-greynoise-tags" + external_id: sliver + techniques: + - technique_id: 
T1071 + - technique_id: T1588 + - id: havoc + label: "Havoc" + external_reference: + source_name: greynoise + url: "https://docs.greynoise.io/docs/understanding-greynoise-tags" + external_id: havoc + techniques: + - technique_id: T1071 + - technique_id: T1588 diff --git a/decnet/ttp/data/intel/threatfox.yaml b/decnet/ttp/data/intel/threatfox.yaml new file mode 100644 index 00000000..e9e25543 --- /dev/null +++ b/decnet/ttp/data/intel/threatfox.yaml @@ -0,0 +1,45 @@ +# ThreatFox threat_type → ATT&CK technique mapping. +# +# Mirrors _THREATFOX_THREAT_TYPE_TO_TECHNIQUES from +# decnet/ttp/impl/intel_lifter.py. ThreatFox's canonical taxonomy is +# the ``threat_type`` field (NOT ``ioc_type`` — that was the v1 +# ship-time bug). ``ioc_type`` is the indicator format (url, domain, +# md5_hash, …) and carries no ATT&CK signal. +provider: threatfox +mapping_version: "1" +attack_release: ">=15.1" +signals: + - id: botnet_cc + label: "Botnet C2" + external_reference: + source_name: threatfox + url: "https://threatfox.abuse.ch/faq/" + external_id: botnet_cc + techniques: + - technique_id: T1071 + - technique_id: T1588 + - id: payload_delivery + label: "Payload delivery" + external_reference: + source_name: threatfox + url: "https://threatfox.abuse.ch/faq/" + external_id: payload_delivery + techniques: + - technique_id: T1105 + - technique_id: T1588 + - id: payload + label: "Payload" + external_reference: + source_name: threatfox + url: "https://threatfox.abuse.ch/faq/" + external_id: payload + techniques: + - technique_id: T1588 + - id: cc_skimming + label: "Credit-card skimming" + external_reference: + source_name: threatfox + url: "https://threatfox.abuse.ch/faq/" + external_id: cc_skimming + techniques: + - technique_id: T1056 diff --git a/decnet/ttp/data/intel_loader.py b/decnet/ttp/data/intel_loader.py new file mode 100644 index 00000000..822127de --- /dev/null +++ b/decnet/ttp/data/intel_loader.py @@ -0,0 +1,229 @@ +"""YAML-backed loader for intel-provider → ATT&CK 
technique mappings. + +Replaces the ``_*_TO_TECHNIQUES`` ``Final[dict]`` tables that used to +live in :mod:`decnet.ttp.impl.intel_lifter`. Source-of-truth files +live under :mod:`decnet.ttp.data.intel` (one YAML per provider) and +are validated against the loaded ATT&CK STIX bundle at load time: + +* every ``technique_id`` in every signal must resolve in + :func:`decnet.ttp.attack_stix.technique_exists` +* every entry is enriched with the canonical MITRE + ``external_reference`` (source_name=``mitre-attack``, url) so the + future STIX/MISP exporter can emit fully-resolved relationship + objects without a second mapping pass + +Design constraint: this module is the only place provider-mapping +schema knowledge lives. ``intel_lifter`` reads :class:`ProviderMapping` +accessors and never touches the dicts directly. +""" +from __future__ import annotations + +from dataclasses import dataclass +from functools import lru_cache +from pathlib import Path +from typing import Any + +import yaml +from pydantic import BaseModel, ConfigDict, Field + +from decnet.ttp import attack_stix + +_DATA_DIR: Path = Path(__file__).parent / "intel" + + +# ─── YAML schema (pydantic v2) ───────────────────────────────────── + + +class ExternalReference(BaseModel): + """STIX 2.1 ``external-reference`` shape — kept faithful so the + future STIX exporter is a direct translation.""" + + model_config = ConfigDict(frozen=True) + + source_name: str + url: str + external_id: str | None = None + + +class TechniqueEntry(BaseModel): + model_config = ConfigDict(frozen=True) + + technique_id: str + # Per-technique gate: emission only fires when an upstream + # confidence score (e.g. AbuseIPDB ``abuseConfidenceScore``) + # meets or exceeds this floor. None = always fire. + high_score_threshold: int | None = None + + +class SignalEntry(BaseModel): + model_config = ConfigDict(frozen=True) + + id: str + label: str + external_reference: ExternalReference + techniques: tuple[TechniqueEntry, ...] 
@dataclass(frozen=True)
class Signal:
    """Immutable runtime view of one provider signal.

    Built by the loader from a validated ``SignalEntry``; the
    ``emissions`` tuple holds one ``TechniqueEmission`` per technique
    the signal can produce.
    """

    # Signal identifier as written in the provider YAML (``id`` field).
    id: str
    # Human-readable label from the YAML.
    label: str
    # STIX-shaped reference to the provider's own documentation.
    external_reference: ExternalReference
    # Every (signal, technique) emit slot, gated or not.
    emissions: tuple[TechniqueEmission, ...]
    # Copied from the YAML entry.
    confidence_multiplier: float

    def technique_ids(self) -> frozenset[str]:
        """Return the distinct technique IDs across all emissions.

        No score gate is applied here; gated emissions are included.
        """
        distinct = {emission.technique_id for emission in self.emissions}
        return frozenset(distinct)
@lru_cache(maxsize=8)
def load_provider_mapping(provider: str) -> ProviderMapping:
    """Load, validate, and enrich one provider's mapping YAML.

    Successful results are memoised per ``provider`` by ``lru_cache``;
    :func:`clear_cache` drops them. A call that raises is not cached,
    so a fixed file is picked up on the next call.

    ``provider`` is resolved to a file through ``_data_path``. The
    returned :class:`ProviderMapping` carries, for every emission, the
    MITRE URL looked up via ``_mitre_url_for`` (``None`` when the
    bundle object has no ``mitre-attack`` reference).

    Raises:
        FileNotFoundError: no mapping file exists for ``provider``.
        ValueError: the file's ``provider`` field differs from the
            requested name, or two signals share the same ``id``.

    Schema violations raised by ``ProviderMappingFile.model_validate``
    and unknown technique IDs raised by
    ``attack_stix.assert_known_technique_ids`` propagate unchanged.
    """
    path = _data_path(provider)
    if not path.is_file():
        raise FileNotFoundError(
            f"intel mapping for provider {provider!r} not found at {path}"
        )
    raw: Any = yaml.safe_load(path.read_text(encoding="utf-8"))
    parsed = ProviderMappingFile.model_validate(raw)
    if parsed.provider != provider:
        raise ValueError(
            f"{path}: provider field {parsed.provider!r} does not match "
            f"filename {provider!r}"
        )

    # Validate every technique resolves in the loaded ATT&CK bundle.
    all_ids = sorted(
        {t.technique_id for s in parsed.signals for t in s.techniques}
    )
    attack_stix.assert_known_technique_ids(
        all_ids, source=f"decnet/ttp/data/intel/{provider}.yaml"
    )

    signals: list[Signal] = []
    for s in parsed.signals:
        emissions = tuple(
            TechniqueEmission(
                technique_id=t.technique_id,
                high_score_threshold=t.high_score_threshold,
                mitre_url=_mitre_url_for(t.technique_id),
            )
            for t in s.techniques
        )
        signals.append(
            Signal(
                id=s.id,
                label=s.label,
                external_reference=s.external_reference,
                emissions=emissions,
                confidence_multiplier=s.confidence_multiplier,
            )
        )

    # Build the id index and collect clashes in the same pass. Counting
    # over the dict's keys (already unique) can never find a duplicate,
    # so the clashes are recorded as they are encountered instead.
    by_id: dict[str, Signal] = {}
    dupes: list[str] = []
    for sig in signals:
        if sig.id in by_id:
            if sig.id not in dupes:
                dupes.append(sig.id)
            continue
        by_id[sig.id] = sig
    if dupes:
        raise ValueError(f"{path}: duplicate signal ids: {dupes}")

    return ProviderMapping(
        provider=parsed.provider,
        mapping_version=parsed.mapping_version,
        signals=tuple(signals),
        _by_id=by_id,
    )
from __future__ import annotations from collections.abc import Callable +from functools import lru_cache from typing import Any, Final from decnet.ttp.base import TaggerEvent, TolerantTagger +from decnet.ttp.data.intel_loader import ( + ProviderMapping, + TechniqueEmission, + load_provider_mapping, +) from decnet.ttp.impl._emit import emit_tags from decnet.ttp.impl._rule_index import RuleIndex from decnet.ttp.impl._state import apply_ceiling, is_active @@ -28,71 +34,39 @@ from decnet.ttp.store.base import RuleStore from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid -# AbuseIPDB category → set of technique_ids that fire on it. Derived -# from TTP_TAGGING.md Appendix A.10 (post 2026-05-02 ship-time audit). -# Category code names are AbuseIPDB's canonical taxonomy at -# https://www.abuseipdb.com/categories — kept verbatim in the comment so -# the next quarterly drift check (development/DEBT.md DEBT-048) can -# diff cheaply. Cat 4 (DDoS Attack) and 10 (Web Spam) and 12 (Blog -# Spam) are intentionally unmapped — design doc §A.10 marks -# DDoS-without-protocol as too muddy for v0, and CMS spam has no clean -# ATT&CK fit at the IP layer. 
-_ABUSEIPDB_CATEGORY_TO_TECHNIQUES: Final[dict[int, frozenset[str]]] = { - 5: frozenset({"T1110"}), # FTP Brute-Force - 7: frozenset({"T1566"}), # Phishing - 9: frozenset({"T1090"}), # Open Proxy - 11: frozenset({"T1496", "T1566"}), # Email Spam (T1566 high-score only) - 13: frozenset({"T1090"}), # VPN IP - 14: frozenset({"T1046", "T1595"}), # Port Scan - 15: frozenset({"T1190"}), # Hacking - 16: frozenset({"T1190"}), # SQL Injection - 17: frozenset({"T1566"}), # Spoofing (email-sender) - 18: frozenset({"T1110"}), # Brute-Force - 19: frozenset({"T1595"}), # Bad Web Bot - 20: frozenset({"T1078"}), # Exploited Host - 21: frozenset({"T1190"}), # Web App Attack - 22: frozenset({"T1110"}), # SSH - 23: frozenset({"T1190"}), # IoT Targeted -} - -# Categories where a technique only fires above a confidence-score -# threshold (per A.10: "11 — Email Spam (high score, ≥80) → T1566"). -_ABUSEIPDB_HIGH_SCORE_GATED: Final[dict[int, dict[str, int]]] = { - 11: {"T1566": 80}, -} +# Provider→technique mappings live as YAML under +# decnet/ttp/data/intel/{provider}.yaml — see +# decnet.ttp.data.intel_loader for the schema and validation. Lazy +# accessors below mean module import does not trigger an ATT&CK +# bundle load (the loader validates every technique resolves there). -# GreyNoise tag → set of technique_ids the tag warrants. Note: the -# Community endpoint does not return tags today — these fire only when -# operators wire a non-Community provider that does. Kept canonical so -# the upgrade path is just a column populate, not a code change. 
def _mapping(provider: str) -> ProviderMapping:
    """Return the validated mapping for ``provider``, loading it lazily.

    Calling the loader from inside a function (rather than at module
    scope) keeps the import of this module from triggering a mapping
    load.

    This accessor intentionally has no cache of its own:
    ``load_provider_mapping`` is already memoised, and a second
    ``lru_cache`` layer here would keep handing out the old
    ``ProviderMapping`` after ``intel_loader.clear_cache()`` is called.
    """
    return load_provider_mapping(provider)
+ """ + return { + tid: {"mitre_url": e.mitre_url} + for tid, e in emissions.items() + if e.mitre_url + } # Predicate signature: returns either a list of (technique_id_filter, @@ -114,14 +88,15 @@ def _abuseipdb_decisions( categories: list[int] = [c for c in categories_raw if isinstance(c, int)] if not categories: return [] - # Resolve technique set across all categories present. + mapping = _mapping("abuseipdb") + # Resolve technique set across all categories present, applying + # any per-technique high-score gate (see TechniqueEmission). triggered: dict[str, list[int]] = {} + emissions_by_tech: dict[str, TechniqueEmission] = {} for cat in categories: - for tech in _ABUSEIPDB_CATEGORY_TO_TECHNIQUES.get(cat, frozenset()): - gate = _ABUSEIPDB_HIGH_SCORE_GATED.get(cat, {}).get(tech) - if gate is not None and score < gate: - continue - triggered.setdefault(tech, []).append(cat) + for emission in mapping.techniques_for_signal(f"cat_{cat}", score=float(score)): + triggered.setdefault(emission.technique_id, []).append(cat) + emissions_by_tech.setdefault(emission.technique_id, emission) if not triggered: return [] multiplier = float(score) / 100.0 @@ -129,6 +104,10 @@ def _abuseipdb_decisions( (tech, multiplier, { "abuseipdb_categories": cats, "abuse_confidence_score": int(score), + **( + {"mitre_url": emissions_by_tech[tech].mitre_url} + if emissions_by_tech[tech].mitre_url else {} + ), }) for tech, cats in triggered.items() ] @@ -152,20 +131,26 @@ def _greynoise_decisions( """ classification = payload.get("greynoise_classification") tags_raw = payload.get("greynoise_tags") or [] + mapping = _mapping("greynoise") # Per-technique evidence accumulator — maps technique_id to the # signals that triggered it AND the multiplier to apply (max wins # if multiple lanes hit the same technique). 
triggered: dict[str, tuple[float, list[str]]] = {} + emissions_by_tech: dict[str, TechniqueEmission] = {} - def _bump(tech: str, mult: float, signal: str) -> None: + def _bump( + tech: str, mult: float, signal: str, emission: TechniqueEmission | None = None, + ) -> None: existing = triggered.get(tech) if existing is None: triggered[tech] = (mult, [signal]) - return - old_mult, signals = existing - signals.append(signal) - if mult > old_mult: - triggered[tech] = (mult, signals) + else: + old_mult, signals = existing + signals.append(signal) + if mult > old_mult: + triggered[tech] = (mult, signals) + if emission is not None: + emissions_by_tech.setdefault(tech, emission) if classification == "scanner": _bump("T1595", 1.0, "scanner") @@ -173,8 +158,8 @@ def _greynoise_decisions( for tag in tags_raw: if not isinstance(tag, str): continue - for tech in _GREYNOISE_TAG_TO_TECHNIQUES.get(tag, frozenset()): - _bump(tech, 1.0, tag) + for emission in mapping.techniques_for_signal(tag): + _bump(emission.technique_id, 1.0, tag, emission) if classification == "malicious" and "T1071" not in triggered: _bump("T1071", _GREYNOISE_MALICIOUS_BARE_MULT, "malicious") if not triggered: @@ -183,6 +168,11 @@ def _greynoise_decisions( (tech, mult, { "greynoise_classification": classification, "greynoise_tags": signals, + **( + {"mitre_url": emissions_by_tech[tech].mitre_url} + if tech in emissions_by_tech and emissions_by_tech[tech].mitre_url + else {} + ), }) for tech, (mult, signals) in triggered.items() ] @@ -197,14 +187,17 @@ def _feodo_decisions( payload.get("feodo_malware_family") or payload.get("malware_family") ) - extra: dict[str, Any] = {"feodo_listed": True} + base_extra: dict[str, Any] = {"feodo_listed": True} if isinstance(family, str) and family: - extra["malware_family"] = family - # Both T1071 and T1588 emits fire from a Feodo hit. 
- return [ - ("T1071", 1.0, extra), - ("T1588", 1.0, extra), - ] + base_extra["malware_family"] = family + mapping = _mapping("feodo") + out: EmitDecision = [] + for emission in mapping.techniques_for_signal("feodo_listed"): + extra = dict(base_extra) + if emission.mitre_url: + extra["mitre_url"] = emission.mitre_url + out.append((emission.technique_id, 1.0, extra)) + return out def _threatfox_decisions( @@ -229,10 +222,13 @@ def _threatfox_decisions( elif isinstance(threat_types_raw, str) and threat_types_raw: threat_types = [threat_types_raw] + mapping = _mapping("threatfox") triggered: dict[str, list[str]] = {} + emissions_by_tech: dict[str, TechniqueEmission] = {} for tt in threat_types: - for tech in _THREATFOX_THREAT_TYPE_TO_TECHNIQUES.get(tt, frozenset()): - triggered.setdefault(tech, []).append(tt) + for emission in mapping.techniques_for_signal(tt): + triggered.setdefault(emission.technique_id, []).append(tt) + emissions_by_tech.setdefault(emission.technique_id, emission) if not triggered: return [] @@ -256,6 +252,10 @@ def _threatfox_decisions( "threat_types": signals, **({"malware_families": families} if families else {}), **({"ioc_types": ioc_types} if ioc_types else {}), + **( + {"mitre_url": emissions_by_tech[tech].mitre_url} + if emissions_by_tech[tech].mitre_url else {} + ), }) for tech, signals in triggered.items() ] @@ -376,25 +376,22 @@ def _emit_filtered( def all_emitted_technique_ids() -> frozenset[str]: - """Every technique ID this lifter could emit, drawn from all four provider tables. + """Every technique ID this lifter could emit, drawn from the four provider mapping YAMLs plus decision-flow constants. - Used by :func:`validate_against_attack_bundle` (and - :mod:`tests.ttp.test_attack_catalog`-adjacent tests) to assert that - every provider-driven emission resolves in the loaded ATT&CK STIX + Used by :func:`validate_against_attack_bundle` to assert every + provider-driven emission resolves in the loaded ATT&CK STIX bundle. 
Includes the bare-classification emissions in - ``_greynoise_decisions`` and the unconditional emissions in - ``_feodo_decisions`` — those don't appear in the lookup tables - above because they're decision-flow constants, not table entries. + ``_greynoise_decisions`` (T1595 for ``classification == "scanner"``, + T1071 for bare ``"malicious"``) — those are decision-flow + constants, not YAML rows. The loader itself already validates + YAML-sourced IDs at load; this fold-in covers the in-code + constants too. """ ids: set[str] = set() - for techs in _ABUSEIPDB_CATEGORY_TO_TECHNIQUES.values(): - ids.update(techs) - for techs in _GREYNOISE_TAG_TO_TECHNIQUES.values(): - ids.update(techs) - for techs in _THREATFOX_THREAT_TYPE_TO_TECHNIQUES.values(): - ids.update(techs) - # Decision-flow constants (see _greynoise_decisions, _feodo_decisions). - ids.update({"T1071", "T1595", "T1588"}) + for provider in ("abuseipdb", "greynoise", "feodo", "threatfox"): + ids.update(_mapping(provider).all_technique_ids()) + # Decision-flow constants (see _greynoise_decisions). + ids.update({"T1071", "T1595"}) return frozenset(ids) diff --git a/tests/ttp/test_intel_mappings.py b/tests/ttp/test_intel_mappings.py new file mode 100644 index 00000000..31b3785a --- /dev/null +++ b/tests/ttp/test_intel_mappings.py @@ -0,0 +1,254 @@ +"""YAML intel-provider mappings reproduce the legacy dicts byte-for-byte. + +Snapshot equivalence test: the dicts that used to live in +``decnet/ttp/impl/intel_lifter.py`` are mirrored here as ground +truth. If a future YAML edit drops or adds a category/tag/threat-type +mapping, this test catches it. The same dicts are deleted from the +lifter — they live ONLY here, as the regression net. 
+ +Also covers: +* every technique referenced in every YAML resolves in the loaded + ATT&CK bundle (the loader does this at load; we just confirm it), +* every signal carries a STIX-shaped ``external_reference``, +* the ``mitre_url`` enrichment is present on every emission whose + technique is in the bundle (i.e. all of them), +* high-score gating (``cat_11``→T1566 only when score≥80) works, +* invalid YAML (unknown technique_id) raises ``AttackBundleError``. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Final + +import pytest +import yaml + +from decnet.ttp import attack_stix +from decnet.ttp.data.intel_loader import ( + ProviderMapping, + clear_cache, + load_provider_mapping, +) + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" +_DATA_DIR = Path(__file__).resolve().parents[2] / "decnet" / "ttp" / "data" / "intel" + + +# Ground truth — the legacy dicts from intel_lifter.py before the YAML +# extraction. Edit these only when the mapping intentionally changes, +# and update the corresponding YAML in the same commit. 
+_ABUSEIPDB_LEGACY: Final[dict[int, frozenset[str]]] = { + 5: frozenset({"T1110"}), + 7: frozenset({"T1566"}), + 9: frozenset({"T1090"}), + 11: frozenset({"T1496", "T1566"}), + 13: frozenset({"T1090"}), + 14: frozenset({"T1046", "T1595"}), + 15: frozenset({"T1190"}), + 16: frozenset({"T1190"}), + 17: frozenset({"T1566"}), + 18: frozenset({"T1110"}), + 19: frozenset({"T1595"}), + 20: frozenset({"T1078"}), + 21: frozenset({"T1190"}), + 22: frozenset({"T1110"}), + 23: frozenset({"T1190"}), +} + +_ABUSEIPDB_GATED_LEGACY: Final[dict[int, dict[str, int]]] = { + 11: {"T1566": 80}, +} + +_GREYNOISE_LEGACY: Final[dict[str, frozenset[str]]] = { + "tor_exit_node": frozenset({"T1090"}), + "ssh_bruteforcer": frozenset({"T1110"}), + "web_crawler": frozenset({"T1595"}), + "cobalt_strike": frozenset({"T1071", "T1588"}), + "metasploit": frozenset({"T1071", "T1588"}), + "sliver": frozenset({"T1071", "T1588"}), + "havoc": frozenset({"T1071", "T1588"}), +} + +_THREATFOX_LEGACY: Final[dict[str, frozenset[str]]] = { + "botnet_cc": frozenset({"T1071", "T1588"}), + "payload_delivery": frozenset({"T1105", "T1588"}), + "payload": frozenset({"T1588"}), + "cc_skimming": frozenset({"T1056"}), +} + + +@pytest.fixture(autouse=True) +def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + license_path = tmp_path / "LICENSE.txt" + license_path.write_text("placeholder for tests", encoding="utf-8") + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path)) + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + clear_cache() + + +def _ids_at_full_score(m: ProviderMapping, signal_id: str) -> frozenset[str]: + return frozenset( + e.technique_id for e in m.techniques_for_signal(signal_id, score=100) + ) + + +def test_abuseipdb_yaml_reproduces_legacy_dict() -> None: + 
def test_abuseipdb_high_score_gate() -> None:
    """``cat_11`` emits T1566 only when the score meets the 80 floor.

    T1496 carries no threshold and must fire at every score. The
    just-below-threshold cases (79, 79.9) pin the comparison so an
    off-by-one in the gate would be caught.
    """
    m = load_provider_mapping("abuseipdb")

    def fired(score: float | None) -> set[str]:
        return {
            e.technique_id
            for e in m.techniques_for_signal("cat_11", score=score)
        }

    # Below threshold: T1566 dropped, T1496 still fires.
    assert fired(50) == {"T1496"}
    assert fired(79) == {"T1496"}
    assert fired(79.9) == {"T1496"}
    # At threshold and above: both fire.
    assert fired(80) == {"T1496", "T1566"}
    assert fired(99) == {"T1496", "T1566"}
    # Score=None: gated emission filtered (matches legacy: no score → no T1566).
    assert fired(None) == {"T1496"}
for sig in m.signals: + assert sig.external_reference.source_name + assert sig.external_reference.url.startswith("http") + + +@pytest.mark.parametrize( + "provider", ["abuseipdb", "greynoise", "feodo", "threatfox"] +) +def test_every_emission_has_mitre_url(provider: str) -> None: + m = load_provider_mapping(provider) + for sig in m.signals: + for emission in sig.emissions: + assert emission.mitre_url is not None, ( + f"{provider}/{sig.id}/{emission.technique_id} missing mitre_url" + ) + assert emission.mitre_url.startswith( + "https://attack.mitre.org/techniques/" + ) + + +def test_load_unknown_provider_raises() -> None: + with pytest.raises(FileNotFoundError): + load_provider_mapping("does_not_exist") + + +def test_unknown_technique_id_in_yaml_fails_closed(tmp_path: Path) -> None: + bogus = tmp_path / "intel" / "bogus.yaml" + bogus.parent.mkdir(parents=True) + bogus.write_text( + yaml.safe_dump( + { + "provider": "bogus", + "mapping_version": "1", + "attack_release": ">=15.1", + "signals": [ + { + "id": "sig_1", + "label": "Test", + "external_reference": { + "source_name": "test", + "url": "https://example.com", + }, + "techniques": [{"technique_id": "T9999"}], + }, + ], + } + ), + encoding="utf-8", + ) + # Point the loader at the temp file. We do this by patching the + # loader's internal _data_path to resolve to the temp dir for the + # 'bogus' provider only. 
+ from decnet.ttp.data import intel_loader + + original = intel_loader._data_path + + def fake_path(provider: str) -> Path: + return bogus if provider == "bogus" else original(provider) + + intel_loader._data_path = fake_path # type: ignore[assignment] + intel_loader.clear_cache() + try: + with pytest.raises(attack_stix.AttackBundleError) as exc: + load_provider_mapping("bogus") + assert "T9999" in str(exc.value) + finally: + intel_loader._data_path = original # type: ignore[assignment] + intel_loader.clear_cache() + + +def test_yaml_provider_field_must_match_filename(tmp_path: Path) -> None: + """A YAML claiming provider=X loaded from .yaml is rejected — drift catcher.""" + mismatched = tmp_path / "intel" / "abuseipdb.yaml" + mismatched.parent.mkdir(parents=True) + mismatched.write_text( + yaml.safe_dump( + { + "provider": "wrong_name", + "mapping_version": "1", + "attack_release": ">=15.1", + "signals": [], + } + ), + encoding="utf-8", + ) + from decnet.ttp.data import intel_loader + + original = intel_loader._data_path + intel_loader._data_path = lambda _p: mismatched # type: ignore[assignment] + intel_loader.clear_cache() + try: + with pytest.raises(ValueError, match="does not match"): + load_provider_mapping("abuseipdb") + finally: + intel_loader._data_path = original # type: ignore[assignment] + intel_loader.clear_cache() + + +def test_yaml_files_match_directory_listing() -> None: + """Catch a YAML that's been added without a corresponding mapping + or removed without cleanup. Keeps the data dir in sync with the + test parametrize lists.""" + files = sorted(p.stem for p in _DATA_DIR.glob("*.yaml")) + assert files == ["abuseipdb", "feodo", "greynoise", "threatfox"]