diff --git a/decnet/profiler/behave_shell/_features/__init__.py b/decnet/profiler/behave_shell/_features/__init__.py index f35d9f36..88d47254 100644 --- a/decnet/profiler/behave_shell/_features/__init__.py +++ b/decnet/profiler/behave_shell/_features/__init__.py @@ -33,6 +33,7 @@ from decnet.profiler.behave_shell._features.environmental import ( ) from decnet.profiler.behave_shell._features.temporal import ( escalation_pattern, + exit_behavior, landing_ritual, session_duration, ) @@ -74,6 +75,7 @@ FEATURES: tuple[FeatureFn, ...] = ( session_duration, escalation_pattern, landing_ritual, + exit_behavior, shell_type, terminal_multiplexer, locale, diff --git a/decnet/profiler/behave_shell/_features/temporal.py b/decnet/profiler/behave_shell/_features/temporal.py index 91a9476f..70be329f 100644 --- a/decnet/profiler/behave_shell/_features/temporal.py +++ b/decnet/profiler/behave_shell/_features/temporal.py @@ -8,6 +8,7 @@ and computed by the attribution engine, not the extractor. Step E.1: ``temporal.session_duration``. Step E.2: ``temporal.escalation_pattern``. Step E.3: ``temporal.lifecycle_markers.landing_ritual``. +Step E.4: ``temporal.lifecycle_markers.exit_behavior`` (unblocked by F.0). """ from __future__ import annotations @@ -28,6 +29,7 @@ from decnet.profiler.behave_shell._thresholds import ( ESCALATION_SUSTAINED_CV, ESCALATION_WINDOW_MIN_S, ESCALATION_WINDOW_TARGET, + EXIT_BEHAVIOR_LOOKBACK_K, LANDING_RITUAL_FIRST_N, LANDING_RITUAL_HIT_MIN, LANDING_RITUAL_MIN_COMMANDS, @@ -37,6 +39,25 @@ from decnet.profiler.behave_shell._thresholds import ( ) +# Precomputed at import time. ``graceful`` is operator-typed shutdown; +# ``cleanup`` is the wipe-tracks vocabulary. Both expand to v0.2 once +# the corpus shows what gets missed. +_GRACEFUL_EXIT_HASHES: frozenset[str] = frozenset({ + hash_token("exit"), + hash_token("logout"), + hash_token("quit"), + hash_token("logoff"), +}) +_CLEANUP_TOKEN_HASHES: frozenset[str] = frozenset({ + hash_token("history"), + hash_token("unset"), + hash_token("rm"), + hash_token("shred"), + hash_token("clear"), + hash_token("kill"), +}) + + # Precomputed at import time so the per-session check is a set lookup, # not 7 sha256 ops per session. The recon-survey vocabulary an attacker # (or scripted runner) typically opens with on a freshly-landed shell. @@ -166,3 +187,51 @@ def landing_ritual(ctx: SessionContext) -> Iterator[Observation]: value=value, confidence=confidence, ) + + +def exit_behavior(ctx: SessionContext) -> Iterator[Observation]: + """Emit ``temporal.lifecycle_markers.exit_behavior`` ∈ {graceful, abrupt, cleanup}. + + Resolution of the E.4 hold from Phase E. Now that F.0's + ``Command.followed_by_prompt`` gives us prompt-after-last-command + visibility — the exit-code proxy we couldn't get in Phase E: + + 1. Last command **lacks** a trailing prompt → ``abrupt`` (session + cut mid-output, custom PS1 swallowing, or genuinely interrupted). + 2. Last command's first_token_hash ∈ ``_GRACEFUL_EXIT_HASHES`` + (``exit`` / ``logout`` / ``quit`` / ``logoff``) → ``graceful``. + 3. Any of the last ``EXIT_BEHAVIOR_LOOKBACK_K`` (3) commands' + first_token_hash ∈ ``_CLEANUP_TOKEN_HASHES`` (``history`` / + ``unset`` / ``rm`` / ``shred`` / ``clear`` / ``kill``) → + ``cleanup``. + 4. Else → ``graceful`` (clean Ctrl-D / window close). + + Skip emission when no commands. + + Confidence 0.65 when the trailing prompt is clear; 0.45 for + ``abrupt`` (a custom PS1 suppressing prompt echo could also yield + ``followed_by_prompt=False``). + """ + if not ctx.commands: + return + last = ctx.commands[-1] + if not last.followed_by_prompt: + value = "abrupt" + confidence = 0.45 + elif last.first_token_hash in _GRACEFUL_EXIT_HASHES: + value = "graceful" + confidence = 0.65 + else: + tail = ctx.commands[-EXIT_BEHAVIOR_LOOKBACK_K:] + if any(c.first_token_hash in _CLEANUP_TOKEN_HASHES for c in tail): + value = "cleanup" + confidence = 0.65 + else: + value = "graceful" + confidence = 0.65 + yield make_observation( + ctx, + primitive="temporal.lifecycle_markers.exit_behavior", + value=value, + confidence=confidence, + ) diff --git a/decnet/profiler/behave_shell/_thresholds.py b/decnet/profiler/behave_shell/_thresholds.py index 0d88b046..462420d2 100644 --- a/decnet/profiler/behave_shell/_thresholds.py +++ b/decnet/profiler/behave_shell/_thresholds.py @@ -272,6 +272,10 @@ NUMPAD_RUN_MIN: int = 4 # Below this many typed chars total, skip emission (no honest signal). NUMPAD_MIN_TYPED_CHARS: int = 50 +# ── temporal.lifecycle_markers.exit_behavior (Step E.4, unblocked by F.0) ── +# How many of the last commands to inspect for cleanup-family tokens. +EXIT_BEHAVIOR_LOOKBACK_K: int = 3 + # ── motor.keystroke_cadence (Step B.1) ────────────────────────────────────── # Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between # commands don't inflate the within-burst CV. Mirrors the prototype's diff --git a/tests/profiler/behave_shell/test_temporal_exit_behavior.py b/tests/profiler/behave_shell/test_temporal_exit_behavior.py new file mode 100644 index 00000000..1c1c0388 --- /dev/null +++ b/tests/profiler/behave_shell/test_temporal_exit_behavior.py @@ -0,0 +1,79 @@ +"""Step E.4: ``temporal.lifecycle_markers.exit_behavior`` (unblocked by F.0).""" +from __future__ import annotations + +from decnet.profiler.behave_shell import extract_session +from decnet.profiler.behave_shell._parse import AsciinemaEvent + + +PRIMITIVE = "temporal.lifecycle_markers.exit_behavior" + + +def _of(observations: list, primitive: str): + obs = [o for o in observations if o.primitive == primitive] + assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}" + return obs[0] + + +def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]: + return [(t0 + i * dt, "i", c) for i, c in enumerate(text)] + + +def _cmd(token: str, t0: float, *, with_prompt: bool = True) -> list[AsciinemaEvent]: + """Emit one command + (optionally) a trailing prompt-line.""" + events = _typed(f"{token}\r", t0=t0) + cmd_end = t0 + len(token) * 0.05 + if with_prompt: + events.append((cmd_end + 0.10, "o", "out\nanti@host:~$ ")) + else: + events.append((cmd_end + 0.10, "o", "out\n")) + return events + + +def test_no_commands_no_emission() -> None: + out = list(extract_session([(0.0, "i", "x")], sid="ex-empty")) + assert [o for o in out if o.primitive == PRIMITIVE] == [] + + +def test_last_command_no_prompt_emits_abrupt() -> None: + """Session cut mid-output → no trailing prompt → abrupt.""" + events = _cmd("ls", t0=0.0) + _cmd("foo", t0=1.0, with_prompt=False) + obs = _of(list(extract_session(events, sid="ex-abrupt")), PRIMITIVE) + assert obs.value == "abrupt" + + +def test_explicit_exit_token_emits_graceful() -> None: + events = _cmd("ls", t0=0.0) + _cmd("exit", t0=1.0) + obs = _of(list(extract_session(events, sid="ex-graceful")), PRIMITIVE) + assert obs.value == "graceful" + + +def test_logout_token_emits_graceful() -> None: + events = _cmd("ls", t0=0.0) + _cmd("logout", t0=1.0) + obs = _of(list(extract_session(events, sid="ex-logout")), PRIMITIVE) + assert obs.value == "graceful" + + +def test_cleanup_token_in_tail_emits_cleanup() -> None: + """Last few commands include cleanup vocabulary → cleanup.""" + events = ( + _cmd("ls", t0=0.0) + + _cmd("cat", t0=1.0) + + _cmd("history", t0=2.0) # cleanup-family token in tail + ) + obs = _of(list(extract_session(events, sid="ex-cleanup")), PRIMITIVE) + assert obs.value == "cleanup" + + +def test_clean_session_with_prompt_emits_graceful() -> None: + """Trailing prompt + no exit/cleanup tokens → graceful (Ctrl-D path).""" + events = _cmd("ls", t0=0.0) + _cmd("ps", t0=1.0) + _cmd("cat", t0=2.0) + obs = _of(list(extract_session(events, sid="ex-clean")), PRIMITIVE) + assert obs.value == "graceful" + + +def test_abrupt_lower_confidence_than_graceful() -> None: + abrupt_events = _cmd("ls", t0=0.0) + _cmd("foo", t0=1.0, with_prompt=False) + graceful_events = _cmd("ls", t0=0.0) + _cmd("exit", t0=1.0) + a = _of(list(extract_session(abrupt_events, sid="ex-a")), PRIMITIVE) + g = _of(list(extract_session(graceful_events, sid="ex-g")), PRIMITIVE) + assert a.confidence < g.confidence