From cce84f23dc4646e6cf50a4871e369d2cf6b501cb Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 06:53:05 -0400 Subject: [PATCH] =?UTF-8?q?test(bus):=20E.2.3=20TTP=20topic=20naming=20?= =?UTF-8?q?=E2=80=94=20constants,=20builders,=20wildcard=20match?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- development/TTP_TAGGING.md | 2 + tests/bus/test_ttp_topics.py | 100 +++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 tests/bus/test_ttp_topics.py diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index 278ff66c..671f9897 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -2538,6 +2538,8 @@ lands; PII rule §6 type assertion is GREEN today). **E.2.3 — Bus topic naming tests** (`tests/bus/test_ttp_topics.py`) +**Status:** ✅ done. + - All TTP_* constants match the documented names exactly. - `matches("ttp.>", TTP_TAGGED)` is True (subscription wildcards work as documented). diff --git a/tests/bus/test_ttp_topics.py b/tests/bus/test_ttp_topics.py new file mode 100644 index 00000000..b1b5df52 --- /dev/null +++ b/tests/bus/test_ttp_topics.py @@ -0,0 +1,100 @@ +"""Bus topic naming tests for the TTP family (CDD step E.2.3). + +Pins the wire vocabulary the worker (E.1.7), the API router (E.3.8), +and downstream SIEM consumers compile against. All assertions are +GREEN today — the constants and builders ship in +``decnet/bus/topics.py`` already; this test enforces that future +edits don't drift the names or break the wildcard contract. +""" +from __future__ import annotations + +import pytest + +from decnet.bus import topics +from decnet.bus.base import matches + + +# ─── Constant identity ─────────────────────────────────────────────────────── + + +def test_ttp_leaf_constants() -> None: + assert topics.TTP_TAGGED == "tagged" + assert topics.TTP_RULE_FIRED == "rule.fired" + assert topics.TTP_RULE_SUPPRESSED == "rule.suppressed" + + +def test_email_received_is_one_nats_token() -> None: + # The leaf must carry NO embedded dot; the bus tokenizer would + # otherwise split it into two segments and break the + # ``email.`` hierarchy. Pinned at the constant level so a + # future edit "received.full" trips this test before it ships. + assert topics.EMAIL_RECEIVED == "received" + assert "." not in topics.EMAIL_RECEIVED + + +# ─── Built topics ──────────────────────────────────────────────────────────── + + +def test_ttp_builder_produces_documented_strings() -> None: + assert topics.ttp(topics.TTP_TAGGED) == "ttp.tagged" + assert topics.ttp(topics.TTP_RULE_FIRED) == "ttp.rule.fired" + assert topics.ttp(topics.TTP_RULE_SUPPRESSED) == "ttp.rule.suppressed" + + +def test_ttp_rule_fired_per_technique() -> None: + assert topics.ttp_rule_fired("T1110") == "ttp.rule.fired.T1110" + assert topics.ttp_rule_fired("T1059") == "ttp.rule.fired.T1059" + + +def test_email_topic_builder() -> None: + assert topics.email_topic(topics.EMAIL_RECEIVED) == "email.received" + + +def test_ttp_builder_rejects_empty() -> None: + with pytest.raises(ValueError): + topics.ttp("") + + +# ─── Wildcard subscription contract ────────────────────────────────────────── + + +@pytest.mark.parametrize("topic", [ + "ttp.tagged", + "ttp.rule.fired", + "ttp.rule.fired.T1110", + "ttp.rule.suppressed", +]) +def test_ttp_wildcard_matches_every_documented_topic(topic: str) -> None: + assert matches("ttp.>", topic) is True + + +def test_ttp_wildcard_excludes_root() -> None: + # ``>`` requires AT LEAST one trailing token. The bare root + # ``ttp`` must not match — pinned so a regression in + # decnet.bus.base.matches() (e.g. allowing zero-token suffix) + # is caught here. + assert matches("ttp.>", "ttp") is False + + +def test_ttp_rule_fired_wildcard_per_technique() -> None: + assert matches("ttp.rule.fired.>", "ttp.rule.fired.T1110") is True + assert matches("ttp.rule.fired.>", "ttp.rule.fired") is False + + +# ─── Sub-technique IDs are NOT topic segments ──────────────────────────────── + + +def test_ttp_rule_fired_rejects_subtechnique_segment() -> None: + # Sub-technique ids carry an embedded dot (T1110.001). Allowing + # them as a topic segment would silently split the topic into two + # tokens and break ``ttp.rule.fired.>`` subscribers. The builder + # MUST reject — sub_technique_id rides the payload, never the + # wire address. (Documented at decnet/bus/topics.py:474–485.) + with pytest.raises(ValueError): + topics.ttp_rule_fired("T1110.001") + + +@pytest.mark.parametrize("bad", ["", "has.dot", "has*wild", "has>wild", "with space"]) +def test_ttp_rule_fired_rejects_bad_segments(bad: str) -> None: + with pytest.raises(ValueError): + topics.ttp_rule_fired(bad)