feat(profiler/behave_shell): emit environmental.shell_type
Classifies each prompt line in ctx.prompt_lines and emits the mode (most common label). $/# → bash; % → zsh; > with 'PS ' prefix → powershell; > with 'C:\' substring → cmd.exe; > otherwise → fish. New _features/environmental.py module opens Phase F.
This commit is contained in:
@@ -24,6 +24,9 @@ from decnet.profiler.behave_shell._features.cognitive import (
|
||||
inter_command_consistency,
|
||||
inter_command_latency_class,
|
||||
)
|
||||
from decnet.profiler.behave_shell._features.environmental import (
|
||||
shell_type,
|
||||
)
|
||||
from decnet.profiler.behave_shell._features.temporal import (
|
||||
escalation_pattern,
|
||||
landing_ritual,
|
||||
@@ -67,4 +70,5 @@ FEATURES: tuple[FeatureFn, ...] = (
|
||||
session_duration,
|
||||
escalation_pattern,
|
||||
landing_ritual,
|
||||
shell_type,
|
||||
)
|
||||
|
||||
75
decnet/profiler/behave_shell/_features/environmental.py
Normal file
75
decnet/profiler/behave_shell/_features/environmental.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""``environmental.*`` feature functions.
|
||||
|
||||
Phase F ships the five environmental primitives plus F.0's shared
|
||||
prompt-line detector. F.0 itself emits no primitive — it populates
|
||||
``SessionContext.prompt_lines`` and ``Command.followed_by_prompt``
|
||||
which F.1 / F.3 / E.4 read.
|
||||
|
||||
Step F.1: ``environmental.shell_type``.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import collections
|
||||
from typing import Iterator
|
||||
|
||||
from decnet_behave_core.spec.envelope import Observation
|
||||
|
||||
from decnet.profiler.behave_shell._ctx import SessionContext
|
||||
from decnet.profiler.behave_shell._features._emit import make_observation
|
||||
from decnet.profiler.behave_shell._parse import PromptLine
|
||||
from decnet.profiler.behave_shell._thresholds import (
|
||||
SHELL_TYPE_MIN_PROMPTS,
|
||||
)
|
||||
|
||||
|
||||
def _classify_shell_from_prompt(p: PromptLine) -> str:
    """Map one prompt line to a shell-type label.

    Reads ``p.suffix_char`` (the prompt's terminating character) and
    ``p.raw_line`` (the full prompt text) and returns one of ``"bash"``,
    ``"zsh"``, ``"powershell"``, ``"cmd.exe"`` or ``"fish"``.

    Rules:

    * ``$`` / ``#`` -> ``"bash"``
    * ``%`` -> ``"zsh"``
    * ``>`` -> ``"powershell"`` when the line (ignoring leading
      whitespace) starts with ``"PS "``; ``"cmd.exe"`` when the line
      contains a Windows drive root such as ``C:\\`` or ``D:\\``;
      ``"fish"`` otherwise.
    * any other suffix -> ``"bash"`` (defensive fallback).
    """
    # Function-scope import keeps this block self-contained.
    import re

    suffix = p.suffix_char
    if suffix == "%":
        return "zsh"
    if suffix != ">":
        # "$" / "#": bash / sh / dash all share these — collapsed to
        # "bash" per registry's bash-family stance. zsh CAN be configured
        # to use $/# but that's the user's PS1 override; default zsh is %.
        # Any other suffix also lands here as a defensive fallback —
        # _detect_prompt_suffix only emits one of $#%>.
        return "bash"

    # ">" is shared by three shells; disambiguate by line content.
    line = p.raw_line
    if line.lstrip().startswith("PS "):
        # powershell's prompt starts with "PS ". Checked first because
        # it usually also contains a drive path.
        return "powershell"
    # cmd.exe's prompt typically shows the current drive and path.
    # Accept any ASCII drive letter, not just C:, so sessions on D:, E:, ...
    # are not mistaken for fish.
    if re.search(r"[A-Za-z]:\\", line):
        return "cmd.exe"
    # Everything else is fish.
    return "fish"
|
||||
|
||||
|
||||
def shell_type(ctx: SessionContext) -> Iterator[Observation]:
    """Emit ``environmental.shell_type``.

    Every entry of ``ctx.prompt_lines`` is classified on its own and the
    most common label is emitted as the observation value.

    Nothing is emitted when no prompts were detected — the registry's
    enum doesn't admit ``unknown`` and emitting ``bash`` from no
    observation at all would be dishonest.

    Confidence is 0.40 when fewer than ``SHELL_TYPE_MIN_PROMPTS`` prompt
    lines were seen and 0.75 once that count is reached.
    """
    prompts = ctx.prompt_lines
    if not prompts:
        return

    tally = collections.Counter(map(_classify_shell_from_prompt, prompts))
    winner = tally.most_common(1)[0][0]

    # Sample-size honesty: a vote over too few prompts is reported with
    # reduced confidence.
    enough_samples = len(prompts) >= SHELL_TYPE_MIN_PROMPTS
    yield make_observation(
        ctx,
        primitive="environmental.shell_type",
        value=winner,
        confidence=0.75 if enough_samples else 0.40,
    )
|
||||
@@ -228,6 +228,11 @@ LANDING_RITUAL_MIN_COMMANDS: int = 3
|
||||
PROMPT_SUFFIX_CHARS: frozenset[str] = frozenset({"$", "#", "%", ">"})
|
||||
PROMPT_LINE_MAX_CHARS: int = 256
|
||||
|
||||
# ── environmental.shell_type (Step F.1) ────────────────────────────────────
|
||||
# Below this many detected prompt-lines, drop confidence (sample-size
|
||||
# honesty). At or above it, the shell-type vote is treated as robust.
|
||||
SHELL_TYPE_MIN_PROMPTS: int = 3
|
||||
|
||||
# ── motor.keystroke_cadence (Step B.1) ──────────────────────────────────────
|
||||
# Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between
|
||||
# commands don't inflate the within-burst CV. Mirrors the prototype's
|
||||
|
||||
87
tests/profiler/behave_shell/test_environmental_shell_type.py
Normal file
87
tests/profiler/behave_shell/test_environmental_shell_type.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""Step F.1: ``environmental.shell_type``."""
|
||||
from __future__ import annotations
|
||||
|
||||
from decnet.profiler.behave_shell import extract_session
|
||||
from decnet.profiler.behave_shell._parse import AsciinemaEvent
|
||||
|
||||
|
||||
PRIMITIVE = "environmental.shell_type"
|
||||
|
||||
|
||||
def _of(observations: list, primitive: str):
    """Return the single observation whose ``primitive`` matches.

    Fails with an ``AssertionError`` unless exactly one observation in
    ``observations`` carries the requested primitive name.
    """
    hits = []
    for candidate in observations:
        if candidate.primitive == primitive:
            hits.append(candidate)
    count = len(hits)
    assert count == 1, f"expected exactly one {primitive}, got {count}"
    return hits[0]
|
||||
|
||||
|
||||
def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]:
    """Turn ``text`` into one ``"i"`` (input) event per character.

    The character at index ``i`` is stamped ``t0 + i * dt``.
    """
    keystrokes: list[AsciinemaEvent] = []
    for position, char in enumerate(text):
        timestamp = t0 + position * dt
        keystrokes.append((timestamp, "i", char))
    return keystrokes
|
||||
|
||||
|
||||
def _session(prompt_lines: list[str]) -> list[AsciinemaEvent]:
    """Build a synthetic session from a list of prompt strings.

    For the prompt at index ``i`` the session contains a typed ``ls``
    command starting at ``i`` seconds, followed half a second later by
    one output event holding ``out``, a newline and the prompt text.
    """
    timeline: list[AsciinemaEvent] = []
    for index, prompt_text in enumerate(prompt_lines):
        start = index * 1.0
        timeline += _typed("ls\r", t0=start)
        timeline.append((start + 0.5, "o", f"out\n{prompt_text}"))
    return timeline
|
||||
|
||||
|
||||
def test_no_prompts_no_emission() -> None:
    """A session whose output holds no prompt line emits no shell_type."""
    keystrokes = _typed("ls\r", t0=0.0)
    command_output = (0.5, "o", "file1\n")
    observations = list(
        extract_session(keystrokes + [command_output], sid="sht-noprompt")
    )
    emitted = [obs for obs in observations if obs.primitive == PRIMITIVE]
    assert emitted == []
|
||||
|
||||
|
||||
def test_bash_prompt_emits_bash() -> None:
    """Five identical ``$`` prompts classify the session as bash."""
    prompts = ["anti@host:~$ "] * 5
    observations = list(extract_session(_session(prompts), sid="sht-bash"))
    detected = _of(observations, PRIMITIVE)
    assert detected.value == "bash"
|
||||
|
||||
|
||||
def test_root_prompt_still_bash() -> None:
    """A ``#`` (root) prompt is still bash, not a separate shell."""
    prompts = ["root@host:/etc# "] * 5
    observations = list(extract_session(_session(prompts), sid="sht-root"))
    detected = _of(observations, PRIMITIVE)
    assert detected.value == "bash"
|
||||
|
||||
|
||||
def test_zsh_prompt_emits_zsh() -> None:
    """Five identical ``%`` prompts classify the session as zsh."""
    prompts = ["host% "] * 5
    observations = list(extract_session(_session(prompts), sid="sht-zsh"))
    detected = _of(observations, PRIMITIVE)
    assert detected.value == "zsh"
|
||||
|
||||
|
||||
def test_fish_prompt_emits_fish() -> None:
    """A ``>`` prompt with no PowerShell or drive-path marker is fish."""
    prompts = ["anti@host ~> "] * 5
    observations = list(extract_session(_session(prompts), sid="sht-fish"))
    detected = _of(observations, PRIMITIVE)
    assert detected.value == "fish"
|
||||
|
||||
|
||||
def test_powershell_prompt_emits_powershell() -> None:
    """A ``PS ``-prefixed ``>`` prompt classifies the session as powershell."""
    prompts = ["PS C:\\Users\\anti> "] * 5
    events = _session(prompts)
    observations = list(extract_session(events, sid="sht-ps"))
    detected = _of(observations, PRIMITIVE)
    assert detected.value == "powershell"
|
||||
|
||||
|
||||
def test_cmd_exe_prompt_emits_cmd_exe() -> None:
    """A ``>`` prompt containing a ``C:\\`` path classifies as cmd.exe."""
    prompts = ["C:\\Users\\anti>"] * 5
    observations = list(extract_session(_session(prompts), sid="sht-cmd"))
    detected = _of(observations, PRIMITIVE)
    assert detected.value == "cmd.exe"
|
||||
|
||||
|
||||
def test_majority_wins() -> None:
    """Mixed prompts, bash majority → bash."""
    bash_prompt = "anti@host:~$ "
    zsh_outlier = "host% "
    # Four bash prompts with a single zsh prompt in fourth position.
    prompts = [bash_prompt, bash_prompt, bash_prompt, zsh_outlier, bash_prompt]
    observations = list(extract_session(_session(prompts), sid="sht-mix"))
    detected = _of(observations, PRIMITIVE)
    assert detected.value == "bash"
|
||||
|
||||
|
||||
def test_few_prompts_low_confidence() -> None:
    """A 2-prompt session reports lower confidence than a 6-prompt one."""
    bash_prompt = "anti@host:~$ "
    short_obs = list(extract_session(_session([bash_prompt] * 2), sid="sht-short"))
    full_obs = list(extract_session(_session([bash_prompt] * 6), sid="sht-full"))
    short_confidence = _of(short_obs, PRIMITIVE).confidence
    full_confidence = _of(full_obs, PRIMITIVE).confidence
    assert short_confidence < full_confidence
|
||||
Reference in New Issue
Block a user