diff --git a/decnet/ttp/impl/_emit.py b/decnet/ttp/impl/_emit.py index ac17e5f9..6851d193 100644 --- a/decnet/ttp/impl/_emit.py +++ b/decnet/ttp/impl/_emit.py @@ -11,6 +11,7 @@ from __future__ import annotations from typing import Any +from decnet.ttp.attack_stix import mitre_url_for from decnet.ttp.base import TaggerEvent from decnet.ttp.impl._state import apply_ceiling from decnet.ttp.impl.rule_engine import _ATTACK_RELEASE, CompiledRule @@ -61,6 +62,7 @@ def emit_tags( rule_version=rule.rule_version, evidence=dict(evidence), attack_release=_ATTACK_RELEASE, + mitre_url=mitre_url_for(sub_technique_id or technique_id), )) return out diff --git a/decnet/ttp/impl/intel_lifter.py b/decnet/ttp/impl/intel_lifter.py index e34d9c4f..050c8700 100644 --- a/decnet/ttp/impl/intel_lifter.py +++ b/decnet/ttp/impl/intel_lifter.py @@ -23,7 +23,6 @@ from typing import Any, Final from decnet.ttp.base import TaggerEvent, TolerantTagger from decnet.ttp.data.intel_loader import ( ProviderMapping, - TechniqueEmission, load_provider_mapping, ) from decnet.ttp.impl._emit import emit_tags @@ -53,22 +52,6 @@ def _mapping(provider: str) -> ProviderMapping: _GREYNOISE_MALICIOUS_BARE_MULT: Final[float] = 0.5 -def _emission_url_extras( - emissions: dict[str, TechniqueEmission], -) -> dict[str, dict[str, str]]: - """Map technique_id → {"mitre_url": ""} for every emission that has one. - - Lets the per-decision-function evidence_extra builders attach the - canonical MITRE URL to each emit slot without re-resolving against - the loaded ATT&CK bundle. - """ - return { - tid: {"mitre_url": e.mitre_url} - for tid, e in emissions.items() - if e.mitre_url - } - - # Predicate signature: returns either a list of (technique_id_filter, # confidence_multiplier, evidence_extra) tuples — one per emit slot the # rule should fire — or empty list when the rule does not fire. 
@@ -92,11 +75,9 @@ def _abuseipdb_decisions( # Resolve technique set across all categories present, applying # any per-technique high-score gate (see TechniqueEmission). triggered: dict[str, list[int]] = {} - emissions_by_tech: dict[str, TechniqueEmission] = {} for cat in categories: for emission in mapping.techniques_for_signal(f"cat_{cat}", score=float(score)): triggered.setdefault(emission.technique_id, []).append(cat) - emissions_by_tech.setdefault(emission.technique_id, emission) if not triggered: return [] multiplier = float(score) / 100.0 @@ -104,10 +85,6 @@ def _abuseipdb_decisions( (tech, multiplier, { "abuseipdb_categories": cats, "abuse_confidence_score": int(score), - **( - {"mitre_url": emissions_by_tech[tech].mitre_url} - if emissions_by_tech[tech].mitre_url else {} - ), }) for tech, cats in triggered.items() ] @@ -136,21 +113,16 @@ def _greynoise_decisions( # signals that triggered it AND the multiplier to apply (max wins # if multiple lanes hit the same technique). triggered: dict[str, tuple[float, list[str]]] = {} - emissions_by_tech: dict[str, TechniqueEmission] = {} - def _bump( - tech: str, mult: float, signal: str, emission: TechniqueEmission | None = None, - ) -> None: + def _bump(tech: str, mult: float, signal: str) -> None: existing = triggered.get(tech) if existing is None: triggered[tech] = (mult, [signal]) - else: - old_mult, signals = existing - signals.append(signal) - if mult > old_mult: - triggered[tech] = (mult, signals) - if emission is not None: - emissions_by_tech.setdefault(tech, emission) + return + old_mult, signals = existing + signals.append(signal) + if mult > old_mult: + triggered[tech] = (mult, signals) if classification == "scanner": _bump("T1595", 1.0, "scanner") @@ -159,7 +131,7 @@ def _greynoise_decisions( if not isinstance(tag, str): continue for emission in mapping.techniques_for_signal(tag): - _bump(emission.technique_id, 1.0, tag, emission) + _bump(emission.technique_id, 1.0, tag) if classification == "malicious" 
and "T1071" not in triggered: _bump("T1071", _GREYNOISE_MALICIOUS_BARE_MULT, "malicious") if not triggered: @@ -168,11 +140,6 @@ def _greynoise_decisions( (tech, mult, { "greynoise_classification": classification, "greynoise_tags": signals, - **( - {"mitre_url": emissions_by_tech[tech].mitre_url} - if tech in emissions_by_tech and emissions_by_tech[tech].mitre_url - else {} - ), }) for tech, (mult, signals) in triggered.items() ] @@ -187,17 +154,14 @@ def _feodo_decisions( payload.get("feodo_malware_family") or payload.get("malware_family") ) - base_extra: dict[str, Any] = {"feodo_listed": True} + extra: dict[str, Any] = {"feodo_listed": True} if isinstance(family, str) and family: - base_extra["malware_family"] = family + extra["malware_family"] = family mapping = _mapping("feodo") - out: EmitDecision = [] - for emission in mapping.techniques_for_signal("feodo_listed"): - extra = dict(base_extra) - if emission.mitre_url: - extra["mitre_url"] = emission.mitre_url - out.append((emission.technique_id, 1.0, extra)) - return out + return [ + (emission.technique_id, 1.0, dict(extra)) + for emission in mapping.techniques_for_signal("feodo_listed") + ] def _threatfox_decisions( @@ -224,11 +188,9 @@ def _threatfox_decisions( mapping = _mapping("threatfox") triggered: dict[str, list[str]] = {} - emissions_by_tech: dict[str, TechniqueEmission] = {} for tt in threat_types: for emission in mapping.techniques_for_signal(tt): triggered.setdefault(emission.technique_id, []).append(tt) - emissions_by_tech.setdefault(emission.technique_id, emission) if not triggered: return [] @@ -252,10 +214,6 @@ def _threatfox_decisions( "threat_types": signals, **({"malware_families": families} if families else {}), **({"ioc_types": ioc_types} if ioc_types else {}), - **( - {"mitre_url": emissions_by_tech[tech].mitre_url} - if emissions_by_tech[tech].mitre_url else {} - ), }) for tech, signals in triggered.items() ] diff --git a/decnet/ttp/impl/rule_engine.py b/decnet/ttp/impl/rule_engine.py 
index b5e09fc3..acac6a4c 100644 --- a/decnet/ttp/impl/rule_engine.py +++ b/decnet/ttp/impl/rule_engine.py @@ -36,6 +36,7 @@ from pydantic import BaseModel, Field from decnet import telemetry as _telemetry from decnet.logging import get_logger +from decnet.ttp.attack_stix import mitre_url_for from decnet.ttp.base import Tagger, TaggerEvent from decnet.ttp.impl._rule_index import RuleIndex from decnet.ttp.impl._state import apply_ceiling, is_active @@ -367,6 +368,7 @@ def _evaluate_rules( rule_version=rule.rule_version, evidence=evidence, attack_release=_ATTACK_RELEASE, + mitre_url=mitre_url_for(sub_technique_id or technique_id), )) return out diff --git a/decnet/web/db/models/ttp.py b/decnet/web/db/models/ttp.py index 151e1e97..2943c643 100644 --- a/decnet/web/db/models/ttp.py +++ b/decnet/web/db/models/ttp.py @@ -146,6 +146,15 @@ class TTPTag(SQLModel, table=True): # ID cannot render deterministically in MITRE Navigator. attack_release: str = Field(index=True) + # Canonical attack.mitre.org URL for this technique (or + # sub-technique when present). Resolved at insert via + # decnet.ttp.attack_stix.mitre_url_for from the loaded STIX + # bundle. Nullable because (a) the bundle may not be loaded in + # certain test paths and (b) a future release could deprecate + # a technique we have legacy tags for. Not indexed — derived + # deeplink, not a query target; technique_id is already indexed. 
+ mitre_url: Optional[str] = Field(default=None) + created_at: datetime = Field( default_factory=lambda: datetime.now(timezone.utc), index=True, @@ -256,6 +265,7 @@ class TechniqueRollupRow(BaseModel): tactic: str count: int last_seen: datetime + mitre_url: Optional[str] = None class IdentityTechniqueRow(BaseModel): @@ -278,6 +288,7 @@ class IdentityTechniqueRow(BaseModel): first_seen: datetime last_seen: datetime confidence_max: float + mitre_url: Optional[str] = None class TTPTagDetailRow(BaseModel): @@ -306,6 +317,7 @@ class TTPTagDetailRow(BaseModel): evidence: dict[str, Any] = Field(default_factory=dict) attack_release: str created_at: datetime + mitre_url: Optional[str] = None class CampaignTechniqueRow(BaseModel): @@ -320,6 +332,7 @@ class CampaignTechniqueRow(BaseModel): count: int identity_count: int last_seen: datetime + mitre_url: Optional[str] = None class RuleCatalogueRow(BaseModel): diff --git a/decnet/web/db/sqlmodel_repo/ttp.py b/decnet/web/db/sqlmodel_repo/ttp.py index adb24b20..30adf077 100644 --- a/decnet/web/db/sqlmodel_repo/ttp.py +++ b/decnet/web/db/sqlmodel_repo/ttp.py @@ -21,6 +21,7 @@ from sqlalchemy import func, select from sqlmodel import col from decnet.ttp.attack_catalog import technique_name as _technique_name +from decnet.ttp.attack_stix import mitre_url_for as _mitre_url_for from decnet.web.db.models import ( Attacker, AttackerIdentity, @@ -117,6 +118,7 @@ class TTPMixin(_MixinBase): technique_name=_technique_name(r.technique_id), sub_technique_id=r.sub_technique_id, sub_technique_name=_technique_name(r.sub_technique_id), + mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id), tactic=r.tactic, count=r.count, first_seen=r.first_seen, @@ -155,6 +157,7 @@ class TTPMixin(_MixinBase): technique_name=_technique_name(r.technique_id), sub_technique_id=r.sub_technique_id, sub_technique_name=_technique_name(r.sub_technique_id), + mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id), tactic=r.tactic, count=r.count, 
first_seen=r.first_seen, @@ -199,6 +202,7 @@ class TTPMixin(_MixinBase): technique_name=_technique_name(r.technique_id), sub_technique_id=r.sub_technique_id, sub_technique_name=_technique_name(r.sub_technique_id), + mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id), tactic=r.tactic, count=r.count, identity_count=r.identity_count, @@ -235,6 +239,7 @@ class TTPMixin(_MixinBase): technique_name=_technique_name(r.technique_id), sub_technique_id=r.sub_technique_id, sub_technique_name=_technique_name(r.sub_technique_id), + mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id), tactic=r.tactic, count=r.count, first_seen=r.first_seen, @@ -408,6 +413,7 @@ class TTPMixin(_MixinBase): technique_name=_technique_name(r.technique_id), sub_technique_id=r.sub_technique_id, sub_technique_name=_technique_name(r.sub_technique_id), + mitre_url=_mitre_url_for(r.sub_technique_id or r.technique_id), tactic=r.tactic, count=r.count, last_seen=r.last_seen, diff --git a/tests/ttp/test_emit_attaches_mitre_url.py b/tests/ttp/test_emit_attaches_mitre_url.py new file mode 100644 index 00000000..86c74b38 --- /dev/null +++ b/tests/ttp/test_emit_attaches_mitre_url.py @@ -0,0 +1,142 @@ +"""Every TTPTag from ``emit_tags()`` carries ``mitre_url`` (``None`` if the technique is unknown). + +Phase 3 promoted ``mitre_url`` from a JSON evidence field to a +first-class TTPTag column populated at construction. The two +construction sites are ``decnet/ttp/impl/_emit.py`` (the lifter +choke point) and the inline path in ``rule_engine._evaluate_rules``; +both look up :func:`decnet.ttp.attack_stix.mitre_url_for`. + +Also covers the regression-net: intel_lifter's evidence dicts must +NOT carry a ``mitre_url`` key (the column is canonical now — +duplicating in the JSON column drifts when the bundle moves).
+""" +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pytest + +from decnet.ttp import attack_stix +from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl._emit import emit_tags +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState + +_REPO_BUNDLE = Path(__file__).resolve().parents[2] / "enterprise-attack-19.0.json" + + +@pytest.fixture(autouse=True) +def _pin_bundle(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + license_path = tmp_path / "LICENSE.txt" + license_path.write_text("placeholder", encoding="utf-8") + monkeypatch.setenv("DECNET_ATTACK_BUNDLE", str(_REPO_BUNDLE)) + monkeypatch.setenv("DECNET_ATTACK_LICENSE", str(license_path)) + attack_stix._data = None + attack_stix._loaded_path = None + attack_stix._attack_pattern_by_id.cache_clear() + attack_stix._tactic_by_id.cache_clear() + attack_stix._tactic_by_short_name.cache_clear() + attack_stix.groups_using_technique.cache_clear() + + +def _rule(emits: tuple[tuple[str, str | None, str, float], ...]) -> CompiledRule: + return CompiledRule( + rule_id="R-test", + rule_version=1, + name="test rule", + applies_to=frozenset({"command"}), + match_spec={"pattern": "test"}, + emits=emits, + evidence_fields=("matched_tokens",), + state=RuleState(), + ) + + +def _event() -> TaggerEvent: + return TaggerEvent( + source_kind="command", + source_id="cmd-1", + attacker_uuid="att-uuid", + identity_uuid=None, + session_id=None, + decky_id=None, + payload={"matched_tokens": ["hydra"]}, + ) + + +def test_emit_tags_attaches_mitre_url_for_top_level_technique() -> None: + rule = _rule((("T1110", None, "TA0006", 0.85),)) + tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["hydra"]}) + assert len(tags) == 1 + assert tags[0].mitre_url == "https://attack.mitre.org/techniques/T1110" + + +def test_emit_tags_attaches_subtechnique_url_when_subtechnique_present() -> None: + rule = _rule((("T1059", "T1059.004", 
"TA0002", 0.9),)) + tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["sh"]}) + assert tags[0].mitre_url == "https://attack.mitre.org/techniques/T1059/004" + + +def test_emit_tags_mitre_url_none_for_unknown_technique() -> None: + rule = _rule((("T9999", None, "TA0001", 0.5),)) + tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["x"]}) + assert tags[0].mitre_url is None + + +def test_emit_tags_per_emit_resolves_independently() -> None: + """Multi-emit rule: each emit slot resolves its own URL.""" + rule = _rule(( + ("T1110", None, "TA0006", 0.85), + ("T1059", "T1059.004", "TA0002", 0.9), + )) + tags = emit_tags(rule, _event(), evidence={"matched_tokens": ["x"]}) + urls = [t.mitre_url for t in tags] + assert "https://attack.mitre.org/techniques/T1110" in urls + assert "https://attack.mitre.org/techniques/T1059/004" in urls + + +def test_intel_lifter_evidence_does_not_contain_mitre_url() -> None: + """Regression: mitre_url lives on the column, not in the evidence JSON. + + Calls each provider's decision function directly with a payload + that should produce emits, then asserts no resulting evidence-extra + dict contains a ``mitre_url`` key. 
+ """ + from decnet.ttp.data import intel_loader + from decnet.ttp.impl import intel_lifter + + intel_loader.clear_cache() + intel_lifter._mapping.cache_clear() + + decisions: list[tuple[str, dict[str, Any]]] = [ + ( + "abuseipdb", + {"abuseipdb_score": 95, "abuseipdb_categories": [5, 22]}, + ), + ( + "greynoise", + { + "greynoise_classification": "scanner", + "greynoise_tags": ["tor_exit_node", "ssh_bruteforcer"], + }, + ), + ("threatfox", {"threatfox_threat_types": ["botnet_cc"]}), + ("feodo", {"feodo_listed": True, "feodo_malware_family": "Emotet"}), + ] + + fns = { + "abuseipdb": intel_lifter._abuseipdb_decisions, + "greynoise": intel_lifter._greynoise_decisions, + "threatfox": intel_lifter._threatfox_decisions, + "feodo": intel_lifter._feodo_decisions, + } + for provider, payload in decisions: + emits = fns[provider]({}, payload) + assert emits, f"{provider}: expected at least one emit" + for _tech, _mult, evidence_extra in emits: + assert "mitre_url" not in evidence_extra, ( + f"{provider}: evidence_extra still carries mitre_url " + f"(should live on TTPTag.mitre_url column instead): " + f"{evidence_extra}" + )