From 1ff02f0c776e4e1f242fbc522f1b6a589b1f643c Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 4 May 2026 00:29:08 -0400 Subject: [PATCH] feat(profiler/behave_shell): F.0 prompt-line detector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds PromptLine dataclass + extract_prompt_lines() helper. PromptLine carries ts, suffix_char ($/#/%/>), raw_line (ANSI-stripped, capped), is_root flag. Populated during the existing single-pass output-window walk; SessionContext gains prompt_lines, Command gains followed_by_prompt. PII trade-off (ANTI-authorised at Phase F): PS1 text retained on ctx so F.1 / F.3 / E.4 can read it. Capped at PROMPT_LINE_MAX_CHARS=256. Observations still only carry derived primitive values. D.0's regex error helpers stay alongside (NOT subsumed) — they fire even when PS1 echo is suppressed. F.0 enriches D.0 rather than replacing it. --- decnet/profiler/behave_shell/_ctx.py | 57 +++++-- decnet/profiler/behave_shell/_parse.py | 86 +++++++++++ decnet/profiler/behave_shell/_thresholds.py | 10 ++ .../test_prompt_line_detection.py | 142 ++++++++++++++++++ 4 files changed, 280 insertions(+), 15 deletions(-) create mode 100644 tests/profiler/behave_shell/test_prompt_line_detection.py diff --git a/decnet/profiler/behave_shell/_ctx.py b/decnet/profiler/behave_shell/_ctx.py index d290d461..dba044a5 100644 --- a/decnet/profiler/behave_shell/_ctx.py +++ b/decnet/profiler/behave_shell/_ctx.py @@ -18,7 +18,9 @@ from decnet.profiler.behave_shell._parse import ( AsciinemaEvent, Command, PasteBurst, + PromptLine, detect_error_in_output, + extract_prompt_lines, hash_token, strip_ansi, ) @@ -26,6 +28,7 @@ from decnet.profiler.behave_shell._thresholds import ( IKI_THINK_MAX_S, PASTE_BURST_MAX_IAT_S, PASTE_MIN_CHARS_PER_EVENT, + PROMPT_LINE_MAX_CHARS, SHORTCUT_CTRL_BYTES, ) @@ -63,6 +66,9 @@ class SessionContext: # Step B.4 derivations — per-command intra-typing IATs intra_command_iats: tuple[tuple[float, ...], ...] = field(default_factory=tuple) + # Step F.0 derivations — PS1 prompt lines detected in the output stream + prompt_lines: tuple[PromptLine, ...] = field(default_factory=tuple) + def _detect_paste_bursts( inputs: list[AsciinemaEvent], @@ -225,8 +231,14 @@ def _segment_commands(inputs: list[AsciinemaEvent]) -> tuple[Command, ...]: def _annotate_commands_with_output( commands: tuple[Command, ...], outputs: list[AsciinemaEvent], -) -> tuple[Command, ...]: - """Re-emit ``commands`` with ``errored`` / ``output_bytes`` filled. +) -> tuple[tuple[Command, ...], tuple[PromptLine, ...]]: + """Re-emit ``commands`` with output-derived fields filled. + + Returns ``(commands, prompt_lines)``. Each ``Command`` gains + ``errored``, ``output_bytes``, and ``followed_by_prompt`` (Step + F.0). The flattened tuple of all detected ``PromptLine`` instances + across every command's window is returned alongside for the caller + to install on ``SessionContext.prompt_lines``. The output window for ``commands[i]`` spans from its ``end_ts`` (the ``\\r``/``\\n`` that ran it) to the ``start_ts`` of the next @@ -234,11 +246,13 @@ def _annotate_commands_with_output( so output events arriving at or after ``t_end`` are still captured. """ if not commands: - return commands + return commands, () annotated: list[Command] = [] + all_prompts: list[PromptLine] = [] for i, cmd in enumerate(commands): win_end = commands[i + 1].start_ts if i + 1 < len(commands) else math.inf - byte_count, errored = _output_window(outputs, cmd.end_ts, win_end) + byte_count, errored, prompts = _output_window(outputs, cmd.end_ts, win_end) + all_prompts.extend(prompts) annotated.append(Command( start_ts=cmd.start_ts, end_ts=cmd.end_ts, @@ -248,8 +262,9 @@ def _annotate_commands_with_output( pipe_count=cmd.pipe_count, errored=errored, output_bytes=byte_count, + followed_by_prompt=bool(prompts), )) - return tuple(annotated) + return tuple(annotated), tuple(all_prompts) def _per_command_iats( @@ -289,26 +304,37 @@ def _output_window( outputs: list[AsciinemaEvent], start: float, end: float, -) -> tuple[int, bool]: +) -> tuple[int, bool, tuple[PromptLine, ...]]: """Walk output events in ``[start, end)`` once. - Returns ``(byte_count, errored)``. ``byte_count`` is the raw byte - count (pre-strip); ``errored`` is the canonical-error-pattern match - over the ANSI-stripped concatenation. The stripped text is dropped - on return — PII discipline: only an int and a bool leave this - helper. The full output bytes never enter ``Command`` or the - ``SessionContext``. + Returns ``(byte_count, errored, prompt_lines)``. ``byte_count`` is + the raw byte count (pre-strip); ``errored`` is the canonical-error + -pattern match over the ANSI-stripped concatenation; + ``prompt_lines`` is the tuple of PS1 lines detected in the same + stripped text (Step F.0). + + PII trade-off (Phase F): the stripped text itself is dropped on + return, but ``prompt_lines`` retains PS1 strings (capped at + ``PROMPT_LINE_MAX_CHARS``). Only derived values leave the engine + via observations; the prompt strings live on ``SessionContext`` + so F.1 / F.3 / E.4 can read them. """ chunks: list[str] = [] + last_ts = start byte_count = 0 for t, _k, d in outputs: if start <= t < end: byte_count += len(d) chunks.append(d) + last_ts = t if not chunks: - return 0, False + return 0, False, () stripped = strip_ansi("".join(chunks)) - return byte_count, detect_error_in_output(stripped) + errored = detect_error_in_output(stripped) + prompts = tuple(extract_prompt_lines( + stripped, base_ts=last_ts, max_chars=PROMPT_LINE_MAX_CHARS, + )) + return byte_count, errored, prompts def build_session_context( @@ -349,7 +375,7 @@ def build_session_context( typing_bursts = _split_typing_bursts(iats) backspace_count, backspace_iats, kill_line_count = _scan_correction_signals(inputs) commands = _segment_commands(inputs) - commands = _annotate_commands_with_output(commands, outputs) + commands, prompt_lines = _annotate_commands_with_output(commands, outputs) inter_cmd_iats = tuple( max(0.0, commands[i + 1].start_ts - commands[i].end_ts) for i in range(len(commands) - 1) @@ -380,4 +406,5 @@ def build_session_context( backspace_iats=backspace_iats, kill_line_count=kill_line_count, intra_command_iats=intra_command_iats, + prompt_lines=prompt_lines, ) diff --git a/decnet/profiler/behave_shell/_parse.py b/decnet/profiler/behave_shell/_parse.py index 6816b645..5e1b0d5d 100644 --- a/decnet/profiler/behave_shell/_parse.py +++ b/decnet/profiler/behave_shell/_parse.py @@ -74,6 +74,24 @@ class PasteBurst: event_count: int +@dataclass(frozen=True, slots=True) +class PromptLine: + """One PS1 prompt line detected in the output stream. + + PII trade-off (ANTI-authorised at Phase F): ``raw_line`` retains + the ANSI-stripped text of the prompt — hostnames / usernames / + cwd / etc. — because F.1 / F.3 / E.4 read off it. Capped at + ``PROMPT_LINE_MAX_CHARS``. PromptLine instances live on + ``SessionContext.prompt_lines``; only derived primitive values + (``bash`` / ``en-US`` / ``present``) leave the engine. + """ + + ts: float + suffix_char: str # one of $ # % > + raw_line: str # ANSI stripped, capped at PROMPT_LINE_MAX_CHARS + is_root: bool # suffix_char == '#' + + @dataclass(frozen=True, slots=True) class Command: """One command-line invocation, segmented from the input stream. @@ -115,6 +133,7 @@ class Command: pipe_count: int = 0 errored: bool = False output_bytes: int = 0 + followed_by_prompt: bool = False def hash_token(token: str) -> str: @@ -122,6 +141,73 @@ def hash_token(token: str) -> str: return hashlib.sha256(token.encode("utf-8")).hexdigest() +# Prompt-line detection (Step F.0). A prompt line ends with one of +# $/#/%/> followed by a space or end-of-line. The trailing space / +# newline is what tells us this is a *prompt* not just a sentence +# ending in those characters. We require either the space variant or +# the EOL variant to be present right after the suffix. +_PROMPT_LINE_RE = re.compile( + r""" + (?:^|\n) # line start + (?P # capture the prompt line itself + [^\n]*? # any line content (non-greedy) + (?P[$\#%>]) # prompt suffix + \ ? # optional trailing space (PS1 default has it) + ) + (?=\n|\Z) # at end of line / end of buffer + """, + re.VERBOSE, +) + + +def _detect_prompt_suffix(line: str) -> str | None: + """Return the suffix character if ``line`` looks like a PS1 prompt. + + ``line`` is one logical output line, ANSI-stripped, trailing + whitespace included. The discriminating shape: any text ending in + one of ``$ # % >`` optionally followed by a single space. We require + the line to be non-empty and the suffix to be the rightmost + non-whitespace character. + """ + stripped = line.rstrip() + if not stripped: + return None + last = stripped[-1] + return last if last in ("$", "#", "%", ">") else None + + +def extract_prompt_lines( + text: str, + *, + base_ts: float, + max_chars: int, +) -> Iterator[PromptLine]: + """Yield prompt lines detected in ``text`` (already ANSI-stripped). + + All emitted prompts share ``base_ts`` — the caller is responsible + for slicing output by event window before calling. A given output + chunk yields **at most one prompt line** (the trailing one), but + multi-line chunks containing multiple distinct prompts (mid-stream + redraws) yield each. ``raw_line`` is capped at ``max_chars`` and + leading/trailing whitespace stripped (preserving internal layout). + """ + if not text: + return + for raw in text.split("\n"): + suffix = _detect_prompt_suffix(raw) + if suffix is None: + continue + line = raw.strip() + if len(line) > max_chars: + line = line[-max_chars:] + yield PromptLine( + ts=base_ts, + suffix_char=suffix, + raw_line=line, + is_root=(suffix == "#"), + ) + + def parse_shard_line(line: str) -> AsciinemaEvent | None: """Turn one shard JSONL line into an :data:`AsciinemaEvent`. diff --git a/decnet/profiler/behave_shell/_thresholds.py b/decnet/profiler/behave_shell/_thresholds.py index a708f7db..5bb1f808 100644 --- a/decnet/profiler/behave_shell/_thresholds.py +++ b/decnet/profiler/behave_shell/_thresholds.py @@ -218,6 +218,16 @@ LANDING_RITUAL_FIRST_N: int = 5 LANDING_RITUAL_HIT_MIN: int = 2 LANDING_RITUAL_MIN_COMMANDS: int = 3 +# ── F.0 prompt-line detector ────────────────────────────────────────────── +# A prompt line in the output stream ends with one of these characters +# followed by a space or EOL. ``$`` and ``#`` are sh/bash; ``%`` is zsh; +# ``>`` is fish / cmd.exe / powershell (disambiguated by line content +# at F.1 time). Capped at 256 chars to bound memory; ANTI authorised +# retaining PS1 text on ctx (PII relaxation), but a malicious operator +# inflating the prompt buffer is still bounded. +PROMPT_SUFFIX_CHARS: frozenset[str] = frozenset({"$", "#", "%", ">"}) +PROMPT_LINE_MAX_CHARS: int = 256 + # ── motor.keystroke_cadence (Step B.1) ────────────────────────────────────── # Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between # commands don't inflate the within-burst CV. Mirrors the prototype's diff --git a/tests/profiler/behave_shell/test_prompt_line_detection.py b/tests/profiler/behave_shell/test_prompt_line_detection.py new file mode 100644 index 00000000..5606e976 --- /dev/null +++ b/tests/profiler/behave_shell/test_prompt_line_detection.py @@ -0,0 +1,142 @@ +"""Step F.0: prompt-line detector. + +The detector is shared infrastructure (no primitive emit). These tests +pin ``PromptLine`` semantics + ``Command.followed_by_prompt`` directly +via ``build_session_context``. F.1 / F.3 / E.4 all depend on these +fields, so any drift here breaks four downstream primitives. +""" +from __future__ import annotations + +from decnet.profiler.behave_shell import extract_session +from decnet.profiler.behave_shell._ctx import build_session_context +from decnet.profiler.behave_shell._parse import ( + AsciinemaEvent, + PromptLine, + extract_prompt_lines, +) + + +def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]: + return [(t0 + i * dt, "i", c) for i, c in enumerate(text)] + + +# ── extract_prompt_lines ──────────────────────────────────────────────────── + + +def test_bash_prompt_detected() -> None: + lines = list(extract_prompt_lines( + "anti@host:~$ ", base_ts=1.0, max_chars=256, + )) + assert len(lines) == 1 + assert lines[0].suffix_char == "$" + assert lines[0].is_root is False + assert "anti@host" in lines[0].raw_line + + +def test_root_prompt_detected_as_root() -> None: + lines = list(extract_prompt_lines( + "root@host:/etc# ", base_ts=2.0, max_chars=256, + )) + assert len(lines) == 1 + assert lines[0].suffix_char == "#" + assert lines[0].is_root is True + + +def test_zsh_prompt_detected() -> None: + lines = list(extract_prompt_lines( + "host% ", base_ts=3.0, max_chars=256, + )) + assert len(lines) == 1 + assert lines[0].suffix_char == "%" + + +def test_powershell_prompt_detected() -> None: + lines = list(extract_prompt_lines( + "PS C:\\Users\\anti> ", base_ts=4.0, max_chars=256, + )) + assert len(lines) == 1 + assert lines[0].suffix_char == ">" + assert "PS " in lines[0].raw_line + + +def test_clean_output_no_prompt() -> None: + lines = list(extract_prompt_lines( + "file1\nfile2\nfile3\n", base_ts=5.0, max_chars=256, + )) + assert lines == [] + + +def test_long_prompt_capped_to_max_chars() -> None: + long = "x" * 500 + "$ " + lines = list(extract_prompt_lines(long, base_ts=6.0, max_chars=256)) + assert len(lines) == 1 + assert len(lines[0].raw_line) <= 256 + assert lines[0].suffix_char == "$" + + +def test_multi_line_output_with_trailing_prompt() -> None: + """Mid-stream output then trailing prompt → one prompt detected.""" + text = "total 12\ndrwxr-xr-x user 4096 May 4 .\nanti@host:~$ " + lines = list(extract_prompt_lines(text, base_ts=7.0, max_chars=256)) + assert len(lines) == 1 + assert lines[0].suffix_char == "$" + + +def test_ansi_wrapped_prompt_detected_after_strip() -> None: + """ANSI-coloured prompt → still detected (strip happens inside _output_window).""" + events: list[AsciinemaEvent] = [ + *_typed("ls\r", t0=0.0), + (0.20, "o", "file1\n"), + (0.30, "o", "\x1b[1;32manti@host\x1b[0m:\x1b[34m~\x1b[0m$ "), + ] + ctx = build_session_context(events, sid="prompt-ansi", source="test") + assert len(ctx.prompt_lines) == 1 + assert ctx.prompt_lines[0].suffix_char == "$" + + +# ── SessionContext.prompt_lines + Command.followed_by_prompt ──────────────── + + +def test_no_output_no_prompts() -> None: + events = _typed("ls\r", t0=0.0) + ctx = build_session_context(events, sid="prompt-empty", source="test") + assert ctx.prompt_lines == () + assert ctx.commands[0].followed_by_prompt is False + + +def test_command_followed_by_prompt_marks_field() -> None: + events: list[AsciinemaEvent] = [ + *_typed("ls\r", t0=0.0), + (0.20, "o", "file1\nanti@host:~$ "), + ] + ctx = build_session_context(events, sid="prompt-followed", source="test") + assert ctx.commands[0].followed_by_prompt is True + assert len(ctx.prompt_lines) == 1 + + +def test_last_command_no_trailing_prompt() -> None: + """Two commands, only the first has a trailing prompt.""" + events: list[AsciinemaEvent] = [ + *_typed("ls\r", t0=0.0), + (0.20, "o", "file1\nanti@host:~$ "), + *_typed("foo\r", t0=1.0), + (1.20, "o", "bash: foo: command not found\n"), + ] + ctx = build_session_context(events, sid="prompt-mid", source="test") + assert len(ctx.commands) == 2 + assert ctx.commands[0].followed_by_prompt is True + assert ctx.commands[1].followed_by_prompt is False + + +# ── PII regression ────────────────────────────────────────────────────────── + + +def test_pii_prompt_text_does_not_leak_to_observations() -> None: + """PromptLine.raw_line lives on ctx, never in observation JSON.""" + events: list[AsciinemaEvent] = [ + *_typed("ls\r", t0=0.0), + (0.20, "o", "file1\nsecret-host-name@internal:~$ "), + ] + out = list(extract_session(events, sid="prompt-pii")) + for obs in out: + assert "secret-host-name" not in obs.model_dump_json()