DECNET/decnet/ttp/impl/_emit.py
anti 84a075e405 feat(ttp): promote mitre_url to first-class TTPTag column + propagate everywhere
Phase 2 attached mitre_url to intel-emitted tags' evidence JSON;
Phase 3 promotes it to a real column populated for *every* tag —
intel, credential, behavioral, canary, identity, email, rule-engine —
from one source. Pre-v1, so the SQLModel field is added directly
without an Alembic migration.

- TTPTag gains mitre_url: Optional[str] (not indexed — derived
  deeplink, not a query target; technique_id is already indexed).
- _emit.py and rule_engine._evaluate_rules both populate mitre_url
  via attack_stix.mitre_url_for(sub_technique_id or technique_id).
  Sub-technique URL when present, else parent. The two construction
  sites stay separate because the rule_engine path carries per-emit
  span instrumentation that emit_tags() can't preserve without
  threading a span object through; minimal-change beats forced
  refactor here.
- intel_lifter strips mitre_url from evidence_extra in all four
  decision functions. The column is canonical now; duplicating in
  the JSON column would drift when the bundle moves. The unused
  TechniqueEmission import + tracking dicts removed too.
- IdentityTechniqueRow / TechniqueRollupRow / TTPTagDetailRow /
  CampaignTechniqueRow gain mitre_url: Optional[str].
- sqlmodel_repo/ttp.py:_mitre_url_for added; the 5 row-builder sites
  pass mitre_url=_mitre_url_for(sub_technique_id or technique_id)
  alongside the existing technique_name resolution.
- api_get_tag_details.py needs no change —
  list_tags_by_scope_and_technique already returns model_dump() rows
  that flow the new column through **row spread to TTPTagDetailRow.
- tests/ttp/test_emit_attaches_mitre_url.py covers both construction
  paths (top-level, sub-tech, unknown, multi-emit) and a regression
  test that intel_lifter evidence dicts no longer contain mitre_url.
2026-05-09 06:40:08 -04:00
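The "sub-technique URL when present, else parent" rule from the commit message can be illustrated with a small sketch. This is not the real `attack_stix.mitre_url_for`, whose implementation is not shown here. `sketch_mitre_url_for`, `url_for_tag`, and `_KNOWN_IDS` are hypothetical names, and the sketch assumes that `sub_technique_id` holds a full ID such as `T1110.001` and that unknown IDs resolve to `None`. The URL layout follows the public attack.mitre.org scheme.

```python
from typing import Optional

# Hypothetical stand-in for the technique IDs present in the ATT&CK bundle.
_KNOWN_IDS = {"T1059", "T1110", "T1110.001"}


def sketch_mitre_url_for(attack_id: Optional[str]) -> Optional[str]:
    """Return a deeplink for a known technique ID, else None (assumed behaviour)."""
    if not attack_id or attack_id not in _KNOWN_IDS:
        return None
    # "T1110.001" -> ".../techniques/T1110/001/", "T1110" -> ".../techniques/T1110/"
    return "https://attack.mitre.org/techniques/" + attack_id.replace(".", "/") + "/"


def url_for_tag(technique_id: str, sub_technique_id: Optional[str]) -> Optional[str]:
    """Prefer the sub-technique ID; fall back to the parent when it is None or empty."""
    return sketch_mitre_url_for(sub_technique_id or technique_id)


print(url_for_tag("T1110", "T1110.001"))
print(url_for_tag("T1110", None))
print(url_for_tag("T9999", None))
```

Note that `sub_technique_id or technique_id` only falls back when the sub-technique is missing. In this sketch, a sub-technique ID that is present but unknown yields `None` rather than the parent URL.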

"""Shared TTPTag emission helper used by per-source lifters.

The rule engine assembles a tag inline inside ``_evaluate_rules``; the
four lifters (E.3.9–E.3.13) emit tags from the same shape but never
go through the engine's regex matcher. Pulling the assembly into one
helper keeps the ``compute_tag_uuid`` call signature, the
``apply_ceiling`` clamp, and the ``attack_release`` stamping
single-sourced.
"""
from __future__ import annotations

from typing import Any

from decnet.ttp.attack_stix import mitre_url_for
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl._state import apply_ceiling
from decnet.ttp.impl.rule_engine import _ATTACK_RELEASE, CompiledRule
from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid


def emit_tags(
    rule: CompiledRule,
    event: TaggerEvent,
    evidence: dict[str, Any],
) -> list[TTPTag]:
    """Materialise one TTPTag per ``rule.emits`` entry.

    Caller is responsible for having checked ``is_active(rule.state)``
    and the per-rule predicate before calling. ``evidence`` is the
    fully-assembled evidence dict the lifter wants on each emitted
    tag — caller honours ``rule.evidence_fields`` and any per-rule
    PII discipline (e.g. EmailEvidence) before passing it in.

    The tag UUID is deterministic over (source_kind, source_id, rule_id,
    rule_version, technique_id, sub_technique_id). Replay-safe: a worker
    re-processing the same source events writes idempotent rows.
    """
    out: list[TTPTag] = []
    for technique_id, sub_technique_id, tactic, base_conf in rule.emits:
        confidence = apply_ceiling(base_conf, rule.state)
        tag_uuid = compute_tag_uuid(
            source_kind=event.source_kind,
            source_id=event.source_id,
            rule_id=rule.rule_id,
            rule_version=rule.rule_version,
            technique_id=technique_id,
            sub_technique_id=sub_technique_id,
        )
        out.append(TTPTag(
            uuid=tag_uuid,
            source_kind=event.source_kind,
            source_id=event.source_id,
            attacker_uuid=event.attacker_uuid,
            identity_uuid=event.identity_uuid,
            session_id=event.session_id,
            decky_id=event.decky_id,
            tactic=tactic,
            technique_id=technique_id,
            sub_technique_id=sub_technique_id,
            confidence=confidence,
            rule_id=rule.rule_id,
            rule_version=rule.rule_version,
            evidence=dict(evidence),
            attack_release=_ATTACK_RELEASE,
            mitre_url=mitre_url_for(sub_technique_id or technique_id),
        ))
    return out


__all__ = ["emit_tags"]
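
The replay-safety claim in the `emit_tags` docstring rests on the tag UUID being a pure function of six fields. The real `compute_tag_uuid` is not shown in this file, so the following is only a sketch of one way to get that property, assuming a name-based UUID (`uuid.uuid5`) over a joined key. `sketch_tag_uuid` and `_SKETCH_NAMESPACE` are hypothetical names, and the namespace value is arbitrary.

```python
import uuid
from typing import Optional

# Arbitrary namespace chosen for this sketch; the project's real value is unknown.
_SKETCH_NAMESPACE = uuid.UUID("12345678-1234-5678-1234-567812345678")


def sketch_tag_uuid(
    *,
    source_kind: str,
    source_id: str,
    rule_id: str,
    rule_version: int,
    technique_id: str,
    sub_technique_id: Optional[str],
) -> str:
    """Derive a stable UUID from the six identifying fields (assumed scheme)."""
    # A unit-separator delimiter keeps ("ab", "c") distinct from ("a", "bc").
    key = "\x1f".join([
        source_kind,
        source_id,
        rule_id,
        str(rule_version),
        technique_id,
        sub_technique_id or "",
    ])
    return str(uuid.uuid5(_SKETCH_NAMESPACE, key))


fields = dict(
    source_kind="session",
    source_id="abc123",
    rule_id="R-0001",
    rule_version=1,
    technique_id="T1110",
    sub_technique_id="T1110.001",
)
first = sketch_tag_uuid(**fields)
replayed = sketch_tag_uuid(**fields)
print(first == replayed)  # same inputs, same UUID
```

Because the same inputs always produce the same UUID, a worker that re-processes an event regenerates the same primary key, so the write can be treated as an upsert instead of creating a duplicate row.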