From 6814949bc015cbe37238fc1a02e1c9857506fd56 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 07:37:58 -0400 Subject: [PATCH] =?UTF-8?q?test(ttp):=20E.2.12=20worker=20bus=20integratio?= =?UTF-8?q?n=20=E2=80=94=20=5FTOPICS=20equality,=20loop-prevention,=20deli?= =?UTF-8?q?very=20asymmetry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pin _TOPICS frozenset against documented set (single source of truth). Worker→engine invocation, loop-prevention invariant, attacker.enriched/email.received catch-up asymmetry xfail-gated behind E.3.14. --- tests/ttp/test_worker_bus.py | 225 +++++++++++++++++++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 tests/ttp/test_worker_bus.py diff --git a/tests/ttp/test_worker_bus.py b/tests/ttp/test_worker_bus.py new file mode 100644 index 00000000..3d0d8e4a --- /dev/null +++ b/tests/ttp/test_worker_bus.py @@ -0,0 +1,225 @@ +"""E.2.12 — TTP worker bus integration tests. + +Pins the bus surface from ``development/TTP_TAGGING.md`` §"Bus topics", +§"Worker shape", §"Bus delivery requirements": + +* ``_TOPICS`` is the single source of truth for worker subscriptions + and matches the documented set exactly. +* Worker subscribes ONLY to topics in ``_TOPICS`` (no accidental + string-literal subscriptions drifting from the constants). +* Loop-prevention invariant: invoking the worker on the same source + event twice (or N=10×) publishes exactly one ``ttp.tagged`` event. +* Bus delivery asymmetry: dropping ``attacker.enriched`` still + produces intel-derived tags via the ``attacker.session.ended`` + catch-up path; dropping ``email.received`` produces NO email tags + (no catch-up exists for email). +* Engine invoked on incoming events. + +Topic-set equality is GREEN today. Worker-loop behavior beyond the +empty inner loop xfail-gated behind E.3.14. +""" +from __future__ import annotations + +import asyncio +from typing import AsyncIterator + +import pytest +import pytest_asyncio + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.ttp import worker as _worker + +# Re-imported so a `__all__` regression on the worker module fails +# noisily here rather than via a vague "module has no attribute". +from decnet.ttp.worker import _TOPICS, run_ttp_worker_loop + + +# ── Fixtures ──────────────────────────────────────────────────────── + + +@pytest_asyncio.fixture +async def fake_bus() -> AsyncIterator[FakeBus]: + bus = FakeBus() + await bus.connect() + try: + yield bus + finally: + await bus.close() + + +# ── _TOPICS surface (GREEN today) ─────────────────────────────────── + + +def test_topics_matches_documented_set() -> None: + """``_TOPICS`` equals the exact set declared in TTP_TAGGING.md + §"Bus topics". + + Pinning frozenset equality (rather than tuple equality) since + subscription order has no observable effect — but the *set* + must match. A future contributor adding a topic without doc / + test updates trips this. + """ + expected = frozenset({ + _topics.attacker(_topics.ATTACKER_SESSION_ENDED), + _topics.attacker(_topics.ATTACKER_OBSERVED), + _topics.attacker(_topics.ATTACKER_INTEL_ENRICHED), + _topics.identity(_topics.IDENTITY_FORMED), + _topics.identity(_topics.IDENTITY_MERGED), + _topics.credential(_topics.CREDENTIAL_REUSE_DETECTED), + _topics.email_topic(_topics.EMAIL_RECEIVED), + f"{_topics.CANARY}.>", + }) + assert frozenset(_TOPICS) == expected + + +def test_topics_is_module_level_constant() -> None: + """``_TOPICS`` lives at module scope (not method-local) so tests + can introspect it without invoking the loop. Catches a refactor + that hides the list inside :func:`run_ttp_worker_loop`.""" + assert hasattr(_worker, "_TOPICS") + assert isinstance(_worker._TOPICS, tuple) + assert all(isinstance(t, str) for t in _worker._TOPICS) + + +def test_topics_published_on_publish_topics_match_pattern() -> None: + """Every entry in ``_TOPICS`` is a valid bus topic / wildcard. + + Cheap sanity check — no dot-prefix bug, no empty strings, the + wildcard form (``canary.>``) actually parses through the bus + matcher. + """ + from decnet.bus.base import matches # noqa: PLC0415 — local import to avoid contaminate + for pattern in _TOPICS: + assert pattern, f"empty pattern in _TOPICS" + assert " " not in pattern + # Self-match: every pattern matches itself when interpreted + # as both pattern and concrete topic (modulo the ``>`` form + # which is only valid as pattern-side; for those we test a + # synthetic concrete extension matches). + if pattern.endswith(".>"): + base = pattern[:-2] + assert matches(pattern, f"{base}.example") + else: + assert matches(pattern, pattern) + + +# ── Subscription wiring (GREEN today: empty subset trivially holds) ─ + + +@pytest.mark.xfail( + strict=True, + reason="impl phase E.3.14 — worker bootstrap wires real " + "subscriptions; today the contract loop subscribes via _wake_on " + "but the assertion that no OTHER patterns are subscribed needs " + "introspection that the contract phase doesn't provide.", +) +async def test_worker_subscribes_only_to_topics(fake_bus: FakeBus) -> None: + """Run the worker briefly against a FakeBus and assert every + subscription target appears in :data:`_TOPICS`. + + Today the worker creates per-pattern wake tasks via + :func:`_wake_on`, which DO call ``bus.subscribe`` — but the + FakeBus doesn't expose a subscriber registry the test can read + without poking at private state. xfail until E.3.14 wires a + proper introspection hook (or the impl naturally exposes + subscribed patterns via a public method). + """ + pytest.fail("subscription introspection not yet wired") + + +# ── Worker invokes engine on session.ended (xfail until E.3.14) ───── + + +@pytest.mark.xfail( + strict=True, + reason="impl phase E.3.14 — worker inner loop is a no-op idle " + "today; engine invocation lands with the worker bootstrap step", +) +async def test_session_ended_invokes_engine(fake_bus: FakeBus) -> None: + """A faked ``attacker.session.ended`` event triggers a call to + ``RuleEngine.evaluate`` for the session's events. + + Today the worker idles on the wake event without invoking + anything, so this assertion xfails. Flips at E.3.14. + """ + pytest.fail("worker → engine wiring not yet implemented") + + +# ── Loop prevention (xfail until E.3.14) ──────────────────────────── + + +@pytest.mark.xfail( + strict=True, + reason="impl phase E.3.14 — loop-prevention invariant requires " + "the worker to actually publish ttp.tagged on first eval and " + "no-op on replay; today the worker publishes nothing.", +) +async def test_loop_prevention_no_re_fire(fake_bus: FakeBus) -> None: + """Invoking the worker on the same source event N=10× publishes + exactly one ``ttp.tagged`` event. + + Re-firing on a tag-write would create a feedback loop: + ttp.tagged → re-eval → ttp.tagged → … . The worker MUST NOT + subscribe to its own output, AND the underlying repo's + ``insert_tags`` is idempotent so re-eval writes nothing — both + halves of the invariant land at E.3.14 + E.3.3. + """ + pytest.fail("loop-prevention invariant not yet implemented") + + +# ── Bus delivery asymmetry (xfail until E.3.14) ───────────────────── + + +@pytest.mark.xfail( + strict=True, + reason="impl phase E.3.14 — catch-up via attacker.session.ended " + "lands with the intel lifter wire-up", +) +async def test_dropped_intel_enriched_still_produces_intel_tags( + fake_bus: FakeBus, +) -> None: + """Dropping ``attacker.enriched`` events does NOT lose intel-derived + tags, because the ``attacker.session.ended`` handler ALSO runs the + intel lifter as a catch-up path. Pinned per design doc §"Bus + delivery requirements": "best-effort intel events are belt; the + session-ended sweep is braces".""" + pytest.fail("intel catch-up path not yet implemented") + + +@pytest.mark.xfail( + strict=True, + reason="impl phase E.3.14 — email lifter only fires on " + "email.received; no catch-up path exists by design", +) +async def test_dropped_email_received_produces_no_email_tags( + fake_bus: FakeBus, +) -> None: + """Dropping ``email.received`` produces NO email-derived tags. + + The asymmetry is deliberate: emails are not stored as a + re-readable log the worker can sweep on session-ended — they + arrive as a single bus event and are processed once. The test + pins this rather than papering over it; a future contributor + "improving" the worker by adding an email catch-up path would + trip this test, which is the trip-wire that says "discuss the + PII implications first". + """ + pytest.fail("email lifter wiring not yet implemented") + + +# ── Worker module surface (GREEN today) ───────────────────────────── + + +def test_run_ttp_worker_loop_signature() -> None: + """The public entry point exists and is async. Catches a + refactor that accidentally renames or de-async's the function. + """ + import inspect # noqa: PLC0415 + assert asyncio.iscoroutinefunction(run_ttp_worker_loop) + sig = inspect.signature(run_ttp_worker_loop) + # Per E.1.7 contract: positional `repo`, keyword-only + # `poll_interval_secs`, `tagger`, `shutdown`. + assert "repo" in sig.parameters + assert "tagger" in sig.parameters + assert "shutdown" in sig.parameters