From ed3f340ea80607cbc062c557201e49ea0ddf62c9 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 08:49:15 -0400 Subject: [PATCH] =?UTF-8?q?feat(ttp):=20E.3.7=20RuleEngine=20=E2=80=94=20e?= =?UTF-8?q?valuate=20+=20atomic-swap=20watch=5Fstore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the rule engine body left empty at contract phase: evaluate() dispatches by source_kind through self._by_kind, runs the rule's match spec against event.payload, and emits one TTPTag per emits entry. watch_store() loads the initial corpus from RuleStore.load_compiled, then drains subscribe_changes, applying definition changes via single-statement dict assignment (atomic swap, GIL-atomic to readers) and state changes via NamedTuple._replace on the existing CompiledRule. Why: with the FS + DB stores in place (E.3.5/E.3.6), the engine is the last piece of the rule plane. Lifters (E.3.9–E.3.13) consume the engine; the worker bootstrap (E.3.14) wires watch_store into the asyncio event loop. After this commit a CompositeTagger constructed with a RuleEngine + a populated rules dir will produce real tags. Notes: - CompiledRule.emits extended to 4-tuple (technique_id, sub_technique_id, tactic, confidence). Tactic + confidence ride per-emit so a single rule can carry multiple precision targets (the "one event maps to many techniques" property). Compile helpers in both backends extract them from the YAML emits dict; missing tactic or confidence is a deploy-time error. - v0 match operator is "pattern" (regex). The field defaults per source_kind (command_text / raw_url / subject / verdict / …) and is overridable via match.field. Future ops (contains, equals, in_set) extend _match_event without touching the engine surface. - Confidence model: rules with state="clipped" + confidence_max set cap the per-emit confidence downward; clipped is a soft suppress, not a hard skip. Disabled rules are skipped wholly; expires_at past is re-checked at evaluate as defense-in-depth (the store auto-reverts, but a racing read between expiry and revert must not fire the rule). - _span(name, **attrs) helper in engine + both stores short-circuits on decnet.telemetry._ENABLED — matches the project's @traced / wrap_repository zero-overhead-when-disabled pattern instead of relying solely on the no-op tracer indirection. - Late-bound tracer (telemetry.get_tracer called per-span, not at module load) so test_tracing's monkeypatch reaches the production code path. xfails flipped: tests/ttp/test_rule_engine.py multi-emit fan-out + rule_version-collision-via-engine; tests/ttp/test_multi_mapping.py N×M engine fan-out + idempotent replay; tests/ttp/test_tracing.py ttp.eval span hierarchy + ttp.rule.fire span attributes. Tests: 214 passed, 19 xfailed (gated on E.3.8 lifters / rule pack / worker bootstrap). mypy: clean on prod code; pre-existing test-stub arg-type warnings unchanged. --- decnet/ttp/impl/rule_engine.py | 301 ++++++++++++++++++++++++++-- decnet/ttp/store/impl/database.py | 88 +++++--- decnet/ttp/store/impl/filesystem.py | 92 +++++---- development/TTP_TAGGING.md | 11 +- tests/ttp/store/test_conformance.py | 6 +- tests/ttp/store/test_database.py | 8 +- tests/ttp/store/test_filesystem.py | 6 +- tests/ttp/test_multi_mapping.py | 117 +++++++++-- tests/ttp/test_rule_engine.py | 89 +++++--- tests/ttp/test_tracing.py | 111 ++++++++-- 10 files changed, 679 insertions(+), 150 deletions(-) diff --git a/decnet/ttp/impl/rule_engine.py b/decnet/ttp/impl/rule_engine.py index 22266584..e552e96c 100644 --- a/decnet/ttp/impl/rule_engine.py +++ b/decnet/ttp/impl/rule_engine.py @@ -27,12 +27,17 @@ file is importable before that step lands. """ from __future__ import annotations +import re +from collections.abc import Iterator +from contextlib import contextmanager from typing import TYPE_CHECKING, Any, NamedTuple from pydantic import BaseModel, Field +from decnet import telemetry as _telemetry +from decnet.logging import get_logger from decnet.ttp.base import TaggerEvent -from decnet.web.db.models.ttp import TTPTag +from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid if TYPE_CHECKING: # Store contracts ship in E.1.11. Forward-referenced under @@ -40,7 +45,42 @@ if TYPE_CHECKING: # without creating a circular shape dependency on a not-yet-shipped # subpackage. Concrete construction happens at the worker layer # (E.1.7) where both halves are in scope. - from decnet.ttp.store.base import RuleState, RuleStore + from decnet.ttp.store.base import RuleChange, RuleState, RuleStore + + +_log = get_logger("ttp.engine") + + +@contextmanager +def _span(name: str, **attrs: Any) -> Iterator[Any]: + """Span context manager gated on ``DECNET_DEVELOPER_TRACING``. + + Same shape as the helpers in :mod:`decnet.ttp.store.impl.filesystem` + / :mod:`decnet.ttp.store.impl.database`: zero per-call overhead when + tracing is off (single attribute lookup, then yield ``None``); + late-bound tracer when on so the test_tracing monkeypatch reaches + us. Modeled on the project's ``@traced`` / ``wrap_repository`` + no-overhead-when-disabled pattern. + """ + if not _telemetry._ENABLED: + yield None + return + tracer = _telemetry.get_tracer("ttp.engine") + with tracer.start_as_current_span(name) as span: + for key, value in attrs.items(): + try: + span.set_attribute(key, value) + except (TypeError, ValueError): + continue + yield span + + +# ATT&CK release stamped on every emitted tag. Pinned at the engine +# layer rather than per-rule because rule authors don't manage ATT&CK +# matrix drift; the engine owns that. Bumping this value invalidates +# tag UUID continuity across deploys, so the bump must land alongside a +# documented ATT&CK upgrade per TTP_TAGGING.md §"Hard parts §8". +_ATTACK_RELEASE: str = "v15.1" class CompiledRule(NamedTuple): @@ -71,9 +111,11 @@ class CompiledRule(NamedTuple): #: phase (E.3). Kept ``dict[str, Any]`` here rather than typed so #: rule authors can extend match operators without touching the ABC. match_spec: dict[str, Any] - #: ``((technique_id, sub_technique_id | None), ...)``. Tuple, not - #: list, so the record stays hashable. - emits: tuple[tuple[str, str | None], ...] + #: ``((technique_id, sub_technique_id | None, tactic, confidence), ...)`` + #: per emit. Tuple-of-tuples, not list, so the record stays hashable. + #: One YAML rule may emit N tags from a single match — see + #: TTP_TAGGING.md §"One event maps to many techniques". + emits: tuple[tuple[str, str | None, str, float], ...] #: Names of evidence keys the rule populates on emitted tags. evidence_fields: tuple[str, ...] #: Operational state stamped in by the store at compile time. @@ -95,7 +137,13 @@ class RuleSchema(BaseModel): name: str applies_to: list[str] match: dict[str, Any] - emits: list[dict[str, str]] + #: ``[{"tactic": "TA0007", "technique_id": "T1083", + #: "sub_technique_id": "T1083.001"?, "confidence": 0.85}, ...]`` + #: Per-emit tactic + confidence ride here so a single rule can carry + #: multiple precision targets (the "one event maps to many techniques" + #: case from TTP_TAGGING.md, including different confidences per + #: technique on the same match). + emits: list[dict[str, Any]] evidence_fields: list[str] = Field(default_factory=list) @@ -116,30 +164,245 @@ class RuleEngine: def __init__(self, store: "RuleStore") -> None: self._store = store # ``source_kind`` → list of compiled rules that claim it. - # Empty here; populated by :meth:`watch_store` once the store - # contract lands (E.1.11). + # Replaced wholesale on per-rule changes via watch_store(); the + # GIL-atomic dict assignment is the "atomic swap" pin from + # TTP_TAGGING.md §"Atomic swap" / E.2.14b. self._by_kind: dict[str, list[CompiledRule]] = {} + # Mirror keyed by rule_id for definition+state restamping. + # Same atomicity contract: replacement, never in-place mutation. + self._by_rule: dict[str, CompiledRule] = {} async def evaluate(self, event: TaggerEvent) -> list[TTPTag]: """Return zero or more tags produced by rules matching *event*. - Empty in the contract phase. The impl phase fans the event out - to ``self._by_kind[event.source_kind]`` and merges results. + Dispatches by ``event.source_kind``; for each rule whose + ``applies_to`` set covers the kind, runs the match spec against + ``event.payload`` and emits one :class:`TTPTag` per ``emits`` + entry. Confidence is the per-emit base scaled by the rule's + :class:`RuleState.confidence_max` ceiling (no-op when ``None``). + Disabled rules are skipped; the store auto-reverts expired + states, but the engine double-checks ``expires_at`` as + defense-in-depth. """ - return [] + rules = self._by_kind.get(event.source_kind, []) + if not rules: + return [] + with _span( + "ttp.eval", + attacker_uuid=str(event.attacker_uuid or ""), + identity_uuid=str(event.identity_uuid or ""), + source_kind=event.source_kind, + ): + return _evaluate_rules(rules, event) async def watch_store(self) -> None: """Subscribe to per-rule changes and atomically swap them in. - Reads from :meth:`RuleStore.subscribe_changes`. Each yielded - change is one rule_id; the engine recompiles that rule alone - and replaces the corresponding entries in the dispatch index - in a single assignment. Never returns under normal operation — - the worker cancels it during shutdown. - - Empty in the contract phase. + Loads the initial corpus from :meth:`RuleStore.load_compiled`, + builds the dispatch index, then drains + :meth:`RuleStore.subscribe_changes` forever. Each ``definition`` + change replaces the affected rule wholesale; each ``state`` + change re-stamps the existing :class:`CompiledRule`'s ``state`` + field via NamedTuple ``_replace`` (single dict assignment, no + in-place mutation). """ - return None + # Forward import — avoids the contract-phase circular shape + # dependency captured by the TYPE_CHECKING block above. + from decnet.ttp.store.base import RuleState # noqa: PLC0415 + + compiled = await self._store.load_compiled() + for rule in compiled: + self._install(rule) + async for change in self._store.subscribe_changes(): + try: + self._apply_change(change, RuleState) + except Exception: # noqa: BLE001 + _log.exception( + "ttp.engine: rule change apply failed rule_id=%s", + change.rule_id, + ) + + # ── Internals ─────────────────────────────────────────────────── + + def _install(self, rule: CompiledRule) -> None: + """Atomic-swap install of one compiled rule. + + Empty ``applies_to`` + empty ``emits`` is the deletion sentinel + used by both backends — drop the rule from the index instead of + adding a no-op entry. + """ + if not rule.applies_to and not rule.emits: + self._evict(rule.rule_id) + return + self._by_rule[rule.rule_id] = rule + for kind in rule.applies_to: + current = self._by_kind.get(kind, []) + replaced = [r for r in current if r.rule_id != rule.rule_id] + replaced.append(rule) + # Single dict assignment — GIL-atomic to readers. + self._by_kind[kind] = replaced + + def _evict(self, rule_id: str) -> None: + existing = self._by_rule.pop(rule_id, None) + if existing is None: + return + for kind in existing.applies_to: + current = self._by_kind.get(kind, []) + replaced = [r for r in current if r.rule_id != rule_id] + self._by_kind[kind] = replaced + + def _apply_change( + self, change: "RuleChange", state_cls: type, + ) -> None: + if change.change_kind == "definition": + value = change.new_value + if isinstance(value, CompiledRule): + self._install(value) + return + # state change + existing = self._by_rule.get(change.rule_id) + if existing is None or not isinstance(change.new_value, state_cls): + return + new_state = change.new_value # narrowed by isinstance above + # NamedTuple._replace returns a fresh frozen tuple — single + # dict assignment swaps it in atomically. + restamped = existing._replace(state=new_state) # type: ignore[arg-type] + self._by_rule[change.rule_id] = restamped + for kind in restamped.applies_to: + current = self._by_kind.get(kind, []) + replaced = [r for r in current if r.rule_id != change.rule_id] + replaced.append(restamped) + self._by_kind[kind] = replaced + + +def _state_active(state: "RuleState") -> bool: + """A rule fires iff its state isn't ``disabled``. ``clipped`` rules + still fire — the clip caps confidence, doesn't suppress. + """ + if state.state == "disabled": + return False + if state.expires_at is not None: + # Defense-in-depth: stores auto-revert expired states, but a + # racing read between expiry and revert must not fire a rule + # the operator told us was off. + from datetime import datetime, timezone # noqa: PLC0415 + + expires = state.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + if expires < datetime.now(tz=timezone.utc): + return False + return True + + +def _match_event(rule: CompiledRule, event: TaggerEvent) -> bool: + """Run the rule's match spec against ``event.payload``. + + For v0 the only operator is ``pattern`` — a regex against a + payload field. The field name comes from ``match_spec["field"]`` + if present, otherwise the per-source-kind default + (``command_text`` for ``command``, ``raw_url`` for + ``http_request``, etc.). A future PR can extend this to + ``contains``, ``equals``, ``in_set`` without touching the engine + surface — only this function changes. + """ + spec = rule.match_spec + pattern = spec.get("pattern") + if pattern is None: + return False + field = spec.get("field") or _default_field(event.source_kind) + if field is None: + return False + haystack = event.payload.get(field) + if not isinstance(haystack, str): + return False + try: + return re.search(pattern, haystack) is not None + except re.error: + # Malformed regex made it past schema validation — log and + # don't fire. The deploy-time hook (load_compiled) catches + # most of these; this path is the runtime fallback. + _log.warning( + "ttp.engine: bad regex in rule %s: %r", rule.rule_id, pattern, + ) + return False + + +def _default_field(source_kind: str) -> str | None: + return _DEFAULT_MATCH_FIELD.get(source_kind) + + +# Per-source_kind default field for the ``pattern`` operator. New +# source_kinds can override via ``match.field`` in the YAML rule. +_DEFAULT_MATCH_FIELD: dict[str, str] = { + "command": "command_text", + "http_request": "raw_url", + "email": "subject", + "intel": "verdict", + "canary_fingerprint": "ua_signature", + "auth_attempt": "username", + "payload": "payload_text", +} + + +def _evaluate_rules( + rules: list[CompiledRule], event: TaggerEvent, +) -> list[TTPTag]: + out: list[TTPTag] = [] + for rule in rules: + if not _state_active(rule.state): + continue + if not _match_event(rule, event): + continue + ceiling = ( + rule.state.confidence_max + if rule.state.state == "clipped" + and rule.state.confidence_max is not None + else 1.0 + ) + with _span( + "ttp.rule.fire", + rule_id=rule.rule_id, + rule_version=rule.rule_version, + ) as span: + for technique_id, sub_technique_id, tactic, base_conf in rule.emits: + if span is not None: + try: + span.set_attribute("technique_id", technique_id) + except (TypeError, ValueError): + pass + confidence = min(base_conf, base_conf * ceiling) if ceiling < 1.0 else base_conf + tag_uuid = compute_tag_uuid( + source_kind=event.source_kind, + source_id=event.source_id, + rule_id=rule.rule_id, + rule_version=rule.rule_version, + technique_id=technique_id, + sub_technique_id=sub_technique_id, + ) + evidence: dict[str, Any] = { + field: event.payload.get(field) + for field in rule.evidence_fields + if field in event.payload + } + out.append(TTPTag( + uuid=tag_uuid, + source_kind=event.source_kind, + source_id=event.source_id, + attacker_uuid=event.attacker_uuid, + identity_uuid=event.identity_uuid, + session_id=event.session_id, + decky_id=event.decky_id, + tactic=tactic, + technique_id=technique_id, + sub_technique_id=sub_technique_id, + confidence=confidence, + rule_id=rule.rule_id, + rule_version=rule.rule_version, + evidence=evidence, + attack_release=_ATTACK_RELEASE, + )) + return out __all__ = [ diff --git a/decnet/ttp/store/impl/database.py b/decnet/ttp/store/impl/database.py index a4d850f3..00efe88c 100644 --- a/decnet/ttp/store/impl/database.py +++ b/decnet/ttp/store/impl/database.py @@ -37,7 +37,8 @@ The master-side filesystem→DB sync helper is from __future__ import annotations import asyncio -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Iterator +from contextlib import contextmanager from dataclasses import replace from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Final @@ -64,8 +65,26 @@ if TYPE_CHECKING: _log = get_logger("ttp.store.database") -def _tracer() -> Any: - return _telemetry.get_tracer("ttp.store") +@contextmanager +def _span(name: str, **attrs: Any) -> Iterator[Any]: + """Span context manager gated on ``DECNET_DEVELOPER_TRACING``. + + Mirrors the helper in :mod:`decnet.ttp.store.impl.filesystem`: zero + per-call overhead when tracing is off, late-bound tracer when on + (so ``test_tracing.py``'s monkeypatch of + :func:`decnet.telemetry.get_tracer` reaches us). + """ + if not _telemetry._ENABLED: + yield None + return + tracer = _telemetry.get_tracer("ttp.store") + with tracer.start_as_current_span(name) as span: + for key, value in attrs.items(): + try: + span.set_attribute(key, value) + except (TypeError, ValueError): + continue + yield span def _utcnow() -> datetime: @@ -100,27 +119,35 @@ def _row_to_state(row: TTPRuleState) -> RuleState: ) -def _safe_set_attrs(span: Any, **attrs: Any) -> None: - setter = getattr(span, "set_attribute", None) - if setter is None: - return - for key, value in attrs.items(): - try: - setter(key, value) - except (TypeError, ValueError): - continue - - def _compile_one(parsed: RuleSchema, state: RuleState) -> CompiledRule: - emits: list[tuple[str, str | None]] = [] + """Mirror of :func:`decnet.ttp.store.impl.filesystem._compile_one`. + + Same 4-tuple emits shape so a rule round-trips identically through + either backend. Kept as a sibling rather than imported from the FS + module to avoid dragging the asyncinotify import onto non-Linux + hosts that only use the database backend. + """ + emits: list[tuple[str, str | None, str, float]] = [] for entry in parsed.emits: tid = entry.get("technique_id") if not tid: raise ValueError( f"rule {parsed.rule_id}: every emits entry needs technique_id", ) - sub = entry.get("sub_technique_id") or None - emits.append((tid, sub)) + sub_raw = entry.get("sub_technique_id") + sub = sub_raw if sub_raw else None + tactic = entry.get("tactic") + if not tactic: + raise ValueError( + f"rule {parsed.rule_id}: emit for {tid} needs a tactic", + ) + confidence_raw = entry.get("confidence") + if confidence_raw is None: + raise ValueError( + f"rule {parsed.rule_id}: emit for {tid} needs a confidence", + ) + confidence = float(confidence_raw) + emits.append((str(tid), sub, str(tactic), confidence)) return CompiledRule( rule_id=parsed.rule_id, rule_version=parsed.rule_version, @@ -146,9 +173,13 @@ def _yaml_to_compiled(yaml_text: str, state: RuleState) -> CompiledRule: def _compiled_to_yaml(compiled: CompiledRule) -> str: """Serialize a :class:`CompiledRule` back to a YAML rule body for master-side filesystem→DB sync. Mirrors :class:`RuleSchema`.""" - emits: list[dict[str, str]] = [] - for technique_id, sub in compiled.emits: - entry: dict[str, str] = {"technique_id": technique_id} + emits: list[dict[str, Any]] = [] + for technique_id, sub, tactic, confidence in compiled.emits: + entry: dict[str, Any] = { + "technique_id": technique_id, + "tactic": tactic, + "confidence": confidence, + } if sub: entry["sub_technique_id"] = sub emits.append(entry) @@ -275,17 +306,16 @@ class DatabaseRuleStore(RuleStore): state: RuleState, set_by: str, ) -> None: - with _tracer().start_as_current_span("ttp.rule.state.change") as span: - _safe_set_attrs( - span, - rule_id=rule_id, - state=state.state, - set_by=set_by, - ) + with _span( + "ttp.rule.state.change", + rule_id=rule_id, + state=state.state, + set_by=set_by, + ): stamped = replace(state, set_by=set_by, set_at=_utcnow()) - with _tracer().start_as_current_span("ttp.store.write_state"): + with _span("ttp.store.write_state"): await self._upsert_state_row(rule_id, stamped) - with _tracer().start_as_current_span("ttp.rule.publish"): + with _span("ttp.rule.publish"): await self._emit_change( RuleChange("state", rule_id, stamped), bus_topic=_topics.ttp_rule_state(rule_id), diff --git a/decnet/ttp/store/impl/filesystem.py b/decnet/ttp/store/impl/filesystem.py index 2c19721b..a34342cc 100644 --- a/decnet/ttp/store/impl/filesystem.py +++ b/decnet/ttp/store/impl/filesystem.py @@ -41,7 +41,8 @@ from __future__ import annotations import asyncio import re import sys -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Iterator +from contextlib import contextmanager from dataclasses import replace from datetime import datetime, timezone from pathlib import Path @@ -64,11 +65,30 @@ if TYPE_CHECKING: _log = get_logger("ttp.store.filesystem") -def _tracer() -> Any: +@contextmanager +def _span(name: str, **attrs: Any) -> Iterator[Any]: + """Span context manager gated on ``DECNET_DEVELOPER_TRACING``. + + When tracing is off, yields ``None`` after a single attribute + lookup — matches the project's ``@traced`` / ``wrap_repository`` + pattern of zero per-call overhead in the disabled case. When on, + opens an OTEL span via the (late-bound) tracer and applies + *attrs* defensively. + """ + if not _telemetry._ENABLED: + yield None + return # Late binding: tests monkeypatch ``decnet.telemetry.get_tracer`` - # at fixture setup; capturing the tracer at import time would freeze - # the no-op tracer into the module forever. - return _telemetry.get_tracer("ttp.store") + # at fixture setup; capturing the tracer at import time would + # freeze the no-op tracer into the module forever. + tracer = _telemetry.get_tracer("ttp.store") + with tracer.start_as_current_span(name) as span: + for key, value in attrs.items(): + try: + span.set_attribute(key, value) + except (TypeError, ValueError): + continue + yield span # ── Filename allowlist ────────────────────────────────────────────── @@ -110,20 +130,6 @@ def _utcnow() -> datetime: return datetime.now(tz=timezone.utc) -def _safe_set_attrs(span: Any, **attrs: Any) -> None: - """Best-effort attribute setter on either real OTEL or no-op span.""" - setter = getattr(span, "set_attribute", None) - if setter is None: - return - for key, value in attrs.items(): - try: - setter(key, value) - except (TypeError, ValueError): - # OTEL rejects un-serializable types; not load-bearing for - # store correctness. Skip the attribute, keep the span. - continue - - def _is_expired(state: RuleState, now: datetime) -> bool: if state.expires_at is None: return False @@ -136,19 +142,36 @@ def _is_expired(state: RuleState, now: datetime) -> bool: def _compile_one(parsed: RuleSchema, state: RuleState) -> CompiledRule: """Translate a validated :class:`RuleSchema` into a :class:`CompiledRule`. + Each ``emits`` entry contributes a 4-tuple + ``(technique_id, sub_technique_id, tactic, confidence)`` — + consumed by :class:`RuleEngine` when fanning a single match into + one tag per technique. Missing tactic / confidence in the YAML is + a deploy-time error: a tag without a tactic can't render in the + Navigator export, and a missing confidence has no sane default. The match spec is passed through verbatim — the engine owns - interpretation of operator keys (``pattern``, ``contains``, …); the - store only validates structural shape. + interpretation of operator keys (``pattern``, ``contains``, …). """ - emits: list[tuple[str, str | None]] = [] + emits: list[tuple[str, str | None, str, float]] = [] for entry in parsed.emits: tid = entry.get("technique_id") if not tid: raise ValueError( f"rule {parsed.rule_id}: every emits entry needs technique_id", ) - sub = entry.get("sub_technique_id") or None - emits.append((tid, sub)) + sub_raw = entry.get("sub_technique_id") + sub = sub_raw if sub_raw else None + tactic = entry.get("tactic") + if not tactic: + raise ValueError( + f"rule {parsed.rule_id}: emit for {tid} needs a tactic", + ) + confidence_raw = entry.get("confidence") + if confidence_raw is None: + raise ValueError( + f"rule {parsed.rule_id}: emit for {tid} needs a confidence", + ) + confidence = float(confidence_raw) + emits.append((str(tid), sub, str(tactic), confidence)) return CompiledRule( rule_id=parsed.rule_id, rule_version=parsed.rule_version, @@ -330,22 +353,17 @@ class FilesystemRuleStore(RuleStore): # Operational state changes are NOT a tolerated-absence path. # Failures here MUST raise rather than silently drop — the # E.2.14b conformance test pins this. - with _tracer().start_as_current_span("ttp.rule.state.change") as span: - # Defensive set_attribute: real OTEL spans accept str/int/etc; - # the no-op tracer's _NoOpSpan ignores attributes silently. A - # caller-side wrapper keeps both paths green without leaking - # tracer-shape knowledge into the store. - _safe_set_attrs( - span, - rule_id=rule_id, - state=state.state, - set_by=set_by, - ) + with _span( + "ttp.rule.state.change", + rule_id=rule_id, + state=state.state, + set_by=set_by, + ): stamped = replace(state, set_by=set_by, set_at=_utcnow()) - with _tracer().start_as_current_span("ttp.store.write_state"): + with _span("ttp.store.write_state"): self._state[rule_id] = stamped self._restamp_compiled(rule_id, stamped) - with _tracer().start_as_current_span("ttp.rule.publish"): + with _span("ttp.rule.publish"): await self._emit_change( RuleChange("state", rule_id, stamped), bus_topic=_topics.ttp_rule_state(rule_id), diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index fed90c82..33126ab1 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -2954,7 +2954,16 @@ Order: 7. **RuleEngine** — implement engine consuming from `RuleStore`. Atomic per-rule swap on `RuleChange`. State applied after-parsing via `RuleState` join. `test_rule_engine.py` - green. + green. ✅ done. `CompiledRule.emits` extended to a 4-tuple + `(technique_id, sub_technique_id, tactic, confidence)` per emit; + the engine fans one match into N `TTPTag` rows. Match operator is + `pattern` (regex) for v0; per-source-kind default field + (`command_text` / `raw_url` / `subject` / …) overridable via + `match.field`. Disabled rules skipped; clipped state caps + confidence; `expires_at` re-checked at evaluate as + defense-in-depth. Tracing helper `_span(name, **attrs)` short- + circuits on `decnet.telemetry._ENABLED`, matching `@traced` / + `wrap_repository` zero-overhead-when-disabled pattern. 8. **Rule pack v0** — write the YAML files for `R0001`–`R0058` at `./rules/ttp/`. Each rule lands with its precision-target test per Appendix C in the same commit. The corpus for diff --git a/tests/ttp/store/test_conformance.py b/tests/ttp/store/test_conformance.py index 5d2d50af..5441fd61 100644 --- a/tests/ttp/store/test_conformance.py +++ b/tests/ttp/store/test_conformance.py @@ -51,7 +51,9 @@ applies_to: [command] match: pattern: 'hydra' emits: - - technique_id: T1110 + - tactic: TA0006 + technique_id: T1110 + confidence: 0.85 """ @@ -114,7 +116,7 @@ async def test_load_compiled_corpus_identical_across_backends( assert isinstance(c, CompiledRule) assert c.state == RuleState() assert c.applies_to == frozenset({"command"}) - assert c.emits == (("T1110", None),) + assert c.emits == (("T1110", None, "TA0006", 0.85),) async def test_set_state_isolates_rules(rule_store: RuleStore) -> None: diff --git a/tests/ttp/store/test_database.py b/tests/ttp/store/test_database.py index cbc4549f..a92af4e0 100644 --- a/tests/ttp/store/test_database.py +++ b/tests/ttp/store/test_database.py @@ -127,7 +127,7 @@ async def test_filesystem_to_db_sync_populates_ttp_rule( name="brute force ssh", applies_to=frozenset({"command"}), match_spec={"pattern": "hydra"}, - emits=(("T1110", None),), + emits=(("T1110", None, "TA0006", 0.85),), evidence_fields=("matched_tokens",), state=RuleState(), ) @@ -151,7 +151,7 @@ async def test_filesystem_to_db_sync_populates_ttp_rule( loaded = await db_store.load_compiled() assert len(loaded) == 1 assert loaded[0].rule_id == "R0001" - assert loaded[0].emits == (("T1110", None),) + assert loaded[0].emits == (("T1110", None, "TA0006", 0.85),) @pytest.mark.skipif( @@ -186,7 +186,9 @@ applies_to: [command] match: pattern: 'whoami' emits: - - technique_id: T1033 + - tactic: TA0007 + technique_id: T1033 + confidence: 0.85 """, encoding="utf-8", ) diff --git a/tests/ttp/store/test_filesystem.py b/tests/ttp/store/test_filesystem.py index 4fd8deb4..8ba57708 100644 --- a/tests/ttp/store/test_filesystem.py +++ b/tests/ttp/store/test_filesystem.py @@ -48,7 +48,9 @@ applies_to: [command] match: pattern: 'hydra' emits: - - technique_id: T1110 + - tactic: TA0006 + technique_id: T1110 + confidence: 0.85 evidence_fields: [matched_tokens] """ @@ -190,7 +192,7 @@ def test_compiled_rule_is_frozen() -> None: name="test", applies_to=frozenset({"attacker_command"}), match_spec={}, - emits=(("T1110", None),), + emits=(("T1110", None, "TA0006", 0.85),), evidence_fields=(), state=RuleState(), ) diff --git a/tests/ttp/test_multi_mapping.py b/tests/ttp/test_multi_mapping.py index f250c40e..0f5c06ed 100644 --- a/tests/ttp/test_multi_mapping.py +++ b/tests/ttp/test_multi_mapping.py @@ -152,34 +152,111 @@ def test_uuid_is_deterministic_replay_safe( # ── Engine fan-out (xfail until E.3.7) ────────────────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.7 — RuleEngine.evaluate() empty body returns " - "[]; engine-level fan-out lands with the engine impl", -) def test_engine_emits_n_times_m_rows() -> None: """End-to-end: a synthetic event matched by 3 rules each emitting 2 techniques produces 6 tag rows from ``RuleEngine.evaluate()``. - - Today the engine returns ``[]`` so this assertion xfails. Flips - to GREEN at E.3.7 when the engine's dispatch + match + emit logic - lands. """ - pytest.fail("RuleEngine.evaluate() fan-out not yet implemented") + import asyncio + + from decnet.ttp.base import TaggerEvent + from decnet.ttp.impl.rule_engine import CompiledRule, RuleEngine + from decnet.ttp.store.base import RuleState + + class _Stub: + async def load_compiled(self): # pragma: no cover + return [] + + async def get_state(self, _): # pragma: no cover + return RuleState() + + async def set_state(self, *_a, **_kw): # pragma: no cover + return None + + def subscribe_changes(self): # pragma: no cover + async def _g(): + if False: + yield None + return _g() + + rules = [ + CompiledRule( + rule_id=f"R000{i}", + rule_version=1, + name=f"r{i}", + applies_to=frozenset({"command"}), + match_spec={"pattern": "hydra"}, + emits=( + (f"T{1000 + 2 * i}", None, "TA0006", 0.85), + (f"T{1001 + 2 * i}", None, "TA0006", 0.80), + ), + evidence_fields=(), + state=RuleState(), + ) + for i in range(3) + ] + eng = RuleEngine(store=_Stub()) + eng._by_kind = {"command": rules} + event = TaggerEvent( + source_kind="command", + source_id="src1", + attacker_uuid="att1", + identity_uuid=None, + session_id=None, + decky_id=None, + payload={"command_text": "hydra -l root ssh://1.2.3.4"}, + ) + out = asyncio.run(eng.evaluate(event)) + assert len(out) == 6 -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.7 — re-running evaluate() on the same event " - "must produce zero NEW rows (idempotent UUID at engine level)", -) def test_engine_replay_produces_no_new_rows() -> None: """Idempotency at the engine level: ``evaluate(e)`` followed by ``evaluate(e)`` again yields tag rows with identical UUIDs, so the downstream ``insert_tags`` no-ops the second batch. - - Pure ``compute_tag_uuid`` determinism is already covered by - :func:`test_uuid_is_deterministic_replay_safe`; this test pins - the engine wiring around it. """ - pytest.fail("RuleEngine replay-safety wiring not yet implemented") + import asyncio + + from decnet.ttp.base import TaggerEvent + from decnet.ttp.impl.rule_engine import CompiledRule, RuleEngine + from decnet.ttp.store.base import RuleState + + class _Stub: + async def load_compiled(self): # pragma: no cover + return [] + + async def get_state(self, _): # pragma: no cover + return RuleState() + + async def set_state(self, *_a, **_kw): # pragma: no cover + return None + + def subscribe_changes(self): # pragma: no cover + async def _g(): + if False: + yield None + return _g() + + rule = CompiledRule( + rule_id="R0001", + rule_version=1, + name="r", + applies_to=frozenset({"command"}), + match_spec={"pattern": "hydra"}, + emits=(("T1110", None, "TA0006", 0.85),), + evidence_fields=(), + state=RuleState(), + ) + eng = RuleEngine(store=_Stub()) + eng._by_kind = {"command": [rule]} + event = TaggerEvent( + source_kind="command", + source_id="src1", + attacker_uuid="att1", + identity_uuid=None, + session_id=None, + decky_id=None, + payload={"command_text": "hydra -l root ssh://1.2.3.4"}, + ) + out1 = asyncio.run(eng.evaluate(event)) + out2 = asyncio.run(eng.evaluate(event)) + assert {t.uuid for t in out1} == {t.uuid for t in out2} diff --git a/tests/ttp/test_rule_engine.py b/tests/ttp/test_rule_engine.py index faaab0d2..6dd121f8 100644 --- a/tests/ttp/test_rule_engine.py +++ b/tests/ttp/test_rule_engine.py @@ -44,24 +44,52 @@ def _ev() -> TaggerEvent: class _StubStore: - """Minimal duck-typed RuleStore for contract-phase construction.""" + """Minimal duck-typed RuleStore for engine construction in tests. + + Provides the subset of the ABC the engine touches at construction + time. Tests that drive ``evaluate()`` populate ``eng._by_kind`` + directly rather than going through ``watch_store()``; the + ``load_compiled`` / ``subscribe_changes`` stubs are only here so a + test that DOES want to drive the watch loop can opt in. + """ + + async def load_compiled(self) -> list[CompiledRule]: # pragma: no cover + return [] + + async def get_state(self, _rule_id: str): # pragma: no cover + from decnet.ttp.store.base import RuleState + return RuleState() + + async def set_state(self, *_a: Any, **_kw: Any) -> None: # pragma: no cover + return None + + def subscribe_changes(self): # pragma: no cover + async def _gen(): + if False: + yield None + return _gen() def _make_compiled_rule( *, rule_id: str = "R0001", rule_version: int = 1, - emits: tuple[tuple[str, str | None], ...] = (("T1110", None),), + emits: tuple[tuple[str, str | None, str, float], ...] = ( + ("T1110", None, "TA0006", 0.85), + ), + match_spec: dict[str, Any] | None = None, ) -> CompiledRule: + from decnet.ttp.store.base import RuleState # noqa: PLC0415 + return CompiledRule( rule_id=rule_id, rule_version=rule_version, name="test rule", applies_to=frozenset({"command"}), - match_spec={"contains": "hydra"}, + match_spec=match_spec or {"pattern": "hydra"}, emits=emits, evidence_fields=("matched_tokens",), - state=object(), # RuleState lands in E.1.11; opaque here + state=RuleState(), ) @@ -84,15 +112,17 @@ def test_compiled_rule_is_immutable() -> None: # NamedTuple gives us field-level immutability — the atomic-swap # property (E.2.14b) requires that a rule in the dispatch index # cannot be mutated in place; replacement is the only legal edit. + from decnet.ttp.store.base import RuleState # noqa: PLC0415 + cr = CompiledRule( rule_id="R0001", rule_version=1, name="brute", applies_to=frozenset({"command"}), match_spec={}, - emits=(("T1110", None),), + emits=(("T1110", None, "TA0006", 0.85),), evidence_fields=("matched_tokens",), - state=object(), + state=RuleState(), ) with pytest.raises(AttributeError): cr.rule_id = "R9999" # type: ignore[misc] @@ -109,15 +139,28 @@ def test_rule_engine_init_signature_takes_store() -> None: assert list(sig.parameters)[1] == "store" -def test_evaluate_returns_empty_list_in_contract_phase() -> None: - eng = RuleEngine(store=_StubStore()) +def test_evaluate_returns_empty_list_for_unknown_source_kind() -> None: + eng = RuleEngine(store=_StubStore()) out = asyncio.run(eng.evaluate(_ev())) assert out == [] -def test_watch_store_returns_none_and_does_not_raise() -> None: - eng = RuleEngine(store=_StubStore()) - assert asyncio.run(eng.watch_store()) is None +def test_watch_store_drains_and_can_be_cancelled() -> None: + """``watch_store()`` blocks on ``subscribe_changes`` after loading + the empty corpus. Test that it can be cancelled cleanly — the + worker bootstrap (E.3.14) cancels it during shutdown.""" + eng = RuleEngine(store=_StubStore()) + + async def _drive() -> None: + task = asyncio.create_task(eng.watch_store()) + await asyncio.sleep(0.05) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + asyncio.run(_drive()) def test_rule_schema_has_documented_fields() -> None: @@ -208,21 +251,22 @@ def test_e25_evaluate_unknown_source_kind_returns_empty() -> None: assert asyncio.run(eng.evaluate(weird)) == [] -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5: evaluate() does not yet fan out emits", -) def test_e25_one_rule_multiple_emits_produces_multiple_tags() -> None: """One matching rule with N entries in ``emits`` must produce N tag rows from a single event. The "one event maps to many techniques" property enforced at engine level.""" - eng = RuleEngine(store=_StubStore()) + eng = RuleEngine(store=_StubStore()) rule = _make_compiled_rule( rule_id="R_MULTI", - emits=(("T1110", None), ("T1078", None), ("T1059", "001")), + emits=( + ("T1110", None, "TA0006", 0.85), + ("T1078", None, "TA0001", 0.80), + ("T1059", "001", "TA0002", 0.90), + ), ) eng._by_kind = {"command": [rule]} - out = asyncio.run(eng.evaluate(_ev())) + event = _ev()._replace(payload={"command_text": "hydra -l root ssh://1.2.3.4"}) + out = asyncio.run(eng.evaluate(event)) assert len(out) == 3 techs = {(t.technique_id, t.sub_technique_id) for t in out} assert techs == {("T1110", None), ("T1078", None), ("T1059", "001")} @@ -253,19 +297,16 @@ def test_e25_rule_version_collision_yields_distinct_tag_uuids() -> None: assert u_v1 != u_v2 -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5: evaluate() does not yet emit tags", -) def test_e25_rule_version_collision_via_engine_yields_distinct_tag_uuids() -> None: """Same property as above, but driven through the engine: two CompiledRule instances differing only in rule_version produce two rows whose ``uuid`` columns differ.""" - eng = RuleEngine(store=_StubStore()) + eng = RuleEngine(store=_StubStore()) r1 = _make_compiled_rule(rule_id="R_VER", rule_version=1) r2 = _make_compiled_rule(rule_id="R_VER", rule_version=2) eng._by_kind = {"command": [r1, r2]} - out = asyncio.run(eng.evaluate(_ev())) + event = _ev()._replace(payload={"command_text": "hydra -l root ssh://1.2.3.4"}) + out = asyncio.run(eng.evaluate(event)) assert len(out) == 2 uuids = {t.uuid for t in out} assert len(uuids) == 2 diff --git a/tests/ttp/test_tracing.py b/tests/ttp/test_tracing.py index 231445b8..46e97236 100644 --- a/tests/ttp/test_tracing.py +++ b/tests/ttp/test_tracing.py @@ -121,15 +121,58 @@ def span_exporter( # ── Eval span hierarchy (xfail until E.3.7) ───────────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.7 — RuleEngine.evaluate() emits no spans " - "today; ttp.eval span lands with the engine impl", -) -def test_eval_emits_top_level_span(span_exporter: tuple[InMemorySpanExporter, TracerProvider]) -> None: +def test_eval_emits_top_level_span( + span_exporter: tuple[InMemorySpanExporter, TracerProvider], +) -> None: """``evaluate()`` produces a ``ttp.eval`` span with ``attacker_uuid`` and ``identity_uuid`` attributes.""" - pytest.fail("ttp.eval span not yet emitted") + import asyncio + + from decnet.ttp.base import TaggerEvent + from decnet.ttp.impl.rule_engine import CompiledRule, RuleEngine + from decnet.ttp.store.base import RuleState + + class _Stub: + async def load_compiled(self): # pragma: no cover + return [] + + async def get_state(self, _): # pragma: no cover + return RuleState() + + async def set_state(self, *_a, **_kw): # pragma: no cover + return None + + def subscribe_changes(self): # pragma: no cover + async def _g(): + if False: + yield None + return _g() + + exporter, _ = span_exporter + rule = CompiledRule( + rule_id="R0001", + rule_version=1, + name="r", + applies_to=frozenset({"command"}), + match_spec={"pattern": "hydra"}, + emits=(("T1110", None, "TA0006", 0.85),), + evidence_fields=(), + state=RuleState(), + ) + eng = RuleEngine(store=_Stub()) + eng._by_kind = {"command": [rule]} + event = TaggerEvent( + source_kind="command", source_id="src1", + attacker_uuid="ATT_X", identity_uuid="IDY_Y", + session_id=None, decky_id=None, + payload={"command_text": "hydra"}, + ) + asyncio.run(eng.evaluate(event)) + eval_spans = [s for s in exporter.get_finished_spans() if s.name == "ttp.eval"] + assert eval_spans + attrs = dict(eval_spans[0].attributes or {}) + assert attrs.get("attacker_uuid") == "ATT_X" + assert attrs.get("identity_uuid") == "IDY_Y" @pytest.mark.xfail( @@ -143,17 +186,59 @@ def test_lifter_child_spans_emitted(span_exporter: tuple[InMemorySpanExporter, T pytest.fail("per-lifter spans not yet emitted") -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.7 — ttp.rule.fire spans with rule_id + " - "technique_id land with the engine impl", -) def test_rule_fire_spans_carry_rule_and_technique_attrs( span_exporter: tuple[InMemorySpanExporter, TracerProvider], ) -> None: """Each matched rule produces a ``ttp.rule.fire`` span with ``rule_id`` and ``technique_id`` attributes set.""" - pytest.fail("ttp.rule.fire spans not yet emitted") + import asyncio + + from decnet.ttp.base import TaggerEvent + from decnet.ttp.impl.rule_engine import CompiledRule, RuleEngine + from decnet.ttp.store.base import RuleState + + class _Stub: + async def load_compiled(self): # pragma: no cover + return [] + + async def get_state(self, _): # pragma: no cover + return RuleState() + + async def set_state(self, *_a, **_kw): # pragma: no cover + return None + + def subscribe_changes(self): # pragma: no cover + async def _g(): + if False: + yield None + return _g() + + exporter, _ = span_exporter + rule = CompiledRule( + rule_id="R_FIRE", + rule_version=1, + name="r", + applies_to=frozenset({"command"}), + match_spec={"pattern": "hydra"}, + emits=(("T1110", None, "TA0006", 0.85),), + evidence_fields=(), + state=RuleState(), + ) + eng = RuleEngine(store=_Stub()) + eng._by_kind = {"command": [rule]} + asyncio.run(eng.evaluate(TaggerEvent( + source_kind="command", source_id="s", + attacker_uuid="a", identity_uuid=None, + session_id=None, decky_id=None, + payload={"command_text": "hydra"}, + ))) + fire_spans = [ + s for s in exporter.get_finished_spans() if s.name == "ttp.rule.fire" + ] + assert fire_spans + attrs = dict(fire_spans[0].attributes or {}) + assert attrs.get("rule_id") == "R_FIRE" + assert attrs.get("technique_id") == "T1110" # ── set_state span hierarchy (xfail until E.3.5/E.3.6) ──────────────