feat(bus): reserve identity.unmerged topic

Revocable merges (a contradiction-driven undo of identity.merged) ship
in the clusterer work; this reserves the topic up-front so identity.>
subscribers receive it day one without a re-subscribe.

The clusterer worker's ClusterResult fan-out now publishes on
identity.unmerged when populated. The skeleton clusterer never
populates it; the revocable-merge commit will.

Wiki update lives in wiki-checkout/Service-Bus.md (separate repo).
This commit is contained in:
2026-04-26 08:10:56 -04:00
parent e545f7d8d3
commit fb522af107
4 changed files with 25 additions and 6 deletions

View File

@@ -17,6 +17,7 @@ Token structure (NATS-style, dot-separated):
identity.formed identity.formed
identity.observation.linked identity.observation.linked
identity.merged identity.merged
identity.unmerged
credential.captured credential.captured
credential.reuse.detected credential.reuse.detected
system.log system.log
@@ -101,12 +102,21 @@ ATTACKER_INTEL_ENRICHED = "intel.enriched"
# identity.merged — two identities collapsed; loser gets # identity.merged — two identities collapsed; loser gets
# ``merged_into_uuid`` set, subscribers # ``merged_into_uuid`` set, subscribers
# re-key cached references to the winner # re-key cached references to the winner
# identity.unmerged — revocable-merge undo: contradicting
# evidence cleared ``merged_into_uuid``
# and re-split observations. The
# resurrected side's UUID is the same
# as the prior loser, so subscribers
# that cached references to the loser
# during the merged interval can
# re-attach without a new lookup.
# #
# ``identity.campaign.assigned`` is deferred; it ships when the campaign # ``identity.campaign.assigned`` is deferred; it ships when the campaign
# clusterer ships. YAGNI before then. # clusterer ships. YAGNI before then.
IDENTITY_FORMED = "formed" IDENTITY_FORMED = "formed"
IDENTITY_OBSERVATION_LINKED = "observation.linked" IDENTITY_OBSERVATION_LINKED = "observation.linked"
IDENTITY_MERGED = "merged" IDENTITY_MERGED = "merged"
IDENTITY_UNMERGED = "unmerged"
# Credential event types (second/third tokens under ``credential``). # Credential event types (second/third tokens under ``credential``).
# ``credential.captured`` fires once per upserted Credential row — the # ``credential.captured`` fires once per upserted Credential row — the
@@ -215,9 +225,9 @@ def identity(event_type: str) -> str:
"""Build ``identity.<event_type>``. """Build ``identity.<event_type>``.
*event_type* is typically one of :data:`IDENTITY_FORMED`, *event_type* is typically one of :data:`IDENTITY_FORMED`,
:data:`IDENTITY_OBSERVATION_LINKED`, :data:`IDENTITY_MERGED`. Dotted :data:`IDENTITY_OBSERVATION_LINKED`, :data:`IDENTITY_MERGED`, or
leaves (``observation.linked``) are permitted — same rationale as :data:`IDENTITY_UNMERGED`. Dotted leaves (``observation.linked``)
:func:`system`. are permitted — same rationale as :func:`system`.
""" """
if not event_type: if not event_type:
raise ValueError("identity topic requires a non-empty event_type") raise ValueError("identity topic requires a non-empty event_type")

View File

@@ -148,9 +148,13 @@ async def _publish_result(bus: Optional[BaseBus], result: ClusterResult) -> None
merged, merged,
event_type=_topics.IDENTITY_MERGED, event_type=_topics.IDENTITY_MERGED,
) )
# identities_unmerged ships once IDENTITY_UNMERGED is reserved for unmerged in result.identities_unmerged:
# (next commit). The field is already on ClusterResult so the await publish_safely(
# revocable-merge work doesn't reshape the dataclass. bus,
_topics.identity(_topics.IDENTITY_UNMERGED),
unmerged,
event_type=_topics.IDENTITY_UNMERGED,
)
async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None: async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None:

View File

@@ -81,6 +81,7 @@ def test_identity_builder() -> None:
assert topics.identity(topics.IDENTITY_FORMED) == "identity.formed" assert topics.identity(topics.IDENTITY_FORMED) == "identity.formed"
assert topics.identity(topics.IDENTITY_OBSERVATION_LINKED) == "identity.observation.linked" assert topics.identity(topics.IDENTITY_OBSERVATION_LINKED) == "identity.observation.linked"
assert topics.identity(topics.IDENTITY_MERGED) == "identity.merged" assert topics.identity(topics.IDENTITY_MERGED) == "identity.merged"
assert topics.identity(topics.IDENTITY_UNMERGED) == "identity.unmerged"
def test_identity_builder_rejects_empty() -> None: def test_identity_builder_rejects_empty() -> None:

View File

@@ -149,6 +149,9 @@ async def test_publishes_cluster_result_on_bus(monkeypatch, repo):
identities_merged=[ identities_merged=[
{"winner_uuid": "id-1", "loser_uuid": "id-2"}, {"winner_uuid": "id-1", "loser_uuid": "id-2"},
], ],
identities_unmerged=[
{"resurrected_uuid": "id-2", "former_winner_uuid": "id-1"},
],
) )
clusterer = _FakeClusterer(results=[result]) clusterer = _FakeClusterer(results=[result])
@@ -169,6 +172,7 @@ async def test_publishes_cluster_result_on_bus(monkeypatch, repo):
assert _topics.identity(_topics.IDENTITY_FORMED) in topics_seen assert _topics.identity(_topics.IDENTITY_FORMED) in topics_seen
assert _topics.identity(_topics.IDENTITY_OBSERVATION_LINKED) in topics_seen assert _topics.identity(_topics.IDENTITY_OBSERVATION_LINKED) in topics_seen
assert _topics.identity(_topics.IDENTITY_MERGED) in topics_seen assert _topics.identity(_topics.IDENTITY_MERGED) in topics_seen
assert _topics.identity(_topics.IDENTITY_UNMERGED) in topics_seen
@pytest.mark.anyio @pytest.mark.anyio