diff --git a/decnet/ttp/base.py b/decnet/ttp/base.py index 976f1448..3a1b8ef5 100644 --- a/decnet/ttp/base.py +++ b/decnet/ttp/base.py @@ -17,7 +17,7 @@ from __future__ import annotations import logging from abc import ABC, abstractmethod -from typing import Any, Final, NamedTuple +from typing import Any, Final, NamedTuple, Protocol, runtime_checkable from decnet.web.db.models.ttp import TTPTag @@ -130,9 +130,28 @@ class TolerantTagger(Tagger): """Real tagging logic — subclasses override this, not :meth:`tag`.""" +@runtime_checkable +class WatchableTagger(Protocol): + """Structural protocol for taggers that hot-reload from a RuleStore. + + Each per-source lifter (and :class:`RuleEngineTagger`) holds its + own :class:`~decnet.ttp.impl._rule_index.RuleIndex` and exposes an + ``async def watch_store()`` coroutine that loads the initial + corpus and drains store change events forever. The worker + (E.3.14) starts one task per ``WatchableTagger`` so dispatch + indexes hydrate at startup; without this the indexes stay empty + and no rule fires. ``runtime_checkable`` so the worker can fan + out via :func:`isinstance` without leaking the protocol into the + abstract :class:`Tagger` base. + """ + + async def watch_store(self) -> None: ... + + __all__ = [ "KNOWN_SOURCE_KINDS", "TaggerEvent", "Tagger", "TolerantTagger", + "WatchableTagger", ] diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index 2827a689..9e5513a9 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -23,7 +23,14 @@ import logging import os from typing import Final -from decnet.ttp.base import KNOWN_SOURCE_KINDS, Tagger, TaggerEvent +from collections.abc import Iterator + +from decnet.ttp.base import ( + KNOWN_SOURCE_KINDS, + Tagger, + TaggerEvent, + WatchableTagger, +) from decnet.web.db.models.ttp import TTPTag _log = logging.getLogger(__name__) @@ -66,6 +73,19 @@ class CompositeTagger(Tagger): self._warned_known: set[str] = set() self._informed_unknown: set[str] = set() + def iter_watchables(self) -> Iterator[WatchableTagger]: + """Yield every child lifter that hot-reloads from a RuleStore. + + The worker (E.3.14) starts one ``asyncio.Task`` per yielded + lifter so its dispatch index hydrates at startup; without this + every index stays empty and no rule fires in production. + Filtering on the structural :class:`WatchableTagger` protocol + keeps the worker free of per-lifter type knowledge. + """ + for lifter in self._lifters: + if isinstance(lifter, WatchableTagger): + yield lifter + async def tag(self, event: TaggerEvent) -> list[TTPTag]: lifters = self._by_kind.get(event.source_kind, []) if not lifters: diff --git a/decnet/ttp/worker.py b/decnet/ttp/worker.py index a90c1b84..686c1267 100644 --- a/decnet/ttp/worker.py +++ b/decnet/ttp/worker.py @@ -40,7 +40,7 @@ from decnet.bus.publish import ( ) from decnet.logging import get_logger from decnet.ttp.base import Tagger, TaggerEvent -from decnet.ttp.factory import get_tagger +from decnet.ttp.factory import CompositeTagger, get_tagger from decnet.web.db.models.ttp import TTPTag from decnet.web.db.repository import BaseRepository @@ -186,8 +186,23 @@ async def run_ttp_worker_loop( owned_bus = False queue: asyncio.Queue[tuple[str, Event] | None] = asyncio.Queue() pump_tasks: list[asyncio.Task[None]] = [] + watch_tasks: list[asyncio.Task[None]] = [] heartbeat_task: Optional[asyncio.Task[None]] = None control_task: Optional[asyncio.Task[None]] = None + + # Hydrate per-lifter rule indexes. Each WatchableTagger + # (CompositeTagger children + the RuleEngineTagger) owns its own + # RuleIndex and drains store change events forever via + # `watch_store`. Without these tasks every dispatch index stays + # empty and no rule fires — the bus subscriptions work, the + # pump tasks run, and tagger.tag() returns [] every call. Tasks + # are independent of the bus, so this fan-out runs even in + # poll-only mode. + if isinstance(tagger, CompositeTagger): + for watchable in tagger.iter_watchables(): + watch_tasks.append(asyncio.create_task( + _run_watch(watchable), + )) try: if bus is None: try: @@ -237,6 +252,8 @@ async def run_ttp_worker_loop( finally: for task in pump_tasks: task.cancel() + for task in watch_tasks: + task.cancel() if heartbeat_task is not None: heartbeat_task.cancel() if control_task is not None: @@ -244,6 +261,9 @@ async def run_ttp_worker_loop( for task in pump_tasks: with contextlib.suppress(asyncio.CancelledError, Exception): await task + for task in watch_tasks: + with contextlib.suppress(asyncio.CancelledError, Exception): + await task for opt in (heartbeat_task, control_task): if opt is None: continue @@ -342,6 +362,26 @@ async def _publish_tagged(bus: BaseBus, tags: list[TTPTag]) -> None: ) +async def _run_watch(watchable: Any) -> None: + """Drive one lifter's ``watch_store()`` coroutine forever. + + Mirrors :func:`_pump`'s tolerance contract: a transient store error + logs and exits the watch task without taking the worker down. The + main loop's poll-interval fallback continues to heartbeat; a + subsequent worker restart re-runs the watch fan-out and rehydrates. + """ + name = getattr(watchable, "name", watchable.__class__.__name__) + try: + await watchable.watch_store() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning( + "ttp worker: watch_store for %s died (%s); index will not " + "hot-reload until next worker restart", name, exc, + ) + + async def _pump( bus: BaseBus, queue: "asyncio.Queue[tuple[str, Event] | None]", diff --git a/tests/ttp/test_worker_watch_loop.py b/tests/ttp/test_worker_watch_loop.py new file mode 100644 index 00000000..3e984268 --- /dev/null +++ b/tests/ttp/test_worker_watch_loop.py @@ -0,0 +1,135 @@ +"""E.3.18a — Worker hydrates per-lifter rule indexes via watch_store(). + +Pins the wiring fix from ``development/TTP_TAGGING.md`` §"Worker shape": +each :class:`~decnet.ttp.base.WatchableTagger` child of the +:class:`CompositeTagger` (every per-source lifter, plus the +:class:`RuleEngineTagger`) must have its ``watch_store()`` coroutine +launched as an :mod:`asyncio` task by ``run_ttp_worker_loop`` — without +this fan-out every dispatch index stays empty and no rule fires in +production. +""" +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from decnet.bus.fake import FakeBus +from decnet.ttp.base import Tagger, TaggerEvent +from decnet.ttp.factory import CompositeTagger +from decnet.ttp.worker import run_ttp_worker_loop +from decnet.web.db.models.ttp import TTPTag + + +class _WatchableLifter(Tagger): + """Stub lifter exposing a ``watch_store`` that records lifecycle events.""" + + name = "watchable" + HANDLES = frozenset({"session"}) + + def __init__(self, *, raise_on_watch: bool = False) -> None: + self.watch_started = asyncio.Event() + self.watch_cancelled = False + self.watch_finished = False + self.raise_on_watch = raise_on_watch + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + async def watch_store(self) -> None: + self.watch_started.set() + if self.raise_on_watch: + raise RuntimeError("watch_store blew up") + try: + await asyncio.Event().wait() # block forever until cancelled + except asyncio.CancelledError: + self.watch_cancelled = True + raise + finally: + self.watch_finished = True + + +class _NonWatchableLifter(Tagger): + """Stub lifter with NO watch_store — must be skipped by fan-out.""" + + name = "nonwatch" + HANDLES = frozenset({"intel"}) + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +class _StubRepo: + async def insert_tags(self, rows: list[TTPTag]) -> int: + return 0 + + +async def _run_worker_briefly( + composite: CompositeTagger, repo: Any, *, settle: float = 0.05, +) -> None: + bus = FakeBus() + await bus.connect() + shutdown = asyncio.Event() + task = asyncio.create_task(run_ttp_worker_loop( + repo=repo, + poll_interval_secs=0.05, + tagger=composite, + shutdown=shutdown, + bus=bus, + )) + await asyncio.sleep(settle) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + await bus.close() + + +@pytest.mark.asyncio +async def test_iter_watchables_filters_to_watch_capable_lifters() -> None: + watchable = _WatchableLifter() + non = _NonWatchableLifter() + composite = CompositeTagger(lifters=[watchable, non]) + yielded = list(composite.iter_watchables()) + assert watchable in yielded + assert non not in yielded + + +@pytest.mark.asyncio +async def test_worker_starts_watch_store_for_every_watchable() -> None: + a, b = _WatchableLifter(), _WatchableLifter() + composite = CompositeTagger(lifters=[a, b]) + await _run_worker_briefly(composite, _StubRepo()) + assert a.watch_started.is_set() + assert b.watch_started.is_set() + assert a.watch_cancelled and b.watch_cancelled + + +@pytest.mark.asyncio +async def test_worker_does_not_call_watch_store_on_nonwatchable() -> None: + watch = _WatchableLifter() + non = _NonWatchableLifter() + composite = CompositeTagger(lifters=[watch, non]) + # If the worker tried to call watch_store on `non` it would + # AttributeError; that the run completes cleanly proves we filter. + await _run_worker_briefly(composite, _StubRepo()) + assert watch.watch_started.is_set() + + +@pytest.mark.asyncio +async def test_watch_store_failure_does_not_kill_worker() -> None: + bad = _WatchableLifter(raise_on_watch=True) + good = _WatchableLifter() + composite = CompositeTagger(lifters=[bad, good]) + # A blow-up in one watch task must not propagate; the worker shuts + # down cleanly and the surviving lifter's task still runs. + await _run_worker_briefly(composite, _StubRepo()) + assert good.watch_started.is_set() + + +@pytest.mark.asyncio +async def test_watch_tasks_cancelled_on_worker_shutdown() -> None: + watch = _WatchableLifter() + composite = CompositeTagger(lifters=[watch]) + await _run_worker_briefly(composite, _StubRepo()) + assert watch.watch_cancelled + assert watch.watch_finished