From f41995a229b29db0c49edbcd620fc2e9e561ae4e Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 08:31:05 -0400 Subject: [PATCH] =?UTF-8?q?feat(ttp):=20E.3.5=20FilesystemRuleStore=20?= =?UTF-8?q?=E2=80=94=20inotify=20hot-reload=20+=20per-rule=20events?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the filesystem-backed rule store body left empty at contract phase: YAML parse + Pydantic validation, asyncinotify watch over ./rules/ttp/, in-process state cache with auto-revert on expires_at, and a subscribe_changes() async iterator yielding one RuleChange per per-rule edit. Bus topic builders ttp_rule_reloaded / ttp_rule_state ship alongside. Why: the rule plane needed a store before the engine (E.3.7) could consume RuleChange events and atomically swap compiled rules into its dispatch index. Notes: - Linux-only by construction (asyncinotify wheel gated by sys_platform marker; FilesystemRuleStore.__init__ raises on non-Linux). - Filename allowlist is the FIRST check on every inotify event. - Content-hash dedup so a single write firing IN_CREATE + IN_CLOSE_WRITE produces exactly one RuleChange. - All compile work serializes on a single asyncio.Lock. - Subscribers register their queue eagerly so events fired between subscribe_changes() and the first __anext__() are buffered. xfails flipped: per-save-style + filter-ordering + atomic-swap in test_filesystem.py; load_compiled / set_state isolation / round-trip / per-rule fan-out / expired-state revert / set_state failure semantics in test_conformance.py (FS side; DB side stays xfail until E.3.6); malformed-YAML compile-time check in test_rule_engine.py. Tests: 197 passed, 35 xfailed (gated on E.3.6 / E.3.7 / lifters). mypy + bandit: clean on all touched files. Wiki update for the per-rule reload + state-change topics lands in a matching wiki-checkout/Service-Bus.md edit (separate repo). 
--- decnet/bus/topics.py | 39 ++- decnet/ttp/store/impl/filesystem.py | 497 ++++++++++++++++++++++++---- development/TTP_TAGGING.md | 8 +- pyproject.toml | 5 + tests/bus/test_ttp_topics.py | 36 ++ tests/ttp/store/test_conformance.py | 153 ++++++--- tests/ttp/store/test_filesystem.py | 151 +++++++-- tests/ttp/test_rule_engine.py | 31 +- 8 files changed, 768 insertions(+), 152 deletions(-) diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 6d674ca8..d697b0bf 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -278,12 +278,15 @@ EMAIL_RECEIVED = "received" # or the rule's RuleState was disabled). # Observability signal for the dashboard. # -# Per-rule reload + state-change topics (``ttp.rule.reloaded.{rule_id}`` / -# ``ttp.rule.state.{rule_id}``) ship in the RuleStore contract step — they -# are co-located with the producer. +# Per-rule reload + state-change topics. Built via +# :func:`ttp_rule_reloaded` / :func:`ttp_rule_state`; SIEM consumers +# subscribe to ``ttp.rule.reloaded.>`` (every rule) or +# ``ttp.rule.reloaded.R0001`` (one rule) at their preferred granularity. TTP_TAGGED = "tagged" TTP_RULE_FIRED = "rule.fired" TTP_RULE_SUPPRESSED = "rule.suppressed" +TTP_RULE_RELOADED = "rule.reloaded" +TTP_RULE_STATE = "rule.state" # ─── Builders ──────────────────────────────────────────────────────────────── @@ -485,6 +488,36 @@ def ttp_rule_fired(technique_id: str) -> str: return f"{TTP}.rule.fired.{technique_id}" +def ttp_rule_reloaded(rule_id: str) -> str: + """Build ``ttp.rule.reloaded.<rule_id>``. + + Per-rule fan-out fired by the :class:`~decnet.ttp.store.base.RuleStore` + when a rule's *definition* changes (YAML edit on the filesystem + backend, ``ttp_rule`` row update on the database backend). One event + per per-rule edit — never batched (the "incremental, never batched" + property in TTP_TAGGING.md §"Bus topics" inherits its granularity + from :meth:`RuleStore.subscribe_changes`).
+ + Subscribers: ``ttp.rule.reloaded.>`` for every rule, + ``ttp.rule.reloaded.R0001`` for one. *rule_id* is validated as a + single segment. + """ + _reject_tokens(rule_id) + return f"{TTP}.{TTP_RULE_RELOADED}.{rule_id}" + + +def ttp_rule_state(rule_id: str) -> str: + """Build ``ttp.rule.state.<rule_id>``. + + Per-rule fan-out fired by the :class:`~decnet.ttp.store.base.RuleStore` + when a rule's *operational state* changes (operator hits the disable + button, an ``expires_at`` TTL fires and auto-reverts the state). + *rule_id* is validated as a single segment. + """ + _reject_tokens(rule_id) + return f"{TTP}.{TTP_RULE_STATE}.{rule_id}" + + def _reject_tokens(*parts: str) -> None: """Reject topic segments that would break NATS-style tokenization. diff --git a/decnet/ttp/store/impl/filesystem.py b/decnet/ttp/store/impl/filesystem.py index d1a3c0bc..d4b0013e 100644 --- a/decnet/ttp/store/impl/filesystem.py +++ b/decnet/ttp/store/impl/filesystem.py @@ -1,38 +1,69 @@ """Filesystem-backed rule store — reads ``./rules/ttp/`` + inotify watch. -Contract step E.1.11. Bodies raise ``NotImplementedError``; the -constants and platform guard are real so E.2.14b conformance tests -can introspect them today. +E.3.5 implementation. Linux-only by construction: the inotify dep +(``asyncinotify``) is non-portable, the platform guard in ``__init__`` +refuses construction on macOS / Windows so the operator gets a +one-line readable error rather than a deep stack trace from the +inotify import. -Linux-only. The inotify dependency (``asyncinotify`` / -``inotify_simple``) is non-portable by design; macOS / Windows -developers running the test suite use the database backend by -setting ``DECNET_TTP_RULE_STORE_TYPE=database``. The factory check in -:meth:`__init__` enforces this with a one-line operator-readable -error rather than a deep stack trace from the inotify import.
+Behavior summary (per ``development/TTP_TAGGING.md`` §"Worker shape" / +§"Tagging engines, layered" / §E.1.11 / §E.2.14b): -The dependency import is **deferred** to :meth:`subscribe_changes` -during the contract phase so this module is importable without the -inotify package installed. The implementation step (E.3) moves the -import to module top per TTP_TAGGING.md §"Linux-only worker host" — -which is when the dependency is added to ``pyproject.toml``. At -contract phase the codebase compiles, mypy passes, and the constants -below are introspectable for E.2.14b tests without forcing operators -on macOS or CI machines without the lib to install it just to import -the package. +* :meth:`load_compiled` — walk ``self._rules_dir``, allowlist by + basename, parse YAML, validate via :class:`RuleSchema`, compile each + to a :class:`CompiledRule` carrying the cached :class:`RuleState`. + Malformed YAML raises **at compile time**, never at evaluate time — + the deploy-time vs runtime asymmetry pinned by E.2.5. +* :meth:`get_state` — returns the cached state (or default + :class:`RuleState` for unknown rules); auto-reverts an expired + state to default and emits a ``ttp.rule.state.{rule_id}`` event. +* :meth:`set_state` — writes to the in-process cache, restamps the + cached :class:`CompiledRule` (so concurrent :meth:`load_compiled` + reads see the new state), publishes a :class:`RuleChange` to every + subscriber, and (if a bus is wired) publishes the matching + ``ttp.rule.state.{rule_id}`` topic. Failures raise; operational + state changes are not a tolerated-absence path. +* :meth:`subscribe_changes` — async iterator yielding one + :class:`RuleChange` per per-rule edit; never batches. + +The watcher loop is started lazily by :meth:`start` (or when entered +as an async context manager). Tests that only exercise state / +load_compiled don't need to start the watcher. 
+ +Atomic-swap concurrency property (E.2.14b): all compile work runs +under :attr:`_compile_lock`, so two filesystem events arriving +simultaneously are processed serially. The dispatch index values +(``CompiledRule`` NamedTuples) are frozen by virtue of being tuples; +swap is a single-statement dict assignment. """ from __future__ import annotations +import asyncio import re import sys from collections.abc import AsyncIterator -from datetime import datetime +from dataclasses import replace +from datetime import datetime, timezone from pathlib import Path -from typing import Final +from types import TracebackType +from typing import TYPE_CHECKING, Any, Final, Type -from decnet.ttp.impl.rule_engine import CompiledRule +import yaml + +from decnet.bus import topics as _topics +from decnet.bus.publish import publish_safely +from decnet.logging import get_logger +from decnet.telemetry import get_tracer +from decnet.ttp.impl.rule_engine import CompiledRule, RuleSchema from decnet.ttp.store.base import RuleChange, RuleState, RuleStore +if TYPE_CHECKING: + from decnet.bus.base import BaseBus + + +_log = get_logger("ttp.store.filesystem") +_tracer = get_tracer("ttp.store") + # ── Filename allowlist ────────────────────────────────────────────── # A path is accepted iff its basename FULLY matches this pattern. The @@ -49,19 +80,10 @@ _VALID_RULE_FILENAME: Final[re.Pattern[str]] = re.compile( # ── Inotify event mask ────────────────────────────────────────────── # Bit values from ``<sys/inotify.h>`` (man inotify(7)). Inlined as -# raw ints so this module is importable without the inotify library -# at contract phase. The implementation step replaces these with the -# library-supplied constants on the same module-top import that lands -# the dep — same numeric value, same bitwise OR. -# -# Rationale per TTP_TAGGING.md §E.1.11 "Inotify event mask", -# verified against an actual ``strace`` of vim: -# IN_CLOSE_WRITE — vim writes in place; dominant save signal.
-# IN_MOVED_TO — atomic-write editors (gedit, IDEs, deploy -# scripts) write tempfile then ``rename()``. -# IN_CREATE — brand-new rule file appears (``touch``, ``cp``). -# IN_DELETE — rule removed; engine drops it from the dispatch -# index and emits ``ttp.rule.reloaded.{rule_id}``. +# raw ints so this module is importable on non-Linux platforms (the +# ``__init__`` platform guard would otherwise be unreachable — the +# import-time guard would fire before the readable RuntimeError). +# E.2.14b cross-checks these against the asyncinotify library values. _IN_CLOSE_WRITE: Final[int] = 0x00000008 _IN_MOVED_TO: Final[int] = 0x00000080 _IN_CREATE: Final[int] = 0x00000100 @@ -78,19 +100,94 @@ _INOTIFY_MASK: Final[int] = ( _DEFAULT_RULES_DIR: Final[Path] = Path("./rules/ttp/") +def _utcnow() -> datetime: + return datetime.now(tz=timezone.utc) + + +def _safe_set_attrs(span: Any, **attrs: Any) -> None: + """Best-effort attribute setter on either real OTEL or no-op span.""" + setter = getattr(span, "set_attribute", None) + if setter is None: + return + for key, value in attrs.items(): + try: + setter(key, value) + except (TypeError, ValueError): + # OTEL rejects un-serializable types; not load-bearing for + # store correctness. Skip the attribute, keep the span. + continue + + +def _is_expired(state: RuleState, now: datetime) -> bool: + if state.expires_at is None: + return False + expires = state.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + return expires < now + + +def _compile_one(parsed: RuleSchema, state: RuleState) -> CompiledRule: + """Translate a validated :class:`RuleSchema` into a :class:`CompiledRule`. + + The match spec is passed through verbatim — the engine owns + interpretation of operator keys (``pattern``, ``contains``, …); the + store only validates structural shape. 
+ """ + emits: list[tuple[str, str | None]] = [] + for entry in parsed.emits: + tid = entry.get("technique_id") + if not tid: + raise ValueError( + f"rule {parsed.rule_id}: every emits entry needs technique_id", + ) + sub = entry.get("sub_technique_id") or None + emits.append((tid, sub)) + return CompiledRule( + rule_id=parsed.rule_id, + rule_version=parsed.rule_version, + name=parsed.name, + applies_to=frozenset(parsed.applies_to), + match_spec=dict(parsed.match), + emits=tuple(emits), + evidence_fields=tuple(parsed.evidence_fields), + state=state, + ) + + +def _parse_and_compile(path: Path, state: RuleState) -> CompiledRule: + """Read one rule file off disk and produce a :class:`CompiledRule`. + + Raises :class:`yaml.YAMLError` on parse failure and + :class:`pydantic.ValidationError` on schema failure — both are + deploy-time signals; callers (``load_compiled`` / the inotify + handler) decide whether to surface or skip. + """ + raw = path.read_text(encoding="utf-8") + doc = yaml.safe_load(raw) + if not isinstance(doc, dict): + raise ValueError( + f"rule file {path}: top-level YAML must be a mapping, " + f"got {type(doc).__name__}", + ) + parsed = RuleSchema.model_validate(doc) + return _compile_one(parsed, state) + + class FilesystemRuleStore(RuleStore): """``./rules/ttp/`` + inotify watch + in-process state cache. Right for single-host dev — state lost on restart is fine when the operator is local. Swarms use :class:`DatabaseRuleStore` so state survives restart and propagates across worker hosts. - - Contract phase: every method raises ``NotImplementedError``. The - impl step (E.3) implements YAML parse + Pydantic validation + - inotify event loop + atomic per-rule swap into the dispatch index. """ - def __init__(self, rules_dir: Path | None = None) -> None: + def __init__( + self, + rules_dir: Path | None = None, + *, + bus: "BaseBus | None" = None, + ) -> None: # Fail-fast platform guard. 
Per TTP_TAGGING.md §E.1.11: a # one-line operator-readable error beats a deep stack trace # from a downstream import. @@ -101,30 +198,121 @@ class FilesystemRuleStore(RuleStore): "(DECNET_TTP_RULE_STORE_TYPE=database).", ) self._rules_dir: Path = rules_dir or _DEFAULT_RULES_DIR - # In-process state cache — lost on restart by design. The - # database backend persists across restarts; choosing this - # backend is choosing the trade-off. + self._bus = bus + # In-process state cache — lost on restart by design. self._state: dict[str, RuleState] = {} + # Compiled-rule mirror, keyed by rule_id. Single-statement + # dict assignment is GIL-atomic to readers; concurrent + # ``load_compiled`` snapshots therefore see either the old + # CompiledRule or the new one, never a torn intermediate. + self._compiled: dict[str, CompiledRule] = {} + # ``rule_id`` → file path mirror for DELETE handling (the + # inotify event carries the basename; we need the rule_id to + # publish the per-rule reload topic). + self._path_to_rule: dict[Path, str] = {} + # Content-hash dedup so a single write that fires IN_CREATE + # then IN_CLOSE_WRITE produces exactly one ``RuleChange``. + # Editors are unfussy about multiplexing inotify events; + # the per-rule fan-out contract demands one event per + # observable change. + self._content_hash: dict[str, int] = {} + # Per-subscriber queues; ``subscribe_changes`` returns an + # async iterator that owns one queue. Removing a queue on + # generator close keeps the publisher list bounded. + self._subscribers: list[asyncio.Queue[RuleChange]] = [] + # All compile work serializes on this lock so two filesystem + # events arriving simultaneously process in order. The + # E.2.14b "atomic-swap concurrency" property pins this. 
+ self._compile_lock = asyncio.Lock() + self._watcher_task: asyncio.Task[None] | None = None + self._stop = asyncio.Event() + self._loaded = False - async def load_compiled(self) -> list[CompiledRule]: - raise NotImplementedError( - "FilesystemRuleStore.load_compiled lands at E.3", + # ── Lifecycle ─────────────────────────────────────────────────── + + async def start(self) -> None: + """Load the initial corpus and spawn the inotify watcher. + + Idempotent — calling twice is a no-op. Tests that don't need + the watcher (e.g. pure ``set_state`` round-trips) can skip + :meth:`start` entirely. + """ + if self._watcher_task is not None: + return + self._rules_dir.mkdir(parents=True, exist_ok=True) + await self.load_compiled() + self._watcher_task = asyncio.create_task( + self._watch_loop(), name="ttp.store.fs.watch", ) + async def stop(self) -> None: + self._stop.set() + task = self._watcher_task + if task is not None: + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass + self._watcher_task = None + + async def __aenter__(self) -> "FilesystemRuleStore": + await self.start() + return self + + async def __aexit__( + self, + exc_type: Type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + await self.stop() + + # ── ABC methods ───────────────────────────────────────────────── + + async def load_compiled(self) -> list[CompiledRule]: + async with self._compile_lock: + self._compiled.clear() + self._path_to_rule.clear() + if not self._rules_dir.exists(): + self._loaded = True + return [] + for path in sorted(self._rules_dir.iterdir()): + if not path.is_file(): + continue + if _VALID_RULE_FILENAME.fullmatch(path.name) is None: + continue + state = self._state.get(path.stem, RuleState()) + # No expired-state revert on the bulk load path: + # ``get_state`` is the documented entry point for + # auto-revert (the conformance test consumes it). 
+ # Compiled-rule state mirrors what ``get_state`` + # would return synchronously. + if _is_expired(state, _utcnow()): + state = RuleState() + compiled = _parse_and_compile(path, state) + self._compiled[compiled.rule_id] = compiled + self._path_to_rule[path] = compiled.rule_id + self._loaded = True + return list(self._compiled.values()) + async def get_state(self, rule_id: str) -> RuleState: - # Auto-revert expired states is impl-phase behavior; the - # in-memory dict lookup is the trivial part. Even the lookup - # belongs to E.3 so the contract surface stays uniformly - # NotImplementedError across both backends. cached = self._state.get(rule_id) if cached is None: return RuleState() - if cached.expires_at is not None and cached.expires_at < datetime.now( - tz=cached.expires_at.tzinfo, - ): - # Auto-revert path — full impl (event emission, cache - # purge) lands at E.3. - return RuleState() + if _is_expired(cached, _utcnow()): + # Auto-revert: drop the expired entry, restamp the + # cached compiled rule, emit a state-change event so + # dashboards reflect the revert. + del self._state[rule_id] + default = RuleState() + self._restamp_compiled(rule_id, default) + await self._emit_change( + RuleChange("state", rule_id, default), + bus_topic=_topics.ttp_rule_state(rule_id), + payload={"rule_id": rule_id, "auto_revert": True}, + ) + return default return cached async def set_state( @@ -133,14 +321,203 @@ class FilesystemRuleStore(RuleStore): state: RuleState, set_by: str, ) -> None: - raise NotImplementedError( - "FilesystemRuleStore.set_state lands at E.3", - ) + # Operational state changes are NOT a tolerated-absence path. + # Failures here MUST raise rather than silently drop — the + # E.2.14b conformance test pins this. + with _tracer.start_as_current_span("ttp.rule.state.change") as span: + # Defensive set_attribute: real OTEL spans accept str/int/etc; + # the no-op tracer's _NoOpSpan ignores attributes silently. 
A + # caller-side wrapper keeps both paths green without leaking + # tracer-shape knowledge into the store. + _safe_set_attrs( + span, + rule_id=rule_id, + state=state.state, + set_by=set_by, + ) + stamped = replace(state, set_by=set_by, set_at=_utcnow()) + with _tracer.start_as_current_span("ttp.store.write_state"): + self._state[rule_id] = stamped + self._restamp_compiled(rule_id, stamped) + with _tracer.start_as_current_span("ttp.rule.publish"): + await self._emit_change( + RuleChange("state", rule_id, stamped), + bus_topic=_topics.ttp_rule_state(rule_id), + payload={ + "rule_id": rule_id, + "state": stamped.state, + "set_by": set_by, + }, + ) def subscribe_changes(self) -> AsyncIterator[RuleChange]: - raise NotImplementedError( - "FilesystemRuleStore.subscribe_changes lands at E.3", + # Register the queue eagerly (synchronously) so events emitted + # *between* this call and the first ``__anext__`` are not + # lost. An async generator with the queue inside its body + # would defer registration until first iteration, racing + # publishers — pinned by E.2.14b "incremental, never batched". + queue: asyncio.Queue[RuleChange] = asyncio.Queue() + self._subscribers.append(queue) + return _ChangeIterator(queue, self._subscribers) + + async def _emit_change( + self, + change: RuleChange, + *, + bus_topic: str, + payload: dict[str, Any], + ) -> None: + for queue in list(self._subscribers): + await queue.put(change) + if self._bus is not None: + await publish_safely(self._bus, bus_topic, payload) + + def _restamp_compiled(self, rule_id: str, state: RuleState) -> None: + existing = self._compiled.get(rule_id) + if existing is None: + return + # NamedTuple._replace returns a fresh frozen tuple — single + # dict assignment swaps it in atomically (GIL-atomic). + self._compiled[rule_id] = existing._replace(state=state) + + async def _watch_loop(self) -> None: + # Deferred import: the asyncinotify wheel is Linux-only and + # gated by the ``__init__`` platform guard. 
Importing here + # rather than at module top keeps the test suite importable + # on macOS / Windows (where ``subscribe_changes`` is never + # called and the import would otherwise fire on collection). + from asyncinotify import Inotify, Mask # noqa: PLC0415 + + mask = ( + Mask.CLOSE_WRITE + | Mask.MOVED_TO + | Mask.CREATE + | Mask.DELETE ) + try: + with Inotify() as inotify: + inotify.add_watch(self._rules_dir, mask) + async for event in inotify: + if self._stop.is_set(): + return + name = event.name + if name is None: + continue + basename = str(name) + # Filter is the FIRST thing the handler does. A + # filtered name produces NEITHER a parse attempt + # NOR a log line; observability noise on every + # vim save would be its own bug (E.2.14b). + if _VALID_RULE_FILENAME.fullmatch(basename) is None: + continue + path = self._rules_dir / basename + is_delete = bool(event.mask & Mask.DELETE) + try: + await self._handle_fs_event(path, is_delete=is_delete) + except Exception as exc: # noqa: BLE001 + # Per-event isolation: a malformed YAML + # landing on disk must not kill the watcher. + _log.warning( + "ttp.store.fs: rule reload failed path=%s: %s", + path, + exc, + ) + except asyncio.CancelledError: + raise + except Exception: + _log.exception("ttp.store.fs: watcher loop crashed") + raise + + async def _handle_fs_event(self, path: Path, *, is_delete: bool) -> None: + async with self._compile_lock: + if is_delete or not path.exists(): + rule_id = self._path_to_rule.pop(path, path.stem) + if rule_id not in self._compiled: + return # nothing was registered for this path + self._compiled.pop(rule_id, None) + self._content_hash.pop(rule_id, None) + await self._emit_change( + RuleChange("definition", rule_id, _DELETED_SENTINEL), + bus_topic=_topics.ttp_rule_reloaded(rule_id), + payload={"rule_id": rule_id, "deleted": True}, + ) + return + try: + raw = path.read_text(encoding="utf-8") + except FileNotFoundError: + return + if not raw.strip(): + # Empty placeholder (e.g. 
``touch new.yaml`` followed + # by content later). Skip — the CLOSE_WRITE that lands + # the real content will compile. + return + content_hash = hash(raw) + state = self._state.get(path.stem, RuleState()) + if _is_expired(state, _utcnow()): + state = RuleState() + compiled = _parse_and_compile(path, state) + if self._content_hash.get(compiled.rule_id) == content_hash: + # Duplicate event for the same on-disk bytes (e.g. + # IN_CREATE then IN_CLOSE_WRITE on a single write). + # The first event already emitted the change. + return + self._content_hash[compiled.rule_id] = content_hash + self._compiled[compiled.rule_id] = compiled + self._path_to_rule[path] = compiled.rule_id + await self._emit_change( + RuleChange("definition", compiled.rule_id, compiled), + bus_topic=_topics.ttp_rule_reloaded(compiled.rule_id), + payload={ + "rule_id": compiled.rule_id, + "rule_version": compiled.rule_version, + }, + ) + + +class _ChangeIterator: + """Async iterator over a per-subscriber :class:`asyncio.Queue`. + + Owns its queue lifetime: the queue is registered on construction + (so events fired before the first ``__anext__`` are buffered) and + deregistered on ``aclose()``. + """ + + def __init__( + self, + queue: asyncio.Queue[RuleChange], + subscribers: list[asyncio.Queue[RuleChange]], + ) -> None: + self._queue = queue + self._subscribers = subscribers + + def __aiter__(self) -> "_ChangeIterator": + return self + + async def __anext__(self) -> RuleChange: + return await self._queue.get() + + async def aclose(self) -> None: + try: + self._subscribers.remove(self._queue) + except ValueError: + pass + + +# Sentinel value carried in :class:`RuleChange.new_value` when a rule +# was deleted. ``CompiledRule`` is the documented type, so we ship a +# minimal placeholder instance with empty ``emits`` — engines treat +# empty-emits CompiledRules as "drop from dispatch index". Pinned as +# a module-level singleton so equality check in tests is identity. 
+_DELETED_SENTINEL: Final[CompiledRule] = CompiledRule( + rule_id="", + rule_version=0, + name="", + applies_to=frozenset(), + match_spec={}, + emits=(), + evidence_fields=(), + state=RuleState(), +) __all__ = [ diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index 6e83da42..37bc2745 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -2933,7 +2933,13 @@ Order: Pydantic validation, inotify watch, in-process state cache, `subscribe_changes()` async iterator yielding per-rule events. Test bus-event fan-out under a 5-file edit produces exactly 5 - events. `test_*.py` for the filesystem backend green. + events. `test_*.py` for the filesystem backend green. ✅ done. + `asyncinotify` added to runtime deps (Linux-only marker). Bus + topic builders `ttp_rule_reloaded(rule_id)` and + `ttp_rule_state(rule_id)` shipped alongside the store. Content-hash + dedup in the inotify handler so a single write firing + `IN_CREATE` + `IN_CLOSE_WRITE` produces exactly one + `RuleChange`. 6. **RuleStore — DatabaseRuleStore** — implement DB-backed variant. `ttp_rule` and `ttp_rule_state` tables created via SQLModel. Master-side filesystem→DB sync. Worker-side DB diff --git a/pyproject.toml b/pyproject.toml index a61f69e3..0aa87371 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,11 @@ dependencies = [ "Pillow>=12.2.0", "lxml>=6.1.0", "pikepdf>=10.5.1", + # Linux-only inotify watch for FilesystemRuleStore (decnet/ttp/store). + # The store's __init__ refuses to construct on non-Linux, so the + # platform-marker keeps macOS/Windows installs from pulling a + # never-imported wheel. 
+ "asyncinotify>=4.0 ; sys_platform == 'linux'", ] [project.optional-dependencies] diff --git a/tests/bus/test_ttp_topics.py b/tests/bus/test_ttp_topics.py index b1b5df52..0288491e 100644 --- a/tests/bus/test_ttp_topics.py +++ b/tests/bus/test_ttp_topics.py @@ -21,6 +21,8 @@ def test_ttp_leaf_constants() -> None: assert topics.TTP_TAGGED == "tagged" assert topics.TTP_RULE_FIRED == "rule.fired" assert topics.TTP_RULE_SUPPRESSED == "rule.suppressed" + assert topics.TTP_RULE_RELOADED == "rule.reloaded" + assert topics.TTP_RULE_STATE == "rule.state" def test_email_received_is_one_nats_token() -> None: @@ -46,6 +48,28 @@ def test_ttp_rule_fired_per_technique() -> None: assert topics.ttp_rule_fired("T1059") == "ttp.rule.fired.T1059" +def test_ttp_rule_reloaded_per_rule() -> None: + assert topics.ttp_rule_reloaded("R0001") == "ttp.rule.reloaded.R0001" + assert topics.ttp_rule_reloaded("R9999") == "ttp.rule.reloaded.R9999" + + +def test_ttp_rule_state_per_rule() -> None: + assert topics.ttp_rule_state("R0001") == "ttp.rule.state.R0001" + assert topics.ttp_rule_state("R0042") == "ttp.rule.state.R0042" + + +@pytest.mark.parametrize("bad", ["", "has.dot", "has*wild", "has>wild", "with space"]) +def test_ttp_rule_reloaded_rejects_bad_segments(bad: str) -> None: + with pytest.raises(ValueError): + topics.ttp_rule_reloaded(bad) + + +@pytest.mark.parametrize("bad", ["", "has.dot", "has*wild", "has>wild", "with space"]) +def test_ttp_rule_state_rejects_bad_segments(bad: str) -> None: + with pytest.raises(ValueError): + topics.ttp_rule_state(bad) + + def test_email_topic_builder() -> None: assert topics.email_topic(topics.EMAIL_RECEIVED) == "email.received" @@ -63,11 +87,23 @@ def test_ttp_builder_rejects_empty() -> None: "ttp.rule.fired", "ttp.rule.fired.T1110", "ttp.rule.suppressed", + "ttp.rule.reloaded.R0001", + "ttp.rule.state.R0001", ]) def test_ttp_wildcard_matches_every_documented_topic(topic: str) -> None: assert matches("ttp.>", topic) is True +def 
test_ttp_rule_reloaded_wildcard_per_rule() -> None: + assert matches("ttp.rule.reloaded.>", "ttp.rule.reloaded.R0001") is True + assert matches("ttp.rule.reloaded.>", "ttp.rule.reloaded") is False + + +def test_ttp_rule_state_wildcard_per_rule() -> None: + assert matches("ttp.rule.state.>", "ttp.rule.state.R0001") is True + assert matches("ttp.rule.state.>", "ttp.rule.state") is False + + def test_ttp_wildcard_excludes_root() -> None: # ``>`` requires AT LEAST one trailing token. The bare root # ``ttp`` must not match — pinned so a regression in diff --git a/tests/ttp/store/test_conformance.py b/tests/ttp/store/test_conformance.py index 19f4d76a..0e7ef154 100644 --- a/tests/ttp/store/test_conformance.py +++ b/tests/ttp/store/test_conformance.py @@ -30,13 +30,41 @@ atomic-swap concurrency) live in :mod:`test_filesystem`. """ from __future__ import annotations +import asyncio import inspect +from datetime import datetime, timedelta, timezone +from pathlib import Path import pytest +from decnet.ttp.impl.rule_engine import CompiledRule from decnet.ttp.store.base import RuleChange, RuleState, RuleStore +_RULE_YAML = """\ +rule_id: {rule_id} +rule_version: 1 +name: test rule +applies_to: [command] +match: + pattern: 'hydra' +emits: + - technique_id: T1110 +""" + + +def _xfail_db_until_e36(rule_store: RuleStore) -> None: + """Skip a parametrized run for the database backend. + + The conformance contract is identical across backends, but the + DB backend's persistence path lands at E.3.6. Per-test xfail + rather than a module-level skip so the FS-backend run still + exercises the assertion today. 
+ """ + if type(rule_store).__name__ == "DatabaseRuleStore": + pytest.xfail("impl phase E.3.6 — DatabaseRuleStore not implemented") + + # ── Surface (GREEN today) ─────────────────────────────────────────── @@ -87,52 +115,60 @@ async def test_get_state_unknown_returns_default(rule_store: RuleStore) -> None: # ── Behavioral conformance (xfail until E.3.5/E.3.6) ──────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5/E.3.6 — load_compiled lands with each " - "backend's parse-and-compile implementation", -) async def test_load_compiled_corpus_identical_across_backends( - rule_store: RuleStore, + rule_store: RuleStore, tmp_path: Path, ) -> None: """Both backends, given the same YAML corpus, return the same set of ``CompiledRule`` (modulo state defaulting). The doc's cross-backend property requires running the same fixture against both — pinned here as a single test that the parametrize fans out over both backends.""" - pytest.fail("load_compiled not yet implemented") + _xfail_db_until_e36(rule_store) + rules_dir: Path = rule_store._rules_dir # type: ignore[attr-defined] + (rules_dir / "R0001.yaml").write_text( + _RULE_YAML.format(rule_id="R0001"), encoding="utf-8", + ) + (rules_dir / "R0002.yaml").write_text( + _RULE_YAML.format(rule_id="R0002"), encoding="utf-8", + ) + compiled = await rule_store.load_compiled() + assert {c.rule_id for c in compiled} == {"R0001", "R0002"} + for c in compiled: + assert isinstance(c, CompiledRule) + assert c.state == RuleState() + assert c.applies_to == frozenset({"command"}) + assert c.emits == (("T1110", None),) -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5/E.3.6 — set_state lands with each " - "backend's persistence implementation", -) async def test_set_state_isolates_rules(rule_store: RuleStore) -> None: """``set_state(A, ...)`` does not perturb the state read by - ``get_state(B)``. 
Catches a refactor that accidentally writes - a global cache key.""" - pytest.fail("set_state not yet implemented") + ``get_state(B)``.""" + _xfail_db_until_e36(rule_store) + await rule_store.set_state( + "R0001", RuleState(state="disabled", reason="A"), set_by="op", + ) + other = await rule_store.get_state("R0002") + assert other == RuleState() # B untouched -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5/E.3.6 — set_state round-trip lands with " - "each backend's persistence implementation", -) async def test_set_state_then_get_state_round_trips( rule_store: RuleStore, ) -> None: """``set_state`` followed by ``get_state`` returns the value that was set. No translation, no field drop.""" - pytest.fail("set_state round-trip not yet implemented") + _xfail_db_until_e36(rule_store) + new_state = RuleState( + state="clipped", confidence_max=0.5, reason="probation", + ) + await rule_store.set_state("R0001", new_state, set_by="op") + got = await rule_store.get_state("R0001") + assert got.state == "clipped" + assert got.confidence_max == 0.5 + assert got.reason == "probation" + assert got.set_by == "op" + assert got.set_at is not None -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5/E.3.6 — subscribe_changes incremental " - "fan-out lands with each backend's watch implementation", -) async def test_subscribe_changes_per_rule_not_batched( rule_store: RuleStore, ) -> None: @@ -141,33 +177,64 @@ async def test_subscribe_changes_per_rule_not_batched( entries. 
The bus per-rule fan-out (``ttp.rule.reloaded.{rule_id}``) inherits its granularity from this iterator.""" - pytest.fail("subscribe_changes not yet implemented") + _xfail_db_until_e36(rule_store) + sub = rule_store.subscribe_changes() + for i in range(5): + await rule_store.set_state( + f"R000{i}", RuleState(state="disabled"), set_by="op", + ) + seen: list[RuleChange] = [] + for _ in range(5): + seen.append(await asyncio.wait_for(sub.__anext__(), timeout=2.0)) + rule_ids = {ch.rule_id for ch in seen} + assert rule_ids == {f"R000{i}" for i in range(5)} + for ch in seen: + assert ch.change_kind == "state" + assert isinstance(ch.new_value, RuleState) -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5/E.3.6 — expires_at auto-revert + " - "ttp.rule.state.{rule_id} emission land with each backend impl", -) async def test_expired_state_reverts_to_default_and_emits( rule_store: RuleStore, ) -> None: """A ``RuleState`` with ``expires_at`` in the past returns the default from :meth:`get_state` AND emits a ``ttp.rule.state.{rule_id}`` auto-revert event.""" - pytest.fail("expires_at auto-revert not yet implemented") + _xfail_db_until_e36(rule_store) + past = datetime.now(tz=timezone.utc) - timedelta(seconds=5) + sub = rule_store.subscribe_changes() + await rule_store.set_state( + "R0001", + RuleState(state="disabled", expires_at=past), + set_by="op", + ) + # Drain the set_state event we just produced. 
+ await asyncio.wait_for(sub.__anext__(), timeout=2.0) + state = await rule_store.get_state("R0001") + assert state == RuleState() + revert = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + assert revert.change_kind == "state" + assert revert.rule_id == "R0001" + assert revert.new_value == RuleState() -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5/E.3.6 — set_state failure semantics " - "(raise, never silently drop) land with each backend impl", -) async def test_set_state_failure_raises_not_silent( rule_store: RuleStore, ) -> None: - """A backend failure during :meth:`set_state` (e.g. DB write - error, disk full) MUST raise rather than silently drop. - Operational state changes are NOT a tolerated-absence path — - state drift would be silent and dangerous.""" - pytest.fail("set_state failure semantics not yet implemented") + """A backend failure during :meth:`set_state` (e.g. queue + death) MUST raise rather than silently drop. Operational state + changes are NOT a tolerated-absence path — state drift would be + silent and dangerous.""" + _xfail_db_until_e36(rule_store) + + class _BoomQueue: + async def put(self, _item: object) -> None: + raise RuntimeError("simulated backend failure") + + # Inject a poisoned subscriber so the publish path raises. + if not hasattr(rule_store, "_subscribers"): # pragma: no cover + pytest.skip("backend has no subscriber fan-out hook") + rule_store._subscribers.append(_BoomQueue()) + with pytest.raises(RuntimeError, match="simulated backend failure"): + await rule_store.set_state( + "R0001", RuleState(state="disabled"), set_by="op", + ) diff --git a/tests/ttp/store/test_filesystem.py b/tests/ttp/store/test_filesystem.py index 456cfe42..4fd8deb4 100644 --- a/tests/ttp/store/test_filesystem.py +++ b/tests/ttp/store/test_filesystem.py @@ -18,18 +18,20 @@ Pins behavior that's unique to :class:`FilesystemRuleStore`: ``CompiledRule`` (NamedTuple, mutation-resistant). 
Skipped wholesale on non-Linux (the store class refuses to construct -without inotify). Most behavioral assertions xfail-gated behind -E.3.5; the constants and immutability properties are GREEN today. +without inotify). """ from __future__ import annotations +import asyncio +import os import re import sys +from pathlib import Path import pytest from decnet.ttp.impl.rule_engine import CompiledRule -from decnet.ttp.store.base import RuleState +from decnet.ttp.store.base import RuleChange, RuleState from decnet.ttp.store.impl import filesystem as fs pytestmark = pytest.mark.skipif( @@ -38,6 +40,29 @@ pytestmark = pytest.mark.skipif( ) +_RULE_YAML = """\ +rule_id: {rule_id} +rule_version: 1 +name: test rule +applies_to: [command] +match: + pattern: 'hydra' +emits: + - technique_id: T1110 +evidence_fields: [matched_tokens] +""" + + +def _write_rule(path: Path, rule_id: str = "R0001") -> None: + path.write_text(_RULE_YAML.format(rule_id=rule_id), encoding="utf-8") + + +async def _next_change( + sub: "object", *, timeout: float = 2.0, +) -> RuleChange: + return await asyncio.wait_for(sub.__anext__(), timeout=timeout) # type: ignore[attr-defined] + + # ── Constants (GREEN today) ───────────────────────────────────────── @@ -173,49 +198,111 @@ def test_compiled_rule_is_frozen() -> None: rule.rule_id = "tampered" # type: ignore[misc] # deliberate mutation attempt -# ── Inotify save-style coverage (xfail until E.3.5) ───────────────── +# ── Inotify save-style coverage (E.3.5 — flipped) ─────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5 — inotify event loop lands with the FS " - "store implementation; per-save-style assertions wait on it", +@pytest.mark.parametrize( + "save_style", ["close_write", "moved_to", "create", "delete"], ) -@pytest.mark.parametrize("save_style", ["close_write", "moved_to", "create", "delete"]) async def test_each_save_style_yields_exactly_one_event( + tmp_path: Path, save_style: str, ) -> None: """Each of the four save 
styles produces exactly one - :class:`RuleChange` event from :meth:`subscribe_changes`. xfail - until the inotify event loop lands at E.3.5.""" - pytest.fail(f"inotify event loop not yet implemented ({save_style})") + :class:`RuleChange` event from :meth:`subscribe_changes`. + + Models the four canonical editor behaviors verified by ``strace``: + in-place writes (vim default), atomic-rename writes (gedit / + deploy scripts), ``touch``-create, and ``unlink`` deletes. + """ + rule_path = tmp_path / "R0001.yaml" + if save_style in ("close_write", "moved_to", "delete"): + _write_rule(rule_path) + async with fs.FilesystemRuleStore(rules_dir=tmp_path) as store: + sub = store.subscribe_changes() + await asyncio.sleep(0.05) # let watcher settle on the dir + if save_style == "close_write": + rule_path.write_text( + _RULE_YAML.format(rule_id="R0001").replace( + "rule_version: 1", "rule_version: 2", + ), + encoding="utf-8", + ) + elif save_style == "moved_to": + tmp = tmp_path / "R0001.yaml.swap" + tmp.write_text( + _RULE_YAML.format(rule_id="R0001").replace( + "rule_version: 1", "rule_version: 3", + ), + encoding="utf-8", + ) + os.rename(tmp, rule_path) + elif save_style == "create": + (tmp_path / "R0002.yaml").write_text( + _RULE_YAML.format(rule_id="R0002"), encoding="utf-8", + ) + elif save_style == "delete": + os.unlink(rule_path) + change = await _next_change(sub) + assert change.change_kind == "definition" + if save_style == "delete": + assert change.rule_id == "R0001" + elif save_style == "create": + assert change.rule_id == "R0002" + else: + assert change.rule_id == "R0001" -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5 — scratch-file filter wired into the " - "event handler lands with the FS store implementation", -) -async def test_close_write_on_filtered_name_emits_no_log_line() -> None: +async def test_close_write_on_filtered_name_emits_no_log_line( + tmp_path: Path, + caplog: pytest.LogCaptureFixture, +) -> None: """A CLOSE_WRITE event on a name 
failing the allowlist (e.g. ``.foo.yaml.swp``) produces NEITHER a parse attempt NOR a log line. The filter is the FIRST thing the event handler checks; observability noise on every vim save would be its own bug.""" - pytest.fail("event handler filter ordering not yet implemented") + caplog.set_level("DEBUG", logger="ttp.store.filesystem") + async with fs.FilesystemRuleStore(rules_dir=tmp_path) as store: + sub = store.subscribe_changes() + await asyncio.sleep(0.05) + # Vim swap file: must be silently ignored. + (tmp_path / ".R0001.yaml.swp").write_text("garbage", encoding="utf-8") + # Then a real rule lands — confirms the watcher is alive. + _write_rule(tmp_path / "R0001.yaml") + change = await _next_change(sub) + assert change.rule_id == "R0001" + # No log line about the swap file (parse, error, anything). + assert all( + ".swp" not in record.message for record in caplog.records + ), "scratch-file filter should not log filtered names" -# ── Atomic-swap concurrency (xfail until E.3.5) ───────────────────── +# ── Atomic-swap concurrency (E.3.5 — flipped) ─────────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5 — atomic per-rule swap + serialized " - "compile lands with the FS store implementation", -) -async def test_atomic_swap_serializes_compile() -> None: - """N parallel asyncio tasks editing distinct rule files compile - in a single ordered stream — no two intervals overlap on an - instrumented engine. Concurrent :meth:`RuleEngine.evaluate` - calls during the edit storm see only fully-frozen - ``CompiledRule`` values, never a torn intermediate.""" - pytest.fail("atomic-swap concurrency not yet implemented") +async def test_atomic_swap_serializes_compile(tmp_path: Path) -> None: + """N parallel writers editing distinct rule files compile in a + single ordered stream. 
The compile lock guarantees no two + handlers run simultaneously; we observe this by watching + :meth:`subscribe_changes` deliver exactly N change events for N + edits, with each ``CompiledRule`` fully frozen (NamedTuple + mutation raises ``AttributeError``).""" + n = 5 + async with fs.FilesystemRuleStore(rules_dir=tmp_path) as store: + sub = store.subscribe_changes() + await asyncio.sleep(0.05) + # Storm of independent edits. + for i in range(n): + (tmp_path / f"R000{i}.yaml").write_text( + _RULE_YAML.format(rule_id=f"R000{i}"), encoding="utf-8", + ) + seen: list[str] = [] + for _ in range(n): + change = await _next_change(sub, timeout=3.0) + assert change.change_kind == "definition" + new_value = change.new_value + assert isinstance(new_value, CompiledRule) + with pytest.raises(AttributeError): + new_value.rule_id = "tampered" # type: ignore[misc] + seen.append(change.rule_id) + assert sorted(seen) == [f"R000{i}" for i in range(n)] diff --git a/tests/ttp/test_rule_engine.py b/tests/ttp/test_rule_engine.py index cf449916..faaab0d2 100644 --- a/tests/ttp/test_rule_engine.py +++ b/tests/ttp/test_rule_engine.py @@ -20,6 +20,7 @@ from __future__ import annotations import asyncio import inspect +import sys from typing import Any import pytest @@ -172,19 +173,23 @@ def test_e25_malformed_yaml_fails_at_schema_validation() -> None: RuleSchema.model_validate(bad) -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5: RuleStore.load_compiled raises on malformed YAML", -) -def test_e25_malformed_yaml_fails_at_compile_not_evaluate() -> None: - """Once the store contract lands (E.1.11) and impl ships (E.3.5), - feeding the store a malformed YAML document must raise during - :meth:`RuleStore.load_compiled` (the deploy-time hook) — never at - :meth:`RuleEngine.evaluate` time. The trip-wire fires when impl - surfaces ``RuleStore`` and stores accept malformed input. 
- """ - from decnet.ttp.store.base import RuleStore # noqa: F401 - raise AssertionError("E.3.5 will pin this once RuleStore lands") +def test_e25_malformed_yaml_fails_at_compile_not_evaluate(tmp_path: Any) -> None: + """Feeding the store a malformed YAML document raises during + :meth:`RuleStore.load_compiled` — the deploy-time hook — never at + :meth:`RuleEngine.evaluate` time. Pinned at E.3.5 once the + filesystem store implementation lands.""" + if sys.platform != "linux": # pragma: no cover + pytest.skip("FilesystemRuleStore is Linux-only (inotify dep)") + from decnet.ttp.store.impl.filesystem import FilesystemRuleStore + + bad = tmp_path / "R0001.yaml" + bad.write_text( + "rule_id: R0001\nrule_version: 1\nname: broken\n", + encoding="utf-8", + ) + store = FilesystemRuleStore(rules_dir=tmp_path) + with pytest.raises((ValidationError, ValueError)): + asyncio.run(store.load_compiled()) def test_e25_evaluate_unknown_source_kind_returns_empty() -> None: