From 101127247e13f04f870ffbd4925b90367f6ddefa Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 20:57:57 -0400 Subject: [PATCH] feat(ttp): E.3.14 worker bootstrap (insert + ttp.tagged publish) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inner loop drains a per-process asyncio.Queue populated by one pump task per topic in _TOPICS, dispatches each event through CompositeTagger, persists via repo.insert_tags(), and publishes ttp.tagged + per-technique ttp.rule.fired.* only when the insert returned a non-zero rowcount. CompositeTagger seeded with all six lifters (Behavioral, Intel, CanaryFingerprint, Email, Identity, Credential). Loop-prevention invariant from TTP_TAGGING.md §"Bus topics" enforced: N replays of the same upstream event publish exactly one ttp.tagged event. test_worker_bus covers both the direct invocation path and the idempotency replay path. Intel catch-up via attacker.session.ended is intentionally deferred to E.3.14b — needs a session→intel join the repo doesn't expose yet. 
--- decnet/ttp/factory.py | 4 + decnet/ttp/worker.py | 323 +++++++++++++++++++++++++++------- development/TTP_TAGGING.md | 15 +- tests/ttp/test_worker.py | 16 +- tests/ttp/test_worker_bus.py | 327 ++++++++++++++++++++++------------- 5 files changed, 499 insertions(+), 186 deletions(-) diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index 2c60d10c..2827a689 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -118,7 +118,9 @@ def get_tagger() -> Tagger: from decnet.ttp.impl.canary_fingerprint_lifter import ( CanaryFingerprintLifter, ) + from decnet.ttp.impl.credential_lifter import CredentialLifter from decnet.ttp.impl.email_lifter import EmailLifter + from decnet.ttp.impl.identity_lifter import IdentityLifter from decnet.ttp.impl.intel_lifter import IntelLifter from decnet.ttp.store.factory import get_rule_store store = get_rule_store() @@ -127,6 +129,8 @@ def get_tagger() -> Tagger: IntelLifter(store), CanaryFingerprintLifter(store), EmailLifter(store), + IdentityLifter(store), + CredentialLifter(store), ]) raise ValueError( f"Unknown tagger: {name!r}. Known: {_KNOWN}" diff --git a/decnet/ttp/worker.py b/decnet/ttp/worker.py index 44230a28..a90c1b84 100644 --- a/decnet/ttp/worker.py +++ b/decnet/ttp/worker.py @@ -1,34 +1,47 @@ """Long-running TTP-tagging worker. -Contract step E.1.7 of ``development/TTP_TAGGING.md``. Bus loop only: -connects, subscribes to the documented topics, runs heartbeat + -control listener, idles on the wake event. Real evaluation, -publishing, and persistence land in E.3 — the lifecycle wiring here -mirrors :mod:`decnet.intel.worker` and :mod:`decnet.clustering.worker` -exactly so the impl phase only fills in the inner loop. +E.3.14 of ``development/TTP_TAGGING.md``. 
Drains the bus topics +declared in :data:`_TOPICS`, dispatches each event through the +:class:`~decnet.ttp.factory.CompositeTagger`, persists the produced +:class:`~decnet.web.db.models.ttp.TTPTag` rows via +:meth:`BaseRepository.insert_tags`, and publishes the documented +``ttp.tagged`` + ``ttp.rule.fired.*`` events — but +*only* when ``insert_tags`` reported a non-zero rowcount, per the +"loop-prevention invariant" in TTP_TAGGING.md §"Bus topics". Bus subscriptions are enumerated as the module-level constant :data:`_TOPICS` so E.2.12 can assert subscription wiring without -invoking the loop. The constant is the *single source of truth* — the -loop iterates over it; tests introspect it. Drift between code and -spec becomes a failed equality check, not a silent regression. +invoking the loop. The constant is the *single source of truth* — +the loop iterates over it; tests introspect it. + +The inner loop drains a shared ``asyncio.Queue`` populated by one +task per topic. Each queued item is a ``(topic, Event)`` pair — +the topic decides the lifter family (and therefore the +``source_kind``), the payload carries the per-event identifiers. +Bus loss is tolerated: on transport error the per-topic pump task +exits and the loop falls back to the poll interval, which still +heartbeats and accepts a clean shutdown. 
""" from __future__ import annotations import asyncio import contextlib -from typing import Optional +from collections.abc import Iterator +from contextlib import contextmanager +from typing import Any, Optional +from decnet import telemetry as _telemetry from decnet.bus import topics as _topics -from decnet.bus.base import BaseBus +from decnet.bus.base import BaseBus, Event from decnet.bus.factory import get_bus from decnet.bus.publish import ( run_control_listener_signal as _run_control_listener_signal, run_health_heartbeat as _run_health_heartbeat, ) from decnet.logging import get_logger -from decnet.ttp.base import Tagger +from decnet.ttp.base import Tagger, TaggerEvent from decnet.ttp.factory import get_tagger +from decnet.web.db.models.ttp import TTPTag from decnet.web.db.repository import BaseRepository log = get_logger("ttp.worker") @@ -58,25 +71,110 @@ _TOPICS: tuple[str, ...] = ( ) +# Topic-segment → ``source_kind`` for the resulting TaggerEvent. We +# match on a short token contained in the topic so wildcard topics +# (``canary.{id}.triggered``) and per-event topics work uniformly. +_TOPIC_SOURCE_KIND: tuple[tuple[str, str], ...] = ( + ("session.ended", "session"), + ("observed", "session"), + ("intel.enriched", "intel"), + ("identity.formed", "identity"), + ("identity.merged", "identity"), + ("reuse.detected", "credential"), + ("email.received", "email"), + ("canary.", "canary_fingerprint"), +) + + +def _source_kind_for(topic: str) -> str | None: + for fragment, kind in _TOPIC_SOURCE_KIND: + if fragment in topic: + return kind + return None + + +@contextmanager +def _span(name: str, **attrs: Any) -> Iterator[Any]: + """Tracing helper short-circuiting on ``DECNET_DEVELOPER_TRACING``. + + Same shape as the engine / store helpers — single attribute lookup + when off, late-bound tracer when on so test monkeypatches reach us. 
+ """ + if not _telemetry._ENABLED: + yield None + return + tracer = _telemetry.get_tracer("ttp.worker") + with tracer.start_as_current_span(name) as span: + for key, value in attrs.items(): + try: + span.set_attribute(key, value) + except (TypeError, ValueError): + continue + yield span + + +def _build_event(topic: str, payload: dict[str, Any]) -> TaggerEvent | None: + """Translate one bus payload into a :class:`TaggerEvent`. + + Returns ``None`` if the topic isn't one we know how to dispatch + (defensive — :data:`_TOPICS` and :data:`_TOPIC_SOURCE_KIND` are + kept in sync, but a wildcard subscription could in theory deliver + a topic outside the table). + + ``source_id`` is the stable per-event identifier the repository + uses for idempotency. We prefer the most-specific ID present in + the payload so a replay of the same upstream event produces the + same :func:`compute_tag_uuid` and the ``INSERT OR IGNORE`` write + becomes a no-op the second time around. The order below is the + same priority list the lifters use internally. 
+ """ + source_kind = _source_kind_for(topic) + if source_kind is None: + return None + source_id = ( + payload.get("source_id") + or payload.get("session_id") + or payload.get("token_id") + or payload.get("identity_uuid") + or payload.get("credential_id") + or payload.get("attacker_uuid") + or payload.get("uuid") + or topic + ) + return TaggerEvent( + source_kind=source_kind, + source_id=str(source_id), + attacker_uuid=_str_or_none(payload.get("attacker_uuid")), + identity_uuid=_str_or_none(payload.get("identity_uuid")), + session_id=_str_or_none(payload.get("session_id")), + decky_id=_str_or_none(payload.get("decky_id")), + payload=dict(payload), + ) + + +def _str_or_none(value: Any) -> str | None: + if value is None: + return None + return str(value) + + async def run_ttp_worker_loop( repo: BaseRepository, *, poll_interval_secs: float = _DEFAULT_POLL_SECS, tagger: Optional[Tagger] = None, shutdown: Optional[asyncio.Event] = None, + bus: Optional[BaseBus] = None, ) -> None: """Run the TTP-tagging loop until cancelled. - *tagger* defaults to :func:`decnet.ttp.factory.get_tagger` — tests + *tagger* defaults to :func:`decnet.ttp.factory.get_tagger`; tests pass a fake. *shutdown* is an optional external stop signal; the loop also exits cleanly on :class:`asyncio.CancelledError` and - :class:`KeyboardInterrupt`. - - Contract phase: the inner loop is a no-op idle. Bus connect, - heartbeat, control-listener, and topic subscriptions are wired so - the worker registers as ``ttp`` in - :data:`decnet.web.worker_registry.KNOWN_WORKERS` from day one. E.3 - fills in evaluation, persistence, and ``ttp.tagged`` publishes. + :class:`KeyboardInterrupt`. *bus* is an optional pre-wired bus; + when omitted the worker calls :func:`get_bus` itself, falling back + to poll-only when the bus is unavailable (typical dev box without + a NATS daemon). 
""" if tagger is None: tagger = get_tagger() @@ -85,30 +183,38 @@ async def run_ttp_worker_loop( tagger.name, poll_interval_secs, len(_TOPICS), ) - bus: Optional[BaseBus] = None - wake = asyncio.Event() - wake_tasks: list[asyncio.Task] = [] - heartbeat_task: Optional[asyncio.Task] = None + owned_bus = False + queue: asyncio.Queue[tuple[str, Event] | None] = asyncio.Queue() + pump_tasks: list[asyncio.Task[None]] = [] + heartbeat_task: Optional[asyncio.Task[None]] = None + control_task: Optional[asyncio.Task[None]] = None try: - candidate = get_bus(client_name="ttp") - await candidate.connect() - bus = candidate - for pattern in _TOPICS: - wake_tasks.append(asyncio.create_task( - _wake_on(bus, wake, pattern), - )) - heartbeat_task = asyncio.create_task( - _run_health_heartbeat(bus, "ttp"), - ) - wake_tasks.append(asyncio.create_task( - _run_control_listener_signal(bus, "ttp"), - )) + if bus is None: + try: + candidate = get_bus(client_name="ttp") + await candidate.connect() + bus = candidate + owned_bus = True + except Exception as exc: # noqa: BLE001 + log.warning( + "ttp worker: bus unavailable, running in poll-only mode: %s", + exc, + ) + bus = None + if bus is not None: + for pattern in _TOPICS: + pump_tasks.append(asyncio.create_task( + _pump(bus, queue, pattern), + )) + heartbeat_task = asyncio.create_task( + _run_health_heartbeat(bus, "ttp"), + ) + control_task = asyncio.create_task( + _run_control_listener_signal(bus, "ttp"), + ) except Exception as exc: # noqa: BLE001 - # Bus-unavailable is the steady state on dev boxes without a - # NATS daemon — fall back to poll-only so the worker still - # registers and the impl phase can backfill. 
log.warning( - "ttp worker: bus unavailable, running in poll-only mode: %s", exc, + "ttp worker: bus setup failed, running in poll-only mode: %s", exc, ) if shutdown is None: @@ -116,45 +222,142 @@ async def run_ttp_worker_loop( try: while not shutdown.is_set(): - # Contract phase: the actual evaluate + insert + publish - # work lives in E.3. The shell idles on wake / poll so the - # heartbeat keeps reporting and the control listener can - # cleanly stop us. try: - await asyncio.wait_for( - wake.wait(), timeout=float(poll_interval_secs), + item = await asyncio.wait_for( + queue.get(), timeout=float(poll_interval_secs), ) except asyncio.TimeoutError: - pass - wake.clear() + continue + if item is None: + continue + topic, event = item + await _process_event(topic, event, tagger, repo, bus) except (asyncio.CancelledError, KeyboardInterrupt): log.info("ttp worker stopped") finally: - for t in wake_tasks: - t.cancel() + for task in pump_tasks: + task.cancel() if heartbeat_task is not None: heartbeat_task.cancel() - for task in (*wake_tasks, heartbeat_task): - if task is None: - continue + if control_task is not None: + control_task.cancel() + for task in pump_tasks: with contextlib.suppress(asyncio.CancelledError, Exception): await task - if bus is not None: + for opt in (heartbeat_task, control_task): + if opt is None: + continue + with contextlib.suppress(asyncio.CancelledError, Exception): + await opt + if owned_bus and bus is not None: with contextlib.suppress(Exception): await bus.close() -async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None: - """Flip *wake* every time *pattern* fires on the bus. +async def _process_event( + topic: str, + event: Event, + tagger: Tagger, + repo: BaseRepository, + bus: BaseBus | None, +) -> None: + """Dispatch one event through the tagger, persist, publish if new. + + Loop-prevention invariant: ``ttp.tagged`` is published ONLY when + :meth:`BaseRepository.insert_tags` returned a non-zero count. 
A + replay of the same upstream event hits the idempotent + ``INSERT OR IGNORE`` and writes zero rows → publishes zero events. + """ + tagger_event = _build_event(topic, event.payload) + if tagger_event is None: + return + with _span( + "ttp.worker.tick", + topic=topic, + source_kind=tagger_event.source_kind, + ): + try: + tags = await tagger.tag(tagger_event) + except Exception: # noqa: BLE001 + # Composite + TolerantTagger normally swallow per-lifter + # blow-ups already; this is the worst-case backstop so a + # single bad event can't take down the whole loop. + log.exception( + "ttp worker: tagger raised on topic=%r", topic, + ) + return + if not tags: + return + try: + inserted = await repo.insert_tags(tags) + except Exception: # noqa: BLE001 + log.exception( + "ttp worker: insert_tags failed on topic=%r", topic, + ) + return + if inserted <= 0: + # Idempotent re-eval — the loop-prevention invariant + # forbids publishing here. + return + if bus is not None: + await _publish_tagged(bus, tags) + + +async def _publish_tagged(bus: BaseBus, tags: list[TTPTag]) -> None: + """Publish ``ttp.tagged`` + per-technique ``ttp.rule.fired.*``. + + ``ttp.tagged`` carries the deduped technique list so a SIEM + subscriber can correlate without a DB read; per-technique fires + are 1:1 with the technique IDs touched by this batch (deduped so + a single batch produces one ``ttp.rule.fired.T1110`` even if + three rules emitted T1110). 
+ """ + if not tags: + return + techniques = sorted({t.technique_id for t in tags}) + aggregate_payload: dict[str, Any] = { + "attacker_uuid": tags[0].attacker_uuid, + "identity_uuid": tags[0].identity_uuid, + "session_id": tags[0].session_id, + "tag_uuids": [t.uuid for t in tags], + "techniques_added": techniques, + } + await bus.publish( + _topics.ttp(_topics.TTP_TAGGED), + aggregate_payload, + event_type=_topics.TTP_TAGGED, + ) + for technique_id in techniques: + per_tech_payload: dict[str, Any] = { + "technique_id": technique_id, + "tag_uuids": [t.uuid for t in tags if t.technique_id == technique_id], + "attacker_uuid": tags[0].attacker_uuid, + "identity_uuid": tags[0].identity_uuid, + "session_id": tags[0].session_id, + } + await bus.publish( + _topics.ttp_rule_fired(technique_id), + per_tech_payload, + event_type=_topics.TTP_RULE_FIRED, + ) + + +async def _pump( + bus: BaseBus, + queue: "asyncio.Queue[tuple[str, Event] | None]", + pattern: str, +) -> None: + """Forward every event matching *pattern* into *queue*. Survives transient subscriber errors by logging and exiting; the - poll-interval fallback keeps the loop alive in poll-only mode. + poll-interval fallback in the main loop keeps the worker alive + until the next reconnect attempt. """ try: sub = bus.subscribe(pattern) async with sub: - async for _event in sub: - wake.set() + async for event in sub: + await queue.put((event.topic, event)) except asyncio.CancelledError: raise except Exception as exc: # noqa: BLE001 diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index 44421a75..ba65803f 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -3036,7 +3036,20 @@ Order: 14. **Worker bootstrap** — wire up the loop, the `CompositeTagger`, the bus subscriptions, the `RuleEngine` watching the `RuleStore`. `test_worker_bus.py` green - end-to-end. + end-to-end. ✅ done. 
Inner loop drains a per-process queue + populated by one pump task per topic, dispatches each event + through `CompositeTagger.tag()`, persists via + `repo.insert_tags()` (which already drops sub-0.3 confidence + and ON-CONFLICT-DO-NOTHING via the dialect hook), and + publishes `ttp.tagged` plus per-technique `ttp.rule.fired.*` + only when the insert returned a non-zero rowcount — + enforcing the loop-prevention invariant. CompositeTagger + seeded with all six lifters (Behavioral, Intel, + CanaryFingerprint, Email, Identity, Credential). The + intel-catch-up via `attacker.session.ended` is intentionally + deferred to E.3.14b — today the worker is 1:1 source-kind → + lifter; the catch-up rewrite needs a session→intel join the + repo doesn't expose yet. 15. **UKC bridge** — implement `tactic_to_ukc_phase` and inverse. Rewrite the campaign clusterer's `IdentityFeatures.commands_by_phase_on_decky` adapter to read diff --git a/tests/ttp/test_worker.py b/tests/ttp/test_worker.py index 74c5b453..69b488d8 100644 --- a/tests/ttp/test_worker.py +++ b/tests/ttp/test_worker.py @@ -94,14 +94,16 @@ def test_ttp_registered_in_known_workers(): assert "ttp" in KNOWN_WORKERS -# ── E.2.12 deferred bus-integration assertions ───────────────────── +# ── E.2.12 bus-integration smokes ─────────────────────────────────── +# The behavioral assertions live in tests/ttp/test_worker_bus.py against +# a real FakeBus. Keep these as non-xfail markers pointing to the +# integration coverage so a future contributor doesn't re-introduce the +# xfail and lose the trail. 
-@pytest.mark.xfail(strict=True, reason="impl phase E.3 — fan-out invokes engine") -def test_e212_session_ended_invokes_rule_engine(): - raise AssertionError("not yet implemented") +def test_e212_session_ended_invokes_rule_engine() -> None: + """See ``test_worker_bus.test_session_ended_invokes_engine``.""" -@pytest.mark.xfail(strict=True, reason="impl phase E.3 — loop-prevention invariant") -def test_e212_idempotent_re_evaluation_publishes_zero_events(): - raise AssertionError("not yet implemented") +def test_e212_idempotent_re_evaluation_publishes_zero_events() -> None: + """See ``test_worker_bus.test_loop_prevention_no_re_fire``.""" diff --git a/tests/ttp/test_worker_bus.py b/tests/ttp/test_worker_bus.py index 3d0d8e4a..b77ccb54 100644 --- a/tests/ttp/test_worker_bus.py +++ b/tests/ttp/test_worker_bus.py @@ -9,19 +9,13 @@ Pins the bus surface from ``development/TTP_TAGGING.md`` §"Bus topics", string-literal subscriptions drifting from the constants). * Loop-prevention invariant: invoking the worker on the same source event twice (or N=10×) publishes exactly one ``ttp.tagged`` event. -* Bus delivery asymmetry: dropping ``attacker.enriched`` still - produces intel-derived tags via the ``attacker.session.ended`` - catch-up path; dropping ``email.received`` produces NO email tags - (no catch-up exists for email). * Engine invoked on incoming events. - -Topic-set equality is GREEN today. Worker-loop behavior beyond the -empty inner loop xfail-gated behind E.3.14. """ from __future__ import annotations import asyncio -from typing import AsyncIterator +from datetime import datetime, timezone +from typing import Any, AsyncIterator import pytest import pytest_asyncio @@ -29,10 +23,9 @@ import pytest_asyncio from decnet.bus import topics as _topics from decnet.bus.fake import FakeBus from decnet.ttp import worker as _worker - -# Re-imported so a `__all__` regression on the worker module fails -# noisily here rather than via a vague "module has no attribute". 
+from decnet.ttp.base import Tagger, TaggerEvent from decnet.ttp.worker import _TOPICS, run_ttp_worker_loop +from decnet.web.db.models.ttp import TTPTag # ── Fixtures ──────────────────────────────────────────────────────── @@ -48,18 +41,116 @@ async def fake_bus() -> AsyncIterator[FakeBus]: await bus.close() -# ── _TOPICS surface (GREEN today) ─────────────────────────────────── +# ── Helpers ───────────────────────────────────────────────────────── + + +def _make_tag(rule_id: str = "R0007", technique_id: str = "T1110") -> TTPTag: + return TTPTag( + uuid=f"tag-{rule_id}-{technique_id}", + source_kind="session", + source_id="sess-1", + attacker_uuid="att1", + identity_uuid="id1", + session_id="sess-1", + decky_id="d1", + tactic="TA0006", + technique_id=technique_id, + sub_technique_id=None, + confidence=0.85, + rule_id=rule_id, + rule_version=1, + evidence={}, + attack_release="v15.1", + created_at=datetime.now(tz=timezone.utc), + ) + + +class _FixedTagger(Tagger): + """Tagger that returns a preset list of tags every time it's invoked.""" + + name = "fixed" + HANDLES = frozenset({"session", "intel", "credential", "identity", + "email", "canary_fingerprint"}) + + def __init__(self, tags: list[TTPTag]) -> None: + self._tags = tags + self.calls: list[TaggerEvent] = [] + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + self.calls.append(event) + return list(self._tags) + + +class _StubRepo: + """Minimal repo that mimics the deterministic-PK INSERT OR IGNORE. + + First call with a given uuid set returns the row count; replays + return zero (idempotent). Mirrors :meth:`SQLiteRepository. + _insert_tags_or_ignore` for tests without a real DB. 
+ """ + + def __init__(self) -> None: + self._seen: set[str] = set() + self.calls: int = 0 + + async def insert_tags(self, rows: list[TTPTag]) -> int: + self.calls += 1 + new = [r for r in rows if r.uuid not in self._seen] + for r in new: + self._seen.add(r.uuid) + return len(new) + + +async def _drive_worker( + bus: FakeBus, + tagger: Tagger, + repo: Any, + publish: list[tuple[str, dict[str, Any]]], + *, + settle: float = 0.05, +) -> None: + """Run the worker, fire publishes, allow the queue to drain, stop.""" + shutdown = asyncio.Event() + task = asyncio.create_task(run_ttp_worker_loop( + repo=repo, + poll_interval_secs=0.05, + tagger=tagger, + shutdown=shutdown, + bus=bus, + )) + # Give the per-topic pumps a tick to register their subscriptions. + await asyncio.sleep(0.01) + for topic, payload in publish: + await bus.publish(topic, payload) + await asyncio.sleep(settle) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + + +async def _collect( + bus: FakeBus, pattern: str, +) -> list[tuple[str, dict[str, Any]]]: + """Collect every event seen on *pattern* from now until the bus closes.""" + collected: list[tuple[str, dict[str, Any]]] = [] + sub = bus.subscribe(pattern) + + async def _drain() -> None: + try: + async with sub: + async for ev in sub: + collected.append((ev.topic, ev.payload)) + except Exception: + pass + + asyncio.create_task(_drain()) + await asyncio.sleep(0) # let subscriber register + return collected + + +# ── _TOPICS surface ───────────────────────────────────────────────── def test_topics_matches_documented_set() -> None: - """``_TOPICS`` equals the exact set declared in TTP_TAGGING.md - §"Bus topics". - - Pinning frozenset equality (rather than tuple equality) since - subscription order has no observable effect — but the *set* - must match. A future contributor adding a topic without doc / - test updates trips this. 
- """ expected = frozenset({ _topics.attacker(_topics.ATTACKER_SESSION_ENDED), _topics.attacker(_topics.ATTACKER_OBSERVED), @@ -74,29 +165,17 @@ def test_topics_matches_documented_set() -> None: def test_topics_is_module_level_constant() -> None: - """``_TOPICS`` lives at module scope (not method-local) so tests - can introspect it without invoking the loop. Catches a refactor - that hides the list inside :func:`run_ttp_worker_loop`.""" assert hasattr(_worker, "_TOPICS") assert isinstance(_worker._TOPICS, tuple) assert all(isinstance(t, str) for t in _worker._TOPICS) def test_topics_published_on_publish_topics_match_pattern() -> None: - """Every entry in ``_TOPICS`` is a valid bus topic / wildcard. + from decnet.bus.base import matches # noqa: PLC0415 - Cheap sanity check — no dot-prefix bug, no empty strings, the - wildcard form (``canary.>``) actually parses through the bus - matcher. - """ - from decnet.bus.base import matches # noqa: PLC0415 — local import to avoid contaminate for pattern in _TOPICS: - assert pattern, f"empty pattern in _TOPICS" + assert pattern, "empty pattern in _TOPICS" assert " " not in pattern - # Self-match: every pattern matches itself when interpreted - # as both pattern and concrete topic (modulo the ``>`` form - # which is only valid as pattern-side; for those we test a - # synthetic concrete extension matches). 
if pattern.endswith(".>"): base = pattern[:-2] assert matches(pattern, f"{base}.example") @@ -104,122 +183,134 @@ def test_topics_published_on_publish_topics_match_pattern() -> None: assert matches(pattern, pattern) -# ── Subscription wiring (GREEN today: empty subset trivially holds) ─ +# ── Subscription wiring ───────────────────────────────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.14 — worker bootstrap wires real " - "subscriptions; today the contract loop subscribes via _wake_on " - "but the assertion that no OTHER patterns are subscribed needs " - "introspection that the contract phase doesn't provide.", -) async def test_worker_subscribes_only_to_topics(fake_bus: FakeBus) -> None: - """Run the worker briefly against a FakeBus and assert every - subscription target appears in :data:`_TOPICS`. - - Today the worker creates per-pattern wake tasks via - :func:`_wake_on`, which DO call ``bus.subscribe`` — but the - FakeBus doesn't expose a subscriber registry the test can read - without poking at private state. xfail until E.3.14 wires a - proper introspection hook (or the impl naturally exposes - subscribed patterns via a public method). + """Run the worker briefly and assert every subscription pattern + appears in :data:`_TOPICS`. Reads ``FakeBus._subs`` directly — + the in-process transport's only introspection hook. """ - pytest.fail("subscription introspection not yet wired") + shutdown = asyncio.Event() + task = asyncio.create_task(run_ttp_worker_loop( + repo=_StubRepo(), + poll_interval_secs=0.05, + tagger=_FixedTagger(tags=[]), + shutdown=shutdown, + bus=fake_bus, + )) + await asyncio.sleep(0.02) + # Heartbeat + control-listener subscribe to system.* topics; filter + # those out and assert what's left is exactly the documented set. 
+ patterns = {sub.pattern for sub in fake_bus._subs} + ttp_patterns = {p for p in patterns if not p.startswith("system.")} + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + assert ttp_patterns == set(_TOPICS), ( + f"worker subscribed outside _TOPICS: extras={ttp_patterns - set(_TOPICS)}, " + f"missing={set(_TOPICS) - ttp_patterns}" + ) -# ── Worker invokes engine on session.ended (xfail until E.3.14) ───── +# ── Worker invokes engine on session.ended ────────────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.14 — worker inner loop is a no-op idle " - "today; engine invocation lands with the worker bootstrap step", -) async def test_session_ended_invokes_engine(fake_bus: FakeBus) -> None: - """A faked ``attacker.session.ended`` event triggers a call to - ``RuleEngine.evaluate`` for the session's events. - - Today the worker idles on the wake event without invoking - anything, so this assertion xfails. Flips at E.3.14. - """ - pytest.fail("worker → engine wiring not yet implemented") + """A faked ``attacker.session.ended`` event triggers tagger.tag().""" + tagger = _FixedTagger(tags=[_make_tag()]) + repo = _StubRepo() + await _drive_worker( + fake_bus, tagger, repo, + [(_topics.attacker(_topics.ATTACKER_SESSION_ENDED), { + "session_id": "sess-1", "attacker_uuid": "att1", + })], + ) + assert len(tagger.calls) >= 1 + assert tagger.calls[0].source_kind == "session" + assert tagger.calls[0].session_id == "sess-1" + assert repo.calls == 1 -# ── Loop prevention (xfail until E.3.14) ──────────────────────────── +# ── Loop prevention ───────────────────────────────────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.14 — loop-prevention invariant requires " - "the worker to actually publish ttp.tagged on first eval and " - "no-op on replay; today the worker publishes nothing.", -) async def test_loop_prevention_no_re_fire(fake_bus: FakeBus) -> None: - """Invoking the worker on the same source event N=10× 
publishes - exactly one ``ttp.tagged`` event. + """Same upstream event fired N=5× → exactly one ``ttp.tagged``. - Re-firing on a tag-write would create a feedback loop: - ttp.tagged → re-eval → ttp.tagged → … . The worker MUST NOT - subscribe to its own output, AND the underlying repo's - ``insert_tags`` is idempotent so re-eval writes nothing — both - halves of the invariant land at E.3.14 + E.3.3. + The repo's idempotent INSERT OR IGNORE returns 0 on replays; the + worker is contractually forbidden from publishing on a 0-rowcount + write (TTP_TAGGING.md §"Bus topics"). """ - pytest.fail("loop-prevention invariant not yet implemented") + tagged: list[tuple[str, dict[str, Any]]] = [] + + async def _capture() -> None: + sub = fake_bus.subscribe(_topics.ttp(_topics.TTP_TAGGED)) + async with sub: + async for ev in sub: + tagged.append((ev.topic, ev.payload)) + + capture_task = asyncio.create_task(_capture()) + await asyncio.sleep(0) + tagger = _FixedTagger(tags=[_make_tag()]) + repo = _StubRepo() + await _drive_worker( + fake_bus, tagger, repo, + [ + (_topics.attacker(_topics.ATTACKER_SESSION_ENDED), { + "session_id": "sess-replay", "attacker_uuid": "att1", + }), + ] * 5, + settle=0.15, + ) + capture_task.cancel() + with pytest.raises((asyncio.CancelledError, Exception)): + await capture_task + assert len(tagged) == 1, f"expected 1 ttp.tagged event, got {len(tagged)}" -# ── Bus delivery asymmetry (xfail until E.3.14) ───────────────────── +# ── Worker module surface ─────────────────────────────────────────── + + +def test_run_ttp_worker_loop_signature() -> None: + import inspect # noqa: PLC0415 + assert asyncio.iscoroutinefunction(run_ttp_worker_loop) + sig = inspect.signature(run_ttp_worker_loop) + assert "repo" in sig.parameters + assert "tagger" in sig.parameters + assert "shutdown" in sig.parameters + + +# ── Bus delivery asymmetry (still xfail — catch-up paths are E.3.14b) ─ @pytest.mark.xfail( strict=True, - reason="impl phase E.3.14 — catch-up via 
attacker.session.ended " - "lands with the intel lifter wire-up", + reason="catch-up via attacker.session.ended is design-deferred to " + "E.3.14b; today the worker fans events 1:1 by source_kind", ) async def test_dropped_intel_enriched_still_produces_intel_tags( fake_bus: FakeBus, ) -> None: - """Dropping ``attacker.enriched`` events does NOT lose intel-derived - tags, because the ``attacker.session.ended`` handler ALSO runs the - intel lifter as a catch-up path. Pinned per design doc §"Bus - delivery requirements": "best-effort intel events are belt; the - session-ended sweep is braces".""" pytest.fail("intel catch-up path not yet implemented") -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.14 — email lifter only fires on " - "email.received; no catch-up path exists by design", -) async def test_dropped_email_received_produces_no_email_tags( fake_bus: FakeBus, ) -> None: """Dropping ``email.received`` produces NO email-derived tags. - The asymmetry is deliberate: emails are not stored as a - re-readable log the worker can sweep on session-ended — they - arrive as a single bus event and are processed once. The test - pins this rather than papering over it; a future contributor - "improving" the worker by adding an email catch-up path would - trip this test, which is the trip-wire that says "discuss the - PII implications first". + The asymmetry is deliberate: emails arrive as a single bus event + and are processed once. There is no catch-up path. Exercise this + by NOT publishing email.received and confirming the tagger never + sees an email-source event. """ - pytest.fail("email lifter wiring not yet implemented") - - -# ── Worker module surface (GREEN today) ───────────────────────────── - - -def test_run_ttp_worker_loop_signature() -> None: - """The public entry point exists and is async. Catches a - refactor that accidentally renames or de-async's the function. 
- """ - import inspect # noqa: PLC0415 - assert asyncio.iscoroutinefunction(run_ttp_worker_loop) - sig = inspect.signature(run_ttp_worker_loop) - # Per E.1.7 contract: positional `repo`, keyword-only - # `poll_interval_secs`, `tagger`, `shutdown`. - assert "repo" in sig.parameters - assert "tagger" in sig.parameters - assert "shutdown" in sig.parameters + tagger = _FixedTagger(tags=[]) + repo = _StubRepo() + await _drive_worker( + fake_bus, tagger, repo, + [(_topics.attacker(_topics.ATTACKER_SESSION_ENDED), { + "session_id": "sess-1", + })], + ) + email_calls = [c for c in tagger.calls if c.source_kind == "email"] + assert email_calls == []