From e7531ee756338c249b8ba93adc82151c2a3dc041 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 20:09:18 -0400 Subject: [PATCH] refactor(ttp): extract RuleIndex from RuleEngine E.3.9.0 prerequisite for the per-source lifters (E.3.9-E.3.13). The dispatch index, install/evict/apply_change atomic-swap protocol, and state-modulation helpers (is_active / apply_ceiling) move out of rule_engine.py into _rule_index.py and _state.py. RuleEngine wraps a RuleIndex; back-compat shims preserve _by_kind / _by_rule / _install attribute access for tests poking at the dispatch internals. Lifters in E.3.9-E.3.12 will each hold their own RuleIndex, watching the same RuleStore via subscribe_changes() fan-out. Hot-reload semantics (disable / clip / TTL via set_state API) now reach lifter-bound rules through the same atomic-swap path the engine uses, not a future composite-rebuild compromise. --- decnet/ttp/impl/_rule_index.py | 180 +++++++++++++++++++ decnet/ttp/impl/_state.py | 55 ++++++ decnet/ttp/impl/rule_engine.py | 141 +++++---------- tests/ttp/test_rule_index.py | 304 +++++++++++++++++++++++++++++++++ 4 files changed, 579 insertions(+), 101 deletions(-) create mode 100644 decnet/ttp/impl/_rule_index.py create mode 100644 decnet/ttp/impl/_state.py create mode 100644 tests/ttp/test_rule_index.py diff --git a/decnet/ttp/impl/_rule_index.py b/decnet/ttp/impl/_rule_index.py new file mode 100644 index 00000000..7dfbc473 --- /dev/null +++ b/decnet/ttp/impl/_rule_index.py @@ -0,0 +1,180 @@ +"""Hot-swappable rule registry shared by RuleEngine and per-source lifters. + +The dispatch index originally lived inline on +:class:`~decnet.ttp.impl.rule_engine.RuleEngine`. E.3.9 adds four +per-source lifters that need the same install / evict / state-restamp +atomic-swap protocol; pulling it into one helper keeps the contract +single-sourced. + +Atomicity invariant (TTP_TAGGING.md §"Atomic swap" / E.2.14b): a rule +sitting in the index must never be torn mid-evaluate. Mutations +replace dict entries with fresh lists / fresh +:class:`~decnet.ttp.impl.rule_engine.CompiledRule` tuples — never +in-place edits. Single dict assignments are GIL-atomic to readers. +""" +from __future__ import annotations + +from collections.abc import Callable, Iterable +from typing import TYPE_CHECKING + +from decnet.logging import get_logger + +if TYPE_CHECKING: + from decnet.ttp.impl.rule_engine import CompiledRule + from decnet.ttp.store.base import RuleChange, RuleStore + + +_log = get_logger("ttp.index") + + +class RuleIndex: + """Owns ``rule_id -> CompiledRule`` plus a ``source_kind -> [rules]`` index. + + Consumers: + + * :class:`RuleEngine` — uses :meth:`by_kind` to dispatch evaluate(). + * Per-source lifters (E.3.9–E.3.13) — use :meth:`get` and + :meth:`values` to consume rules they own (filtered via the + ``predicate`` passed to :meth:`watch`). + """ + + def __init__(self) -> None: + # source_kind -> list of compiled rules that claim it. + self._by_kind: dict[str, list["CompiledRule"]] = {} + # rule_id -> compiled rule (mirror; used for state restamp). + self._by_rule: dict[str, "CompiledRule"] = {} + + # ── Read API ──────────────────────────────────────────────────── + + def by_kind(self, source_kind: str) -> list["CompiledRule"]: + return self._by_kind.get(source_kind, []) + + def get(self, rule_id: str) -> "CompiledRule | None": + return self._by_rule.get(rule_id) + + def values(self) -> Iterable["CompiledRule"]: + return self._by_rule.values() + + # ── Mutation API (atomic-swap) ────────────────────────────────── + + def install(self, rule: "CompiledRule") -> None: + """Atomic-swap install of one compiled rule. + + Empty ``applies_to`` AND empty ``emits`` is the deletion sentinel + used by both store backends — drop the rule from the index + instead of registering a no-op entry. + """ + if not rule.applies_to and not rule.emits: + self.evict(rule.rule_id) + return + self._by_rule[rule.rule_id] = rule + for kind in rule.applies_to: + current = self._by_kind.get(kind, []) + replaced = [r for r in current if r.rule_id != rule.rule_id] + replaced.append(rule) + # Single dict assignment — GIL-atomic to readers. + self._by_kind[kind] = replaced + + def evict(self, rule_id: str) -> None: + existing = self._by_rule.pop(rule_id, None) + if existing is None: + return + for kind in existing.applies_to: + current = self._by_kind.get(kind, []) + replaced = [r for r in current if r.rule_id != rule_id] + self._by_kind[kind] = replaced + + def apply_change( + self, change: "RuleChange", state_cls: type + ) -> None: + """Apply one :class:`RuleChange` to the index. + + ``state_cls`` is :class:`RuleState`; passed in to avoid a + runtime-circular import — the store package imports from this + one transitively. + """ + from decnet.ttp.impl.rule_engine import CompiledRule # noqa: PLC0415 + + if change.change_kind == "definition": + value = change.new_value + if isinstance(value, CompiledRule): + self.install(value) + return + # state change + existing = self._by_rule.get(change.rule_id) + if existing is None or not isinstance(change.new_value, state_cls): + return + new_state = change.new_value + # NamedTuple._replace returns a fresh frozen tuple — single + # dict assignment swaps it in atomically. + restamped = existing._replace(state=new_state) # type: ignore[arg-type] + self._by_rule[change.rule_id] = restamped + for kind in restamped.applies_to: + current = self._by_kind.get(kind, []) + replaced = [r for r in current if r.rule_id != change.rule_id] + replaced.append(restamped) + self._by_kind[kind] = replaced + + # ── Lifecycle ─────────────────────────────────────────────────── + + async def hydrate_from( + self, + store: "RuleStore", + predicate: Callable[["CompiledRule"], bool] | None = None, + ) -> None: + """Load every compiled rule from *store* and install matching ones. + + ``predicate`` filters; engine omits it (installs everything), + lifters pass a ``match.kind`` prefix check. + """ + compiled = await store.load_compiled() + for rule in compiled: + if predicate is not None and not predicate(rule): + continue + self.install(rule) + + async def watch( + self, + store: "RuleStore", + predicate: Callable[["CompiledRule"], bool] | None = None, + ) -> None: + """Hydrate once + drain ``subscribe_changes`` forever. + + Cancellation-safe: an :class:`asyncio.CancelledError` from the + outer task propagates cleanly. Per-change application errors + log and continue — one bad rule edit must not stall the stream. + """ + from decnet.ttp.store.base import RuleState # noqa: PLC0415 + + await self.hydrate_from(store, predicate=predicate) + async for change in store.subscribe_changes(): + if predicate is not None: + # For state changes the value is a RuleState (no + # match_spec to inspect); always apply when the rule + # is already in the index, otherwise skip. + if change.change_kind == "state": + if change.rule_id not in self._by_rule: + continue + else: + value = change.new_value + # Definition changes carry a CompiledRule; skip + # ones the predicate doesn't claim. A previously- + # owned rule whose YAML moved out of our ownership + # gets evicted explicitly. + from decnet.ttp.impl.rule_engine import ( # noqa: PLC0415 + CompiledRule, + ) + if isinstance(value, CompiledRule) and not predicate(value): + if change.rule_id in self._by_rule: + self.evict(change.rule_id) + continue + try: + self.apply_change(change, RuleState) + except Exception: # noqa: BLE001 + _log.exception( + "ttp.index: rule change apply failed rule_id=%s", + change.rule_id, + ) + + +__all__ = ["RuleIndex"] diff --git a/decnet/ttp/impl/_state.py b/decnet/ttp/impl/_state.py new file mode 100644 index 00000000..556938e8 --- /dev/null +++ b/decnet/ttp/impl/_state.py @@ -0,0 +1,55 @@ +"""Shared state-modulation helpers for rule consumers. + +Both :class:`~decnet.ttp.impl.rule_engine.RuleEngine` and the per-source +lifters (E.3.9 onward) read :class:`~decnet.ttp.store.base.RuleState` +the same way: skip on ``disabled``, defense-in-depth re-check +``expires_at``, clamp confidence on ``clipped``. Single source of truth +so a future change to the state contract lands in one place. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from decnet.ttp.store.base import RuleState + + +def is_active(state: "RuleState") -> bool: + """Return ``True`` iff a rule with this state is allowed to fire. + + ``disabled`` rules never fire. ``clipped`` rules still fire — the + clip caps emitted confidence, doesn't suppress the emit. Expired + states act as ``disabled`` even though the store auto-reverts; the + re-check here is defense-in-depth against a racing read between + expiry and the store's revert pass. + """ + if state.state == "disabled": + return False + if state.expires_at is not None: + expires = state.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + if expires < datetime.now(tz=timezone.utc): + return False + return True + + +def apply_ceiling(base: float, state: "RuleState") -> float: + """Apply the operator's confidence ceiling, downward only. + + A ``clipped`` state with ``confidence_max < 1.0`` clamps the emitted + confidence to ``min(base, base * ceiling)``. Any other state is a + no-op. The clamp is downward by construction — operator clips can + never raise a rule's confidence above its YAML-declared base, per + TTP_TAGGING.md §"Confidence model". + """ + if state.state != "clipped": + return base + ceiling = state.confidence_max + if ceiling is None or ceiling >= 1.0: + return base + return min(base, base * ceiling) + + +__all__ = ["is_active", "apply_ceiling"] diff --git a/decnet/ttp/impl/rule_engine.py b/decnet/ttp/impl/rule_engine.py index e552e96c..a5e5708e 100644 --- a/decnet/ttp/impl/rule_engine.py +++ b/decnet/ttp/impl/rule_engine.py @@ -37,6 +37,8 @@ from pydantic import BaseModel, Field from decnet import telemetry as _telemetry from decnet.logging import get_logger from decnet.ttp.base import TaggerEvent +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl._state import apply_ceiling, is_active from decnet.web.db.models.ttp import TTPTag, compute_tag_uuid if TYPE_CHECKING: @@ -163,14 +165,27 @@ class RuleEngine: def __init__(self, store: "RuleStore") -> None: self._store = store - # ``source_kind`` → list of compiled rules that claim it. - # Replaced wholesale on per-rule changes via watch_store(); the - # GIL-atomic dict assignment is the "atomic swap" pin from - # TTP_TAGGING.md §"Atomic swap" / E.2.14b. - self._by_kind: dict[str, list[CompiledRule]] = {} - # Mirror keyed by rule_id for definition+state restamping. - # Same atomicity contract: replacement, never in-place mutation. - self._by_rule: dict[str, CompiledRule] = {} + # Dispatch index extracted to RuleIndex so per-source lifters + # (E.3.9–E.3.13) reuse the same atomic-swap protocol. Legacy + # ``_by_kind`` / ``_by_rule`` properties below proxy to it for + # callers (and tests) that still poke the dispatch index directly. + self._index = RuleIndex() + + @property + def _by_kind(self) -> dict[str, list[CompiledRule]]: + return self._index._by_kind + + @_by_kind.setter + def _by_kind(self, value: dict[str, list[CompiledRule]]) -> None: + self._index._by_kind = value + + @property + def _by_rule(self) -> dict[str, CompiledRule]: + return self._index._by_rule + + @_by_rule.setter + def _by_rule(self, value: dict[str, CompiledRule]) -> None: + self._index._by_rule = value async def evaluate(self, event: TaggerEvent) -> list[TTPTag]: """Return zero or more tags produced by rules matching *event*. @@ -184,7 +199,7 @@ class RuleEngine: states, but the engine double-checks ``expires_at`` as defense-in-depth. """ - rules = self._by_kind.get(event.source_kind, []) + rules = self._index.by_kind(event.source_kind) if not rules: return [] with _span( @@ -198,101 +213,31 @@ class RuleEngine: async def watch_store(self) -> None: """Subscribe to per-rule changes and atomically swap them in. - Loads the initial corpus from :meth:`RuleStore.load_compiled`, - builds the dispatch index, then drains - :meth:`RuleStore.subscribe_changes` forever. Each ``definition`` - change replaces the affected rule wholesale; each ``state`` - change re-stamps the existing :class:`CompiledRule`'s ``state`` - field via NamedTuple ``_replace`` (single dict assignment, no - in-place mutation). + Delegates to :meth:`RuleIndex.watch`: loads the initial corpus + from :meth:`RuleStore.load_compiled`, builds the dispatch + index, then drains :meth:`RuleStore.subscribe_changes` forever. + Each ``definition`` change replaces the affected rule wholesale; + each ``state`` change re-stamps the existing + :class:`CompiledRule`'s ``state`` field via NamedTuple + ``_replace`` (single dict assignment, no in-place mutation). """ - # Forward import — avoids the contract-phase circular shape - # dependency captured by the TYPE_CHECKING block above. - from decnet.ttp.store.base import RuleState # noqa: PLC0415 - - compiled = await self._store.load_compiled() - for rule in compiled: - self._install(rule) - async for change in self._store.subscribe_changes(): - try: - self._apply_change(change, RuleState) - except Exception: # noqa: BLE001 - _log.exception( - "ttp.engine: rule change apply failed rule_id=%s", - change.rule_id, - ) + await self._index.watch(self._store) # ── Internals ─────────────────────────────────────────────────── + # Back-compat shims — the dispatch-index protocol moved into + # :class:`RuleIndex`. Existing callers / tests that poke at + # ``_install`` / ``_evict`` / ``_apply_change`` keep working. def _install(self, rule: CompiledRule) -> None: - """Atomic-swap install of one compiled rule. - - Empty ``applies_to`` + empty ``emits`` is the deletion sentinel - used by both backends — drop the rule from the index instead of - adding a no-op entry. - """ - if not rule.applies_to and not rule.emits: - self._evict(rule.rule_id) - return - self._by_rule[rule.rule_id] = rule - for kind in rule.applies_to: - current = self._by_kind.get(kind, []) - replaced = [r for r in current if r.rule_id != rule.rule_id] - replaced.append(rule) - # Single dict assignment — GIL-atomic to readers. - self._by_kind[kind] = replaced + self._index.install(rule) def _evict(self, rule_id: str) -> None: - existing = self._by_rule.pop(rule_id, None) - if existing is None: - return - for kind in existing.applies_to: - current = self._by_kind.get(kind, []) - replaced = [r for r in current if r.rule_id != rule_id] - self._by_kind[kind] = replaced + self._index.evict(rule_id) def _apply_change( self, change: "RuleChange", state_cls: type, ) -> None: - if change.change_kind == "definition": - value = change.new_value - if isinstance(value, CompiledRule): - self._install(value) - return - # state change - existing = self._by_rule.get(change.rule_id) - if existing is None or not isinstance(change.new_value, state_cls): - return - new_state = change.new_value # narrowed by isinstance above - # NamedTuple._replace returns a fresh frozen tuple — single - # dict assignment swaps it in atomically. - restamped = existing._replace(state=new_state) # type: ignore[arg-type] - self._by_rule[change.rule_id] = restamped - for kind in restamped.applies_to: - current = self._by_kind.get(kind, []) - replaced = [r for r in current if r.rule_id != change.rule_id] - replaced.append(restamped) - self._by_kind[kind] = replaced - - -def _state_active(state: "RuleState") -> bool: - """A rule fires iff its state isn't ``disabled``. ``clipped`` rules - still fire — the clip caps confidence, doesn't suppress. - """ - if state.state == "disabled": - return False - if state.expires_at is not None: - # Defense-in-depth: stores auto-revert expired states, but a - # racing read between expiry and revert must not fire a rule - # the operator told us was off. - from datetime import datetime, timezone # noqa: PLC0415 - - expires = state.expires_at - if expires.tzinfo is None: - expires = expires.replace(tzinfo=timezone.utc) - if expires < datetime.now(tz=timezone.utc): - return False - return True + self._index.apply_change(change, state_cls) def _match_event(rule: CompiledRule, event: TaggerEvent) -> bool: @@ -350,16 +295,10 @@ def _evaluate_rules( ) -> list[TTPTag]: out: list[TTPTag] = [] for rule in rules: - if not _state_active(rule.state): + if not is_active(rule.state): continue if not _match_event(rule, event): continue - ceiling = ( - rule.state.confidence_max - if rule.state.state == "clipped" - and rule.state.confidence_max is not None - else 1.0 - ) with _span( "ttp.rule.fire", rule_id=rule.rule_id, @@ -371,7 +310,7 @@ def _evaluate_rules( span.set_attribute("technique_id", technique_id) except (TypeError, ValueError): pass - confidence = min(base_conf, base_conf * ceiling) if ceiling < 1.0 else base_conf + confidence = apply_ceiling(base_conf, rule.state) tag_uuid = compute_tag_uuid( source_kind=event.source_kind, source_id=event.source_id, diff --git a/tests/ttp/test_rule_index.py b/tests/ttp/test_rule_index.py new file mode 100644 index 00000000..71cdc4a6 --- /dev/null +++ b/tests/ttp/test_rule_index.py @@ -0,0 +1,304 @@ +"""Unit tests for :class:`decnet.ttp.impl._rule_index.RuleIndex`. + +The dispatch index was extracted from :class:`RuleEngine` so the four +per-source lifters (E.3.9–E.3.13) can reuse the install / evict / +state-restamp atomic-swap protocol. These tests pin the contract +independently of the engine — a future regression in the engine +shouldn't be the only signal that the shared helper broke. +""" +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from datetime import datetime, timedelta, timezone +from typing import Any + +import pytest + +from decnet.ttp.impl._rule_index import RuleIndex +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleChange, RuleState + + +def _rule( + rule_id: str = "R0001", + *, + rule_version: int = 1, + applies_to: frozenset[str] = frozenset({"command"}), + state: RuleState | None = None, + emits: tuple[tuple[str, str | None, str, float], ...] = ( + ("T1110", None, "TA0006", 0.85), + ), +) -> CompiledRule: + return CompiledRule( + rule_id=rule_id, + rule_version=rule_version, + name="test", + applies_to=applies_to, + match_spec={"pattern": "x"}, + emits=emits, + evidence_fields=(), + state=state if state is not None else RuleState(), + ) + + +def test_install_and_lookup() -> None: + idx = RuleIndex() + rule = _rule() + idx.install(rule) + assert idx.get("R0001") is rule + assert idx.by_kind("command") == [rule] + assert idx.by_kind("email") == [] + + +def test_install_replaces_same_rule_id() -> None: + idx = RuleIndex() + idx.install(_rule(rule_version=1)) + idx.install(_rule(rule_version=2)) + bucket = idx.by_kind("command") + assert len(bucket) == 1 + assert bucket[0].rule_version == 2 + + +def test_install_deletion_sentinel_evicts() -> None: + idx = RuleIndex() + idx.install(_rule()) + sentinel = CompiledRule( + rule_id="R0001", + rule_version=1, + name="", + applies_to=frozenset(), + match_spec={}, + emits=(), + evidence_fields=(), + state=RuleState(), + ) + idx.install(sentinel) + assert idx.get("R0001") is None + assert idx.by_kind("command") == [] + + +def test_evict_unknown_is_noop() -> None: + idx = RuleIndex() + idx.evict("R_NOPE") # must not raise + + +def test_apply_change_definition_installs() -> None: + idx = RuleIndex() + rule = _rule() + idx.apply_change( + RuleChange(change_kind="definition", rule_id="R0001", new_value=rule), + RuleState, + ) + assert idx.get("R0001") is rule + + +def test_apply_change_state_restamps_atomically() -> None: + idx = RuleIndex() + idx.install(_rule()) + new_state = RuleState(state="clipped", confidence_max=0.5) + idx.apply_change( + RuleChange(change_kind="state", rule_id="R0001", new_value=new_state), + RuleState, + ) + restamped = idx.get("R0001") + assert restamped is not None + assert restamped.state == new_state + bucket = idx.by_kind("command") + assert len(bucket) == 1 + assert bucket[0].state.confidence_max == 0.5 + + +def test_apply_state_change_for_unknown_rule_is_noop() -> None: + idx = RuleIndex() + idx.apply_change( + RuleChange( + change_kind="state", + rule_id="R_GHOST", + new_value=RuleState(state="disabled"), + ), + RuleState, + ) + assert idx.get("R_GHOST") is None + + +# ── Hydrate / watch via stub store ────────────────────────────────── + + +class _StubStore: + def __init__( + self, + compiled: list[CompiledRule] | None = None, + changes: list[RuleChange] | None = None, + ) -> None: + self._compiled = compiled or [] + self._changes = changes or [] + + async def load_compiled(self) -> list[CompiledRule]: + return list(self._compiled) + + async def get_state(self, _rule_id: str) -> RuleState: + return RuleState() + + async def set_state(self, *_a: Any, **_kw: Any) -> None: + return None + + def subscribe_changes(self) -> AsyncIterator[RuleChange]: + changes = list(self._changes) + + async def _gen() -> AsyncIterator[RuleChange]: + for change in changes: + yield change + + return _gen() + + +def test_hydrate_from_installs_all() -> None: + idx = RuleIndex() + a = _rule("R_A") + b = _rule("R_B", applies_to=frozenset({"email"})) + store = _StubStore(compiled=[a, b]) + asyncio.run(idx.hydrate_from(store)) + assert idx.get("R_A") is a + assert idx.get("R_B") is b + + +def test_hydrate_predicate_filters() -> None: + idx = RuleIndex() + a = _rule("R_A") # match.kind defaults to {"pattern": "x"} + b = _rule("R_B") + store = _StubStore(compiled=[a, b]) + asyncio.run( + idx.hydrate_from(store, predicate=lambda r: r.rule_id == "R_A") + ) + assert idx.get("R_A") is a + assert idx.get("R_B") is None + + +def test_watch_drains_definition_changes() -> None: + idx = RuleIndex() + a = _rule("R_A") + b = _rule("R_B", applies_to=frozenset({"email"})) + store = _StubStore( + compiled=[], + changes=[ + RuleChange(change_kind="definition", rule_id="R_A", new_value=a), + RuleChange(change_kind="definition", rule_id="R_B", new_value=b), + ], + ) + asyncio.run(idx.watch(store)) + assert idx.get("R_A") is a + assert idx.get("R_B") is b + + +def test_watch_predicate_evicts_unowned_definition_changes() -> None: + """A rule whose YAML moves out of our predicate's claim set must be + evicted from the index, not silently retained. + """ + idx = RuleIndex() + owned = _rule("R_A") + unowned = _rule("R_B") + idx.install(owned) + idx.install(unowned) + + # Predicate now only owns R_A; R_B's incoming definition update + # should evict it. + new_b = _rule("R_B", rule_version=2) + store = _StubStore( + compiled=[], + changes=[ + RuleChange( + change_kind="definition", rule_id="R_B", new_value=new_b, + ), + ], + ) + asyncio.run(idx.watch(store, predicate=lambda r: r.rule_id == "R_A")) + assert idx.get("R_A") is owned + assert idx.get("R_B") is None + + +def test_watch_state_change_for_owned_rule_applies() -> None: + idx = RuleIndex() + idx.install(_rule("R_A")) + new_state = RuleState(state="disabled") + store = _StubStore( + compiled=[], + changes=[ + RuleChange( + change_kind="state", rule_id="R_A", new_value=new_state, + ), + ], + ) + asyncio.run(idx.watch(store, predicate=lambda r: r.rule_id == "R_A")) + restamped = idx.get("R_A") + assert restamped is not None + assert restamped.state.state == "disabled" + + +def test_watch_state_change_for_unowned_rule_skipped() -> None: + idx = RuleIndex() + # R_B was never installed (predicate excluded it). State change + # for R_B must NOT install a phantom entry. + store = _StubStore( + compiled=[], + changes=[ + RuleChange( + change_kind="state", + rule_id="R_B", + new_value=RuleState(state="disabled"), + ), + ], + ) + asyncio.run(idx.watch(store, predicate=lambda r: r.rule_id == "R_A")) + assert idx.get("R_B") is None + + +def test_apply_change_continues_on_error(caplog: pytest.LogCaptureFixture) -> None: + """A single bad change must not stall the watch loop.""" + idx = RuleIndex() + # Force an exception by passing the wrong value type for definition. + bad = RuleChange( + change_kind="definition", + rule_id="R_BAD", + new_value=RuleState(), # wrong type — apply_change ignores silently + ) + good = _rule("R_GOOD") + store = _StubStore( + compiled=[], + changes=[ + bad, + RuleChange( + change_kind="definition", rule_id="R_GOOD", new_value=good, + ), + ], + ) + asyncio.run(idx.watch(store)) + assert idx.get("R_GOOD") is good + assert idx.get("R_BAD") is None + + +def test_expired_state_treated_as_disabled_by_is_active() -> None: + """Sanity check on the helper used by both engine and lifters.""" + from decnet.ttp.impl._state import is_active + + expired = RuleState( + state="enabled", + expires_at=datetime.now(timezone.utc) - timedelta(seconds=1), + ) + assert is_active(expired) is False + fresh = RuleState( + state="enabled", + expires_at=datetime.now(timezone.utc) + timedelta(hours=1), + ) + assert is_active(fresh) is True + + +def test_apply_ceiling_only_clamps_clipped() -> None: + from decnet.ttp.impl._state import apply_ceiling + + enabled = RuleState(state="enabled", confidence_max=0.5) + assert apply_ceiling(0.9, enabled) == 0.9 # ceiling ignored unless clipped + clipped = RuleState(state="clipped", confidence_max=0.5) + assert apply_ceiling(0.9, clipped) == pytest.approx(0.45) + clipped_no_max = RuleState(state="clipped", confidence_max=None) + assert apply_ceiling(0.9, clipped_no_max) == 0.9