feat(ttp): promote mitre_url to first-class TTPTag column + propagate everywhere

Phase 2 attached mitre_url to intel-emitted tags' evidence JSON;
Phase 3 promotes it to a real column populated for *every* tag —
intel, credential, behavioral, canary, identity, email, rule-engine —
from one source. Pre-v1, so the SQLModel field is added directly
without an Alembic migration.

- TTPTag gains mitre_url: Optional[str] (not indexed — derived
  deeplink, not a query target; technique_id is already indexed).
- _emit.py and rule_engine._evaluate_rules both populate mitre_url
  via attack_stix.mitre_url_for(sub_technique_id or technique_id).
  Sub-technique URL when present, else parent. The two construction
  sites stay separate because the rule_engine path carries per-emit
  span instrumentation that emit_tags() can't preserve without
  threading a span object through; minimal-change beats forced
  refactor here.
- intel_lifter strips mitre_url from evidence_extra in all four
  decision functions. The column is canonical now; duplicating in
  the JSON column would drift when the bundle moves. The unused
  TechniqueEmission import + tracking dicts removed too.
- IdentityTechniqueRow / TechniqueRollupRow / TTPTagDetailRow /
  CampaignTechniqueRow gain mitre_url: Optional[str].
- sqlmodel_repo/ttp.py:_mitre_url_for added; the 5 row-builder sites
  pass mitre_url=_mitre_url_for(sub_technique_id or technique_id)
  alongside the existing technique_name resolution.
- api_get_tag_details.py needs no change — list_tags_by_scope_and
  _technique already returns model_dump() rows that flow the new
  column through **row spread to TTPTagDetailRow.
- tests/ttp/test_emit_attaches_mitre_url.py covers both construction
  paths (top-level, sub-tech, unknown, multi-emit) and a regression
  test that intel_lifter evidence dicts no longer contain mitre_url.
This commit is contained in:
2026-05-09 06:40:08 -04:00
parent 9675f4bf92
commit 84a075e405
6 changed files with 178 additions and 55 deletions

View File

@@ -11,6 +11,7 @@ from __future__ import annotations
from typing import Any
from decnet.ttp.attack_stix import mitre_url_for
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl._state import apply_ceiling
from decnet.ttp.impl.rule_engine import _ATTACK_RELEASE, CompiledRule
@@ -61,6 +62,7 @@ def emit_tags(
rule_version=rule.rule_version,
evidence=dict(evidence),
attack_release=_ATTACK_RELEASE,
mitre_url=mitre_url_for(sub_technique_id or technique_id),
))
return out

View File

@@ -23,7 +23,6 @@ from typing import Any, Final
from decnet.ttp.base import TaggerEvent, TolerantTagger
from decnet.ttp.data.intel_loader import (
ProviderMapping,
TechniqueEmission,
load_provider_mapping,
)
from decnet.ttp.impl._emit import emit_tags
@@ -53,22 +52,6 @@ def _mapping(provider: str) -> ProviderMapping:
_GREYNOISE_MALICIOUS_BARE_MULT: Final[float] = 0.5
def _emission_url_extras(
emissions: dict[str, TechniqueEmission],
) -> dict[str, dict[str, str]]:
"""Map technique_id → {"mitre_url": "<url>"} for every emission that has one.
Lets the per-decision-function evidence_extra builders attach the
canonical MITRE URL to each emit slot without re-resolving against
the loaded ATT&CK bundle.
"""
return {
tid: {"mitre_url": e.mitre_url}
for tid, e in emissions.items()
if e.mitre_url
}
# Predicate signature: returns either a list of (technique_id_filter,
# confidence_multiplier, evidence_extra) tuples — one per emit slot the
# rule should fire — or empty list when the rule does not fire.
@@ -92,11 +75,9 @@ def _abuseipdb_decisions(
# Resolve technique set across all categories present, applying
# any per-technique high-score gate (see TechniqueEmission).
triggered: dict[str, list[int]] = {}
emissions_by_tech: dict[str, TechniqueEmission] = {}
for cat in categories:
for emission in mapping.techniques_for_signal(f"cat_{cat}", score=float(score)):
triggered.setdefault(emission.technique_id, []).append(cat)
emissions_by_tech.setdefault(emission.technique_id, emission)
if not triggered:
return []
multiplier = float(score) / 100.0
@@ -104,10 +85,6 @@ def _abuseipdb_decisions(
(tech, multiplier, {
"abuseipdb_categories": cats,
"abuse_confidence_score": int(score),
**(
{"mitre_url": emissions_by_tech[tech].mitre_url}
if emissions_by_tech[tech].mitre_url else {}
),
})
for tech, cats in triggered.items()
]
@@ -136,21 +113,16 @@ def _greynoise_decisions(
# signals that triggered it AND the multiplier to apply (max wins
# if multiple lanes hit the same technique).
triggered: dict[str, tuple[float, list[str]]] = {}
emissions_by_tech: dict[str, TechniqueEmission] = {}
def _bump(
tech: str, mult: float, signal: str, emission: TechniqueEmission | None = None,
) -> None:
def _bump(tech: str, mult: float, signal: str) -> None:
existing = triggered.get(tech)
if existing is None:
triggered[tech] = (mult, [signal])
else:
return
old_mult, signals = existing
signals.append(signal)
if mult > old_mult:
triggered[tech] = (mult, signals)
if emission is not None:
emissions_by_tech.setdefault(tech, emission)
if classification == "scanner":
_bump("T1595", 1.0, "scanner")
@@ -159,7 +131,7 @@ def _greynoise_decisions(
if not isinstance(tag, str):
continue
for emission in mapping.techniques_for_signal(tag):
_bump(emission.technique_id, 1.0, tag, emission)
_bump(emission.technique_id, 1.0, tag)
if classification == "malicious" and "T1071" not in triggered:
_bump("T1071", _GREYNOISE_MALICIOUS_BARE_MULT, "malicious")
if not triggered:
@@ -168,11 +140,6 @@ def _greynoise_decisions(
(tech, mult, {
"greynoise_classification": classification,
"greynoise_tags": signals,
**(
{"mitre_url": emissions_by_tech[tech].mitre_url}
if tech in emissions_by_tech and emissions_by_tech[tech].mitre_url
else {}
),
})
for tech, (mult, signals) in triggered.items()
]
@@ -187,17 +154,14 @@ def _feodo_decisions(
payload.get("feodo_malware_family")
or payload.get("malware_family")
)
base_extra: dict[str, Any] = {"feodo_listed": True}
extra: dict[str, Any] = {"feodo_listed": True}
if isinstance(family, str) and family:
base_extra["malware_family"] = family
extra["malware_family"] = family
mapping = _mapping("feodo")
out: EmitDecision = []
for emission in mapping.techniques_for_signal("feodo_listed"):
extra = dict(base_extra)
if emission.mitre_url:
extra["mitre_url"] = emission.mitre_url
out.append((emission.technique_id, 1.0, extra))
return out
return [
(emission.technique_id, 1.0, dict(extra))
for emission in mapping.techniques_for_signal("feodo_listed")
]
def _threatfox_decisions(
@@ -224,11 +188,9 @@ def _threatfox_decisions(
mapping = _mapping("threatfox")
triggered: dict[str, list[str]] = {}
emissions_by_tech: dict[str, TechniqueEmission] = {}
for tt in threat_types:
for emission in mapping.techniques_for_signal(tt):
triggered.setdefault(emission.technique_id, []).append(tt)
emissions_by_tech.setdefault(emission.technique_id, emission)
if not triggered:
return []
@@ -252,10 +214,6 @@ def _threatfox_decisions(
"threat_types": signals,
**({"malware_families": families} if families else {}),
**({"ioc_types": ioc_types} if ioc_types else {}),
**(
{"mitre_url": emissions_by_tech[tech].mitre_url}
if emissions_by_tech[tech].mitre_url else {}
),
})
for tech, signals in triggered.items()
]

View File

@@ -36,6 +36,7 @@ from pydantic import BaseModel, Field
from decnet import telemetry as _telemetry
from decnet.logging import get_logger
from decnet.ttp.attack_stix import mitre_url_for
from decnet.ttp.base import Tagger, TaggerEvent
from decnet.ttp.impl._rule_index import RuleIndex
from decnet.ttp.impl._state import apply_ceiling, is_active
@@ -367,6 +368,7 @@ def _evaluate_rules(
rule_version=rule.rule_version,
evidence=evidence,
attack_release=_ATTACK_RELEASE,
mitre_url=mitre_url_for(sub_technique_id or technique_id),
))
return out

View File

@@ -146,6 +146,15 @@ class TTPTag(SQLModel, table=True):
# ID cannot render deterministically in MITRE Navigator.
attack_release: str = Field(index=True)
# Canonical attack.mitre.org URL for this technique (or
# sub-technique when present). Resolved at insert via
# decnet.ttp.attack_stix.mitre_url_for from the loaded STIX
# bundle. Nullable because (a) the bundle may not be loaded in
# certain test paths and (b) a future release could deprecate
# a technique we have legacy tags for. Not indexed — derived
# deeplink, not a query target; technique_id is already indexed.
mitre_url: Optional[str] = Field(default=None)
created_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc),
index=True,
@@ -256,6 +265,7 @@ class TechniqueRollupRow(BaseModel):
tactic: str
count: int
last_seen: datetime
mitre_url: Optional[str] = None
class IdentityTechniqueRow(BaseModel):
@@ -278,6 +288,7 @@ class IdentityTechniqueRow(BaseModel):
first_seen: datetime
last_seen: datetime
confidence_max: float
mitre_url: Optional[str] = None
class TTPTagDetailRow(BaseModel):
@@ -306,6 +317,7 @@ class TTPTagDetailRow(BaseModel):
evidence: dict[str, Any] = Field(default_factory=dict)
attack_release: str
created_at: datetime
mitre_url: Optional[str] = None
class CampaignTechniqueRow(BaseModel):
@@ -320,6 +332,7 @@ class CampaignTechniqueRow(BaseModel):
count: int
identity_count: int
last_seen: datetime
mitre_url: Optional[str] = None
class RuleCatalogueRow(BaseModel):

View File

@@ -21,6 +21,7 @@ from sqlalchemy import func, select
from sqlmodel import col
from decnet.ttp.attack_catalog import technique_name as _technique_name
from decnet.ttp.attack_stix import mitre_url_for as _mitre_url_for
from decnet.web.db.models import (
Attacker,
AttackerIdentity,
@@ -117,6 +118,7 @@ class TTPMixin(_MixinBase):
technique_name=_technique_name(r.technique_id),
sub_technique_id=r.sub_technique_id,
sub_technique_name=_technique_name(r.sub_technique_id),
mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id),
tactic=r.tactic,
count=r.count,
first_seen=r.first_seen,
@@ -155,6 +157,7 @@ class TTPMixin(_MixinBase):
technique_name=_technique_name(r.technique_id),
sub_technique_id=r.sub_technique_id,
sub_technique_name=_technique_name(r.sub_technique_id),
mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id),
tactic=r.tactic,
count=r.count,
first_seen=r.first_seen,
@@ -199,6 +202,7 @@ class TTPMixin(_MixinBase):
technique_name=_technique_name(r.technique_id),
sub_technique_id=r.sub_technique_id,
sub_technique_name=_technique_name(r.sub_technique_id),
mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id),
tactic=r.tactic,
count=r.count,
identity_count=r.identity_count,
@@ -235,6 +239,7 @@ class TTPMixin(_MixinBase):
technique_name=_technique_name(r.technique_id),
sub_technique_id=r.sub_technique_id,
sub_technique_name=_technique_name(r.sub_technique_id),
mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id),
tactic=r.tactic,
count=r.count,
first_seen=r.first_seen,
@@ -408,6 +413,7 @@ class TTPMixin(_MixinBase):
technique_name=_technique_name(r.technique_id),
sub_technique_id=r.sub_technique_id,
sub_technique_name=_technique_name(r.sub_technique_id),
mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id),
tactic=r.tactic,
count=r.count,
last_seen=r.last_seen,

View File

@@ -0,0 +1,142 @@
"""Every TTPTag emitted via ``emit_tags()`` carries a populated ``mitre_url`` column.
Phase 3 promoted ``mitre_url`` from a JSON evidence field to a
first-class TTPTag column populated at construction. The two
construction sites are ``decnet/ttp/impl/_emit.py`` (the lifter
choke point) and the inline path in ``rule_engine._evaluate_rules``;
both look up :func:`decnet.ttp.attack_stix.mitre_url_for`.
Also covers the regression-net: intel_lifter's evidence dicts must
NOT carry a ``mitre_url`` key (the column is canonical now —
duplicating in the JSON column drifts when the bundle moves).
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import pytest
from decnet.ttp import attack_stix
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl._emit import emit_tags
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json"
@pytest.fixture(autouse=True)
def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
license_path = tmp_path / "LICENSE.txt"
license_path.write_text("placeholder", encoding="utf-8")
monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE))
monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path))
attack_stix._data = None
attack_stix._loaded_path = None
attack_stix._attack_pattern_by_id.cache_clear()
attack_stix._tactic_by_id.cache_clear()
attack_stix._tactic_by_short_name.cache_clear()
attack_stix.groups_using_technique.cache_clear()
def _rule(emits: tuple[tuple[str, str | None, str, float], ...]) -> CompiledRule:
return CompiledRule(
rule_id="R-test",
rule_version=1,
name="test rule",
applies_to=frozenset({"command"}),
match_spec={"pattern": "test"},
emits=emits,
evidence_fields=("matched_tokens",),
state=RuleState(),
)
def _event() -> TaggerEvent:
return TaggerEvent(
source_kind="command",
source_id="cmd-1",
attacker_uuid="att-uuid",
identity_uuid=None,
session_id=None,
decky_id=None,
payload={"matched_tokens": ["hydra"]},
)
def test_emit_tags_attaches_mitre_url_for_top_level_technique() -> None:
rule = _rule((("T1110", None, "TA0006", 0.85),))
tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["hydra"]})
assert len(tags) == 1
assert tags[0].mitre_url == "https://attack.mitre.org/techniques/T1110"
def test_emit_tags_attaches_subtechnique_url_when_subtechnique_present() -> None:
rule = _rule((("T1059", "T1059.004", "TA0002", 0.9),))
tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["sh"]})
assert tags[0].mitre_url == "https://attack.mitre.org/techniques/T1059/004"
def test_emit_tags_mitre_url_none_for_unknown_technique() -> None:
rule = _rule((("T9999", None, "TA0001", 0.5),))
tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["x"]})
assert tags[0].mitre_url is None
def test_emit_tags_per_emit_resolves_independently() -> None:
"""Multi-emit rule: each emit slot resolves its own URL."""
rule = _rule((
("T1110", None, "TA0006", 0.85),
("T1059", "T1059.004", "TA0002", 0.9),
))
tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["x"]})
urls = [t.mitre_url for t in tags]
assert "https://attack.mitre.org/techniques/T1110" in urls
assert "https://attack.mitre.org/techniques/T1059/004" in urls
def test_intel_lifter_evidence_does_not_contain_mitre_url() -> None:
"""Regression: mitre_url lives on the column, not in the evidence JSON.
Calls each provider's decision function directly with a payload
that should produce emits, then asserts no resulting evidence-extra
dict contains a ``mitre_url`` key.
"""
from decnet.ttp.data import intel_loader
from decnet.ttp.impl import intel_lifter
intel_loader.clear_cache()
intel_lifter._mapping.cache_clear()
decisions: list[tuple[str, dict[str, Any]]] = [
(
"abuseipdb",
{"abuseipdb_score": 95, "abuseipdb_categories": [5, 22]},
),
(
"greynoise",
{
"greynoise_classification": "scanner",
"greynoise_tags": ["tor_exit_node", "ssh_bruteforcer"],
},
),
("threatfox", {"threatfox_threat_types": ["botnet_cc"]}),
("feodo", {"feodo_listed": True, "feodo_malware_family": "Emotet"}),
]
fns = {
"abuseipdb": intel_lifter._abuseipdb_decisions,
"greynoise": intel_lifter._greynoise_decisions,
"threatfox": intel_lifter._threatfox_decisions,
"feodo": intel_lifter._feodo_decisions,
}
for provider, payload in decisions:
emits = fns[provider]({}, payload)
assert emits, f"{provider}: expected at least one emit"
for _tech, _mult, evidence_extra in emits:
assert "mitre_url" not in evidence_extra, (
f"{provider}: evidence_extra still carries mitre_url "
f"(should live on TTPTag.mitre_url column instead): "
f"{evidence_extra}"
)