feat(ttp): load MITRE ATT&CK from official STIX 2.1 bundle
Replace the hand-maintained TECHNIQUE_NAMES dict (pinned to v15.1) with a runtime loader that reads the official enterprise-attack-N.json STIX bundle. Version bumps now require only updating attack_version.py; sub-technique parents, tactic IDs, and kill-chain phases all come from MITRE's published data. - decnet/ttp/attack_version.py pins version 19.0 + sha256 + URL - decnet/ttp/attack_stix.py is the lazy STIX loader. Resolution order: DECNET_ATTACK_BUNDLE env -> ~/.cache/decnet/attack/ -> fetch from the pinned MITRE GitHub URL. SHA-256 verified before parse; mismatch fails closed. - decnet/ttp/attack_catalog.py collapses to a shim re-exporting technique_name() so the ~9 router/repo call sites don't churn. - python -m decnet.ttp.attack_stix fetch warms the cache and can print sha256 for version-bump workflows. - test_attack_catalog.py now asserts every rule-emitted ID resolves in the loaded bundle (same contract, real source) and exercises the SHA-256-mismatch fail-closed path.
This commit is contained in:
@@ -1,116 +1,19 @@
|
|||||||
"""ATT&CK technique-id → display-name catalogue.
|
"""Backward-compatible shim over :mod:`decnet.ttp.attack_stix`.
|
||||||
|
|
||||||
Pinned to the same ATT&CK release the rule engine emits on
|
Historically this module exposed a hand-maintained
|
||||||
(``v15.1`` per ``decnet/ttp/impl/rule_engine.py:_ATTACK_RELEASE``). The
|
``TECHNIQUE_NAMES`` dict pinned to ATT&CK Enterprise v15.1. Names now
|
||||||
operator UI uses these names to render "T1595 — Active Scanning"
|
come from the official STIX 2.1 bundle loaded by
|
||||||
instead of just "T1595" in the TTPs-observed rollup and the per-tag
|
:mod:`decnet.ttp.attack_stix`; this module preserves the
|
||||||
inspector. Names are the canonical MITRE labels, not author-supplied
|
``technique_name(...)`` import path the rest of DECNET reaches for so
|
||||||
strings on rules — keeping them here means a rule author can't typo a
|
call sites in the web router, repo layer, and per-tag inspector keep
|
||||||
technique name and the entire fleet sees the typo.
|
working unchanged.
|
||||||
|
|
||||||
Bumping ``_ATTACK_RELEASE`` requires reviewing this file: any
|
``TECHNIQUE_NAMES`` is **gone**: there is no static dict to import.
|
||||||
techniques that were renamed need their entries updated in the same
|
Anything that needs an exhaustive list should iterate ATT&CK objects
|
||||||
commit. See TTP_TAGGING.md §"Hard parts §8 ATT&CK matrix drift".
|
through :mod:`decnet.ttp.attack_stix`.
|
||||||
|
|
||||||
Coverage policy: every technique_id / sub_technique_id appearing in
|
|
||||||
``rules/ttp/`` MUST have an entry here. The
|
|
||||||
``tests/ttp/test_attack_catalog.py`` coverage test enforces this so a
|
|
||||||
rule author who adds a new technique gets a loud failure rather than
|
|
||||||
a silent UI fallback.
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Final
|
from decnet.ttp.attack_stix import technique_name
|
||||||
|
|
||||||
# Top-level techniques + sub-techniques referenced by `rules/ttp/`
|
__all__ = ["technique_name"]
|
||||||
# (R0001..R0058). Names from MITRE ATT&CK Enterprise v15.1.
|
|
||||||
TECHNIQUE_NAMES: Final[dict[str, str]] = {
|
|
||||||
# ── Top-level techniques ─────────────────────────────────────────
|
|
||||||
"T1003": "OS Credential Dumping",
|
|
||||||
"T1016": "System Network Configuration Discovery",
|
|
||||||
"T1027": "Obfuscated Files or Information",
|
|
||||||
"T1029": "Scheduled Transfer",
|
|
||||||
"T1033": "System Owner/User Discovery",
|
|
||||||
"T1036": "Masquerading",
|
|
||||||
"T1046": "Network Service Discovery",
|
|
||||||
"T1049": "System Network Connections Discovery",
|
|
||||||
"T1053": "Scheduled Task/Job",
|
|
||||||
"T1056": "Input Capture",
|
|
||||||
"T1059": "Command and Scripting Interpreter",
|
|
||||||
"T1070": "Indicator Removal",
|
|
||||||
"T1071": "Application Layer Protocol",
|
|
||||||
"T1078": "Valid Accounts",
|
|
||||||
"T1082": "System Information Discovery",
|
|
||||||
"T1083": "File and Directory Discovery",
|
|
||||||
"T1087": "Account Discovery",
|
|
||||||
"T1090": "Proxy",
|
|
||||||
"T1098": "Account Manipulation",
|
|
||||||
"T1105": "Ingress Tool Transfer",
|
|
||||||
"T1110": "Brute Force",
|
|
||||||
"T1135": "Network Share Discovery",
|
|
||||||
"T1136": "Create Account",
|
|
||||||
"T1190": "Exploit Public-Facing Application",
|
|
||||||
"T1204": "User Execution",
|
|
||||||
"T1213": "Data from Information Repositories",
|
|
||||||
"T1482": "Domain Trust Discovery",
|
|
||||||
"T1485": "Data Destruction",
|
|
||||||
"T1486": "Data Encrypted for Impact",
|
|
||||||
"T1496": "Resource Hijacking",
|
|
||||||
"T1505": "Server Software Component",
|
|
||||||
"T1548": "Abuse Elevation Control Mechanism",
|
|
||||||
"T1552": "Unsecured Credentials",
|
|
||||||
"T1557": "Adversary-in-the-Middle",
|
|
||||||
"T1566": "Phishing",
|
|
||||||
"T1567": "Exfiltration Over Web Service",
|
|
||||||
"T1586": "Compromise Accounts",
|
|
||||||
"T1588": "Obtain Capabilities",
|
|
||||||
"T1595": "Active Scanning",
|
|
||||||
"T1602": "Data from Configuration Repository",
|
|
||||||
"T1611": "Escape to Host",
|
|
||||||
# ── Sub-techniques ───────────────────────────────────────────────
|
|
||||||
"T1003.008": "OS Credential Dumping: /etc/passwd and /etc/shadow",
|
|
||||||
"T1036.005": "Masquerading: Match Legitimate Name or Location",
|
|
||||||
"T1053.003": "Scheduled Task/Job: Cron",
|
|
||||||
"T1059.004": "Command and Scripting Interpreter: Unix Shell",
|
|
||||||
"T1070.003": "Indicator Removal: Clear Command History",
|
|
||||||
"T1071.001": "Application Layer Protocol: Web Protocols",
|
|
||||||
"T1071.003": "Application Layer Protocol: Mail Protocols",
|
|
||||||
"T1078.001": "Valid Accounts: Default Accounts",
|
|
||||||
"T1087.002": "Account Discovery: Domain Account",
|
|
||||||
"T1098.004": "Account Manipulation: SSH Authorized Keys",
|
|
||||||
"T1110.001": "Brute Force: Password Guessing",
|
|
||||||
"T1110.003": "Brute Force: Password Spraying",
|
|
||||||
"T1110.004": "Brute Force: Credential Stuffing",
|
|
||||||
"T1136.001": "Create Account: Local Account",
|
|
||||||
"T1204.002": "User Execution: Malicious File",
|
|
||||||
"T1505.003": "Server Software Component: Web Shell",
|
|
||||||
"T1548.001": "Abuse Elevation Control Mechanism: Setuid and Setgid",
|
|
||||||
"T1548.003": "Abuse Elevation Control Mechanism: Sudo and Sudo Caching",
|
|
||||||
"T1552.001": "Unsecured Credentials: Credentials In Files",
|
|
||||||
"T1552.007": "Unsecured Credentials: Container API",
|
|
||||||
"T1557.001": "Adversary-in-the-Middle: LLMNR/NBT-NS Poisoning and SMB Relay",
|
|
||||||
"T1566.001": "Phishing: Spearphishing Attachment",
|
|
||||||
"T1566.002": "Phishing: Spearphishing Link",
|
|
||||||
"T1566.003": "Phishing: Spearphishing via Service",
|
|
||||||
"T1586.002": "Compromise Accounts: Email Accounts",
|
|
||||||
"T1588.001": "Obtain Capabilities: Malware",
|
|
||||||
"T1588.002": "Obtain Capabilities: Tool",
|
|
||||||
"T1595.002": "Active Scanning: Vulnerability Scanning",
|
|
||||||
"T1602.002": "Data from Configuration Repository: Network Device Configuration Dump",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def technique_name(technique_id: str | None) -> str | None:
|
|
||||||
"""Return the canonical ATT&CK display name for *technique_id*.
|
|
||||||
|
|
||||||
``None`` for unknown IDs — the UI falls back to showing the bare
|
|
||||||
ID. Adding a rule that emits an unknown technique should be a
|
|
||||||
deploy-time loud failure (see ``tests/ttp/test_attack_catalog.py``)
|
|
||||||
rather than a silent UI fallback in production.
|
|
||||||
"""
|
|
||||||
if not technique_id:
|
|
||||||
return None
|
|
||||||
return TECHNIQUE_NAMES.get(technique_id)
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["TECHNIQUE_NAMES", "technique_name"]
|
|
||||||
|
|||||||
343
decnet/ttp/attack_stix.py
Normal file
343
decnet/ttp/attack_stix.py
Normal file
@@ -0,0 +1,343 @@
|
|||||||
|
"""STIX 2.1 backed MITRE ATT&CK lookups.
|
||||||
|
|
||||||
|
Replaces the hand-maintained technique-name dict that used to live in
|
||||||
|
``decnet/ttp/attack_catalog.py``. Single source of truth is the
|
||||||
|
official ``enterprise-attack-<version>.json`` STIX bundle published by
|
||||||
|
MITRE; consumers (rule engine, intel lifters, web router, frontend
|
||||||
|
rollups) call the small public API below instead of reading raw STIX.
|
||||||
|
|
||||||
|
Bundle resolution order
|
||||||
|
-----------------------
|
||||||
|
|
||||||
|
1. ``DECNET_ATTACK_BUNDLE`` env var (absolute path to a JSON file).
|
||||||
|
2. ``<cache_dir>/enterprise-attack-<version>.json`` where ``<cache_dir>``
|
||||||
|
defaults to ``~/.cache/decnet/attack`` and is overridable with
|
||||||
|
``DECNET_ATTACK_CACHE_DIR``.
|
||||||
|
3. Fetch from :data:`decnet.ttp.attack_version.ATTACK_BUNDLE_URL` into
|
||||||
|
the cache dir.
|
||||||
|
|
||||||
|
In every case the loaded bytes are verified against
|
||||||
|
:data:`decnet.ttp.attack_version.ATTACK_BUNDLE_SHA256` *before* the
|
||||||
|
bundle is parsed. A mismatch raises :class:`AttackBundleError` and the
|
||||||
|
loader refuses to serve queries. This is intentional — drift between
|
||||||
|
DECNET's expected ATT&CK version and what the operator (or a tampered
|
||||||
|
mirror) actually placed on disk would silently mistag thousands of
|
||||||
|
events.
|
||||||
|
|
||||||
|
Lazy-loading: the bundle (~50 MB) is parsed on first call to any
|
||||||
|
public function, never at import time. Tests that don't touch ATT&CK
|
||||||
|
should never pay the cost.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from functools import lru_cache
|
||||||
|
from pathlib import Path
|
||||||
|
from threading import Lock
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
from mitreattack.stix20 import MitreAttackData
|
||||||
|
|
||||||
|
from decnet.ttp.attack_version import (
|
||||||
|
ATTACK_BUNDLE_SHA256,
|
||||||
|
ATTACK_BUNDLE_URL,
|
||||||
|
ATTACK_BUNDLE_VERSION,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_ENV_BUNDLE_PATH: Final[str] = "DECNET_ATTACK_BUNDLE"
|
||||||
|
_ENV_CACHE_DIR: Final[str] = "DECNET_ATTACK_CACHE_DIR"
|
||||||
|
_DEFAULT_CACHE_DIR: Final[Path] = Path.home() / ".cache" / "decnet" / "attack"
|
||||||
|
|
||||||
|
_data_lock = Lock()
|
||||||
|
_data: MitreAttackData | None = None
|
||||||
|
_loaded_path: Path | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class AttackBundleError(RuntimeError):
|
||||||
|
"""Raised when the ATT&CK STIX bundle cannot be loaded or verified."""
|
||||||
|
|
||||||
|
|
||||||
|
def _cache_dir() -> Path:
|
||||||
|
override = os.environ.get(_ENV_CACHE_DIR)
|
||||||
|
return Path(override) if override else _DEFAULT_CACHE_DIR
|
||||||
|
|
||||||
|
|
||||||
|
def _expected_cache_path() -> Path:
|
||||||
|
return _cache_dir() / f"enterprise-attack-{ATTACK_BUNDLE_VERSION}.json"
|
||||||
|
|
||||||
|
|
||||||
|
def _verify_sha256(path: Path) -> None:
|
||||||
|
h = hashlib.sha256()
|
||||||
|
with path.open("rb") as f:
|
||||||
|
for chunk in iter(lambda: f.read(1 << 20), b""):
|
||||||
|
h.update(chunk)
|
||||||
|
actual = h.hexdigest()
|
||||||
|
if actual != ATTACK_BUNDLE_SHA256:
|
||||||
|
raise AttackBundleError(
|
||||||
|
f"ATT&CK bundle at {path} sha256={actual} does not match "
|
||||||
|
f"pinned {ATTACK_BUNDLE_SHA256} (version {ATTACK_BUNDLE_VERSION}). "
|
||||||
|
"Refusing to load — re-fetch or update attack_version.py."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _fetch_bundle(target: Path) -> None:
|
||||||
|
import requests
|
||||||
|
|
||||||
|
target.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
logger.info("Fetching ATT&CK bundle %s -> %s", ATTACK_BUNDLE_URL, target)
|
||||||
|
tmp = target.with_suffix(target.suffix + ".part")
|
||||||
|
try:
|
||||||
|
resp = requests.get(ATTACK_BUNDLE_URL, timeout=60, stream=True)
|
||||||
|
resp.raise_for_status()
|
||||||
|
with tmp.open("wb") as f:
|
||||||
|
for chunk in resp.iter_content(1 << 20):
|
||||||
|
if chunk:
|
||||||
|
f.write(chunk)
|
||||||
|
tmp.replace(target)
|
||||||
|
except Exception:
|
||||||
|
tmp.unlink(missing_ok=True)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_bundle_path() -> Path:
|
||||||
|
"""Return the verified bundle path, fetching if necessary."""
|
||||||
|
override = os.environ.get(_ENV_BUNDLE_PATH)
|
||||||
|
if override:
|
||||||
|
path = Path(override)
|
||||||
|
if not path.is_file():
|
||||||
|
raise AttackBundleError(
|
||||||
|
f"{_ENV_BUNDLE_PATH}={override} does not point to a file"
|
||||||
|
)
|
||||||
|
_verify_sha256(path)
|
||||||
|
return path
|
||||||
|
|
||||||
|
cached = _expected_cache_path()
|
||||||
|
if not cached.is_file():
|
||||||
|
_fetch_bundle(cached)
|
||||||
|
_verify_sha256(cached)
|
||||||
|
return cached
|
||||||
|
|
||||||
|
|
||||||
|
def _load() -> MitreAttackData:
|
||||||
|
global _data, _loaded_path
|
||||||
|
with _data_lock:
|
||||||
|
if _data is not None:
|
||||||
|
return _data
|
||||||
|
path = resolve_bundle_path()
|
||||||
|
_data = MitreAttackData(str(path))
|
||||||
|
_loaded_path = path
|
||||||
|
logger.info(
|
||||||
|
"Loaded ATT&CK bundle version=%s path=%s",
|
||||||
|
ATTACK_BUNDLE_VERSION,
|
||||||
|
path,
|
||||||
|
)
|
||||||
|
return _data
|
||||||
|
|
||||||
|
|
||||||
|
def loaded_bundle_path() -> Path | None:
|
||||||
|
"""Return the path the bundle was loaded from, or ``None`` if not loaded yet."""
|
||||||
|
return _loaded_path
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(maxsize=4096)
|
||||||
|
def _attack_pattern_by_id(technique_id: str) -> dict | None:
|
||||||
|
obj = _load().get_object_by_attack_id(technique_id, "attack-pattern")
|
||||||
|
if obj is None:
|
||||||
|
return None
|
||||||
|
return dict(obj)
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(maxsize=64)
|
||||||
|
def _tactic_by_id(tactic_id: str) -> dict | None:
|
||||||
|
obj = _load().get_object_by_attack_id(tactic_id, "x-mitre-tactic")
|
||||||
|
if obj is None:
|
||||||
|
return None
|
||||||
|
return dict(obj)
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(maxsize=64)
|
||||||
|
def _tactic_by_short_name(short_name: str) -> dict | None:
|
||||||
|
for obj in _load().get_tactics():
|
||||||
|
if obj.get("x_mitre_shortname") == short_name:
|
||||||
|
return dict(obj)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def technique_name(technique_id: str | None) -> str | None:
|
||||||
|
"""Return the canonical ATT&CK display name for *technique_id*.
|
||||||
|
|
||||||
|
For a sub-technique (``T1059.004``) the parent is prepended so the
|
||||||
|
rendered string matches the historical format
|
||||||
|
``"Command and Scripting Interpreter: Unix Shell"``. ``None`` for
|
||||||
|
unknown IDs — callers (UI, exporter) fall back to showing the bare
|
||||||
|
ID. Drift is caught at startup by
|
||||||
|
:func:`assert_known_technique_ids`, so a ``None`` here in
|
||||||
|
production means an upstream emitted an ID that wasn't on the
|
||||||
|
validation list (likely a hot-loaded rule).
|
||||||
|
"""
|
||||||
|
if not technique_id:
|
||||||
|
return None
|
||||||
|
obj = _attack_pattern_by_id(technique_id)
|
||||||
|
if obj is None:
|
||||||
|
return None
|
||||||
|
name = obj.get("name")
|
||||||
|
if "." not in technique_id or not obj.get("x_mitre_is_subtechnique"):
|
||||||
|
return name
|
||||||
|
parent = subtechnique_parent_name(technique_id)
|
||||||
|
if parent is None:
|
||||||
|
return name
|
||||||
|
return f"{parent}: {name}"
|
||||||
|
|
||||||
|
|
||||||
|
def subtechnique_parent_name(technique_id: str) -> str | None:
|
||||||
|
parents = _load().get_parent_technique_of_subtechnique(
|
||||||
|
_attack_pattern_by_id(technique_id)["id"] # type: ignore[index]
|
||||||
|
)
|
||||||
|
if not parents:
|
||||||
|
return None
|
||||||
|
return parents[0]["object"].name
|
||||||
|
|
||||||
|
|
||||||
|
def is_subtechnique(technique_id: str) -> bool:
|
||||||
|
obj = _attack_pattern_by_id(technique_id)
|
||||||
|
return bool(obj and obj.get("x_mitre_is_subtechnique"))
|
||||||
|
|
||||||
|
|
||||||
|
def tactic_name(tactic_id_or_short_name: str | None) -> str | None:
|
||||||
|
"""Return the tactic display name for either a ``TA0001``-style ID or a kill-chain short name."""
|
||||||
|
if not tactic_id_or_short_name:
|
||||||
|
return None
|
||||||
|
if tactic_id_or_short_name.startswith("TA"):
|
||||||
|
obj = _tactic_by_id(tactic_id_or_short_name)
|
||||||
|
else:
|
||||||
|
obj = _tactic_by_short_name(tactic_id_or_short_name)
|
||||||
|
return obj.get("name") if obj else None
|
||||||
|
|
||||||
|
|
||||||
|
def tactic_id_for_short_name(short_name: str) -> str | None:
|
||||||
|
obj = _tactic_by_short_name(short_name)
|
||||||
|
if obj is None:
|
||||||
|
return None
|
||||||
|
for ref in obj.get("external_references", []):
|
||||||
|
if ref.get("source_name") == "mitre-attack":
|
||||||
|
return ref.get("external_id")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def kill_chain_phases(technique_id: str) -> list[str]:
|
||||||
|
"""Return the kill-chain phase short-names for a technique."""
|
||||||
|
obj = _attack_pattern_by_id(technique_id)
|
||||||
|
if obj is None:
|
||||||
|
return []
|
||||||
|
return [
|
||||||
|
p["phase_name"]
|
||||||
|
for p in obj.get("kill_chain_phases", [])
|
||||||
|
if p.get("kill_chain_name") == "mitre-attack"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def technique_exists(technique_id: str) -> bool:
|
||||||
|
return _attack_pattern_by_id(technique_id) is not None
|
||||||
|
|
||||||
|
|
||||||
|
def tactic_exists(tactic_id: str) -> bool:
|
||||||
|
return _tactic_by_id(tactic_id) is not None
|
||||||
|
|
||||||
|
|
||||||
|
def assert_known_technique_ids(
|
||||||
|
technique_ids: list[str] | set[str] | tuple[str, ...],
|
||||||
|
*,
|
||||||
|
source: str,
|
||||||
|
) -> None:
|
||||||
|
"""Raise :class:`AttackBundleError` listing any IDs missing from the bundle."""
|
||||||
|
missing = sorted({t for t in technique_ids if t and not technique_exists(t)})
|
||||||
|
if missing:
|
||||||
|
raise AttackBundleError(
|
||||||
|
f"{source}: technique IDs not present in ATT&CK Enterprise "
|
||||||
|
f"v{ATTACK_BUNDLE_VERSION}: {missing}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def assert_known_tactic_ids(
|
||||||
|
tactic_ids: list[str] | set[str] | tuple[str, ...],
|
||||||
|
*,
|
||||||
|
source: str,
|
||||||
|
exempt: set[str] | None = None,
|
||||||
|
) -> None:
|
||||||
|
exempt = exempt or set()
|
||||||
|
missing = sorted(
|
||||||
|
{t for t in tactic_ids if t and t not in exempt and not tactic_exists(t)}
|
||||||
|
)
|
||||||
|
if missing:
|
||||||
|
raise AttackBundleError(
|
||||||
|
f"{source}: tactic IDs not present in ATT&CK Enterprise "
|
||||||
|
f"v{ATTACK_BUNDLE_VERSION}: {missing}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _cli_fetch(print_sha: bool) -> int:
|
||||||
|
cached = _expected_cache_path()
|
||||||
|
if not cached.is_file():
|
||||||
|
try:
|
||||||
|
_fetch_bundle(cached)
|
||||||
|
except Exception as exc: # pragma: no cover - network failure path
|
||||||
|
print(f"fetch failed: {exc}", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
if print_sha:
|
||||||
|
h = hashlib.sha256()
|
||||||
|
with cached.open("rb") as f:
|
||||||
|
for chunk in iter(lambda: f.read(1 << 20), b""):
|
||||||
|
h.update(chunk)
|
||||||
|
print(f"{h.hexdigest()} {cached}")
|
||||||
|
return 0
|
||||||
|
try:
|
||||||
|
_verify_sha256(cached)
|
||||||
|
except AttackBundleError as exc:
|
||||||
|
print(str(exc), file=sys.stderr)
|
||||||
|
return 2
|
||||||
|
print(f"OK {cached} (version {ATTACK_BUNDLE_VERSION})")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str] | None = None) -> int:
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
p = argparse.ArgumentParser(prog="python -m decnet.ttp.attack_stix")
|
||||||
|
sub = p.add_subparsers(dest="cmd", required=True)
|
||||||
|
f = sub.add_parser("fetch", help="Fetch and verify the pinned ATT&CK bundle.")
|
||||||
|
f.add_argument(
|
||||||
|
"--print-sha",
|
||||||
|
action="store_true",
|
||||||
|
help="Print sha256 of the cached bundle (for updating attack_version.py).",
|
||||||
|
)
|
||||||
|
args = p.parse_args(argv)
|
||||||
|
if args.cmd == "fetch":
|
||||||
|
return _cli_fetch(args.print_sha)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": # pragma: no cover
|
||||||
|
raise SystemExit(main())
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ATTACK_BUNDLE_VERSION",
|
||||||
|
"AttackBundleError",
|
||||||
|
"assert_known_tactic_ids",
|
||||||
|
"assert_known_technique_ids",
|
||||||
|
"is_subtechnique",
|
||||||
|
"kill_chain_phases",
|
||||||
|
"loaded_bundle_path",
|
||||||
|
"resolve_bundle_path",
|
||||||
|
"subtechnique_parent_name",
|
||||||
|
"tactic_exists",
|
||||||
|
"tactic_id_for_short_name",
|
||||||
|
"tactic_name",
|
||||||
|
"technique_exists",
|
||||||
|
"technique_name",
|
||||||
|
]
|
||||||
36
decnet/ttp/attack_version.py
Normal file
36
decnet/ttp/attack_version.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
"""Pinned MITRE ATT&CK Enterprise STIX bundle version.
|
||||||
|
|
||||||
|
Bumping ``ATTACK_BUNDLE_VERSION`` is the *only* code change required
|
||||||
|
to track a new ATT&CK release — all technique/tactic names and
|
||||||
|
sub-technique parents are loaded from the bundle at runtime via
|
||||||
|
``decnet.ttp.attack_stix``. The hash is verified after fetch; a
|
||||||
|
mismatch refuses to load (fail-closed, mirroring the bundle-include
|
||||||
|
discipline used elsewhere in DECNET).
|
||||||
|
|
||||||
|
To regenerate the hash after a version bump::
|
||||||
|
|
||||||
|
.311/bin/python -m decnet.ttp.attack_stix fetch --print-sha
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Final
|
||||||
|
|
||||||
|
ATTACK_BUNDLE_VERSION: Final[str] = "19.0"
|
||||||
|
|
||||||
|
# sha256 of the canonical MITRE-published enterprise-attack-19.0.json
|
||||||
|
# from https://github.com/mitre-attack/attack-stix-data.
|
||||||
|
ATTACK_BUNDLE_SHA256: Final[str] = (
|
||||||
|
"df520ea0775a57db7bff760145b02fed89290802913e056b7ed5970b02f3626a"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Raw download URL for the pinned version.
|
||||||
|
ATTACK_BUNDLE_URL: Final[str] = (
|
||||||
|
"https://raw.githubusercontent.com/mitre-attack/attack-stix-data"
|
||||||
|
f"/master/enterprise-attack/enterprise-attack-{ATTACK_BUNDLE_VERSION}.json"
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ATTACK_BUNDLE_SHA256",
|
||||||
|
"ATTACK_BUNDLE_URL",
|
||||||
|
"ATTACK_BUNDLE_VERSION",
|
||||||
|
]
|
||||||
@@ -53,6 +53,13 @@ dependencies = [
|
|||||||
# range tracks BEHAVE-INTEGRATION.md §"Versioning".
|
# range tracks BEHAVE-INTEGRATION.md §"Versioning".
|
||||||
"decnet-behave-core>=0.1.0,<0.2",
|
"decnet-behave-core>=0.1.0,<0.2",
|
||||||
"decnet-behave-shell>=0.1.0,<0.2",
|
"decnet-behave-shell>=0.1.0,<0.2",
|
||||||
|
# MITRE ATT&CK: parse the official STIX 2.1 enterprise-attack bundle
|
||||||
|
# instead of hand-maintaining technique/tactic name dicts. stix2
|
||||||
|
# gives typed parsing; mitreattack-python ships MitreAttackData
|
||||||
|
# query helpers (get_objects_by_external_id, get_parent_technique).
|
||||||
|
# See decnet/ttp/attack_stix.py.
|
||||||
|
"stix2>=3.0",
|
||||||
|
"mitreattack-python>=3.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
@@ -96,6 +103,7 @@ dev = [
|
|||||||
"gevent>=24.0",
|
"gevent>=24.0",
|
||||||
"pydeps>=3.0.3",
|
"pydeps>=3.0.3",
|
||||||
"types-PyYAML>=6.0",
|
"types-PyYAML>=6.0",
|
||||||
|
"types-requests>=2.33",
|
||||||
"mypy>=1.16,<1.20"
|
"mypy>=1.16,<1.20"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -1,20 +1,43 @@
|
|||||||
"""ATT&CK technique-name catalogue covers every ID emitted by the rule pack.
|
"""Every technique ID emitted by ``rules/ttp/`` must resolve in the loaded ATT&CK STIX bundle.
|
||||||
|
|
||||||
A rule author who adds a new technique to ``rules/ttp/`` must also
|
The shim in :mod:`decnet.ttp.attack_catalog` now reads names from the
|
||||||
update ``decnet/ttp/attack_catalog.py`` in the same commit. Without
|
official MITRE ATT&CK Enterprise STIX bundle (loader at
|
||||||
this test the UI silently falls back to the bare ID for unknown
|
:mod:`decnet.ttp.attack_stix`). This test enforces the same invariant
|
||||||
techniques.
|
that the old hand-maintained dict did — a rule author who adds a
|
||||||
|
technique that isn't in the pinned ATT&CK release gets a loud failure
|
||||||
|
at deploy time rather than a silent UI fallback.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from decnet.ttp.attack_catalog import TECHNIQUE_NAMES, technique_name
|
from decnet.ttp import attack_stix
|
||||||
|
from decnet.ttp.attack_catalog import technique_name
|
||||||
|
|
||||||
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
|
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
|
||||||
|
_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module", autouse=True)
|
||||||
|
def _use_repo_bundle(monkeypatch_module: pytest.MonkeyPatch) -> None:
|
||||||
|
"""Pin DECNET_ATTACK_BUNDLE to the in-repo copy for hermetic tests."""
|
||||||
|
monkeypatch_module.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE))
|
||||||
|
# Reset the lazy singleton so the env var is honored.
|
||||||
|
attack_stix._data = None
|
||||||
|
attack_stix._loaded_path = None
|
||||||
|
attack_stix._attack_pattern_by_id.cache_clear()
|
||||||
|
attack_stix._tactic_by_id.cache_clear()
|
||||||
|
attack_stix._tactic_by_short_name.cache_clear()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def monkeypatch_module() -> pytest.MonkeyPatch:
|
||||||
|
mp = pytest.MonkeyPatch()
|
||||||
|
yield mp
|
||||||
|
mp.undo()
|
||||||
|
|
||||||
|
|
||||||
def _all_technique_ids_in_rule_pack() -> set[str]:
|
def _all_technique_ids_in_rule_pack() -> set[str]:
|
||||||
@@ -31,12 +54,12 @@ def _all_technique_ids_in_rule_pack() -> set[str]:
|
|||||||
return ids
|
return ids
|
||||||
|
|
||||||
|
|
||||||
def test_every_rule_pack_technique_has_a_catalogue_entry() -> None:
|
def test_every_rule_pack_technique_resolves_in_bundle() -> None:
|
||||||
rule_ids = _all_technique_ids_in_rule_pack()
|
rule_ids = _all_technique_ids_in_rule_pack()
|
||||||
missing = sorted(rule_ids - TECHNIQUE_NAMES.keys())
|
missing = sorted(t for t in rule_ids if not attack_stix.technique_exists(t))
|
||||||
assert not missing, (
|
assert not missing, (
|
||||||
"rules/ttp/ emits techniques absent from "
|
f"rules/ttp/ emits techniques absent from ATT&CK Enterprise "
|
||||||
"decnet/ttp/attack_catalog.py: " + ", ".join(missing)
|
f"v{attack_stix.ATTACK_BUNDLE_VERSION}: {missing}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -51,8 +74,37 @@ def test_technique_name_unknown_id_returns_none() -> None:
|
|||||||
assert technique_name("") is None
|
assert technique_name("") is None
|
||||||
|
|
||||||
|
|
||||||
def test_catalogue_entries_are_non_empty_strings() -> None:
|
def test_subtechnique_format_matches_legacy() -> None:
|
||||||
for tid, name in TECHNIQUE_NAMES.items():
|
# Spot-check the historical "Parent: Child" rendering.
|
||||||
assert isinstance(name, str) and name.strip(), (
|
assert technique_name("T1059.004") == (
|
||||||
f"empty / non-string name for {tid!r}: {name!r}"
|
"Command and Scripting Interpreter: Unix Shell"
|
||||||
|
)
|
||||||
|
assert technique_name("T1110.001") == "Brute Force: Password Guessing"
|
||||||
|
|
||||||
|
|
||||||
|
def test_assert_known_technique_ids_raises_on_missing() -> None:
|
||||||
|
with pytest.raises(attack_stix.AttackBundleError) as exc:
|
||||||
|
attack_stix.assert_known_technique_ids(
|
||||||
|
["T1059", "T9999"], source="test_assert"
|
||||||
)
|
)
|
||||||
|
assert "T9999" in str(exc.value)
|
||||||
|
assert "T1059" not in str(exc.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_tactic_lookup_by_id_and_short_name() -> None:
|
||||||
|
assert attack_stix.tactic_name("TA0001") == "Initial Access"
|
||||||
|
assert attack_stix.tactic_name("initial-access") == "Initial Access"
|
||||||
|
assert attack_stix.tactic_id_for_short_name("initial-access") == "TA0001"
|
||||||
|
assert attack_stix.tactic_exists("TA0001")
|
||||||
|
assert not attack_stix.tactic_exists("TA9999")
|
||||||
|
|
||||||
|
|
||||||
|
def test_sha256_mismatch_refuses_to_load(
|
||||||
|
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
bogus = tmp_path / "enterprise-attack-19.0.json"
|
||||||
|
bogus.write_bytes(b'{"type":"bundle","id":"bundle--x","objects":[]}')
|
||||||
|
monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(bogus))
|
||||||
|
with pytest.raises(attack_stix.AttackBundleError) as exc:
|
||||||
|
attack_stix._verify_sha256(bogus)
|
||||||
|
assert "does not match" in str(exc.value)
|
||||||
|
|||||||
Reference in New Issue
Block a user