feat(ttp): fail-closed validation that lifter+UKC IDs resolve in ATT&CK bundle
Drift between the technique/tactic IDs hardcoded in the lifters and what the loaded ATT&CK STIX bundle actually contains is silent in the status quo: a renamed-or-retired technique just stops being tagged. Every emission point now has an explicit validator that asserts its IDs resolve in the loaded bundle, called once at TTP-worker boot. - intel_lifter.all_emitted_technique_ids() collects every technique the four provider tables (AbuseIPDB / GreyNoise / Feodo / ThreatFox) plus the decision-flow constants in _greynoise_decisions and _feodo_decisions can emit. validate_against_attack_bundle() runs it through attack_stix.assert_known_technique_ids(). - ukc.validate_against_attack_bundle() asserts every key in ATTACK_TACTIC_TO_UKC resolves, with TA0100..TA0106 documented as _NON_ENTERPRISE_TACTICS (lives in the ICS bundle, not the enterprise bundle DECNET loads). - decnet/ttp/worker.py:run_ttp_worker_loop calls both validators before subscribing to the bus. A bundle-vs-code mismatch refuses to start the worker rather than silently mistagging events. - tests/ttp/test_attack_bundle_validation.py covers the happy path for both validators, the negative path (injected bogus tactic ID raises AttackBundleError), the ICS exemption, and the lone T1078 reference in credential_lifter.
This commit is contained in:
@@ -15,6 +15,7 @@ emits no events for unobservable phases.
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import Final
|
||||
|
||||
|
||||
class UKCPhase(str, Enum):
|
||||
@@ -138,6 +139,31 @@ ATTACK_TACTIC_TO_UKC: dict[str, UKCPhase] = {
|
||||
}
|
||||
|
||||
|
||||
# ICS tactics live in a separate STIX bundle (mitre/ics-attack) that
|
||||
# DECNET does not currently load. They're exempt from the
|
||||
# enterprise-bundle validation in :func:`validate_against_attack_bundle`
|
||||
# so a startup check doesn't false-fail the moment ICS rules are wired.
|
||||
_NON_ENTERPRISE_TACTICS: Final[frozenset[str]] = frozenset(
|
||||
{"TA0100", "TA0102", "TA0105", "TA0106"}
|
||||
)
|
||||
|
||||
|
||||
def validate_against_attack_bundle() -> None:
|
||||
"""Assert every enterprise tactic ID in :data:`ATTACK_TACTIC_TO_UKC` resolves in the loaded STIX bundle.
|
||||
|
||||
Called at startup (see :mod:`decnet.ttp.impl.rule_engine`) so a
|
||||
typoed tactic ID surfaces as a fail-closed boot, not a silent
|
||||
miss in campaign rollups.
|
||||
"""
|
||||
from decnet.ttp.attack_stix import assert_known_tactic_ids
|
||||
|
||||
assert_known_tactic_ids(
|
||||
list(ATTACK_TACTIC_TO_UKC.keys()),
|
||||
source="decnet.clustering.ukc.ATTACK_TACTIC_TO_UKC",
|
||||
exempt=set(_NON_ENTERPRISE_TACTICS),
|
||||
)
|
||||
|
||||
|
||||
def tactic_to_ukc_phase(tactic: str) -> UKCPhase | None:
|
||||
"""Map an ATT&CK tactic ID (e.g. ``"TA0001"``) to a :class:`UKCPhase`.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user