Files
DECNET/decnet/ttp/impl/_emit.py
anti eff3e4bce7 feat(ttp): E.3.9 BehavioralLifter (R0031-R0040)
Reads pre-shaped session aggregates from TaggerEvent.payload and emits
techniques per Appendix A behavior tables. Per-rule predicates dispatch
on match.kind (lifter:behavioral_<name>); the lifter holds its own
RuleIndex watching the same RuleStore as the engine, so disable / clip /
TTL state reaches lifter-bound rules through the same atomic-swap path.

R0032/R0036/R0037/R0040 YAMLs had over-escaped regex strings (\\
instead of \\) — fixed in place.

Factory wired so default get_tagger() returns CompositeTagger with
BehavioralLifter shipped; remaining three lifters (E.3.10-E.3.12) land
in subsequent commits.

E.2.6 contract preserved via TolerantTagger: empty payload steady-state
yields [] with zero ERROR records. Disabled / clipped / expired state
verified.
2026-05-01 20:17:59 -04:00

69 lines
2.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Shared TTPTag emission helper used by per-source lifters.
The rule engine assembles a tag inline inside ``_evaluate_rules``; the
four lifters (E.3.9E.3.13) emit tags from the same shape but never
go through the engine's regex matcher. Pulling the assembly into one
helper keeps the ``compute_tag_uuid`` call signature, the
``apply_ceiling`` clamp, and the ``attack_release`` stamping
single-sourced.
"""
from __future__ import annotations
from typing import Any
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl._state import apply_ceiling
from decnet.ttp.impl.rule_engine import _ATTACK_RELEASE, CompiledRule
from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid
def emit_tags(
rule: CompiledRule,
event: TaggerEvent,
evidence: dict[str, Any],
) -> list[TTPTag]:
"""Materialise one TTPTag per ``rule.emits`` entry.
Caller is responsible for having checked ``is_active(rule.state)``
and the per-rule predicate before calling. ``evidence`` is the
fully-assembled evidence dict the lifter wants on each emitted
tag — caller honours ``rule.evidence_fields`` and any per-rule
PII discipline (e.g. EmailEvidence) before passing it in.
The tag UUID is deterministic over (source_kind, source_id, rule_id,
rule_version, technique_id, sub_technique_id). Replay-safe: a worker
re-processing the same source events writes idempotent rows.
"""
out: list[TTPTag] = []
for technique_id, sub_technique_id, tactic, base_conf in rule.emits:
confidence = apply_ceiling(base_conf, rule.state)
tag_uuid = compute_tag_uuid(
source_kind=event.source_kind,
source_id=event.source_id,
rule_id=rule.rule_id,
rule_version=rule.rule_version,
technique_id=technique_id,
sub_technique_id=sub_technique_id,
)
out.append(TTPTag(
uuid=tag_uuid,
source_kind=event.source_kind,
source_id=event.source_id,
attacker_uuid=event.attacker_uuid,
identity_uuid=event.identity_uuid,
session_id=event.session_id,
decky_id=event.decky_id,
tactic=tactic,
technique_id=technique_id,
sub_technique_id=sub_technique_id,
confidence=confidence,
rule_id=rule.rule_id,
rule_version=rule.rule_version,
evidence=dict(evidence),
attack_release=_ATTACK_RELEASE,
))
return out
__all__ = ["emit_tags"]