diff --git a/decnet/ttp/store/impl/database.py b/decnet/ttp/store/impl/database.py index 0047076c..a4d850f3 100644 --- a/decnet/ttp/store/impl/database.py +++ b/decnet/ttp/store/impl/database.py @@ -1,45 +1,273 @@ """Database-backed rule store — ``ttp_rule`` + ``ttp_rule_state``. -Contract step E.1.11. Bodies raise ``NotImplementedError``; the -backing tables (:class:`TTPRule`, :class:`TTPRuleState`) shipped at -E.1.1. - -Right for swarm: master syncs filesystem changes into ``ttp_rule``, -workers tail the DB, state in ``ttp_rule_state`` survives restart and -propagates to every worker. Pick via +E.3.6 implementation. Right for swarm: master syncs filesystem changes +into ``ttp_rule``, workers tail the DB, state in ``ttp_rule_state`` +survives restart and propagates to every worker. Pick via ``DECNET_TTP_RULE_STORE_TYPE=database``. No platform guard — works on macOS / Windows where the filesystem backend's inotify dependency is unavailable. + +Mechanics: + +* :meth:`load_compiled` — read every row of ``ttp_rule``, parse the + stored ``yaml_content`` through :class:`RuleSchema`, stamp the + matching :class:`RuleState` from ``ttp_rule_state`` (or default + ``RuleState`` if no row exists). Malformed YAML in ``yaml_content`` + raises immediately — same deploy-time-not-runtime asymmetry as the + filesystem backend. +* :meth:`get_state` — single-row lookup against ``ttp_rule_state`` + with the same ``expires_at`` auto-revert + bus-event semantics as + the filesystem store. +* :meth:`set_state` — upsert into ``ttp_rule_state``; failures raise + rather than silently drop. Publishes the change through the + in-process subscriber fan-out and (if a bus is wired) the matching + ``ttp.rule.state.{rule_id}`` topic. +* :meth:`subscribe_changes` — async iterator backed by a per-subscriber + queue. Direct :meth:`set_state` calls feed the queue synchronously; + cross-process changes (master writes a new ``ttp_rule`` row, this + worker tails it) are picked up by :meth:`tail_db` — a poll loop the + worker bootstrap (E.3.14) wires onto the asyncio event loop. + +The master-side filesystem→DB sync helper is +:meth:`sync_from_filesystem`, which subscribes to a +:class:`FilesystemRuleStore` and projects its +:class:`RuleChange` events onto upserts/deletes against ``ttp_rule``. """ from __future__ import annotations +import asyncio from collections.abc import AsyncIterator +from dataclasses import replace +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any, Final -from decnet.ttp.impl.rule_engine import CompiledRule +import yaml +from sqlalchemy import delete as sa_delete +from sqlalchemy import select as sa_select +from sqlmodel import col + +from decnet import telemetry as _telemetry +from decnet.bus import topics as _topics +from decnet.bus.publish import publish_safely +from decnet.logging import get_logger +from decnet.ttp.impl.rule_engine import CompiledRule, RuleSchema from decnet.ttp.store.base import RuleChange, RuleState, RuleStore +from decnet.web.db.models import TTPRule, TTPRuleState + +if TYPE_CHECKING: + from decnet.bus.base import BaseBus + from decnet.ttp.store.impl.filesystem import FilesystemRuleStore + from decnet.web.db.repository import BaseRepository + + +_log = get_logger("ttp.store.database") + + +def _tracer() -> Any: + return _telemetry.get_tracer("ttp.store") + + +def _utcnow() -> datetime: + return datetime.now(tz=timezone.utc) + + +def _is_expired(state: RuleState, now: datetime) -> bool: + if state.expires_at is None: + return False + expires = state.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + return expires < now + + +def _row_to_state(row: TTPRuleState) -> RuleState: + state_value = row.state + if state_value not in ("enabled", "disabled", "clipped"): + # Pinned at the contract layer so an out-of-band SQL UPDATE + # cannot smuggle a bogus state through. + raise ValueError( + f"ttp_rule_state.state for {row.rule_id!r} is " + f"{state_value!r}; must be one of enabled/disabled/clipped", + ) + return RuleState( + state=state_value, # type: ignore[arg-type] + confidence_max=row.confidence_max, + expires_at=row.expires_at, + reason=row.reason, + set_by=row.set_by, + set_at=row.set_at, + ) + + +def _safe_set_attrs(span: Any, **attrs: Any) -> None: + setter = getattr(span, "set_attribute", None) + if setter is None: + return + for key, value in attrs.items(): + try: + setter(key, value) + except (TypeError, ValueError): + continue + + +def _compile_one(parsed: RuleSchema, state: RuleState) -> CompiledRule: + emits: list[tuple[str, str | None]] = [] + for entry in parsed.emits: + tid = entry.get("technique_id") + if not tid: + raise ValueError( + f"rule {parsed.rule_id}: every emits entry needs technique_id", + ) + sub = entry.get("sub_technique_id") or None + emits.append((tid, sub)) + return CompiledRule( + rule_id=parsed.rule_id, + rule_version=parsed.rule_version, + name=parsed.name, + applies_to=frozenset(parsed.applies_to), + match_spec=dict(parsed.match), + emits=tuple(emits), + evidence_fields=tuple(parsed.evidence_fields), + state=state, + ) + + +def _yaml_to_compiled(yaml_text: str, state: RuleState) -> CompiledRule: + doc = yaml.safe_load(yaml_text) + if not isinstance(doc, dict): + raise ValueError( + "ttp_rule.yaml_content top-level YAML must be a mapping", + ) + parsed = RuleSchema.model_validate(doc) + return _compile_one(parsed, state) + + +def _compiled_to_yaml(compiled: CompiledRule) -> str: + """Serialize a :class:`CompiledRule` back to a YAML rule body for + master-side filesystem→DB sync. Mirrors :class:`RuleSchema`.""" + emits: list[dict[str, str]] = [] + for technique_id, sub in compiled.emits: + entry: dict[str, str] = {"technique_id": technique_id} + if sub: + entry["sub_technique_id"] = sub + emits.append(entry) + return yaml.safe_dump({ + "rule_id": compiled.rule_id, + "rule_version": compiled.rule_version, + "name": compiled.name, + "applies_to": sorted(compiled.applies_to), + "match": compiled.match_spec, + "emits": emits, + "evidence_fields": list(compiled.evidence_fields), + }, sort_keys=False) + + +class _ChangeIterator: + def __init__( + self, + queue: asyncio.Queue[RuleChange], + subscribers: list[asyncio.Queue[RuleChange]], + ) -> None: + self._queue = queue + self._subscribers = subscribers + + def __aiter__(self) -> "_ChangeIterator": + return self + + async def __anext__(self) -> RuleChange: + return await self._queue.get() + + async def aclose(self) -> None: + try: + self._subscribers.remove(self._queue) + except ValueError: + pass class DatabaseRuleStore(RuleStore): - """``ttp_rule`` content + ``ttp_rule_state`` operational state. + """``ttp_rule`` content + ``ttp_rule_state`` operational state.""" - Contract phase: every method raises ``NotImplementedError``. The - impl step (E.3) implements DB-tail subscription + master-side - filesystem→DB sync. Worker-side tailing reads via the existing - repository pattern; the master's filesystem-watch sync is - structurally a delta from :class:`FilesystemRuleStore` plus a - ``ttp_rule`` upsert. - """ + def __init__( + self, + repo: "BaseRepository | None" = None, + *, + bus: "BaseBus | None" = None, + ) -> None: + self._repo = repo + self._bus = bus + self._subscribers: list[asyncio.Queue[RuleChange]] = [] + self._tail_task: asyncio.Task[None] | None = None + self._tail_watermark: datetime | None = None + self._sync_task: asyncio.Task[None] | None = None + self._stop = asyncio.Event() + self._lazy_lock = asyncio.Lock() + + async def _ensure_repo(self) -> "BaseRepository": + if self._repo is not None: + return self._repo + # Lazy in-memory SQLite repo so unit tests that just call + # ``DatabaseRuleStore()`` get a usable backend without ceremony. + # Production callers always pass an explicit repo via the + # worker bootstrap (E.3.14). + async with self._lazy_lock: + if self._repo is not None: + return self._repo + from decnet.web.db.sqlite.repository import SQLiteRepository # noqa: PLC0415 + + repo = SQLiteRepository(db_path=":memory:") + await repo.initialize() + self._repo = repo + return self._repo + + # ── ABC methods ───────────────────────────────────────────────── async def load_compiled(self) -> list[CompiledRule]: - raise NotImplementedError( - "DatabaseRuleStore.load_compiled lands at E.3", - ) + repo = await self._ensure_repo() + async with repo._session() as session: # type: ignore[attr-defined] + rule_rows = ( + await session.execute(sa_select(TTPRule)) + ).scalars().all() + state_rows = ( + await session.execute(sa_select(TTPRuleState)) + ).scalars().all() + states: dict[str, RuleState] = {} + now = _utcnow() + for row in state_rows: + cached = _row_to_state(row) + if _is_expired(cached, now): + cached = RuleState() + states[row.rule_id] = cached + compiled: list[CompiledRule] = [] + for rule_row in rule_rows: + state = states.get(rule_row.rule_id, RuleState()) + compiled.append(_yaml_to_compiled(rule_row.yaml_content, state)) + return compiled async def get_state(self, rule_id: str) -> RuleState: - raise NotImplementedError( - "DatabaseRuleStore.get_state lands at E.3", - ) + repo = await self._ensure_repo() + async with repo._session() as session: # type: ignore[attr-defined] + row = ( + await session.execute( + sa_select(TTPRuleState).where( + col(TTPRuleState.rule_id) == rule_id, + ), + ) + ).scalars().first() + if row is None: + return RuleState() + cached = _row_to_state(row) + if _is_expired(cached, _utcnow()): + # Auto-revert: drop the row, emit the change event. + await self._delete_state_row(rule_id) + default = RuleState() + await self._emit_change( + RuleChange("state", rule_id, default), + bus_topic=_topics.ttp_rule_state(rule_id), + payload={"rule_id": rule_id, "auto_revert": True}, + ) + return default + return cached async def set_state( self, @@ -47,14 +275,266 @@ class DatabaseRuleStore(RuleStore): state: RuleState, set_by: str, ) -> None: - raise NotImplementedError( - "DatabaseRuleStore.set_state lands at E.3", - ) + with _tracer().start_as_current_span("ttp.rule.state.change") as span: + _safe_set_attrs( + span, + rule_id=rule_id, + state=state.state, + set_by=set_by, + ) + stamped = replace(state, set_by=set_by, set_at=_utcnow()) + with _tracer().start_as_current_span("ttp.store.write_state"): + await self._upsert_state_row(rule_id, stamped) + with _tracer().start_as_current_span("ttp.rule.publish"): + await self._emit_change( + RuleChange("state", rule_id, stamped), + bus_topic=_topics.ttp_rule_state(rule_id), + payload={ + "rule_id": rule_id, + "state": stamped.state, + "set_by": set_by, + }, + ) def subscribe_changes(self) -> AsyncIterator[RuleChange]: - raise NotImplementedError( - "DatabaseRuleStore.subscribe_changes lands at E.3", + queue: asyncio.Queue[RuleChange] = asyncio.Queue() + self._subscribers.append(queue) + return _ChangeIterator(queue, self._subscribers) + + # ── Internals: subscriber fan-out ─────────────────────────────── + + async def _emit_change( + self, + change: RuleChange, + *, + bus_topic: str, + payload: dict[str, Any], + ) -> None: + for queue in list(self._subscribers): + await queue.put(change) + if self._bus is not None: + await publish_safely(self._bus, bus_topic, payload) + + # ── Internals: ttp_rule_state writes ──────────────────────────── + + async def _upsert_state_row( + self, rule_id: str, state: RuleState, + ) -> None: + repo = await self._ensure_repo() + async with repo._session() as session: # type: ignore[attr-defined] + existing = ( + await session.execute( + sa_select(TTPRuleState).where( + col(TTPRuleState.rule_id) == rule_id, + ), + ) + ).scalars().first() + if existing is None: + session.add( + TTPRuleState( + rule_id=rule_id, + state=state.state, + confidence_max=state.confidence_max, + expires_at=state.expires_at, + reason=state.reason, + set_by=state.set_by, + set_at=state.set_at or _utcnow(), + ), + ) + else: + existing.state = state.state + existing.confidence_max = state.confidence_max + existing.expires_at = state.expires_at + existing.reason = state.reason + existing.set_by = state.set_by + existing.set_at = state.set_at or _utcnow() + session.add(existing) + await session.commit() + + async def _delete_state_row(self, rule_id: str) -> None: + repo = await self._ensure_repo() + async with repo._session() as session: # type: ignore[attr-defined] + await session.execute( + sa_delete(TTPRuleState).where( + col(TTPRuleState.rule_id) == rule_id, + ), + ) + await session.commit() + + # ── ttp_rule writes (master-side filesystem sync) ─────────────── + + async def upsert_rule( + self, + compiled: CompiledRule, + *, + source_path: str, + updated_by: str, + ) -> None: + """Master-side: write a rule definition into ``ttp_rule``. + + Workers tailing the DB pick up the change via :meth:`tail_db` + and emit ``RuleChange("definition", ...)`` events to local + engines. Used by :meth:`sync_from_filesystem`. + """ + repo = await self._ensure_repo() + yaml_text = _compiled_to_yaml(compiled) + async with repo._session() as session: # type: ignore[attr-defined] + existing = ( + await session.execute( + sa_select(TTPRule).where( + col(TTPRule.rule_id) == compiled.rule_id, + ), + ) + ).scalars().first() + now = _utcnow() + if existing is None: + session.add(TTPRule( + rule_id=compiled.rule_id, + rule_version=compiled.rule_version, + source_path=source_path, + yaml_content=yaml_text, + updated_at=now, + updated_by=updated_by, + )) + else: + existing.rule_version = compiled.rule_version + existing.source_path = source_path + existing.yaml_content = yaml_text + existing.updated_at = now + existing.updated_by = updated_by + session.add(existing) + await session.commit() + await self._emit_change( + RuleChange("definition", compiled.rule_id, compiled), + bus_topic=_topics.ttp_rule_reloaded(compiled.rule_id), + payload={ + "rule_id": compiled.rule_id, + "rule_version": compiled.rule_version, + }, ) + async def delete_rule(self, rule_id: str) -> None: + repo = await self._ensure_repo() + async with repo._session() as session: # type: ignore[attr-defined] + await session.execute( + sa_delete(TTPRule).where(col(TTPRule.rule_id) == rule_id), + ) + await session.commit() + await self._emit_change( + RuleChange("definition", rule_id, _DELETED_SENTINEL), + bus_topic=_topics.ttp_rule_reloaded(rule_id), + payload={"rule_id": rule_id, "deleted": True}, + ) + + # ── Master: filesystem→DB sync ────────────────────────────────── + + async def sync_from_filesystem( + self, + fs_store: "FilesystemRuleStore", + *, + updated_by: str = "filesystem", + ) -> None: + """Subscribe to a :class:`FilesystemRuleStore` and project its + ``RuleChange`` events onto ``ttp_rule`` upserts/deletes. + + Runs forever; the caller (the master worker bootstrap E.3.14) + cancels it during shutdown. Definition deletes (the FS store + emits a sentinel ``CompiledRule`` with empty emits) project + onto a ``ttp_rule`` row delete. + """ + async for change in fs_store.subscribe_changes(): + try: + if change.change_kind != "definition": + continue + value = change.new_value + if not isinstance(value, CompiledRule): + continue + if not value.emits and not value.applies_to: + await self.delete_rule(change.rule_id) + else: + await self.upsert_rule( + value, + source_path=f"./rules/ttp/{change.rule_id}.yaml", + updated_by=updated_by, + ) + except Exception: # noqa: BLE001 + _log.exception( + "ttp.store.db: master sync failed rule_id=%s", + change.rule_id, + ) + + # ── Worker: DB-tail polling ───────────────────────────────────── + + async def tail_db(self, *, poll_interval: float = 1.0) -> None: + """Poll ``ttp_rule.updated_at`` past a watermark; emit + :class:`RuleChange` events for each row that moved. + + Used by worker bootstrap (E.3.14) so a swarm of workers each + receive per-rule definition changes without a shared bus + round-trip. The watermark advances on every observed row; + first poll initializes it to "now" so we don't replay history. + """ + repo = await self._ensure_repo() + if self._tail_watermark is None: + self._tail_watermark = _utcnow() + while not self._stop.is_set(): + try: + async with repo._session() as session: # type: ignore[attr-defined] + rows = ( + await session.execute( + sa_select(TTPRule).where( + col(TTPRule.updated_at) > self._tail_watermark, + ), + ) + ).scalars().all() + for rule_row in rows: + state = await self.get_state(rule_row.rule_id) + compiled = _yaml_to_compiled(rule_row.yaml_content, state) + await self._emit_change( + RuleChange("definition", compiled.rule_id, compiled), + bus_topic=_topics.ttp_rule_reloaded(compiled.rule_id), + payload={ + "rule_id": compiled.rule_id, + "rule_version": compiled.rule_version, + }, + ) + if ( + self._tail_watermark is None + or rule_row.updated_at > self._tail_watermark + ): + self._tail_watermark = rule_row.updated_at + except Exception: # noqa: BLE001 + _log.exception("ttp.store.db: tail poll failed") + try: + await asyncio.wait_for( + self._stop.wait(), timeout=poll_interval, + ) + except asyncio.TimeoutError: + continue + + async def stop(self) -> None: + self._stop.set() + for task in (self._tail_task, self._sync_task): + if task is not None: + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass + self._tail_task = None + self._sync_task = None + + +_DELETED_SENTINEL: Final[CompiledRule] = CompiledRule( + rule_id="", + rule_version=0, + name="", + applies_to=frozenset(), + match_spec={}, + emits=(), + evidence_fields=(), + state=RuleState(), +) + __all__ = ["DatabaseRuleStore"] diff --git a/decnet/ttp/store/impl/filesystem.py b/decnet/ttp/store/impl/filesystem.py index d4b0013e..2c19721b 100644 --- a/decnet/ttp/store/impl/filesystem.py +++ b/decnet/ttp/store/impl/filesystem.py @@ -50,10 +50,10 @@ from typing import TYPE_CHECKING, Any, Final, Type import yaml +from decnet import telemetry as _telemetry from decnet.bus import topics as _topics from decnet.bus.publish import publish_safely from decnet.logging import get_logger -from decnet.telemetry import get_tracer from decnet.ttp.impl.rule_engine import CompiledRule, RuleSchema from decnet.ttp.store.base import RuleChange, RuleState, RuleStore @@ -62,7 +62,13 @@ if TYPE_CHECKING: _log = get_logger("ttp.store.filesystem") -_tracer = get_tracer("ttp.store") + + +def _tracer() -> Any: + # Late binding: tests monkeypatch ``decnet.telemetry.get_tracer`` + # at fixture setup; capturing the tracer at import time would freeze + # the no-op tracer into the module forever. + return _telemetry.get_tracer("ttp.store") # ── Filename allowlist ────────────────────────────────────────────── @@ -324,7 +330,7 @@ class FilesystemRuleStore(RuleStore): # Operational state changes are NOT a tolerated-absence path. # Failures here MUST raise rather than silently drop — the # E.2.14b conformance test pins this. - with _tracer.start_as_current_span("ttp.rule.state.change") as span: + with _tracer().start_as_current_span("ttp.rule.state.change") as span: # Defensive set_attribute: real OTEL spans accept str/int/etc; # the no-op tracer's _NoOpSpan ignores attributes silently. A # caller-side wrapper keeps both paths green without leaking @@ -336,10 +342,10 @@ class FilesystemRuleStore(RuleStore): set_by=set_by, ) stamped = replace(state, set_by=set_by, set_at=_utcnow()) - with _tracer.start_as_current_span("ttp.store.write_state"): + with _tracer().start_as_current_span("ttp.store.write_state"): self._state[rule_id] = stamped self._restamp_compiled(rule_id, stamped) - with _tracer.start_as_current_span("ttp.rule.publish"): + with _tracer().start_as_current_span("ttp.rule.publish"): await self._emit_change( RuleChange("state", rule_id, stamped), bus_topic=_topics.ttp_rule_state(rule_id), diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index 37bc2745..fed90c82 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -2944,7 +2944,13 @@ Order: variant. `ttp_rule` and `ttp_rule_state` tables created via SQLModel. Master-side filesystem→DB sync. Worker-side DB tail. Conformance tests green on both backends in parallel - (filesystem vs database) using the parametrized fixture. + (filesystem vs database) using the parametrized fixture. ✅ done. + Lazy in-memory SQLite repo for unconfigured construction (so + the conformance fixture works without test plumbing). + `sync_from_filesystem(fs_store)` master helper subscribes to a + `FilesystemRuleStore` and projects each `RuleChange` onto a + `ttp_rule` upsert/delete; `tail_db()` is the worker-side + watermark poll. 7. **RuleEngine** — implement engine consuming from `RuleStore`. Atomic per-rule swap on `RuleChange`. State applied after-parsing via `RuleState` join. `test_rule_engine.py` diff --git a/tests/ttp/store/conftest.py b/tests/ttp/store/conftest.py index 5a218cf8..95387602 100644 --- a/tests/ttp/store/conftest.py +++ b/tests/ttp/store/conftest.py @@ -14,29 +14,74 @@ from __future__ import annotations import sys from pathlib import Path -from typing import Iterator +from typing import AsyncIterator import pytest +import pytest_asyncio from decnet.ttp.store.base import RuleStore from decnet.ttp.store.impl.database import DatabaseRuleStore from decnet.ttp.store.impl.filesystem import FilesystemRuleStore +from decnet.web.db.models import TTPRule -@pytest.fixture( +async def _seed_rule_filesystem( + store: FilesystemRuleStore, rule_id: str, yaml_text: str, +) -> None: + rules_dir: Path = store._rules_dir + rules_dir.mkdir(parents=True, exist_ok=True) + (rules_dir / f"{rule_id}.yaml").write_text(yaml_text, encoding="utf-8") + + +async def _seed_rule_database( + store: DatabaseRuleStore, rule_id: str, yaml_text: str, +) -> None: + # Direct ``ttp_rule`` insert — bypass the master sync helper to + # keep tests deterministic. Mirrors what a swarm master would have + # written into the table. + repo = await store._ensure_repo() + async with repo._session() as session: # type: ignore[attr-defined] + from datetime import datetime, timezone # noqa: PLC0415 + + session.add(TTPRule( + rule_id=rule_id, + rule_version=1, + source_path=f"./rules/ttp/{rule_id}.yaml", + yaml_content=yaml_text, + updated_at=datetime.now(timezone.utc), + updated_by="test", + )) + await session.commit() + + +async def seed_rule(store: RuleStore, rule_id: str, yaml_text: str) -> None: + """Backend-aware test helper: write a rule into the store. + + Filesystem store: drop a YAML file under ``_rules_dir``. + Database store: insert a ``ttp_rule`` row directly. + """ + if isinstance(store, FilesystemRuleStore): + await _seed_rule_filesystem(store, rule_id, yaml_text) + elif isinstance(store, DatabaseRuleStore): + await _seed_rule_database(store, rule_id, yaml_text) + else: # pragma: no cover + raise TypeError(f"unknown rule store backend: {type(store).__name__}") + + +@pytest_asyncio.fixture( params=["filesystem", "database"], ids=["filesystem", "database"], ) -def rule_store( +async def rule_store( request: pytest.FixtureRequest, tmp_path: Path, -) -> Iterator[RuleStore]: +) -> AsyncIterator[RuleStore]: """Yield a fresh :class:`RuleStore` instance per parametrization. The filesystem backend is constructed against a ``tmp_path`` rules dir so tests never touch the real ``./rules/``. The - database backend's connection wiring lands at E.3.6; today the - fixture just hands out the raw class instance and impl-phase - tests are responsible for plumbing it into a session. + database backend gets a per-test SQLite repo (initialized with + ``metadata.create_all``) so each test sees an empty + ``ttp_rule`` / ``ttp_rule_state`` pair. """ backend = request.param if backend == "filesystem": @@ -44,4 +89,20 @@ def rule_store( pytest.skip("FilesystemRuleStore requires Linux (inotify)") yield FilesystemRuleStore(rules_dir=tmp_path) else: - yield DatabaseRuleStore() + from decnet.web.db.sqlite.repository import SQLiteRepository # noqa: PLC0415 + + repo = SQLiteRepository(db_path=str(tmp_path / "ttp_store.db")) + await repo.initialize() + store = DatabaseRuleStore(repo=repo) + # Mirror FS store's ``_rules_dir`` attr so cross-backend tests + # that need to drop sample YAML on disk have somewhere to put + # it; the DB-backend tests that need rule definitions either + # write to ``ttp_rule`` directly or call ``upsert_rule``. + store._rules_dir = tmp_path # type: ignore[attr-defined] + try: + yield store + finally: + try: + await repo.engine.dispose() + except Exception: # noqa: BLE001 — teardown best-effort + pass diff --git a/tests/ttp/store/test_conformance.py b/tests/ttp/store/test_conformance.py index 0e7ef154..5d2d50af 100644 --- a/tests/ttp/store/test_conformance.py +++ b/tests/ttp/store/test_conformance.py @@ -40,6 +40,8 @@ import pytest from decnet.ttp.impl.rule_engine import CompiledRule from decnet.ttp.store.base import RuleChange, RuleState, RuleStore +from .conftest import seed_rule + _RULE_YAML = """\ rule_id: {rule_id} @@ -53,18 +55,6 @@ emits: """ -def _xfail_db_until_e36(rule_store: RuleStore) -> None: - """Skip a parametrized run for the database backend. - - The conformance contract is identical across backends, but the - DB backend's persistence path lands at E.3.6. Per-test xfail - rather than a module-level skip so the FS-backend run still - exercises the assertion today. - """ - if type(rule_store).__name__ == "DatabaseRuleStore": - pytest.xfail("impl phase E.3.6 — DatabaseRuleStore not implemented") - - # ── Surface (GREEN today) ─────────────────────────────────────────── @@ -99,14 +89,7 @@ def test_rule_change_namedtuple_shape() -> None: async def test_get_state_unknown_returns_default(rule_store: RuleStore) -> None: """``get_state`` for a never-set ``rule_id`` returns the default - ``RuleState`` — never raises, never returns ``None``. - - GREEN for :class:`FilesystemRuleStore` (the impl already returns - ``RuleState()`` for an empty cache; covered in the contract - file). xfail for :class:`DatabaseRuleStore` until E.3.6 lands. - """ - if type(rule_store).__name__ == "DatabaseRuleStore": - pytest.xfail("impl phase E.3.6 — DatabaseRuleStore.get_state") + ``RuleState`` — never raises, never returns ``None``.""" state = await rule_store.get_state("R0001_unknown_rule") assert state == RuleState() assert state.state == "enabled" @@ -123,14 +106,8 @@ async def test_load_compiled_corpus_identical_across_backends( cross-backend property requires running the same fixture against both — pinned here as a single test that the parametrize fans out over both backends.""" - _xfail_db_until_e36(rule_store) - rules_dir: Path = rule_store._rules_dir # type: ignore[attr-defined] - (rules_dir / "R0001.yaml").write_text( - _RULE_YAML.format(rule_id="R0001"), encoding="utf-8", - ) - (rules_dir / "R0002.yaml").write_text( - _RULE_YAML.format(rule_id="R0002"), encoding="utf-8", - ) + await seed_rule(rule_store, "R0001", _RULE_YAML.format(rule_id="R0001")) + await seed_rule(rule_store, "R0002", _RULE_YAML.format(rule_id="R0002")) compiled = await rule_store.load_compiled() assert {c.rule_id for c in compiled} == {"R0001", "R0002"} for c in compiled: @@ -143,7 +120,6 @@ async def test_load_compiled_corpus_identical_across_backends( async def test_set_state_isolates_rules(rule_store: RuleStore) -> None: """``set_state(A, ...)`` does not perturb the state read by ``get_state(B)``.""" - _xfail_db_until_e36(rule_store) await rule_store.set_state( "R0001", RuleState(state="disabled", reason="A"), set_by="op", ) @@ -156,7 +132,6 @@ async def test_set_state_then_get_state_round_trips( ) -> None: """``set_state`` followed by ``get_state`` returns the value that was set. No translation, no field drop.""" - _xfail_db_until_e36(rule_store) new_state = RuleState( state="clipped", confidence_max=0.5, reason="probation", ) @@ -177,7 +152,6 @@ async def test_subscribe_changes_per_rule_not_batched( entries. The bus per-rule fan-out (``ttp.rule.reloaded.{rule_id}``) inherits its granularity from this iterator.""" - _xfail_db_until_e36(rule_store) sub = rule_store.subscribe_changes() for i in range(5): await rule_store.set_state( @@ -199,7 +173,6 @@ async def test_expired_state_reverts_to_default_and_emits( """A ``RuleState`` with ``expires_at`` in the past returns the default from :meth:`get_state` AND emits a ``ttp.rule.state.{rule_id}`` auto-revert event.""" - _xfail_db_until_e36(rule_store) past = datetime.now(tz=timezone.utc) - timedelta(seconds=5) sub = rule_store.subscribe_changes() await rule_store.set_state( @@ -224,7 +197,6 @@ async def test_set_state_failure_raises_not_silent( death) MUST raise rather than silently drop. Operational state changes are NOT a tolerated-absence path — state drift would be silent and dangerous.""" - _xfail_db_until_e36(rule_store) class _BoomQueue: async def put(self, _item: object) -> None: diff --git a/tests/ttp/store/test_database.py b/tests/ttp/store/test_database.py index 1f735e57..cbc4549f 100644 --- a/tests/ttp/store/test_database.py +++ b/tests/ttp/store/test_database.py @@ -2,8 +2,7 @@ Per ``development/TTP_TAGGING.md`` §E.2.14b: the database backend's tests run against BOTH SQLite and MySQL via the ``db_backends`` -fixture in :mod:`tests.web.db.conftest`. Today the database store's -methods raise ``NotImplementedError`` so most assertions xfail. +fixture in :mod:`tests.web.db.conftest`. The cross-backend conformance assertions (load_compiled equality, get_state default, set_state isolation/round-trip, @@ -13,16 +12,26 @@ via the parametrized ``rule_store`` fixture in :mod:`conftest`. This module pins behavior that's *only* meaningful for the database backend — specifically the propagation of state via the underlying -``ttp_rule_state`` table, which conformance tests exercise but don't -introspect at the SQL level. +``ttp_rule_state`` table and the master-side filesystem→DB sync +helper. """ from __future__ import annotations import inspect +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any import pytest +import pytest_asyncio +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState from decnet.ttp.store.impl.database import DatabaseRuleStore +from decnet.web.db.models import TTPRule, TTPRuleState +from sqlalchemy import select as sa_select +from sqlmodel import col def test_database_store_constructs_without_platform_guard() -> None: @@ -52,29 +61,148 @@ def test_async_methods_are_coroutines() -> None: assert inspect.iscoroutinefunction(member) -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.6 — DatabaseRuleStore needs to write " - "into ttp_rule_state via the repository session; today the " - "method body raises NotImplementedError", -) -async def test_set_state_writes_to_ttp_rule_state_table() -> None: +@pytest_asyncio.fixture +async def db_store(tmp_path: Path) -> Any: + from decnet.web.db.sqlite.repository import SQLiteRepository + + repo = SQLiteRepository(db_path=str(tmp_path / "ttp_db_store.db")) + await repo.initialize() + store = DatabaseRuleStore(repo=repo) + try: + yield store + finally: + try: + await repo.engine.dispose() + except Exception: # noqa: BLE001 + pass + + +async def test_set_state_writes_to_ttp_rule_state_table( + db_store: DatabaseRuleStore, tmp_path: Path, +) -> None: """``set_state`` writes / upserts a row in the ``ttp_rule_state`` - table. After the write, a fresh ``DatabaseRuleStore`` instance - sees the same value via :meth:`get_state` (state survives - process restart — that's the whole point of the database - backend over the filesystem one).""" - pytest.fail("DatabaseRuleStore.set_state not yet implemented") + table. After the write, a fresh :class:`DatabaseRuleStore` + instance pointing at the same DB sees the same value via + :meth:`get_state` — state survives process restart, which is the + whole point of the DB backend over the filesystem one.""" + await db_store.set_state( + "R0001", + RuleState(state="disabled", reason="probation"), + set_by="anti", + ) + repo = db_store._repo + assert repo is not None + async with repo._session() as session: # type: ignore[attr-defined] + row = ( + await session.execute( + sa_select(TTPRuleState).where( + col(TTPRuleState.rule_id) == "R0001", + ), + ) + ).scalars().first() + assert row is not None + assert row.state == "disabled" + assert row.reason == "probation" + assert row.set_by == "anti" + + # Fresh store instance against the same engine — state survives. + fresh = DatabaseRuleStore(repo=repo) + state = await fresh.get_state("R0001") + assert state.state == "disabled" + assert state.reason == "probation" -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.6 — master-side filesystem→DB sync of " - "ttp_rule lands with the swarm-mode wiring", -) -async def test_filesystem_to_db_sync_populates_ttp_rule() -> None: +async def test_filesystem_to_db_sync_populates_ttp_rule( + db_store: DatabaseRuleStore, tmp_path: Path, +) -> None: """In swarm mode, the master watches ``./rules/ttp/`` and syncs each YAML edit into the ``ttp_rule`` table; workers tail the DB. This test pins the half of the contract that - only the database backend implements.""" - pytest.fail("master-side fs→DB sync not yet implemented") + only the database backend implements: a CompiledRule fed to + :meth:`upsert_rule` lands as a ``ttp_rule`` row whose + ``yaml_content`` round-trips through :meth:`load_compiled`.""" + compiled = CompiledRule( + rule_id="R0001", + rule_version=1, + name="brute force ssh", + applies_to=frozenset({"command"}), + match_spec={"pattern": "hydra"}, + emits=(("T1110", None),), + evidence_fields=("matched_tokens",), + state=RuleState(), + ) + await db_store.upsert_rule( + compiled, + source_path="./rules/ttp/R0001.yaml", + updated_by="filesystem", + ) + repo = db_store._repo + assert repo is not None + async with repo._session() as session: # type: ignore[attr-defined] + row = ( + await session.execute( + sa_select(TTPRule).where(col(TTPRule.rule_id) == "R0001"), + ) + ).scalars().first() + assert row is not None + assert row.rule_version == 1 + assert row.updated_by == "filesystem" + # Round-trip through load_compiled. + loaded = await db_store.load_compiled() + assert len(loaded) == 1 + assert loaded[0].rule_id == "R0001" + assert loaded[0].emits == (("T1110", None),) + + +@pytest.mark.skipif( + sys.platform != "linux", + reason="FilesystemRuleStore is Linux-only (inotify dep)", +) +async def test_sync_from_filesystem_propagates_changes( + db_store: DatabaseRuleStore, tmp_path: Path, +) -> None: + """The master-side helper :meth:`sync_from_filesystem` projects + every :class:`RuleChange` from a :class:`FilesystemRuleStore` + onto a ``ttp_rule`` upsert. Validates the swarm-mode + bootstrap path: master watches disk, workers tail DB.""" + import asyncio # noqa: PLC0415 + from decnet.ttp.store.impl.filesystem import FilesystemRuleStore # noqa: PLC0415 + + rules_dir = tmp_path / "rules" + rules_dir.mkdir() + fs_store = FilesystemRuleStore(rules_dir=rules_dir) + + sync_task = asyncio.create_task( + db_store.sync_from_filesystem(fs_store, updated_by="git"), + ) + try: + async with fs_store: + await asyncio.sleep(0.05) + (rules_dir / "R0042.yaml").write_text( + """rule_id: R0042 +rule_version: 1 +name: test +applies_to: [command] +match: + pattern: 'whoami' +emits: + - technique_id: T1033 +""", + encoding="utf-8", + ) + # Give the sync task a moment to project the change. + for _ in range(20): + await asyncio.sleep(0.05) + loaded = await db_store.load_compiled() + if any(c.rule_id == "R0042" for c in loaded): + break + else: + pytest.fail("sync_from_filesystem did not project the edit") + ids = {c.rule_id for c in loaded} + assert "R0042" in ids + finally: + sync_task.cancel() + try: + await sync_task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass diff --git a/tests/ttp/test_tracing.py b/tests/ttp/test_tracing.py index cb8e3c3c..231445b8 100644 --- a/tests/ttp/test_tracing.py +++ b/tests/ttp/test_tracing.py @@ -159,16 +159,38 @@ def test_rule_fire_spans_carry_rule_and_technique_attrs( # ── set_state span hierarchy (xfail until E.3.5/E.3.6) ────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.5/E.3.6 — set_state() span hierarchy lands " - "with the rule-store implementations", -) -def test_set_state_span_hierarchy(span_exporter: tuple[InMemorySpanExporter, TracerProvider]) -> None: +def test_set_state_span_hierarchy( + span_exporter: tuple[InMemorySpanExporter, TracerProvider], +) -> None: """``RuleStore.set_state`` produces a ``ttp.rule.state.change`` parent with ``ttp.store.write_state`` + ``ttp.rule.publish`` children — operator state changes are auditable.""" - pytest.fail("set_state spans not yet emitted") + import asyncio + import sys + + if sys.platform != "linux": # pragma: no cover + pytest.skip("FilesystemRuleStore is Linux-only (inotify dep)") + + from decnet.ttp.store.base import RuleState + from decnet.ttp.store.impl.filesystem import FilesystemRuleStore + + exporter, _provider = span_exporter + + async def _run() -> None: + import tempfile + + with tempfile.TemporaryDirectory() as td: + from pathlib import Path + store = FilesystemRuleStore(rules_dir=Path(td)) + await store.set_state( + "R0001", RuleState(state="disabled"), set_by="anti", + ) + + asyncio.run(_run()) + names = [span.name for span in exporter.get_finished_spans()] + assert "ttp.rule.state.change" in names + assert "ttp.store.write_state" in names + assert "ttp.rule.publish" in names # ── No-PII property (xfail until E.3.7+) ────────────────────────────