feat(ttp): E.3.7 RuleEngine — evaluate + atomic-swap watch_store

Implements the rule engine body left empty at contract phase: evaluate()
dispatches by source_kind through self._by_kind, runs the rule's match
spec against event.payload, and emits one TTPTag per emits entry.
watch_store() loads the initial corpus from RuleStore.load_compiled,
then drains subscribe_changes, applying definition changes via
single-statement dict assignment (atomic swap, GIL-atomic to readers)
and state changes via NamedTuple._replace on the existing CompiledRule.

Why: with the FS + DB stores in place (E.3.5/E.3.6), the engine is the
last piece of the rule plane. Lifters (E.3.9–E.3.13) consume the
engine; the worker bootstrap (E.3.14) wires watch_store into the
asyncio event loop. After this commit a CompositeTagger constructed
with a RuleEngine + a populated rules dir will produce real tags.

Notes:
- CompiledRule.emits extended to 4-tuple
  (technique_id, sub_technique_id, tactic, confidence). Tactic + confidence
  ride per-emit so a single rule can carry multiple precision targets
  (the "one event maps to many techniques" property). Compile helpers in
  both backends extract them from the YAML emits dict; missing tactic
  or confidence is a deploy-time error.
- v0 match operator is "pattern" (regex). The field the pattern is
  matched against defaults per source_kind (command_text / raw_url /
  subject / verdict / …) and is overridable via match.field. Future ops
  (contains, equals, in_set) extend _match_event without touching the
  engine surface.
- Confidence model: rules with state="clipped" and confidence_max set
  cap the per-emit confidence downward; clipped is a soft suppress, not
  a hard skip. Disabled rules are skipped entirely. An expires_at in the
  past is re-checked at evaluate time as defense-in-depth (the store
  auto-reverts, but a read racing between expiry and revert must not
  fire the rule).
- _span(name, **attrs) helper in engine + both stores short-circuits on
  decnet.telemetry._ENABLED — matches the project's @traced /
  wrap_repository zero-overhead-when-disabled pattern instead of relying
  solely on the no-op tracer indirection.
- Late-bound tracer (telemetry.get_tracer called per-span, not at
  module load) so test_tracing's monkeypatch reaches the production
  code path.

xfails flipped: tests/ttp/test_rule_engine.py multi-emit fan-out +
rule_version-collision-via-engine; tests/ttp/test_multi_mapping.py
N×M engine fan-out + idempotent replay; tests/ttp/test_tracing.py
ttp.eval span hierarchy + ttp.rule.fire span attributes.

Tests: 214 passed, 19 xfailed (gated on E.3.8 lifters / rule pack /
worker bootstrap).
mypy: clean on prod code; pre-existing test-stub arg-type warnings
unchanged.
This commit is contained in:
2026-05-01 08:49:15 -04:00
parent 8a93ee3129
commit ed3f340ea8
10 changed files with 679 additions and 150 deletions

View File

@@ -37,7 +37,8 @@ The master-side filesystem→DB sync helper is
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Iterator
from contextlib import contextmanager
from dataclasses import replace
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Final
@@ -64,8 +65,26 @@ if TYPE_CHECKING:
_log = get_logger("ttp.store.database")
def _tracer() -> Any:
    """Return the ``ttp.store`` tracer, looked up afresh on every call."""
    store_tracer = _telemetry.get_tracer("ttp.store")
    return store_tracer
@contextmanager
def _span(name: str, **attrs: Any) -> Iterator[Any]:
    """Open a tracing span named *name*, gated on ``DECNET_DEVELOPER_TRACING``.

    Counterpart of the helper in :mod:`decnet.ttp.store.impl.filesystem`.
    With tracing disabled the manager yields ``None`` without touching
    the tracer at all. With tracing enabled the tracer is fetched at
    call time (so ``test_tracing.py``'s monkeypatch of
    :func:`decnet.telemetry.get_tracer` is honoured), a span is started
    as the current span, *attrs* are applied to it, and the span itself
    is yielded.
    """
    if _telemetry._ENABLED:
        active_tracer = _telemetry.get_tracer("ttp.store")
        with active_tracer.start_as_current_span(name) as current:
            for attr_name, attr_value in attrs.items():
                try:
                    current.set_attribute(attr_name, attr_value)
                except (TypeError, ValueError):
                    # A rejected attribute is dropped; the span stays open.
                    continue
            yield current
    else:
        yield None
def _utcnow() -> datetime:
@@ -100,27 +119,35 @@ def _row_to_state(row: TTPRuleState) -> RuleState:
)
def _safe_set_attrs(span: Any, **attrs: Any) -> None:
    """Apply *attrs* to *span* on a best-effort basis.

    Does nothing when *span* has no usable ``set_attribute``. An
    attribute whose setter call raises ``TypeError`` or ``ValueError``
    is skipped and the remaining attributes are still applied.
    """
    set_attribute = getattr(span, "set_attribute", None)
    if set_attribute is not None:
        for attr_name, attr_value in attrs.items():
            try:
                set_attribute(attr_name, attr_value)
            except (TypeError, ValueError):
                # Drop only this attribute; carry on with the rest.
                continue
def _compile_one(parsed: RuleSchema, state: RuleState) -> CompiledRule:
emits: list[tuple[str, str | None]] = []
"""Mirror of :func:`decnet.ttp.store.impl.filesystem._compile_one`.
Same 4-tuple emits shape so a rule round-trips identically through
either backend. Kept as a sibling rather than imported from the FS
module to avoid dragging the asyncinotify import onto non-Linux
hosts that only use the database backend.
"""
emits: list[tuple[str, str | None, str, float]] = []
for entry in parsed.emits:
tid = entry.get("technique_id")
if not tid:
raise ValueError(
f"rule {parsed.rule_id}: every emits entry needs technique_id",
)
sub = entry.get("sub_technique_id") or None
emits.append((tid, sub))
sub_raw = entry.get("sub_technique_id")
sub = sub_raw if sub_raw else None
tactic = entry.get("tactic")
if not tactic:
raise ValueError(
f"rule {parsed.rule_id}: emit for {tid} needs a tactic",
)
confidence_raw = entry.get("confidence")
if confidence_raw is None:
raise ValueError(
f"rule {parsed.rule_id}: emit for {tid} needs a confidence",
)
confidence = float(confidence_raw)
emits.append((str(tid), sub, str(tactic), confidence))
return CompiledRule(
rule_id=parsed.rule_id,
rule_version=parsed.rule_version,
@@ -146,9 +173,13 @@ def _yaml_to_compiled(yaml_text: str, state: RuleState) -> CompiledRule:
def _compiled_to_yaml(compiled: CompiledRule) -> str:
"""Serialize a :class:`CompiledRule` back to a YAML rule body for
master-side filesystem→DB sync. Mirrors :class:`RuleSchema`."""
emits: list[dict[str, str]] = []
for technique_id, sub in compiled.emits:
entry: dict[str, str] = {"technique_id": technique_id}
emits: list[dict[str, Any]] = []
for technique_id, sub, tactic, confidence in compiled.emits:
entry: dict[str, Any] = {
"technique_id": technique_id,
"tactic": tactic,
"confidence": confidence,
}
if sub:
entry["sub_technique_id"] = sub
emits.append(entry)
@@ -275,17 +306,16 @@ class DatabaseRuleStore(RuleStore):
state: RuleState,
set_by: str,
) -> None:
with _tracer().start_as_current_span("ttp.rule.state.change") as span:
_safe_set_attrs(
span,
rule_id=rule_id,
state=state.state,
set_by=set_by,
)
with _span(
"ttp.rule.state.change",
rule_id=rule_id,
state=state.state,
set_by=set_by,
):
stamped = replace(state, set_by=set_by, set_at=_utcnow())
with _tracer().start_as_current_span("ttp.store.write_state"):
with _span("ttp.store.write_state"):
await self._upsert_state_row(rule_id, stamped)
with _tracer().start_as_current_span("ttp.rule.publish"):
with _span("ttp.rule.publish"):
await self._emit_change(
RuleChange("state", rule_id, stamped),
bus_topic=_topics.ttp_rule_state(rule_id),

View File

@@ -41,7 +41,8 @@ from __future__ import annotations
import asyncio
import re
import sys
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Iterator
from contextlib import contextmanager
from dataclasses import replace
from datetime import datetime, timezone
from pathlib import Path
@@ -64,11 +65,30 @@ if TYPE_CHECKING:
_log = get_logger("ttp.store.filesystem")
def _tracer() -> Any:
@contextmanager
def _span(name: str, **attrs: Any) -> Iterator[Any]:
"""Span context manager gated on ``DECNET_DEVELOPER_TRACING``.
When tracing is off, yields ``None`` after a single attribute
lookup — matches the project's ``@traced`` / ``wrap_repository``
pattern of zero per-call overhead in the disabled case. When on,
opens an OTEL span via the (late-bound) tracer and applies
*attrs* defensively.
"""
if not _telemetry._ENABLED:
yield None
return
# Late binding: tests monkeypatch ``decnet.telemetry.get_tracer``
# at fixture setup; capturing the tracer at import time would freeze
# the no-op tracer into the module forever.
return _telemetry.get_tracer("ttp.store")
# at fixture setup; capturing the tracer at import time would
# freeze the no-op tracer into the module forever.
tracer = _telemetry.get_tracer("ttp.store")
with tracer.start_as_current_span(name) as span:
for key, value in attrs.items():
try:
span.set_attribute(key, value)
except (TypeError, ValueError):
continue
yield span
# ── Filename allowlist ──────────────────────────────────────────────
@@ -110,20 +130,6 @@ def _utcnow() -> datetime:
return datetime.now(tz=timezone.utc)
def _safe_set_attrs(span: Any, **attrs: Any) -> None:
    """Set *attrs* on *span*, tolerating both real OTEL and no-op spans.

    A *span* without a usable ``set_attribute`` is left alone. Each
    attribute is applied independently, so one rejected value does not
    stop the others from being recorded.
    """
    set_attribute = getattr(span, "set_attribute", None)
    if set_attribute is not None:
        for attr_name, attr_value in attrs.items():
            try:
                set_attribute(attr_name, attr_value)
            except (TypeError, ValueError):
                # OTEL rejects un-serializable types; that is not
                # load-bearing for store correctness, so drop the
                # attribute and keep the span.
                continue
def _is_expired(state: RuleState, now: datetime) -> bool:
if state.expires_at is None:
return False
@@ -136,19 +142,36 @@ def _is_expired(state: RuleState, now: datetime) -> bool:
def _compile_one(parsed: RuleSchema, state: RuleState) -> CompiledRule:
"""Translate a validated :class:`RuleSchema` into a :class:`CompiledRule`.
Each ``emits`` entry contributes a 4-tuple
``(technique_id, sub_technique_id, tactic, confidence)`` —
consumed by :class:`RuleEngine` when fanning a single match into
one tag per technique. Missing tactic / confidence in the YAML is
a deploy-time error: a tag without a tactic can't render in the
Navigator export, and a missing confidence has no sane default.
The match spec is passed through verbatim — the engine owns
interpretation of operator keys (``pattern``, ``contains``, …); the
store only validates structural shape.
interpretation of operator keys (``pattern``, ``contains``, …).
"""
emits: list[tuple[str, str | None]] = []
emits: list[tuple[str, str | None, str, float]] = []
for entry in parsed.emits:
tid = entry.get("technique_id")
if not tid:
raise ValueError(
f"rule {parsed.rule_id}: every emits entry needs technique_id",
)
sub = entry.get("sub_technique_id") or None
emits.append((tid, sub))
sub_raw = entry.get("sub_technique_id")
sub = sub_raw if sub_raw else None
tactic = entry.get("tactic")
if not tactic:
raise ValueError(
f"rule {parsed.rule_id}: emit for {tid} needs a tactic",
)
confidence_raw = entry.get("confidence")
if confidence_raw is None:
raise ValueError(
f"rule {parsed.rule_id}: emit for {tid} needs a confidence",
)
confidence = float(confidence_raw)
emits.append((str(tid), sub, str(tactic), confidence))
return CompiledRule(
rule_id=parsed.rule_id,
rule_version=parsed.rule_version,
@@ -330,22 +353,17 @@ class FilesystemRuleStore(RuleStore):
# Operational state changes are NOT a tolerated-absence path.
# Failures here MUST raise rather than silently drop — the
# E.2.14b conformance test pins this.
with _tracer().start_as_current_span("ttp.rule.state.change") as span:
# Defensive set_attribute: real OTEL spans accept str/int/etc;
# the no-op tracer's _NoOpSpan ignores attributes silently. A
# caller-side wrapper keeps both paths green without leaking
# tracer-shape knowledge into the store.
_safe_set_attrs(
span,
rule_id=rule_id,
state=state.state,
set_by=set_by,
)
with _span(
"ttp.rule.state.change",
rule_id=rule_id,
state=state.state,
set_by=set_by,
):
stamped = replace(state, set_by=set_by, set_at=_utcnow())
with _tracer().start_as_current_span("ttp.store.write_state"):
with _span("ttp.store.write_state"):
self._state[rule_id] = stamped
self._restamp_compiled(rule_id, stamped)
with _tracer().start_as_current_span("ttp.rule.publish"):
with _span("ttp.rule.publish"):
await self._emit_change(
RuleChange("state", rule_id, stamped),
bus_topic=_topics.ttp_rule_state(rule_id),