feat(bus): identity.* topic family (formed / observation.linked / merged)

Fourth of the five-step identity-resolution substrate. Constants and
builder ship now; no publishers exist yet — they land with the
clusterer worker. Subscribers (webhook worker, dashboard SSE relay)
can register against identity.> from day one.

* decnet/bus/topics.py — IDENTITY root + IDENTITY_FORMED /
  IDENTITY_OBSERVATION_LINKED / IDENTITY_MERGED leaves; identity()
  builder mirroring the attacker() / system() helpers. Module
  docstring topic-tree updated.
* tests/bus/test_topics.py — assert builder produces the expected
  three topic strings + rejects empty event_type.

Wiki Service-Bus.md and a new Identity-Resolution.md page land in the
companion wiki-checkout commit.
This commit is contained in:
2026-04-26 07:15:44 -04:00
parent 448212ebcd
commit 4f1077be72
2 changed files with 52 additions and 0 deletions

View File

@@ -14,6 +14,9 @@ Token structure (NATS-style, dot-separated):
attacker.scored
attacker.session.started
attacker.session.ended
identity.formed
identity.observation.linked
identity.merged
credential.captured
credential.reuse.detected
system.log
@@ -33,6 +36,7 @@ from __future__ import annotations
TOPOLOGY = "topology"
DECKY = "decky"
ATTACKER = "attacker"
IDENTITY = "identity"
SYSTEM = "system"
CREDENTIAL = "credential"
@@ -83,6 +87,27 @@ ATTACKER_SESSION_ENDED = "session.ended"
# provider summary so SIEM-bound webhooks don't need to re-query the DB.
ATTACKER_INTEL_ENRICHED = "intel.enriched"
# Identity-resolution event types (second/third tokens under ``identity``).
# Published by the (future) clusterer worker — see
# development/IDENTITY_RESOLUTION.md. Constants ship in this commit;
# no publishers exist yet, but consumers (webhook worker, dashboard
# SSE relay) can subscribe to ``identity.>`` from day one and receive
# events the instant the clusterer comes online.
#
# identity.formed — clusterer creates a new identity from
# one or more observations
# identity.observation.linked — observation attached to an existing
# identity (or reattached from another)
# identity.merged — two identities collapsed; loser gets
# ``merged_into_uuid`` set, subscribers
# re-key cached references to the winner
#
# ``identity.campaign.assigned`` is deferred; it ships when the campaign
# clusterer ships. YAGNI before then.
IDENTITY_FORMED = "formed"
IDENTITY_OBSERVATION_LINKED = "observation.linked"
IDENTITY_MERGED = "merged"
# Credential event types (second/third tokens under ``credential``).
# ``credential.captured`` fires once per upserted Credential row — the
# correlator listens for it and runs the cred-reuse query in response,
@@ -186,6 +211,19 @@ def attacker(event_type: str) -> str:
return f"{ATTACKER}.{event_type}"
def identity(event_type: str) -> str:
"""Build ``identity.<event_type>``.
*event_type* is typically one of :data:`IDENTITY_FORMED`,
:data:`IDENTITY_OBSERVATION_LINKED`, :data:`IDENTITY_MERGED`. Dotted
leaves (``observation.linked``) are permitted — same rationale as
:func:`system`.
"""
if not event_type:
raise ValueError("identity topic requires a non-empty event_type")
return f"{IDENTITY}.{event_type}"
def system_health(worker: str) -> str:
"""Build ``system.<worker>.health``.

View File

@@ -72,3 +72,17 @@ def test_system_control_builder() -> None:
def test_system_control_rejects_bad_segments(bad: str) -> None:
with pytest.raises(ValueError):
topics.system_control(bad)
# ─── Identity resolution topics (commit 4 of IDENTITY_RESOLUTION.md) ─────────
def test_identity_builder() -> None:
assert topics.identity(topics.IDENTITY_FORMED) == "identity.formed"
assert topics.identity(topics.IDENTITY_OBSERVATION_LINKED) == "identity.observation.linked"
assert topics.identity(topics.IDENTITY_MERGED) == "identity.merged"
def test_identity_builder_rejects_empty() -> None:
with pytest.raises(ValueError):
topics.identity("")