feat(profiler/behave_shell): emit temporal.lifecycle_markers.exit_behavior
Resolves the E.4 hold from Phase E. F.0's Command.followed_by_prompt
gives us the exit-code proxy (prompt-after-last-command) we couldn't
get in Phase E.
Logic: last command without trailing prompt → abrupt; first_token_hash
in {exit, logout, quit, logoff} → graceful; any of the last K=3
commands' first_token_hash in {history, unset, rm, shred, clear, kill}
→ cleanup; else → graceful (clean Ctrl-D / window close).
This commit is contained in:
@@ -33,6 +33,7 @@ from decnet.profiler.behave_shell._features.environmental import (
|
||||
)
|
||||
from decnet.profiler.behave_shell._features.temporal import (
|
||||
escalation_pattern,
|
||||
exit_behavior,
|
||||
landing_ritual,
|
||||
session_duration,
|
||||
)
|
||||
@@ -74,6 +75,7 @@ FEATURES: tuple[FeatureFn, ...] = (
|
||||
session_duration,
|
||||
escalation_pattern,
|
||||
landing_ritual,
|
||||
exit_behavior,
|
||||
shell_type,
|
||||
terminal_multiplexer,
|
||||
locale,
|
||||
|
||||
@@ -8,6 +8,7 @@ and computed by the attribution engine, not the extractor.
|
||||
Step E.1: ``temporal.session_duration``.
|
||||
Step E.2: ``temporal.escalation_pattern``.
|
||||
Step E.3: ``temporal.lifecycle_markers.landing_ritual``.
|
||||
Step E.4: ``temporal.lifecycle_markers.exit_behavior`` (unblocked by F.0).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -28,6 +29,7 @@ from decnet.profiler.behave_shell._thresholds import (
|
||||
ESCALATION_SUSTAINED_CV,
|
||||
ESCALATION_WINDOW_MIN_S,
|
||||
ESCALATION_WINDOW_TARGET,
|
||||
EXIT_BEHAVIOR_LOOKBACK_K,
|
||||
LANDING_RITUAL_FIRST_N,
|
||||
LANDING_RITUAL_HIT_MIN,
|
||||
LANDING_RITUAL_MIN_COMMANDS,
|
||||
@@ -37,6 +39,25 @@ from decnet.profiler.behave_shell._thresholds import (
|
||||
)
|
||||
|
||||
|
||||
# Precomputed at import time. ``graceful`` is operator-typed shutdown;
|
||||
# ``cleanup`` is the wipe-tracks vocabulary. Both expand to v0.2 once
|
||||
# the corpus shows what gets missed.
|
||||
_GRACEFUL_EXIT_HASHES: frozenset[str] = frozenset({
|
||||
hash_token("exit"),
|
||||
hash_token("logout"),
|
||||
hash_token("quit"),
|
||||
hash_token("logoff"),
|
||||
})
|
||||
_CLEANUP_TOKEN_HASHES: frozenset[str] = frozenset({
|
||||
hash_token("history"),
|
||||
hash_token("unset"),
|
||||
hash_token("rm"),
|
||||
hash_token("shred"),
|
||||
hash_token("clear"),
|
||||
hash_token("kill"),
|
||||
})
|
||||
|
||||
|
||||
# Precomputed at import time so the per-session check is a set lookup,
|
||||
# not 7 sha256 ops per session. The recon-survey vocabulary an attacker
|
||||
# (or scripted runner) typically opens with on a freshly-landed shell.
|
||||
@@ -166,3 +187,51 @@ def landing_ritual(ctx: SessionContext) -> Iterator[Observation]:
|
||||
value=value,
|
||||
confidence=confidence,
|
||||
)
|
||||
|
||||
|
||||
def exit_behavior(ctx: SessionContext) -> Iterator[Observation]:
|
||||
"""Emit ``temporal.lifecycle_markers.exit_behavior`` ∈ {graceful, abrupt, cleanup}.
|
||||
|
||||
Resolution of the E.4 hold from Phase E. Now that F.0's
|
||||
``Command.followed_by_prompt`` gives us prompt-after-last-command
|
||||
visibility — the exit-code proxy we couldn't get in Phase E:
|
||||
|
||||
1. Last command **lacks** a trailing prompt → ``abrupt`` (session
|
||||
cut mid-output, custom PS1 swallowing, or genuinely interrupted).
|
||||
2. Last command's first_token_hash ∈ ``_GRACEFUL_EXIT_HASHES``
|
||||
(``exit`` / ``logout`` / ``quit`` / ``logoff``) → ``graceful``.
|
||||
3. Any of the last ``EXIT_BEHAVIOR_LOOKBACK_K`` (3) commands'
|
||||
first_token_hash ∈ ``_CLEANUP_TOKEN_HASHES`` (``history`` /
|
||||
``unset`` / ``rm`` / ``shred`` / ``clear`` / ``kill``) →
|
||||
``cleanup``.
|
||||
4. Else → ``graceful`` (clean Ctrl-D / window close).
|
||||
|
||||
Skip emission when no commands.
|
||||
|
||||
Confidence 0.65 when the trailing prompt is clear; 0.45 for
|
||||
``abrupt`` (a custom PS1 suppressing prompt echo could also yield
|
||||
``followed_by_prompt=False``).
|
||||
"""
|
||||
if not ctx.commands:
|
||||
return
|
||||
last = ctx.commands[-1]
|
||||
if not last.followed_by_prompt:
|
||||
value = "abrupt"
|
||||
confidence = 0.45
|
||||
elif last.first_token_hash in _GRACEFUL_EXIT_HASHES:
|
||||
value = "graceful"
|
||||
confidence = 0.65
|
||||
else:
|
||||
tail = ctx.commands[-EXIT_BEHAVIOR_LOOKBACK_K:]
|
||||
if any(c.first_token_hash in _CLEANUP_TOKEN_HASHES for c in tail):
|
||||
value = "cleanup"
|
||||
confidence = 0.65
|
||||
else:
|
||||
value = "graceful"
|
||||
confidence = 0.65
|
||||
yield make_observation(
|
||||
ctx,
|
||||
primitive="temporal.lifecycle_markers.exit_behavior",
|
||||
value=value,
|
||||
confidence=confidence,
|
||||
)
|
||||
|
||||
@@ -272,6 +272,10 @@ NUMPAD_RUN_MIN: int = 4
|
||||
# Below this many typed chars total, skip emission (no honest signal).
|
||||
NUMPAD_MIN_TYPED_CHARS: int = 50
|
||||
|
||||
# ── temporal.lifecycle_markers.exit_behavior (Step E.4, unblocked by F.0) ──
|
||||
# How many of the last commands to inspect for cleanup-family tokens.
|
||||
EXIT_BEHAVIOR_LOOKBACK_K: int = 3
|
||||
|
||||
# ── motor.keystroke_cadence (Step B.1) ──────────────────────────────────────
|
||||
# Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between
|
||||
# commands don't inflate the within-burst CV. Mirrors the prototype's
|
||||
|
||||
79
tests/profiler/behave_shell/test_temporal_exit_behavior.py
Normal file
79
tests/profiler/behave_shell/test_temporal_exit_behavior.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""Step E.4: ``temporal.lifecycle_markers.exit_behavior`` (unblocked by F.0)."""
|
||||
from __future__ import annotations
|
||||
|
||||
from decnet.profiler.behave_shell import extract_session
|
||||
from decnet.profiler.behave_shell._parse import AsciinemaEvent
|
||||
|
||||
|
||||
PRIMITIVE = "temporal.lifecycle_markers.exit_behavior"
|
||||
|
||||
|
||||
def _of(observations: list, primitive: str):
|
||||
obs = [o for o in observations if o.primitive == primitive]
|
||||
assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}"
|
||||
return obs[0]
|
||||
|
||||
|
||||
def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]:
|
||||
return [(t0 + i * dt, "i", c) for i, c in enumerate(text)]
|
||||
|
||||
|
||||
def _cmd(token: str, t0: float, *, with_prompt: bool = True) -> list[AsciinemaEvent]:
|
||||
"""Emit one command + (optionally) a trailing prompt-line."""
|
||||
events = _typed(f"{token}\r", t0=t0)
|
||||
cmd_end = t0 + len(token) * 0.05
|
||||
if with_prompt:
|
||||
events.append((cmd_end + 0.10, "o", "out\nanti@host:~$ "))
|
||||
else:
|
||||
events.append((cmd_end + 0.10, "o", "out\n"))
|
||||
return events
|
||||
|
||||
|
||||
def test_no_commands_no_emission() -> None:
|
||||
out = list(extract_session([(0.0, "i", "x")], sid="ex-empty"))
|
||||
assert [o for o in out if o.primitive == PRIMITIVE] == []
|
||||
|
||||
|
||||
def test_last_command_no_prompt_emits_abrupt() -> None:
|
||||
"""Session cut mid-output → no trailing prompt → abrupt."""
|
||||
events = _cmd("ls", t0=0.0) + _cmd("foo", t0=1.0, with_prompt=False)
|
||||
obs = _of(list(extract_session(events, sid="ex-abrupt")), PRIMITIVE)
|
||||
assert obs.value == "abrupt"
|
||||
|
||||
|
||||
def test_explicit_exit_token_emits_graceful() -> None:
|
||||
events = _cmd("ls", t0=0.0) + _cmd("exit", t0=1.0)
|
||||
obs = _of(list(extract_session(events, sid="ex-graceful")), PRIMITIVE)
|
||||
assert obs.value == "graceful"
|
||||
|
||||
|
||||
def test_logout_token_emits_graceful() -> None:
|
||||
events = _cmd("ls", t0=0.0) + _cmd("logout", t0=1.0)
|
||||
obs = _of(list(extract_session(events, sid="ex-logout")), PRIMITIVE)
|
||||
assert obs.value == "graceful"
|
||||
|
||||
|
||||
def test_cleanup_token_in_tail_emits_cleanup() -> None:
|
||||
"""Last few commands include cleanup vocabulary → cleanup."""
|
||||
events = (
|
||||
_cmd("ls", t0=0.0)
|
||||
+ _cmd("cat", t0=1.0)
|
||||
+ _cmd("history", t0=2.0) # cleanup-family token in tail
|
||||
)
|
||||
obs = _of(list(extract_session(events, sid="ex-cleanup")), PRIMITIVE)
|
||||
assert obs.value == "cleanup"
|
||||
|
||||
|
||||
def test_clean_session_with_prompt_emits_graceful() -> None:
|
||||
"""Trailing prompt + no exit/cleanup tokens → graceful (Ctrl-D path)."""
|
||||
events = _cmd("ls", t0=0.0) + _cmd("ps", t0=1.0) + _cmd("cat", t0=2.0)
|
||||
obs = _of(list(extract_session(events, sid="ex-clean")), PRIMITIVE)
|
||||
assert obs.value == "graceful"
|
||||
|
||||
|
||||
def test_abrupt_lower_confidence_than_graceful() -> None:
|
||||
abrupt_events = _cmd("ls", t0=0.0) + _cmd("foo", t0=1.0, with_prompt=False)
|
||||
graceful_events = _cmd("ls", t0=0.0) + _cmd("exit", t0=1.0)
|
||||
a = _of(list(extract_session(abrupt_events, sid="ex-a")), PRIMITIVE)
|
||||
g = _of(list(extract_session(graceful_events, sid="ex-g")), PRIMITIVE)
|
||||
assert a.confidence < g.confidence
|
||||
Reference in New Issue
Block a user