Files
DECNET/decnet/ttp/impl/intel_lifter.py
anti d25f69ba1b feat(ttp): extract intel_lifter provider mappings to YAML data + ATT&CK external_reference enrichment
The four provider→technique tables (AbuseIPDB cat→techniques,
GreyNoise tag→techniques, ThreatFox threat_type→techniques, plus
the Feodo binary-listed signal) used to live as Final[dict] constants
in intel_lifter.py. Two real problems with that:

1. Drift between rules/ttp/R0054.yaml..R0058.yaml (which declare
   the full slate per provider) and the Python dicts (which decide
   which slate-member fires per signal). The v2 audit comment in
   intel_lifter.py documented that they had silently drifted.
2. No ATT&CK provenance on emissions — the loaded STIX bundle has
   rich external_references (canonical attack.mitre.org URLs) that
   never surfaced because the lifter had no path back to them.

Mappings now live as YAML at decnet/ttp/data/intel/{provider}.yaml,
validated at load against the loaded ATT&CK bundle, with each entry
enriched by attack_stix._attack_pattern_by_id to attach the canonical
MITRE URL to every emission.

- decnet/ttp/data/intel_loader.py: pydantic-validated schema +
  ProviderMapping/Signal/TechniqueEmission frozen dataclasses +
  load_provider_mapping(provider) lru-cached.
- Per-technique high_score_threshold inlined into YAML
  (collapses the separate _ABUSEIPDB_HIGH_SCORE_GATED dict).
- external_reference field follows the STIX 2.1 external-reference
  shape (source_name + url + optional external_id) so the future
  STIX/MISP exporter is a direct translation.
- intel_lifter.py: dicts deleted, decision functions read from
  ProviderMapping accessors. Decision-flow constants (T1071/T1595
  bare-classification fallbacks in _greynoise_decisions) stay in
  code — they're not table rows.
- Each emit slot's evidence_extra now carries mitre_url for any
  technique resolved in the bundle (every one in practice).
- tests/ttp/test_intel_mappings.py: snapshot equivalence vs the
  legacy dicts, high-score gate behavior, every-signal-has-an-
  external-reference, every-emission-has-a-mitre-url, negative
  paths (unknown technique_id raises AttackBundleError, mismatched
  provider field rejected, dir listing matches expected providers).

The YAML schema + mitre_url enrichment lays groundwork for the
future STIX exporter; this commit does NOT build that exporter.
2026-05-09 06:18:25 -04:00

416 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Intel lifter — opportunistic third-party verdict translator (E.3.10).
Reads ``AttackerIntel``-derived payload fields and emits ATT&CK
techniques per Appendix A.10 with per-provider confidence scaling.
Decoupling rule (design doc §"Decoupling: bus-driven, never a hard
dependency", enforced statically by E.2.7): this module imports
NOTHING from ``decnet.intel.{abuseipdb,greynoise,feodo,threatfox}`` —
only ``decnet.web.db.models`` symbols are permitted via ``TTPTag``.
Per-provider null tolerance is the steady state: a fresh attacker with
no intel row yet produces zero tags. A populated AbuseIPDB column with
no GreyNoise still fires AbuseIPDB-driven rules; the lifter never
waits for cross-provider corroboration as a precondition (the
:class:`~decnet.ttp.impl._state.is_active` check + per-rule predicate
gate emission, not provider count).
"""
from __future__ import annotations
from collections.abc import Callable
from functools import lru_cache
from typing import Any, Final
from decnet.ttp.base import TaggerEvent, TolerantTagger
from decnet.ttp.data.intel_loader import (
ProviderMapping,
TechniqueEmission,
load_provider_mapping,
)
from decnet.ttp.impl._emit import emit_tags
from decnet.ttp.impl._rule_index import RuleIndex
from decnet.ttp.impl._state import apply_ceiling, is_active
from decnet.ttp.impl.rule_engine import _ATTACK_RELEASE, CompiledRule
from decnet.ttp.store.base import RuleStore
from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid
# Provider→technique mappings live as YAML under
# decnet/ttp/data/intel/{provider}.yaml — see
# decnet.ttp.data.intel_loader for the schema and validation. Lazy
# accessors below mean module import does not trigger an ATT&CK
# bundle load (the loader validates every technique resolves there).
@lru_cache(maxsize=4)
def _mapping(provider: str) -> ProviderMapping:
    """Return the provider→technique mapping for ``provider``, memoised.

    Thin cached wrapper around ``load_provider_mapping`` so the decision
    functions can ask for a mapping on every call; the loader is only
    invoked the first time a given provider name is requested.
    """
    provider_mapping = load_provider_mapping(provider)
    return provider_mapping
# Confidence multiplier when GreyNoise reports ``classification ==
# "malicious"`` without a specific tag we recognise. The bare
# classification is real signal but weaker than a tag — half-confidence
# keeps the floor honest. Decision-flow constant, not a table row.
# Applied by ``_greynoise_decisions`` to the bare-malicious T1071 lane.
_GREYNOISE_MALICIOUS_BARE_MULT: Final[float] = 0.5
def _emission_url_extras(
    emissions: dict[str, TechniqueEmission],
) -> dict[str, dict[str, str]]:
    """Build ``technique_id → {"mitre_url": url}`` for emissions that carry a URL.

    Emissions whose ``mitre_url`` is falsy (``None`` or empty) are left
    out entirely, so a caller can merge the per-technique dict into an
    evidence payload without checking for a missing URL first.
    """
    extras: dict[str, dict[str, str]] = {}
    for technique_id, emission in emissions.items():
        url = emission.mitre_url
        if not url:
            continue
        extras[technique_id] = {"mitre_url": url}
    return extras
# Predicate signature: called as ``predicate(match_spec, payload)`` and
# returns a list of (technique_id, confidence_multiplier, evidence_extra)
# tuples — one per emit slot the rule should fire — or an empty list
# when the rule does not fire. The technique_id acts as a filter: only
# ``rule.emits`` entries with a matching technique_id are emitted (see
# ``_emit_filtered``).
EmitDecision = list[tuple[str, float, dict[str, Any]]]
Predicate = Callable[[dict[str, Any], dict[str, Any]], EmitDecision]
def _abuseipdb_decisions(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> EmitDecision:
    """Decide AbuseIPDB emissions from the confidence score and category list.

    Reads ``abuseipdb_score`` and ``abuseipdb_categories`` (falling back
    to ``categories``) from ``payload``. Every integer category is looked
    up as signal ``cat_<n>`` in the AbuseIPDB provider mapping, with the
    score passed along so the mapping can apply any per-technique
    high-score gate. Each triggered technique is returned once, with
    multiplier ``score / 100`` and evidence listing the categories that
    triggered it (plus ``mitre_url`` when the emission carries one).

    Returns an empty list — never raises — when the score is missing,
    non-numeric, boolean, non-finite or too large for a float, or when
    no usable integer category is present.
    """
    import math  # local import: only needed for the finiteness guard

    score = payload.get("abuseipdb_score")
    # bool is an int subclass; True/False is never a meaningful score.
    if isinstance(score, bool) or not isinstance(score, (int, float)):
        return []
    try:
        score_value = float(score)
    except OverflowError:  # int too large to represent as a float
        return []
    # NaN / ±inf would make int(score) below raise ValueError/OverflowError.
    if not math.isfinite(score_value):
        return []

    categories_raw = payload.get("abuseipdb_categories") or payload.get("categories")
    if not isinstance(categories_raw, list) or not categories_raw:
        return []
    categories: list[int] = [
        c for c in categories_raw
        if isinstance(c, int) and not isinstance(c, bool)
    ]
    if not categories:
        return []

    mapping = _mapping("abuseipdb")
    # Resolve technique set across all categories present, applying
    # any per-technique high-score gate (see TechniqueEmission).
    triggered: dict[str, list[int]] = {}
    emissions_by_tech: dict[str, TechniqueEmission] = {}
    for cat in categories:
        for emission in mapping.techniques_for_signal(f"cat_{cat}", score=score_value):
            triggered.setdefault(emission.technique_id, []).append(cat)
            emissions_by_tech.setdefault(emission.technique_id, emission)
    if not triggered:
        return []

    multiplier = score_value / 100.0
    decisions: EmitDecision = []
    for tech, cats in triggered.items():
        extra: dict[str, Any] = {
            "abuseipdb_categories": cats,
            "abuse_confidence_score": int(score),
        }
        url = emissions_by_tech[tech].mitre_url
        if url:
            extra["mitre_url"] = url
        decisions.append((tech, multiplier, extra))
    return decisions
def _greynoise_decisions(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> EmitDecision:
    """Decide GreyNoise emissions.

    Three signal lanes:

    * ``classification == "scanner"`` — full-strength T1595 (kept for
      compatibility with non-Community provider plans that surface
      this verdict; the Community endpoint reports {malicious, benign,
      suspicious, unknown} only).
    * Specific recognised tag → its mapped technique(s) at 1.0×.
    * ``classification == "malicious"`` → T1071 at half multiplier
      (post-audit decision: the verdict is real but unspecific). This
      lane is suppressed when T1071 has already been triggered by a
      tag, to avoid double-stamping.

    The provider mapping is only consulted when the payload carries at
    least one string tag; classification-only and empty payloads never
    touch the mapping loader.

    Returns one ``(technique_id, multiplier, evidence)`` tuple per
    triggered technique. The evidence's ``greynoise_tags`` lists every
    signal that hit the technique, including the lane labels
    ``"scanner"`` / ``"malicious"``. Returns an empty list when nothing
    fires.
    """
    classification = payload.get("greynoise_classification")
    tags_raw = payload.get("greynoise_tags") or []
    tags: list[str] = (
        [t for t in tags_raw if isinstance(t, str)]
        if isinstance(tags_raw, list) else []
    )

    # Per-technique evidence accumulator — maps technique_id to the
    # signals that triggered it AND the multiplier to apply (max wins
    # if multiple lanes hit the same technique).
    triggered: dict[str, tuple[float, list[str]]] = {}
    emissions_by_tech: dict[str, TechniqueEmission] = {}

    def _bump(
        tech: str, mult: float, signal: str, emission: TechniqueEmission | None = None,
    ) -> None:
        existing = triggered.get(tech)
        if existing is None:
            triggered[tech] = (mult, [signal])
        else:
            old_mult, signals = existing
            signals.append(signal)
            if mult > old_mult:
                triggered[tech] = (mult, signals)
        if emission is not None:
            # First emission seen for a technique supplies its mitre_url.
            emissions_by_tech.setdefault(tech, emission)

    if classification == "scanner":
        _bump("T1595", 1.0, "scanner")

    if tags:
        # Deferred until here so payloads without tags skip the loader.
        mapping = _mapping("greynoise")
        for tag in tags:
            for emission in mapping.techniques_for_signal(tag):
                _bump(emission.technique_id, 1.0, tag, emission)

    if classification == "malicious" and "T1071" not in triggered:
        _bump("T1071", _GREYNOISE_MALICIOUS_BARE_MULT, "malicious")

    if not triggered:
        return []

    decisions: EmitDecision = []
    for tech, (mult, signals) in triggered.items():
        extra: dict[str, Any] = {
            "greynoise_classification": classification,
            "greynoise_tags": signals,
        }
        # Bare-classification lanes have no emission row, hence no URL.
        emission = emissions_by_tech.get(tech)
        if emission is not None and emission.mitre_url:
            extra["mitre_url"] = emission.mitre_url
        decisions.append((tech, mult, extra))
    return decisions
def _feodo_decisions(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> EmitDecision:
    """Decide Feodo Tracker emissions.

    Fires only when ``payload["feodo_listed"]`` is exactly ``True``
    (truthy stand-ins such as ``1`` do not count). Every technique the
    Feodo mapping lists for the ``feodo_listed`` signal is emitted at
    1.0×, with evidence recording the listing, the malware family when
    one is present as a non-empty string (``feodo_malware_family`` or
    ``malware_family``), and ``mitre_url`` when the emission has one.
    """
    listed = payload.get("feodo_listed")
    if listed is not True:
        return []

    evidence_template: dict[str, Any] = {"feodo_listed": True}
    family = payload.get("feodo_malware_family") or payload.get("malware_family")
    if isinstance(family, str) and family:
        evidence_template["malware_family"] = family

    decisions: EmitDecision = []
    for emission in _mapping("feodo").techniques_for_signal("feodo_listed"):
        url = emission.mitre_url
        # Fresh dict per emission so evidence payloads are never shared.
        extra = {**evidence_template, **({"mitre_url": url} if url else {})}
        decisions.append((emission.technique_id, 1.0, extra))
    return decisions
def _coerce_str_list(value: Any) -> list[str]:
    """Normalise a payload field to a list of strings.

    A list keeps only its ``str`` members; a non-empty string becomes a
    one-element list; anything else yields an empty list.
    """
    if isinstance(value, list):
        return [item for item in value if isinstance(item, str)]
    if isinstance(value, str) and value:
        return [value]
    return []


def _threatfox_decisions(
    _spec: dict[str, Any], payload: dict[str, Any],
) -> EmitDecision:
    """Decide ThreatFox emissions.

    ThreatFox dispatch keys on ``threat_type`` (canonical taxonomy),
    not ``ioc_type`` — the v1 ship-time mapping had it backwards.

    Accepts either ``threatfox_threat_types`` (list, preferred — comes
    from the bus payload built by the intel worker) or a singular
    ``threat_type`` field for legacy callers and tests. IOC types are
    never used for dispatch; ``threatfox_ioc_types`` is only copied
    into the evidence. Malware families are read from
    ``threatfox_malware_families`` or ``malware_family`` and likewise
    only recorded as evidence.

    The lifter is tolerant by contract; missing inputs produce zero
    emissions, never an error. When no threat type is present the
    provider mapping is not loaded at all.
    """
    threat_types = _coerce_str_list(
        payload.get("threatfox_threat_types") or payload.get("threat_type")
    )
    if not threat_types:
        # Nothing to dispatch on — skip the mapping load entirely.
        return []

    mapping = _mapping("threatfox")
    triggered: dict[str, list[str]] = {}
    emissions_by_tech: dict[str, TechniqueEmission] = {}
    for tt in threat_types:
        for emission in mapping.techniques_for_signal(tt):
            triggered.setdefault(emission.technique_id, []).append(tt)
            emissions_by_tech.setdefault(emission.technique_id, emission)
    if not triggered:
        return []

    families = _coerce_str_list(
        payload.get("threatfox_malware_families") or payload.get("malware_family")
    )
    # Unlike the fields above, a bare string is not accepted here.
    ioc_types_raw = payload.get("threatfox_ioc_types")
    ioc_types: list[str] = (
        [i for i in ioc_types_raw if isinstance(i, str)]
        if isinstance(ioc_types_raw, list) else []
    )

    decisions: EmitDecision = []
    for tech, signals in triggered.items():
        extra: dict[str, Any] = {"threat_types": signals}
        if families:
            extra["malware_families"] = families
        if ioc_types:
            extra["ioc_types"] = ioc_types
        url = emissions_by_tech[tech].mitre_url
        if url:
            extra["mitre_url"] = url
        decisions.append((tech, 1.0, extra))
    return decisions
def _aggregate_bump_decisions(
    _spec: dict[str, Any], _payload: dict[str, Any],
) -> EmitDecision:
    """Placeholder predicate for the R0058 aggregate-bump meta-rule.

    R0058 is bump-only (TTP_TAGGING.md §"Initial rule pack" R0058 +
    commit b819dfe note: confidence < 0.3 drops at the repo layer).
    Bumping existing tags needs cross-tag access that the current
    TaggerEvent contract doesn't provide; that work is deferred to the
    E.3.14 worker bootstrap. Until then this always returns an empty
    decision list, making R0058 a no-op in v0.
    """
    nothing_to_emit: EmitDecision = []
    return nothing_to_emit
# Dispatch table: a rule's ``match_spec["kind"]`` → the decision function
# that evaluates it. ``IntelLifter._tag_impl`` looks the kind up here and
# skips any rule whose kind has no entry.
_PREDICATES: Final[dict[str, Predicate]] = {
    "lifter:intel_abuseipdb": _abuseipdb_decisions,
    "lifter:intel_greynoise": _greynoise_decisions,
    "lifter:intel_feodo": _feodo_decisions,
    "lifter:intel_threatfox": _threatfox_decisions,
    # Currently a no-op: _aggregate_bump_decisions always returns [].
    "lifter:intel_aggregate_bump": _aggregate_bump_decisions,
}
class IntelLifter(TolerantTagger):
    """Tagger that turns third-party intel verdicts into ATT&CK tags.

    Keeps a ``RuleIndex`` restricted to rules whose ``match_spec["kind"]``
    starts with ``OWNED_PREFIX`` and, per event, runs the matching
    decision function from ``_PREDICATES`` for every applicable, active
    rule.
    """

    name = "intel"
    HANDLES = frozenset({"intel"})
    OWNED_PREFIX: Final[str] = "lifter:intel_"

    def __init__(self, store: RuleStore) -> None:
        """Remember the rule store and start with an empty rule index."""
        self._store = store
        self._index = RuleIndex()

    @classmethod
    def _owns(cls, rule: CompiledRule) -> bool:
        """Return True when ``rule``'s kind is a string under ``OWNED_PREFIX``."""
        kind = rule.match_spec.get("kind", "")
        if not isinstance(kind, str):
            return False
        return kind.startswith(cls.OWNED_PREFIX)

    async def watch_store(self) -> None:
        """Have the index watch the store, filtered to rules this lifter owns."""
        index = self._index
        await index.watch(self._store, predicate=self._owns)

    async def _tag_impl(self, event: TaggerEvent) -> list[TTPTag]:
        """Evaluate every indexed rule against ``event`` and collect the tags.

        A rule is skipped when the event's ``source_kind`` is not in its
        ``applies_to``, when its state is not active, when its kind has
        no registered decision function, or when that function returns
        no decisions.
        """
        tags: list[TTPTag] = []
        for rule in self._index.values():
            applicable = (
                event.source_kind in rule.applies_to and is_active(rule.state)
            )
            if not applicable:
                continue
            predicate = _PREDICATES.get(rule.match_spec.get("kind", ""))
            if predicate is None:
                continue
            decisions = predicate(rule.match_spec, event.payload)
            if decisions:
                tags.extend(_emit_filtered(rule, event, decisions))
        return tags
def _emit_filtered(
    rule: CompiledRule,
    event: TaggerEvent,
    decisions: EmitDecision,
) -> list[TTPTag]:
    """Emit tags for the ``rule.emits`` entries the predicate selected.

    ``rule.emits`` is the universe of emissions a rule may produce (for
    example ``emits=[T1110, T1190, T1566]``); ``decisions`` names the
    subset that fires for this payload. Each selected entry becomes one
    ``TTPTag`` whose confidence is the entry's base confidence times the
    decision multiplier, passed through ``apply_ceiling`` for the rule's
    state, and whose evidence is the rule's ``evidence_fields`` present
    in the payload overlaid with the decision's extras. This is the
    lifter analogue of "one event maps to many techniques" — except the
    dispatch is signal-driven, not regex-driven.
    """
    # Later decisions for the same technique replace earlier ones.
    decision_by_tech: dict[str, tuple[float, dict[str, Any]]] = {}
    for tech, mult, extra in decisions:
        decision_by_tech[tech] = (mult, extra)

    base_evidence: dict[str, Any] = {}
    for field in rule.evidence_fields:
        if field in event.payload:
            base_evidence[field] = event.payload.get(field)

    tags: list[TTPTag] = []
    for technique_id, sub_technique_id, tactic, base_conf in rule.emits:
        decision = decision_by_tech.get(technique_id)
        if decision is None:
            continue
        multiplier, extra = decision
        # Decision extras win over payload-derived evidence on key clashes.
        evidence = {**base_evidence, **extra}
        confidence = apply_ceiling(base_conf * multiplier, rule.state)
        tag_uuid = compute_tag_uuid(
            source_kind=event.source_kind,
            source_id=event.source_id,
            rule_id=rule.rule_id,
            rule_version=rule.rule_version,
            technique_id=technique_id,
            sub_technique_id=sub_technique_id,
        )
        tag = TTPTag(
            uuid=tag_uuid,
            source_kind=event.source_kind,
            source_id=event.source_id,
            attacker_uuid=event.attacker_uuid,
            identity_uuid=event.identity_uuid,
            session_id=event.session_id,
            decky_id=event.decky_id,
            tactic=tactic,
            technique_id=technique_id,
            sub_technique_id=sub_technique_id,
            confidence=confidence,
            rule_id=rule.rule_id,
            rule_version=rule.rule_version,
            evidence=evidence,
            attack_release=_ATTACK_RELEASE,
        )
        tags.append(tag)
    return tags
def all_emitted_technique_ids() -> frozenset[str]:
    """Return every technique ID this lifter could emit.

    The set is the union of the technique IDs in the four provider
    mapping YAMLs (abuseipdb, greynoise, feodo, threatfox) and the
    decision-flow constants hard-coded in ``_greynoise_decisions``
    (T1595 for ``classification == "scanner"``, T1071 for bare
    ``"malicious"``) — those are not YAML rows.

    Used by :func:`validate_against_attack_bundle` to assert every
    provider-driven emission resolves in the loaded ATT&CK STIX bundle.
    The loader itself already validates YAML-sourced IDs at load; this
    fold-in covers the in-code constants too.
    """
    providers = ("abuseipdb", "greynoise", "feodo", "threatfox")
    # Decision-flow constants (see _greynoise_decisions).
    decision_flow_constants = frozenset({"T1071", "T1595"})
    return decision_flow_constants.union(
        *(_mapping(provider).all_technique_ids() for provider in providers)
    )
def validate_against_attack_bundle() -> None:
    """Check every technique ID this lifter could emit against the ATT&CK bundle.

    Hands the IDs from :func:`all_emitted_technique_ids` to
    ``assert_known_technique_ids``, tagged with this module's dotted
    path as ``source``. Any failure surfaces as whatever that helper
    raises; nothing is returned on success.
    """
    # Imported lazily, at call time rather than module import time.
    from decnet.ttp.attack_stix import assert_known_technique_ids

    technique_ids = list(all_emitted_technique_ids())
    assert_known_technique_ids(
        technique_ids,
        source="decnet.ttp.impl.intel_lifter",
    )
# Public API of this module; everything else is private to the lifter.
__all__ = ["IntelLifter", "all_emitted_technique_ids", "validate_against_attack_bundle"]
# Suppress unused-import lint; emit_tags is exposed for parity with the
# other lifters even though IntelLifter uses _emit_filtered. Leave the
# import present so future refactors that consolidate emission paths
# don't have to re-add it.
_ = emit_tags