Files
DECNET/tests/bus/test_ttp_topics.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

138 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Bus topic naming tests for the TTP family (CDD step E.2.3).
Pins the wire vocabulary the worker (E.1.7), the API router (E.3.8),
and downstream SIEM consumers compile against. All assertions are
GREEN today — the constants and builders ship in
``decnet/bus/topics.py`` already; this test enforces that future
edits don't drift the names or break the wildcard contract.
"""
from __future__ import annotations
import pytest
from decnet.bus import topics
from decnet.bus.base import matches
# ─── Constant identity ───────────────────────────────────────────────────────
def test_ttp_leaf_constants() -> None:
assert topics.TTP_TAGGED == "tagged"
assert topics.TTP_RULE_FIRED == "rule.fired"
assert topics.TTP_RULE_SUPPRESSED == "rule.suppressed"
assert topics.TTP_RULE_RELOADED == "rule.reloaded"
assert topics.TTP_RULE_STATE == "rule.state"
def test_email_received_is_one_nats_token() -> None:
# The leaf must carry NO embedded dot; the bus tokenizer would
# otherwise split it into two segments and break the
# ``email.<event>`` hierarchy. Pinned at the constant level so a
# future edit "received.full" trips this test before it ships.
assert topics.EMAIL_RECEIVED == "received"
assert "." not in topics.EMAIL_RECEIVED
# ─── Built topics ────────────────────────────────────────────────────────────
def test_ttp_builder_produces_documented_strings() -> None:
assert topics.ttp(topics.TTP_TAGGED) == "ttp.tagged"
assert topics.ttp(topics.TTP_RULE_FIRED) == "ttp.rule.fired"
assert topics.ttp(topics.TTP_RULE_SUPPRESSED) == "ttp.rule.suppressed"
def test_ttp_rule_fired_per_technique() -> None:
assert topics.ttp_rule_fired("T1110") == "ttp.rule.fired.T1110"
assert topics.ttp_rule_fired("T1059") == "ttp.rule.fired.T1059"
def test_ttp_rule_reloaded_per_rule() -> None:
assert topics.ttp_rule_reloaded("R0001") == "ttp.rule.reloaded.R0001"
assert topics.ttp_rule_reloaded("R9999") == "ttp.rule.reloaded.R9999"
def test_ttp_rule_state_per_rule() -> None:
assert topics.ttp_rule_state("R0001") == "ttp.rule.state.R0001"
assert topics.ttp_rule_state("R0042") == "ttp.rule.state.R0042"
@pytest.mark.parametrize("bad", ["", "has.dot", "has*wild", "has>wild", "with space"])
def test_ttp_rule_reloaded_rejects_bad_segments(bad: str) -> None:
with pytest.raises(ValueError):
topics.ttp_rule_reloaded(bad)
@pytest.mark.parametrize("bad", ["", "has.dot", "has*wild", "has>wild", "with space"])
def test_ttp_rule_state_rejects_bad_segments(bad: str) -> None:
with pytest.raises(ValueError):
topics.ttp_rule_state(bad)
def test_email_topic_builder() -> None:
assert topics.email_topic(topics.EMAIL_RECEIVED) == "email.received"
def test_ttp_builder_rejects_empty() -> None:
with pytest.raises(ValueError):
topics.ttp("")
# ─── Wildcard subscription contract ──────────────────────────────────────────
@pytest.mark.parametrize("topic", [
"ttp.tagged",
"ttp.rule.fired",
"ttp.rule.fired.T1110",
"ttp.rule.suppressed",
"ttp.rule.reloaded.R0001",
"ttp.rule.state.R0001",
])
def test_ttp_wildcard_matches_every_documented_topic(topic: str) -> None:
assert matches("ttp.>", topic) is True
def test_ttp_rule_reloaded_wildcard_per_rule() -> None:
assert matches("ttp.rule.reloaded.>", "ttp.rule.reloaded.R0001") is True
assert matches("ttp.rule.reloaded.>", "ttp.rule.reloaded") is False
def test_ttp_rule_state_wildcard_per_rule() -> None:
assert matches("ttp.rule.state.>", "ttp.rule.state.R0001") is True
assert matches("ttp.rule.state.>", "ttp.rule.state") is False
def test_ttp_wildcard_excludes_root() -> None:
# ``>`` requires AT LEAST one trailing token. The bare root
# ``ttp`` must not match — pinned so a regression in
# decnet.bus.base.matches() (e.g. allowing zero-token suffix)
# is caught here.
assert matches("ttp.>", "ttp") is False
def test_ttp_rule_fired_wildcard_per_technique() -> None:
assert matches("ttp.rule.fired.>", "ttp.rule.fired.T1110") is True
assert matches("ttp.rule.fired.>", "ttp.rule.fired") is False
# ─── Sub-technique IDs are NOT topic segments ────────────────────────────────
def test_ttp_rule_fired_rejects_subtechnique_segment() -> None:
# Sub-technique ids carry an embedded dot (T1110.001). Allowing
# them as a topic segment would silently split the topic into two
# tokens and break ``ttp.rule.fired.>`` subscribers. The builder
# MUST reject — sub_technique_id rides the payload, never the
# wire address. (Documented at decnet/bus/topics.py:474485.)
with pytest.raises(ValueError):
topics.ttp_rule_fired("T1110.001")
@pytest.mark.parametrize("bad", ["", "has.dot", "has*wild", "has>wild", "with space"])
def test_ttp_rule_fired_rejects_bad_segments(bad: str) -> None:
with pytest.raises(ValueError):
topics.ttp_rule_fired(bad)