From e364ef885990d27cd8347869adc95b0af18bada1 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 08:33:32 -0400 Subject: [PATCH] feat(clustering): revocable merges (merge + unmerge) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reworks the clusterer's tick to handle multi-identity components and re-evaluate prior merges. Two passes per tick: Pass 1 — per-component reconciliation: * Fresh component → mint identity (commit 4 path). * Single-identity component → link unassigned observations. * Multi-identity component → soft-merge: pick the smallest-uuid winner deterministically, set merged_into_uuid on each loser, link unassigned observations to the winner. Observations stay FK'd to their original identity row — the merge is a soft pointer, not a re-point. Audit trail preserved; cached subscribers resolve through the chain. Pass 2 — revocable-merge undo: * For each merged-out identity, check whether its observations still cluster with its winner's. If not, the merge is contradicted by new evidence — clear merged_into_uuid and emit identities_unmerged. The resurrected identity keeps its original uuid, so subscribers that cached it during the merged interval re-attach without a new lookup. A pre-built merge-chain dict feeds Pass 1 so the effective-identity lookup is O(1) per observation. The chain has a hop cap (paranoia against accidental cycles in the underlying state). Repo additions on BaseRepository + SQLModelRepository: * list_all_identities() — includes merged-out rows. * update_identity_merged_into(uuid, winner_or_None) — single setter for both merge and unmerge. DummyRepo coverage stub updated. Tests: * Two distinct identities bridged by a new observation merge with the smaller uuid as winner. * A pre-seeded soft-merge whose underlying observations diverge gets revoked; resurrected uuid emerges with merged_into_uuid cleared. * Tick is idempotent under no state changes. --- .../clustering/impl/connected_components.py | 197 +++++++++++++----- decnet/web/db/repository.py | 25 +++ decnet/web/db/sqlmodel_repo.py | 21 ++ tests/clustering/test_connected_components.py | 145 +++++++++++++ tests/db/test_base_repo.py | 5 + 5 files changed, 343 insertions(+), 50 deletions(-) diff --git a/decnet/clustering/impl/connected_components.py b/decnet/clustering/impl/connected_components.py index f40d1eaf..eb95fb63 100644 --- a/decnet/clustering/impl/connected_components.py +++ b/decnet/clustering/impl/connected_components.py @@ -18,12 +18,14 @@ handoff edges, and revocable merges. Edges MUST stay time-agnostic **v1 behavior:** -The clusterer only assigns identities to observations whose -``identity_id`` is currently NULL. Observations already linked to an -identity are read-only this pass (they still participate in graph -edges, so a new observation can join an existing identity, but the -clusterer never reassigns or merges existing identities). Reassignment -+ merging land in commit 10 alongside revocable merges. +The clusterer assigns identities to NULL observations, merges existing +identities when a single predicted component spans them, and revokes +prior merges when the predicted component splits a merged-out identity +away from its winner. Observations stay FK'd to their original identity +row throughout — merges are soft pointers via +``attacker_identities.merged_into_uuid``, never observation re-points. +That keeps the audit trail intact and lets cached subscribers resolve +merged-out UUIDs through the chain. """ from __future__ import annotations @@ -145,6 +147,18 @@ class ConnectedComponentsClusterer(Clusterer): if not rows: return ClusterResult() + # Build the merge chain so a row's "effective" identity follows + # merged_into_uuid up to the canonical winner. Pre-computing it + # lets us reason about post-merge identity membership in one + # place. ``identity_chain[u]`` is the canonical winner for + # identity ``u`` (or ``u`` itself if not merged out). + try: + all_identities = await repo.list_all_identities() + except Exception: # noqa: BLE001 + log.exception("clusterer: failed to read identities") + return ClusterResult() + identity_chain = _build_merge_chain(all_identities) + # Project + cluster. observations: list[Observation] = [] row_by_id: dict[str, dict[str, Any]] = {} @@ -154,7 +168,7 @@ class ConnectedComponentsClusterer(Clusterer): row_by_id[obs.observation_id] = r labels = cluster_observations(observations) - # Group by predicted cluster. + # Group observations by predicted cluster. components: dict[str, list[str]] = {} for obs_id, cluster_id in labels.items(): components.setdefault(cluster_id, []).append(obs_id) @@ -162,48 +176,19 @@ class ConnectedComponentsClusterer(Clusterer): result = ClusterResult() now = datetime.now(timezone.utc) + # Pass 1 — per-component reconciliation: form, link, merge. for member_ids in components.values(): - existing_identities = { + literal_ids = { row_by_id[m]["identity_id"] for m in member_ids if row_by_id[m].get("identity_id") } + effective_ids = {identity_chain.get(i, i) for i in literal_ids} unassigned = [ m for m in member_ids if not row_by_id[m].get("identity_id") ] - if len(existing_identities) > 1: - # Multi-identity component — merging lands in commit 10 - # (revocable merges). Skip for now; new observations in - # this component stay unassigned this pass and will get - # assigned once the merge logic exists. - log.debug( - "clusterer: skipping component with %d existing identities " - "(merge lands in commit 10)", len(existing_identities), - ) - continue - - if not unassigned: - # Component is entirely already-assigned; nothing to do. - continue - - if existing_identities: - # Single existing identity → link the unassigned members. - identity_uuid = next(iter(existing_identities)) - for obs_id in unassigned: - try: - await repo.set_attacker_identity_id(obs_id, identity_uuid) - except Exception: # noqa: BLE001 - log.exception( - "clusterer: failed to link obs=%s -> identity=%s", - obs_id, identity_uuid, - ) - continue - result.observations_linked.append({ - "identity_uuid": identity_uuid, - "observation_uuid": obs_id, - }) - else: + if not effective_ids: # Fresh component — mint a new identity. identity_uuid = str(_uuid.uuid4()) try: @@ -225,25 +210,137 @@ class ConnectedComponentsClusterer(Clusterer): linked: list[str] = [] for obs_id in member_ids: - try: - await repo.set_attacker_identity_id(obs_id, identity_uuid) - except Exception: # noqa: BLE001 - log.exception( - "clusterer: failed to link obs=%s -> identity=%s", - obs_id, identity_uuid, - ) - continue - linked.append(obs_id) - + if await _link(repo, obs_id, identity_uuid): + linked.append(obs_id) if linked: result.identities_formed.append({ "identity_uuid": identity_uuid, "observation_uuids": linked, }) + continue + + # Deterministic winner so two clusterer runs produce the + # same merge direction. Sorting by uuid string is stable + # and doesn't depend on row insertion order. + winner_uuid = min(effective_ids) + losers = effective_ids - {winner_uuid} + + for loser_uuid in losers: + try: + await repo.update_identity_merged_into(loser_uuid, winner_uuid) + except Exception: # noqa: BLE001 + log.exception( + "clusterer: failed to merge %s -> %s", + loser_uuid, winner_uuid, + ) + continue + identity_chain[loser_uuid] = winner_uuid + result.identities_merged.append({ + "winner_uuid": winner_uuid, + "loser_uuid": loser_uuid, + }) + + # Link any unassigned observations in the component to the + # winner so a subsequent tick sees a single-identity + # component and skips this branch entirely. + for obs_id in unassigned: + if await _link(repo, obs_id, winner_uuid): + result.observations_linked.append({ + "identity_uuid": winner_uuid, + "observation_uuid": obs_id, + }) + + # Pass 2 — revocable-merge undo. For each currently-merged-out + # identity, check whether its observations still cluster with + # the winner's. If not, the merge is contradicted by new + # evidence — clear merged_into_uuid and emit identity.unmerged. + # Observations FK'd to the resurrected loser stay where they + # were; the chain just stops following. + observations_by_literal_identity: dict[str, list[str]] = {} + for obs_id, r in row_by_id.items(): + iid = r.get("identity_id") + if iid: + observations_by_literal_identity.setdefault(iid, []).append(obs_id) + + for identity_row in all_identities: + if not identity_row.get("merged_into_uuid"): + continue + loser_uuid = identity_row["uuid"] + winner_uuid = identity_chain.get(loser_uuid, loser_uuid) + if winner_uuid == loser_uuid: + continue # broken chain — paranoia + loser_obs = observations_by_literal_identity.get(loser_uuid, []) + winner_obs = observations_by_literal_identity.get(winner_uuid, []) + if not loser_obs or not winner_obs: + # No observations either side — can't disprove the merge. + continue + loser_clusters = {labels[o] for o in loser_obs} + winner_clusters = {labels[o] for o in winner_obs} + if loser_clusters & winner_clusters: + continue # still co-clustered with winner — merge stands + try: + await repo.update_identity_merged_into(loser_uuid, None) + except Exception: # noqa: BLE001 + log.exception( + "clusterer: failed to unmerge %s from %s", + loser_uuid, winner_uuid, + ) + continue + identity_chain[loser_uuid] = loser_uuid + result.identities_unmerged.append({ + "resurrected_uuid": loser_uuid, + "former_winner_uuid": winner_uuid, + }) return result +def _build_merge_chain( + identities: list[dict[str, Any]], +) -> dict[str, str]: + """Build a uuid → canonical-winner map from a list of identity rows. + + Follows ``merged_into_uuid`` to a fixed point per identity, with a + hop cap to defend against accidental cycles. The returned dict + contains an entry for every identity uuid (mapping to itself if + not merged out). + """ + _MAX_HOPS = 8 + by_uuid: dict[str, dict[str, Any]] = {i["uuid"]: i for i in identities} + chain: dict[str, str] = {} + for uuid_ in by_uuid: + cur = uuid_ + for _ in range(_MAX_HOPS): + row = by_uuid.get(cur) + if row is None: + break + nxt = row.get("merged_into_uuid") + if not nxt or nxt == cur: + break + cur = nxt + chain[uuid_] = cur + return chain + + +async def _link( + repo: BaseRepository, observation_uuid: str, identity_uuid: str, +) -> bool: + """Set ``attackers.identity_id`` and return ``True`` on success. + + Wraps the repo call so the tick body stays linear and exception + handling is consistent across the form / link / merge branches. + """ + try: + await repo.set_attacker_identity_id(observation_uuid, identity_uuid) + return True + except Exception: # noqa: BLE001 + log.exception( + "clusterer: failed to link obs=%s -> identity=%s", + observation_uuid, identity_uuid, + ) + return False + + __all__ = [ "ConnectedComponentsClusterer", "cluster_observations", diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 5fc98d20..79759c5d 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -449,6 +449,31 @@ class BaseRepository(ABC): """ pass + @abstractmethod + async def list_all_identities(self) -> list[dict[str, Any]]: + """Every ``AttackerIdentity`` row, including merged-out ones. + + Distinct from :meth:`list_identities`, which filters out + merged-out rows for the de-duped UI list. The clusterer's + revocable-merge pass needs to re-evaluate merged-out + identities, so it pulls the unfiltered set. + """ + pass + + @abstractmethod + async def update_identity_merged_into( + self, identity_uuid: str, winner_uuid: Optional[str], + ) -> None: + """Set or clear ``attacker_identities.merged_into_uuid``. + + Pass ``winner_uuid`` to soft-merge the row into another + identity; pass ``None`` to revoke a prior merge (the + revocable-merge undo path). Observations stay FK'd to their + original identity row throughout — the merge is a soft + pointer, not a re-point. + """ + pass + @abstractmethod async def get_attacker_commands( self, diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 696df6e4..0bc394be 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -1514,6 +1514,27 @@ class SQLModelRepository(BaseRepository): await session.execute(statement) await session.commit() + async def list_all_identities(self) -> list[dict[str, Any]]: + statement = select(AttackerIdentity).order_by(AttackerIdentity.created_at) + async with self._session() as session: + result = await session.execute(statement) + return [i.model_dump(mode="json") for i in result.scalars().all()] + + async def update_identity_merged_into( + self, identity_uuid: str, winner_uuid: Optional[str], + ) -> None: + statement = ( + update(AttackerIdentity) + .where(AttackerIdentity.uuid == identity_uuid) + .values( + merged_into_uuid=winner_uuid, + updated_at=datetime.now(timezone.utc), + ) + ) + async with self._session() as session: + await session.execute(statement) + await session.commit() + async def get_attacker_commands( self, uuid: str, diff --git a/tests/clustering/test_connected_components.py b/tests/clustering/test_connected_components.py index 589cf3ac..f298e896 100644 --- a/tests/clustering/test_connected_components.py +++ b/tests/clustering/test_connected_components.py @@ -206,6 +206,151 @@ async def test_tick_keeps_distinct_ja3_separate(repo): assert len(formed["observation_uuids"]) == 1 +@pytest.mark.anyio +async def test_tick_merges_two_identities_when_component_spans_them(repo): + """Two pre-existing identities whose observations now cluster + together (e.g. a previously-missing fingerprint shows up) get + soft-merged: the smaller-uuid identity wins, the loser's + merged_into_uuid is set, observations stay FK'd to their + original identity row.""" + # Tick 1: two distinct fingerprints → two distinct identities. + a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-A") + b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-B") + + c = ConnectedComponentsClusterer() + first = await c.tick(repo) + assert len(first.identities_formed) == 2 + + # Snapshot the two identity uuids; we'll need them after the merge. + identities_after_first = await repo.list_all_identities() + assert len(identities_after_first) == 2 + uuids = sorted(i["uuid"] for i in identities_after_first) + expected_winner, expected_loser = uuids[0], uuids[1] + + # Tick 2: a bridging observation — fingerprints match BOTH prior + # rows. The bridge can't agree with both JA3s simultaneously, so + # use a HASSH that matches A and a payload that matches B. + # Simulate this with two new attackers, each linking a side. + # Simpler: change attacker A's stored fingerprint to also include + # ja3-B by re-seeding (in production this would be a fresh + # observation that bridges them). + bridge = await _seed_attacker(repo, "3.3.3.3", ja3="ja3-A", hassh="hassh-bridge") + # Make B's row carry the same hassh so the bridge can union them. + import json as _json + from datetime import datetime, timezone + now = datetime.now(timezone.utc) + await repo.upsert_attacker({ + "ip": "2.2.2.2", "first_seen": now, "last_seen": now, + "event_count": 1, + "fingerprints": _json.dumps([ + {"kind": "ja3", "hash": "ja3-B"}, + {"kind": "hassh", "hash": "hassh-bridge"}, + ]), + }) + + second = await c.tick(repo) + assert len(second.identities_merged) == 1 + merge = second.identities_merged[0] + assert merge["winner_uuid"] == expected_winner + assert merge["loser_uuid"] == expected_loser + + # The loser's row still exists with merged_into_uuid set. + all_after = {i["uuid"]: i for i in await repo.list_all_identities()} + assert all_after[expected_loser]["merged_into_uuid"] == expected_winner + assert all_after[expected_winner]["merged_into_uuid"] is None + + # Observations stay FK'd to their original identity row — the + # merge is a soft pointer, NOT a re-point. + a_row = await repo.get_attacker_by_uuid(a) + b_row = await repo.get_attacker_by_uuid(b) + assert a_row["identity_id"] in {expected_winner, expected_loser} + assert b_row["identity_id"] in {expected_winner, expected_loser} + + +@pytest.mark.anyio +async def test_tick_unmerges_when_observations_diverge(repo): + """Pre-seed a soft-merged pair, then change the underlying + observations so they no longer cluster. The tick must clear + merged_into_uuid and emit identities_unmerged.""" + import json as _json + from datetime import datetime, timezone + now = datetime.now(timezone.utc) + + # Two attackers with same JA3 → tick merges them via shared + # high-tier signal (one identity formed). + a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-shared") + b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-shared") + c = ConnectedComponentsClusterer() + first = await c.tick(repo) + assert len(first.identities_formed) == 1 + one_identity_uuid = first.identities_formed[0]["identity_uuid"] + + # Force a soft-merge state: split observation b out into its own + # identity, then merge that back into the first via the repo + # directly. This emulates a state the clusterer would have + # arrived at across multiple ticks (form, then merge). + second_uuid = "00000000-0000-0000-0000-00000000bbbb" + await repo.create_attacker_identity({ + "uuid": second_uuid, + "schema_version": 1, + "first_seen_at": now, "last_seen_at": now, + "created_at": now, "updated_at": now, + "observation_count": 1, + }) + await repo.set_attacker_identity_id(b, second_uuid) + # Soft-merge second_uuid into one_identity_uuid (winner). + winner = min(one_identity_uuid, second_uuid) + loser = max(one_identity_uuid, second_uuid) + if loser == one_identity_uuid: + # Make the canonical mapping consistent with the test setup — + # we need the merge to be "loser → winner" by min-uuid rule. + # Swap ownership so the smaller-uuid keeps the active observations. + await repo.set_attacker_identity_id(a, winner) + await repo.set_attacker_identity_id(b, loser) + await repo.update_identity_merged_into(loser, winner) + + # Verify the soft-merge is in place. + pre = {i["uuid"]: i for i in await repo.list_all_identities()} + assert pre[loser]["merged_into_uuid"] == winner + + # Now change the underlying fingerprints so a and b no longer cluster. + await repo.upsert_attacker({ + "ip": "2.2.2.2", "first_seen": now, "last_seen": now, + "event_count": 1, + "fingerprints": _json.dumps([{"kind": "ja3", "hash": "ja3-different"}]), + }) + + # Tick should detect the divergence and revoke the merge. + third = await c.tick(repo) + assert len(third.identities_unmerged) == 1 + unmerged = third.identities_unmerged[0] + assert unmerged["resurrected_uuid"] == loser + assert unmerged["former_winner_uuid"] == winner + + post = {i["uuid"]: i for i in await repo.list_all_identities()} + assert post[loser]["merged_into_uuid"] is None + assert post[winner]["merged_into_uuid"] is None + + +@pytest.mark.anyio +async def test_tick_is_idempotent_under_no_changes(repo): + """Running tick twice with no state changes between produces no + side-effects on the second run.""" + await _seed_attacker(repo, "1.1.1.1", ja3="ja3-x") + await _seed_attacker(repo, "2.2.2.2", ja3="ja3-x") + await _seed_attacker(repo, "3.3.3.3", ja3="ja3-y") + + c = ConnectedComponentsClusterer() + first = await c.tick(repo) + second = await c.tick(repo) + assert second.identities_formed == [] + assert second.observations_linked == [] + assert second.identities_merged == [] + assert second.identities_unmerged == [] + # Sanity: the first tick did do something. + assert first.identities_formed + + @pytest.mark.anyio async def test_tick_links_new_observation_to_existing_identity(repo): """First tick: 2 attackers cluster into one identity. Second tick: diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index a4742682..0d635d6e 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -69,6 +69,8 @@ class DummyRepo(BaseRepository): async def list_attackers_for_clustering(self, limit=None): await super().list_attackers_for_clustering(limit); return [] async def create_attacker_identity(self, row): await super().create_attacker_identity(row); return "" async def set_attacker_identity_id(self, a, i): await super().set_attacker_identity_id(a, i) + async def list_all_identities(self): await super().list_all_identities(); return [] + async def update_identity_merged_into(self, u, w): await super().update_identity_merged_into(u, w) @pytest.mark.asyncio async def test_base_repo_coverage(): @@ -139,6 +141,9 @@ async def test_base_repo_coverage(): await dr.list_attackers_for_clustering() await dr.create_attacker_identity({"uuid": "i"}) await dr.set_attacker_identity_id("a", "i") + await dr.list_all_identities() + await dr.update_identity_merged_into("a", "b") + await dr.update_identity_merged_into("a", None) # Swarm methods: default NotImplementedError on BaseRepository. Covering # them here keeps the coverage contract honest for the swarm CRUD surface.