diff --git a/decnet/profiler/behave_shell/_features/__init__.py b/decnet/profiler/behave_shell/_features/__init__.py
index b8c78b6c..eef51b91 100644
--- a/decnet/profiler/behave_shell/_features/__init__.py
+++ b/decnet/profiler/behave_shell/_features/__init__.py
@@ -52,6 +52,7 @@ from decnet.profiler.behave_shell._features.temporal import (
 )
 from decnet.profiler.behave_shell._features.motor import (
     command_chunking,
+    digraph_simhash,
     error_correction,
     input_modality,
     keystroke_cadence,
@@ -68,6 +69,7 @@ FEATURES: tuple[FeatureFn, ...] = (
     input_modality,
     paste_burst_rate,
     keystroke_cadence,
+    digraph_simhash,
     motor_stability,
     error_correction,
     command_chunking,
diff --git a/decnet/profiler/behave_shell/_features/motor.py b/decnet/profiler/behave_shell/_features/motor.py
index 5908769c..e9482e8c 100644
--- a/decnet/profiler/behave_shell/_features/motor.py
+++ b/decnet/profiler/behave_shell/_features/motor.py
@@ -15,13 +15,18 @@ from behave_core.spec.envelope import Observation
 
 from decnet.profiler.behave_shell._ctx import SessionContext
 from decnet.profiler.behave_shell._features._emit import make_observation
+from decnet.util.simhash import simhash64
 from decnet.profiler.behave_shell._thresholds import (
     BACKSPACE_IMMEDIATE_MAX_S,
     CMD_CHUNKING_FLUENT_CV_MAX,
     CV_BURSTY_MAX,
     CV_MACHINE_MAX,
     CV_STEADY_MAX,
+    DIGRAPH_FLIGHT_BUCKETS_S,
     IKI_MACHINE_MAX_S,
+    IKI_THINK_MAX_S,
+    MIN_DIGRAPH_SAMPLES,
+    MIN_DIGRAPHS_FOR_SIMHASH,
     MIN_INPUTS_FOR_CADENCE,
     MODALITY_PASTED_MIN,
     MODALITY_TYPED_MAX,
@@ -421,3 +426,64 @@ def pipe_chaining_depth(ctx: SessionContext) -> Iterator[Observation]:
         value=value,
         confidence=confidence,
     )
+
+
+def _flight_bucket(seconds: float) -> int:
+    """Quantize a digraph flight time into a coarse log bucket index.
+
+    Returns 0..len(DIGRAPH_FLIGHT_BUCKETS_S); the coarseness is what lets
+    the same typist collide across sessions despite per-keystroke jitter.
+    """
+    for i, edge in enumerate(DIGRAPH_FLIGHT_BUCKETS_S):
+        if seconds < edge:
+            return i
+    return len(DIGRAPH_FLIGHT_BUCKETS_S)
+
+
+def digraph_simhash(ctx: SessionContext) -> Iterator[Observation]:
+    """Emit ``motor.digraph_simhash`` — a 64-bit LSH fingerprint of the
+    operator's per-digraph keystroke flight times.
+
+    For each consecutive pair of single-char input keystrokes ``(c1, c2)``
+    the flight time is the inter-event gap. Pastes / escape sequences
+    (multi-char events) and think-pauses (> ``IKI_THINK_MAX_S``) break the
+    chain so they don't pollute timing. Each digraph's *median* flight is
+    bucketed; ``(c1c2, bucket)`` tokens are SimHashed (weighted by sample
+    count) so the same typist lands Hamming-close across sessions, while a
+    faster/slower or different-vocabulary operator separates.
+
+    Stays silent below ``MIN_DIGRAPHS_FOR_SIMHASH`` distinct pairs or
+    ``MIN_DIGRAPH_SAMPLES`` total samples — too little signal to fingerprint.
+    """
+    flights: dict[str, list[float]] = {}
+    prev_t: float | None = None
+    prev_c: str | None = None
+    for t, _kind, data in ctx.input_events:
+        if len(data) != 1:
+            # empty or multi-char event (paste / escape burst), not one keystroke
+            prev_t = prev_c = None
+            continue
+        if prev_c is not None and prev_t is not None:
+            dt = t - prev_t
+            if 0.0 < dt <= IKI_THINK_MAX_S:
+                flights.setdefault(prev_c + data, []).append(dt)
+        prev_t, prev_c = t, data
+
+    total = sum(len(v) for v in flights.values())
+    if len(flights) < MIN_DIGRAPHS_FOR_SIMHASH or total < MIN_DIGRAPH_SAMPLES:
+        return
+
+    tokens = {
+        f"{digraph}:{_flight_bucket(statistics.median(dts))}": len(dts)
+        for digraph, dts in flights.items()
+    }
+    # Confidence grows with the number of distinct digraphs (more pairs =
+    # more stable fingerprint), capped at 0.95 — never claim certainty on a
+    # biometric inferred from terminal timing.
+    confidence = min(0.95, 0.40 + 0.05 * len(flights))
+    yield make_observation(
+        ctx,
+        primitive="motor.digraph_simhash",
+        value=format(simhash64(tokens), "016x"),
+        confidence=confidence,
+    )
diff --git a/decnet/profiler/behave_shell/_thresholds.py b/decnet/profiler/behave_shell/_thresholds.py
index 42e10aa6..6dae24f6 100644
--- a/decnet/profiler/behave_shell/_thresholds.py
+++ b/decnet/profiler/behave_shell/_thresholds.py
@@ -292,6 +292,17 @@ CV_BURSTY_MAX: float = 1.50
 # Need this many input events before we'll claim a cadence at all.
 MIN_INPUTS_FOR_CADENCE: int = 5
 
+# ── motor.digraph_simhash (keystroke-rhythm biometric) ──────────────────────
+# A digraph is two consecutive single-char keystrokes; its flight time is the
+# inter-event gap. We need enough distinct pairs AND enough samples before the
+# SimHash is stable enough to fingerprint a typist — below this it's noise.
+MIN_DIGRAPHS_FOR_SIMHASH: int = 8
+MIN_DIGRAPH_SAMPLES: int = 20
+# Median flight time per digraph is quantized into these log-spaced buckets
+# (upper edges, seconds). Coarse on purpose: the same typist must land in the
+# same bucket despite jitter, while a clearly faster/slower typist separates.
+DIGRAPH_FLIGHT_BUCKETS_S: tuple[float, ...] = (0.03, 0.06, 0.12, 0.25, 0.5, 1.0)
+
 # ── motor.motor_stability (Step B.2) ────────────────────────────────────────
 # Tremor proxy: fraction of within-burst IATs below TREMOR_FAST_FLOOR_S
 # (30 ms — physiologically implausible double-press floor; humans can't
diff --git a/decnet/util/__init__.py b/decnet/util/__init__.py
new file mode 100644
index 00000000..06b63eb0
--- /dev/null
+++ b/decnet/util/__init__.py
@@ -0,0 +1,2 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Small cross-cutting helpers with no domain home of their own."""
diff --git a/decnet/util/simhash.py b/decnet/util/simhash.py
new file mode 100644
index 00000000..57d95d51
--- /dev/null
+++ b/decnet/util/simhash.py
@@ -0,0 +1,66 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Charikar 64-bit SimHash + Hamming helpers.
+
+Locality-sensitive fingerprint: inputs that share most weighted tokens
+produce hashes a few bits apart (small Hamming distance), so near-
+duplicates cluster without storing the raw feature vector. Used by the
+keystroke-digraph biometric (``decnet/profiler/.../motor.py``) and the
+campaign clusterer's typing-similarity edge.
+
+ponytail: ``templates/smtp/server.py:_body_simhash`` is the same
+algorithm, inlined to keep slim decky containers from importing decnet.
+Left as-is to avoid pulling decnet into decky images; dedup here only if
+a third caller appears.
+"""
+from __future__ import annotations
+
+import hashlib
+from collections.abc import Mapping
+
+_BITS = 64
+_MASK = (1 << _BITS) - 1
+
+
+def simhash64(weighted_tokens: Mapping[str, int]) -> int:
+    """Charikar 64-bit SimHash over frequency-weighted tokens.
+
+    Returns 0 when no token has a positive weight (empty input included)
+    — callers treat 0 as "no signal". Per-token hash is md5[:8]: a content
+    fingerprint, not a security primitive.
+    """
+    if not weighted_tokens:
+        return 0
+    bits = [0] * _BITS
+    for tok, weight in weighted_tokens.items():
+        if weight <= 0:
+            continue
+        h = int.from_bytes(
+            # Content fingerprint, not a security primitive — md5[:8] is fast
+            # and 64 bits is all we need; usedforsecurity=False clears B324.
+            hashlib.md5(
+                tok.encode("utf-8", errors="replace"), usedforsecurity=False,
+            ).digest()[:8],
+            "big",
+        )
+        for i in range(_BITS):
+            bits[i] += weight if (h >> i) & 1 else -weight
+    out = 0
+    for i in range(_BITS):
+        if bits[i] > 0:
+            out |= (1 << i)
+    return out
+
+
+def hamming64(a: int, b: int) -> int:
+    """Number of differing bits between two 64-bit ints."""
+    return ((a ^ b) & _MASK).bit_count()
+
+
+def to_bytes8(value: int) -> bytes:
+    """64-bit int → 8 big-endian bytes (for ``BINARY(8)`` storage)."""
+    return (value & _MASK).to_bytes(8, "big")
+
+
+def from_bytes8(raw: bytes) -> int:
+    """8 big-endian bytes → 64-bit int (``raw`` length is not validated)."""
+    return int.from_bytes(raw, "big")
diff --git a/development/BEHAVE-EXTRACTOR.md b/development/BEHAVE-EXTRACTOR.md
index 91d8ef03..f4849eeb 100644
--- a/development/BEHAVE-EXTRACTOR.md
+++ b/development/BEHAVE-EXTRACTOR.md
@@ -1139,6 +1139,25 @@ own plan.
 
 ---
 
+## Post-v0 addition — `motor.digraph_simhash` (38th Tier-A primitive)
+
+Added in behave-shell 0.1.2 (the v0 corpus above was 37). It is the
+**keystroke-rhythm biometric**: a 64-bit Charikar SimHash of the
+operator's per-digraph (two-key) flight times, bucketed per character
+pair. Locality-sensitive — the same typist lands Hamming-close across
+sessions and decoys, so it links one human behind multiple identities.
+
+- **Extractor:** `_features/motor.py:digraph_simhash`, `ValueKind.HASH`,
+  conditional (rides `MIN_DIGRAPHS_FOR_SIMHASH` / `MIN_DIGRAPH_SAMPLES`
+  floors; lives in `PHASE_G_CONDITIONAL_PRIMITIVES`). Live-typed input
+  only — pastes/escape bursts break the digraph chain.
+- **Rollup:** the identity clusterer folds the session SimHashes into a
+  bitwise-majority centroid written to `AttackerIdentity.kd_digraph_simhash`;
+  the campaign clusterer adds a Hamming-proximity edge. STIX export
+  carries the centroid (hex). Tier-A count is now **38**.
+
+---
+
 **Owner:** ANTI.
 **Implementation gate:** Step 0 starts after this doc is reviewed
 + Phase 1 of `BEHAVE-INTEGRATION.md` lands (storage table exists).
diff --git a/pyproject.toml b/pyproject.toml
index 71217f0d..ad6f13c4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -59,7 +59,7 @@ dependencies = [
     # bus event adapter consumed by decnet/profiler/behave_shell/. Pin
     # range tracks BEHAVE-INTEGRATION.md §"Versioning".
     "behave-core>=0.1.0,<0.2",
-    "behave-shell>=0.1.0,<0.2",
+    "behave-shell>=0.1.2,<0.2",
     # STIX → MISP conversion: CIRCL-maintained reference converter used by
     # MISP itself. Pulls pymisp transitively (needed for MISPEvent output).
     "misp-stix>=2026.4",
diff --git a/tests/profiler/behave_shell/test_calibration_grid.py b/tests/profiler/behave_shell/test_calibration_grid.py
index 294ddba1..16b75708 100644
--- a/tests/profiler/behave_shell/test_calibration_grid.py
+++ b/tests/profiler/behave_shell/test_calibration_grid.py
@@ -109,6 +109,7 @@ PHASE_G_CONDITIONAL_PRIMITIVES: frozenset[str] = frozenset({
     "emotional_valence.arousal",  # needs typing bursts
     "emotional_valence.valence",  # needs ≥ 80 typed letters
     "emotional_valence.frustration_venting",  # needs ≥ 30 typed letters
+    "motor.digraph_simhash",  # needs ≥ 8 distinct digraphs and ≥ 20 samples
 })
 
 # Backwards-compatible aliases for any external import — earlier phases
diff --git a/tests/profiler/behave_shell/test_motor_digraph_simhash.py b/tests/profiler/behave_shell/test_motor_digraph_simhash.py
new file mode 100644
index 00000000..4b98cb60
--- /dev/null
+++ b/tests/profiler/behave_shell/test_motor_digraph_simhash.py
@@ -0,0 +1,69 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""``motor.digraph_simhash`` — keystroke-rhythm biometric.
+
+Builds typed input streams (single-char ``"i"`` events at a fixed
+inter-key gap) and asserts the LSH property: same typist → Hamming-close,
+different cadence → far apart, pastes excluded, thin sessions silent.
+"""
+from __future__ import annotations
+
+from decnet.profiler.behave_shell import extract_session
+from decnet.profiler.behave_shell._parse import AsciinemaEvent
+from decnet.util.simhash import from_bytes8, hamming64
+
+# A realistic multi-command session: plenty of distinct digraphs, > 20 samples.
+_PHRASE = "ls -la /etc; cat /etc/passwd; whoami; uname -a; netstat -tlnp\r"
+
+
+def _typed(phrase: str, dt: float, *, start: float = 0.0) -> list[AsciinemaEvent]:
+    events: list[AsciinemaEvent] = []
+    t = start
+    for ch in phrase:
+        events.append((t, "i", ch))
+        t += dt
+    return events
+
+
+def _digraph_obs(events: list[AsciinemaEvent], sid: str):
+    out = list(extract_session(events, sid=sid))
+    obs = [o for o in out if o.primitive == "motor.digraph_simhash"]
+    return obs
+
+
+def _hash_int(obs) -> int:
+    return from_bytes8(bytes.fromhex(obs.value))
+
+
+def test_emits_one_observation_for_a_normal_session() -> None:
+    obs = _digraph_obs(_typed(_PHRASE, 0.12), "dg-basic")
+    assert len(obs) == 1
+    assert len(obs[0].value) == 16  # 64-bit hex
+    assert 0.0 < obs[0].confidence <= 0.95
+
+
+def test_same_typist_identical_timing_is_identical() -> None:
+    a = _digraph_obs(_typed(_PHRASE, 0.12), "dg-a")[0]
+    b = _digraph_obs(_typed(_PHRASE, 0.12), "dg-b")[0]
+    # Identical text + timing → identical fingerprint (0 Hamming).
+    assert hamming64(_hash_int(a), _hash_int(b)) == 0
+
+
+def test_different_cadence_separates() -> None:
+    fast = _digraph_obs(_typed(_PHRASE, 0.05), "dg-fast")[0]
+    slow = _digraph_obs(_typed(_PHRASE, 0.45), "dg-slow")[0]
+    # Same vocabulary, very different flight-time buckets → the hashes diverge.
+    assert hamming64(_hash_int(fast), _hash_int(slow)) > 0
+
+
+def test_pastes_do_not_form_digraphs() -> None:
+    # A session made only of multi-char paste events (len != 1) carries no
+    # single-char keystrokes, so no digraphs and no observation.
+    events: list[AsciinemaEvent] = [
+        (float(i), "i", "sudo apt-get update") for i in range(10)
+    ]
+    assert _digraph_obs(events, "dg-paste") == []
+
+
+def test_thin_session_is_silent() -> None:
+    # Below MIN_DIGRAPHS_FOR_SIMHASH / MIN_DIGRAPH_SAMPLES → no emission.
+    assert _digraph_obs(_typed("ls\r", 0.1), "dg-thin") == []
diff --git a/tests/profiler/behave_shell/test_registry_coverage.py b/tests/profiler/behave_shell/test_registry_coverage.py
index 9a8a4d93..6b17fe18 100644
--- a/tests/profiler/behave_shell/test_registry_coverage.py
+++ b/tests/profiler/behave_shell/test_registry_coverage.py
@@ -91,10 +91,14 @@ def test_no_extractor_set_drifts_from_registry() -> None:
     )
 
 
-def test_tier_a_count_is_37() -> None:
-    """Sanity check: Tier-A count matches the design doc (37 primitives)."""
-    assert len(_tier_a_primitives()) == 37, (
-        f"Expected 37 Tier-A primitives per BEHAVE-EXTRACTOR.md; "
+def test_tier_a_count_is_38() -> None:
+    """Sanity check: Tier-A count matches the design doc.
+
+    38 since behave-shell 0.1.2 added ``motor.digraph_simhash`` (the
+    keystroke-rhythm biometric); was 37.
+    """
+    assert len(_tier_a_primitives()) == 38, (
+        f"Expected 38 Tier-A primitives per BEHAVE-EXTRACTOR.md; "
         f"got {len(_tier_a_primitives())}. Update Phase H if the "
         f"spec genuinely changed, or adjust TIER_B_ALLOWLIST."
     )
diff --git a/tests/util/__init__.py b/tests/util/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/util/test_simhash.py b/tests/util/test_simhash.py
new file mode 100644
index 00000000..52a50e07
--- /dev/null
+++ b/tests/util/test_simhash.py
@@ -0,0 +1,38 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Charikar SimHash util — determinism, LSH property, byte round-trip."""
+from __future__ import annotations
+
+from decnet.util.simhash import from_bytes8, hamming64, simhash64, to_bytes8
+
+
+def test_empty_or_zero_weight_is_zero() -> None:
+    assert simhash64({}) == 0
+    assert simhash64({"a": 0, "b": -3}) == 0  # non-positive weights skipped
+
+
+def test_deterministic() -> None:
+    tokens = {"th": 3, "he": 2, "er": 1}
+    assert simhash64(tokens) == simhash64(dict(tokens))
+
+
+def test_near_duplicate_low_hamming() -> None:
+    base = {f"dg{i}": (i % 5) + 1 for i in range(40)}
+    identical = dict(base)
+    perturbed = dict(base)
+    perturbed["NEW_PAIR"] = 1  # one extra low-weight token
+    assert hamming64(simhash64(base), simhash64(identical)) == 0
+    assert hamming64(simhash64(base), simhash64(perturbed)) <= 8
+
+
+def test_disjoint_high_hamming() -> None:
+    a = {f"a{i}": 2 for i in range(30)}
+    b = {f"b{i}": 2 for i in range(30)}
+    # Unrelated token sets ≈ half the 64 bits differ; comfortably ≥ 20.
+    assert hamming64(simhash64(a), simhash64(b)) >= 20
+
+
+def test_bytes_roundtrip_is_8_bytes() -> None:
+    h = simhash64({"x": 1, "y": 2, "z": 5})
+    raw = to_bytes8(h)
+    assert isinstance(raw, bytes) and len(raw) == 8
+    assert from_bytes8(raw) == h