feat(clustering): link identities by keystroke-rhythm proximity
Campaign clusterer gains a keystroke edge: when two identities' kd_digraph_simhash centroids are within KD_HAMMING_MAX bits, a graded weight (1.0 at identical, fading to 0 at the cutoff) feeds the campaign graph. Supporting tier (0.6) — a typing match plus temporal overlap reaches threshold, but typing alone never merges (FP guard against coarse, noisy terminal timing). Projects the column through IdentityFeatures + from_identity_row.
This commit is contained in:
75
tests/clustering/test_campaign_keystroke.py
Normal file
75
tests/clustering/test_campaign_keystroke.py
Normal file
@@ -0,0 +1,75 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""Campaign-level keystroke-rhythm edge.
|
||||
|
||||
The digraph-SimHash centroid is a *supporting* signal: a typing match
|
||||
alone must not merge two identities (FP guard), but it tips an otherwise
|
||||
sub-threshold pair (e.g. co-temporal identities) into one campaign.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from decnet.clustering.campaign.impl.connected_components import from_identity_row
|
||||
from decnet.clustering.campaign.impl.similarity import (
|
||||
CAMPAIGN_EDGE_THRESHOLD,
|
||||
KD_HAMMING_MAX,
|
||||
IdentityFeatures,
|
||||
combined_campaign_weight,
|
||||
keystroke_weight,
|
||||
)
|
||||
from decnet.util.simhash import to_bytes8
|
||||
|
||||
_H = 0xABCD1234ABCD1234
|
||||
|
||||
|
||||
def _flip_low_bits(value: int, n: int) -> int:
|
||||
"""XOR the n low bits → a hash exactly n bits away from ``value``."""
|
||||
return value ^ ((1 << n) - 1)
|
||||
|
||||
|
||||
def test_identical_rhythm_is_full_weight() -> None:
|
||||
a = IdentityFeatures("a", kd_digraph_simhash=_H)
|
||||
b = IdentityFeatures("b", kd_digraph_simhash=_H)
|
||||
assert keystroke_weight(a, b) == 1.0
|
||||
|
||||
|
||||
def test_missing_centroid_is_zero() -> None:
|
||||
a = IdentityFeatures("a", kd_digraph_simhash=_H)
|
||||
b = IdentityFeatures("b") # no biometric yet
|
||||
assert keystroke_weight(a, b) == 0.0
|
||||
|
||||
|
||||
def test_weight_grades_by_hamming() -> None:
|
||||
half = KD_HAMMING_MAX // 2
|
||||
a = IdentityFeatures("a", kd_digraph_simhash=_H)
|
||||
b = IdentityFeatures("b", kd_digraph_simhash=_flip_low_bits(_H, half))
|
||||
assert keystroke_weight(a, b) == 1.0 - half / KD_HAMMING_MAX
|
||||
|
||||
|
||||
def test_far_apart_contributes_nothing() -> None:
|
||||
a = IdentityFeatures("a", kd_digraph_simhash=_H)
|
||||
b = IdentityFeatures("b", kd_digraph_simhash=_flip_low_bits(_H, KD_HAMMING_MAX))
|
||||
assert keystroke_weight(a, b) == 0.0
|
||||
|
||||
|
||||
def test_typing_alone_does_not_merge() -> None:
|
||||
# FP guard: identical rhythm, no other signal → below threshold.
|
||||
a = IdentityFeatures("a", kd_digraph_simhash=_H)
|
||||
b = IdentityFeatures("b", kd_digraph_simhash=_H)
|
||||
assert combined_campaign_weight(a, b) < CAMPAIGN_EDGE_THRESHOLD
|
||||
|
||||
|
||||
def test_typing_plus_temporal_overlap_crosses_threshold() -> None:
|
||||
window = ((0.0, 100.0),)
|
||||
a = IdentityFeatures("a", kd_digraph_simhash=_H, session_windows=window)
|
||||
b = IdentityFeatures("b", kd_digraph_simhash=_H, session_windows=window)
|
||||
# temporal overlap (0.4) + keystroke (0.6) reaches the 1.0 threshold.
|
||||
assert combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD
|
||||
# Strip the biometric and the same co-temporal pair falls back under.
|
||||
a2 = IdentityFeatures("a", session_windows=window)
|
||||
b2 = IdentityFeatures("b", session_windows=window)
|
||||
assert combined_campaign_weight(a2, b2) < CAMPAIGN_EDGE_THRESHOLD
|
||||
|
||||
|
||||
def test_from_identity_row_projects_bytes_and_none() -> None:
|
||||
feat = from_identity_row({"uuid": "x", "kd_digraph_simhash": to_bytes8(_H)})
|
||||
assert feat.kd_digraph_simhash == _H
|
||||
assert from_identity_row({"uuid": "y"}).kd_digraph_simhash is None
|
||||
Reference in New Issue
Block a user