diff --git a/decnet/profiler/behave_shell/_features/__init__.py b/decnet/profiler/behave_shell/_features/__init__.py index 10dda731..838f8a08 100644 --- a/decnet/profiler/behave_shell/_features/__init__.py +++ b/decnet/profiler/behave_shell/_features/__init__.py @@ -26,6 +26,7 @@ from decnet.profiler.behave_shell._features.cognitive import ( ) from decnet.profiler.behave_shell._features.temporal import ( escalation_pattern, + landing_ritual, session_duration, ) from decnet.profiler.behave_shell._features.motor import ( @@ -65,4 +66,5 @@ FEATURES: tuple[FeatureFn, ...] = ( error_resilience_fallback_to_man, session_duration, escalation_pattern, + landing_ritual, ) diff --git a/decnet/profiler/behave_shell/_features/temporal.py b/decnet/profiler/behave_shell/_features/temporal.py index 9b72fbb9..91a9476f 100644 --- a/decnet/profiler/behave_shell/_features/temporal.py +++ b/decnet/profiler/behave_shell/_features/temporal.py @@ -7,6 +7,7 @@ and computed by the attribution engine, not the extractor. Step E.1: ``temporal.session_duration``. Step E.2: ``temporal.escalation_pattern``. +Step E.3: ``temporal.lifecycle_markers.landing_ritual``. """ from __future__ import annotations @@ -18,6 +19,7 @@ from decnet_behave_core.spec.envelope import Observation from decnet.profiler.behave_shell._ctx import SessionContext from decnet.profiler.behave_shell._features._emit import make_observation +from decnet.profiler.behave_shell._parse import hash_token from decnet.profiler.behave_shell._thresholds import ( ESCALATION_BURSTY_CV, ESCALATION_BURSTY_ZERO_FRAC, @@ -26,12 +28,29 @@ from decnet.profiler.behave_shell._thresholds import ( ESCALATION_SUSTAINED_CV, ESCALATION_WINDOW_MIN_S, ESCALATION_WINDOW_TARGET, + LANDING_RITUAL_FIRST_N, + LANDING_RITUAL_HIT_MIN, + LANDING_RITUAL_MIN_COMMANDS, SESSION_DURATION_LONG_MAX, SESSION_DURATION_MEDIUM_MAX, SESSION_DURATION_SHORT_MAX, ) +# Precomputed at import time so the per-session check is a set lookup, +# not 7 sha256 ops per session. The recon-survey vocabulary an attacker +# (or scripted runner) typically opens with on a freshly-landed shell. +_LANDING_RITUAL_HASHES: frozenset[str] = frozenset({ + hash_token("uname"), + hash_token("id"), + hash_token("whoami"), + hash_token("pwd"), + hash_token("hostname"), + hash_token("w"), + hash_token("who"), +}) + + def session_duration(ctx: SessionContext) -> Iterator[Observation]: """Emit ``temporal.session_duration`` ∈ {short, medium, long, marathon}. @@ -113,3 +132,37 @@ def escalation_pattern(ctx: SessionContext) -> Iterator[Observation]: value=value, confidence=confidence, ) + + +def landing_ritual(ctx: SessionContext) -> Iterator[Observation]: + """Emit ``temporal.lifecycle_markers.landing_ritual`` ∈ {present, absent}. + + Inspect the first ``LANDING_RITUAL_FIRST_N`` commands; if at least + ``LANDING_RITUAL_HIT_MIN`` of their first_token_hashes match the + recon-survey vocabulary set (``uname`` / ``id`` / ``whoami`` / + ``pwd`` / ``hostname`` / ``w`` / ``who``), the operator opened + with a landing ritual. + + Skip emission when there are no commands at all — the registry's + binary doesn't admit ``unknown`` and emitting ``absent`` from + nothing would be dishonest. Below ``LANDING_RITUAL_MIN_COMMANDS`` + we still emit, but at lower confidence — short sessions can still + show or fail to show a ritual. + """ + n = len(ctx.commands) + if n == 0: + return + head = ctx.commands[:LANDING_RITUAL_FIRST_N] + hits = sum(1 for c in head if c.first_token_hash in _LANDING_RITUAL_HASHES) + value = "present" if hits >= LANDING_RITUAL_HIT_MIN else "absent" + + if n < LANDING_RITUAL_MIN_COMMANDS: + confidence = 0.40 + else: + confidence = 0.65 + yield make_observation( + ctx, + primitive="temporal.lifecycle_markers.landing_ritual", + value=value, + confidence=confidence, + ) diff --git a/decnet/profiler/behave_shell/_thresholds.py b/decnet/profiler/behave_shell/_thresholds.py index b6b17e25..a708f7db 100644 --- a/decnet/profiler/behave_shell/_thresholds.py +++ b/decnet/profiler/behave_shell/_thresholds.py @@ -210,6 +210,14 @@ ESCALATION_SUSTAINED_CV: float = 0.50 ESCALATION_MIN_WINDOWS: int = 5 ESCALATION_MIN_COMMANDS: int = 5 +# ── temporal.lifecycle_markers.landing_ritual (Step E.3) ────────────────── +# How many of the first ``LANDING_RITUAL_FIRST_N`` commands must hit +# the recon-token set (uname / id / whoami / pwd / hostname / w / who) +# for the session to count as having a landing ritual. +LANDING_RITUAL_FIRST_N: int = 5 +LANDING_RITUAL_HIT_MIN: int = 2 +LANDING_RITUAL_MIN_COMMANDS: int = 3 + # ── motor.keystroke_cadence (Step B.1) ────────────────────────────────────── # Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between # commands don't inflate the within-burst CV. Mirrors the prototype's diff --git a/tests/profiler/behave_shell/test_temporal_landing_ritual.py b/tests/profiler/behave_shell/test_temporal_landing_ritual.py new file mode 100644 index 00000000..fc3ef20d --- /dev/null +++ b/tests/profiler/behave_shell/test_temporal_landing_ritual.py @@ -0,0 +1,76 @@ +"""Step E.3: ``temporal.lifecycle_markers.landing_ritual``.""" +from __future__ import annotations + +from decnet.profiler.behave_shell import extract_session +from decnet.profiler.behave_shell._parse import AsciinemaEvent + + +PRIMITIVE = "temporal.lifecycle_markers.landing_ritual" + + +def _of(observations: list, primitive: str): + obs = [o for o in observations if o.primitive == primitive] + assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}" + return obs[0] + + +def _cmds(tokens: list[str]) -> list[AsciinemaEvent]: + events: list[AsciinemaEvent] = [] + for i, tok in enumerate(tokens): + t0 = i * 1.0 + for j, c in enumerate(tok): + events.append((t0 + j * 0.05, "i", c)) + events.append((t0 + len(tok) * 0.05, "i", "\r")) + return events + + +def test_no_commands_no_emission() -> None: + out = list(extract_session([(0.0, "i", "a")], sid="lr-empty")) + assert [o for o in out if o.primitive == PRIMITIVE] == [] + + +def test_recon_survey_emits_present() -> None: + """First commands are uname/id/whoami → present.""" + out = list(extract_session( + _cmds(["uname", "id", "whoami", "ls", "ps", "cat"]), + sid="lr-present", + )) + obs = _of(out, PRIMITIVE) + assert obs.value == "present" + + +def test_single_recon_token_below_threshold_emits_absent() -> None: + """One recon token in first-5 isn't enough (need ≥2) → absent.""" + out = list(extract_session( + _cmds(["uname", "vim", "edit", "save", "exit", "ls"]), + sid="lr-onehit", + )) + obs = _of(out, PRIMITIVE) + assert obs.value == "absent" + + +def test_no_recon_tokens_emits_absent() -> None: + out = list(extract_session( + _cmds(["vim", "edit", "save", "make", "ls", "cat"]), + sid="lr-absent", + )) + obs = _of(out, PRIMITIVE) + assert obs.value == "absent" + + +def test_recon_after_first_n_does_not_count() -> None: + """Only the first N=5 commands are considered.""" + out = list(extract_session( + _cmds(["vim", "edit", "save", "make", "test", "uname", "id", "whoami"]), + sid="lr-late", + )) + obs = _of(out, PRIMITIVE) + assert obs.value == "absent" + + +def test_short_session_low_confidence() -> None: + short = list(extract_session(_cmds(["uname", "id"]), sid="lr-short")) + full = list(extract_session(_cmds(["uname", "id", "whoami", "ls", "ps"]), sid="lr-full")) + s = _of(short, PRIMITIVE) + f = _of(full, PRIMITIVE) + assert s.confidence < f.confidence