Files
anti 6936a1426c feat(clustering): campaign-clusterer worker + bus topics + CLI
The campaign clusterer worker mirrors the identity-side worker shell
(bus connect, heartbeat, control listener, slow-tick fallback) but
wakes on identity.> instead of attacker.> — campaign-level work is
gated on identity-layer changes, not raw observations.

The connected-components implementation reads identities via
list_identities_for_clustering, projects them with from_identity_row,
runs union-find over combined_campaign_weight, writes campaigns rows,
sets attacker_identities.campaign_id, and runs the same revocable-
merge pass as the identity layer (a merged-out campaign whose
identities no longer co-cluster with the winner gets revoked).

Bus: adds campaign.> family (formed / identity.assigned / merged /
unmerged) plus the cross-family identity.campaign.assigned so
existing identity-stream subscribers see the badge update without
having to subscribe to campaign.>. Wiki Service-Bus.md updated in
wiki-checkout in the same wave per the project's bus-signals
discipline.

CLI: decnet campaign-clusterer registered as master-only via
MASTER_ONLY_COMMANDS; --poll-interval / --daemon mirror the identity
clusterer command surface.
2026-04-26 09:04:00 -04:00

67 lines
2.3 KiB
Python

"""Campaign clusterer protocol — layer above identity resolution.
Mirrors :mod:`decnet.clustering.base` for the layer above. Each concrete
campaign clusterer implements :class:`CampaignClusterer`; callers obtain
the active instance via
:func:`decnet.clustering.campaign.factory.get_campaign_clusterer`.
The result shape parallels :class:`ClusterResult` but speaks campaign
vocabulary: campaigns formed, identities assigned, campaigns merged,
campaigns unmerged.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
from decnet.web.db.repository import BaseRepository
@dataclass
class CampaignClusterResult:
"""Side-effects produced by a single campaign-clusterer ``tick``.
Consumed by the worker shell to publish on the bus
(``campaign.formed`` / ``campaign.identity.assigned`` /
``campaign.merged`` / ``campaign.unmerged`` plus the cross-family
``identity.campaign.assigned``). DB writes are already committed
by the time this returns.
"""
campaigns_formed: list[dict[str, Any]] = field(default_factory=list)
"""``{"campaign_uuid": str, "identity_uuids": [str, ...]}``."""
identities_assigned: list[dict[str, Any]] = field(default_factory=list)
"""``{"campaign_uuid": str, "identity_uuid": str,
"prior_campaign_uuid": Optional[str]}``."""
campaigns_merged: list[dict[str, Any]] = field(default_factory=list)
"""``{"winner_uuid": str, "loser_uuid": str}``."""
campaigns_unmerged: list[dict[str, Any]] = field(default_factory=list)
"""``{"resurrected_uuid": str, "former_winner_uuid": str}``."""
class CampaignClusterer(ABC):
"""Abstract campaign clusterer.
Single-method contract mirroring :class:`Clusterer`: ``tick`` reads
identities from the repo, projects them to a campaign-level feature
shape, runs a clustering pass, commits ``campaigns`` rows + sets
``attacker_identities.campaign_id``, and returns a
:class:`CampaignClusterResult` summarising side-effects.
Implementations MUST NOT raise from ``tick``: a single bad pass
cannot be allowed to crash the worker.
"""
name: str
@abstractmethod
async def tick(self, repo: BaseRepository) -> CampaignClusterResult:
"""Run a single campaign clustering pass."""
__all__ = ["CampaignClusterer", "CampaignClusterResult"]