diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index d70f4227..216b2229 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -17,6 +17,7 @@ Token structure (NATS-style, dot-separated): identity.formed identity.observation.linked identity.merged + identity.unmerged credential.captured credential.reuse.detected system.log @@ -101,12 +102,21 @@ ATTACKER_INTEL_ENRICHED = "intel.enriched" # identity.merged — two identities collapsed; loser gets # ``merged_into_uuid`` set, subscribers # re-key cached references to the winner +# identity.unmerged — revocable-merge undo: contradicting +# evidence cleared ``merged_into_uuid`` +# and re-split observations. The +# resurrected side's UUID is the same +# as the prior loser, so subscribers +# that cached references to the loser +# during the merged interval can +# re-attach without a new lookup. # # ``identity.campaign.assigned`` is deferred; it ships when the campaign # clusterer ships. YAGNI before then. IDENTITY_FORMED = "formed" IDENTITY_OBSERVATION_LINKED = "observation.linked" IDENTITY_MERGED = "merged" +IDENTITY_UNMERGED = "unmerged" # Credential event types (second/third tokens under ``credential``). # ``credential.captured`` fires once per upserted Credential row — the @@ -215,9 +225,9 @@ def identity(event_type: str) -> str: """Build ``identity.``. *event_type* is typically one of :data:`IDENTITY_FORMED`, - :data:`IDENTITY_OBSERVATION_LINKED`, :data:`IDENTITY_MERGED`. Dotted - leaves (``observation.linked``) are permitted — same rationale as - :func:`system`. + :data:`IDENTITY_OBSERVATION_LINKED`, :data:`IDENTITY_MERGED`, or + :data:`IDENTITY_UNMERGED`. Dotted leaves (``observation.linked``) + are permitted — same rationale as :func:`system`. """ if not event_type: raise ValueError("identity topic requires a non-empty event_type") diff --git a/decnet/clustering/worker.py b/decnet/clustering/worker.py index 0229f855..811471c8 100644 --- a/decnet/clustering/worker.py +++ b/decnet/clustering/worker.py @@ -148,9 +148,13 @@ async def _publish_result(bus: Optional[BaseBus], result: ClusterResult) -> None merged, event_type=_topics.IDENTITY_MERGED, ) - # identities_unmerged ships once IDENTITY_UNMERGED is reserved - # (next commit). The field is already on ClusterResult so the - # revocable-merge work doesn't reshape the dataclass. + for unmerged in result.identities_unmerged: + await publish_safely( + bus, + _topics.identity(_topics.IDENTITY_UNMERGED), + unmerged, + event_type=_topics.IDENTITY_UNMERGED, + ) async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None: diff --git a/tests/bus/test_topics.py b/tests/bus/test_topics.py index 36c21cc5..958b473e 100644 --- a/tests/bus/test_topics.py +++ b/tests/bus/test_topics.py @@ -81,6 +81,7 @@ def test_identity_builder() -> None: assert topics.identity(topics.IDENTITY_FORMED) == "identity.formed" assert topics.identity(topics.IDENTITY_OBSERVATION_LINKED) == "identity.observation.linked" assert topics.identity(topics.IDENTITY_MERGED) == "identity.merged" + assert topics.identity(topics.IDENTITY_UNMERGED) == "identity.unmerged" def test_identity_builder_rejects_empty() -> None: diff --git a/tests/clustering/test_clusterer_worker.py b/tests/clustering/test_clusterer_worker.py index 40163213..c69efcdb 100644 --- a/tests/clustering/test_clusterer_worker.py +++ b/tests/clustering/test_clusterer_worker.py @@ -149,6 +149,9 @@ async def test_publishes_cluster_result_on_bus(monkeypatch, repo): identities_merged=[ {"winner_uuid": "id-1", "loser_uuid": "id-2"}, ], + identities_unmerged=[ + {"resurrected_uuid": "id-2", "former_winner_uuid": "id-1"}, + ], ) clusterer = _FakeClusterer(results=[result]) @@ -169,6 +172,7 @@ async def test_publishes_cluster_result_on_bus(monkeypatch, repo): assert _topics.identity(_topics.IDENTITY_FORMED) in topics_seen assert _topics.identity(_topics.IDENTITY_OBSERVATION_LINKED) in topics_seen assert _topics.identity(_topics.IDENTITY_MERGED) in topics_seen + assert _topics.identity(_topics.IDENTITY_UNMERGED) in topics_seen @pytest.mark.anyio