feat(profiler/behave_shell): asciinema parser + paste-burst detection

BEHAVE-EXTRACTOR.md Phase A Step 1. Lays the shared primitives that
Steps 2-3 (motor.input_modality, motor.paste_burst_rate) will
consume:

* parse_shard_line / parse_shard turn a shard JSONL line/file into
  AsciinemaEvents, skipping headers and malformed records.
* PasteBurst dataclass + _detect_paste_bursts group consecutive
  paste-class input events (len(d) >= 4 chars per the prototype's
  empirical floor) into contiguous bursts, splitting on IAT gaps
  larger than PASTE_BURST_MAX_IAT_S (200ms).
* SessionContext now carries iats and paste_bursts derivations.
* Threshold constants harvested from
  BEHAVE/prototype_extractors/shell/extract.py — calibrated against
  the five 2026-05-02 shards.

Tests cover pure-typed, pure-pasted, mixed streams; close vs far
paste events; typed events breaking a burst; PasteBurst immutability;
and the JSON parser's junk handling.
This commit is contained in:
2026-05-03 07:46:01 -04:00
parent f8eae04e5d
commit c9a81a23c2
4 changed files with 298 additions and 24 deletions

View File

@@ -4,16 +4,20 @@ A naïve engine re-walks the event stream once per primitive. We don't
do that — one walk over the events builds this context, every feature
reads from it. Adding a new feature is O(1) cost on the parse side.
Step 0 ships only the structural fields (sid / source / evidence_ref /
timing envelope). Step 1+ fills ``iats`` / ``paste_bursts`` /
``commands`` / ``inter_cmd_iats`` / ``output_per_cmd``.
Step 1 fills ``iats`` (inter-key intervals between input events) and
``paste_bursts`` (contiguous runs of paste-class events). Step 4
will fill ``commands`` / ``inter_cmd_iats`` / ``output_per_cmd``.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable
from decnet.profiler.behave_shell._parse import AsciinemaEvent
from decnet.profiler.behave_shell._parse import AsciinemaEvent, PasteBurst
from decnet.profiler.behave_shell._thresholds import (
PASTE_BURST_MAX_IAT_S,
PASTE_MIN_CHARS_PER_EVENT,
)
@dataclass(frozen=True, slots=True)
@@ -28,6 +32,65 @@ class SessionContext:
input_events: tuple[AsciinemaEvent, ...] = field(default_factory=tuple)
output_events: tuple[AsciinemaEvent, ...] = field(default_factory=tuple)
# Step 1 derivations
iats: tuple[float, ...] = field(default_factory=tuple)
paste_bursts: tuple[PasteBurst, ...] = field(default_factory=tuple)
paste_event_count: int = 0
def _detect_paste_bursts(
inputs: list[AsciinemaEvent],
) -> tuple[tuple[PasteBurst, ...], int]:
"""Group consecutive paste-class input events into PasteBursts.
A paste-class event is one with ``len(data) >= PASTE_MIN_CHARS_PER_EVENT``.
Two adjacent paste-class events collapse into the same burst when
their IAT is within ``PASTE_BURST_MAX_IAT_S``; otherwise a new
burst opens. Returns the bursts and the total count of paste-class
events (the same number ``BEHAVE`` prototype calls ``paste_events``).
"""
bursts: list[PasteBurst] = []
paste_count = 0
cur_start: float | None = None
cur_end: float = 0.0
cur_chars: int = 0
cur_events: int = 0
last_t: float | None = None
def _close() -> None:
nonlocal cur_start, cur_end, cur_chars, cur_events
if cur_start is not None and cur_events > 0:
bursts.append(PasteBurst(
start_ts=cur_start,
end_ts=cur_end,
char_count=cur_chars,
event_count=cur_events,
))
cur_start = None
cur_end = 0.0
cur_chars = 0
cur_events = 0
for t, _kind, data in inputs:
is_paste = len(data) >= PASTE_MIN_CHARS_PER_EVENT
if is_paste:
paste_count += 1
if cur_start is None or (
last_t is not None and (t - last_t) > PASTE_BURST_MAX_IAT_S
):
_close()
cur_start = t
cur_end = t
cur_chars += len(data)
cur_events += 1
else:
_close()
last_t = t
_close()
return tuple(bursts), paste_count
def build_session_context(
events: Iterable[AsciinemaEvent],
@@ -36,13 +99,7 @@ def build_session_context(
source: str,
evidence_ref: str | None = None,
) -> SessionContext:
"""Single-pass build of the SessionContext for ``events``.
``evidence_ref`` defaults to ``"session:" + sid`` so callers that
don't yet plumb a real evidence pointer still get a stable,
BEHAVE-envelope-valid string. Workers should pass an explicit
pointer to the on-disk shard.
"""
"""Single-pass build of the SessionContext for ``events``."""
inputs: list[AsciinemaEvent] = []
outputs: list[AsciinemaEvent] = []
t_first: float | None = None
@@ -66,6 +123,11 @@ def build_session_context(
t_start = t_first
t_end = t_last
iats: tuple[float, ...] = tuple(
max(0.0, inputs[i][0] - inputs[i - 1][0]) for i in range(1, len(inputs))
)
paste_bursts, paste_count = _detect_paste_bursts(inputs)
return SessionContext(
sid=sid,
source=source,
@@ -75,4 +137,7 @@ def build_session_context(
duration_s=max(0.0, t_end - t_start),
input_events=tuple(inputs),
output_events=tuple(outputs),
iats=iats,
paste_bursts=paste_bursts,
paste_event_count=paste_count,
)

View File

@@ -1,14 +1,76 @@
"""Asciinema event types.
"""Asciinema event types + shard-line parsing helpers.
The on-disk shard format is a list of 3-tuples ``(t, kind, data)`` where
``t`` is seconds since session start (float), ``kind`` is ``'i'`` (input)
or ``'o'`` (output), and ``data`` is the captured bytes decoded as a
Python ``str``. Step 0 ships only the type aliases — Step 1 fills the
parsing helpers and paste-burst detector.
Shard lines are JSON objects ``{"sid": ..., "t": float, "ch": "i"|"o",
"d": str}`` produced by the DECNET PTY-recording wrapper and held in
sensor-side blob storage. The first line of each file is a header
(``{"sid": ..., "hdr": {...}}``) which carries no event payload — the
parser skips it.
The on-wire engine input is the simpler 3-tuple ``(t, kind, data)``
:data:`AsciinemaEvent`. Workers (``BEHAVE-INTEGRATION.md`` Phase 4)
either feed the 3-tuple directly or use :func:`parse_shard_line` to
turn a raw JSON string into one.
"""
from __future__ import annotations
from typing import Literal, Tuple
import json
from dataclasses import dataclass
from typing import Iterable, Iterator, Literal, Tuple
EventKind = Literal["i", "o"]
AsciinemaEvent = Tuple[float, EventKind, str]
@dataclass(frozen=True, slots=True)
class PasteBurst:
"""Contiguous run of paste-class input events.
A paste-class event is a single input event whose ``data`` length
is at least ``PASTE_MIN_CHARS_PER_EVENT`` — terminal pastes from
xterm/kitty/iTerm arrive as one bulk write, so checking event size
is the cheap-and-correct proxy for the bracketed-paste signal we
don't get to see.
Multiple consecutive paste-class events with low IATs collapse
into one ``PasteBurst`` for higher-level reasoning (paste-rate /
paste-style classification later).
"""
start_ts: float
end_ts: float
char_count: int
event_count: int
def parse_shard_line(line: str) -> AsciinemaEvent | None:
"""Turn one shard JSONL line into an :data:`AsciinemaEvent`.
Returns ``None`` for the header line and for any line that is not
a well-formed event record. Workers must filter ``None``s out
before passing to :func:`extract_session`.
"""
line = line.strip()
if not line:
return None
try:
rec = json.loads(line)
except (json.JSONDecodeError, ValueError):
return None
if not isinstance(rec, dict):
return None
if "hdr" in rec or "t" not in rec or "ch" not in rec:
return None
t = rec.get("t")
ch = rec.get("ch")
d = rec.get("d", "")
if not isinstance(t, (int, float)) or ch not in ("i", "o") or not isinstance(d, str):
return None
return (float(t), ch, d)
def parse_shard(lines: Iterable[str]) -> Iterator[AsciinemaEvent]:
"""Stream-parse a shard file's lines into events, skipping junk."""
for line in lines:
ev = parse_shard_line(line)
if ev is not None:
yield ev

View File

@@ -1,11 +1,37 @@
"""Numeric thresholds for BEHAVE-SHELL primitive classification.
Each constant added here cites its calibration source. When the
registry's ``notes:`` field disagrees with a constant in this file the
registry is authoritative — fix the constant and re-run the
calibration grid.
Each constant cites its calibration source. When the registry's
``notes:`` field disagrees with a constant here, the registry is
authoritative — fix the constant, re-run the calibration grid.
Step 0 ships this file empty by design; thresholds land alongside the
feature functions that consume them (Steps 1+).
Empirical thresholds inherited from the BEHAVE prototype extractor
(``BEHAVE/prototype_extractors/shell/extract.py``); see lines 40-90 of
that file for the calibration history. Any change here must keep the
five-class grid green.
"""
from __future__ import annotations
# ── paste-burst detection (Step 1) ──────────────────────────────────────────
# A single input event with ≥ PASTE_MIN_CHARS_PER_EVENT chars is the
# paste-class proxy used by the prototype; xterm-kitty / iTerm / VS Code
# pastes arrive as one bulk write.
PASTE_MIN_CHARS_PER_EVENT: int = 4
# Consecutive paste-class events arriving within this IAT collapse into
# one PasteBurst record. 200ms is the prototype's IKI burst cap.
PASTE_BURST_MAX_IAT_S: float = 0.20
# ── motor.input_modality (Step 2) ───────────────────────────────────────────
# Paste-event ratio thresholds. ≥ 40% paste events → "pasted" (LLM-driven);
# ≤ 5% → "typed" (human at the keyboard); in between → "mixed".
# Lowered from 0.5 after the 47.6% case in sessions-2026-05-02-with-llm.jsonl
# was clearly LLM-driven but missed the 0.5 floor.
MODALITY_PASTED_MIN: float = 0.40
MODALITY_TYPED_MAX: float = 0.05
# ── motor.paste_burst_rate (Step 3) ─────────────────────────────────────────
# Same paste-event ratio re-bucketed for the "how often does the operator
# paste" axis. Coarser than input_modality on purpose: this primitive is the
# habit signal, input_modality is the dominant-channel signal.
PASTE_RATE_HABITUAL_MIN: float = 0.50
PASTE_RATE_OCCASIONAL_MIN: float = 0.10

View File

@@ -0,0 +1,121 @@
"""Step 1: shard parsing + paste-burst detection.
Synthetic streams covering pure-typed, pure-pasted, and mixed input,
plus the JSONL shard line parser and its junk handling.
"""
from __future__ import annotations
import json
from decnet.profiler.behave_shell._ctx import build_session_context
from decnet.profiler.behave_shell._parse import (
PasteBurst,
parse_shard,
parse_shard_line,
)
# ── parse_shard_line ───────────────────────────────────────────────────────
def test_parse_shard_line_event() -> None:
line = json.dumps({"sid": "x", "t": 1.5, "ch": "i", "d": "ls"})
assert parse_shard_line(line) == (1.5, "i", "ls")
def test_parse_shard_line_skips_header() -> None:
line = json.dumps({"sid": "x", "hdr": {"version": 2}})
assert parse_shard_line(line) is None
def test_parse_shard_line_handles_blank_and_garbage() -> None:
assert parse_shard_line("") is None
assert parse_shard_line(" ") is None
assert parse_shard_line("not json") is None
assert parse_shard_line('{"t": "string-not-number", "ch": "i", "d": "x"}') is None
assert parse_shard_line('{"t": 1.0, "ch": "z", "d": "x"}') is None # bad kind
assert parse_shard_line('"just a string"') is None
def test_parse_shard_skips_junk_in_stream() -> None:
lines = [
json.dumps({"sid": "x", "hdr": {"version": 2}}),
"",
"garbage",
json.dumps({"sid": "x", "t": 0.1, "ch": "i", "d": "a"}),
json.dumps({"sid": "x", "t": 0.2, "ch": "o", "d": "a\r\n"}),
]
out = list(parse_shard(lines))
assert out == [(0.1, "i", "a"), (0.2, "o", "a\r\n")]
# ── paste-burst detection ──────────────────────────────────────────────────
def test_pure_typed_stream_has_no_paste_bursts() -> None:
# Single-char input events spaced by 0.1s: pure typing.
events = [(i * 0.1, "i", c) for i, c in enumerate("hello world\r")]
ctx = build_session_context(events, sid="t-typed", source="test")
assert ctx.paste_bursts == ()
assert ctx.paste_event_count == 0
# IATs computed correctly
assert len(ctx.iats) == len(events) - 1
for iat in ctx.iats:
assert abs(iat - 0.1) < 1e-9
def test_single_paste_event_one_burst() -> None:
events = [
(0.0, "i", "echo hello world\r"), # 17 chars — paste class
]
ctx = build_session_context(events, sid="t-paste-1", source="test")
assert len(ctx.paste_bursts) == 1
assert ctx.paste_event_count == 1
burst = ctx.paste_bursts[0]
assert burst.start_ts == 0.0
assert burst.end_ts == 0.0
assert burst.char_count == 17
assert burst.event_count == 1
def test_two_close_paste_events_collapse_into_one_burst() -> None:
events = [
(0.0, "i", "first paste line\r"),
(0.05, "i", "second paste line\r"), # within PASTE_BURST_MAX_IAT_S
]
ctx = build_session_context(events, sid="t-paste-2", source="test")
assert len(ctx.paste_bursts) == 1
assert ctx.paste_event_count == 2
assert ctx.paste_bursts[0].event_count == 2
assert ctx.paste_bursts[0].char_count == 17 + 18
def test_two_far_paste_events_split_into_two_bursts() -> None:
events = [
(0.0, "i", "first paste\r"),
(5.0, "i", "second paste\r"), # well past the IAT cap
]
ctx = build_session_context(events, sid="t-paste-far", source="test")
assert len(ctx.paste_bursts) == 2
assert ctx.paste_event_count == 2
def test_typing_between_pastes_breaks_the_burst() -> None:
events: list = [
(0.0, "i", "long pasted line\r"),
(0.5, "i", "x"), # single typed char interrupts
(0.6, "i", "another pasted line\r"),
]
ctx = build_session_context(events, sid="t-mixed", source="test")
assert len(ctx.paste_bursts) == 2
assert ctx.paste_event_count == 2
# The "x" event is not a paste-class event
assert ctx.input_events[1][2] == "x"
def test_paste_burst_record_is_immutable() -> None:
b = PasteBurst(start_ts=0.0, end_ts=1.0, char_count=10, event_count=1)
try:
b.char_count = 99 # type: ignore[misc]
except Exception:
pass
else:
raise AssertionError("PasteBurst should be frozen")