diff --git a/decnet/profiler/behave_shell/_ctx.py b/decnet/profiler/behave_shell/_ctx.py index bb77bc56..cd0b7913 100644 --- a/decnet/profiler/behave_shell/_ctx.py +++ b/decnet/profiler/behave_shell/_ctx.py @@ -4,16 +4,20 @@ A naïve engine re-walks the event stream once per primitive. We don't do that — one walk over the events builds this context, every feature reads from it. Adding a new feature is O(1) cost on the parse side. -Step 0 ships only the structural fields (sid / source / evidence_ref / -timing envelope). Step 1+ fills ``iats`` / ``paste_bursts`` / -``commands`` / ``inter_cmd_iats`` / ``output_per_cmd``. +Step 1 fills ``iats`` (inter-key intervals between input events) and +``paste_bursts`` (contiguous runs of paste-class events). Step 4 +will fill ``commands`` / ``inter_cmd_iats`` / ``output_per_cmd``. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Iterable -from decnet.profiler.behave_shell._parse import AsciinemaEvent +from decnet.profiler.behave_shell._parse import AsciinemaEvent, PasteBurst +from decnet.profiler.behave_shell._thresholds import ( + PASTE_BURST_MAX_IAT_S, + PASTE_MIN_CHARS_PER_EVENT, +) @dataclass(frozen=True, slots=True) @@ -28,6 +32,65 @@ class SessionContext: input_events: tuple[AsciinemaEvent, ...] = field(default_factory=tuple) output_events: tuple[AsciinemaEvent, ...] = field(default_factory=tuple) + # Step 1 derivations + iats: tuple[float, ...] = field(default_factory=tuple) + paste_bursts: tuple[PasteBurst, ...] = field(default_factory=tuple) + paste_event_count: int = 0 + + +def _detect_paste_bursts( + inputs: list[AsciinemaEvent], +) -> tuple[tuple[PasteBurst, ...], int]: + """Group consecutive paste-class input events into PasteBursts. + + A paste-class event is one with ``len(data) >= PASTE_MIN_CHARS_PER_EVENT``. + Two adjacent paste-class events collapse into the same burst when + their IAT is within ``PASTE_BURST_MAX_IAT_S``; otherwise a new + burst opens. Returns the bursts and the total count of paste-class + events (the same number ``BEHAVE`` prototype calls ``paste_events``). + """ + bursts: list[PasteBurst] = [] + paste_count = 0 + + cur_start: float | None = None + cur_end: float = 0.0 + cur_chars: int = 0 + cur_events: int = 0 + last_t: float | None = None + + def _close() -> None: + nonlocal cur_start, cur_end, cur_chars, cur_events + if cur_start is not None and cur_events > 0: + bursts.append(PasteBurst( + start_ts=cur_start, + end_ts=cur_end, + char_count=cur_chars, + event_count=cur_events, + )) + cur_start = None + cur_end = 0.0 + cur_chars = 0 + cur_events = 0 + + for t, _kind, data in inputs: + is_paste = len(data) >= PASTE_MIN_CHARS_PER_EVENT + if is_paste: + paste_count += 1 + if cur_start is None or ( + last_t is not None and (t - last_t) > PASTE_BURST_MAX_IAT_S + ): + _close() + cur_start = t + cur_end = t + cur_chars += len(data) + cur_events += 1 + else: + _close() + last_t = t + + _close() + return tuple(bursts), paste_count + def build_session_context( events: Iterable[AsciinemaEvent], @@ -36,13 +99,7 @@ def build_session_context( source: str, evidence_ref: str | None = None, ) -> SessionContext: - """Single-pass build of the SessionContext for ``events``. - - ``evidence_ref`` defaults to ``"session:" + sid`` so callers that - don't yet plumb a real evidence pointer still get a stable, - BEHAVE-envelope-valid string. Workers should pass an explicit - pointer to the on-disk shard. - """ + """Single-pass build of the SessionContext for ``events``.""" inputs: list[AsciinemaEvent] = [] outputs: list[AsciinemaEvent] = [] t_first: float | None = None @@ -66,6 +123,11 @@ def build_session_context( t_start = t_first t_end = t_last + iats: tuple[float, ...] = tuple( + max(0.0, inputs[i][0] - inputs[i - 1][0]) for i in range(1, len(inputs)) + ) + paste_bursts, paste_count = _detect_paste_bursts(inputs) + return SessionContext( sid=sid, source=source, @@ -75,4 +137,7 @@ def build_session_context( duration_s=max(0.0, t_end - t_start), input_events=tuple(inputs), output_events=tuple(outputs), + iats=iats, + paste_bursts=paste_bursts, + paste_event_count=paste_count, ) diff --git a/decnet/profiler/behave_shell/_parse.py b/decnet/profiler/behave_shell/_parse.py index 831f4204..c8178273 100644 --- a/decnet/profiler/behave_shell/_parse.py +++ b/decnet/profiler/behave_shell/_parse.py @@ -1,14 +1,76 @@ -"""Asciinema event types. +"""Asciinema event types + shard-line parsing helpers. -The on-disk shard format is a list of 3-tuples ``(t, kind, data)`` where -``t`` is seconds since session start (float), ``kind`` is ``'i'`` (input) -or ``'o'`` (output), and ``data`` is the captured bytes decoded as a -Python ``str``. Step 0 ships only the type aliases — Step 1 fills the -parsing helpers and paste-burst detector. +Shard lines are JSON objects ``{"sid": ..., "t": float, "ch": "i"|"o", +"d": str}`` produced by the DECNET PTY-recording wrapper and held in +sensor-side blob storage. The first line of each file is a header +(``{"sid": ..., "hdr": {...}}``) which carries no event payload — the +parser skips it. + +The on-wire engine input is the simpler 3-tuple ``(t, kind, data)`` +:data:`AsciinemaEvent`. Workers (``BEHAVE-INTEGRATION.md`` Phase 4) +either feed the 3-tuple directly or use :func:`parse_shard_line` to +turn a raw JSON string into one. """ from __future__ import annotations -from typing import Literal, Tuple +import json +from dataclasses import dataclass +from typing import Iterable, Iterator, Literal, Tuple EventKind = Literal["i", "o"] AsciinemaEvent = Tuple[float, EventKind, str] + + +@dataclass(frozen=True, slots=True) +class PasteBurst: + """Contiguous run of paste-class input events. + + A paste-class event is a single input event whose ``data`` length + is at least ``PASTE_MIN_CHARS_PER_EVENT`` — terminal pastes from + xterm/kitty/iTerm arrive as one bulk write, so checking event size + is the cheap-and-correct proxy for the bracketed-paste signal we + don't get to see. + + Multiple consecutive paste-class events with low IATs collapse + into one ``PasteBurst`` for higher-level reasoning (paste-rate / + paste-style classification later). + """ + + start_ts: float + end_ts: float + char_count: int + event_count: int + + +def parse_shard_line(line: str) -> AsciinemaEvent | None: + """Turn one shard JSONL line into an :data:`AsciinemaEvent`. + + Returns ``None`` for the header line and for any line that is not + a well-formed event record. Workers must filter ``None``s out + before passing to :func:`extract_session`. + """ + line = line.strip() + if not line: + return None + try: + rec = json.loads(line) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(rec, dict): + return None + if "hdr" in rec or "t" not in rec or "ch" not in rec: + return None + t = rec.get("t") + ch = rec.get("ch") + d = rec.get("d", "") + if not isinstance(t, (int, float)) or ch not in ("i", "o") or not isinstance(d, str): + return None + return (float(t), ch, d) + + +def parse_shard(lines: Iterable[str]) -> Iterator[AsciinemaEvent]: + """Stream-parse a shard file's lines into events, skipping junk.""" + for line in lines: + ev = parse_shard_line(line) + if ev is not None: + yield ev diff --git a/decnet/profiler/behave_shell/_thresholds.py b/decnet/profiler/behave_shell/_thresholds.py index c857ad0e..843373c1 100644 --- a/decnet/profiler/behave_shell/_thresholds.py +++ b/decnet/profiler/behave_shell/_thresholds.py @@ -1,11 +1,37 @@ """Numeric thresholds for BEHAVE-SHELL primitive classification. -Each constant added here cites its calibration source. When the -registry's ``notes:`` field disagrees with a constant in this file the -registry is authoritative — fix the constant and re-run the -calibration grid. +Each constant cites its calibration source. When the registry's +``notes:`` field disagrees with a constant here, the registry is +authoritative — fix the constant, re-run the calibration grid. -Step 0 ships this file empty by design; thresholds land alongside the -feature functions that consume them (Steps 1+). +Empirical thresholds inherited from the BEHAVE prototype extractor +(``BEHAVE/prototype_extractors/shell/extract.py``); see lines 40-90 of +that file for the calibration history. Any change here must keep the +five-class grid green. """ from __future__ import annotations + +# ── paste-burst detection (Step 1) ────────────────────────────────────────── +# A single input event with ≥ PASTE_MIN_CHARS_PER_EVENT chars is the +# paste-class proxy used by the prototype; xterm-kitty / iTerm / VS Code +# pastes arrive as one bulk write. +PASTE_MIN_CHARS_PER_EVENT: int = 4 + +# Consecutive paste-class events arriving within this IAT collapse into +# one PasteBurst record. 200ms is the prototype's IKI burst cap. +PASTE_BURST_MAX_IAT_S: float = 0.20 + +# ── motor.input_modality (Step 2) ─────────────────────────────────────────── +# Paste-event ratio thresholds. ≥ 40% paste events → "pasted" (LLM-driven); +# ≤ 5% → "typed" (human at the keyboard); in between → "mixed". +# Lowered from 0.5 after the 47.6% case in sessions-2026-05-02-with-llm.jsonl +# was clearly LLM-driven but missed the 0.5 floor. +MODALITY_PASTED_MIN: float = 0.40 +MODALITY_TYPED_MAX: float = 0.05 + +# ── motor.paste_burst_rate (Step 3) ───────────────────────────────────────── +# Same paste-event ratio re-bucketed for the "how often does the operator +# paste" axis. Coarser than input_modality on purpose: this primitive is the +# habit signal, input_modality is the dominant-channel signal. +PASTE_RATE_HABITUAL_MIN: float = 0.50 +PASTE_RATE_OCCASIONAL_MIN: float = 0.10 diff --git a/tests/profiler/behave_shell/test_parse_and_bursts.py b/tests/profiler/behave_shell/test_parse_and_bursts.py new file mode 100644 index 00000000..56467fb0 --- /dev/null +++ b/tests/profiler/behave_shell/test_parse_and_bursts.py @@ -0,0 +1,121 @@ +"""Step 1: shard parsing + paste-burst detection. + +Synthetic streams covering pure-typed, pure-pasted, and mixed input, +plus the JSONL shard line parser and its junk handling. +""" +from __future__ import annotations + +import json + +from decnet.profiler.behave_shell._ctx import build_session_context +from decnet.profiler.behave_shell._parse import ( + PasteBurst, + parse_shard, + parse_shard_line, +) + + +# ── parse_shard_line ─────────────────────────────────────────────────────── + +def test_parse_shard_line_event() -> None: + line = json.dumps({"sid": "x", "t": 1.5, "ch": "i", "d": "ls"}) + assert parse_shard_line(line) == (1.5, "i", "ls") + + +def test_parse_shard_line_skips_header() -> None: + line = json.dumps({"sid": "x", "hdr": {"version": 2}}) + assert parse_shard_line(line) is None + + +def test_parse_shard_line_handles_blank_and_garbage() -> None: + assert parse_shard_line("") is None + assert parse_shard_line(" ") is None + assert parse_shard_line("not json") is None + assert parse_shard_line('{"t": "string-not-number", "ch": "i", "d": "x"}') is None + assert parse_shard_line('{"t": 1.0, "ch": "z", "d": "x"}') is None # bad kind + assert parse_shard_line('"just a string"') is None + + +def test_parse_shard_skips_junk_in_stream() -> None: + lines = [ + json.dumps({"sid": "x", "hdr": {"version": 2}}), + "", + "garbage", + json.dumps({"sid": "x", "t": 0.1, "ch": "i", "d": "a"}), + json.dumps({"sid": "x", "t": 0.2, "ch": "o", "d": "a\r\n"}), + ] + out = list(parse_shard(lines)) + assert out == [(0.1, "i", "a"), (0.2, "o", "a\r\n")] + + +# ── paste-burst detection ────────────────────────────────────────────────── + +def test_pure_typed_stream_has_no_paste_bursts() -> None: + # Single-char input events spaced by 0.1s: pure typing. + events = [(i * 0.1, "i", c) for i, c in enumerate("hello world\r")] + ctx = build_session_context(events, sid="t-typed", source="test") + assert ctx.paste_bursts == () + assert ctx.paste_event_count == 0 + # IATs computed correctly + assert len(ctx.iats) == len(events) - 1 + for iat in ctx.iats: + assert abs(iat - 0.1) < 1e-9 + + +def test_single_paste_event_one_burst() -> None: + events = [ + (0.0, "i", "echo hello world\r"), # 17 chars — paste class + ] + ctx = build_session_context(events, sid="t-paste-1", source="test") + assert len(ctx.paste_bursts) == 1 + assert ctx.paste_event_count == 1 + burst = ctx.paste_bursts[0] + assert burst.start_ts == 0.0 + assert burst.end_ts == 0.0 + assert burst.char_count == 17 + assert burst.event_count == 1 + + +def test_two_close_paste_events_collapse_into_one_burst() -> None: + events = [ + (0.0, "i", "first paste line\r"), + (0.05, "i", "second paste line\r"), # within PASTE_BURST_MAX_IAT_S + ] + ctx = build_session_context(events, sid="t-paste-2", source="test") + assert len(ctx.paste_bursts) == 1 + assert ctx.paste_event_count == 2 + assert ctx.paste_bursts[0].event_count == 2 + assert ctx.paste_bursts[0].char_count == 17 + 18 + + +def test_two_far_paste_events_split_into_two_bursts() -> None: + events = [ + (0.0, "i", "first paste\r"), + (5.0, "i", "second paste\r"), # well past the IAT cap + ] + ctx = build_session_context(events, sid="t-paste-far", source="test") + assert len(ctx.paste_bursts) == 2 + assert ctx.paste_event_count == 2 + + +def test_typing_between_pastes_breaks_the_burst() -> None: + events: list = [ + (0.0, "i", "long pasted line\r"), + (0.5, "i", "x"), # single typed char interrupts + (0.6, "i", "another pasted line\r"), + ] + ctx = build_session_context(events, sid="t-mixed", source="test") + assert len(ctx.paste_bursts) == 2 + assert ctx.paste_event_count == 2 + # The "x" event is not a paste-class event + assert ctx.input_events[1][2] == "x" + + +def test_paste_burst_record_is_immutable() -> None: + b = PasteBurst(start_ts=0.0, end_ts=1.0, char_count=10, event_count=1) + try: + b.char_count = 99 # type: ignore[misc] + except Exception: + pass + else: + raise AssertionError("PasteBurst should be frozen")