feat(ttp): E.3.18c wire RuleEngine via RuleEngineTagger
The canonical rule-based engine from §"Tagging engines, layered §1"
of TTP_TAGGING.md was fully implemented but never instantiated as a
composite child — pure pattern rules (R0014/R0017/R0023/... 23 rules
total) had no tagger to dispatch them.
- Add `RuleEngineTagger(Tagger)` adapter in rule_engine.py wrapping
`RuleEngine.evaluate()`. `HANDLES = {command, http_request,
auth_attempt, payload}` — the source kinds whose rules typically
live outside any per-source lifter.
- Adapter's `watch_store()` filters via `_is_engine_owned` so the
engine's dispatch index excludes lifter-claimed rules
(`match.kind: lifter:*`) and stays disjoint from per-lifter ownership.
- Prepend `RuleEngineTagger` to the `CompositeTagger` lifter list so
generic pattern rules dispatch before per-source cross-event logic.
- Composes with E.3.18a (worker hydrates `watch_store`) and E.3.18b
(worker fans session payloads into per-`command` events) — together
these three commits make R0001–R0030 actually fire at runtime.
This commit is contained in:
@@ -142,9 +142,16 @@ def get_tagger() -> Tagger:
|
|||||||
from decnet.ttp.impl.email_lifter import EmailLifter
|
from decnet.ttp.impl.email_lifter import EmailLifter
|
||||||
from decnet.ttp.impl.identity_lifter import IdentityLifter
|
from decnet.ttp.impl.identity_lifter import IdentityLifter
|
||||||
from decnet.ttp.impl.intel_lifter import IntelLifter
|
from decnet.ttp.impl.intel_lifter import IntelLifter
|
||||||
|
from decnet.ttp.impl.rule_engine import RuleEngineTagger
|
||||||
from decnet.ttp.store.factory import get_rule_store
|
from decnet.ttp.store.factory import get_rule_store
|
||||||
store = get_rule_store()
|
store = get_rule_store()
|
||||||
|
# RuleEngineTagger first so generic pattern rules dispatch
|
||||||
|
# before the per-source lifters' cross-event logic. Order is
|
||||||
|
# observational — every tagger sees every event for its
|
||||||
|
# `HANDLES` set; tags from all of them aggregate into a single
|
||||||
|
# `ttp.tagged` envelope at the worker.
|
||||||
return CompositeTagger(lifters=[
|
return CompositeTagger(lifters=[
|
||||||
|
RuleEngineTagger(store),
|
||||||
BehavioralLifter(store),
|
BehavioralLifter(store),
|
||||||
IntelLifter(store),
|
IntelLifter(store),
|
||||||
CanaryFingerprintLifter(store),
|
CanaryFingerprintLifter(store),
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ from pydantic import BaseModel, Field
|
|||||||
|
|
||||||
from decnet import telemetry as _telemetry
|
from decnet import telemetry as _telemetry
|
||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
from decnet.ttp.base import TaggerEvent
|
from decnet.ttp.base import Tagger, TaggerEvent
|
||||||
from decnet.ttp.impl._rule_index import RuleIndex
|
from decnet.ttp.impl._rule_index import RuleIndex
|
||||||
from decnet.ttp.impl._state import apply_ceiling, is_active
|
from decnet.ttp.impl._state import apply_ceiling, is_active
|
||||||
from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid
|
from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid
|
||||||
@@ -344,8 +344,66 @@ def _evaluate_rules(
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _is_engine_owned(rule: CompiledRule) -> bool:
|
||||||
|
"""Predicate: rule belongs to the generic RuleEngine, not a lifter.
|
||||||
|
|
||||||
|
Per-source lifters (Behavioral, Intel, …) tag their rules with
|
||||||
|
``match.kind: lifter:<name>_*``. The :class:`RuleEngineTagger`
|
||||||
|
claims everything else — pure ``pattern`` rules whose semantics
|
||||||
|
are "regex against a payload field" with no cross-event state.
|
||||||
|
"""
|
||||||
|
kind = rule.match_spec.get("kind", "")
|
||||||
|
if isinstance(kind, str) and kind.startswith("lifter:"):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class RuleEngineTagger(Tagger):
|
||||||
|
"""Tagger adapter that wires :class:`RuleEngine` into the composite.
|
||||||
|
|
||||||
|
The composite tagger fans events out to its children by
|
||||||
|
``HANDLES``; without this adapter the canonical rule-based engine
|
||||||
|
from §"Tagging engines, layered §1" of TTP_TAGGING.md never sees
|
||||||
|
any traffic. This class is intentionally thin — all dispatch and
|
||||||
|
hot-reload logic lives in :class:`RuleEngine` / :class:`RuleIndex`;
|
||||||
|
we only translate between the ``Tagger.tag`` ABC and
|
||||||
|
:meth:`RuleEngine.evaluate`, and route ``watch_store()`` through a
|
||||||
|
predicate that excludes lifter-owned rules so the engine's
|
||||||
|
dispatch index doesn't hold rules another tagger already claims.
|
||||||
|
|
||||||
|
``HANDLES`` enumerates the source kinds whose YAML rules typically
|
||||||
|
live outside any per-source lifter — shell command rules
|
||||||
|
(``command``), HTTP request pattern rules (``http_request``),
|
||||||
|
auth attempts handled by raw regex rather than the
|
||||||
|
:class:`CredentialLifter` cross-event counter, and generic
|
||||||
|
``payload`` matches. The composite uses this for routing; the
|
||||||
|
engine itself filters by ``applies_to`` from the YAML.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "rule_engine"
|
||||||
|
HANDLES = frozenset({"command", "http_request", "auth_attempt", "payload"})
|
||||||
|
|
||||||
|
def __init__(self, store: "RuleStore") -> None:
|
||||||
|
self._engine = RuleEngine(store)
|
||||||
|
self._store = store
|
||||||
|
|
||||||
|
async def tag(self, event: TaggerEvent) -> list[TTPTag]:
|
||||||
|
return await self._engine.evaluate(event)
|
||||||
|
|
||||||
|
async def watch_store(self) -> None:
|
||||||
|
# Filter to engine-owned rules so the dispatch index stays
|
||||||
|
# disjoint from per-lifter ownership. Without the predicate
|
||||||
|
# the engine would carry every lifter's rules too — they would
|
||||||
|
# never match (no `pattern` operator), but they would inflate
|
||||||
|
# the index and confuse tooling.
|
||||||
|
await self._engine._index.watch(
|
||||||
|
self._store, predicate=_is_engine_owned,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"CompiledRule",
|
"CompiledRule",
|
||||||
"RuleEngine",
|
"RuleEngine",
|
||||||
|
"RuleEngineTagger",
|
||||||
"RuleSchema",
|
"RuleSchema",
|
||||||
]
|
]
|
||||||
|
|||||||
128
tests/ttp/test_rule_engine_tagger.py
Normal file
128
tests/ttp/test_rule_engine_tagger.py
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
"""E.3.18c — RuleEngineTagger wires RuleEngine into the composite.
|
||||||
|
|
||||||
|
Pins the wiring fix from ``development/TTP_TAGGING.md`` §"Tagging
|
||||||
|
engines, layered §1": the canonical rule-based engine must dispatch
|
||||||
|
through the :class:`CompositeTagger` like any other lifter. The
|
||||||
|
adapter is intentionally thin — it is only here so the composite's
|
||||||
|
fan-out reaches :class:`RuleEngine` and so the worker's per-watchable
|
||||||
|
fan-out (E.3.18a) hydrates the engine's index alongside the lifters'.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.ttp.base import Tagger, TaggerEvent, WatchableTagger
|
||||||
|
from decnet.ttp.factory import CompositeTagger, get_tagger
|
||||||
|
from decnet.ttp.impl.rule_engine import (
|
||||||
|
CompiledRule,
|
||||||
|
RuleEngineTagger,
|
||||||
|
_is_engine_owned,
|
||||||
|
)
|
||||||
|
|
||||||
|
from tests.ttp._stub_store import StubRuleStore
|
||||||
|
|
||||||
|
|
||||||
|
def _rule(
|
||||||
|
*,
|
||||||
|
rule_id: str = "R9001",
|
||||||
|
applies_to: frozenset[str] = frozenset({"command"}),
|
||||||
|
match_spec: dict[str, Any] | None = None,
|
||||||
|
) -> CompiledRule:
|
||||||
|
from decnet.ttp.store.base import RuleState # noqa: PLC0415
|
||||||
|
|
||||||
|
return CompiledRule(
|
||||||
|
rule_id=rule_id,
|
||||||
|
rule_version=1,
|
||||||
|
name="test",
|
||||||
|
applies_to=applies_to,
|
||||||
|
match_spec=match_spec or {"pattern": "whoami"},
|
||||||
|
emits=(("T1059", None, "TA0002", 0.9),),
|
||||||
|
evidence_fields=("command_text",),
|
||||||
|
state=RuleState(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_rule_engine_tagger_handles_generic_source_kinds() -> None:
|
||||||
|
assert "command" in RuleEngineTagger.HANDLES
|
||||||
|
assert "http_request" in RuleEngineTagger.HANDLES
|
||||||
|
assert "auth_attempt" in RuleEngineTagger.HANDLES
|
||||||
|
assert "payload" in RuleEngineTagger.HANDLES
|
||||||
|
|
||||||
|
|
||||||
|
def test_rule_engine_tagger_is_a_tagger() -> None:
|
||||||
|
store = StubRuleStore()
|
||||||
|
tagger = RuleEngineTagger(store)
|
||||||
|
assert isinstance(tagger, Tagger)
|
||||||
|
|
||||||
|
|
||||||
|
def test_rule_engine_tagger_is_watchable() -> None:
|
||||||
|
"""Worker's `iter_watchables()` filters on this Protocol."""
|
||||||
|
store = StubRuleStore()
|
||||||
|
tagger = RuleEngineTagger(store)
|
||||||
|
assert isinstance(tagger, WatchableTagger)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_tag_proxies_to_engine_evaluate() -> None:
|
||||||
|
rule = _rule(match_spec={"field": "command_text", "pattern": r"\bwhoami\b"})
|
||||||
|
store = StubRuleStore(compiled=[rule])
|
||||||
|
tagger = RuleEngineTagger(store)
|
||||||
|
# Hydrate the engine's index (uses the predicate; pure pattern
|
||||||
|
# rule is engine-owned so it lands in the index).
|
||||||
|
await tagger._engine._index.hydrate_from(store, predicate=_is_engine_owned)
|
||||||
|
event = TaggerEvent(
|
||||||
|
source_kind="command",
|
||||||
|
source_id="cmd-1",
|
||||||
|
attacker_uuid="att-1",
|
||||||
|
identity_uuid=None,
|
||||||
|
session_id="sess-1",
|
||||||
|
decky_id=None,
|
||||||
|
payload={"command_text": "whoami"},
|
||||||
|
)
|
||||||
|
tags = await tagger.tag(event)
|
||||||
|
assert len(tags) == 1
|
||||||
|
assert tags[0].technique_id == "T1059"
|
||||||
|
assert tags[0].rule_id == "R9001"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_engine_predicate_excludes_lifter_owned_rules() -> None:
|
||||||
|
"""Lifter-owned rules don't pollute the engine's dispatch index."""
|
||||||
|
engine_rule = _rule(rule_id="R9100", match_spec={"pattern": "x"})
|
||||||
|
lifter_rule = _rule(
|
||||||
|
rule_id="R9101",
|
||||||
|
match_spec={"kind": "lifter:behavioral_beaconing"},
|
||||||
|
)
|
||||||
|
assert _is_engine_owned(engine_rule)
|
||||||
|
assert not _is_engine_owned(lifter_rule)
|
||||||
|
|
||||||
|
store = StubRuleStore(compiled=[engine_rule, lifter_rule])
|
||||||
|
tagger = RuleEngineTagger(store)
|
||||||
|
await tagger._engine._index.hydrate_from(store, predicate=_is_engine_owned)
|
||||||
|
by_rule = tagger._engine._by_rule
|
||||||
|
assert "R9100" in by_rule
|
||||||
|
assert "R9101" not in by_rule
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_tagger_includes_rule_engine_tagger_first(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
"""The canonical engine must be one of the composite's lifters."""
|
||||||
|
monkeypatch.setenv("DECNET_TTP_TAGGER_TYPE", "composite")
|
||||||
|
composite = get_tagger()
|
||||||
|
assert isinstance(composite, CompositeTagger)
|
||||||
|
names = [lifter.name for lifter in composite._lifters]
|
||||||
|
assert "rule_engine" in names
|
||||||
|
# Prepended so generic pattern rules dispatch before per-source
|
||||||
|
# lifters' cross-event logic.
|
||||||
|
assert names[0] == "rule_engine"
|
||||||
|
|
||||||
|
|
||||||
|
def test_rule_engine_tagger_is_in_iter_watchables() -> None:
|
||||||
|
store = StubRuleStore()
|
||||||
|
engine_tagger = RuleEngineTagger(store)
|
||||||
|
composite = CompositeTagger(lifters=[engine_tagger])
|
||||||
|
yielded = list(composite.iter_watchables())
|
||||||
|
assert engine_tagger in yielded
|
||||||
Reference in New Issue
Block a user