diff --git a/decnet/profiler/behave_shell/_ctx.py b/decnet/profiler/behave_shell/_ctx.py
index 1a13cbbf..d290d461 100644
--- a/decnet/profiler/behave_shell/_ctx.py
+++ b/decnet/profiler/behave_shell/_ctx.py
@@ -10,6 +10,7 @@ will fill ``commands`` / ``inter_cmd_iats`` / ``output_per_cmd``.
 """
 from __future__ import annotations
 
-from dataclasses import dataclass, field
+import math
+from dataclasses import dataclass, field, replace
 from typing import Iterable
 
@@ -17,7 +18,9 @@ from decnet.profiler.behave_shell._parse import (
     AsciinemaEvent,
     Command,
     PasteBurst,
+    detect_error_in_output,
     hash_token,
+    strip_ansi,
 )
 from decnet.profiler.behave_shell._thresholds import (
     IKI_THINK_MAX_S,
@@ -219,6 +222,31 @@ def _segment_commands(inputs: list[AsciinemaEvent]) -> tuple[Command, ...]:
     return tuple(cmds)
 
 
+def _annotate_commands_with_output(
+    commands: tuple[Command, ...],
+    outputs: list[AsciinemaEvent],
+) -> tuple[Command, ...]:
+    """Re-emit ``commands`` with ``errored`` / ``output_bytes`` filled.
+
+    The output window for ``commands[i]`` spans from its ``end_ts``
+    (the ``\\r``/``\\n`` that ran it) to the ``start_ts`` of the next
+    command. The last command's window is open-ended (``math.inf``)
+    so output events arriving at or after ``t_end`` are still captured.
+
+    Each command is copied with ``dataclasses.replace`` so every field
+    other than ``errored`` / ``output_bytes`` is carried over as-is,
+    including any field added to ``Command`` after this helper.
+    """
+    if not commands:
+        return commands
+    annotated: list[Command] = []
+    for i, cmd in enumerate(commands):
+        win_end = commands[i + 1].start_ts if i + 1 < len(commands) else math.inf
+        byte_count, errored = _output_window(outputs, cmd.end_ts, win_end)
+        annotated.append(replace(cmd, errored=errored, output_bytes=byte_count))
+    return tuple(annotated)
+
+
 def _per_command_iats(
     commands: tuple[Command, ...],
     inputs: list[AsciinemaEvent],
@@ -252,6 +280,30 @@ def _output_bytes_between(
     return sum(len(d) for t, _k, d in outputs if start <= t < end)
 
 
+def _output_window(
+    outputs: list[AsciinemaEvent],
+    start: float,
+    end: float,
+) -> tuple[int, bool]:
+    """Walk output events in ``[start, end)`` once.
+
+    Returns ``(byte_count, errored)``. ``byte_count`` is the summed
+    ``len()`` of the raw (pre-strip) event payloads, the same measure
+    ``_output_bytes_between`` uses; ``errored`` is the
+    canonical-error-pattern match over the ANSI-stripped concatenation.
+    Payloads are joined before matching so a fingerprint split across
+    two output events is still detected. The stripped text is dropped
+    on return — PII discipline: only an int and a bool leave this
+    helper. The full output bytes never enter ``Command`` or the
+    ``SessionContext``.
+    """
+    chunks = [d for t, _k, d in outputs if start <= t < end]
+    if not chunks:
+        return 0, False
+    raw = "".join(chunks)
+    return len(raw), detect_error_in_output(strip_ansi(raw))
+
+
 def build_session_context(
     events: Iterable[AsciinemaEvent],
     *,
@@ -290,6 +342,7 @@ def build_session_context(
     typing_bursts = _split_typing_bursts(iats)
     backspace_count, backspace_iats, kill_line_count = _scan_correction_signals(inputs)
     commands = _segment_commands(inputs)
+    commands = _annotate_commands_with_output(commands, outputs)
     inter_cmd_iats = tuple(
         max(0.0, commands[i + 1].start_ts - commands[i].end_ts)
         for i in range(len(commands) - 1)
diff --git a/decnet/profiler/behave_shell/_parse.py b/decnet/profiler/behave_shell/_parse.py
index 1de05ce9..6816b645 100644
--- a/decnet/profiler/behave_shell/_parse.py
+++ b/decnet/profiler/behave_shell/_parse.py
@@ -15,6 +15,7 @@ from __future__ import annotations
 
 import hashlib
 import json
+import re
 from dataclasses import dataclass
 from typing import Iterable, Iterator, Literal, Tuple
 
@@ -22,6 +23,47 @@ EventKind = Literal["i", "o"]
 AsciinemaEvent = Tuple[float, EventKind, str]
 
 
+# CSI / OSC / SGR / single-char escape sweeper. One pass, then we drop the
+# stripped text on the floor — only the boolean error verdict (and the byte
+# count, computed before stripping) leaves the helper. Full prompt-string
+# parsing lives in Phase F.0; this is the slice cognitive.error_resilience.*
+# needs to ship correctly.
+_ANSI_RE = re.compile(
+    r"""
+    \x1B                                    # ESC
+    (?:
+        \[ [0-?]* [ -/]* [@-~]              # CSI
+      | \] [^\x07\x1B]* (?:\x07|\x1B\\)?    # OSC, ST-or-BEL terminated
+      | [@-Z\\-_]                           # 2-byte escapes (ESC followed by 0x40-0x5F)
+    )
+    """,
+    re.VERBOSE,
+)
+
+
+def strip_ansi(data: str) -> str:
+    """Remove ANSI escape sequences. Used pre-error-pattern match."""
+    return _ANSI_RE.sub("", data)
+
+
+# Canonical bash/sh error fingerprints. v0.1 heuristic — Phase F.0's prompt
+# parser will subsume this with PS1 + exit-code sniff. Any change here must
+# leave the calibration grid green.
+_OUTPUT_ERROR_PATTERNS: tuple[re.Pattern[str], ...] = (
+    re.compile(r"command not found"),
+    re.compile(r"No such file or directory"),
+    re.compile(r"Permission denied"),
+    re.compile(r": cannot "),
+    re.compile(r"Operation not permitted"),
+    re.compile(r"syntax error near unexpected token"),
+)
+
+
+def detect_error_in_output(stripped: str) -> bool:
+    """True if any canonical error fingerprint matches the stripped output."""
+    return any(p.search(stripped) for p in _OUTPUT_ERROR_PATTERNS)
+
+
 @dataclass(frozen=True, slots=True)
 class PasteBurst:
     """Contiguous run of paste-class input events."""
@@ -53,6 +95,18 @@ class Command:
     byte sweep. They feed the ``motor.shell_mastery.*`` primitives
     (Phase C). The raw bytes themselves are read once during the sweep
     and discarded — only the counters are retained.
+
+    ``errored`` (Step D.0) is set when the output stream between this
+    command and the next contains a canonical bash/sh error fingerprint
+    (see :func:`detect_error_in_output`). ``output_bytes`` is the summed
+    ``len()`` of the output payloads in that same window. Neither is set
+    by the segmentation walk itself: both keep their defaults until
+    ``_annotate_commands_with_output`` (``_ctx.py``) re-emits the
+    commands right after segmentation. The underlying output text is
+    stripped of ANSI then matched, and the stripped text is discarded —
+    only the bool and the int leave that pass. Drives the
+    ``cognitive.error_resilience.*`` family (Phase D) and the
+    ``error_rate`` term of ``cognitive.cognitive_load``.
     """
 
     start_ts: float
@@ -61,6 +115,8 @@ class Command:
     tab_count: int = 0
     shortcut_count: int = 0
     pipe_count: int = 0
+    errored: bool = False
+    output_bytes: int = 0
 
 
 def hash_token(token: str) -> str:
diff --git a/decnet/profiler/behave_shell/_thresholds.py b/decnet/profiler/behave_shell/_thresholds.py
index 82ccadd3..a337efc8 100644
--- a/decnet/profiler/behave_shell/_thresholds.py
+++ b/decnet/profiler/behave_shell/_thresholds.py
@@ -76,6 +76,17 @@ FEEDBACK_MIN_PAIRS: int = 5
 PAUSE_CV_METRONOMIC_MAX: float = 0.40
 PAUSE_CV_BIMODAL_MIN: float = 1.50
 
+# ── output error-signal helper (Step D.0) ──────────────────────────────────
+# The canonical bash/sh error fingerprints live in ``_parse.py`` as
+# ``_OUTPUT_ERROR_PATTERNS`` (compiled regexes). They're not threshold
+# numbers, so they live next to the helper that uses them rather than
+# here. This v0.1 heuristic will be subsumed by Phase F.0's prompt
+# parser (PS1 echo + exit-code sniff), at which point this comment and
+# the patterns block move to ``_parse.py``'s prompt section. Until then,
+# any drift in registry value definitions for ``error_resilience.*`` or
+# ``cognitive_load`` must be reflected by editing the patterns tuple
+# (not a constant, so no boundary-band logic applies).
+
 # ── motor.keystroke_cadence (Step B.1) ──────────────────────────────────────
 # Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between
 # commands don't inflate the within-burst CV. Mirrors the prototype's
diff --git a/tests/profiler/behave_shell/test_command_error_detection.py b/tests/profiler/behave_shell/test_command_error_detection.py
new file mode 100644
index 00000000..25757a6d
--- /dev/null
+++ b/tests/profiler/behave_shell/test_command_error_detection.py
@@ -0,0 +1,151 @@
+"""Step D.0: per-command error-signal helper.
+
+The helper is infrastructure (no primitive emit) — these tests pin
+``Command.errored`` and ``Command.output_bytes`` semantics directly via
+``build_session_context``. The Phase D primitives (D.1, D.5–D.7) all
+read the same fields, so any drift here breaks four downstream
+primitives at once.
+"""
+from __future__ import annotations
+
+from decnet.profiler.behave_shell import extract_session
+from decnet.profiler.behave_shell._ctx import build_session_context
+from decnet.profiler.behave_shell._parse import (
+    AsciinemaEvent,
+    detect_error_in_output,
+    strip_ansi,
+)
+
+
+def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]:
+    return [(t0 + i * dt, "i", c) for i, c in enumerate(text)]
+
+
+# ── strip_ansi ──────────────────────────────────────────────────────────────
+
+
+def test_strip_ansi_removes_csi_sgr() -> None:
+    assert strip_ansi("\x1b[31mPermission denied\x1b[0m") == "Permission denied"
+
+
+def test_strip_ansi_removes_osc_with_bel() -> None:
+    # OSC: ESC ] ... BEL — terminal title escape
+    assert strip_ansi("\x1b]0;title\x07hello") == "hello"
+
+
+def test_strip_ansi_removes_osc_with_st() -> None:
+    # OSC: ESC ] ... ESC \ — same title escape, ST-terminated
+    assert strip_ansi("\x1b]0;title\x1b\\hello") == "hello"
+
+
+def test_strip_ansi_passthrough_clean_text() -> None:
+    assert strip_ansi("plain output\nwith newline") == "plain output\nwith newline"
+
+
+# ── detect_error_in_output ──────────────────────────────────────────────────
+
+
+def test_detect_error_command_not_found() -> None:
+    assert detect_error_in_output("bash: foo: command not found") is True
+
+
+def test_detect_error_no_such_file() -> None:
+    assert detect_error_in_output("ls: /nope: No such file or directory") is True
+
+
+def test_detect_error_permission_denied() -> None:
+    assert detect_error_in_output("cat: /etc/shadow: Permission denied") is True
+
+
+def test_detect_error_cannot_access() -> None:
+    assert detect_error_in_output("ls: cannot access '/x': No such file") is True
+
+
+def test_detect_error_clean_output() -> None:
+    assert detect_error_in_output("total 12\ndrwxr-xr-x 3 user user 4096 May 3 12:00 .") is False
+
+
+# ── Command.errored / output_bytes wired through build_session_context ──────
+
+
+def test_command_clean_output_not_errored() -> None:
+    events: list[AsciinemaEvent] = [
+        *_typed("ls\r"),
+        (0.20, "o", "file1\nfile2\n"),
+    ]
+    ctx = build_session_context(events, sid="d0-clean", source="test")
+    assert len(ctx.commands) == 1
+    assert ctx.commands[0].errored is False
+    assert ctx.commands[0].output_bytes == len("file1\nfile2\n")
+
+
+def test_command_with_error_pattern_marked_errored() -> None:
+    events: list[AsciinemaEvent] = [
+        *_typed("foo\r"),
+        (0.20, "o", "bash: foo: command not found\n"),
+    ]
+    ctx = build_session_context(events, sid="d0-err", source="test")
+    assert ctx.commands[0].errored is True
+    assert ctx.commands[0].output_bytes == len("bash: foo: command not found\n")
+
+
+def test_command_with_ansi_wrapped_error_marked_errored() -> None:
+    """ANSI strip must run before pattern match (red-coloured `Permission denied`)."""
+    events: list[AsciinemaEvent] = [
+        *_typed("cat /etc/shadow\r"),
+        (1.50, "o", "\x1b[31mcat: /etc/shadow: Permission denied\x1b[0m\n"),
+    ]
+    ctx = build_session_context(events, sid="d0-ansi", source="test")
+    assert ctx.commands[0].errored is True
+
+
+def test_error_fingerprint_split_across_output_events() -> None:
+    """Payloads are joined before matching, so a split fingerprint still hits."""
+    events: list[AsciinemaEvent] = [
+        *_typed("foo\r"),
+        (0.20, "o", "bash: foo: command "),
+        (0.25, "o", "not found\n"),
+    ]
+    ctx = build_session_context(events, sid="d0-split", source="test")
+    assert ctx.commands[0].errored is True
+    assert ctx.commands[0].output_bytes == len("bash: foo: command not found\n")
+
+
+def test_last_command_output_window_extends_to_t_end() -> None:
+    """The last command has no ``commands[i+1]`` — its window is open-ended."""
+    events: list[AsciinemaEvent] = [
+        *_typed("ls\r", t0=0.0),
+        *_typed("foo\r", t0=1.0),
+        (1.50, "o", "bash: foo: command not found\n"),
+    ]
+    ctx = build_session_context(events, sid="d0-last", source="test")
+    assert len(ctx.commands) == 2
+    assert ctx.commands[0].errored is False
+    assert ctx.commands[1].errored is True
+
+
+def test_no_output_events_no_errored() -> None:
+    """A shard with no ``'o'`` events emits clean ``errored=False`` per command."""
+    events: list[AsciinemaEvent] = _typed("ls\r")
+    ctx = build_session_context(events, sid="d0-noout", source="test")
+    assert ctx.commands[0].errored is False
+    assert ctx.commands[0].output_bytes == 0
+
+
+# ── PII regression ──────────────────────────────────────────────────────────
+
+
+def test_pii_no_output_bodies_in_observations() -> None:
+    """Output bytes containing operator-identifying strings must not leak.
+
+    The error pattern triggers ``errored=True``; the surrounding output
+    contains the literal ``secret_payload_xyz`` token. No observation may
+    serialise that token, since the engine only retains a bool + an int.
+    """
+    events: list[AsciinemaEvent] = [
+        *_typed("foo\r"),
+        (0.20, "o", "secret_payload_xyz\nbash: foo: command not found\n"),
+    ]
+    out = list(extract_session(events, sid="d0-pii"))
+    for obs in out:
+        assert "secret_payload_xyz" not in obs.model_dump_json()