From c9e4bf4022f57819f54bf44b1cc8c2bef44249e5 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 16 Jun 2026 17:09:42 -0400 Subject: [PATCH] feat(clustering): link identities by keystroke-rhythm proximity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Campaign clusterer gains a keystroke edge: when two identities' kd_digraph_simhash centroids are within KD_HAMMING_MAX bits, a graded weight (1.0 at identical, fading to 0 at the cutoff) feeds the campaign graph. Supporting tier (0.6) — a typing match plus temporal overlap reaches threshold, but typing alone never merges (FP guard against coarse, noisy terminal timing). Projects the column through IdentityFeatures + from_identity_row. --- .../campaign/impl/connected_components.py | 21 ++++++ decnet/clustering/campaign/impl/similarity.py | 37 +++++++++ tests/clustering/test_campaign_keystroke.py | 75 +++++++++++++++++++ 3 files changed, 133 insertions(+) create mode 100644 tests/clustering/test_campaign_keystroke.py diff --git a/decnet/clustering/campaign/impl/connected_components.py b/decnet/clustering/campaign/impl/connected_components.py index 9e8139d3..28429ef4 100644 --- a/decnet/clustering/campaign/impl/connected_components.py +++ b/decnet/clustering/campaign/impl/connected_components.py @@ -31,6 +31,7 @@ from decnet.clustering.campaign.impl.similarity import ( combined_campaign_weight, ) from decnet.logging import get_logger +from decnet.util.simhash import from_bytes8 from decnet.web.db.repository import BaseRepository log = get_logger("clustering.campaign.connected_components") @@ -94,6 +95,7 @@ def from_identity_row( payload_hashes = _parse_json_list(row.get("payload_simhashes")) c2_endpoints = _parse_json_list(row.get("c2_endpoints")) + kd_digraph_simhash = _parse_kd_simhash(row.get("kd_digraph_simhash")) first_phase_per_decky: dict[str, str] = {} last_phase_per_decky: dict[str, str] = {} @@ -127,6 +129,7 @@ def from_identity_row( identity_uuid=row["uuid"], 
payload_hashes=frozenset(payload_hashes), c2_endpoints=frozenset(c2_endpoints), + kd_digraph_simhash=kd_digraph_simhash, decky_set=frozenset(decky_set), first_phase_per_decky=first_phase_per_decky, last_phase_per_decky=last_phase_per_decky,
@@ -135,6 +138,24 @@ def from_identity_row(
     )
 
 
+def _parse_kd_simhash(raw: Any) -> Optional[int]:
+    """Project the ``kd_digraph_simhash`` column into a 64-bit int.
+
+    Accepts exactly 8 raw bytes (``bytes``/``bytearray``/``memoryview``) or
+    their 16-digit hex spelling (fixture rows); anything else → ``None``.
+    """
+    if isinstance(raw, str):
+        try:
+            raw = bytes.fromhex(raw) if len(raw) == 16 else None
+        except ValueError:
+            return None
+    if isinstance(raw, (bytes, bytearray, memoryview)):
+        blob = bytes(raw)
+        if len(blob) == 8:  # fromhex skips whitespace, so re-check the size
+            return from_bytes8(blob)
+    return None
+
+
 def _parse_json_list(raw: Optional[str]) -> list[str]:
     if not raw:
         return []
diff --git a/decnet/clustering/campaign/impl/similarity.py b/decnet/clustering/campaign/impl/similarity.py
index 1e1e3678..0d071321 100644
--- a/decnet/clustering/campaign/impl/similarity.py
+++ b/decnet/clustering/campaign/impl/similarity.py
@@ -42,6 +42,8 @@ from __future__ import annotations
 from dataclasses import dataclass, field
 from typing import Mapping, Optional
 
+from decnet.util.simhash import hamming64
+
 
 # ─── Identity-level projection ──────────────────────────────────────────────
 
@@ -74,6 +76,11 @@ class IdentityFeatures:
     c2_endpoints: frozenset[str] = field(default_factory=frozenset)
     """Aggregated C2 endpoints across member observations."""
 
+    kd_digraph_simhash: Optional[int] = None
+    """Identity's keystroke-rhythm centroid as a 64-bit int (the
+    ``AttackerIdentity.kd_digraph_simhash`` column).
``None`` until the
+    identity has enough live-typed sessions for a fingerprint."""
+
     decky_set: frozenset[str] = field(default_factory=frozenset)
     """Aggregated decky IDs the identity touched."""
 
@@ -305,11 +312,22 @@ def cohort_weight(a: IdentityFeatures, b: IdentityFeatures) -> float:
 #: 0; ASN+decky overlap fires cohort but at 0.1 stays well below
 #: threshold. F2 vpn_hopping is folded by the identity layer first,
 #: so the campaign clusterer sees one identity → one campaign.
 CAMPAIGN_TIER_WEIGHTS: dict[str, float] = {
     "phase_handoff": 1.0,
     "shared_infra": 1.0,
     "temporal_overlap": 0.4,
     "cohort": 0.1,
+    # Keystroke biometric is a strong *supporting* signal — 0.6 means a
+    # typing match plus temporal overlap (0.4) reaches threshold, but a
+    # typing match alone never merges two identities (FP guard: terminal
+    # timing is noisy and the bucketing is coarse).
+    "keystroke": 0.6,
 }
 
+#: Max Hamming distance (of 64 bits) at which two identities' keystroke-
+#: rhythm centroids still count as the same typist. Beyond this the
+#: biometric contributes nothing. Conservative — same-typist hashes are
+#: typically <6 bits apart (see toolchain.payload.payload_simhash notes).
+KD_HAMMING_MAX: int = 8
+
 #: Threshold a combined campaign-edge weight must meet to survive
@@ -337,9 +355,26 @@ def combined_campaign_weight(
         + CAMPAIGN_TIER_WEIGHTS["temporal_overlap"] * temporal_overlap_weight(a, b)
         + CAMPAIGN_TIER_WEIGHTS["cohort"] * cohort_weight(a, b)
+        + CAMPAIGN_TIER_WEIGHTS["keystroke"] * keystroke_weight(a, b)
     )
 
 
+def keystroke_weight(a: IdentityFeatures, b: IdentityFeatures) -> float:
+    """Keystroke-rhythm proximity ∈ [0, 1] from the two identities'
+    digraph-SimHash centroids.
+
+    Graded by Hamming distance: identical rhythm → 1.0, fading linearly
+    to 0.0 at ``KD_HAMMING_MAX`` bits apart (and beyond). ``0.0`` when
+    either identity has no centroid yet. Pure / time-agnostic.
+    """
+    if a.kd_digraph_simhash is None or b.kd_digraph_simhash is None:
+        return 0.0
+    dist = hamming64(a.kd_digraph_simhash, b.kd_digraph_simhash)
+    if dist >= KD_HAMMING_MAX:
+        return 0.0
+    return 1.0 - dist / KD_HAMMING_MAX
+
+
 # ─── Adapter for synthetic-fixture tests ────────────────────────────────────
 
 
@@ -432,6 +467,7 @@ __all__ = [
     "shared_infra_weight",
     "temporal_overlap_weight",
     "cohort_weight",
+    "keystroke_weight",
     "combined_campaign_weight",
     "from_synthetic_identity",
     "HANDOFF_OUT_PHASES",
@@ -439,4 +475,5 @@ __all__ = [
     "DEFAULT_HANDOFF_WINDOW_S",
     "CAMPAIGN_TIER_WEIGHTS",
     "CAMPAIGN_EDGE_THRESHOLD",
+    "KD_HAMMING_MAX",
 ]
diff --git a/tests/clustering/test_campaign_keystroke.py b/tests/clustering/test_campaign_keystroke.py
new file mode 100644
index 00000000..6ac7e0d7
--- /dev/null
+++ b/tests/clustering/test_campaign_keystroke.py
@@ -0,0 +1,75 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Campaign-level keystroke-rhythm edge.
+
+The digraph-SimHash centroid is a *supporting* signal: a typing match
+alone must not merge two identities (FP guard), but it tips an otherwise
+sub-threshold pair (e.g. co-temporal identities) into one campaign.
+"""
+from __future__ import annotations
+
+from decnet.clustering.campaign.impl.connected_components import from_identity_row
+from decnet.clustering.campaign.impl.similarity import (
+    CAMPAIGN_EDGE_THRESHOLD,
+    KD_HAMMING_MAX,
+    IdentityFeatures,
+    combined_campaign_weight,
+    keystroke_weight,
+)
+from decnet.util.simhash import to_bytes8
+
+_H = 0xABCD1234ABCD1234  # arbitrary 64-bit reference centroid shared by the tests
+
+
+def _flip_low_bits(value: int, n: int) -> int:
+    """XOR the n low bits → a hash exactly n bits away from ``value``."""
+    return value ^ ((1 << n) - 1)
+
+
+def test_identical_rhythm_is_full_weight() -> None:
+    a = IdentityFeatures("a", kd_digraph_simhash=_H)
+    b = IdentityFeatures("b", kd_digraph_simhash=_H)
+    assert keystroke_weight(a, b) == 1.0  # Hamming distance 0
+
+
+def test_missing_centroid_is_zero() -> None:
+    a = IdentityFeatures("a", kd_digraph_simhash=_H)
+    b = IdentityFeatures("b")  # no biometric yet
+    assert keystroke_weight(a, b) == 0.0
+
+
+def test_weight_grades_by_hamming() -> None:
+    half = KD_HAMMING_MAX // 2  # a distance strictly inside the cutoff
+    a = IdentityFeatures("a", kd_digraph_simhash=_H)
+    b = IdentityFeatures("b", kd_digraph_simhash=_flip_low_bits(_H, half))
+    assert keystroke_weight(a, b) == 1.0 - half / KD_HAMMING_MAX
+
+
+def test_far_apart_contributes_nothing() -> None:
+    a = IdentityFeatures("a", kd_digraph_simhash=_H)
+    b = IdentityFeatures("b", kd_digraph_simhash=_flip_low_bits(_H, KD_HAMMING_MAX))
+    assert keystroke_weight(a, b) == 0.0  # cutoff is inclusive: dist >= KD_HAMMING_MAX
+
+
+def test_typing_alone_does_not_merge() -> None:
+    # FP guard: identical rhythm, no other signal → below threshold.
+    a = IdentityFeatures("a", kd_digraph_simhash=_H)
+    b = IdentityFeatures("b", kd_digraph_simhash=_H)
+    assert combined_campaign_weight(a, b) < CAMPAIGN_EDGE_THRESHOLD
+
+
+def test_typing_plus_temporal_overlap_crosses_threshold() -> None:
+    window = ((0.0, 100.0),)  # identical session window on both identities
+    a = IdentityFeatures("a", kd_digraph_simhash=_H, session_windows=window)
+    b = IdentityFeatures("b", kd_digraph_simhash=_H, session_windows=window)
+    # temporal overlap (0.4) + keystroke (0.6) reaches the 1.0 threshold.
+    assert combined_campaign_weight(a, b) >= CAMPAIGN_EDGE_THRESHOLD
+    # Strip the biometric and the same co-temporal pair falls back under.
+    a2 = IdentityFeatures("a", session_windows=window)
+    b2 = IdentityFeatures("b", session_windows=window)
+    assert combined_campaign_weight(a2, b2) < CAMPAIGN_EDGE_THRESHOLD
+
+
+def test_from_identity_row_projects_bytes_and_none() -> None:
+    feat = from_identity_row({"uuid": "x", "kd_digraph_simhash": to_bytes8(_H)})
+    assert feat.kd_digraph_simhash == _H  # relies on to_bytes8 round-tripping via from_bytes8
+    assert from_identity_row({"uuid": "y"}).kd_digraph_simhash is None  # column absent