feat(profiler/behave_shell): F.0 prompt-line detector

Adds PromptLine dataclass + extract_prompt_lines() helper. PromptLine
carries ts, suffix_char ($/#/%/>), raw_line (ANSI-stripped, capped),
is_root flag. Populated during the existing single-pass output-window
walk; SessionContext gains prompt_lines, Command gains
followed_by_prompt.

PII trade-off (ANTI-authorised at Phase F): PS1 text retained on ctx
so F.1 / F.3 / E.4 can read it. Capped at PROMPT_LINE_MAX_CHARS=256.
Observations still only carry derived primitive values.

D.0's regex error helpers stay alongside (NOT subsumed) — they fire
even when PS1 echo is suppressed. F.0 enriches D.0 rather than
replacing it.
This commit is contained in:
2026-05-04 00:29:08 -04:00
parent b7534c311a
commit 1ff02f0c77
4 changed files with 280 additions and 15 deletions

View File

@@ -18,7 +18,9 @@ from decnet.profiler.behave_shell._parse import (
AsciinemaEvent,
Command,
PasteBurst,
PromptLine,
detect_error_in_output,
extract_prompt_lines,
hash_token,
strip_ansi,
)
@@ -26,6 +28,7 @@ from decnet.profiler.behave_shell._thresholds import (
IKI_THINK_MAX_S,
PASTE_BURST_MAX_IAT_S,
PASTE_MIN_CHARS_PER_EVENT,
PROMPT_LINE_MAX_CHARS,
SHORTCUT_CTRL_BYTES,
)
@@ -63,6 +66,9 @@ class SessionContext:
# Step B.4 derivations — per-command intra-typing IATs
intra_command_iats: tuple[tuple[float, ...], ...] = field(default_factory=tuple)
# Step F.0 derivations — PS1 prompt lines detected in the output stream
prompt_lines: tuple[PromptLine, ...] = field(default_factory=tuple)
def _detect_paste_bursts(
inputs: list[AsciinemaEvent],
@@ -225,8 +231,14 @@ def _segment_commands(inputs: list[AsciinemaEvent]) -> tuple[Command, ...]:
def _annotate_commands_with_output(
commands: tuple[Command, ...],
outputs: list[AsciinemaEvent],
) -> tuple[Command, ...]:
"""Re-emit ``commands`` with ``errored`` / ``output_bytes`` filled.
) -> tuple[tuple[Command, ...], tuple[PromptLine, ...]]:
"""Re-emit ``commands`` with output-derived fields filled.
Returns ``(commands, prompt_lines)``. Each ``Command`` gains
``errored``, ``output_bytes``, and ``followed_by_prompt`` (Step
F.0). The flattened tuple of all detected ``PromptLine`` instances
across every command's window is returned alongside for the caller
to install on ``SessionContext.prompt_lines``.
The output window for ``commands[i]`` spans from its ``end_ts``
(the ``\\r``/``\\n`` that ran it) to the ``start_ts`` of the next
@@ -234,11 +246,13 @@ def _annotate_commands_with_output(
so output events arriving at or after ``t_end`` are still captured.
"""
if not commands:
return commands
return commands, ()
annotated: list[Command] = []
all_prompts: list[PromptLine] = []
for i, cmd in enumerate(commands):
win_end = commands[i + 1].start_ts if i + 1 < len(commands) else math.inf
byte_count, errored = _output_window(outputs, cmd.end_ts, win_end)
byte_count, errored, prompts = _output_window(outputs, cmd.end_ts, win_end)
all_prompts.extend(prompts)
annotated.append(Command(
start_ts=cmd.start_ts,
end_ts=cmd.end_ts,
@@ -248,8 +262,9 @@ def _annotate_commands_with_output(
pipe_count=cmd.pipe_count,
errored=errored,
output_bytes=byte_count,
followed_by_prompt=bool(prompts),
))
return tuple(annotated)
return tuple(annotated), tuple(all_prompts)
def _per_command_iats(
@@ -289,26 +304,37 @@ def _output_window(
outputs: list[AsciinemaEvent],
start: float,
end: float,
) -> tuple[int, bool]:
) -> tuple[int, bool, tuple[PromptLine, ...]]:
"""Walk output events in ``[start, end)`` once.
Returns ``(byte_count, errored)``. ``byte_count`` is the raw byte
count (pre-strip); ``errored`` is the canonical-error-pattern match
over the ANSI-stripped concatenation. The stripped text is dropped
on return — PII discipline: only an int and a bool leave this
helper. The full output bytes never enter ``Command`` or the
``SessionContext``.
Returns ``(byte_count, errored, prompt_lines)``. ``byte_count`` is
the raw byte count (pre-strip); ``errored`` is the canonical-error
-pattern match over the ANSI-stripped concatenation;
``prompt_lines`` is the tuple of PS1 lines detected in the same
stripped text (Step F.0).
PII trade-off (Phase F): the stripped text itself is dropped on
return, but ``prompt_lines`` retains PS1 strings (capped at
``PROMPT_LINE_MAX_CHARS``). Only derived values leave the engine
via observations; the prompt strings live on ``SessionContext``
so F.1 / F.3 / E.4 can read them.
"""
chunks: list[str] = []
last_ts = start
byte_count = 0
for t, _k, d in outputs:
if start <= t < end:
byte_count += len(d)
chunks.append(d)
last_ts = t
if not chunks:
return 0, False
return 0, False, ()
stripped = strip_ansi("".join(chunks))
return byte_count, detect_error_in_output(stripped)
errored = detect_error_in_output(stripped)
prompts = tuple(extract_prompt_lines(
stripped, base_ts=last_ts, max_chars=PROMPT_LINE_MAX_CHARS,
))
return byte_count, errored, prompts
def build_session_context(
@@ -349,7 +375,7 @@ def build_session_context(
typing_bursts = _split_typing_bursts(iats)
backspace_count, backspace_iats, kill_line_count = _scan_correction_signals(inputs)
commands = _segment_commands(inputs)
commands = _annotate_commands_with_output(commands, outputs)
commands, prompt_lines = _annotate_commands_with_output(commands, outputs)
inter_cmd_iats = tuple(
max(0.0, commands[i + 1].start_ts - commands[i].end_ts)
for i in range(len(commands) - 1)
@@ -380,4 +406,5 @@ def build_session_context(
backspace_iats=backspace_iats,
kill_line_count=kill_line_count,
intra_command_iats=intra_command_iats,
prompt_lines=prompt_lines,
)

View File

@@ -74,6 +74,24 @@ class PasteBurst:
event_count: int
@dataclass(frozen=True, slots=True)
class PromptLine:
    """One PS1 prompt line detected in the output stream.

    PII trade-off (ANTI-authorised at Phase F): ``raw_line`` retains
    the ANSI-stripped text of the prompt — hostnames / usernames /
    cwd / etc. — because F.1 / F.3 / E.4 read off it. Capped at
    ``PROMPT_LINE_MAX_CHARS``. PromptLine instances live on
    ``SessionContext.prompt_lines``; only derived primitive values
    (``bash`` / ``en-US`` / ``present``) leave the engine.

    Instances are immutable (``frozen=True``). ``is_root`` is a stored
    field rather than a computed one, so whoever constructs a
    ``PromptLine`` is responsible for keeping it consistent with
    ``suffix_char``.
    """

    ts: float  # timestamp supplied by whoever builds the instance
    suffix_char: str  # the prompt terminator: one of $ # % >
    raw_line: str  # ANSI-stripped prompt text, capped at PROMPT_LINE_MAX_CHARS
    is_root: bool  # True when suffix_char == '#'
@dataclass(frozen=True, slots=True)
class Command:
"""One command-line invocation, segmented from the input stream.
@@ -115,6 +133,7 @@ class Command:
pipe_count: int = 0
errored: bool = False
output_bytes: int = 0
followed_by_prompt: bool = False
def hash_token(token: str) -> str:
@@ -122,6 +141,73 @@ def hash_token(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
# Prompt-line detection (Step F.0). A prompt line ends with one of
# $/#/%/>, optionally followed by ONE space, and must then be at
# end-of-line or end-of-buffer. Anchoring the suffix to the end of the
# line is what separates a *prompt* from ordinary text that merely
# contains one of those characters somewhere in the middle.
#
# The pattern is compiled with re.VERBOSE only (no re.MULTILINE), so
# ``^`` matches at the very start of the buffer; later lines are picked
# up through the ``\n`` alternative of the leading group.
#
# NOTE(review): nothing in the visible part of this module references
# _PROMPT_LINE_RE — extract_prompt_lines() goes through
# _detect_prompt_suffix() instead. Confirm whether this pattern is still
# needed or is left over from an earlier draft.
_PROMPT_LINE_RE = re.compile(
r"""
(?:^|\n) # line start
(?P<line> # capture the prompt line itself
[^\n]*? # any line content (non-greedy)
(?P<suffix>[$\#%>]) # prompt suffix
\ ? # optional trailing space (PS1 default has it)
)
(?=\n|\Z) # at end of line / end of buffer
""",
re.VERBOSE,
)
def _detect_prompt_suffix(line: str) -> str | None:
"""Return the suffix character if ``line`` looks like a PS1 prompt.
``line`` is one logical output line, ANSI-stripped, trailing
whitespace included. The discriminating shape: any text ending in
one of ``$ # % >`` optionally followed by a single space. We require
the line to be non-empty and the suffix to be the rightmost
non-whitespace character.
"""
stripped = line.rstrip()
if not stripped:
return None
last = stripped[-1]
return last if last in ("$", "#", "%", ">") else None
def extract_prompt_lines(
    text: str,
    *,
    base_ts: float,
    max_chars: int,
) -> Iterator[PromptLine]:
    """Yield a ``PromptLine`` for every prompt-shaped line in ``text``.

    ``text`` is expected to be ANSI-stripped already. It is split on
    newlines and each piece is checked with ``_detect_prompt_suffix``;
    every piece that qualifies is yielded, in order of appearance. A
    buffer holding several prompts (for example mid-stream redraws)
    therefore yields several ``PromptLine`` instances, while ordinary
    command output followed by one trailing prompt yields exactly one.

    All yielded prompts share ``base_ts`` — the caller is responsible
    for slicing output by event window before calling.

    ``raw_line`` has leading/trailing whitespace removed (internal
    layout is preserved) and is truncated to its last ``max_chars``
    characters, so the suffix character survives truncation. A
    ``max_chars`` of zero or below produces an empty ``raw_line``.
    """
    if not text:
        return
    # Negative caps are treated as zero: the cap is a privacy/memory
    # bound and must never end up widening what is retained.
    limit = max(0, max_chars)
    for raw in text.split("\n"):
        suffix = _detect_prompt_suffix(raw)
        if suffix is None:
            continue
        line = raw.strip()
        if len(line) > limit:
            # Slice from an explicit start index. ``line[-limit:]`` would
            # return the *entire* line for ``limit == 0`` because ``-0``
            # is just ``0``.
            line = line[len(line) - limit:]
        yield PromptLine(
            ts=base_ts,
            suffix_char=suffix,
            raw_line=line,
            is_root=(suffix == "#"),
        )
def parse_shard_line(line: str) -> AsciinemaEvent | None:
"""Turn one shard JSONL line into an :data:`AsciinemaEvent`.

View File

@@ -218,6 +218,16 @@ LANDING_RITUAL_FIRST_N: int = 5
LANDING_RITUAL_HIT_MIN: int = 2
LANDING_RITUAL_MIN_COMMANDS: int = 3
# ── F.0 prompt-line detector ──────────────────────────────────────────────
# A prompt line in the output stream ends with one of these characters,
# optionally followed by trailing whitespace, at end of line. ``$`` and
# ``#`` are sh/bash; ``%`` is zsh; ``>`` is fish / cmd.exe / powershell
# (disambiguated by line content at F.1 time).
#
# Each retained prompt line is capped at 256 chars. ANTI authorised
# retaining PS1 text on ctx (PII relaxation); the cap keeps any single
# prompt line bounded even if an operator deliberately inflates PS1.
# NOTE(review): the cap is per line — the number of prompt lines kept
# per session does not appear to be limited here; confirm that is
# acceptable.
# NOTE(review): the detector visible in ``_parse`` spells the suffix set
# out inline rather than reading PROMPT_SUFFIX_CHARS — keep the two in
# sync, or have the detector import this constant.
PROMPT_SUFFIX_CHARS: frozenset[str] = frozenset({"$", "#", "%", ">"})
PROMPT_LINE_MAX_CHARS: int = 256
# ── motor.keystroke_cadence (Step B.1) ──────────────────────────────────────
# Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between
# commands don't inflate the within-burst CV. Mirrors the prototype's

View File

@@ -0,0 +1,142 @@
"""Step F.0: prompt-line detector.
The detector is shared infrastructure (no primitive emit). These tests
pin ``PromptLine`` semantics + ``Command.followed_by_prompt`` directly
via ``build_session_context``. F.1 / F.3 / E.4 all depend on these
fields, so any drift here breaks four downstream primitives.
"""
from __future__ import annotations
from decnet.profiler.behave_shell import extract_session
from decnet.profiler.behave_shell._ctx import build_session_context
from decnet.profiler.behave_shell._parse import (
AsciinemaEvent,
PromptLine,
extract_prompt_lines,
)
def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]:
return [(t0 + i * dt, "i", c) for i, c in enumerate(text)]
# ── extract_prompt_lines ────────────────────────────────────────────────────
def test_bash_prompt_detected() -> None:
    """A ``user@host:~$ `` line is reported as one non-root ``$`` prompt."""
    found = [
        *extract_prompt_lines("anti@host:~$ ", base_ts=1.0, max_chars=256),
    ]
    assert len(found) == 1
    prompt = found[0]
    assert prompt.suffix_char == "$"
    assert prompt.is_root is False
    assert "anti@host" in prompt.raw_line
def test_root_prompt_detected_as_root() -> None:
    """A prompt ending in ``#`` is flagged ``is_root``."""
    found = [
        *extract_prompt_lines("root@host:/etc# ", base_ts=2.0, max_chars=256),
    ]
    assert len(found) == 1
    prompt = found[0]
    assert prompt.suffix_char == "#"
    assert prompt.is_root is True
def test_zsh_prompt_detected() -> None:
    """A zsh-style ``%`` prompt is detected with the right suffix."""
    found = [*extract_prompt_lines("host% ", base_ts=3.0, max_chars=256)]
    assert len(found) == 1
    prompt = found[0]
    assert prompt.suffix_char == "%"
def test_powershell_prompt_detected() -> None:
    """A PowerShell-style ``>`` prompt is detected and its text retained."""
    found = [
        *extract_prompt_lines("PS C:\\Users\\anti> ", base_ts=4.0, max_chars=256),
    ]
    assert len(found) == 1
    prompt = found[0]
    assert prompt.suffix_char == ">"
    assert "PS " in prompt.raw_line
def test_clean_output_no_prompt() -> None:
    """Plain command output with no prompt-shaped line yields nothing."""
    plain_output = "file1\nfile2\nfile3\n"
    found = [*extract_prompt_lines(plain_output, base_ts=5.0, max_chars=256)]
    assert found == []
def test_long_prompt_capped_to_max_chars() -> None:
    """An over-long prompt is cut to exactly ``max_chars``, keeping its tail."""
    overlong = "x" * 500 + "$ "
    lines = list(extract_prompt_lines(overlong, base_ts=6.0, max_chars=256))
    assert len(lines) == 1
    # The stripped line is 501 chars, so the cap must bite: pin the exact
    # length rather than an upper bound, and check that truncation kept
    # the END of the line (where the suffix lives), not the start.
    assert len(lines[0].raw_line) == 256
    assert lines[0].raw_line.endswith("$")
    assert lines[0].suffix_char == "$"
def test_multi_line_output_with_trailing_prompt() -> None:
    """Mid-stream output then trailing prompt → one prompt detected."""
    listing_then_prompt = "total 12\ndrwxr-xr-x user 4096 May 4 .\nanti@host:~$ "
    found = [
        *extract_prompt_lines(listing_then_prompt, base_ts=7.0, max_chars=256),
    ]
    assert len(found) == 1
    prompt = found[0]
    assert prompt.suffix_char == "$"
def test_ansi_wrapped_prompt_detected_after_strip() -> None:
    """ANSI-coloured prompt → still detected (strip happens inside _output_window)."""
    coloured_prompt = "\x1b[1;32manti@host\x1b[0m:\x1b[34m~\x1b[0m$ "
    events: list[AsciinemaEvent] = _typed("ls\r", t0=0.0)
    events.append((0.20, "o", "file1\n"))
    events.append((0.30, "o", coloured_prompt))
    ctx = build_session_context(events, sid="prompt-ansi", source="test")
    detected = ctx.prompt_lines
    assert len(detected) == 1
    assert detected[0].suffix_char == "$"
# ── SessionContext.prompt_lines + Command.followed_by_prompt ────────────────
def test_no_output_no_prompts() -> None:
    """A session with input only has no prompt lines and no prompt flag."""
    ctx = build_session_context(
        _typed("ls\r", t0=0.0), sid="prompt-empty", source="test",
    )
    assert ctx.prompt_lines == ()
    first_cmd = ctx.commands[0]
    assert first_cmd.followed_by_prompt is False
def test_command_followed_by_prompt_marks_field() -> None:
    """Output ending in a prompt sets ``followed_by_prompt`` on the command."""
    events: list[AsciinemaEvent] = _typed("ls\r", t0=0.0)
    events.append((0.20, "o", "file1\nanti@host:~$ "))
    ctx = build_session_context(events, sid="prompt-followed", source="test")
    first_cmd = ctx.commands[0]
    assert first_cmd.followed_by_prompt is True
    assert len(ctx.prompt_lines) == 1
def test_last_command_no_trailing_prompt() -> None:
    """Two commands, only the first has a trailing prompt."""
    first_cmd_events = [
        *_typed("ls\r", t0=0.0),
        (0.20, "o", "file1\nanti@host:~$ "),
    ]
    second_cmd_events = [
        *_typed("foo\r", t0=1.0),
        (1.20, "o", "bash: foo: command not found\n"),
    ]
    events: list[AsciinemaEvent] = first_cmd_events + second_cmd_events
    ctx = build_session_context(events, sid="prompt-mid", source="test")
    assert len(ctx.commands) == 2
    with_prompt, without_prompt = ctx.commands
    assert with_prompt.followed_by_prompt is True
    assert without_prompt.followed_by_prompt is False
# ── PII regression ──────────────────────────────────────────────────────────
def test_pii_prompt_text_does_not_leak_to_observations() -> None:
    """PromptLine.raw_line lives on ctx, never in observation JSON."""
    events: list[AsciinemaEvent] = [
        *_typed("ls\r", t0=0.0),
        (0.20, "o", "file1\nsecret-host-name@internal:~$ "),
    ]
    # Guard against a vacuous pass: first prove the secret really was
    # captured on the context, so the leak check below has something
    # that *could* leak.
    ctx = build_session_context(events, sid="prompt-pii", source="test")
    assert any("secret-host-name" in p.raw_line for p in ctx.prompt_lines)
    out = list(extract_session(events, sid="prompt-pii"))
    # NOTE(review): if extract_session emits no observations for a
    # session this short, the loop body never runs — confirm that at
    # least one observation is produced, or extend the fixture.
    for obs in out:
        assert "secret-host-name" not in obs.model_dump_json()