From d40495d71bc046f12ca0ea199078b93a04e65abc Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 4 May 2026 00:13:45 -0400 Subject: [PATCH] feat(profiler/behave_shell): emit temporal.escalation_pattern Bin commands into non-overlapping windows of width max(ESCALATION_WINDOW_MIN_S, duration_s / ESCALATION_WINDOW_TARGET). CV of per-window counts + zero-window fraction classify bursty / sustained / erratic. v0.1; corpus re-tune deferred. --- .../behave_shell/_features/__init__.py | 2 + .../behave_shell/_features/temporal.py | 65 ++++++++++++++++ decnet/profiler/behave_shell/_thresholds.py | 26 +++++++ .../test_temporal_escalation_pattern.py | 74 +++++++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 tests/profiler/behave_shell/test_temporal_escalation_pattern.py diff --git a/decnet/profiler/behave_shell/_features/__init__.py b/decnet/profiler/behave_shell/_features/__init__.py index 565a361c..10dda731 100644 --- a/decnet/profiler/behave_shell/_features/__init__.py +++ b/decnet/profiler/behave_shell/_features/__init__.py @@ -25,6 +25,7 @@ from decnet.profiler.behave_shell._features.cognitive import ( inter_command_latency_class, ) from decnet.profiler.behave_shell._features.temporal import ( + escalation_pattern, session_duration, ) from decnet.profiler.behave_shell._features.motor import ( @@ -63,4 +64,5 @@ FEATURES: tuple[FeatureFn, ...] = ( error_resilience_frustration_typing, error_resilience_fallback_to_man, session_duration, + escalation_pattern, ) diff --git a/decnet/profiler/behave_shell/_features/temporal.py b/decnet/profiler/behave_shell/_features/temporal.py index 81e0b14d..9b72fbb9 100644 --- a/decnet/profiler/behave_shell/_features/temporal.py +++ b/decnet/profiler/behave_shell/_features/temporal.py @@ -6,9 +6,12 @@ observation history. The other three (``session_timing``, and computed by the attribution engine, not the extractor. Step E.1: ``temporal.session_duration``. +Step E.2: ``temporal.escalation_pattern``. """ from __future__ import annotations +import math +import statistics from typing import Iterator from decnet_behave_core.spec.envelope import Observation @@ -16,6 +19,13 @@ from decnet_behave_core.spec.envelope import Observation from decnet.profiler.behave_shell._ctx import SessionContext from decnet.profiler.behave_shell._features._emit import make_observation from decnet.profiler.behave_shell._thresholds import ( + ESCALATION_BURSTY_CV, + ESCALATION_BURSTY_ZERO_FRAC, + ESCALATION_MIN_COMMANDS, + ESCALATION_MIN_WINDOWS, + ESCALATION_SUSTAINED_CV, + ESCALATION_WINDOW_MIN_S, + ESCALATION_WINDOW_TARGET, SESSION_DURATION_LONG_MAX, SESSION_DURATION_MEDIUM_MAX, SESSION_DURATION_SHORT_MAX, @@ -48,3 +58,58 @@ def session_duration(ctx: SessionContext) -> Iterator[Observation]: value=value, confidence=0.85, ) + + +def escalation_pattern(ctx: SessionContext) -> Iterator[Observation]: + """Emit ``temporal.escalation_pattern`` ∈ {sustained, erratic, bursty}. + + Bin commands into non-overlapping windows of width + ``max(ESCALATION_WINDOW_MIN_S, duration_s / ESCALATION_WINDOW_TARGET)``. + Compute the CV of per-window command counts and the fraction of + zero-count windows. + + * **bursty** — significant silence (zero_frac ≥ threshold) AND + high dispersion (CV ≥ threshold). Real spikes against a quiet + background. + * **sustained** — low dispersion (CV < threshold). Steady cadence. + * **erratic** — fall-through. Variable but no clear silence + pattern. + + Skip emission when the session is too short to bin meaningfully + (no commands, or duration too small to produce any window). + """ + n_cmds = len(ctx.commands) + if n_cmds == 0 or ctx.duration_s <= 0.0: + return + width = max(ESCALATION_WINDOW_MIN_S, ctx.duration_s / ESCALATION_WINDOW_TARGET) + n_windows = max(1, math.ceil(ctx.duration_s / width)) + counts = [0] * n_windows + for cmd in ctx.commands: + offset = cmd.start_ts - ctx.t_start + idx = min(n_windows - 1, max(0, int(offset / width))) + counts[idx] += 1 + + mean = statistics.fmean(counts) + if mean <= 0.0 or len(counts) < 2: + cv = 0.0 + else: + cv = statistics.stdev(counts) / mean + zero_frac = sum(1 for c in counts if c == 0) / len(counts) + + if zero_frac >= ESCALATION_BURSTY_ZERO_FRAC and cv >= ESCALATION_BURSTY_CV: + value = "bursty" + elif cv < ESCALATION_SUSTAINED_CV: + value = "sustained" + else: + value = "erratic" + + if n_windows < ESCALATION_MIN_WINDOWS or n_cmds < ESCALATION_MIN_COMMANDS: + confidence = 0.40 + else: + confidence = 0.60 + yield make_observation( + ctx, + primitive="temporal.escalation_pattern", + value=value, + confidence=confidence, + ) diff --git a/decnet/profiler/behave_shell/_thresholds.py b/decnet/profiler/behave_shell/_thresholds.py index b6d1536f..b6b17e25 100644 --- a/decnet/profiler/behave_shell/_thresholds.py +++ b/decnet/profiler/behave_shell/_thresholds.py @@ -184,6 +184,32 @@ SESSION_DURATION_SHORT_MAX: float = 60.0 SESSION_DURATION_MEDIUM_MAX: float = 600.0 SESSION_DURATION_LONG_MAX: float = 3600.0 +# ── temporal.escalation_pattern (Step E.2) ───────────────────────────────── +# Bin commands into non-overlapping windows. Width is dynamic: +# +# width = max(ESCALATION_WINDOW_MIN_S, duration_s / ESCALATION_WINDOW_TARGET) +# +# so a 30s session uses 10s windows (3 windows) and a 1h session uses +# 6min windows (10 windows). CV of per-window counts + zero-window +# fraction classify: +# +# zero_frac >= ESCALATION_BURSTY_ZERO_FRAC AND CV >= ESCALATION_BURSTY_CV +# → bursty (silences then spikes) +# CV < ESCALATION_SUSTAINED_CV +# → sustained (steady cadence throughout) +# else +# → erratic (variable but no real silence pattern) +# +# v0.1; corpus re-tune deferred. Sample-size honesty caps confidence +# below ESCALATION_MIN_WINDOWS or ESCALATION_MIN_COMMANDS. +ESCALATION_WINDOW_MIN_S: float = 10.0 +ESCALATION_WINDOW_TARGET: int = 10 +ESCALATION_BURSTY_ZERO_FRAC: float = 0.30 +ESCALATION_BURSTY_CV: float = 1.00 +ESCALATION_SUSTAINED_CV: float = 0.50 +ESCALATION_MIN_WINDOWS: int = 5 +ESCALATION_MIN_COMMANDS: int = 5 + # ── motor.keystroke_cadence (Step B.1) ────────────────────────────────────── # Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between # commands don't inflate the within-burst CV. Mirrors the prototype's diff --git a/tests/profiler/behave_shell/test_temporal_escalation_pattern.py b/tests/profiler/behave_shell/test_temporal_escalation_pattern.py new file mode 100644 index 00000000..343720a2 --- /dev/null +++ b/tests/profiler/behave_shell/test_temporal_escalation_pattern.py @@ -0,0 +1,74 @@ +"""Step E.2: ``temporal.escalation_pattern``.""" +from __future__ import annotations + +from decnet.profiler.behave_shell import extract_session +from decnet.profiler.behave_shell._parse import AsciinemaEvent + + +PRIMITIVE = "temporal.escalation_pattern" + + +def _of(observations: list, primitive: str): + obs = [o for o in observations if o.primitive == primitive] + assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}" + return obs[0] + + +def _commands_at(starts: list[float]) -> list[AsciinemaEvent]: + events: list[AsciinemaEvent] = [] + for s in starts: + events.append((s, "i", "x\r")) + return events + + +def test_no_commands_no_emission() -> None: + out = list(extract_session([(0.0, "i", "a"), (10.0, "i", "b")], sid="esc-empty")) + assert [o for o in out if o.primitive == PRIMITIVE] == [] + + +def test_uniform_pace_emits_sustained() -> None: + """Even spacing across a long session → low CV → sustained.""" + starts = [i * 12.0 for i in range(15)] # 15 cmds over 168s, 10 windows + out = list(extract_session(_commands_at(starts), sid="esc-sus")) + obs = _of(out, PRIMITIVE) + assert obs.value == "sustained" + + +def test_silent_periods_with_spikes_emit_bursty() -> None: + """Five tight bursts at session start, long silence, five at end.""" + starts = [0.0, 0.5, 1.0, 1.5, 2.0, # spike 1 + 200.0, 200.5, 201.0, 201.5, 202.0] # spike 2 after silence + out = list(extract_session(_commands_at(starts), sid="esc-burst")) + obs = _of(out, PRIMITIVE) + assert obs.value == "bursty" + + +def test_variable_no_silence_emits_erratic() -> None: + """Variable rate but every window populated → CV in (0.5, 1.0), zero_frac=0 → erratic.""" + # Last event at 120s so width = 12.0, n_windows = 10, bins [0,12), ..., [108,120). + # Each window populated; counts skewed enough to push CV above 0.5 but + # zero_frac stays at 0 so it can't qualify as bursty. + starts = [ + 0.0, # window 0 [0,12): 1 + 13.0, 15.0, # window 1 [12,24): 2 + 25.0, # window 2 [24,36): 1 + 37.0, 38.0, 39.0, 40.0, 41.0, # window 3 [36,48): 5 + 50.0, # window 4 [48,60): 1 + 62.0, 64.0, # window 5 [60,72): 2 + 73.0, # window 6 [72,84): 1 + 86.0, 87.0, 88.0, 89.0, 90.0, # window 7 [84,96): 5 + 100.0, # window 8 [96,108): 1 + 115.0, 120.0, # window 9 [108,120]: 2 + ] + out = list(extract_session(_commands_at(starts), sid="esc-err")) + obs = _of(out, PRIMITIVE) + assert obs.value == "erratic" + + +def test_short_session_low_confidence() -> None: + """Below the sample-size floor — confidence drops.""" + short = list(extract_session(_commands_at([0.0, 1.0, 2.0]), sid="esc-short")) + full = list(extract_session(_commands_at([i * 12.0 for i in range(15)]), sid="esc-full")) + s = _of(short, PRIMITIVE) + f = _of(full, PRIMITIVE) + assert s.confidence < f.confidence