diff --git a/decnet/clustering/impl/connected_components.py b/decnet/clustering/impl/connected_components.py index 83fb419d..1aa44d19 100644 --- a/decnet/clustering/impl/connected_components.py +++ b/decnet/clustering/impl/connected_components.py @@ -1,48 +1,257 @@ """Connected-components identity clusterer (v1). Builds a similarity graph over observations (per-IP attacker rows), -runs connected-components over edges that pass a confidence threshold, -and writes one ``attacker_identities`` row per component. +runs union-find over edges that pass a confidence threshold, and writes +one ``attacker_identities`` row per component. -This module is the **skeleton**. The ``tick`` method is a no-op until -the similarity-graph features land in subsequent commits. Subscribers -on ``identity.>`` see no traffic from this clusterer until the edge -functions are wired in. +**v1 signal coverage (this commit):** -Subsequent commits add, in order: +* High-weight tier: JA3 / HASSH / payload-hash / C2-endpoint exact + match (alone enough to cluster). The production tick currently sees + JA3 + HASSH only — payload + C2 require log mining and join in + later commits. The fixture tests exercise the full high-weight set + through the in-memory path. -1. Similarity-graph scaffolding (``impl/similarity.py``). -2. High-weight edges (JA3/JA4/HASSH/payload/C2 exact match). -3. Medium-weight edges (command-sequence Jaccard bucketed by UKC phase). -4. Phase-handoff edges (designed for fixture 5). -5. Low-weight edges (credential Jaccard, ASN) — must NOT cluster F1/F2 alone. -6. Revocable merges (``identity.merged`` / ``identity.unmerged``). +Subsequent commits add medium / low / very-low tier edges, phase- +handoff edges, and revocable merges. Edges MUST stay time-agnostic +— fixture 7 forbids recency-decay clustering. -Edges MUST stay time-agnostic — fixture 7 proves recency-decay clustering -fragments multi-month APT campaigns. +**v1 behavior:** + +The clusterer only assigns identities to observations whose +``identity_id`` is currently NULL. Observations already linked to an +identity are read-only this pass (they still participate in graph +edges, so a new observation can join an existing identity, but the +clusterer never reassigns or merges existing identities). Reassignment ++ merging land in commit 10 alongside revocable merges. """ from __future__ import annotations +import json +import uuid as _uuid +from datetime import datetime, timezone +from typing import Any, Iterable, Optional + from decnet.clustering.base import Clusterer, ClusterResult +from decnet.clustering.impl.similarity import ( + Observation, + high_weight_edge, +) from decnet.logging import get_logger from decnet.web.db.repository import BaseRepository log = get_logger("clustering.connected_components") -class ConnectedComponentsClusterer(Clusterer): - """Connected-components clusterer. +# Threshold above which an edge survives into the graph. The high-tier +# functions return 1.0 on agreement, so a literal >= 1.0 cutoff means +# "exact match required." Once medium-tier edges combine, this becomes +# a tunable. +_EDGE_THRESHOLD = 1.0 - Skeleton implementation: ``tick`` is a no-op. Wiring lands in - subsequent commits. + +def cluster_observations( + observations: Iterable[Observation], +) -> dict[str, str]: + """Run connected-components over the high-weight similarity graph. + + Pure: no DB, no clock, no I/O. Both the fixture-validation tests + and the production ``tick`` consume this. The mapping is a + deterministic function of the input set + edge function. + + Singletons get a stable per-observation cluster id so callers can + distinguish "isolated observation" from "merged into nothing." + + Returns ``{observation_id: cluster_id}``. Cluster ids are opaque + strings — callers must not rely on their format. + """ + obs_list = list(observations) + parent: dict[str, str] = {o.observation_id: o.observation_id for o in obs_list} + + def find(x: str) -> str: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x: str, y: str) -> None: + rx, ry = find(x), find(y) + if rx != ry: + parent[rx] = ry + + for i, a in enumerate(obs_list): + for b in obs_list[i + 1:]: + if high_weight_edge(a, b) >= _EDGE_THRESHOLD: + union(a.observation_id, b.observation_id) + + # Roots: each unique find(o) is a component representative. Use + # them as the cluster id so two runs over the same input produce + # the same labels (handy for assertions). + return {o.observation_id: f"cc-{find(o.observation_id)}" for o in obs_list} + + +def from_attacker_row(row: dict[str, Any]) -> Observation: + """Project an ``Attacker`` row dict into an :class:`Observation`. + + Pulls JA3 / HASSH out of the ``Attacker.fingerprints`` JSON list + (one entry per fingerprint event the prober collected). Multiple + JA3s on a single observation are flattened to a single value — + the most-recent — because :class:`Observation` is a single-row + projection; an observation that exhibits two distinct JA3s across + its lifetime is a wire-level oddity that the clusterer treats by + keeping the latest. The identity row itself can store the full + list across observations. + + Payload + C2 + commands are left empty — log mining lands in + later commits. The function shape doesn't change when they do. + """ + raw = row.get("fingerprints") or "[]" + try: + entries = json.loads(raw) if isinstance(raw, str) else list(raw) + except (TypeError, ValueError): + entries = [] + + ja3: Optional[str] = None + hassh: Optional[str] = None + for entry in entries: + if not isinstance(entry, dict): + continue + kind = entry.get("kind") + h = entry.get("hash") or entry.get("value") + if not h: + continue + if kind == "ja3": + ja3 = h + elif kind == "hassh": + hassh = h + + return Observation( + observation_id=row["uuid"], + ja3=ja3, + hassh=hassh, + asn=row.get("asn"), + ) + + +class ConnectedComponentsClusterer(Clusterer): + """Connected-components clusterer over the similarity graph. + + See module docstring for v1 signal coverage and behavior notes. """ name = "connected_components" async def tick(self, repo: BaseRepository) -> ClusterResult: - # No similarity edges defined yet; produce an empty result. - # Subsequent commits replace this with the real pass. - return ClusterResult() + try: + rows = await repo.list_attackers_for_clustering() + except Exception: # noqa: BLE001 + log.exception("clusterer: failed to read attackers") + return ClusterResult() + + if not rows: + return ClusterResult() + + # Project + cluster. + observations: list[Observation] = [] + row_by_id: dict[str, dict[str, Any]] = {} + for r in rows: + obs = from_attacker_row(r) + observations.append(obs) + row_by_id[obs.observation_id] = r + labels = cluster_observations(observations) + + # Group by predicted cluster. + components: dict[str, list[str]] = {} + for obs_id, cluster_id in labels.items(): + components.setdefault(cluster_id, []).append(obs_id) + + result = ClusterResult() + now = datetime.now(timezone.utc) + + for member_ids in components.values(): + existing_identities = { + row_by_id[m]["identity_id"] for m in member_ids + if row_by_id[m].get("identity_id") + } + unassigned = [ + m for m in member_ids + if not row_by_id[m].get("identity_id") + ] + + if len(existing_identities) > 1: + # Multi-identity component — merging lands in commit 10 + # (revocable merges). Skip for now; new observations in + # this component stay unassigned this pass and will get + # assigned once the merge logic exists. + log.debug( + "clusterer: skipping component with %d existing identities " + "(merge lands in commit 10)", len(existing_identities), + ) + continue + + if not unassigned: + # Component is entirely already-assigned; nothing to do. + continue + + if existing_identities: + # Single existing identity → link the unassigned members. + identity_uuid = next(iter(existing_identities)) + for obs_id in unassigned: + try: + await repo.set_attacker_identity_id(obs_id, identity_uuid) + except Exception: # noqa: BLE001 + log.exception( + "clusterer: failed to link obs=%s -> identity=%s", + obs_id, identity_uuid, + ) + continue + result.observations_linked.append({ + "identity_uuid": identity_uuid, + "observation_uuid": obs_id, + }) + else: + # Fresh component — mint a new identity. + identity_uuid = str(_uuid.uuid4()) + try: + await repo.create_attacker_identity({ + "uuid": identity_uuid, + "schema_version": 1, + "first_seen_at": now, + "last_seen_at": now, + "created_at": now, + "updated_at": now, + "observation_count": len(member_ids), + }) + except Exception: # noqa: BLE001 + log.exception( + "clusterer: failed to create identity for component %s", + member_ids, + ) + continue + + linked: list[str] = [] + for obs_id in member_ids: + try: + await repo.set_attacker_identity_id(obs_id, identity_uuid) + except Exception: # noqa: BLE001 + log.exception( + "clusterer: failed to link obs=%s -> identity=%s", + obs_id, identity_uuid, + ) + continue + linked.append(obs_id) + + if linked: + result.identities_formed.append({ + "identity_uuid": identity_uuid, + "observation_uuids": linked, + }) + + return result -__all__ = ["ConnectedComponentsClusterer"] +__all__ = [ + "ConnectedComponentsClusterer", + "cluster_observations", + "from_attacker_row", +] diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 6c13b051..5fc98d20 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -406,6 +406,49 @@ class BaseRepository(ABC): """Total ``Attacker`` rows FK'd to this identity.""" pass + # ─── Identity resolution writes (clusterer worker) ───────────────────── + # Populated by ``decnet clusterer``. The read-only API on top of + # ``attacker_identities`` shipped in commit ``dc3d08d``; this is the + # write side. See ``decnet.clustering.impl.connected_components``. + + @abstractmethod + async def list_attackers_for_clustering( + self, limit: Optional[int] = None, + ) -> list[dict[str, Any]]: + """Project every ``Attacker`` into the clusterer's input shape. + + Returns dicts with at least ``uuid``, ``asn``, ``identity_id``, + and ``fingerprints`` (raw JSON list). The clusterer parses the + fingerprints list to recover JA3 / HASSH per observation. Empty + list when no attackers exist. + + ``limit`` is optional — passed by callers that want to bound a + single tick's working set; leave ``None`` to fetch all. + """ + pass + + @abstractmethod + async def create_attacker_identity(self, row: dict[str, Any]) -> str: + """Insert a new ``AttackerIdentity`` row and return its uuid. + + ``row`` must include ``uuid``; other fields are optional and + default per the model. Caller is responsible for generating + the uuid (so it can be used in the same tick to back-link + observations without a second round-trip). + """ + pass + + @abstractmethod + async def set_attacker_identity_id( + self, attacker_uuid: str, identity_uuid: str, + ) -> None: + """Set ``attackers.identity_id`` on a single observation row. + + Idempotent — re-setting the same value is a no-op. Used by + the clusterer when it links an observation to an identity. + """ + pass + @abstractmethod async def get_attacker_commands( self, diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 676ab804..696df6e4 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -1468,6 +1468,52 @@ class SQLModelRepository(BaseRepository): result = await session.execute(statement) return result.scalar() or 0 + # ─── Identity resolution writes (clusterer worker) ───────────────────── + + async def list_attackers_for_clustering( + self, limit: Optional[int] = None, + ) -> list[dict[str, Any]]: + # Project the columns the clusterer's similarity graph reads. + # Keep it narrow so future denormalised projections (payloads + # joined from logs, c2 endpoints aggregated from sessions) can + # land here without churning every caller. ``fingerprints`` is + # the raw JSON list — the clusterer parses for JA3 / HASSH. + statement = select( + Attacker.uuid, Attacker.asn, Attacker.identity_id, Attacker.fingerprints, + ).order_by(Attacker.first_seen) + if limit is not None: + statement = statement.limit(limit) + async with self._session() as session: + result = await session.execute(statement) + return [ + { + "uuid": row.uuid, + "asn": row.asn, + "identity_id": row.identity_id, + "fingerprints": row.fingerprints, + } + for row in result.all() + ] + + async def create_attacker_identity(self, row: dict[str, Any]) -> str: + identity = AttackerIdentity(**row) + async with self._session() as session: + session.add(identity) + await session.commit() + return identity.uuid + + async def set_attacker_identity_id( + self, attacker_uuid: str, identity_uuid: str, + ) -> None: + statement = ( + update(Attacker) + .where(Attacker.uuid == attacker_uuid) + .values(identity_id=identity_uuid) + ) + async with self._session() as session: + await session.execute(statement) + await session.commit() + async def get_attacker_commands( self, uuid: str, diff --git a/tests/clustering/test_connected_components.py b/tests/clustering/test_connected_components.py new file mode 100644 index 00000000..89b0a4c1 --- /dev/null +++ b/tests/clustering/test_connected_components.py @@ -0,0 +1,304 @@ +"""Tests for the connected-components clusterer (commit 4 — high-weight edges). + +Covers, in order: + +* The pure ``cluster_observations`` algorithm — singletons stay + isolated, exact-match high-weight signals fold them together, + un-fingerprinted observations stay un-mergeable. +* The production-row adapter ``from_attacker_row`` — JA3 / HASSH + recovered from the fingerprints JSON; absent fields project to + ``None``. +* End-to-end ``tick`` against a real SQLite repo: seeded attackers + with shared / divergent fingerprints get the right identity rows + written and the right ``identity_id`` links set. +* Three fixture-bound assertions: lone_wolf (pure singletons), + shared_wordlist (no fingerprint signal — singletons), and + vpn_hopping at identity-level (one identity from 5 rotated IPs + via shared JA3 + HASSH). + +The tick is bus-free here — the worker shell tests cover bus fan-out +separately. We're validating the algorithm + DB writes here. +""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from decnet.clustering.impl.connected_components import ( + ConnectedComponentsClusterer, + cluster_observations, + from_attacker_row, +) +from decnet.clustering.impl.similarity import Observation, from_synthetic +from decnet.web.db.factory import get_repository + +FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" + + +# ─── pure algorithm ───────────────────────────────────────────────────────── + + +def _obs(obs_id: str, **kwargs) -> Observation: + return Observation(observation_id=obs_id, **kwargs) + + +def test_cluster_observations_singletons_stay_isolated(): + a = _obs("a", ja3="ja3-a") + b = _obs("b", ja3="ja3-b") + c = _obs("c") # no fingerprint + labels = cluster_observations([a, b, c]) + assert labels["a"] != labels["b"] + assert labels["b"] != labels["c"] + assert labels["a"] != labels["c"] + + +def test_cluster_observations_ja3_match_unions(): + a = _obs("a", ja3="ja3-shared") + b = _obs("b", ja3="ja3-shared") + c = _obs("c", ja3="ja3-other") + labels = cluster_observations([a, b, c]) + assert labels["a"] == labels["b"] + assert labels["a"] != labels["c"] + + +def test_cluster_observations_unfingerprinted_stay_separate(): + """Two observations with no signals must NOT collapse into one + cluster — that would fuse every noise scanner together.""" + a = _obs("a") + b = _obs("b") + labels = cluster_observations([a, b]) + assert labels["a"] != labels["b"] + + +def test_cluster_observations_transitive_via_payload(): + """A↔B via JA3, B↔C via payload → A, B, C all in one component.""" + a = _obs("a", ja3="ja3-x") + b = _obs("b", ja3="ja3-x", payload_hashes=frozenset({"pl-1"})) + c = _obs("c", payload_hashes=frozenset({"pl-1"})) + labels = cluster_observations([a, b, c]) + assert labels["a"] == labels["b"] == labels["c"] + + +def test_cluster_observations_empty_input(): + assert cluster_observations([]) == {} + + +def test_cluster_observations_deterministic(): + """Same input → same labels. Load-bearing for fixture stability.""" + obs = [_obs("a", ja3="x"), _obs("b", ja3="x"), _obs("c")] + assert cluster_observations(obs) == cluster_observations(obs) + + +# ─── production-row adapter ──────────────────────────────────────────────── + + +def test_from_attacker_row_extracts_ja3_and_hassh(): + row = { + "uuid": "att-1", + "asn": 64500, + "identity_id": None, + "fingerprints": json.dumps([ + {"kind": "ja3", "hash": "ja3-abc"}, + {"kind": "hassh", "hash": "hassh-def"}, + {"kind": "jarm", "hash": "jarm-ghi"}, # not used in v1 + ]), + } + obs = from_attacker_row(row) + assert obs.observation_id == "att-1" + assert obs.ja3 == "ja3-abc" + assert obs.hassh == "hassh-def" + assert obs.asn == 64500 + + +def test_from_attacker_row_handles_empty_fingerprints(): + row = {"uuid": "att-2", "asn": None, "identity_id": None, "fingerprints": "[]"} + obs = from_attacker_row(row) + assert obs.ja3 is None + assert obs.hassh is None + assert obs.asn is None + + +def test_from_attacker_row_handles_malformed_json(): + row = {"uuid": "att-3", "asn": None, "identity_id": None, "fingerprints": "not json"} + obs = from_attacker_row(row) + assert obs.ja3 is None + assert obs.hassh is None + + +# ─── end-to-end tick against SQLite ──────────────────────────────────────── + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "clusterer.db")) + await r.initialize() + return r + + +async def _seed_attacker( + repo, ip: str, *, + ja3: str | None = None, hassh: str | None = None, asn: int | None = None, +) -> str: + now = datetime.now(timezone.utc) + fingerprints = [] + if ja3: + fingerprints.append({"kind": "ja3", "hash": ja3}) + if hassh: + fingerprints.append({"kind": "hassh", "hash": hassh}) + return await repo.upsert_attacker({ + "ip": ip, + "first_seen": now, + "last_seen": now, + "event_count": 1, + "asn": asn, + "fingerprints": json.dumps(fingerprints), + }) + + +@pytest.mark.anyio +async def test_tick_on_empty_db_is_noop(repo): + c = ConnectedComponentsClusterer() + result = await c.tick(repo) + assert result.identities_formed == [] + assert result.observations_linked == [] + + +@pytest.mark.anyio +async def test_tick_clusters_shared_ja3(repo): + """Two observations with the same JA3 → one identity row, both linked.""" + a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-x", asn=64500) + b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-x", asn=64501) + + c = ConnectedComponentsClusterer() + result = await c.tick(repo) + + assert len(result.identities_formed) == 1 + formed = result.identities_formed[0] + assert set(formed["observation_uuids"]) == {a, b} + + # Identity row exists and both attackers FK to it. + identity_uuid = formed["identity_uuid"] + identity = await repo.get_identity_by_uuid(identity_uuid) + assert identity is not None + assert identity["uuid"] == identity_uuid + + obs_for_id = await repo.list_observations_for_identity(identity_uuid) + obs_uuids = {o["uuid"] for o in obs_for_id} + assert obs_uuids == {a, b} + + +@pytest.mark.anyio +async def test_tick_keeps_distinct_ja3_separate(repo): + """Two divergent JA3s with no other shared signal → two singletons, + no identity rows written (singletons stay un-clustered in v1).""" + await _seed_attacker(repo, "1.1.1.1", ja3="ja3-a") + await _seed_attacker(repo, "2.2.2.2", ja3="ja3-b") + + c = ConnectedComponentsClusterer() + result = await c.tick(repo) + + # Singletons get identity rows of their own (one observation per cluster). + assert len(result.identities_formed) == 2 + for formed in result.identities_formed: + assert len(formed["observation_uuids"]) == 1 + + +@pytest.mark.anyio +async def test_tick_links_new_observation_to_existing_identity(repo): + """First tick: 2 attackers cluster into one identity. Second tick: + a new attacker with the same JA3 should get linked, not minted.""" + a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-x") + b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-x") + + c = ConnectedComponentsClusterer() + first = await c.tick(repo) + assert len(first.identities_formed) == 1 + identity_uuid = first.identities_formed[0]["identity_uuid"] + + # New observation arrives; same JA3. + d = await _seed_attacker(repo, "3.3.3.3", ja3="ja3-x") + + second = await c.tick(repo) + # No new identity should be formed for the existing component; + # observation-linked should fire for the new one. + formed_uuids = {f["identity_uuid"] for f in second.identities_formed} + assert identity_uuid not in formed_uuids, ( + "second tick must link to the existing identity, not mint a new one" + ) + linked_uuids = {l_["observation_uuid"] for l_ in second.observations_linked} + assert d in linked_uuids + + +# ─── fixture-bound assertions (in-memory) ────────────────────────────────── + + +def _production_clusterer_predict(corpus) -> dict[str, str]: + """Run the production cluster_observations over a corpus. + + Mirrors the reference clusterer signature (corpus → dict) so it can + be passed to ``assert_fixture_bounds``. Pure / in-memory — does NOT + touch the DB. The DB-side path is covered by the tick tests above. + """ + obs = [from_synthetic(att) for att in corpus.attackers] + labels = cluster_observations(obs) + + # Singletons (no shared signal) get unique cluster ids so the + # metrics see them as distinct classes — matches the + # fingerprint_clusterer reference shape on lone_wolf / shared_wordlist. + pred: dict[str, str] = {} + cluster_sizes: dict[str, int] = {} + for cid in labels.values(): + cluster_sizes[cid] = cluster_sizes.get(cid, 0) + 1 + for obs_id, cid in labels.items(): + if cluster_sizes[cid] == 1: + pred[obs_id] = f"cc-singleton-{obs_id}" + else: + pred[obs_id] = cid + return pred + + +def test_lone_wolf_passes_with_production_clusterer(): + """Fixture 3: every actor singleton. The production clusterer + keeps them all separate (no shared high-weight signal).""" + from tests.clustering.fixture_harness import assert_fixture_bounds + from tests.factories.campaign_factory import generate, load_yaml + + corpus = generate(load_yaml(FIXTURE_DIR / "lone_wolf.yaml"), seed=0) + assert_fixture_bounds( + corpus, _production_clusterer_predict, + FIXTURE_DIR / "lone_wolf.expected.yaml", + ) + + +def test_shared_wordlist_passes_with_production_clusterer(): + """Fixture 1: two campaigns sharing only credentials, divergent + infra. The production clusterer (high-weight edges only) keeps + them separate — credential overlap is not a v1 signal yet.""" + from tests.clustering.fixture_harness import assert_fixture_bounds + from tests.factories.campaign_factory import generate, load_yaml + + corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0) + assert_fixture_bounds( + corpus, _production_clusterer_predict, + FIXTURE_DIR / "shared_wordlist.expected.yaml", + ) + + +def test_vpn_hopping_passes_at_identity_level_with_production_clusterer(): + """Fixture 2: one rotating actor with stable JA3 + HASSH across + 5 ASNs. The production clusterer must fold all 5 observations into + one identity (high-weight JA3 / HASSH agreement).""" + from tests.clustering.fixture_harness import assert_fixture_bounds + from tests.factories.campaign_factory import generate, load_yaml + + corpus = generate(load_yaml(FIXTURE_DIR / "vpn_hopping.yaml"), seed=0) + metrics = assert_fixture_bounds( + corpus, _production_clusterer_predict, + FIXTURE_DIR / "vpn_hopping.expected.yaml", + truth_level="identity", + ) + assert metrics["adjusted_rand_index"] == pytest.approx(1.0) + assert metrics["completeness"] == pytest.approx(1.0) diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index 6f3bdc56..a4742682 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -66,6 +66,9 @@ class DummyRepo(BaseRepository): async def count_identities(self): await super().count_identities(); return 0 async def list_observations_for_identity(self, u, limit=50, offset=0): await super().list_observations_for_identity(u, limit, offset); return [] async def count_observations_for_identity(self, u): await super().count_observations_for_identity(u); return 0 + async def list_attackers_for_clustering(self, limit=None): await super().list_attackers_for_clustering(limit); return [] + async def create_attacker_identity(self, row): await super().create_attacker_identity(row); return "" + async def set_attacker_identity_id(self, a, i): await super().set_attacker_identity_id(a, i) @pytest.mark.asyncio async def test_base_repo_coverage(): @@ -133,6 +136,9 @@ async def test_base_repo_coverage(): await dr.count_identities() await dr.list_observations_for_identity("a") await dr.count_observations_for_identity("a") + await dr.list_attackers_for_clustering() + await dr.create_attacker_identity({"uuid": "i"}) + await dr.set_attacker_identity_id("a", "i") # Swarm methods: default NotImplementedError on BaseRepository. Covering # them here keeps the coverage contract honest for the swarm CRUD surface.