feat(profiler/behave_shell): output error-signal helper for Phase D

Lifts the error-signal slice of F.0 forward as a D.0 prelude. ANSI
strip + canonical bash/sh error fingerprints classify each command's
post-execution output window; Command gains errored / output_bytes
fields. PII discipline preserved — only a bool and an int leave the
helper, the stripped output text is dropped on return.

Drives D.1 (cognitive_load error_rate term) and D.5–D.7 (error_resilience
family). Phase F.0 will subsume this with PS1 + exit-code parsing.
This commit is contained in:
2026-05-03 23:46:31 -04:00
parent bc62e42ce1
commit 601986bd6d
4 changed files with 259 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ will fill ``commands`` / ``inter_cmd_iats`` / ``output_per_cmd``.
""" """
from __future__ import annotations from __future__ import annotations
import math
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Iterable from typing import Iterable
@@ -17,7 +18,9 @@ from decnet.profiler.behave_shell._parse import (
AsciinemaEvent, AsciinemaEvent,
Command, Command,
PasteBurst, PasteBurst,
detect_error_in_output,
hash_token, hash_token,
strip_ansi,
) )
from decnet.profiler.behave_shell._thresholds import ( from decnet.profiler.behave_shell._thresholds import (
IKI_THINK_MAX_S, IKI_THINK_MAX_S,
@@ -219,6 +222,36 @@ def _segment_commands(inputs: list[AsciinemaEvent]) -> tuple[Command, ...]:
return tuple(cmds) return tuple(cmds)
def _annotate_commands_with_output(
commands: tuple[Command, ...],
outputs: list[AsciinemaEvent],
) -> tuple[Command, ...]:
"""Re-emit ``commands`` with ``errored`` / ``output_bytes`` filled.
The output window for ``commands[i]`` spans from its ``end_ts``
(the ``\\r``/``\\n`` that ran it) to the ``start_ts`` of the next
command. The last command's window is open-ended (``math.inf``)
so output events arriving at or after ``t_end`` are still captured.
"""
if not commands:
return commands
annotated: list[Command] = []
for i, cmd in enumerate(commands):
win_end = commands[i + 1].start_ts if i + 1 < len(commands) else math.inf
byte_count, errored = _output_window(outputs, cmd.end_ts, win_end)
annotated.append(Command(
start_ts=cmd.start_ts,
end_ts=cmd.end_ts,
first_token_hash=cmd.first_token_hash,
tab_count=cmd.tab_count,
shortcut_count=cmd.shortcut_count,
pipe_count=cmd.pipe_count,
errored=errored,
output_bytes=byte_count,
))
return tuple(annotated)
def _per_command_iats( def _per_command_iats(
commands: tuple[Command, ...], commands: tuple[Command, ...],
inputs: list[AsciinemaEvent], inputs: list[AsciinemaEvent],
@@ -252,6 +285,32 @@ def _output_bytes_between(
return sum(len(d) for t, _k, d in outputs if start <= t < end) return sum(len(d) for t, _k, d in outputs if start <= t < end)
def _output_window(
outputs: list[AsciinemaEvent],
start: float,
end: float,
) -> tuple[int, bool]:
"""Walk output events in ``[start, end)`` once.
Returns ``(byte_count, errored)``. ``byte_count`` is the raw byte
count (pre-strip); ``errored`` is the canonical-error-pattern match
over the ANSI-stripped concatenation. The stripped text is dropped
on return — PII discipline: only an int and a bool leave this
helper. The full output bytes never enter ``Command`` or the
``SessionContext``.
"""
chunks: list[str] = []
byte_count = 0
for t, _k, d in outputs:
if start <= t < end:
byte_count += len(d)
chunks.append(d)
if not chunks:
return 0, False
stripped = strip_ansi("".join(chunks))
return byte_count, detect_error_in_output(stripped)
def build_session_context( def build_session_context(
events: Iterable[AsciinemaEvent], events: Iterable[AsciinemaEvent],
*, *,
@@ -290,6 +349,7 @@ def build_session_context(
typing_bursts = _split_typing_bursts(iats) typing_bursts = _split_typing_bursts(iats)
backspace_count, backspace_iats, kill_line_count = _scan_correction_signals(inputs) backspace_count, backspace_iats, kill_line_count = _scan_correction_signals(inputs)
commands = _segment_commands(inputs) commands = _segment_commands(inputs)
commands = _annotate_commands_with_output(commands, outputs)
inter_cmd_iats = tuple( inter_cmd_iats = tuple(
max(0.0, commands[i + 1].start_ts - commands[i].end_ts) max(0.0, commands[i + 1].start_ts - commands[i].end_ts)
for i in range(len(commands) - 1) for i in range(len(commands) - 1)

View File

@@ -15,6 +15,7 @@ from __future__ import annotations
import hashlib import hashlib
import json import json
import re
from dataclasses import dataclass from dataclasses import dataclass
from typing import Iterable, Iterator, Literal, Tuple from typing import Iterable, Iterator, Literal, Tuple
@@ -22,6 +23,47 @@ EventKind = Literal["i", "o"]
AsciinemaEvent = Tuple[float, EventKind, str] AsciinemaEvent = Tuple[float, EventKind, str]
# CSI / OSC / SGR / single-char escape sweeper. One pass, then we drop the
# stripped text on the floor — only the boolean error verdict (and the byte
# count, computed before stripping) leaves the helper. Full prompt-string
# parsing lives in Phase F.0; this is the slice cognitive.error_resilience.*
# needs to ship correctly.
# Alternatives are tried in order, so CSI and OSC win before the generic
# escape forms. The OSC terminator is optional: an unterminated OSC is
# removed up to the next BEL/ESC or the end of the input.
_ANSI_RE = re.compile(
    r"""
    \x1B                                    # ESC
    (?:
        \[ [0-?]* [ -/]* [@-~]              # CSI: params, intermediates, final byte
      | \] [^\x07\x1B]* (?:\x07|\x1B\\)?    # OSC, ST-or-BEL terminated
      | [ -/]+ [0-~]                        # nF escapes, e.g. ESC ( B (charset select)
      | [0-Z\\-~]                           # other 2-byte escapes: 0x30-0x7E minus "["
    )
    """,
    re.VERBOSE,
)


def strip_ansi(data: str) -> str:
    """Remove ANSI escape sequences from ``data``.

    Used before error-pattern matching so colour codes and terminal
    control sequences cannot split or hide a fingerprint.

    Args:
        data: Terminal output text, possibly containing escape sequences.

    Returns:
        ``data`` with every match of ``_ANSI_RE`` deleted; text without
        escape sequences is returned unchanged.
    """
    return _ANSI_RE.sub("", data)
# Canonical bash/sh error fingerprints. v0.1 heuristic — Phase F.0's prompt
# parser will subsume this with PS1 + exit-code sniff. Any change here must
# leave the calibration grid green.
_OUTPUT_ERROR_PATTERNS: tuple[re.Pattern[str], ...] = tuple(
    re.compile(fingerprint)
    for fingerprint in (
        r"command not found",
        r"No such file or directory",
        r"Permission denied",
        r": cannot ",
        r"Operation not permitted",
        r"syntax error near unexpected token",
    )
)


def detect_error_in_output(stripped: str) -> bool:
    """Report whether ``stripped`` contains a canonical error fingerprint.

    Each pattern in ``_OUTPUT_ERROR_PATTERNS`` is searched anywhere in the
    text; matching is case-sensitive (the patterns carry no flags). The
    first hit short-circuits to ``True``; no hit returns ``False``.
    """
    for pattern in _OUTPUT_ERROR_PATTERNS:
        if pattern.search(stripped) is not None:
            return True
    return False
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class PasteBurst: class PasteBurst:
"""Contiguous run of paste-class input events.""" """Contiguous run of paste-class input events."""
@@ -53,6 +95,16 @@ class Command:
byte sweep. They feed the ``motor.shell_mastery.*`` primitives byte sweep. They feed the ``motor.shell_mastery.*`` primitives
(Phase C). The raw bytes themselves are read once during the (Phase C). The raw bytes themselves are read once during the
sweep and discarded — only the counters are retained. sweep and discarded — only the counters are retained.
``errored`` (Step D.0) is set when the output stream between this
command and the next contains a canonical bash/sh error fingerprint
(see :func:`detect_error_in_output`). ``output_bytes`` is the byte
count of that same window. Both are populated by a separate annotation
pass that runs after the segmentation walk; the underlying output text is
stripped of ANSI then matched, and the stripped text is discarded — only
the bool and the int leave that pass. Drives the ``cognitive.error_resilience.*``
family (Phase D) and the ``error_rate`` term of
``cognitive.cognitive_load``.
""" """
start_ts: float start_ts: float
@@ -61,6 +113,8 @@ class Command:
tab_count: int = 0 tab_count: int = 0
shortcut_count: int = 0 shortcut_count: int = 0
pipe_count: int = 0 pipe_count: int = 0
errored: bool = False
output_bytes: int = 0
def hash_token(token: str) -> str: def hash_token(token: str) -> str:

View File

@@ -76,6 +76,17 @@ FEEDBACK_MIN_PAIRS: int = 5
PAUSE_CV_METRONOMIC_MAX: float = 0.40 PAUSE_CV_METRONOMIC_MAX: float = 0.40
PAUSE_CV_BIMODAL_MIN: float = 1.50 PAUSE_CV_BIMODAL_MIN: float = 1.50
# ── output error-signal helper (Step D.0) ──────────────────────────────────
# The canonical bash/sh error fingerprints live in ``_parse.py`` as
# ``_OUTPUT_ERROR_PATTERNS`` (compiled regexes). They're not threshold
# numbers, so they live next to the helper that uses them rather than
# here. This v0.1 heuristic will be subsumed by Phase F.0's prompt
# parser (PS1 echo + exit-code sniff), at which point this comment and
# the patterns block move to ``_parse.py``'s prompt section. Until then,
# any drift in registry value definitions for ``error_resilience.*`` or
# ``cognitive_load`` must be reflected by editing the patterns tuple
# (not a constant, so no boundary-band logic applies).
# ── motor.keystroke_cadence (Step B.1) ────────────────────────────────────── # ── motor.keystroke_cadence (Step B.1) ──────────────────────────────────────
# Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between # Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between
# commands don't inflate the within-burst CV. Mirrors the prototype's # commands don't inflate the within-burst CV. Mirrors the prototype's

View File

@@ -0,0 +1,134 @@
"""Step D.0: per-command error-signal helper.
The helper is infrastructure (no primitive emit) — these tests pin
``Command.errored`` and ``Command.output_bytes`` semantics directly via
``build_session_context``. The Phase D primitives (D.1, D.5–D.7) all
read the same fields, so any drift here breaks four downstream
primitives at once.
"""
from __future__ import annotations
from decnet.profiler.behave_shell import extract_session
from decnet.profiler.behave_shell._ctx import build_session_context
from decnet.profiler.behave_shell._parse import (
AsciinemaEvent,
detect_error_in_output,
strip_ansi,
)
def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]:
return [(t0 + i * dt, "i", c) for i, c in enumerate(text)]
# ── strip_ansi ──────────────────────────────────────────────────────────────
def test_strip_ansi_removes_csi_sgr() -> None:
    """SGR colour codes (CSI ... m) around the text are removed."""
    assert strip_ansi("\x1b[31mPermission denied\x1b[0m") == "Permission denied"


def test_strip_ansi_removes_osc_with_bel() -> None:
    """A BEL-terminated OSC sequence is removed together with its payload."""
    # OSC: ESC ] ... BEL — terminal title escape
    assert strip_ansi("\x1b]0;title\x07hello") == "hello"


def test_strip_ansi_passthrough_clean_text() -> None:
    """Text without escape sequences comes back unchanged, newlines included."""
    assert strip_ansi("plain output\nwith newline") == "plain output\nwith newline"
# ── detect_error_in_output ──────────────────────────────────────────────────
def test_detect_error_command_not_found() -> None:
    """The ``command not found`` fingerprint is detected."""
    assert detect_error_in_output("bash: foo: command not found") is True


def test_detect_error_no_such_file() -> None:
    """The ``No such file or directory`` fingerprint is detected."""
    assert detect_error_in_output("ls: /nope: No such file or directory") is True


def test_detect_error_permission_denied() -> None:
    """The ``Permission denied`` fingerprint is detected."""
    assert detect_error_in_output("cat: /etc/shadow: Permission denied") is True


def test_detect_error_cannot_access() -> None:
    """The ``: cannot `` fingerprint is detected."""
    # The tail is deliberately the truncated "No such file", so only the
    # ": cannot " pattern can be responsible for the match.
    assert detect_error_in_output("ls: cannot access '/x': No such file") is True


def test_detect_error_clean_output() -> None:
    """An ordinary directory listing matches no fingerprint."""
    assert detect_error_in_output("total 12\ndrwxr-xr-x 3 user user 4096 May 3 12:00 .") is False
# ── Command.errored / output_bytes wired through build_session_context ──────
def test_command_clean_output_not_errored() -> None:
    """Output without a fingerprint leaves ``errored`` False but is still counted."""
    events: list[AsciinemaEvent] = [
        # Three keystrokes at 0.00 / 0.05 / 0.10 s; the output follows at 0.20 s.
        *_typed("ls\r"),
        (0.20, "o", "file1\nfile2\n"),
    ]
    ctx = build_session_context(events, sid="d0-clean", source="test")
    assert len(ctx.commands) == 1
    assert ctx.commands[0].errored is False
    assert ctx.commands[0].output_bytes == len("file1\nfile2\n")
def test_command_with_error_pattern_marked_errored() -> None:
    """Output carrying ``command not found`` sets ``errored`` and is counted in full."""
    events: list[AsciinemaEvent] = [
        # Four keystrokes at 0.00-0.15 s; the error output follows at 0.20 s.
        *_typed("foo\r"),
        (0.20, "o", "bash: foo: command not found\n"),
    ]
    ctx = build_session_context(events, sid="d0-err", source="test")
    assert ctx.commands[0].errored is True
    assert ctx.commands[0].output_bytes == len("bash: foo: command not found\n")
def test_command_with_ansi_wrapped_error_marked_errored() -> None:
    """ANSI strip must run before pattern match (red-coloured `Permission denied`)."""
    events: list[AsciinemaEvent] = [
        *_typed("cat /etc/shadow\r"),
        # The error line is wrapped in SGR red (ESC[31m) and reset (ESC[0m).
        (1.50, "o", "\x1b[31mcat: /etc/shadow: Permission denied\x1b[0m\n"),
    ]
    ctx = build_session_context(events, sid="d0-ansi", source="test")
    assert ctx.commands[0].errored is True
def test_last_command_output_window_extends_to_t_end() -> None:
    """The last command's window has no ``commands[i+1]`` — it is open-ended.

    Output arriving after the final command must be attributed to that
    command, and must not mark the earlier command as errored.
    """
    events: list[AsciinemaEvent] = [
        # First command typed at 0.00-0.10 s, second at 1.00-1.15 s.
        *_typed("ls\r", t0=0.0),
        *_typed("foo\r", t0=1.0),
        # The only output event lands after the second command was run.
        (1.50, "o", "bash: foo: command not found\n"),
    ]
    ctx = build_session_context(events, sid="d0-last", source="test")
    assert len(ctx.commands) == 2
    assert ctx.commands[0].errored is False
    assert ctx.commands[1].errored is True
def test_no_output_events_no_errored() -> None:
    """A shard with no ``'o'`` events emits clean ``errored=False`` per command."""
    # Input events only: there is nothing to count or to match against.
    events: list[AsciinemaEvent] = _typed("ls\r")
    ctx = build_session_context(events, sid="d0-noout", source="test")
    assert ctx.commands[0].errored is False
    assert ctx.commands[0].output_bytes == 0
# ── PII regression ──────────────────────────────────────────────────────────
def test_pii_no_output_bodies_in_observations() -> None:
    """Output bytes containing operator-identifying strings must not leak.

    The error pattern triggers ``errored=True``; the surrounding output
    contains the literal ``secret_payload_xyz`` token. No observation may
    serialise that token, since the engine only retains a bool + an int.
    """
    events: list[AsciinemaEvent] = [
        *_typed("foo\r"),
        # One output event carries both the sentinel token and an error fingerprint.
        (0.20, "o", "secret_payload_xyz\nbash: foo: command not found\n"),
    ]
    out = list(extract_session(events, sid="d0-pii"))
    # NOTE(review): if ``extract_session`` yields nothing for a session this
    # short, the loop body never runs and the test passes vacuously —
    # confirm that at least one observation is produced.
    for obs in out:
        assert "secret_payload_xyz" not in obs.model_dump_json()