feat(ttp): add mitre_url_for + groups_using_technique helpers
Two reusable bundle-derived lookups that the next two commits build on: - mitre_url_for(tid) returns the canonical attack.mitre.org URL by reading external_references on the cached attack-pattern. Backed by the existing lru-cached _attack_pattern_by_id so per-call cost is constant. Handles top-level techniques and sub-techniques (T1059.004 -> .../techniques/T1059/004). - GroupRef + groups_using_technique(tid) surface the intrusion-set reverse index from the loaded bundle: given a technique, return the MITRE-tracked groups documented as using it. Sorted by group_id for deterministic responses; lru-cached. Sub-technique semantics match ATT&CK Navigator (do NOT auto-union with parent). - decnet/ttp/data/intel_loader._mitre_url_for collapses to a thin re-export of attack_stix.mitre_url_for; the loader keeps mitre_url on TechniqueEmission for the eventual STIX export. - tests/ttp/test_attack_url.py covers both helpers: top-level + sub URLs, unknown -> None / (), GroupRef immutability + hashability, deterministic ordering, sub-technique distinct from parent.
This commit is contained in:
@@ -34,6 +34,7 @@ import hashlib
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
from dataclasses import dataclass
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
@@ -347,6 +348,100 @@ def kill_chain_phases(technique_id: str) -> list[str]:
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def mitre_url_for(technique_id: str | None) -> str | None:
|
||||||
|
"""Return the canonical attack.mitre.org URL for *technique_id*, or None.
|
||||||
|
|
||||||
|
Pulled from ``external_references[source_name="mitre-attack"].url``
|
||||||
|
on the cached attack-pattern. Reuses the lru-cached
|
||||||
|
:func:`_attack_pattern_by_id` so per-call cost is constant after
|
||||||
|
first hit. ``None`` for unknown / missing IDs — callers must
|
||||||
|
handle nullability (the column is ``Optional`` everywhere it
|
||||||
|
surfaces).
|
||||||
|
"""
|
||||||
|
if not technique_id:
|
||||||
|
return None
|
||||||
|
obj = _attack_pattern_by_id(technique_id)
|
||||||
|
if obj is None:
|
||||||
|
return None
|
||||||
|
for ref in obj.get("external_references", []):
|
||||||
|
if ref.get("source_name") == "mitre-attack":
|
||||||
|
url = ref.get("url")
|
||||||
|
return url if isinstance(url, str) else None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class GroupRef:
|
||||||
|
"""A single MITRE ATT&CK ``intrusion-set`` (group) reference.
|
||||||
|
|
||||||
|
Returned by :func:`groups_using_technique` to surface "groups
|
||||||
|
MITRE has documented as using this technique". Read-only —
|
||||||
|
explicitly *not* an attribution claim about a DECNET attacker.
|
||||||
|
"""
|
||||||
|
|
||||||
|
group_id: str # e.g. "G0001"
|
||||||
|
name: str
|
||||||
|
aliases: tuple[str, ...]
|
||||||
|
mitre_url: str | None # https://attack.mitre.org/groups/G0001
|
||||||
|
|
||||||
|
|
||||||
|
def _group_external_id(obj: dict) -> str | None:
|
||||||
|
for ref in obj.get("external_references", []):
|
||||||
|
if ref.get("source_name") == "mitre-attack":
|
||||||
|
ext = ref.get("external_id")
|
||||||
|
return ext if isinstance(ext, str) else None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _group_mitre_url(obj: dict) -> str | None:
|
||||||
|
for ref in obj.get("external_references", []):
|
||||||
|
if ref.get("source_name") == "mitre-attack":
|
||||||
|
url = ref.get("url")
|
||||||
|
return url if isinstance(url, str) else None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(maxsize=4096)
|
||||||
|
def groups_using_technique(technique_id: str) -> tuple[GroupRef, ...]:
|
||||||
|
"""Groups MITRE has documented as using *technique_id* — exact-match, deterministic order.
|
||||||
|
|
||||||
|
Sub-techniques are queried directly and do **not** union their
|
||||||
|
parent's groups (matching ATT&CK Navigator semantics). Callers
|
||||||
|
that want a broader view can resolve the parent themselves via
|
||||||
|
:func:`subtechnique_parent_name`.
|
||||||
|
|
||||||
|
Returns an empty tuple if the technique is unknown or has no
|
||||||
|
``uses`` relationships in the loaded bundle. Groups are sorted
|
||||||
|
by group_id ascending so JSON responses are stable across runs.
|
||||||
|
"""
|
||||||
|
if not technique_id:
|
||||||
|
return ()
|
||||||
|
obj = _attack_pattern_by_id(technique_id)
|
||||||
|
if obj is None:
|
||||||
|
return ()
|
||||||
|
raw = _load().get_groups_using_technique(obj["id"])
|
||||||
|
refs: list[GroupRef] = []
|
||||||
|
for entry in raw:
|
||||||
|
# mitreattack-python returns [{"object": IntrusionSet, "relationships": [...]}]
|
||||||
|
sdo = entry.get("object") if isinstance(entry, dict) else entry
|
||||||
|
if sdo is None:
|
||||||
|
continue
|
||||||
|
gid = _group_external_id(sdo)
|
||||||
|
if gid is None:
|
||||||
|
continue
|
||||||
|
aliases = sdo.get("aliases") or ()
|
||||||
|
refs.append(
|
||||||
|
GroupRef(
|
||||||
|
group_id=gid,
|
||||||
|
name=sdo.get("name", gid),
|
||||||
|
aliases=tuple(a for a in aliases if isinstance(a, str)),
|
||||||
|
mitre_url=_group_mitre_url(sdo),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
refs.sort(key=lambda g: g.group_id)
|
||||||
|
return tuple(refs)
|
||||||
|
|
||||||
|
|
||||||
def technique_exists(technique_id: str) -> bool:
|
def technique_exists(technique_id: str) -> bool:
|
||||||
return _attack_pattern_by_id(technique_id) is not None
|
return _attack_pattern_by_id(technique_id) is not None
|
||||||
|
|
||||||
@@ -463,8 +558,11 @@ __all__ = [
|
|||||||
"assert_known_technique_ids",
|
"assert_known_technique_ids",
|
||||||
"is_subtechnique",
|
"is_subtechnique",
|
||||||
"kill_chain_phases",
|
"kill_chain_phases",
|
||||||
|
"GroupRef",
|
||||||
|
"groups_using_technique",
|
||||||
"loaded_bundle_path",
|
"loaded_bundle_path",
|
||||||
"loaded_license_path",
|
"loaded_license_path",
|
||||||
|
"mitre_url_for",
|
||||||
"resolve_bundle_path",
|
"resolve_bundle_path",
|
||||||
"subtechnique_parent_name",
|
"subtechnique_parent_name",
|
||||||
"tactic_exists",
|
"tactic_exists",
|
||||||
|
|||||||
@@ -145,13 +145,13 @@ class ProviderMapping:
|
|||||||
|
|
||||||
|
|
||||||
def _mitre_url_for(technique_id: str) -> str | None:
|
def _mitre_url_for(technique_id: str) -> str | None:
|
||||||
obj = attack_stix._attack_pattern_by_id(technique_id)
|
"""Compatibility shim — collapsed to a re-export of :func:`attack_stix.mitre_url_for`.
|
||||||
if obj is None:
|
|
||||||
return None
|
Public callers should import :func:`decnet.ttp.attack_stix.mitre_url_for`
|
||||||
for ref in obj.get("external_references", []):
|
directly. Kept here so the in-tree loader stays self-contained
|
||||||
if ref.get("source_name") == "mitre-attack":
|
when someone reads it cold.
|
||||||
return ref.get("url")
|
"""
|
||||||
return None
|
return attack_stix.mitre_url_for(technique_id)
|
||||||
|
|
||||||
|
|
||||||
def _data_path(provider: str) -> Path:
|
def _data_path(provider: str) -> Path:
|
||||||
|
|||||||
107
tests/ttp/test_attack_url.py
Normal file
107
tests/ttp/test_attack_url.py
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
"""``attack_stix.mitre_url_for`` and ``groups_using_technique`` happy/sad paths.
|
||||||
|
|
||||||
|
These are the bundle-derived helpers Phase 3 wires into the
|
||||||
|
TTPTag column and the new groups endpoint. Tests pin against the
|
||||||
|
in-repo bundle (DECNET_ATTACK_BUNDLE) so they run hermetically and
|
||||||
|
the spot-check assertions stay tolerant of minor across-version
|
||||||
|
re-namings.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.ttp import attack_stix
|
||||||
|
|
||||||
|
_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||||
|
license_path = tmp_path / "LICENSE.txt"
|
||||||
|
license_path.write_text("placeholder", encoding="utf-8")
|
||||||
|
monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE))
|
||||||
|
monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path))
|
||||||
|
attack_stix._data = None
|
||||||
|
attack_stix._loaded_path = None
|
||||||
|
attack_stix._attack_pattern_by_id.cache_clear()
|
||||||
|
attack_stix._tactic_by_id.cache_clear()
|
||||||
|
attack_stix._tactic_by_short_name.cache_clear()
|
||||||
|
attack_stix.groups_using_technique.cache_clear()
|
||||||
|
|
||||||
|
|
||||||
|
def test_mitre_url_for_top_level_technique() -> None:
|
||||||
|
assert (
|
||||||
|
attack_stix.mitre_url_for("T1059")
|
||||||
|
== "https://attack.mitre.org/techniques/T1059"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mitre_url_for_subtechnique() -> None:
|
||||||
|
assert (
|
||||||
|
attack_stix.mitre_url_for("T1059.004")
|
||||||
|
== "https://attack.mitre.org/techniques/T1059/004"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("bad", [None, "", "T9999", "not-a-technique"])
|
||||||
|
def test_mitre_url_for_returns_none_for_unknown(bad: str | None) -> None:
|
||||||
|
assert attack_stix.mitre_url_for(bad) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_groups_using_technique_returns_grouprefs() -> None:
|
||||||
|
groups = attack_stix.groups_using_technique("T1059")
|
||||||
|
assert len(groups) >= 5
|
||||||
|
sample = groups[0]
|
||||||
|
assert isinstance(sample, attack_stix.GroupRef)
|
||||||
|
assert sample.group_id.startswith("G")
|
||||||
|
assert sample.name
|
||||||
|
assert sample.mitre_url and sample.mitre_url.startswith(
|
||||||
|
"https://attack.mitre.org/groups/G"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_groups_using_technique_is_sorted_by_group_id() -> None:
|
||||||
|
groups = attack_stix.groups_using_technique("T1059")
|
||||||
|
ids = [g.group_id for g in groups]
|
||||||
|
assert ids == sorted(ids), f"groups not sorted by group_id: {ids}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_groups_using_technique_aliases_populated_for_at_least_one() -> None:
|
||||||
|
groups = attack_stix.groups_using_technique("T1059")
|
||||||
|
# Some MITRE groups have rich alias lists; assert at least one
|
||||||
|
# group surfaces aliases. Bundle-version-tolerant: we don't pin
|
||||||
|
# the alias text, just that the field is populated somewhere.
|
||||||
|
assert any(len(g.aliases) >= 2 for g in groups)
|
||||||
|
|
||||||
|
|
||||||
|
def test_groups_using_technique_subtechnique_distinct_from_parent() -> None:
|
||||||
|
"""T1059.004 (Unix Shell) has fewer attributed groups than the abstract T1059.
|
||||||
|
|
||||||
|
Sub-technique semantics: ATT&CK tracks group attribution
|
||||||
|
independently for sub-techniques. We do NOT auto-union with the
|
||||||
|
parent (matches Navigator behavior).
|
||||||
|
"""
|
||||||
|
parent = attack_stix.groups_using_technique("T1059")
|
||||||
|
sub = attack_stix.groups_using_technique("T1059.004")
|
||||||
|
assert len(sub) >= 1
|
||||||
|
assert len(sub) <= len(parent)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("bad", ["", "T9999", "not-a-technique"])
|
||||||
|
def test_groups_using_technique_unknown_returns_empty(bad: str) -> None:
|
||||||
|
assert attack_stix.groups_using_technique(bad) == ()
|
||||||
|
|
||||||
|
|
||||||
|
def test_groupref_is_frozen_and_hashable() -> None:
|
||||||
|
g = attack_stix.GroupRef(
|
||||||
|
group_id="G0001",
|
||||||
|
name="Test",
|
||||||
|
aliases=("Test",),
|
||||||
|
mitre_url=None,
|
||||||
|
)
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
g.name = "other" # type: ignore[misc]
|
||||||
|
# Hashable so we can put GroupRef in a set if a caller wants.
|
||||||
|
assert hash(g)
|
||||||
Reference in New Issue
Block a user