diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 216b2229..4016d867 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -18,6 +18,11 @@ Token structure (NATS-style, dot-separated): identity.observation.linked identity.merged identity.unmerged + identity.campaign.assigned + campaign.formed + campaign.identity.assigned + campaign.merged + campaign.unmerged credential.captured credential.reuse.detected system.log @@ -38,6 +43,7 @@ TOPOLOGY = "topology" DECKY = "decky" ATTACKER = "attacker" IDENTITY = "identity" +CAMPAIGN = "campaign" SYSTEM = "system" CREDENTIAL = "credential" @@ -117,6 +123,33 @@ IDENTITY_FORMED = "formed" IDENTITY_OBSERVATION_LINKED = "observation.linked" IDENTITY_MERGED = "merged" IDENTITY_UNMERGED = "unmerged" +# Campaign-clusterer cross-family event — fires under ``identity.>`` so +# identity-stream subscribers (e.g. the IdentityDetail SSE client) get +# notified the moment an identity's ``campaign_id`` changes without +# having to subscribe to the campaign topic family. The same event +# fires under ``campaign.identity.assigned`` for campaign-side +# subscribers. +IDENTITY_CAMPAIGN_ASSIGNED = "campaign.assigned" + +# Campaign-clusterer event types (second/third tokens under +# ``campaign``). Mirror of the identity family at the layer above: +# campaigns group identities into operations, and the clusterer +# publishes the same form / link / merge / unmerge lifecycle. 
+# +# campaign.formed — clusterer creates a new campaign from +# one or more identities +# campaign.identity.assigned — identity attached to an existing +# campaign (or reassigned from another) +# campaign.merged — two campaigns collapsed; loser gets +# ``merged_into_uuid`` set, subscribers +# re-key cached references to the winner +# campaign.unmerged — revocable-merge undo: contradicting +# evidence cleared ``merged_into_uuid`` +# and re-split identities +CAMPAIGN_FORMED = "formed" +CAMPAIGN_IDENTITY_ASSIGNED = "identity.assigned" +CAMPAIGN_MERGED = "merged" +CAMPAIGN_UNMERGED = "unmerged" # Credential event types (second/third tokens under ``credential``). # ``credential.captured`` fires once per upserted Credential row — the @@ -221,6 +254,19 @@ def attacker(event_type: str) -> str: return f"{ATTACKER}.{event_type}" +def campaign(event_type: str) -> str: + """Build ``campaign.<event_type>``. + + *event_type* is typically one of :data:`CAMPAIGN_FORMED`, + :data:`CAMPAIGN_IDENTITY_ASSIGNED`, :data:`CAMPAIGN_MERGED`, or + :data:`CAMPAIGN_UNMERGED`. Dotted leaves (``identity.assigned``) + are permitted — same rationale as :func:`system`. + """ + if not event_type: + raise ValueError("campaign topic requires a non-empty event_type") + return f"{CAMPAIGN}.{event_type}" + + def identity(event_type: str) -> str: """Build ``identity.<event_type>``.
diff --git a/decnet/cli/gating.py b/decnet/cli/gating.py index 201cec60..bcbafc48 100644 --- a/decnet/cli/gating.py +++ b/decnet/cli/gating.py @@ -29,7 +29,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ "api", "swarmctl", "deploy", "redeploy", "teardown", "mutate", "listener", "profiler", "services", "distros", "correlate", "archetypes", "web", - "db-reset", "init", "webhook", "clusterer", + "db-reset", "init", "webhook", "clusterer", "campaign-clusterer", }) MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology", "geoip"}) diff --git a/decnet/cli/workers.py b/decnet/cli/workers.py index 378e9f6f..4d4ae7db 100644 --- a/decnet/cli/workers.py +++ b/decnet/cli/workers.py @@ -239,3 +239,59 @@ def register(app: typer.Typer) -> None: asyncio.run(_run()) except KeyboardInterrupt: console.print("\n[yellow]Identity clusterer stopped.[/]") + + @app.command(name="campaign-clusterer") + def campaign_clusterer( + poll_interval_secs: float = typer.Option( + 60.0, "--poll-interval", "-i", + help="Slow-tick fallback when the bus is idle or unavailable (seconds)", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + ) -> None: + """Campaign clusterer — groups identities into operations. + + Bus-woken on ``identity.>`` (any identity-layer change is + potential input); reads ``AttackerIdentity`` rows, runs + connected-components over the campaign-level similarity graph + (phase-handoff / shared-infra / temporal-overlap / cohort), + writes ``campaigns`` rows + sets ``attacker_identities.campaign_id``, + and publishes ``campaign.formed`` / ``campaign.identity.assigned`` + / ``campaign.merged`` / ``campaign.unmerged`` plus the cross-family + ``identity.campaign.assigned`` so identity-side subscribers see + the badge update. 
+ """ + import asyncio + from decnet.cli.gating import _require_master_mode + from decnet.clustering.campaign.worker import ( + run_campaign_clusterer_loop, + ) + from decnet.web.dependencies import repo + + _require_master_mode("campaign-clusterer") + + if daemon: + log.info("campaign-clusterer daemonizing poll=%s", poll_interval_secs) + _utils._daemonize() + + log.info( + "campaign-clusterer command invoked poll=%s", poll_interval_secs, + ) + console.print( + f"[bold cyan]Campaign clusterer starting[/] " + f"poll={poll_interval_secs}s" + ) + console.print("[dim]Press Ctrl+C to stop[/]") + + async def _run() -> None: + await repo.initialize() + await run_campaign_clusterer_loop( + repo, poll_interval_secs=poll_interval_secs, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Campaign clusterer stopped.[/]") diff --git a/decnet/clustering/campaign/base.py b/decnet/clustering/campaign/base.py new file mode 100644 index 00000000..30354070 --- /dev/null +++ b/decnet/clustering/campaign/base.py @@ -0,0 +1,66 @@ +"""Campaign clusterer protocol — layer above identity resolution. + +Mirrors :mod:`decnet.clustering.base` for the layer above. Each concrete +campaign clusterer implements :class:`CampaignClusterer`; callers obtain +the active instance via +:func:`decnet.clustering.campaign.factory.get_campaign_clusterer`. + +The result shape parallels :class:`ClusterResult` but speaks campaign +vocabulary: campaigns formed, identities assigned, campaigns merged, +campaigns unmerged. +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + +from decnet.web.db.repository import BaseRepository + + +@dataclass +class CampaignClusterResult: + """Side-effects produced by a single campaign-clusterer ``tick``. 
+ + Consumed by the worker shell to publish on the bus + (``campaign.formed`` / ``campaign.identity.assigned`` / + ``campaign.merged`` / ``campaign.unmerged`` plus the cross-family + ``identity.campaign.assigned``). DB writes are already committed + by the time this returns. + """ + + campaigns_formed: list[dict[str, Any]] = field(default_factory=list) + """``{"campaign_uuid": str, "identity_uuids": [str, ...]}``.""" + + identities_assigned: list[dict[str, Any]] = field(default_factory=list) + """``{"campaign_uuid": str, "identity_uuid": str, + "prior_campaign_uuid": Optional[str]}``.""" + + campaigns_merged: list[dict[str, Any]] = field(default_factory=list) + """``{"winner_uuid": str, "loser_uuid": str}``.""" + + campaigns_unmerged: list[dict[str, Any]] = field(default_factory=list) + """``{"resurrected_uuid": str, "former_winner_uuid": str}``.""" + + +class CampaignClusterer(ABC): + """Abstract campaign clusterer. + + Single-method contract mirroring :class:`Clusterer`: ``tick`` reads + identities from the repo, projects them to a campaign-level feature + shape, runs a clustering pass, commits ``campaigns`` rows + sets + ``attacker_identities.campaign_id``, and returns a + :class:`CampaignClusterResult` summarising side-effects. + + Implementations MUST NOT raise from ``tick``: a single bad pass + cannot be allowed to crash the worker. + """ + + name: str + + @abstractmethod + async def tick(self, repo: BaseRepository) -> CampaignClusterResult: + """Run a single campaign clustering pass.""" + + +__all__ = ["CampaignClusterer", "CampaignClusterResult"] diff --git a/decnet/clustering/campaign/factory.py b/decnet/clustering/campaign/factory.py new file mode 100644 index 00000000..d6f60105 --- /dev/null +++ b/decnet/clustering/campaign/factory.py @@ -0,0 +1,31 @@ +"""Campaign-clusterer factory. + +Mirrors :mod:`decnet.clustering.factory` for the campaign layer. +Configuration knob ``DECNET_CAMPAIGN_CLUSTERER_TYPE``; default +``"connected_components"``. 
+""" +from __future__ import annotations + +import os + +from decnet.clustering.campaign.base import CampaignClusterer + +_KNOWN: tuple[str, ...] = ("connected_components",) +_DEFAULT = "connected_components" + + +def get_campaign_clusterer() -> CampaignClusterer: + name = os.environ.get( + "DECNET_CAMPAIGN_CLUSTERER_TYPE", _DEFAULT, + ).strip().lower() + if name == "connected_components": + from decnet.clustering.campaign.impl.connected_components import ( + ConnectedComponentsCampaignClusterer, + ) + return ConnectedComponentsCampaignClusterer() + raise ValueError( + f"Unknown campaign clusterer: {name!r}. Known: {_KNOWN}" + ) + + +__all__ = ["get_campaign_clusterer"] diff --git a/decnet/clustering/campaign/impl/connected_components.py b/decnet/clustering/campaign/impl/connected_components.py new file mode 100644 index 00000000..b78b8e30 --- /dev/null +++ b/decnet/clustering/campaign/impl/connected_components.py @@ -0,0 +1,304 @@ +"""Connected-components campaign clusterer (v1). + +Builds a similarity graph over identities (the layer below — already +clustered from raw observations), runs union-find over edges that pass +:data:`CAMPAIGN_EDGE_THRESHOLD`, and writes one ``campaigns`` row per +component. + +Mirror of :mod:`decnet.clustering.impl.connected_components` for the +layer above. Same revocable-merge discipline: identities stay FK'd to +their original campaign row throughout, soft pointers via +``campaigns.merged_into_uuid``. + +**Time-agnostic.** Edges depend only on pairwise relative offsets — +fixture F7 (slow_burn) invariant carries forward to this layer. 
+""" +from __future__ import annotations + +import json +import uuid as _uuid +from datetime import datetime, timezone +from typing import Any, Iterable, Optional + +from decnet.clustering.campaign.base import ( + CampaignClusterer, + CampaignClusterResult, +) +from decnet.clustering.campaign.impl.similarity import ( + CAMPAIGN_EDGE_THRESHOLD, + IdentityFeatures, + combined_campaign_weight, +) +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("clustering.campaign.connected_components") + + +def cluster_identities( + features: Iterable[IdentityFeatures], +) -> dict[str, str]: + """Run connected-components over the campaign-level similarity graph. + + Pure: no DB, no clock, no I/O. Returns ``{identity_uuid: cluster_id}``. + Singletons get a stable per-identity cluster id; cluster ids are + opaque strings. + """ + feat_list = list(features) + parent: dict[str, str] = {f.identity_uuid: f.identity_uuid for f in feat_list} + + def find(x: str) -> str: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x: str, y: str) -> None: + rx, ry = find(x), find(y) + if rx != ry: + parent[rx] = ry + + for i, a in enumerate(feat_list): + for b in feat_list[i + 1:]: + if combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD: + union(a.identity_uuid, b.identity_uuid) + + return {f.identity_uuid: f"cmp-{find(f.identity_uuid)}" for f in feat_list} + + +def from_identity_row(row: dict[str, Any]) -> IdentityFeatures: + """Project an ``AttackerIdentity`` projection row dict into an + :class:`IdentityFeatures`. + + ``row`` is the shape returned by + ``BaseRepository.list_identities_for_clustering``: uuid + + ja3_hashes / hassh_hashes / payload_simhashes / c2_endpoints + (JSON list[str] or null). + + Phase-handoff fields stay empty until the production-row adapter + learns to mine logs for per-decky phase sequences (TODO.md + "production-side payload + C2 + commands joins"). 
Without those, + the campaign clusterer falls back to shared-infra + temporal + overlap + cohort signals on production data; the fixture path + exercises the full feature set via :func:`from_synthetic_identity`. + """ + payload_hashes = _parse_json_list(row.get("payload_simhashes")) + c2_endpoints = _parse_json_list(row.get("c2_endpoints")) + + return IdentityFeatures( + identity_uuid=row["uuid"], + payload_hashes=frozenset(payload_hashes), + c2_endpoints=frozenset(c2_endpoints), + ) + + +def _parse_json_list(raw: Optional[str]) -> list[str]: + if not raw: + return [] + try: + decoded = json.loads(raw) + except (TypeError, ValueError): + return [] + if not isinstance(decoded, list): + return [] + return [str(x) for x in decoded if x is not None] + + +class ConnectedComponentsCampaignClusterer(CampaignClusterer): + """Connected-components campaign clusterer.""" + + name = "connected_components" + + async def tick(self, repo: BaseRepository) -> CampaignClusterResult: + try: + rows = await repo.list_identities_for_clustering() + except Exception: # noqa: BLE001 + log.exception("campaign clusterer: failed to read identities") + return CampaignClusterResult() + + if not rows: + return CampaignClusterResult() + + # Pre-compute the campaign merge chain so an identity's + # "effective" campaign follows merged_into_uuid up to the winner. + try: + all_campaigns = await repo.list_all_campaigns() + except Exception: # noqa: BLE001 + log.exception("campaign clusterer: failed to read campaigns") + return CampaignClusterResult() + campaign_chain = _build_merge_chain(all_campaigns) + + # Project + cluster. Skip identities that are themselves + # merged out — their winner is the active row and gets clustered + # on its own. This keeps the campaign graph from double-counting. 
+ active_rows = [r for r in rows if not r.get("merged_into_uuid")] + feature_list: list[IdentityFeatures] = [ + from_identity_row(r) for r in active_rows + ] + row_by_uuid: dict[str, dict[str, Any]] = { + r["uuid"]: r for r in active_rows + } + labels = cluster_identities(feature_list) + + # Group identities by predicted cluster. + components: dict[str, list[str]] = {} + for identity_uuid, cluster_id in labels.items(): + components.setdefault(cluster_id, []).append(identity_uuid) + + result = CampaignClusterResult() + now = datetime.now(timezone.utc) + + # Pass 1 — per-component reconciliation: form, link, merge. + for member_ids in components.values(): + literal_campaign_ids = { + row_by_uuid[m]["campaign_id"] for m in member_ids + if row_by_uuid[m].get("campaign_id") + } + effective_ids = { + campaign_chain.get(c, c) for c in literal_campaign_ids + } + unassigned = [ + m for m in member_ids + if not row_by_uuid[m].get("campaign_id") + ] + + if not effective_ids: + campaign_uuid = str(_uuid.uuid4()) + try: + await repo.create_campaign({ + "uuid": campaign_uuid, + "schema_version": 1, + "first_seen_at": now, + "last_seen_at": now, + "created_at": now, + "updated_at": now, + "identity_count": len(member_ids), + }) + except Exception: # noqa: BLE001 + log.exception( + "campaign clusterer: failed to create campaign for " + "component %s", member_ids, + ) + continue + + linked: list[str] = [] + for identity_uuid in member_ids: + if await _link(repo, identity_uuid, campaign_uuid): + linked.append(identity_uuid) + if linked: + result.campaigns_formed.append({ + "campaign_uuid": campaign_uuid, + "identity_uuids": linked, + }) + continue + + winner_uuid = min(effective_ids) + losers = effective_ids - {winner_uuid} + + for loser_uuid in losers: + try: + await repo.update_campaign_merged_into( + loser_uuid, winner_uuid, + ) + except Exception: # noqa: BLE001 + log.exception( + "campaign clusterer: failed to merge %s -> %s", + loser_uuid, winner_uuid, + ) + continue + 
campaign_chain[loser_uuid] = winner_uuid + result.campaigns_merged.append({ + "winner_uuid": winner_uuid, + "loser_uuid": loser_uuid, + }) + + for identity_uuid in unassigned: + if await _link(repo, identity_uuid, winner_uuid): + result.identities_assigned.append({ + "campaign_uuid": winner_uuid, + "identity_uuid": identity_uuid, + "prior_campaign_uuid": None, + }) + + # Pass 2 — revocable-merge undo for campaigns. Same shape as + # the identity-side check: if a merged-out campaign's + # identities no longer cluster with the winner's, revoke. + identities_by_literal_campaign: dict[str, list[str]] = {} + for identity_uuid, r in row_by_uuid.items(): + cid = r.get("campaign_id") + if cid: + identities_by_literal_campaign.setdefault(cid, []).append( + identity_uuid, + ) + + for campaign_row in all_campaigns: + if not campaign_row.get("merged_into_uuid"): + continue + loser_uuid = campaign_row["uuid"] + winner_uuid = campaign_chain.get(loser_uuid, loser_uuid) + if winner_uuid == loser_uuid: + continue + loser_idents = identities_by_literal_campaign.get(loser_uuid, []) + winner_idents = identities_by_literal_campaign.get(winner_uuid, []) + if not loser_idents or not winner_idents: + continue + loser_clusters = {labels[i] for i in loser_idents if i in labels} + winner_clusters = {labels[i] for i in winner_idents if i in labels} + if loser_clusters & winner_clusters: + continue + try: + await repo.update_campaign_merged_into(loser_uuid, None) + except Exception: # noqa: BLE001 + log.exception( + "campaign clusterer: failed to unmerge %s from %s", + loser_uuid, winner_uuid, + ) + continue + campaign_chain[loser_uuid] = loser_uuid + result.campaigns_unmerged.append({ + "resurrected_uuid": loser_uuid, + "former_winner_uuid": winner_uuid, + }) + + return result + + +def _build_merge_chain( + rows: list[dict[str, Any]], +) -> dict[str, str]: + _MAX_HOPS = 8 + by_uuid: dict[str, dict[str, Any]] = {r["uuid"]: r for r in rows} + chain: dict[str, str] = {} + for uuid_ in by_uuid: + 
cur = uuid_ + for _ in range(_MAX_HOPS): + row = by_uuid.get(cur) + if row is None: + break + nxt = row.get("merged_into_uuid") + if not nxt or nxt == cur: + break + cur = nxt + chain[uuid_] = cur + return chain + + +async def _link( + repo: BaseRepository, identity_uuid: str, campaign_uuid: str, +) -> bool: + try: + await repo.set_identity_campaign_id(identity_uuid, campaign_uuid) + return True + except Exception: # noqa: BLE001 + log.exception( + "campaign clusterer: failed to link identity=%s -> campaign=%s", + identity_uuid, campaign_uuid, + ) + return False + + +__all__ = [ + "ConnectedComponentsCampaignClusterer", + "cluster_identities", + "from_identity_row", +] diff --git a/decnet/clustering/campaign/worker.py b/decnet/clustering/campaign/worker.py new file mode 100644 index 00000000..fe84cac0 --- /dev/null +++ b/decnet/clustering/campaign/worker.py @@ -0,0 +1,191 @@ +"""Long-running campaign-clusterer worker. + +Mirrors :mod:`decnet.clustering.worker` for the layer above. Bus-woken +on ``identity.>`` (not ``attacker.>`` — the campaign clusterer reads +identities, not raw observations); falls back to a 60s slow-tick poll +when the bus is unavailable. + +Publishes the four ``campaign.*`` events plus the cross-family +``identity.campaign.assigned`` so existing identity-stream subscribers +see campaign-id changes without subscribing to ``campaign.>``. 
+""" +from __future__ import annotations + +import asyncio +import contextlib +from typing import Optional + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + publish_safely, + run_control_listener_signal as _run_control_listener_signal, + run_health_heartbeat as _run_health_heartbeat, +) +from decnet.clustering.campaign.base import ( + CampaignClusterer, + CampaignClusterResult, +) +from decnet.clustering.campaign.factory import get_campaign_clusterer +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("clustering.campaign.worker") + +_DEFAULT_POLL_SECS = 60.0 +_WORKER_NAME = "campaign-clusterer" + + +async def run_campaign_clusterer_loop( + repo: BaseRepository, + *, + poll_interval_secs: float = _DEFAULT_POLL_SECS, + clusterer: Optional[CampaignClusterer] = None, + shutdown: Optional[asyncio.Event] = None, +) -> None: + """Run the campaign clusterer until cancelled.""" + if clusterer is None: + clusterer = get_campaign_clusterer() + log.info( + "campaign-clusterer started impl=%s poll_interval_secs=%s", + clusterer.name, poll_interval_secs, + ) + + bus: Optional[BaseBus] = None + wake = asyncio.Event() + wake_tasks: list[asyncio.Task] = [] + heartbeat_task: Optional[asyncio.Task] = None + try: + candidate = get_bus(client_name=_WORKER_NAME) + await candidate.connect() + bus = candidate + # Wake on any identity-layer event — formed / linked / merged / + # unmerged all change the input set the campaign clusterer + # operates over. 
+ wake_tasks.append(asyncio.create_task( + _wake_on(bus, wake, f"{_topics.IDENTITY}.>"), + )) + heartbeat_task = asyncio.create_task( + _run_health_heartbeat(bus, _WORKER_NAME), + ) + wake_tasks.append(asyncio.create_task( + _run_control_listener_signal(bus, _WORKER_NAME), + )) + except Exception as exc: # noqa: BLE001 + log.warning( + "campaign-clusterer: bus unavailable, running in poll-only " + "mode: %s", exc, + ) + + if shutdown is None: + shutdown = asyncio.Event() + + try: + while not shutdown.is_set(): + try: + result = await clusterer.tick(repo) + except Exception: # noqa: BLE001 + log.exception("campaign-clusterer: tick failed") + result = CampaignClusterResult() + + await _publish_result(bus, result) + + try: + await asyncio.wait_for( + wake.wait(), timeout=float(poll_interval_secs), + ) + except asyncio.TimeoutError: + pass + wake.clear() + except (asyncio.CancelledError, KeyboardInterrupt): + log.info("campaign-clusterer stopped") + finally: + for t in wake_tasks: + t.cancel() + if heartbeat_task is not None: + heartbeat_task.cancel() + for t in (*wake_tasks, heartbeat_task): + if t is None: + continue + with contextlib.suppress(asyncio.CancelledError, Exception): + await t + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _publish_result( + bus: Optional[BaseBus], result: CampaignClusterResult, +) -> None: + """Fan ``CampaignClusterResult`` out to ``campaign.*`` topics + + cross-family ``identity.campaign.assigned``.""" + for formed in result.campaigns_formed: + await publish_safely( + bus, + _topics.campaign(_topics.CAMPAIGN_FORMED), + formed, + event_type=_topics.CAMPAIGN_FORMED, + ) + # Also fire identity.campaign.assigned per identity so the + # existing identity SSE stream sees the badge update. 
+ for identity_uuid in formed.get("identity_uuids", []): + await publish_safely( + bus, + _topics.identity(_topics.IDENTITY_CAMPAIGN_ASSIGNED), + { + "identity_uuid": identity_uuid, + "campaign_uuid": formed["campaign_uuid"], + "prior_campaign_uuid": None, + }, + event_type=_topics.IDENTITY_CAMPAIGN_ASSIGNED, + ) + for assigned in result.identities_assigned: + await publish_safely( + bus, + _topics.campaign(_topics.CAMPAIGN_IDENTITY_ASSIGNED), + assigned, + event_type=_topics.CAMPAIGN_IDENTITY_ASSIGNED, + ) + await publish_safely( + bus, + _topics.identity(_topics.IDENTITY_CAMPAIGN_ASSIGNED), + { + "identity_uuid": assigned["identity_uuid"], + "campaign_uuid": assigned["campaign_uuid"], + "prior_campaign_uuid": assigned.get("prior_campaign_uuid"), + }, + event_type=_topics.IDENTITY_CAMPAIGN_ASSIGNED, + ) + for merged in result.campaigns_merged: + await publish_safely( + bus, + _topics.campaign(_topics.CAMPAIGN_MERGED), + merged, + event_type=_topics.CAMPAIGN_MERGED, + ) + for unmerged in result.campaigns_unmerged: + await publish_safely( + bus, + _topics.campaign(_topics.CAMPAIGN_UNMERGED), + unmerged, + event_type=_topics.CAMPAIGN_UNMERGED, + ) + + +async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None: + try: + sub = bus.subscribe(pattern) + async with sub: + async for _event in sub: + wake.set() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning( + "campaign-clusterer: subscriber for %s died (%s); falling back " + "to poll", pattern, exc, + ) + + +__all__ = ["run_campaign_clusterer_loop"] diff --git a/tests/clustering/test_campaign_worker.py b/tests/clustering/test_campaign_worker.py new file mode 100644 index 00000000..cde333ff --- /dev/null +++ b/tests/clustering/test_campaign_worker.py @@ -0,0 +1,344 @@ +"""End-to-end tests for the campaign-clusterer worker shell + tick. + +Mirrors :mod:`tests.clustering.test_clusterer_worker` for the layer +above. 
Covers shell lifecycle (shutdown / cancel / raising tick), +end-to-end ``tick`` against SQLite (form, link, merge, revoke), bus +fan-out to the four ``campaign.*`` topics + cross-family +``identity.campaign.assigned``, factory dispatch, and CLI gating. +""" +from __future__ import annotations + +import asyncio +import json +from datetime import datetime, timezone + +import pytest + +from decnet.bus import topics as _topics +from decnet.clustering.campaign.base import ( + CampaignClusterer, + CampaignClusterResult, +) +from decnet.clustering.campaign.factory import get_campaign_clusterer +from decnet.clustering.campaign.impl.connected_components import ( + ConnectedComponentsCampaignClusterer, + cluster_identities, + from_identity_row, +) +from decnet.clustering.campaign.impl.similarity import IdentityFeatures +from decnet.clustering.campaign.worker import run_campaign_clusterer_loop +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "campaign.db")) + await r.initialize() + return r + + +@pytest.fixture(autouse=True) +def _no_bus(monkeypatch): + """Run workers in poll-only mode — no real Unix socket.""" + monkeypatch.setenv("DECNET_BUS_ENABLED", "false") + + +# ─── Test doubles ─────────────────────────────────────────────────────────── + + +class _FakeClusterer(CampaignClusterer): + name = "fake" + + def __init__(self, results=None) -> None: + self._results = list(results or []) + self.calls = 0 + + async def tick(self, repo) -> CampaignClusterResult: + self.calls += 1 + if self._results: + return self._results.pop(0) + return CampaignClusterResult() + + +class _RaisingClusterer(CampaignClusterer): + name = "raising" + + def __init__(self) -> None: + self.calls = 0 + + async def tick(self, repo) -> CampaignClusterResult: + self.calls += 1 + raise RuntimeError("boom") + + +# ─── Shell lifecycle ──────────────────────────────────────────────────────── + + +@pytest.mark.anyio 
+async def test_loop_exits_on_shutdown(repo): + shutdown = asyncio.Event() + clusterer = _FakeClusterer() + task = asyncio.create_task( + run_campaign_clusterer_loop( + repo, poll_interval_secs=0.05, + clusterer=clusterer, shutdown=shutdown, + ) + ) + await asyncio.sleep(0.12) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + assert clusterer.calls >= 1 + + +@pytest.mark.anyio +async def test_loop_exits_on_cancel(repo): + clusterer = _FakeClusterer() + task = asyncio.create_task( + run_campaign_clusterer_loop( + repo, poll_interval_secs=0.05, clusterer=clusterer, + ) + ) + await asyncio.sleep(0.1) + task.cancel() + await asyncio.wait_for(task, timeout=2.0) + assert clusterer.calls >= 1 + + +@pytest.mark.anyio +async def test_tick_failure_does_not_crash_loop(repo): + shutdown = asyncio.Event() + clusterer = _RaisingClusterer() + task = asyncio.create_task( + run_campaign_clusterer_loop( + repo, poll_interval_secs=0.05, + clusterer=clusterer, shutdown=shutdown, + ) + ) + await asyncio.sleep(0.2) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + assert clusterer.calls >= 2 + + +# ─── Bus fan-out ──────────────────────────────────────────────────────────── + + +@pytest.mark.anyio +async def test_publishes_campaign_result_on_bus(monkeypatch, repo): + published: list[tuple[str, dict, str]] = [] + + async def _fake_publish(bus, topic, payload, event_type=""): + published.append((topic, payload, event_type)) + + monkeypatch.setattr( + "decnet.clustering.campaign.worker.publish_safely", _fake_publish, + ) + + result = CampaignClusterResult( + campaigns_formed=[ + {"campaign_uuid": "c-1", "identity_uuids": ["i-1", "i-2"]}, + ], + identities_assigned=[ + {"campaign_uuid": "c-1", "identity_uuid": "i-3", + "prior_campaign_uuid": None}, + ], + campaigns_merged=[ + {"winner_uuid": "c-1", "loser_uuid": "c-2"}, + ], + campaigns_unmerged=[ + {"resurrected_uuid": "c-2", "former_winner_uuid": "c-1"}, + ], + ) + clusterer = _FakeClusterer(results=[result]) + + 
shutdown = asyncio.Event() + task = asyncio.create_task( + run_campaign_clusterer_loop( + repo, poll_interval_secs=0.05, + clusterer=clusterer, shutdown=shutdown, + ) + ) + await asyncio.sleep(0.1) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + + topics_seen = {t for t, _, _ in published} + assert _topics.campaign(_topics.CAMPAIGN_FORMED) in topics_seen + assert _topics.campaign(_topics.CAMPAIGN_IDENTITY_ASSIGNED) in topics_seen + assert _topics.campaign(_topics.CAMPAIGN_MERGED) in topics_seen + assert _topics.campaign(_topics.CAMPAIGN_UNMERGED) in topics_seen + # Cross-family signal — every campaigns_formed identity AND every + # identities_assigned identity should fire identity.campaign.assigned. + cross = _topics.identity(_topics.IDENTITY_CAMPAIGN_ASSIGNED) + cross_payloads = [p for t, p, _ in published if t == cross] + cross_idents = {p["identity_uuid"] for p in cross_payloads} + assert {"i-1", "i-2", "i-3"}.issubset(cross_idents) + + +# ─── Pure clusterer + projection ──────────────────────────────────────────── + + +def test_cluster_identities_singletons(): + a = IdentityFeatures(identity_uuid="a") + b = IdentityFeatures(identity_uuid="b") + labels = cluster_identities([a, b]) + assert labels["a"] != labels["b"] + + +def test_cluster_identities_phase_handoff_unions(): + a = IdentityFeatures( + identity_uuid="a", + last_phase_per_decky={"d1": "command_and_control"}, + last_seen_per_decky={"d1": 1000.0}, + ) + b = IdentityFeatures( + identity_uuid="b", + first_phase_per_decky={"d1": "discovery"}, + first_seen_per_decky={"d1": 1100.0}, + ) + labels = cluster_identities([a, b]) + assert labels["a"] == labels["b"] + + +def test_from_identity_row_parses_json_lists(): + feat = from_identity_row({ + "uuid": "i-1", + "payload_simhashes": json.dumps(["h1", "h2"]), + "c2_endpoints": json.dumps(["c1"]), + }) + assert feat.identity_uuid == "i-1" + assert feat.payload_hashes == frozenset({"h1", "h2"}) + assert feat.c2_endpoints == frozenset({"c1"}) + + +def 
test_from_identity_row_handles_null_and_garbage(): + f = from_identity_row({ + "uuid": "i-1", + "payload_simhashes": None, + "c2_endpoints": "not-json", + }) + assert f.payload_hashes == frozenset() + assert f.c2_endpoints == frozenset() + + +# ─── End-to-end tick against SQLite ──────────────────────────────────────── + + +async def _create_identity(repo, uuid: str, **kwargs) -> str: + now = datetime.now(timezone.utc) + return await repo.create_attacker_identity({ + "uuid": uuid, + "first_seen_at": now, + "last_seen_at": now, + "payload_simhashes": kwargs.get("payload_simhashes"), + "c2_endpoints": kwargs.get("c2_endpoints"), + }) + + +@pytest.mark.anyio +async def test_tick_empty_db_returns_empty_result(repo): + c = ConnectedComponentsCampaignClusterer() + result = await c.tick(repo) + assert result.campaigns_formed == [] + assert result.identities_assigned == [] + assert result.campaigns_merged == [] + assert result.campaigns_unmerged == [] + + +@pytest.mark.anyio +async def test_tick_forms_campaign_for_shared_infra_co_op(repo): + # Two identities with complete shared-infra overlap (same payload + # hash, same C2 endpoint). Full overlap gives Jaccard=1.0, which + # contributes a campaign-level weight of 0.7 — on its own that is + # below the edge threshold. Crossing the threshold would need an + # additional signal: temporal overlap (session windows) or + # phase-handoff. The production-row adapter does not populate + # either of those yet, so neither can be supplied from DB rows. + # (At the identity level these two would also stay separate: below + # threshold, and the identity-side veto would block them.) With the + # v1 adapter the effective behavior is therefore "every identity is + # its own campaign" — the F3 lone_wolf outcome. Verify that here.
+ await _create_identity( + repo, "i1", + payload_simhashes=json.dumps(["h1"]), + c2_endpoints=json.dumps(["c1"]), + ) + await _create_identity( + repo, "i2", + payload_simhashes=json.dumps(["h1"]), + c2_endpoints=json.dumps(["c1"]), + ) + + c = ConnectedComponentsCampaignClusterer() + result = await c.tick(repo) + + # No phase-handoff or temporal overlap available from the + # production-row adapter — both stay singletons. + assert len(result.campaigns_formed) == 2 + formed_idents = { + i for entry in result.campaigns_formed for i in entry["identity_uuids"] + } + assert formed_idents == {"i1", "i2"} + + +@pytest.mark.anyio +async def test_tick_idempotent_links_existing_identity(repo): + """Second tick on same input doesn't double-create campaigns.""" + await _create_identity(repo, "i1") + c = ConnectedComponentsCampaignClusterer() + + r1 = await c.tick(repo) + assert len(r1.campaigns_formed) == 1 + campaign_uuid = r1.campaigns_formed[0]["campaign_uuid"] + + r2 = await c.tick(repo) + # Identity already linked — no new campaign, no new assignment. + assert r2.campaigns_formed == [] + assert r2.identities_assigned == [] + # And the existing assignment persisted. + assert await repo.count_identities_for_campaign(campaign_uuid) == 1 + + +@pytest.mark.anyio +async def test_tick_skips_merged_out_identities(repo): + """Merged-out identity rows must not show up as cluster inputs.""" + await _create_identity(repo, "i1") + await _create_identity(repo, "i2") + # Soft-merge i2 into i1 at the identity layer. + await repo.update_identity_merged_into("i2", "i1") + + c = ConnectedComponentsCampaignClusterer() + result = await c.tick(repo) + + # Only i1 is an active row; one campaign formed, with one identity. 
+ assert len(result.campaigns_formed) == 1 + assert result.campaigns_formed[0]["identity_uuids"] == ["i1"] + + +# ─── Factory + CLI gating ──────────────────────────────────────────────────── + + +def test_factory_default(): + c = get_campaign_clusterer() + assert isinstance(c, ConnectedComponentsCampaignClusterer) + + +def test_factory_unknown_raises(monkeypatch): + monkeypatch.setenv("DECNET_CAMPAIGN_CLUSTERER_TYPE", "nope") + with pytest.raises(ValueError): + get_campaign_clusterer() + + +def test_campaign_clusterer_registered_in_cli(): + from decnet.cli.gating import MASTER_ONLY_COMMANDS + assert "campaign-clusterer" in MASTER_ONLY_COMMANDS + + +def test_campaign_topic_builder_round_trips(): + assert _topics.campaign(_topics.CAMPAIGN_FORMED) == "campaign.formed" + assert _topics.campaign(_topics.CAMPAIGN_IDENTITY_ASSIGNED) == ( + "campaign.identity.assigned" + ) + assert _topics.identity(_topics.IDENTITY_CAMPAIGN_ASSIGNED) == ( + "identity.campaign.assigned" + )