From 432057f44a581b2f4b50eaa96de70ea0ba611211 Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 9 May 2026 05:58:06 -0400 Subject: [PATCH] feat(ttp): fail-closed validation that lifter+UKC IDs resolve in ATT&CK bundle Drift between the technique/tactic IDs hardcoded in the lifters and what the loaded ATT&CK STIX bundle actually contains is silent in the status quo: a renamed-or-retired technique just stops being tagged. Every emission point now has an explicit validator that asserts its IDs resolve in the loaded bundle, called once at TTP-worker boot. - intel_lifter.all_emitted_technique_ids() collects every technique the four provider tables (AbuseIPDB / GreyNoise / Feodo / ThreatFox) plus the decision-flow constants in _greynoise_decisions and _feodo_decisions can emit. validate_against_attack_bundle() runs it through attack_stix.assert_known_technique_ids(). - ukc.validate_against_attack_bundle() asserts every key in ATTACK_TACTIC_TO_UKC resolves, with TA0100..TA0106 documented as _NON_ENTERPRISE_TACTICS (lives in the ICS bundle, not the enterprise bundle DECNET loads). - decnet/ttp/worker.py:run_ttp_worker_loop calls both validators before subscribing to the bus. A bundle-vs-code mismatch refuses to start the worker rather than silently mistagging events. - tests/ttp/test_attack_bundle_validation.py covers the happy path for both validators, the negative path (injected bogus tactic ID raises AttackBundleError), the ICS exemption, and the lone T1078 reference in credential_lifter. --- decnet/clustering/ukc.py | 26 +++++++ decnet/ttp/impl/intel_lifter.py | 35 +++++++++- decnet/ttp/worker.py | 15 ++++ tests/ttp/test_attack_bundle_validation.py | 79 ++++++++++++++++++++++ 4 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 tests/ttp/test_attack_bundle_validation.py diff --git a/decnet/clustering/ukc.py b/decnet/clustering/ukc.py index 5e603a31..73cd79de 100644 --- a/decnet/clustering/ukc.py +++ b/decnet/clustering/ukc.py @@ -15,6 +15,7 @@ emits no events for unobservable phases. from __future__ import annotations from enum import Enum +from typing import Final class UKCPhase(str, Enum): @@ -138,6 +139,31 @@ ATTACK_TACTIC_TO_UKC: dict[str, UKCPhase] = { } +# ICS tactics live in a separate STIX bundle (mitre/ics-attack) that +# DECNET does not currently load. They're exempt from the +# enterprise-bundle validation in :func:`validate_against_attack_bundle` +# so a startup check doesn't false-fail the moment ICS rules are wired. +_NON_ENTERPRISE_TACTICS: Final[frozenset[str]] = frozenset( + {"TA0100", "TA0102", "TA0105", "TA0106"} +) + + +def validate_against_attack_bundle() -> None: + """Assert every enterprise tactic ID in :data:`ATTACK_TACTIC_TO_UKC` resolves in the loaded STIX bundle. + + Called at startup (see :mod:`decnet.ttp.impl.rule_engine`) so a + typoed tactic ID surfaces as a fail-closed boot, not a silent + miss in campaign rollups. + """ + from decnet.ttp.attack_stix import assert_known_tactic_ids + + assert_known_tactic_ids( + list(ATTACK_TACTIC_TO_UKC.keys()), + source="decnet.clustering.ukc.ATTACK_TACTIC_TO_UKC", + exempt=set(_NON_ENTERPRISE_TACTICS), + ) + + def tactic_to_ukc_phase(tactic: str) -> UKCPhase | None: """Map an ATT&CK tactic ID (e.g. ``"TA0001"``) to a :class:`UKCPhase`. diff --git a/decnet/ttp/impl/intel_lifter.py b/decnet/ttp/impl/intel_lifter.py index b4c0bff9..d920c231 100644 --- a/decnet/ttp/impl/intel_lifter.py +++ b/decnet/ttp/impl/intel_lifter.py @@ -375,7 +375,40 @@ def _emit_filtered( return out -__all__ = ["IntelLifter"] +def all_emitted_technique_ids() -> frozenset[str]: + """Every technique ID this lifter could emit, drawn from all four provider tables. + + Used by :func:`validate_against_attack_bundle` (and + :mod:`tests.ttp.test_attack_catalog`-adjacent tests) to assert that + every provider-driven emission resolves in the loaded ATT&CK STIX + bundle. Includes the bare-classification emissions in + ``_greynoise_decisions`` and the unconditional emissions in + ``_feodo_decisions`` — those don't appear in the lookup tables + above because they're decision-flow constants, not table entries. + """ + ids: set[str] = set() + for techs in _ABUSEIPDB_CATEGORY_TO_TECHNIQUES.values(): + ids.update(techs) + for techs in _GREYNOISE_TAG_TO_TECHNIQUES.values(): + ids.update(techs) + for techs in _THREATFOX_THREAT_TYPE_TO_TECHNIQUES.values(): + ids.update(techs) + # Decision-flow constants (see _greynoise_decisions, _feodo_decisions). + ids.update({"T1071", "T1595", "T1588"}) + return frozenset(ids) + + +def validate_against_attack_bundle() -> None: + """Assert every technique ID this lifter could emit resolves in the loaded ATT&CK STIX bundle.""" + from decnet.ttp.attack_stix import assert_known_technique_ids + + assert_known_technique_ids( + list(all_emitted_technique_ids()), + source="decnet.ttp.impl.intel_lifter", + ) + + +__all__ = ["IntelLifter", "all_emitted_technique_ids", "validate_against_attack_bundle"] # Suppress unused-import lint; emit_tags is exposed for parity with the diff --git a/decnet/ttp/worker.py b/decnet/ttp/worker.py index 886bf9df..ac5177fe 100644 --- a/decnet/ttp/worker.py +++ b/decnet/ttp/worker.py @@ -248,6 +248,21 @@ async def run_ttp_worker_loop( """ if tagger is None: tagger = get_tagger() + + # Fail closed at boot if any technique/tactic the worker can emit + # is missing from the loaded ATT&CK STIX bundle. The bundle is the + # canonical source of truth (see decnet/ttp/attack_stix.py) — drift + # between the pinned version and what the lifters reference would + # silently mistag thousands of events. We run this once per worker + # process; the underlying bundle load is itself memoised. + from decnet.clustering.ukc import validate_against_attack_bundle as _validate_ukc + from decnet.ttp.impl.intel_lifter import ( + validate_against_attack_bundle as _validate_intel, + ) + + _validate_intel() + _validate_ukc() + log.info( "ttp worker started tagger=%s poll_interval_secs=%s topics=%d", tagger.name, poll_interval_secs, len(_TOPICS), diff --git a/tests/ttp/test_attack_bundle_validation.py b/tests/ttp/test_attack_bundle_validation.py new file mode 100644 index 00000000..451fd714 --- /dev/null +++ b/tests/ttp/test_attack_bundle_validation.py @@ -0,0 +1,79 @@ +"""Boot-time ATT&CK bundle validation for lifters and the UKC tactic map. + +Mirrors what :func:`decnet.ttp.worker.run_ttp_worker_loop` runs at +startup so a CI run catches the same drift the worker would refuse to +boot on. The two validators (``intel_lifter.validate_against_attack_bundle`` +and ``ukc.validate_against_attack_bundle``) are the entry points; this +module also asserts the negative path (a typoed ID inside the +collection function raises :class:`AttackBundleError`) so a future +refactor that loses the assertion fails loudly here rather than in +production. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +from decnet.clustering import ukc +from decnet.ttp import attack_stix +from decnet.ttp.impl import intel_lifter + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" + + +@pytest.fixture(autouse=True) +def _pin_bundle(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE)) + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + + +def test_intel_lifter_emissions_resolve_in_bundle() -> None: + intel_lifter.validate_against_attack_bundle() + + +def test_intel_lifter_emission_set_is_complete() -> None: + ids = intel_lifter.all_emitted_technique_ids() + # Decision-flow constants should be present even though they don't + # appear in the lookup tables (see _greynoise_decisions / + # _feodo_decisions). + assert {"T1071", "T1595", "T1588"}.issubset(ids) + # Spot-check at least one entry from each table. + assert "T1110" in ids # AbuseIPDB cat 5/22 + assert "T1090" in ids # GreyNoise tor_exit_node + assert "T1056" in ids # ThreatFox cc_skimming + + +def test_ukc_tactic_map_resolves_in_bundle() -> None: + ukc.validate_against_attack_bundle() + + +def test_ukc_ics_tactics_are_exempt_from_validation() -> None: + # ICS tactics aren't in the enterprise bundle, but the validator + # tolerates them via the _NON_ENTERPRISE_TACTICS exempt set. + assert "TA0100" in ukc._NON_ENTERPRISE_TACTICS + assert not attack_stix.tactic_exists("TA0100") + # And the validator passes (tested above) despite TA0100..TA0106 + # being in ATTACK_TACTIC_TO_UKC. + + +def test_validator_raises_when_unknown_id_injected( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Inject a bogus tactic into the map for the duration of the test. + bogus = "TA9999" + monkeypatch.setitem(ukc.ATTACK_TACTIC_TO_UKC, bogus, ukc.UKCPhase.IMPACT) + with pytest.raises(attack_stix.AttackBundleError) as exc: + ukc.validate_against_attack_bundle() + assert bogus in str(exc.value) + + +def test_credential_lifter_t1078_resolves() -> None: + # credential_lifter has a single hardcoded T1078 reference; cover + # it explicitly so a future ATT&CK release that retires T1078 + # surfaces here as well as in the rule-pack coverage test. + assert attack_stix.technique_exists("T1078")