diff --git a/decnet/profiler/behave_shell/_features/__init__.py b/decnet/profiler/behave_shell/_features/__init__.py index 537078b6..d30bfebd 100644 --- a/decnet/profiler/behave_shell/_features/__init__.py +++ b/decnet/profiler/behave_shell/_features/__init__.py @@ -32,6 +32,7 @@ from decnet.profiler.behave_shell._features.environmental import ( terminal_multiplexer, ) from decnet.profiler.behave_shell._features.operational import ( + cleanup_behavior, objective, opsec_discipline, ) @@ -87,4 +88,5 @@ FEATURES: tuple[FeatureFn, ...] = ( numpad_usage, objective, opsec_discipline, + cleanup_behavior, ) diff --git a/decnet/profiler/behave_shell/_features/operational.py b/decnet/profiler/behave_shell/_features/operational.py index 7093276d..0e528b6e 100644 --- a/decnet/profiler/behave_shell/_features/operational.py +++ b/decnet/profiler/behave_shell/_features/operational.py @@ -22,6 +22,8 @@ from decnet.profiler.behave_shell._intent import ( classify_intent, ) from decnet.profiler.behave_shell._thresholds import ( + CLEANUP_TAIL_K, + CLEANUP_THOROUGH_MIN_DISTINCT, EXIT_BEHAVIOR_LOOKBACK_K, INTENT_FULL_CONFIDENCE_MIN, INTENT_MIN_COMMANDS, @@ -109,3 +111,43 @@ def opsec_discipline(ctx: SessionContext) -> Iterator[Observation]: value=value, confidence=confidence, ) + + +def cleanup_behavior(ctx: SessionContext) -> Iterator[Observation]: + """Emit ``operational.cleanup_behavior`` ∈ {thorough, partial, none}. + + Inspect the last ``CLEANUP_TAIL_K`` (=5) commands. Count distinct + cleanup-family hashes (``history`` / ``unset`` / ``rm`` / ``shred`` + / ``clear`` / ``kill``) in that window: + + * ``thorough`` — ≥ ``CLEANUP_THOROUGH_MIN_DISTINCT`` (3) distinct + cleanup tokens. + * ``partial`` — 1-2 distinct cleanup tokens. + * ``none`` — zero hits. + + Adjacent to E.4's ``exit_behavior=cleanup`` emission — E.4 is + binary "did it happen", G.3 graduates intensity. Both ride. + + Skip emission when no commands. Confidence 0.55 when commands ≥ 8; + 0.35 below. + """ + if not ctx.commands: + return + tail = ctx.commands[-CLEANUP_TAIL_K:] + distinct = { + c.first_token_hash for c in tail + if c.first_token_hash in _CLEANUP_TOKEN_HASHES + } + if len(distinct) >= CLEANUP_THOROUGH_MIN_DISTINCT: + value = "thorough" + elif len(distinct) >= 1: + value = "partial" + else: + value = "none" + confidence = 0.55 if len(ctx.commands) >= 8 else 0.35 + yield make_observation( + ctx, + primitive="operational.cleanup_behavior", + value=value, + confidence=confidence, + ) diff --git a/tests/profiler/behave_shell/test_operational_cleanup_behavior.py b/tests/profiler/behave_shell/test_operational_cleanup_behavior.py new file mode 100644 index 00000000..b36b5f05 --- /dev/null +++ b/tests/profiler/behave_shell/test_operational_cleanup_behavior.py @@ -0,0 +1,95 @@ +"""Step G.3: ``operational.cleanup_behavior`` ∈ {thorough, partial, none}.""" +from __future__ import annotations + +from decnet.profiler.behave_shell import extract_session +from decnet.profiler.behave_shell._parse import AsciinemaEvent + + +PRIMITIVE = "operational.cleanup_behavior" + + +def _of(observations: list, primitive: str): + obs = [o for o in observations if o.primitive == primitive] + assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}" + return obs[0] + + +def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]: + return [(t0 + i * dt, "i", c) for i, c in enumerate(text)] + + +def _cmd(token: str, t0: float, *, with_prompt: bool = True) -> list[AsciinemaEvent]: + events = _typed(f"{token}\r", t0=t0) + cmd_end = t0 + len(token) * 0.05 + if with_prompt: + events.append((cmd_end + 0.10, "o", "out\nanti@host:~$ ")) + else: + events.append((cmd_end + 0.10, "o", "out\n")) + return events + + +def test_no_commands_no_emission() -> None: + out = list(extract_session([(0.0, "i", "x")], sid="g3-empty")) + assert [o for o in out if o.primitive == PRIMITIVE] == [] + + +def test_thorough_three_distinct_cleanup_in_tail() -> None: + events = ( + _cmd("ls", t0=0.0) + + _cmd("history", t0=1.0) + + _cmd("rm", t0=2.0) + + _cmd("shred", t0=3.0) + + _cmd("clear", t0=4.0) + ) + obs = _of(list(extract_session(events, sid="g3-thorough")), PRIMITIVE) + assert obs.value == "thorough" + + +def test_partial_two_distinct_cleanup() -> None: + events = ( + _cmd("ls", t0=0.0) + + _cmd("pwd", t0=1.0) + + _cmd("rm", t0=2.0) + + _cmd("clear", t0=3.0) + ) + obs = _of(list(extract_session(events, sid="g3-partial")), PRIMITIVE) + assert obs.value == "partial" + + +def test_none_no_cleanup() -> None: + events = ( + _cmd("ls", t0=0.0) + + _cmd("pwd", t0=1.0) + + _cmd("cat", t0=2.0) + ) + obs = _of(list(extract_session(events, sid="g3-none")), PRIMITIVE) + assert obs.value == "none" + + +def test_low_command_count_lower_confidence() -> None: + events = _cmd("ls", t0=0.0) + _cmd("pwd", t0=1.0) + obs = _of(list(extract_session(events, sid="g3-thin")), PRIMITIVE) + assert obs.confidence == 0.35 + + +def test_high_command_count_higher_confidence() -> None: + events: list[AsciinemaEvent] = [] + for i, tok in enumerate(["ls", "pwd", "cat", "find", "ps", "ss", "id", "uname", "rm", "clear"]): + events += _cmd(tok, t0=float(i)) + obs = _of(list(extract_session(events, sid="g3-conf")), PRIMITIVE) + assert obs.confidence == 0.55 + + +def test_only_tail_window_counts() -> None: + """``rm`` early then 5 recon commands → tail has zero cleanup → none.""" + events = ( + _cmd("rm", t0=0.0) + + _cmd("ls", t0=1.0) + + _cmd("pwd", t0=2.0) + + _cmd("cat", t0=3.0) + + _cmd("find", t0=4.0) + + _cmd("ps", t0=5.0) + + _cmd("ss", t0=6.0) + ) + obs = _of(list(extract_session(events, sid="g3-window")), PRIMITIVE) + assert obs.value == "none"