feat(profiler/behave_shell): emit temporal.lifecycle_markers.landing_ritual

Inspect the first N commands; if at least K of their first_token_hashes
match the recon-survey vocabulary (uname/id/whoami/pwd/hostname/w/who),
emit present, else absent. Hashes precomputed at module load; PII-safe.
v0.1 N=5, K=2.
This commit is contained in:
2026-05-04 00:15:05 -04:00
parent d40495d71b
commit 1341df2705
4 changed files with 139 additions and 0 deletions

View File

@@ -26,6 +26,7 @@ from decnet.profiler.behave_shell._features.cognitive import (
)
from decnet.profiler.behave_shell._features.temporal import (
escalation_pattern,
landing_ritual,
session_duration,
)
from decnet.profiler.behave_shell._features.motor import (
@@ -65,4 +66,5 @@ FEATURES: tuple[FeatureFn, ...] = (
error_resilience_fallback_to_man,
session_duration,
escalation_pattern,
landing_ritual,
)

View File

@@ -7,6 +7,7 @@ and computed by the attribution engine, not the extractor.
Step E.1: ``temporal.session_duration``.
Step E.2: ``temporal.escalation_pattern``.
Step E.3: ``temporal.lifecycle_markers.landing_ritual``.
"""
from __future__ import annotations
@@ -18,6 +19,7 @@ from decnet_behave_core.spec.envelope import Observation
from decnet.profiler.behave_shell._ctx import SessionContext
from decnet.profiler.behave_shell._features._emit import make_observation
from decnet.profiler.behave_shell._parse import hash_token
from decnet.profiler.behave_shell._thresholds import (
ESCALATION_BURSTY_CV,
ESCALATION_BURSTY_ZERO_FRAC,
@@ -26,12 +28,29 @@ from decnet.profiler.behave_shell._thresholds import (
ESCALATION_SUSTAINED_CV,
ESCALATION_WINDOW_MIN_S,
ESCALATION_WINDOW_TARGET,
LANDING_RITUAL_FIRST_N,
LANDING_RITUAL_HIT_MIN,
LANDING_RITUAL_MIN_COMMANDS,
SESSION_DURATION_LONG_MAX,
SESSION_DURATION_MEDIUM_MAX,
SESSION_DURATION_SHORT_MAX,
)
# Precomputed at import time so the per-session check is a set lookup,
# not 7 sha256 ops per session. The recon-survey vocabulary an attacker
# (or scripted runner) typically opens with on a freshly-landed shell.
_LANDING_RITUAL_HASHES: frozenset[str] = frozenset({
hash_token("uname"),
hash_token("id"),
hash_token("whoami"),
hash_token("pwd"),
hash_token("hostname"),
hash_token("w"),
hash_token("who"),
})
def session_duration(ctx: SessionContext) -> Iterator[Observation]:
"""Emit ``temporal.session_duration`` ∈ {short, medium, long, marathon}.
@@ -113,3 +132,37 @@ def escalation_pattern(ctx: SessionContext) -> Iterator[Observation]:
value=value,
confidence=confidence,
)
def landing_ritual(ctx: SessionContext) -> Iterator[Observation]:
"""Emit ``temporal.lifecycle_markers.landing_ritual`` ∈ {present, absent}.
Inspect the first ``LANDING_RITUAL_FIRST_N`` commands; if at least
``LANDING_RITUAL_HIT_MIN`` of their first_token_hashes match the
recon-survey vocabulary set (``uname`` / ``id`` / ``whoami`` /
``pwd`` / ``hostname`` / ``w`` / ``who``), the operator opened
with a landing ritual.
Skip emission when there are no commands at all — the registry's
binary doesn't admit ``unknown`` and emitting ``absent`` from
nothing would be dishonest. Below ``LANDING_RITUAL_MIN_COMMANDS``
we still emit, but at lower confidence — short sessions can still
show or fail to show a ritual.
"""
n = len(ctx.commands)
if n == 0:
return
head = ctx.commands[:LANDING_RITUAL_FIRST_N]
hits = sum(1 for c in head if c.first_token_hash in _LANDING_RITUAL_HASHES)
value = "present" if hits >= LANDING_RITUAL_HIT_MIN else "absent"
if n < LANDING_RITUAL_MIN_COMMANDS:
confidence = 0.40
else:
confidence = 0.65
yield make_observation(
ctx,
primitive="temporal.lifecycle_markers.landing_ritual",
value=value,
confidence=confidence,
)

View File

@@ -210,6 +210,14 @@ ESCALATION_SUSTAINED_CV: float = 0.50
ESCALATION_MIN_WINDOWS: int = 5
ESCALATION_MIN_COMMANDS: int = 5
# ── temporal.lifecycle_markers.landing_ritual (Step E.3) ──────────────────
# How many of the first ``LANDING_RITUAL_FIRST_N`` commands must hit
# the recon-token set (uname / id / whoami / pwd / hostname / w / who)
# for the session to count as having a landing ritual.
LANDING_RITUAL_FIRST_N: int = 5
LANDING_RITUAL_HIT_MIN: int = 2
LANDING_RITUAL_MIN_COMMANDS: int = 3
# ── motor.keystroke_cadence (Step B.1) ──────────────────────────────────────
# Typing bursts split at gaps > IKI_THINK_MAX_S so think-pauses between
# commands don't inflate the within-burst CV. Mirrors the prototype's

View File

@@ -0,0 +1,76 @@
"""Step E.3: ``temporal.lifecycle_markers.landing_ritual``."""
from __future__ import annotations
from decnet.profiler.behave_shell import extract_session
from decnet.profiler.behave_shell._parse import AsciinemaEvent
PRIMITIVE = "temporal.lifecycle_markers.landing_ritual"
def _of(observations: list, primitive: str):
obs = [o for o in observations if o.primitive == primitive]
assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}"
return obs[0]
def _cmds(tokens: list[str]) -> list[AsciinemaEvent]:
events: list[AsciinemaEvent] = []
for i, tok in enumerate(tokens):
t0 = i * 1.0
for j, c in enumerate(tok):
events.append((t0 + j * 0.05, "i", c))
events.append((t0 + len(tok) * 0.05, "i", "\r"))
return events
def test_no_commands_no_emission() -> None:
out = list(extract_session([(0.0, "i", "a")], sid="lr-empty"))
assert [o for o in out if o.primitive == PRIMITIVE] == []
def test_recon_survey_emits_present() -> None:
"""First commands are uname/id/whoami → present."""
out = list(extract_session(
_cmds(["uname", "id", "whoami", "ls", "ps", "cat"]),
sid="lr-present",
))
obs = _of(out, PRIMITIVE)
assert obs.value == "present"
def test_single_recon_token_below_threshold_emits_absent() -> None:
"""One recon token in first-5 isn't enough (need ≥2) → absent."""
out = list(extract_session(
_cmds(["uname", "vim", "edit", "save", "exit", "ls"]),
sid="lr-onehit",
))
obs = _of(out, PRIMITIVE)
assert obs.value == "absent"
def test_no_recon_tokens_emits_absent() -> None:
out = list(extract_session(
_cmds(["vim", "edit", "save", "make", "ls", "cat"]),
sid="lr-absent",
))
obs = _of(out, PRIMITIVE)
assert obs.value == "absent"
def test_recon_after_first_n_does_not_count() -> None:
"""Only the first N=5 commands are considered."""
out = list(extract_session(
_cmds(["vim", "edit", "save", "make", "test", "uname", "id", "whoami"]),
sid="lr-late",
))
obs = _of(out, PRIMITIVE)
assert obs.value == "absent"
def test_short_session_low_confidence() -> None:
short = list(extract_session(_cmds(["uname", "id"]), sid="lr-short"))
full = list(extract_session(_cmds(["uname", "id", "whoami", "ls", "ps"]), sid="lr-full"))
s = _of(short, PRIMITIVE)
f = _of(full, PRIMITIVE)
assert s.confidence < f.confidence