feat(ttp): E.1.2 bus topic contract — TTP_TAGGED, TTP_RULE_FIRED, TTP_RULE_SUPPRESSED, EMAIL_RECEIVED

Second TTP-tagging contract commit. Constants only — no publishers,
no subscribers, no tests. (E.2.3 ships the bus-topic naming tests.)

- New roots: EMAIL, TTP.
- New leaves: EMAIL_RECEIVED ('received', single-token under EMAIL),
  TTP_TAGGED ('tagged'), TTP_RULE_FIRED ('rule.fired'),
  TTP_RULE_SUPPRESSED ('rule.suppressed'). Per-rule reload + state
  topics ship with the RuleStore (E.1.11) — co-located with
  producer.
- New builders: email_topic(event_type), ttp(event_type),
  ttp_rule_fired(technique_id). The ttp_rule_fired builder validates
  technique_id as a single segment so sub-techniques like T1110.001
  are rejected at construction; topic key is the parent technique,
  sub_technique lives in the payload.
- email_topic is named with the _topic suffix to avoid shadowing the
  Python email stdlib at import sites that pull both.
- TTP_TAGGING.md E.1.2 entry corrected: the spec referenced
  'ATTACKER_ENRICHED' but the actual constant is
  ATTACKER_INTEL_ENRICHED ('intel.enriched'). The existing constant
  covers the design intent (TTP intel_lifter wakes on
  attacker.intel.enriched). No rename — would break every existing
  subscriber.

Wiki update for the four new topics ships in a sibling commit in
wiki-checkout (separate repo per project layout).
This commit is contained in:
2026-05-01 06:08:11 -04:00
parent ce7efdfdd2
commit e395306dcb
2 changed files with 87 additions and 3 deletions

View File

@@ -34,6 +34,10 @@ Token structure (NATS-style, dot-separated):
system.log
system.bus.health
system.{worker}.health
email.received
ttp.tagged
ttp.rule.fired.{technique_id}
ttp.rule.suppressed
Wildcards (per :func:`decnet.bus.base.matches`):
@@ -55,6 +59,8 @@ CREDENTIAL = "credential"
ORCHESTRATOR = "orchestrator"
CANARY = "canary"
SMTP = "smtp"
EMAIL = "email"
TTP = "ttp"
# ─── Leaf event-type constants (the last segment of each topic) ──────────────
@@ -245,6 +251,40 @@ WORKER_CONTROL_START = "start"
# of patterns. Payload is currently empty; consumers only need the signal.
WEBHOOK_SUBSCRIPTIONS_CHANGED = "system.webhook.subscriptions_changed"
# Email-receipt event — fired by smtp / smtp-relay services on full-message
# receipt (envelope + headers + body + attachments captured). Single-token
# leaf so the bus tokenizer accepts it directly under the ``email`` root.
# Consumed by the TTP ``email_lifter`` for header / body-pattern / attachment
# rules. PII rule (TTP_TAGGING.md "Hard parts §6"): payload carries hashes,
# counts, header names, and rcpt-domain sets — never rcpt addresses or body
# bytes.
EMAIL_RECEIVED = "received"
# TTP-tagging event types (second/third tokens under ``ttp``).
#
# ttp.tagged — one or more new tags written. Published
# only when ``INSERT OR IGNORE`` wrote at
# least one new row; idempotent
# re-evaluations publish nothing
# (loop-prevention invariant — see
# TTP_TAGGING.md).
# ttp.rule.fired.{technique_id} — per-technique fan-out for SIEM
# consumers that subscribe to a single
# technique. Topic key is the parent
# technique; sub_technique is in the
# payload. Built via :func:`ttp_rule_fired`.
# ttp.rule.suppressed — rule fired but the tag was dropped
# (confidence below floor, rate-limited,
# or the rule's RuleState was disabled).
# Observability signal for the dashboard.
#
# Per-rule reload + state-change topics (``ttp.rule.reloaded.{rule_id}`` /
# ``ttp.rule.state.{rule_id}``) ship in the RuleStore contract step — they
# are co-located with the producer.
TTP_TAGGED = "tagged"
TTP_RULE_FIRED = "rule.fired"
TTP_RULE_SUPPRESSED = "rule.suppressed"
# ─── Builders ────────────────────────────────────────────────────────────────
@@ -405,6 +445,46 @@ def smtp(event_type: str) -> str:
return f"{SMTP}.{event_type}"
def email_topic(event_type: str) -> str:
"""Build ``email.<event_type>``.
Named ``email_topic`` rather than ``email`` to avoid shadowing the
Python ``email`` stdlib package at import sites that pull both.
*event_type* is typically :data:`EMAIL_RECEIVED`.
"""
if not event_type:
raise ValueError("email topic requires a non-empty event_type")
return f"{EMAIL}.{event_type}"
def ttp(event_type: str) -> str:
"""Build ``ttp.<event_type>``.
*event_type* is typically one of :data:`TTP_TAGGED`,
:data:`TTP_RULE_FIRED`, or :data:`TTP_RULE_SUPPRESSED`. Dotted
leaves (``rule.fired``) are permitted — same rationale as
:func:`system`. For per-technique fan-out use
:func:`ttp_rule_fired`.
"""
if not event_type:
raise ValueError("ttp topic requires a non-empty event_type")
return f"{TTP}.{event_type}"
def ttp_rule_fired(technique_id: str) -> str:
"""Build ``ttp.rule.fired.<technique_id>``.
Per-technique fan-out: SIEM subscribers can listen on
``ttp.rule.fired.>`` for everything, ``ttp.rule.fired.T1110`` for
one technique. *technique_id* is validated as a single segment —
sub-techniques like ``T1110.001`` are rejected because they would
split into two tokens. The topic key is the parent technique;
``sub_technique_id`` lives in the payload.
"""
_reject_tokens(technique_id)
return f"{TTP}.rule.fired.{technique_id}"
def _reject_tokens(*parts: str) -> None:
"""Reject topic segments that would break NATS-style tokenization.