diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index 9e5513a9..53b07506 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -142,9 +142,16 @@ def get_tagger() -> Tagger: from decnet.ttp.impl.email_lifter import EmailLifter from decnet.ttp.impl.identity_lifter import IdentityLifter from decnet.ttp.impl.intel_lifter import IntelLifter + from decnet.ttp.impl.rule_engine import RuleEngineTagger from decnet.ttp.store.factory import get_rule_store store = get_rule_store() + # RuleEngineTagger first so generic pattern rules dispatch + # before the per-source lifters' cross-event logic. Order is + # observational — every tagger sees every event for its + # `HANDLES` set; tags from all of them aggregate into a single + # `ttp.tagged` envelope at the worker. return CompositeTagger(lifters=[ + RuleEngineTagger(store), BehavioralLifter(store), IntelLifter(store), CanaryFingerprintLifter(store), diff --git a/decnet/ttp/impl/rule_engine.py b/decnet/ttp/impl/rule_engine.py index a5e5708e..4c047315 100644 --- a/decnet/ttp/impl/rule_engine.py +++ b/decnet/ttp/impl/rule_engine.py @@ -36,7 +36,7 @@ from pydantic import BaseModel, Field from decnet import telemetry as _telemetry from decnet.logging import get_logger -from decnet.ttp.base import TaggerEvent +from decnet.ttp.base import Tagger, TaggerEvent from decnet.ttp.impl._rule_index import RuleIndex from decnet.ttp.impl._state import apply_ceiling, is_active from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid @@ -344,8 +344,66 @@ def _evaluate_rules( return out +def _is_engine_owned(rule: CompiledRule) -> bool: + """Predicate: rule belongs to the generic RuleEngine, not a lifter. + + Per-source lifters (Behavioral, Intel, …) tag their rules with + ``match.kind: lifter:_*``. The :class:`RuleEngineTagger` + claims everything else — pure ``pattern`` rules whose semantics + are "regex against a payload field" with no cross-event state. + """ + kind = rule.match_spec.get("kind", "") + if isinstance(kind, str) and kind.startswith("lifter:"): + return False + return True + + +class RuleEngineTagger(Tagger): + """Tagger adapter that wires :class:`RuleEngine` into the composite. + + The composite tagger fans events out to its children by + ``HANDLES``; without this adapter the canonical rule-based engine + from §"Tagging engines, layered §1" of TTP_TAGGING.md never sees + any traffic. This class is intentionally thin — all dispatch and + hot-reload logic lives in :class:`RuleEngine` / :class:`RuleIndex`; + we only translate between the ``Tagger.tag`` ABC and + :meth:`RuleEngine.evaluate`, and route ``watch_store()`` through a + predicate that excludes lifter-owned rules so the engine's + dispatch index doesn't hold rules another tagger already claims. + + ``HANDLES`` enumerates the source kinds whose YAML rules typically + live outside any per-source lifter — shell command rules + (``command``), HTTP request pattern rules (``http_request``), + auth attempts handled by raw regex rather than the + :class:`CredentialLifter` cross-event counter, and generic + ``payload`` matches. The composite uses this for routing; the + engine itself filters by ``applies_to`` from the YAML. + """ + + name = "rule_engine" + HANDLES = frozenset({"command", "http_request", "auth_attempt", "payload"}) + + def __init__(self, store: "RuleStore") -> None: + self._engine = RuleEngine(store) + self._store = store + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + return await self._engine.evaluate(event) + + async def watch_store(self) -> None: + # Filter to engine-owned rules so the dispatch index stays + # disjoint from per-lifter ownership. Without the predicate + # the engine would carry every lifter's rules too — they would + # never match (no `pattern` operator), but they would inflate + # the index and confuse tooling. + await self._engine._index.watch( + self._store, predicate=_is_engine_owned, + ) + + __all__ = [ "CompiledRule", "RuleEngine", + "RuleEngineTagger", "RuleSchema", ] diff --git a/tests/ttp/test_rule_engine_tagger.py b/tests/ttp/test_rule_engine_tagger.py new file mode 100644 index 00000000..82bb76df --- /dev/null +++ b/tests/ttp/test_rule_engine_tagger.py @@ -0,0 +1,128 @@ +"""E.3.18c — RuleEngineTagger wires RuleEngine into the composite. + +Pins the wiring fix from ``development/TTP_TAGGING.md`` §"Tagging +engines, layered §1": the canonical rule-based engine must dispatch +through the :class:`CompositeTagger` like any other lifter. The +adapter is intentionally thin — it is only here so the composite's +fan-out reaches :class:`RuleEngine` and so the worker's per-watchable +fan-out (E.3.18a) hydrates the engine's index alongside the lifters'. +""" +from __future__ import annotations + +from typing import Any + +import pytest + +from decnet.ttp.base import Tagger, TaggerEvent, WatchableTagger +from decnet.ttp.factory import CompositeTagger, get_tagger +from decnet.ttp.impl.rule_engine import ( + CompiledRule, + RuleEngineTagger, + _is_engine_owned, +) + +from tests.ttp._stub_store import StubRuleStore + + +def _rule( + *, + rule_id: str = "R9001", + applies_to: frozenset[str] = frozenset({"command"}), + match_spec: dict[str, Any] | None = None, +) -> CompiledRule: + from decnet.ttp.store.base import RuleState # noqa: PLC0415 + + return CompiledRule( + rule_id=rule_id, + rule_version=1, + name="test", + applies_to=applies_to, + match_spec=match_spec or {"pattern": "whoami"}, + emits=(("T1059", None, "TA0002", 0.9),), + evidence_fields=("command_text",), + state=RuleState(), + ) + + +def test_rule_engine_tagger_handles_generic_source_kinds() -> None: + assert "command" in RuleEngineTagger.HANDLES + assert "http_request" in RuleEngineTagger.HANDLES + assert "auth_attempt" in RuleEngineTagger.HANDLES + assert "payload" in RuleEngineTagger.HANDLES + + +def test_rule_engine_tagger_is_a_tagger() -> None: + store = StubRuleStore() + tagger = RuleEngineTagger(store) + assert isinstance(tagger, Tagger) + + +def test_rule_engine_tagger_is_watchable() -> None: + """Worker's `iter_watchables()` filters on this Protocol.""" + store = StubRuleStore() + tagger = RuleEngineTagger(store) + assert isinstance(tagger, WatchableTagger) + + +@pytest.mark.asyncio +async def test_tag_proxies_to_engine_evaluate() -> None: + rule = _rule(match_spec={"field": "command_text", "pattern": r"\bwhoami\b"}) + store = StubRuleStore(compiled=[rule]) + tagger = RuleEngineTagger(store) + # Hydrate the engine's index (uses the predicate; pure pattern + # rule is engine-owned so it lands in the index). + await tagger._engine._index.hydrate_from(store, predicate=_is_engine_owned) + event = TaggerEvent( + source_kind="command", + source_id="cmd-1", + attacker_uuid="att-1", + identity_uuid=None, + session_id="sess-1", + decky_id=None, + payload={"command_text": "whoami"}, + ) + tags = await tagger.tag(event) + assert len(tags) == 1 + assert tags[0].technique_id == "T1059" + assert tags[0].rule_id == "R9001" + + +@pytest.mark.asyncio +async def test_engine_predicate_excludes_lifter_owned_rules() -> None: + """Lifter-owned rules don't pollute the engine's dispatch index.""" + engine_rule = _rule(rule_id="R9100", match_spec={"pattern": "x"}) + lifter_rule = _rule( + rule_id="R9101", + match_spec={"kind": "lifter:behavioral_beaconing"}, + ) + assert _is_engine_owned(engine_rule) + assert not _is_engine_owned(lifter_rule) + + store = StubRuleStore(compiled=[engine_rule, lifter_rule]) + tagger = RuleEngineTagger(store) + await tagger._engine._index.hydrate_from(store, predicate=_is_engine_owned) + by_rule = tagger._engine._by_rule + assert "R9100" in by_rule + assert "R9101" not in by_rule + + +def test_get_tagger_includes_rule_engine_tagger_first( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The canonical engine must be one of the composite's lifters.""" + monkeypatch.setenv("DECNET_TTP_TAGGER_TYPE", "composite") + composite = get_tagger() + assert isinstance(composite, CompositeTagger) + names = [lifter.name for lifter in composite._lifters] + assert "rule_engine" in names + # Prepended so generic pattern rules dispatch before per-source + # lifters' cross-event logic. + assert names[0] == "rule_engine" + + +def test_rule_engine_tagger_is_in_iter_watchables() -> None: + store = StubRuleStore() + engine_tagger = RuleEngineTagger(store) + composite = CompositeTagger(lifters=[engine_tagger]) + yielded = list(composite.iter_watchables()) + assert engine_tagger in yielded