feat(ttp): fetch + verify MITRE ATT&CK LICENSE alongside the bundle
MITRE's ATT&CK Terms of Use require reproducing their copyright + license alongside any cached copy of ATT&CK data. Today we ship the bundle but not the license — this commit closes that compliance gap. - attack_version.py pins ATTACK_LICENSE_URL + ATTACK_LICENSE_SHA256 + ATTACK_LICENSE_FILENAME, sourced from the same attack-stix-data repo as the bundle. - attack_stix.py:_fetch_license downloads LICENSE.txt next to the bundle. License sha mismatch is logged + refreshed (license text gets occasional formatting tweaks; not a security event), unlike the bundle which stays fail-closed. - _ensure_license is the compliance ratchet: resolve_bundle_path refuses to return without LICENSE.txt on disk. Override-mode (DECNET_ATTACK_BUNDLE) checks for a sibling LICENSE.txt first, then DECNET_ATTACK_LICENSE, then the cache dir. - python -m decnet.ttp.attack_stix license prints the cached license to stdout for operator audit. - loaded_license_path() exposes the active license path read-only. - tests/ttp/test_attack_license.py covers happy paths (sibling + explicit env), refusal when DECNET_ATTACK_LICENSE points at a missing file, the CLI subcommand, and the pinned-sha shape.
This commit is contained in:
@@ -45,11 +45,15 @@ from decnet.ttp.attack_version import (
|
||||
ATTACK_BUNDLE_SHA256,
|
||||
ATTACK_BUNDLE_URL,
|
||||
ATTACK_BUNDLE_VERSION,
|
||||
ATTACK_LICENSE_FILENAME,
|
||||
ATTACK_LICENSE_SHA256,
|
||||
ATTACK_LICENSE_URL,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_ENV_BUNDLE_PATH: Final[str] = "DECNET_ATTACK_BUNDLE"
|
||||
_ENV_LICENSE_PATH: Final[str] = "DECNET_ATTACK_LICENSE"
|
||||
_ENV_CACHE_DIR: Final[str] = "DECNET_ATTACK_CACHE_DIR"
|
||||
_DEFAULT_CACHE_DIR: Final[Path] = Path.home() / ".cache" / "decnet" / "attack"
|
||||
|
||||
@@ -71,12 +75,20 @@ def _expected_cache_path() -> Path:
|
||||
return _cache_dir() / f"enterprise-attack-{ATTACK_BUNDLE_VERSION}.json"
|
||||
|
||||
|
||||
def _verify_sha256(path: Path) -> None:
|
||||
def _expected_license_path() -> Path:
|
||||
return _cache_dir() / ATTACK_LICENSE_FILENAME
|
||||
|
||||
|
||||
def _sha256(path: Path) -> str:
|
||||
h = hashlib.sha256()
|
||||
with path.open("rb") as f:
|
||||
for chunk in iter(lambda: f.read(1 << 20), b""):
|
||||
h.update(chunk)
|
||||
actual = h.hexdigest()
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def _verify_sha256(path: Path) -> None:
|
||||
actual = _sha256(path)
|
||||
if actual != ATTACK_BUNDLE_SHA256:
|
||||
raise AttackBundleError(
|
||||
f"ATT&CK bundle at {path} sha256={actual} does not match "
|
||||
@@ -85,14 +97,14 @@ def _verify_sha256(path: Path) -> None:
|
||||
)
|
||||
|
||||
|
||||
def _fetch_bundle(target: Path) -> None:
|
||||
def _download(url: str, target: Path, *, label: str) -> None:
|
||||
import requests
|
||||
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
logger.info("Fetching ATT&CK bundle %s -> %s", ATTACK_BUNDLE_URL, target)
|
||||
logger.info("Fetching %s %s -> %s", label, url, target)
|
||||
tmp = target.with_suffix(target.suffix + ".part")
|
||||
try:
|
||||
resp = requests.get(ATTACK_BUNDLE_URL, timeout=60, stream=True)
|
||||
resp = requests.get(url, timeout=60, stream=True)
|
||||
resp.raise_for_status()
|
||||
with tmp.open("wb") as f:
|
||||
for chunk in resp.iter_content(1 << 20):
|
||||
@@ -104,8 +116,73 @@ def _fetch_bundle(target: Path) -> None:
|
||||
raise
|
||||
|
||||
|
||||
def _fetch_bundle(target: Path) -> None:
|
||||
_download(ATTACK_BUNDLE_URL, target, label="ATT&CK bundle")
|
||||
|
||||
|
||||
def _fetch_license(target: Path) -> None:
|
||||
"""Fetch MITRE's LICENSE.txt. Hash mismatch is logged + re-fetched, never fail-closed.
|
||||
|
||||
The ATT&CK bundle is fail-closed because a tampered bundle would
|
||||
silently mistag thousands of events. The license is required by
|
||||
the Terms of Use *to be present*; an upstream formatting tweak
|
||||
isn't a security event, so we resync rather than refuse to boot.
|
||||
"""
|
||||
_download(ATTACK_LICENSE_URL, target, label="ATT&CK license")
|
||||
actual = _sha256(target)
|
||||
if actual != ATTACK_LICENSE_SHA256:
|
||||
logger.warning(
|
||||
"ATT&CK LICENSE.txt sha256=%s differs from pinned %s — "
|
||||
"MITRE may have updated the license text. Update "
|
||||
"ATTACK_LICENSE_SHA256 in attack_version.py if intentional.",
|
||||
actual,
|
||||
ATTACK_LICENSE_SHA256,
|
||||
)
|
||||
|
||||
|
||||
def _ensure_license(cache_dir: Path) -> Path:
|
||||
"""Return the path to a present LICENSE.txt, fetching if missing.
|
||||
|
||||
Honors ``DECNET_ATTACK_LICENSE`` for operator-controlled overrides
|
||||
(mirrors ``DECNET_ATTACK_BUNDLE`` for offline / air-gapped installs).
|
||||
Refuses to return without a license file on disk — this is the
|
||||
compliance ratchet enforcing MITRE's Terms of Use.
|
||||
"""
|
||||
override = os.environ.get(_ENV_LICENSE_PATH)
|
||||
if override:
|
||||
path = Path(override)
|
||||
if not path.is_file():
|
||||
raise AttackBundleError(
|
||||
f"{_ENV_LICENSE_PATH}={override} does not point to a file. "
|
||||
"MITRE's ATT&CK Terms of Use require the license to be "
|
||||
"present alongside any cached copy of ATT&CK data."
|
||||
)
|
||||
return path
|
||||
|
||||
license_path = cache_dir / ATTACK_LICENSE_FILENAME
|
||||
if not license_path.is_file():
|
||||
_fetch_license(license_path)
|
||||
if not license_path.is_file():
|
||||
raise AttackBundleError(
|
||||
f"ATT&CK license missing at {license_path}. MITRE's ATT&CK "
|
||||
"Terms of Use require the license to be present alongside "
|
||||
"any cached copy of ATT&CK data. Run "
|
||||
"`python -m decnet.ttp.attack_stix fetch` or set "
|
||||
f"{_ENV_LICENSE_PATH} to an existing LICENSE.txt."
|
||||
)
|
||||
return license_path
|
||||
|
||||
|
||||
def resolve_bundle_path() -> Path:
|
||||
"""Return the verified bundle path, fetching if necessary."""
|
||||
"""Return the verified bundle path, fetching the bundle and LICENSE if necessary.
|
||||
|
||||
Both files must be present on disk before this returns. When
|
||||
``DECNET_ATTACK_BUNDLE`` overrides the bundle path the license
|
||||
must live next to that bundle, or be reachable via
|
||||
``DECNET_ATTACK_LICENSE``. The cache dir is checked first so
|
||||
operator-supplied bundles can still rely on the auto-cached
|
||||
license.
|
||||
"""
|
||||
override = os.environ.get(_ENV_BUNDLE_PATH)
|
||||
if override:
|
||||
path = Path(override)
|
||||
@@ -114,15 +191,44 @@ def resolve_bundle_path() -> Path:
|
||||
f"{_ENV_BUNDLE_PATH}={override} does not point to a file"
|
||||
)
|
||||
_verify_sha256(path)
|
||||
# License must accompany an override bundle. Check next to the
|
||||
# bundle first, then DECNET_ATTACK_LICENSE, then the cache dir
|
||||
# as a last resort.
|
||||
sibling = path.parent / ATTACK_LICENSE_FILENAME
|
||||
if sibling.is_file() or os.environ.get(_ENV_LICENSE_PATH):
|
||||
_ensure_license(path.parent)
|
||||
else:
|
||||
_ensure_license(_cache_dir())
|
||||
return path
|
||||
|
||||
cached = _expected_cache_path()
|
||||
if not cached.is_file():
|
||||
_fetch_bundle(cached)
|
||||
_verify_sha256(cached)
|
||||
_ensure_license(_cache_dir())
|
||||
return cached
|
||||
|
||||
|
||||
def loaded_license_path() -> Path | None:
|
||||
"""Return the path to the on-disk LICENSE.txt this process is operating under.
|
||||
|
||||
Resolution mirrors :func:`_ensure_license` but is read-only — it
|
||||
never fetches. Useful for the ``license`` CLI subcommand and for
|
||||
operators auditing what license text they accepted.
|
||||
"""
|
||||
override = os.environ.get(_ENV_LICENSE_PATH)
|
||||
if override:
|
||||
p = Path(override)
|
||||
return p if p.is_file() else None
|
||||
bundle_override = os.environ.get(_ENV_BUNDLE_PATH)
|
||||
if bundle_override:
|
||||
sibling = Path(bundle_override).parent / ATTACK_LICENSE_FILENAME
|
||||
if sibling.is_file():
|
||||
return sibling
|
||||
cached = _cache_dir() / ATTACK_LICENSE_FILENAME
|
||||
return cached if cached.is_file() else None
|
||||
|
||||
|
||||
def _load() -> MitreAttackData:
|
||||
global _data, _loaded_path
|
||||
with _data_lock:
|
||||
@@ -282,18 +388,22 @@ def assert_known_tactic_ids(
|
||||
|
||||
def _cli_fetch(print_sha: bool) -> int:
|
||||
cached = _expected_cache_path()
|
||||
license_path = _expected_license_path()
|
||||
if not cached.is_file():
|
||||
try:
|
||||
_fetch_bundle(cached)
|
||||
except Exception as exc: # pragma: no cover - network failure path
|
||||
print(f"fetch failed: {exc}", file=sys.stderr)
|
||||
print(f"bundle fetch failed: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
if not license_path.is_file():
|
||||
try:
|
||||
_fetch_license(license_path)
|
||||
except Exception as exc: # pragma: no cover - network failure path
|
||||
print(f"license fetch failed: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
if print_sha:
|
||||
h = hashlib.sha256()
|
||||
with cached.open("rb") as f:
|
||||
for chunk in iter(lambda: f.read(1 << 20), b""):
|
||||
h.update(chunk)
|
||||
print(f"{h.hexdigest()} {cached}")
|
||||
print(f"{_sha256(cached)} {cached}")
|
||||
print(f"{_sha256(license_path)} {license_path}")
|
||||
return 0
|
||||
try:
|
||||
_verify_sha256(cached)
|
||||
@@ -301,6 +411,19 @@ def _cli_fetch(print_sha: bool) -> int:
|
||||
print(str(exc), file=sys.stderr)
|
||||
return 2
|
||||
print(f"OK {cached} (version {ATTACK_BUNDLE_VERSION})")
|
||||
print(f"OK {license_path}")
|
||||
return 0
|
||||
|
||||
|
||||
def _cli_license() -> int:
|
||||
path = loaded_license_path()
|
||||
if path is None:
|
||||
print(
|
||||
"No ATT&CK LICENSE.txt found. Run `python -m decnet.ttp.attack_stix fetch`.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 1
|
||||
print(path.read_text(encoding="utf-8"))
|
||||
return 0
|
||||
|
||||
|
||||
@@ -309,15 +432,23 @@ def main(argv: list[str] | None = None) -> int:
|
||||
|
||||
p = argparse.ArgumentParser(prog="python -m decnet.ttp.attack_stix")
|
||||
sub = p.add_subparsers(dest="cmd", required=True)
|
||||
f = sub.add_parser("fetch", help="Fetch and verify the pinned ATT&CK bundle.")
|
||||
f = sub.add_parser(
|
||||
"fetch", help="Fetch + verify the pinned ATT&CK bundle and LICENSE.txt."
|
||||
)
|
||||
f.add_argument(
|
||||
"--print-sha",
|
||||
action="store_true",
|
||||
help="Print sha256 of the cached bundle (for updating attack_version.py).",
|
||||
help="Print sha256 of the cached files (for updating attack_version.py).",
|
||||
)
|
||||
sub.add_parser(
|
||||
"license",
|
||||
help="Print the cached MITRE ATT&CK LICENSE.txt to stdout.",
|
||||
)
|
||||
args = p.parse_args(argv)
|
||||
if args.cmd == "fetch":
|
||||
return _cli_fetch(args.print_sha)
|
||||
if args.cmd == "license":
|
||||
return _cli_license()
|
||||
return 1
|
||||
|
||||
|
||||
@@ -333,6 +464,7 @@ __all__ = [
|
||||
"is_subtechnique",
|
||||
"kill_chain_phases",
|
||||
"loaded_bundle_path",
|
||||
"loaded_license_path",
|
||||
"resolve_bundle_path",
|
||||
"subtechnique_parent_name",
|
||||
"tactic_exists",
|
||||
|
||||
@@ -29,8 +29,30 @@ ATTACK_BUNDLE_URL: Final[str] = (
|
||||
f"/master/enterprise-attack/enterprise-attack-{ATTACK_BUNDLE_VERSION}.json"
|
||||
)
|
||||
|
||||
# MITRE's ATT&CK Terms of Use (https://attack.mitre.org/resources/legal-and-branding/terms-of-use/)
|
||||
# require reproducing their copyright + license alongside any cached
|
||||
# copy of ATT&CK data. The license file lives at the root of the
|
||||
# attack-stix-data repository and is fetched into the same cache dir
|
||||
# as the bundle. ``resolve_bundle_path`` refuses to operate without
|
||||
# this file present — a hard compliance ratchet, not a soft warning.
|
||||
ATTACK_LICENSE_URL: Final[str] = (
|
||||
"https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/LICENSE.txt"
|
||||
)
|
||||
|
||||
# sha256 of the LICENSE.txt at the time of pinning. License text gets
|
||||
# occasional formatting touch-ups, so a mismatch is logged + refreshed
|
||||
# rather than fail-closed (see _fetch_license in attack_stix.py).
|
||||
ATTACK_LICENSE_SHA256: Final[str] = (
|
||||
"738144f7fb054722a4ef9d3367c51710341dc12fc574c6ac3a41daaaecd8bf5e"
|
||||
)
|
||||
|
||||
ATTACK_LICENSE_FILENAME: Final[str] = "LICENSE.txt"
|
||||
|
||||
__all__ = [
|
||||
"ATTACK_BUNDLE_SHA256",
|
||||
"ATTACK_BUNDLE_URL",
|
||||
"ATTACK_BUNDLE_VERSION",
|
||||
"ATTACK_LICENSE_FILENAME",
|
||||
"ATTACK_LICENSE_SHA256",
|
||||
"ATTACK_LICENSE_URL",
|
||||
]
|
||||
|
||||
131
tests/ttp/test_attack_license.py
Normal file
131
tests/ttp/test_attack_license.py
Normal file
@@ -0,0 +1,131 @@
|
||||
"""MITRE ATT&CK Terms of Use compliance: LICENSE.txt is fetched, verified, and required.
|
||||
|
||||
Bundle and license live side-by-side in the cache dir. The bundle is
|
||||
fail-closed on hash mismatch (drift = mistagging risk); the license
|
||||
is logged-and-refreshed on hash mismatch (drift = MITRE updated the
|
||||
text, not a security event), but its *presence* is mandatory.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.ttp import attack_stix
|
||||
from decnet.ttp.attack_version import (
|
||||
ATTACK_LICENSE_FILENAME,
|
||||
ATTACK_LICENSE_SHA256,
|
||||
)
|
||||
|
||||
_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_loader_state() -> None:
|
||||
attack_stix._data = None
|
||||
attack_stix._loaded_path = None
|
||||
attack_stix._attack_pattern_by_id.cache_clear()
|
||||
attack_stix._tactic_by_id.cache_clear()
|
||||
attack_stix._tactic_by_short_name.cache_clear()
|
||||
|
||||
|
||||
def _write_dummy_license(path: Path) -> str:
|
||||
text = "placeholder license content for tests"
|
||||
path.write_text(text, encoding="utf-8")
|
||||
return hashlib.sha256(text.encode()).hexdigest()
|
||||
|
||||
|
||||
def test_license_filename_constant() -> None:
|
||||
assert ATTACK_LICENSE_FILENAME == "LICENSE.txt"
|
||||
|
||||
|
||||
def test_resolve_bundle_path_with_override_and_sibling_license(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""Operator points DECNET_ATTACK_BUNDLE at a file with LICENSE.txt next to it — happy path."""
|
||||
bundle = tmp_path / "enterprise-attack-19.0.json"
|
||||
bundle.write_bytes(_REPO_BUNDLE.read_bytes())
|
||||
license_path = tmp_path / ATTACK_LICENSE_FILENAME
|
||||
_write_dummy_license(license_path)
|
||||
|
||||
monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(bundle))
|
||||
monkeypatch.delenv("DECNET_ATTACK_LICENSE", raising=False)
|
||||
# Empty cache dir so override-mode resolves license from sibling.
|
||||
monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "cache"))
|
||||
|
||||
resolved = attack_stix.resolve_bundle_path()
|
||||
assert resolved == bundle
|
||||
assert attack_stix.loaded_license_path() == license_path
|
||||
|
||||
|
||||
def test_resolve_bundle_path_via_decnet_attack_license_env(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""DECNET_ATTACK_LICENSE points to an arbitrary path — accepted."""
|
||||
bundle = tmp_path / "bundle" / "enterprise-attack-19.0.json"
|
||||
bundle.parent.mkdir()
|
||||
bundle.write_bytes(_REPO_BUNDLE.read_bytes())
|
||||
explicit_license = tmp_path / "license_elsewhere.txt"
|
||||
_write_dummy_license(explicit_license)
|
||||
|
||||
monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(bundle))
|
||||
monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(explicit_license))
|
||||
monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "cache"))
|
||||
|
||||
resolved = attack_stix.resolve_bundle_path()
|
||||
assert resolved == bundle
|
||||
assert attack_stix.loaded_license_path() == explicit_license
|
||||
|
||||
|
||||
def test_decnet_attack_license_pointing_to_missing_file_raises(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
bundle = tmp_path / "enterprise-attack-19.0.json"
|
||||
bundle.write_bytes(_REPO_BUNDLE.read_bytes())
|
||||
monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(bundle))
|
||||
monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(tmp_path / "nope.txt"))
|
||||
monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "cache"))
|
||||
|
||||
with pytest.raises(attack_stix.AttackBundleError) as exc:
|
||||
attack_stix.resolve_bundle_path()
|
||||
assert "Terms of Use" in str(exc.value)
|
||||
|
||||
|
||||
def test_loaded_license_path_returns_none_when_absent(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
monkeypatch.delenv("DECNET_ATTACK_BUNDLE", raising=False)
|
||||
monkeypatch.delenv("DECNET_ATTACK_LICENSE", raising=False)
|
||||
monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path))
|
||||
assert attack_stix.loaded_license_path() is None
|
||||
|
||||
|
||||
def test_pinned_license_sha_matches_repo_committed_text() -> None:
|
||||
"""The pinned hash in attack_version.py is a 64-char lowercase hex sha256."""
|
||||
assert len(ATTACK_LICENSE_SHA256) == 64
|
||||
assert all(c in "0123456789abcdef" for c in ATTACK_LICENSE_SHA256)
|
||||
|
||||
|
||||
def test_cli_license_subcommand_prints_cached_license(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
|
||||
) -> None:
|
||||
license_path = tmp_path / ATTACK_LICENSE_FILENAME
|
||||
license_path.write_text("MITRE Corporation grants you a license\n", encoding="utf-8")
|
||||
monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path))
|
||||
|
||||
rc = attack_stix.main(["license"])
|
||||
assert rc == 0
|
||||
out = capsys.readouterr().out
|
||||
assert "MITRE Corporation" in out
|
||||
|
||||
|
||||
def test_cli_license_returns_nonzero_when_not_present(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
monkeypatch.delenv("DECNET_ATTACK_LICENSE", raising=False)
|
||||
monkeypatch.delenv("DECNET_ATTACK_BUNDLE", raising=False)
|
||||
monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "empty"))
|
||||
|
||||
rc = attack_stix.main(["license"])
|
||||
assert rc == 1
|
||||
Reference in New Issue
Block a user