feat(profiler): extract motor.digraph_simhash keystroke biometric

Per-session 64-bit SimHash of inter-keystroke digraph flight times:
walk single-char input events, accumulate flight time per (c1,c2),
bucket the median, Charikar-SimHash the bucketed pairs. Locality-
sensitive so the same typist is Hamming-close across sessions; pastes
and think-pauses break the chain; silent below the sample-size floor.

New shared decnet/util/simhash.py (simhash64/hamming64/bytes helpers).
Registered as a conditional Tier-A primitive (count 37->38); requires
behave-shell>=0.1.2.
This commit is contained in:
2026-06-16 16:59:57 -04:00
parent 372375194c
commit 66c73ce59d
12 changed files with 283 additions and 5 deletions

View File

@@ -52,6 +52,7 @@ from decnet.profiler.behave_shell._features.temporal import (
)
from decnet.profiler.behave_shell._features.motor import (
command_chunking,
digraph_simhash,
error_correction,
input_modality,
keystroke_cadence,
@@ -68,6 +69,7 @@ FEATURES: tuple[FeatureFn, ...] = (
input_modality,
paste_burst_rate,
keystroke_cadence,
digraph_simhash,
motor_stability,
error_correction,
command_chunking,

View File

@@ -15,13 +15,18 @@ from behave_core.spec.envelope import Observation
from decnet.profiler.behave_shell._ctx import SessionContext
from decnet.profiler.behave_shell._features._emit import make_observation
from decnet.util.simhash import simhash64
from decnet.profiler.behave_shell._thresholds import (
BACKSPACE_IMMEDIATE_MAX_S,
CMD_CHUNKING_FLUENT_CV_MAX,
CV_BURSTY_MAX,
CV_MACHINE_MAX,
CV_STEADY_MAX,
DIGRAPH_FLIGHT_BUCKETS_S,
IKI_MACHINE_MAX_S,
IKI_THINK_MAX_S,
MIN_DIGRAPH_SAMPLES,
MIN_DIGRAPHS_FOR_SIMHASH,
MIN_INPUTS_FOR_CADENCE,
MODALITY_PASTED_MIN,
MODALITY_TYPED_MAX,
@@ -421,3 +426,64 @@ def pipe_chaining_depth(ctx: SessionContext) -> Iterator[Observation]:
value=value,
confidence=confidence,
)
def _flight_bucket(seconds: float) -> int:
"""Quantize a digraph flight time into a coarse log bucket index.
Returns 0..len(DIGRAPH_FLIGHT_BUCKETS_S); the coarseness is what lets
the same typist collide across sessions despite per-keystroke jitter.
"""
for i, edge in enumerate(DIGRAPH_FLIGHT_BUCKETS_S):
if seconds < edge:
return i
return len(DIGRAPH_FLIGHT_BUCKETS_S)
def digraph_simhash(ctx: SessionContext) -> Iterator[Observation]:
"""Emit ``motor.digraph_simhash`` — a 64-bit LSH fingerprint of the
operator's per-digraph keystroke flight times.
For each consecutive pair of single-char input keystrokes ``(c1, c2)``
the flight time is the inter-event gap. Pastes / escape sequences
(multi-char events) and think-pauses (> ``IKI_THINK_MAX_S``) break the
chain so they don't pollute timing. Each digraph's *median* flight is
bucketed; ``(c1c2, bucket)`` tokens are SimHashed (weighted by sample
count) so the same typist lands Hamming-close across sessions, while a
faster/slower or different-vocabulary operator separates.
Stays silent below ``MIN_DIGRAPHS_FOR_SIMHASH`` distinct pairs or
``MIN_DIGRAPH_SAMPLES`` total samples — too little signal to fingerprint.
"""
flights: dict[str, list[float]] = {}
prev_t: float | None = None
prev_c: str | None = None
for t, _kind, data in ctx.input_events:
if len(data) != 1:
# paste / control / escape burst — not a single keystroke
prev_t = prev_c = None
continue
if prev_c is not None and prev_t is not None:
dt = t - prev_t
if 0.0 < dt <= IKI_THINK_MAX_S:
flights.setdefault(prev_c + data, []).append(dt)
prev_t, prev_c = t, data
total = sum(len(v) for v in flights.values())
if len(flights) < MIN_DIGRAPHS_FOR_SIMHASH or total < MIN_DIGRAPH_SAMPLES:
return
tokens = {
f"{digraph}:{_flight_bucket(statistics.median(dts))}": len(dts)
for digraph, dts in flights.items()
}
# Confidence grows with the number of distinct digraphs (more pairs =
# more stable fingerprint), capped at 0.95 — never claim certainty on a
# biometric inferred from terminal timing.
confidence = min(0.95, 0.40 + 0.05 * len(flights))
yield make_observation(
ctx,
primitive="motor.digraph_simhash",
value=format(simhash64(tokens), "016x"),
confidence=confidence,
)

View File

@@ -292,6 +292,17 @@ CV_BURSTY_MAX: float = 1.50
# Need this many input events before we'll claim a cadence at all.
MIN_INPUTS_FOR_CADENCE: int = 5
# ── motor.digraph_simhash (keystroke-rhythm biometric) ──────────────────────
# A digraph is two consecutive single-char keystrokes; its flight time is the
# inter-event gap. We need enough distinct pairs AND enough samples before the
# SimHash is stable enough to fingerprint a typist — below this it's noise.
MIN_DIGRAPHS_FOR_SIMHASH: int = 8
MIN_DIGRAPH_SAMPLES: int = 20
# Median flight time per digraph is quantized into these log-spaced buckets
# (upper edges, seconds). Coarse on purpose: the same typist must land in the
# same bucket despite jitter, while a clearly faster/slower typist separates.
DIGRAPH_FLIGHT_BUCKETS_S: tuple[float, ...] = (0.03, 0.06, 0.12, 0.25, 0.5, 1.0)
# ── motor.motor_stability (Step B.2) ────────────────────────────────────────
# Tremor proxy: fraction of within-burst IATs below TREMOR_FAST_FLOOR_S
# (30 ms — physiologically implausible double-press floor; humans can't

2
decnet/util/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Small cross-cutting helpers with no domain home of their own."""

66
decnet/util/simhash.py Normal file
View File

@@ -0,0 +1,66 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Charikar 64-bit SimHash + Hamming helpers.
Locality-sensitive fingerprint: inputs that share most weighted tokens
produce hashes a few bits apart (small Hamming distance), so near-
duplicates cluster without storing the raw feature vector. Used by the
keystroke-digraph biometric (``decnet/profiler/.../motor.py``) and the
campaign clusterer's typing-similarity edge.
ponytail: ``templates/smtp/server.py:_body_simhash`` is the same
algorithm, inlined to keep slim decky containers from importing decnet.
Left as-is to avoid pulling decnet into decky images; dedup here only if
a third caller appears.
"""
from __future__ import annotations
import hashlib
from collections.abc import Mapping
_BITS = 64
_MASK = (1 << _BITS) - 1
def simhash64(weighted_tokens: Mapping[str, int]) -> int:
"""Charikar 64-bit SimHash over frequency-weighted tokens.
Returns 0 on empty/all-zero-weight input — callers treat 0 as "no
signal". Per-token hash is md5[:8]: a content fingerprint, not a
security primitive.
"""
if not weighted_tokens:
return 0
bits = [0] * _BITS
for tok, weight in weighted_tokens.items():
if weight <= 0:
continue
h = int.from_bytes(
# Content fingerprint, not a security primitive — md5[:8] is fast
# and 64 bits is all we need; usedforsecurity=False clears B324.
hashlib.md5(
tok.encode("utf-8", errors="replace"), usedforsecurity=False,
).digest()[:8],
"big",
)
for i in range(_BITS):
bits[i] += weight if (h >> i) & 1 else -weight
out = 0
for i in range(_BITS):
if bits[i] > 0:
out |= (1 << i)
return out
def hamming64(a: int, b: int) -> int:
"""Number of differing bits between two 64-bit ints."""
return ((a ^ b) & _MASK).bit_count()
def to_bytes8(value: int) -> bytes:
"""64-bit int → 8 big-endian bytes (for ``BINARY(8)`` storage)."""
return (value & _MASK).to_bytes(8, "big")
def from_bytes8(raw: bytes) -> int:
"""8 big-endian bytes → 64-bit int."""
return int.from_bytes(raw, "big")

View File

@@ -1139,6 +1139,25 @@ own plan.
---
## Post-v0 addition — `motor.digraph_simhash` (38th Tier-A primitive)
Added in behave-shell 0.1.2 (the v0 corpus above was 37). It is the
**keystroke-rhythm biometric**: a 64-bit Charikar SimHash of the
operator's per-digraph (two-key) flight times, bucketed per character
pair. Locality-sensitive — the same typist lands Hamming-close across
sessions and decoys, so it links one human behind multiple identities.
- **Extractor:** `_features/motor.py:digraph_simhash`, `ValueKind.HASH`,
conditional (rides `MIN_DIGRAPHS_FOR_SIMHASH` / `MIN_DIGRAPH_SAMPLES`
floors; lives in `PHASE_G_CONDITIONAL_PRIMITIVES`). Live-typed input
only — pastes/escape bursts break the digraph chain.
- **Rollup:** the identity clusterer folds the session SimHashes into a
bitwise-majority centroid written to `AttackerIdentity.kd_digraph_simhash`;
the campaign clusterer adds a Hamming-proximity edge. STIX export
carries the centroid (hex). Tier-A count is now **38**.
---
**Owner:** ANTI.
**Implementation gate:** Step 0 starts after this doc is reviewed +
Phase 1 of `BEHAVE-INTEGRATION.md` lands (storage table exists).

View File

@@ -59,7 +59,7 @@ dependencies = [
# bus event adapter consumed by decnet/profiler/behave_shell/. Pin
# range tracks BEHAVE-INTEGRATION.md §"Versioning".
"behave-core>=0.1.0,<0.2",
"behave-shell>=0.1.0,<0.2",
"behave-shell>=0.1.2,<0.2",
# STIX → MISP conversion: CIRCL-maintained reference converter used by
# MISP itself. Pulls pymisp transitively (needed for MISPEvent output).
"misp-stix>=2026.4",

View File

@@ -109,6 +109,7 @@ PHASE_G_CONDITIONAL_PRIMITIVES: frozenset[str] = frozenset({
"emotional_valence.arousal", # needs typing bursts
"emotional_valence.valence", # needs ≥ 80 typed letters
"emotional_valence.frustration_venting", # needs ≥ 30 typed letters
"motor.digraph_simhash", # needs ≥ 8 distinct digraphs / ≥ 20 samples
})
# Backwards-compatible aliases for any external import — earlier phases

View File

@@ -0,0 +1,69 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""``motor.digraph_simhash`` — keystroke-rhythm biometric.
Builds typed input streams (single-char ``"i"`` events at a fixed
inter-key gap) and asserts the LSH property: same typist → Hamming-close,
different cadence → far apart, pastes excluded, thin sessions silent.
"""
from __future__ import annotations
from decnet.profiler.behave_shell import extract_session
from decnet.profiler.behave_shell._parse import AsciinemaEvent
from decnet.util.simhash import from_bytes8, hamming64
# A realistic multi-command session: plenty of distinct digraphs, > 20 samples.
_PHRASE = "ls -la /etc; cat /etc/passwd; whoami; uname -a; netstat -tlnp\r"
def _typed(phrase: str, dt: float, *, start: float = 0.0) -> list[AsciinemaEvent]:
events: list[AsciinemaEvent] = []
t = start
for ch in phrase:
events.append((t, "i", ch))
t += dt
return events
def _digraph_obs(events: list[AsciinemaEvent], sid: str):
out = list(extract_session(events, sid=sid))
obs = [o for o in out if o.primitive == "motor.digraph_simhash"]
return obs
def _hash_int(obs) -> int:
return from_bytes8(bytes.fromhex(obs.value))
def test_emits_one_observation_for_a_normal_session() -> None:
obs = _digraph_obs(_typed(_PHRASE, 0.12), "dg-basic")
assert len(obs) == 1
assert len(obs[0].value) == 16 # 64-bit hex
assert 0.0 < obs[0].confidence <= 0.95
def test_same_typist_identical_timing_is_identical() -> None:
a = _digraph_obs(_typed(_PHRASE, 0.12), "dg-a")[0]
b = _digraph_obs(_typed(_PHRASE, 0.12), "dg-b")[0]
# Identical text + timing → identical fingerprint (0 Hamming).
assert hamming64(_hash_int(a), _hash_int(b)) == 0
def test_different_cadence_separates() -> None:
fast = _digraph_obs(_typed(_PHRASE, 0.05), "dg-fast")[0]
slow = _digraph_obs(_typed(_PHRASE, 0.45), "dg-slow")[0]
# Same vocabulary, very different flight-time buckets → the hashes diverge.
assert hamming64(_hash_int(fast), _hash_int(slow)) > 0
def test_pastes_do_not_form_digraphs() -> None:
# A session made of large paste events (len >= 4) carries no single-char
# keystrokes, so no digraphs and no observation.
events: list[AsciinemaEvent] = [
(float(i), "i", "sudo apt-get update") for i in range(10)
]
assert _digraph_obs(events, "dg-paste") == []
def test_thin_session_is_silent() -> None:
# Below MIN_DIGRAPHS_FOR_SIMHASH / MIN_DIGRAPH_SAMPLES → no emission.
assert _digraph_obs(_typed("ls\r", 0.1), "dg-thin") == []

View File

@@ -91,10 +91,14 @@ def test_no_extractor_set_drifts_from_registry() -> None:
)
def test_tier_a_count_is_37() -> None:
"""Sanity check: Tier-A count matches the design doc (37 primitives)."""
assert len(_tier_a_primitives()) == 37, (
f"Expected 37 Tier-A primitives per BEHAVE-EXTRACTOR.md; "
def test_tier_a_count_is_38() -> None:
"""Sanity check: Tier-A count matches the design doc.
38 since behave-shell 0.1.2 added ``motor.digraph_simhash`` (the
keystroke-rhythm biometric); was 37.
"""
assert len(_tier_a_primitives()) == 38, (
f"Expected 38 Tier-A primitives per BEHAVE-EXTRACTOR.md; "
f"got {len(_tier_a_primitives())}. Update Phase H if the "
f"spec genuinely changed, or adjust TIER_B_ALLOWLIST."
)

0
tests/util/__init__.py Normal file
View File

View File

@@ -0,0 +1,38 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Charikar SimHash util — determinism, LSH property, byte round-trip."""
from __future__ import annotations
from decnet.util.simhash import from_bytes8, hamming64, simhash64, to_bytes8
def test_empty_or_zero_weight_is_zero() -> None:
assert simhash64({}) == 0
assert simhash64({"a": 0, "b": -3}) == 0 # non-positive weights skipped
def test_deterministic() -> None:
tokens = {"th": 3, "he": 2, "er": 1}
assert simhash64(tokens) == simhash64(dict(tokens))
def test_near_duplicate_low_hamming() -> None:
base = {f"dg{i}": (i % 5) + 1 for i in range(40)}
identical = dict(base)
perturbed = dict(base)
perturbed["NEW_PAIR"] = 1 # one extra low-weight token
assert hamming64(simhash64(base), simhash64(identical)) == 0
assert hamming64(simhash64(base), simhash64(perturbed)) <= 8
def test_disjoint_high_hamming() -> None:
a = {f"a{i}": 2 for i in range(30)}
b = {f"b{i}": 2 for i in range(30)}
# Unrelated token sets ≈ half the 64 bits differ; comfortably ≥ 20.
assert hamming64(simhash64(a), simhash64(b)) >= 20
def test_bytes_roundtrip_is_8_bytes() -> None:
h = simhash64({"x": 1, "y": 2, "z": 5})
raw = to_bytes8(h)
assert isinstance(raw, bytes) and len(raw) == 8
assert from_bytes8(raw) == h