diff --git a/decnet/ttp/attack_stix.py b/decnet/ttp/attack_stix.py index f040c91c..76c6c710 100644 --- a/decnet/ttp/attack_stix.py +++ b/decnet/ttp/attack_stix.py @@ -45,11 +45,15 @@ from decnet.ttp.attack_version import ( ATTACK_BUNDLE_SHA256, ATTACK_BUNDLE_URL, ATTACK_BUNDLE_VERSION, + ATTACK_LICENSE_FILENAME, + ATTACK_LICENSE_SHA256, + ATTACK_LICENSE_URL, ) logger = logging.getLogger(__name__) _ENV_BUNDLE_PATH: Final[str] = "DECNET_ATTACK_BUNDLE" +_ENV_LICENSE_PATH: Final[str] = "DECNET_ATTACK_LICENSE" _ENV_CACHE_DIR: Final[str] = "DECNET_ATTACK_CACHE_DIR" _DEFAULT_CACHE_DIR: Final[Path] = Path.home() / ".cache" / "decnet" / "attack" @@ -71,12 +75,20 @@ def _expected_cache_path() -> Path: return _cache_dir() / f"enterprise-attack-{ATTACK_BUNDLE_VERSION}.json" -def _verify_sha256(path: Path) -> None: +def _expected_license_path() -> Path: + return _cache_dir() / ATTACK_LICENSE_FILENAME + + +def _sha256(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1 << 20), b""): h.update(chunk) - actual = h.hexdigest() + return h.hexdigest() + + +def _verify_sha256(path: Path) -> None: + actual = _sha256(path) if actual != ATTACK_BUNDLE_SHA256: raise AttackBundleError( f"ATT&CK bundle at {path} sha256={actual} does not match " @@ -85,14 +97,14 @@ def _verify_sha256(path: Path) -> None: ) -def _fetch_bundle(target: Path) -> None: +def _download(url: str, target: Path, *, label: str) -> None: import requests target.parent.mkdir(parents=True, exist_ok=True) - logger.info("Fetching ATT&CK bundle %s -> %s", ATTACK_BUNDLE_URL, target) + logger.info("Fetching %s %s -> %s", label, url, target) tmp = target.with_suffix(target.suffix + ".part") try: - resp = requests.get(ATTACK_BUNDLE_URL, timeout=60, stream=True) + resp = requests.get(url, timeout=60, stream=True) resp.raise_for_status() with tmp.open("wb") as f: for chunk in resp.iter_content(1 << 20): @@ -104,8 +116,73 @@ def _fetch_bundle(target: Path) -> None: raise +def _fetch_bundle(target: Path) -> None: + _download(ATTACK_BUNDLE_URL, target, label="ATT&CK bundle") + + +def _fetch_license(target: Path) -> None: + """Fetch MITRE's LICENSE.txt. Hash mismatch is logged + re-fetched, never fail-closed. + + The ATT&CK bundle is fail-closed because a tampered bundle would + silently mistag thousands of events. The license is required by + the Terms of Use *to be present*; an upstream formatting tweak + isn't a security event, so we resync rather than refuse to boot. + """ + _download(ATTACK_LICENSE_URL, target, label="ATT&CK license") + actual = _sha256(target) + if actual != ATTACK_LICENSE_SHA256: + logger.warning( + "ATT&CK LICENSE.txt sha256=%s differs from pinned %s — " + "MITRE may have updated the license text. Update " + "ATTACK_LICENSE_SHA256 in attack_version.py if intentional.", + actual, + ATTACK_LICENSE_SHA256, + ) + + +def _ensure_license(cache_dir: Path) -> Path: + """Return the path to a present LICENSE.txt, fetching if missing. + + Honors ``DECNET_ATTACK_LICENSE`` for operator-controlled overrides + (mirrors ``DECNET_ATTACK_BUNDLE`` for offline / air-gapped installs). + Refuses to return without a license file on disk — this is the + compliance ratchet enforcing MITRE's Terms of Use. + """ + override = os.environ.get(_ENV_LICENSE_PATH) + if override: + path = Path(override) + if not path.is_file(): + raise AttackBundleError( + f"{_ENV_LICENSE_PATH}={override} does not point to a file. " + "MITRE's ATT&CK Terms of Use require the license to be " + "present alongside any cached copy of ATT&CK data." + ) + return path + + license_path = cache_dir / ATTACK_LICENSE_FILENAME + if not license_path.is_file(): + _fetch_license(license_path) + if not license_path.is_file(): + raise AttackBundleError( + f"ATT&CK license missing at {license_path}. MITRE's ATT&CK " + "Terms of Use require the license to be present alongside " + "any cached copy of ATT&CK data. Run " + "`python -m decnet.ttp.attack_stix fetch` or set " + f"{_ENV_LICENSE_PATH} to an existing LICENSE.txt." + ) + return license_path + + def resolve_bundle_path() -> Path: - """Return the verified bundle path, fetching if necessary.""" + """Return the verified bundle path, fetching the bundle and LICENSE if necessary. + + Both files must be present on disk before this returns. When + ``DECNET_ATTACK_BUNDLE`` overrides the bundle path the license + must live next to that bundle, or be reachable via + ``DECNET_ATTACK_LICENSE``. The cache dir is checked first so + operator-supplied bundles can still rely on the auto-cached + license. + """ override = os.environ.get(_ENV_BUNDLE_PATH) if override: path = Path(override) @@ -114,15 +191,44 @@ def resolve_bundle_path() -> Path: f"{_ENV_BUNDLE_PATH}={override} does not point to a file" ) _verify_sha256(path) + # License must accompany an override bundle. Check next to the + # bundle first, then DECNET_ATTACK_LICENSE, then the cache dir + # as a last resort. + sibling = path.parent / ATTACK_LICENSE_FILENAME + if sibling.is_file() or os.environ.get(_ENV_LICENSE_PATH): + _ensure_license(path.parent) + else: + _ensure_license(_cache_dir()) return path cached = _expected_cache_path() if not cached.is_file(): _fetch_bundle(cached) _verify_sha256(cached) + _ensure_license(_cache_dir()) return cached +def loaded_license_path() -> Path | None: + """Return the path to the on-disk LICENSE.txt this process is operating under. + + Resolution mirrors :func:`_ensure_license` but is read-only — it + never fetches. Useful for the ``license`` CLI subcommand and for + operators auditing what license text they accepted. + """ + override = os.environ.get(_ENV_LICENSE_PATH) + if override: + p = Path(override) + return p if p.is_file() else None + bundle_override = os.environ.get(_ENV_BUNDLE_PATH) + if bundle_override: + sibling = Path(bundle_override).parent / ATTACK_LICENSE_FILENAME + if sibling.is_file(): + return sibling + cached = _cache_dir() / ATTACK_LICENSE_FILENAME + return cached if cached.is_file() else None + + def _load() -> MitreAttackData: global _data, _loaded_path with _data_lock: @@ -282,18 +388,22 @@ def assert_known_tactic_ids( def _cli_fetch(print_sha: bool) -> int: cached = _expected_cache_path() + license_path = _expected_license_path() if not cached.is_file(): try: _fetch_bundle(cached) except Exception as exc: # pragma: no cover - network failure path - print(f"fetch failed: {exc}", file=sys.stderr) + print(f"bundle fetch failed: {exc}", file=sys.stderr) + return 1 + if not license_path.is_file(): + try: + _fetch_license(license_path) + except Exception as exc: # pragma: no cover - network failure path + print(f"license fetch failed: {exc}", file=sys.stderr) return 1 if print_sha: - h = hashlib.sha256() - with cached.open("rb") as f: - for chunk in iter(lambda: f.read(1 << 20), b""): - h.update(chunk) - print(f"{h.hexdigest()} {cached}") + print(f"{_sha256(cached)} {cached}") + print(f"{_sha256(license_path)} {license_path}") return 0 try: _verify_sha256(cached) @@ -301,6 +411,19 @@ def _cli_fetch(print_sha: bool) -> int: print(str(exc), file=sys.stderr) return 2 print(f"OK {cached} (version {ATTACK_BUNDLE_VERSION})") + print(f"OK {license_path}") + return 0 + + +def _cli_license() -> int: + path = loaded_license_path() + if path is None: + print( + "No ATT&CK LICENSE.txt found. Run `python -m decnet.ttp.attack_stix fetch`.", + file=sys.stderr, + ) + return 1 + print(path.read_text(encoding="utf-8")) return 0 @@ -309,15 +432,23 @@ def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="python -m decnet.ttp.attack_stix") sub = p.add_subparsers(dest="cmd", required=True) - f = sub.add_parser("fetch", help="Fetch and verify the pinned ATT&CK bundle.") + f = sub.add_parser( + "fetch", help="Fetch + verify the pinned ATT&CK bundle and LICENSE.txt." + ) f.add_argument( "--print-sha", action="store_true", - help="Print sha256 of the cached bundle (for updating attack_version.py).", + help="Print sha256 of the cached files (for updating attack_version.py).", + ) + sub.add_parser( + "license", + help="Print the cached MITRE ATT&CK LICENSE.txt to stdout.", ) args = p.parse_args(argv) if args.cmd == "fetch": return _cli_fetch(args.print_sha) + if args.cmd == "license": + return _cli_license() return 1 @@ -333,6 +464,7 @@ __all__ = [ "is_subtechnique", "kill_chain_phases", "loaded_bundle_path", + "loaded_license_path", "resolve_bundle_path", "subtechnique_parent_name", "tactic_exists", diff --git a/decnet/ttp/attack_version.py b/decnet/ttp/attack_version.py index c82709d5..692c3565 100644 --- a/decnet/ttp/attack_version.py +++ b/decnet/ttp/attack_version.py @@ -29,8 +29,30 @@ ATTACK_BUNDLE_URL: Final[str] = ( f"/master/enterprise-attack/enterprise-attack-{ATTACK_BUNDLE_VERSION}.json" ) +# MITRE's ATT&CK Terms of Use (https://attack.mitre.org/resources/legal-and-branding/terms-of-use/) +# require reproducing their copyright + license alongside any cached +# copy of ATT&CK data. The license file lives at the root of the +# attack-stix-data repository and is fetched into the same cache dir +# as the bundle. ``resolve_bundle_path`` refuses to operate without +# this file present — a hard compliance ratchet, not a soft warning. +ATTACK_LICENSE_URL: Final[str] = ( + "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/LICENSE.txt" +) + +# sha256 of the LICENSE.txt at the time of pinning. License text gets +# occasional formatting touch-ups, so a mismatch is logged + refreshed +# rather than fail-closed (see _fetch_license in attack_stix.py). +ATTACK_LICENSE_SHA256: Final[str] = ( + "738144f7fb054722a4ef9d3367c51710341dc12fc574c6ac3a41daaaecd8bf5e" +) + +ATTACK_LICENSE_FILENAME: Final[str] = "LICENSE.txt" + __all__ = [ "ATTACK_BUNDLE_SHA256", "ATTACK_BUNDLE_URL", "ATTACK_BUNDLE_VERSION", + "ATTACK_LICENSE_FILENAME", + "ATTACK_LICENSE_SHA256", + "ATTACK_LICENSE_URL", ] diff --git a/tests/ttp/test_attack_license.py b/tests/ttp/test_attack_license.py new file mode 100644 index 00000000..b7962582 --- /dev/null +++ b/tests/ttp/test_attack_license.py @@ -0,0 +1,131 @@ +"""MITRE ATT&CK Terms of Use compliance: LICENSE.txt is fetched, verified, and required. + +Bundle and license live side-by-side in the cache dir. The bundle is +fail-closed on hash mismatch (drift = mistagging risk); the license +is logged-and-refreshed on hash mismatch (drift = MITRE updated the +text, not a security event), but its *presence* is mandatory. +""" +from __future__ import annotations + +import hashlib +from pathlib import Path + +import pytest + +from decnet.ttp import attack_stix +from decnet.ttp.attack_version import ( + ATTACK_LICENSE_FILENAME, + ATTACK_LICENSE_SHA256, +) + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" + + +@pytest.fixture(autouse=True) +def _reset_loader_state() -> None: + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + + +def _write_dummy_license(path: Path) -> str: + text = "placeholder license content for tests" + path.write_text(text, encoding="utf-8") + return hashlib.sha256(text.encode()).hexdigest() + + +def test_license_filename_constant() -> None: + assert ATTACK_LICENSE_FILENAME == "LICENSE.txt" + + +def test_resolve_bundle_path_with_override_and_sibling_license( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Operator points DECNET_ATTACK_BUNDLE at a file with LICENSE.txt next to it — happy path.""" + bundle = tmp_path / "enterprise-attack-19.0.json" + bundle.write_bytes(_REPO_BUNDLE.read_bytes()) + license_path = tmp_path / ATTACK_LICENSE_FILENAME + _write_dummy_license(license_path) + + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(bundle)) + monkeypatch.delenv("DECNET_ATTACK_LICENSE", raising=False) + # Empty cache dir so override-mode resolves license from sibling. + monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "cache")) + + resolved = attack_stix.resolve_bundle_path() + assert resolved == bundle + assert attack_stix.loaded_license_path() == license_path + + +def test_resolve_bundle_path_via_decnet_attack_license_env( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """DECNET_ATTACK_LICENSE points to an arbitrary path — accepted.""" + bundle = tmp_path / "bundle" / "enterprise-attack-19.0.json" + bundle.parent.mkdir() + bundle.write_bytes(_REPO_BUNDLE.read_bytes()) + explicit_license = tmp_path / "license_elsewhere.txt" + _write_dummy_license(explicit_license) + + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(bundle)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(explicit_license)) + monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "cache")) + + resolved = attack_stix.resolve_bundle_path() + assert resolved == bundle + assert attack_stix.loaded_license_path() == explicit_license + + +def test_decnet_attack_license_pointing_to_missing_file_raises( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + bundle = tmp_path / "enterprise-attack-19.0.json" + bundle.write_bytes(_REPO_BUNDLE.read_bytes()) + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(bundle)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(tmp_path / "nope.txt")) + monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "cache")) + + with pytest.raises(attack_stix.AttackBundleError) as exc: + attack_stix.resolve_bundle_path() + assert "Terms of Use" in str(exc.value) + + +def test_loaded_license_path_returns_none_when_absent( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.delenv("DECNET_ATTACK_BUNDLE", raising=False) + monkeypatch.delenv("DECNET_ATTACK_LICENSE", raising=False) + monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path)) + assert attack_stix.loaded_license_path() is None + + +def test_pinned_license_sha_matches_repo_committed_text() -> None: + """The pinned hash in attack_version.py is a 64-char lowercase hex sha256.""" + assert len(ATTACK_LICENSE_SHA256) == 64 + assert all(c in "0123456789abcdef" for c in ATTACK_LICENSE_SHA256) + + +def test_cli_license_subcommand_prints_cached_license( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str] +) -> None: + license_path = tmp_path / ATTACK_LICENSE_FILENAME + license_path.write_text("MITRE Corporation grants you a license\n", encoding="utf-8") + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path)) + + rc = attack_stix.main(["license"]) + assert rc == 0 + out = capsys.readouterr().out + assert "MITRE Corporation" in out + + +def test_cli_license_returns_nonzero_when_not_present( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.delenv("DECNET_ATTACK_LICENSE", raising=False) + monkeypatch.delenv("DECNET_ATTACK_BUNDLE", raising=False) + monkeypatch.setenv("DECNET_ATTACK_CACHE_DIR", str(tmp_path / "empty")) + + rc = attack_stix.main(["license"]) + assert rc == 1