From 19cc8aa859d7e225b76040ecc88ca5ed73f80005 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 06:33:34 -0400 Subject: [PATCH] =?UTF-8?q?feat(ttp):=20E.1.7=20worker=20contract=20?= =?UTF-8?q?=E2=80=94=20run=5Fttp=5Fworker=5Floop,=20=5FTOPICS,=20registry?= =?UTF-8?q?=20entry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- decnet/ttp/worker.py | 167 ++++++++++++++++++++++++++++++++++ decnet/web/worker_registry.py | 1 + development/TTP_TAGGING.md | 2 + tests/ttp/test_worker.py | 107 ++++++++++++++++++++++ 4 files changed, 277 insertions(+) create mode 100644 decnet/ttp/worker.py create mode 100644 tests/ttp/test_worker.py diff --git a/decnet/ttp/worker.py b/decnet/ttp/worker.py new file mode 100644 index 00000000..44230a28 --- /dev/null +++ b/decnet/ttp/worker.py @@ -0,0 +1,167 @@ +"""Long-running TTP-tagging worker. + +Contract step E.1.7 of ``development/TTP_TAGGING.md``. Bus loop only: +connects, subscribes to the documented topics, runs heartbeat + +control listener, idles on the wake event. Real evaluation, +publishing, and persistence land in E.3 — the lifecycle wiring here +mirrors :mod:`decnet.intel.worker` and :mod:`decnet.clustering.worker` +exactly so the impl phase only fills in the inner loop. + +Bus subscriptions are enumerated as the module-level constant +:data:`_TOPICS` so E.2.12 can assert subscription wiring without +invoking the loop. The constant is the *single source of truth* — the +loop iterates over it; tests introspect it. Drift between code and +spec becomes a failed equality check, not a silent regression. +""" +from __future__ import annotations + +import asyncio +import contextlib +from typing import Optional + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + run_control_listener_signal as _run_control_listener_signal, + run_health_heartbeat as _run_health_heartbeat, +) +from decnet.logging import get_logger +from decnet.ttp.base import Tagger +from decnet.ttp.factory import get_tagger +from decnet.web.db.repository import BaseRepository + +log = get_logger("ttp.worker") + +_DEFAULT_POLL_SECS = 60.0 + + +# Bus topics the worker subscribes to. Kept as a module-level constant +# so E.2.12 can assert subscription wiring without invoking the loop — +# the test introspects this tuple, the loop iterates it. The set +# matches the design doc "Worker shape" section: session-ended primary +# trigger, observed for low-latency rules, intel-enriched + identity +# events for opportunistic re-tag, credential-reuse + email for the +# dedicated lifters, and ``canary.>`` for fleet-wide canary triggers. +_TOPICS: tuple[str, ...] = ( + _topics.attacker(_topics.ATTACKER_SESSION_ENDED), + _topics.attacker(_topics.ATTACKER_OBSERVED), + _topics.attacker(_topics.ATTACKER_INTEL_ENRICHED), + _topics.identity(_topics.IDENTITY_FORMED), + _topics.identity(_topics.IDENTITY_MERGED), + _topics.credential(_topics.CREDENTIAL_REUSE_DETECTED), + _topics.email_topic(_topics.EMAIL_RECEIVED), + # Canary triggers carry a per-token segment, so subscribe with the + # multi-token wildcard rather than enumerating per-token. Pattern + # validated against ``decnet.bus.topics.canary()``'s shape. + f"{_topics.CANARY}.>", +) + + +async def run_ttp_worker_loop( + repo: BaseRepository, + *, + poll_interval_secs: float = _DEFAULT_POLL_SECS, + tagger: Optional[Tagger] = None, + shutdown: Optional[asyncio.Event] = None, +) -> None: + """Run the TTP-tagging loop until cancelled. + + *tagger* defaults to :func:`decnet.ttp.factory.get_tagger` — tests + pass a fake. *shutdown* is an optional external stop signal; the + loop also exits cleanly on :class:`asyncio.CancelledError` and + :class:`KeyboardInterrupt`. + + Contract phase: the inner loop is a no-op idle. Bus connect, + heartbeat, control-listener, and topic subscriptions are wired so + the worker registers as ``ttp`` in + :data:`decnet.web.worker_registry.KNOWN_WORKERS` from day one. E.3 + fills in evaluation, persistence, and ``ttp.tagged`` publishes. + """ + if tagger is None: + tagger = get_tagger() + log.info( + "ttp worker started tagger=%s poll_interval_secs=%s topics=%d", + tagger.name, poll_interval_secs, len(_TOPICS), + ) + + bus: Optional[BaseBus] = None + wake = asyncio.Event() + wake_tasks: list[asyncio.Task] = [] + heartbeat_task: Optional[asyncio.Task] = None + try: + candidate = get_bus(client_name="ttp") + await candidate.connect() + bus = candidate + for pattern in _TOPICS: + wake_tasks.append(asyncio.create_task( + _wake_on(bus, wake, pattern), + )) + heartbeat_task = asyncio.create_task( + _run_health_heartbeat(bus, "ttp"), + ) + wake_tasks.append(asyncio.create_task( + _run_control_listener_signal(bus, "ttp"), + )) + except Exception as exc: # noqa: BLE001 + # Bus-unavailable is the steady state on dev boxes without a + # NATS daemon — fall back to poll-only so the worker still + # registers and the impl phase can backfill. + log.warning( + "ttp worker: bus unavailable, running in poll-only mode: %s", exc, + ) + + if shutdown is None: + shutdown = asyncio.Event() + + try: + while not shutdown.is_set(): + # Contract phase: the actual evaluate + insert + publish + # work lives in E.3. The shell idles on wake / poll so the + # heartbeat keeps reporting and the control listener can + # cleanly stop us. + try: + await asyncio.wait_for( + wake.wait(), timeout=float(poll_interval_secs), + ) + except asyncio.TimeoutError: + pass + wake.clear() + except (asyncio.CancelledError, KeyboardInterrupt): + log.info("ttp worker stopped") + finally: + for t in wake_tasks: + t.cancel() + if heartbeat_task is not None: + heartbeat_task.cancel() + for task in (*wake_tasks, heartbeat_task): + if task is None: + continue + with contextlib.suppress(asyncio.CancelledError, Exception): + await task + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None: + """Flip *wake* every time *pattern* fires on the bus. + + Survives transient subscriber errors by logging and exiting; the + poll-interval fallback keeps the loop alive in poll-only mode. + """ + try: + sub = bus.subscribe(pattern) + async with sub: + async for _event in sub: + wake.set() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning( + "ttp worker: subscriber for %s died (%s); falling back to poll", + pattern, exc, + ) + + +__all__ = ["run_ttp_worker_loop", "_TOPICS"] diff --git a/decnet/web/worker_registry.py b/decnet/web/worker_registry.py index c36e076a..212497c4 100644 --- a/decnet/web/worker_registry.py +++ b/decnet/web/worker_registry.py @@ -43,6 +43,7 @@ KNOWN_WORKERS: tuple[str, ...] = ( "enrich", # threat-intel enrichment — bus-woken on attacker.observed/scored "clusterer", # behavioral clustering — bus-woken on attacker.scored "campaign-clusterer", # campaign assembly — bus-woken on identity.formed + "ttp", # MITRE ATT&CK technique tagging — bus-woken on session.ended / intel.enriched / email.received "webhook", # external SIEM/SOAR egress — bus consumer → HMAC HTTP POSTs "orchestrator", # synthetic life-injection — inter-decky traffic + file ops "agent", diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index 0d437cb5..dbe0eabf 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -2322,6 +2322,8 @@ unrelated events. **E.1.7 — Worker contract** (`decnet/ttp/worker.py`) +**Status:** ✅ done. + - `async def run_ttp_worker_loop(...)` signature matching `decnet/clustering/worker.py` and `decnet/intel/worker.py` (the parameter shape is already standardised across workers — copy it). diff --git a/tests/ttp/test_worker.py b/tests/ttp/test_worker.py new file mode 100644 index 00000000..74c5b453 --- /dev/null +++ b/tests/ttp/test_worker.py @@ -0,0 +1,107 @@ +"""Contract tests for :mod:`decnet.ttp.worker` (E.1.7). + +Scoped to the contract surface: ``_TOPICS`` shape and contents, +:func:`run_ttp_worker_loop` signature, clean shutdown via the +``shutdown`` event in poll-only mode, and worker-registry membership. +The full E.2.12 bus-integration battery (subscribed-set equality on +a fake bus, fan-out, loop-prevention invariant) is xfail-strict +pending E.3. +""" +from __future__ import annotations + +import asyncio +import inspect + +import pytest + +from decnet.ttp.base import Tagger, TaggerEvent +from decnet.ttp.worker import _TOPICS, run_ttp_worker_loop +from decnet.web.db.models.ttp import TTPTag +from decnet.web.worker_registry import KNOWN_WORKERS + + +class _NoopTagger(Tagger): + name = "noop" + + async def tag(self, event: TaggerEvent) -> list[TTPTag]: + return [] + + +def test_topics_is_non_empty_tuple_of_strings(): + assert isinstance(_TOPICS, tuple) + assert _TOPICS, "_TOPICS must enumerate at least one subscription" + assert all(isinstance(t, str) and t for t in _TOPICS) + + +def test_topics_covers_documented_design_subscriptions(): + # Sanity: the design doc names session.ended, intel.enriched, + # email.received, identity.formed, credential.reuse.detected, + # canary triggers, attacker.observed. We assert the topic STRINGS + # contain the documented leaves rather than re-importing the + # builders — keeps the test cheap and immune to topic-builder + # refactors that preserve wire format. + joined = " ".join(_TOPICS) + must_have = [ + "session.ended", + "intel.enriched", + "received", # email.received + "formed", # identity.formed + "reuse.detected", # credential reuse + "canary", + "observed", + ] + for fragment in must_have: + assert fragment in joined, f"_TOPICS missing {fragment!r}" + + +def test_run_ttp_worker_loop_signature(): + sig = inspect.signature(run_ttp_worker_loop) + params = sig.parameters + assert "repo" in params + assert "poll_interval_secs" in params + assert "tagger" in params + assert "shutdown" in params + # Mirrors :mod:`decnet.intel.worker` and + # :mod:`decnet.clustering.worker` — keyword-only after ``repo``. + kw_only = [ + p for p in params.values() + if p.kind is inspect.Parameter.KEYWORD_ONLY + ] + kw_only_names = {p.name for p in kw_only} + assert {"poll_interval_secs", "tagger", "shutdown"} <= kw_only_names + + +def test_worker_exits_cleanly_when_shutdown_set_immediately(): + async def _run() -> None: + shutdown = asyncio.Event() + shutdown.set() + # repo isn't touched in the contract phase; pass a sentinel. + # Bus is unavailable in test env → poll-only path. + await asyncio.wait_for( + run_ttp_worker_loop( + repo=object(), # type: ignore[arg-type] + poll_interval_secs=0.05, + tagger=_NoopTagger(), + shutdown=shutdown, + ), + timeout=5.0, + ) + + asyncio.run(_run()) + + +def test_ttp_registered_in_known_workers(): + assert "ttp" in KNOWN_WORKERS + + +# ── E.2.12 deferred bus-integration assertions ───────────────────── + + +@pytest.mark.xfail(strict=True, reason="impl phase E.3 — fan-out invokes engine") +def test_e212_session_ended_invokes_rule_engine(): + raise AssertionError("not yet implemented") + + +@pytest.mark.xfail(strict=True, reason="impl phase E.3 — loop-prevention invariant") +def test_e212_idempotent_re_evaluation_publishes_zero_events(): + raise AssertionError("not yet implemented")