feat(clustering): revocable merges (merge + unmerge)
Reworks the clusterer's tick to handle multi-identity components and
re-evaluate prior merges. Two passes per tick:
Pass 1 — per-component reconciliation:
* Fresh component → mint identity (commit 4 path).
* Single-identity component → link unassigned observations.
* Multi-identity component → soft-merge: pick the smallest-uuid
winner deterministically, set merged_into_uuid on each loser,
link unassigned observations to the winner. Observations stay
FK'd to their original identity row — the merge is a soft
pointer, not a re-point. Audit trail preserved; cached
subscribers resolve through the chain.
Pass 2 — revocable-merge undo:
* For each merged-out identity, check whether its observations
still cluster with its winner's. If not, the merge is
contradicted by new evidence — clear merged_into_uuid and emit
identities_unmerged. The resurrected identity keeps its original
uuid, so subscribers that cached it during the merged interval
re-attach without a new lookup.
A pre-built merge-chain dict feeds Pass 1 so the effective-identity
lookup is O(1) per observation. The chain has a hop cap (paranoia
against accidental cycles in the underlying state).
Repo additions on BaseRepository + SQLModelRepository:
* list_all_identities() — includes merged-out rows.
* update_identity_merged_into(uuid, winner_or_None) — single
setter for both merge and unmerge.
DummyRepo coverage stub updated.
Tests:
* Two distinct identities bridged by a new observation merge with
the smaller uuid as winner.
* A pre-seeded soft-merge whose underlying observations diverge
gets revoked; resurrected uuid emerges with merged_into_uuid
cleared.
* Tick is idempotent under no state changes.
This commit is contained in:
@@ -18,12 +18,14 @@ handoff edges, and revocable merges. Edges MUST stay time-agnostic
|
||||
|
||||
**v1 behavior:**
|
||||
|
||||
The clusterer only assigns identities to observations whose
|
||||
``identity_id`` is currently NULL. Observations already linked to an
|
||||
identity are read-only this pass (they still participate in graph
|
||||
edges, so a new observation can join an existing identity, but the
|
||||
clusterer never reassigns or merges existing identities). Reassignment
|
||||
+ merging land in commit 10 alongside revocable merges.
|
||||
The clusterer assigns identities to NULL observations, merges existing
|
||||
identities when a single predicted component spans them, and revokes
|
||||
prior merges when the predicted component splits a merged-out identity
|
||||
away from its winner. Observations stay FK'd to their original identity
|
||||
row throughout — merges are soft pointers via
|
||||
``attacker_identities.merged_into_uuid``, never observation re-points.
|
||||
That keeps the audit trail intact and lets cached subscribers resolve
|
||||
merged-out UUIDs through the chain.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -145,6 +147,18 @@ class ConnectedComponentsClusterer(Clusterer):
|
||||
if not rows:
|
||||
return ClusterResult()
|
||||
|
||||
# Build the merge chain so a row's "effective" identity follows
|
||||
# merged_into_uuid up to the canonical winner. Pre-computing it
|
||||
# lets us reason about post-merge identity membership in one
|
||||
# place. ``identity_chain[u]`` is the canonical winner for
|
||||
# identity ``u`` (or ``u`` itself if not merged out).
|
||||
try:
|
||||
all_identities = await repo.list_all_identities()
|
||||
except Exception: # noqa: BLE001
|
||||
log.exception("clusterer: failed to read identities")
|
||||
return ClusterResult()
|
||||
identity_chain = _build_merge_chain(all_identities)
|
||||
|
||||
# Project + cluster.
|
||||
observations: list[Observation] = []
|
||||
row_by_id: dict[str, dict[str, Any]] = {}
|
||||
@@ -154,7 +168,7 @@ class ConnectedComponentsClusterer(Clusterer):
|
||||
row_by_id[obs.observation_id] = r
|
||||
labels = cluster_observations(observations)
|
||||
|
||||
# Group by predicted cluster.
|
||||
# Group observations by predicted cluster.
|
||||
components: dict[str, list[str]] = {}
|
||||
for obs_id, cluster_id in labels.items():
|
||||
components.setdefault(cluster_id, []).append(obs_id)
|
||||
@@ -162,48 +176,19 @@ class ConnectedComponentsClusterer(Clusterer):
|
||||
result = ClusterResult()
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# Pass 1 — per-component reconciliation: form, link, merge.
|
||||
for member_ids in components.values():
|
||||
existing_identities = {
|
||||
literal_ids = {
|
||||
row_by_id[m]["identity_id"] for m in member_ids
|
||||
if row_by_id[m].get("identity_id")
|
||||
}
|
||||
effective_ids = {identity_chain.get(i, i) for i in literal_ids}
|
||||
unassigned = [
|
||||
m for m in member_ids
|
||||
if not row_by_id[m].get("identity_id")
|
||||
]
|
||||
|
||||
if len(existing_identities) > 1:
|
||||
# Multi-identity component — merging lands in commit 10
|
||||
# (revocable merges). Skip for now; new observations in
|
||||
# this component stay unassigned this pass and will get
|
||||
# assigned once the merge logic exists.
|
||||
log.debug(
|
||||
"clusterer: skipping component with %d existing identities "
|
||||
"(merge lands in commit 10)", len(existing_identities),
|
||||
)
|
||||
continue
|
||||
|
||||
if not unassigned:
|
||||
# Component is entirely already-assigned; nothing to do.
|
||||
continue
|
||||
|
||||
if existing_identities:
|
||||
# Single existing identity → link the unassigned members.
|
||||
identity_uuid = next(iter(existing_identities))
|
||||
for obs_id in unassigned:
|
||||
try:
|
||||
await repo.set_attacker_identity_id(obs_id, identity_uuid)
|
||||
except Exception: # noqa: BLE001
|
||||
log.exception(
|
||||
"clusterer: failed to link obs=%s -> identity=%s",
|
||||
obs_id, identity_uuid,
|
||||
)
|
||||
continue
|
||||
result.observations_linked.append({
|
||||
"identity_uuid": identity_uuid,
|
||||
"observation_uuid": obs_id,
|
||||
})
|
||||
else:
|
||||
if not effective_ids:
|
||||
# Fresh component — mint a new identity.
|
||||
identity_uuid = str(_uuid.uuid4())
|
||||
try:
|
||||
@@ -225,25 +210,137 @@ class ConnectedComponentsClusterer(Clusterer):
|
||||
|
||||
linked: list[str] = []
|
||||
for obs_id in member_ids:
|
||||
try:
|
||||
await repo.set_attacker_identity_id(obs_id, identity_uuid)
|
||||
except Exception: # noqa: BLE001
|
||||
log.exception(
|
||||
"clusterer: failed to link obs=%s -> identity=%s",
|
||||
obs_id, identity_uuid,
|
||||
)
|
||||
continue
|
||||
linked.append(obs_id)
|
||||
|
||||
if await _link(repo, obs_id, identity_uuid):
|
||||
linked.append(obs_id)
|
||||
if linked:
|
||||
result.identities_formed.append({
|
||||
"identity_uuid": identity_uuid,
|
||||
"observation_uuids": linked,
|
||||
})
|
||||
continue
|
||||
|
||||
# Deterministic winner so two clusterer runs produce the
|
||||
# same merge direction. Sorting by uuid string is stable
|
||||
# and doesn't depend on row insertion order.
|
||||
winner_uuid = min(effective_ids)
|
||||
losers = effective_ids - {winner_uuid}
|
||||
|
||||
for loser_uuid in losers:
|
||||
try:
|
||||
await repo.update_identity_merged_into(loser_uuid, winner_uuid)
|
||||
except Exception: # noqa: BLE001
|
||||
log.exception(
|
||||
"clusterer: failed to merge %s -> %s",
|
||||
loser_uuid, winner_uuid,
|
||||
)
|
||||
continue
|
||||
identity_chain[loser_uuid] = winner_uuid
|
||||
result.identities_merged.append({
|
||||
"winner_uuid": winner_uuid,
|
||||
"loser_uuid": loser_uuid,
|
||||
})
|
||||
|
||||
# Link any unassigned observations in the component to the
|
||||
# winner so a subsequent tick sees a single-identity
|
||||
# component and skips this branch entirely.
|
||||
for obs_id in unassigned:
|
||||
if await _link(repo, obs_id, winner_uuid):
|
||||
result.observations_linked.append({
|
||||
"identity_uuid": winner_uuid,
|
||||
"observation_uuid": obs_id,
|
||||
})
|
||||
|
||||
# Pass 2 — revocable-merge undo. For each currently-merged-out
|
||||
# identity, check whether its observations still cluster with
|
||||
# the winner's. If not, the merge is contradicted by new
|
||||
# evidence — clear merged_into_uuid and emit identity.unmerged.
|
||||
# Observations FK'd to the resurrected loser stay where they
|
||||
# were; the chain just stops following.
|
||||
observations_by_literal_identity: dict[str, list[str]] = {}
|
||||
for obs_id, r in row_by_id.items():
|
||||
iid = r.get("identity_id")
|
||||
if iid:
|
||||
observations_by_literal_identity.setdefault(iid, []).append(obs_id)
|
||||
|
||||
for identity_row in all_identities:
|
||||
if not identity_row.get("merged_into_uuid"):
|
||||
continue
|
||||
loser_uuid = identity_row["uuid"]
|
||||
winner_uuid = identity_chain.get(loser_uuid, loser_uuid)
|
||||
if winner_uuid == loser_uuid:
|
||||
continue # broken chain — paranoia
|
||||
loser_obs = observations_by_literal_identity.get(loser_uuid, [])
|
||||
winner_obs = observations_by_literal_identity.get(winner_uuid, [])
|
||||
if not loser_obs or not winner_obs:
|
||||
# No observations either side — can't disprove the merge.
|
||||
continue
|
||||
loser_clusters = {labels[o] for o in loser_obs}
|
||||
winner_clusters = {labels[o] for o in winner_obs}
|
||||
if loser_clusters & winner_clusters:
|
||||
continue # still co-clustered with winner — merge stands
|
||||
try:
|
||||
await repo.update_identity_merged_into(loser_uuid, None)
|
||||
except Exception: # noqa: BLE001
|
||||
log.exception(
|
||||
"clusterer: failed to unmerge %s from %s",
|
||||
loser_uuid, winner_uuid,
|
||||
)
|
||||
continue
|
||||
identity_chain[loser_uuid] = loser_uuid
|
||||
result.identities_unmerged.append({
|
||||
"resurrected_uuid": loser_uuid,
|
||||
"former_winner_uuid": winner_uuid,
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _build_merge_chain(
|
||||
identities: list[dict[str, Any]],
|
||||
) -> dict[str, str]:
|
||||
"""Build a uuid → canonical-winner map from a list of identity rows.
|
||||
|
||||
Follows ``merged_into_uuid`` to a fixed point per identity, with a
|
||||
hop cap to defend against accidental cycles. The returned dict
|
||||
contains an entry for every identity uuid (mapping to itself if
|
||||
not merged out).
|
||||
"""
|
||||
_MAX_HOPS = 8
|
||||
by_uuid: dict[str, dict[str, Any]] = {i["uuid"]: i for i in identities}
|
||||
chain: dict[str, str] = {}
|
||||
for uuid_ in by_uuid:
|
||||
cur = uuid_
|
||||
for _ in range(_MAX_HOPS):
|
||||
row = by_uuid.get(cur)
|
||||
if row is None:
|
||||
break
|
||||
nxt = row.get("merged_into_uuid")
|
||||
if not nxt or nxt == cur:
|
||||
break
|
||||
cur = nxt
|
||||
chain[uuid_] = cur
|
||||
return chain
|
||||
|
||||
|
||||
async def _link(
    repo: BaseRepository, observation_uuid: str, identity_uuid: str,
) -> bool:
    """Point one observation at an identity, reporting success as a bool.

    Calls ``repo.set_attacker_identity_id`` and converts any exception
    into a logged failure, so callers can branch on the return value
    instead of repeating the same try/except in every branch.

    :returns: ``True`` when the repo call completed, ``False`` when it
        raised (the exception is logged, not propagated).
    """
    try:
        await repo.set_attacker_identity_id(observation_uuid, identity_uuid)
    except Exception:  # noqa: BLE001
        log.exception(
            "clusterer: failed to link obs=%s -> identity=%s",
            observation_uuid, identity_uuid,
        )
        return False
    else:
        return True
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ConnectedComponentsClusterer",
|
||||
"cluster_observations",
|
||||
|
||||
@@ -449,6 +449,31 @@ class BaseRepository(ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
async def list_all_identities(self) -> list[dict[str, Any]]:
    """Return every ``AttackerIdentity`` row, merged-out rows included.

    Unlike :meth:`list_identities`, which hides merged-out rows to give
    the UI a de-duplicated list, this returns the unfiltered set. The
    clusterer's revocable-merge pass relies on that: it has to look at
    merged-out identities again to decide whether a merge still holds.
    """
|
||||
|
||||
@abstractmethod
async def update_identity_merged_into(
    self, identity_uuid: str, winner_uuid: Optional[str],
) -> None:
    """Write ``attacker_identities.merged_into_uuid`` for one identity.

    A non-``None`` ``winner_uuid`` soft-merges ``identity_uuid`` into
    that identity; ``None`` clears the pointer, which is how a prior
    merge is revoked. Only the pointer changes — observations keep
    their foreign key to the identity row they were originally linked
    to, so a merge is never an observation re-point.
    """
|
||||
|
||||
@abstractmethod
|
||||
async def get_attacker_commands(
|
||||
self,
|
||||
|
||||
@@ -1514,6 +1514,27 @@ class SQLModelRepository(BaseRepository):
|
||||
await session.execute(statement)
|
||||
await session.commit()
|
||||
|
||||
async def list_all_identities(self) -> list[dict[str, Any]]:
    """Return all ``AttackerIdentity`` rows as JSON-mode dicts.

    Rows are ordered by ``created_at`` and no filter is applied, so
    merged-out identities are part of the result.
    """
    query = select(AttackerIdentity).order_by(AttackerIdentity.created_at)
    async with self._session() as session:
        fetched = await session.execute(query)
        identities = fetched.scalars().all()
        # Serialise while the session is still open.
        return [identity.model_dump(mode="json") for identity in identities]
|
||||
|
||||
async def update_identity_merged_into(
    self, identity_uuid: str, winner_uuid: Optional[str],
) -> None:
    """Set (or, with ``None``, clear) ``merged_into_uuid`` on one identity.

    Issues a single UPDATE against the row whose ``uuid`` equals
    ``identity_uuid``, stamping ``updated_at`` with the current UTC
    time, then commits.
    """
    changes = {
        "merged_into_uuid": winner_uuid,
        "updated_at": datetime.now(timezone.utc),
    }
    stmt = (
        update(AttackerIdentity)
        .where(AttackerIdentity.uuid == identity_uuid)
        .values(**changes)
    )
    async with self._session() as session:
        await session.execute(stmt)
        await session.commit()
|
||||
|
||||
async def get_attacker_commands(
|
||||
self,
|
||||
uuid: str,
|
||||
|
||||
@@ -206,6 +206,151 @@ async def test_tick_keeps_distinct_ja3_separate(repo):
|
||||
assert len(formed["observation_uuids"]) == 1
|
||||
|
||||
|
||||
@pytest.mark.anyio
async def test_tick_merges_two_identities_when_component_spans_them(repo):
    """Two pre-existing identities whose observations now cluster
    together (e.g. a previously-missing fingerprint shows up) get
    soft-merged: the smaller-uuid identity wins, the loser's
    merged_into_uuid is set, observations stay FK'd to their
    original identity row."""
    import json as _json
    from datetime import datetime, timezone

    # Tick 1: two distinct fingerprints → two distinct identities.
    a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-A")
    b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-B")

    c = ConnectedComponentsClusterer()
    first = await c.tick(repo)
    assert len(first.identities_formed) == 2

    # Snapshot the two identity uuids; we'll need them after the merge.
    identities_after_first = await repo.list_all_identities()
    assert len(identities_after_first) == 2
    uuids = sorted(i["uuid"] for i in identities_after_first)
    expected_winner, expected_loser = uuids[0], uuids[1]

    # Snapshot each observation's FK before the merge so the "soft
    # pointer, not a re-point" claim can be checked exactly afterwards.
    a_identity_before = (await repo.get_attacker_by_uuid(a))["identity_id"]
    b_identity_before = (await repo.get_attacker_by_uuid(b))["identity_id"]
    assert {a_identity_before, b_identity_before} == {
        expected_winner, expected_loser,
    }

    # Tick 2: bridge the two components. A new observation shares
    # ja3-A with ``a`` and carries a HASSH; ``b``'s row is then updated
    # to carry the same HASSH, so a — bridge — b form one component.
    bridge = await _seed_attacker(
        repo, "3.3.3.3", ja3="ja3-A", hassh="hassh-bridge",
    )
    now = datetime.now(timezone.utc)
    await repo.upsert_attacker({
        "ip": "2.2.2.2", "first_seen": now, "last_seen": now,
        "event_count": 1,
        "fingerprints": _json.dumps([
            {"kind": "ja3", "hash": "ja3-B"},
            {"kind": "hassh", "hash": "hassh-bridge"},
        ]),
    })

    second = await c.tick(repo)
    assert len(second.identities_merged) == 1
    merge = second.identities_merged[0]
    assert merge["winner_uuid"] == expected_winner
    assert merge["loser_uuid"] == expected_loser

    # The loser's row still exists with merged_into_uuid set.
    all_after = {i["uuid"]: i for i in await repo.list_all_identities()}
    assert all_after[expected_loser]["merged_into_uuid"] == expected_winner
    assert all_after[expected_winner]["merged_into_uuid"] is None

    # Observations stay FK'd to their original identity row — the
    # merge is a soft pointer, NOT a re-point.
    a_row = await repo.get_attacker_by_uuid(a)
    b_row = await repo.get_attacker_by_uuid(b)
    assert a_row["identity_id"] == a_identity_before
    assert b_row["identity_id"] == b_identity_before

    # The previously-unassigned bridge observation is linked to the winner.
    bridge_row = await repo.get_attacker_by_uuid(bridge)
    assert bridge_row["identity_id"] == expected_winner
|
||||
|
||||
|
||||
@pytest.mark.anyio
async def test_tick_unmerges_when_observations_diverge(repo):
    """A soft-merged pair whose observations stop clustering together
    must be split again: the tick clears merged_into_uuid on the loser
    and reports it in identities_unmerged."""
    import json as _json
    from datetime import datetime, timezone
    now = datetime.now(timezone.utc)

    # Same JA3 on both attackers, so the first tick forms one identity
    # holding both observations.
    a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-shared")
    b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-shared")
    c = ConnectedComponentsClusterer()
    first = await c.tick(repo)
    assert len(first.identities_formed) == 1
    one_identity_uuid = first.identities_formed[0]["identity_uuid"]

    # Hand-build the state a form-then-merge history would leave behind:
    # give observation b an identity of its own, then soft-merge the two
    # identities through the repo directly.
    second_uuid = "00000000-0000-0000-0000-00000000bbbb"
    second_identity = {
        "uuid": second_uuid,
        "schema_version": 1,
        "first_seen_at": now, "last_seen_at": now,
        "created_at": now, "updated_at": now,
        "observation_count": 1,
    }
    await repo.create_attacker_identity(second_identity)
    await repo.set_attacker_identity_id(b, second_uuid)

    # Merge direction follows the clusterer's min-uuid rule.
    winner, loser = sorted((one_identity_uuid, second_uuid))
    if loser == one_identity_uuid:
        # The tick-formed identity turned out to be the loser, so swap
        # ownership: a goes to the winner, b to the loser.
        await repo.set_attacker_identity_id(a, winner)
        await repo.set_attacker_identity_id(b, loser)
    await repo.update_identity_merged_into(loser, winner)

    # Confirm the soft-merge is really in place before diverging.
    pre = {row["uuid"]: row for row in await repo.list_all_identities()}
    assert pre[loser]["merged_into_uuid"] == winner

    # Give 2.2.2.2 a different JA3 so a and b no longer cluster.
    diverged_fingerprints = _json.dumps(
        [{"kind": "ja3", "hash": "ja3-different"}]
    )
    await repo.upsert_attacker({
        "ip": "2.2.2.2", "first_seen": now, "last_seen": now,
        "event_count": 1,
        "fingerprints": diverged_fingerprints,
    })

    # The next tick should notice the divergence and revoke the merge.
    third = await c.tick(repo)
    assert len(third.identities_unmerged) == 1
    unmerged = third.identities_unmerged[0]
    assert unmerged["resurrected_uuid"] == loser
    assert unmerged["former_winner_uuid"] == winner

    post = {row["uuid"]: row for row in await repo.list_all_identities()}
    assert post[loser]["merged_into_uuid"] is None
    assert post[winner]["merged_into_uuid"] is None
|
||||
|
||||
|
||||
@pytest.mark.anyio
async def test_tick_is_idempotent_under_no_changes(repo):
    """A second tick over unchanged state must report nothing: no
    identities formed, linked, merged or unmerged."""
    seeds = (
        ("1.1.1.1", "ja3-x"),
        ("2.2.2.2", "ja3-x"),
        ("3.3.3.3", "ja3-y"),
    )
    for ip, ja3 in seeds:
        await _seed_attacker(repo, ip, ja3=ja3)

    c = ConnectedComponentsClusterer()
    first = await c.tick(repo)
    second = await c.tick(repo)

    assert second.identities_formed == []
    assert second.observations_linked == []
    assert second.identities_merged == []
    assert second.identities_unmerged == []
    # Sanity: the first tick did do something.
    assert first.identities_formed
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_tick_links_new_observation_to_existing_identity(repo):
|
||||
"""First tick: 2 attackers cluster into one identity. Second tick:
|
||||
|
||||
@@ -69,6 +69,8 @@ class DummyRepo(BaseRepository):
|
||||
async def list_attackers_for_clustering(self, limit=None): await super().list_attackers_for_clustering(limit); return []
|
||||
async def create_attacker_identity(self, row): await super().create_attacker_identity(row); return ""
|
||||
async def set_attacker_identity_id(self, a, i): await super().set_attacker_identity_id(a, i)
|
||||
async def list_all_identities(self): await super().list_all_identities(); return []
|
||||
async def update_identity_merged_into(self, u, w): await super().update_identity_merged_into(u, w)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_base_repo_coverage():
|
||||
@@ -139,6 +141,9 @@ async def test_base_repo_coverage():
|
||||
await dr.list_attackers_for_clustering()
|
||||
await dr.create_attacker_identity({"uuid": "i"})
|
||||
await dr.set_attacker_identity_id("a", "i")
|
||||
await dr.list_all_identities()
|
||||
await dr.update_identity_merged_into("a", "b")
|
||||
await dr.update_identity_merged_into("a", None)
|
||||
|
||||
# Swarm methods: default NotImplementedError on BaseRepository. Covering
|
||||
# them here keeps the coverage contract honest for the swarm CRUD surface.
|
||||
|
||||
Reference in New Issue
Block a user