feat(profiler/behave_shell): emit motor.shell_mastery.pipe_chaining_depth
This commit is contained in:
@@ -24,6 +24,7 @@ from decnet.profiler.behave_shell._features.motor import (
|
||||
keystroke_cadence,
|
||||
motor_stability,
|
||||
paste_burst_rate,
|
||||
pipe_chaining_depth,
|
||||
shortcut_usage,
|
||||
tab_completion,
|
||||
)
|
||||
@@ -39,6 +40,7 @@ FEATURES: tuple[FeatureFn, ...] = (
|
||||
command_chunking,
|
||||
tab_completion,
|
||||
shortcut_usage,
|
||||
pipe_chaining_depth,
|
||||
inter_command_latency_class,
|
||||
command_branch_diversity,
|
||||
feedback_loop_engagement,
|
||||
|
||||
@@ -28,6 +28,8 @@ from decnet.profiler.behave_shell._thresholds import (
|
||||
PASTE_RATE_OCCASIONAL_MIN,
|
||||
SHELL_MASTERY_BOUNDARY_BAND,
|
||||
SHELL_MASTERY_MIN_COMMANDS,
|
||||
PIPE_CHAINING_DEEP_MEDIAN,
|
||||
PIPE_CHAINING_MODERATE_MEDIAN,
|
||||
SHORTCUT_USAGE_HEAVY_MIN,
|
||||
SHORTCUT_USAGE_MODERATE_MIN,
|
||||
TAB_COMPLETION_HABITUAL_MIN,
|
||||
@@ -366,3 +368,55 @@ def shortcut_usage(ctx: SessionContext) -> Iterator[Observation]:
|
||||
value=value,
|
||||
confidence=confidence,
|
||||
)
|
||||
|
||||
|
||||
def pipe_chaining_depth(ctx: SessionContext) -> Iterator[Observation]:
|
||||
"""Emit ``motor.shell_mastery.pipe_chaining_depth`` ∈ {shallow, moderate, deep}.
|
||||
|
||||
Metric: median ``|`` count across commands. Pipes are counted on
|
||||
every byte regardless of whether they came from a paste-burst —
|
||||
a pasted pipeline is still a pipeline the operator chose to run,
|
||||
and the registry's intent is "what does this operator's typical
|
||||
command look like?", not "did they type it themselves?".
|
||||
|
||||
Buckets (median):
|
||||
* ≤ 1 → shallow (no pipe, or one-stage pipeline)
|
||||
* == 2 → moderate
|
||||
* ≥ 3 → deep
|
||||
|
||||
Confidence:
|
||||
* < ``SHELL_MASTERY_MIN_COMMANDS`` → 0.40.
|
||||
* Median within ±10% of either integer boundary (2 or 3) → 0.55.
|
||||
* Otherwise → 0.70.
|
||||
|
||||
Skips emission when the session has no commands.
|
||||
"""
|
||||
n = len(ctx.commands)
|
||||
if n == 0:
|
||||
return
|
||||
pipes_per_cmd = sorted(c.pipe_count for c in ctx.commands)
|
||||
median = statistics.median(pipes_per_cmd)
|
||||
|
||||
if median >= PIPE_CHAINING_DEEP_MEDIAN:
|
||||
value = "deep"
|
||||
elif median >= PIPE_CHAINING_MODERATE_MEDIAN:
|
||||
value = "moderate"
|
||||
else:
|
||||
value = "shallow"
|
||||
|
||||
if n < SHELL_MASTERY_MIN_COMMANDS:
|
||||
confidence = 0.40
|
||||
elif (
|
||||
_near(median, PIPE_CHAINING_MODERATE_MEDIAN)
|
||||
or _near(median, PIPE_CHAINING_DEEP_MEDIAN)
|
||||
):
|
||||
confidence = 0.55
|
||||
else:
|
||||
confidence = 0.70
|
||||
|
||||
yield make_observation(
|
||||
ctx,
|
||||
primitive="motor.shell_mastery.pipe_chaining_depth",
|
||||
value=value,
|
||||
confidence=confidence,
|
||||
)
|
||||
|
||||
@@ -142,6 +142,19 @@ TAB_COMPLETION_HABITUAL_MIN: float = 0.50
|
||||
SHORTCUT_USAGE_MODERATE_MIN: float = 0.05
|
||||
SHORTCUT_USAGE_HEAVY_MIN: float = 0.30
|
||||
|
||||
# motor.shell_mastery.pipe_chaining_depth — median ``|`` count across
|
||||
# commands. Pipes are counted on every byte (typed AND pasted) — a
|
||||
# pasted pipeline still indicates pipeline fluency the operator chose
|
||||
# to execute. Registry buckets per BEHAVE-EXTRACTOR.md line 473:
|
||||
# median ≤ 1 → shallow (no pipeline at all, or one stage)
|
||||
# median == 2 → moderate
|
||||
# median ≥ 3 → deep
|
||||
# Median is integer-valued (sum of ints over commands), so the
|
||||
# boundaries here are integer step boundaries; the proximity-band
|
||||
# logic uses integer equality.
|
||||
PIPE_CHAINING_MODERATE_MEDIAN: int = 2
|
||||
PIPE_CHAINING_DEEP_MEDIAN: int = 3
|
||||
|
||||
# Sample-size floor below which Phase C primitives drop confidence to
|
||||
# 0.40 (sample-size honesty). Mirrors MIN_COMMANDS_FOR_FULL_CONFIDENCE
|
||||
# but is named separately so a future tune can move them independently.
|
||||
|
||||
@@ -648,7 +648,7 @@ unchecked = no v0 tag.**
|
||||
### Phase C — `motor.shell_mastery.*`
|
||||
- [x] C.1 `motor.shell_mastery.tab_completion`
|
||||
- [x] C.2 `motor.shell_mastery.shortcut_usage`
|
||||
- [ ] C.3 `motor.shell_mastery.pipe_chaining_depth`
|
||||
- [x] C.3 `motor.shell_mastery.pipe_chaining_depth`
|
||||
|
||||
### Phase D — `cognitive.*` completion
|
||||
- [ ] D.1 `cognitive.cognitive_load`
|
||||
@@ -760,6 +760,40 @@ Phase C (``motor.shell_mastery.*``, 3 primitives) lands next.
|
||||
|
||||
---
|
||||
|
||||
## Phase C completion log
|
||||
|
||||
Closed in 3 commits, one primitive per commit. The
|
||||
``motor.shell_mastery.*`` block now emits — three per-command counters
|
||||
(`tab_count`, `shortcut_count`, `pipe_count`) populated during the
|
||||
single-pass `_segment_commands()` sweep, fed to three independent
|
||||
classifiers.
|
||||
|
||||
| Primitive | Confidence | Source signal |
|
||||
|---|---|---|
|
||||
| `motor.shell_mastery.tab_completion` | 0.40 / 0.55 / 0.75 | fraction of commands containing ≥1 ``\t``; <30% → occasional, ≥50% → habitual, 30%-50% gap rounds down |
|
||||
| `motor.shell_mastery.shortcut_usage` | 0.40 / 0.55 / 0.65 | total readline ctrl bytes (^A/^E/^W/^U/^R/^B/^F) per command; v0.1 thresholds 0.05 / 0.30 awaiting corpus calibration |
|
||||
| `motor.shell_mastery.pipe_chaining_depth` | 0.40 / 0.55 / 0.70 | median ``\|`` count across commands; 2 → moderate, ≥3 → deep; pasted pipelines count too |
|
||||
|
||||
Implementation note: ANTI relaxed the Phase A/B PII discipline for
|
||||
this phase — full attacker profiles outweigh residual PII paranoia
|
||||
on a honeypot byte stream. Even so, only **integer counters** land
|
||||
on `Command`; the raw bytes are read once during the segmentation
|
||||
walk and discarded. No character data is retained or serialised.
|
||||
|
||||
The ^U / ^W bytes that drive ``shortcut_usage`` also count toward
|
||||
``motor.error_correction``'s ``kill_line_count`` channel (Step B.3).
|
||||
These are independent measurements over the same byte stream — not
|
||||
double-counting, just two different questions about the same key.
|
||||
|
||||
**Calibration grid widened:** ``PHASE_ABC_PRIMITIVES`` now contains
|
||||
13 names and is binding for every subsequent phase. The set rename
|
||||
from ``PHASE_AB_PRIMITIVES`` lands in C.1; downstream phases extend
|
||||
the same set without renaming again until v0.
|
||||
|
||||
Phase D (``cognitive.*`` completion, 7+1 primitives) lands next.
|
||||
|
||||
---
|
||||
|
||||
**Owner:** ANTI.
|
||||
**Implementation gate:** Step 0 starts after this doc is reviewed +
|
||||
Phase 1 of `BEHAVE-INTEGRATION.md` lands (storage table exists).
|
||||
|
||||
@@ -47,6 +47,7 @@ PHASE_ABC_PRIMITIVES: frozenset[str] = frozenset({
|
||||
# Phase C — motor.shell_mastery.* (lands one primitive per commit)
|
||||
"motor.shell_mastery.tab_completion",
|
||||
"motor.shell_mastery.shortcut_usage",
|
||||
"motor.shell_mastery.pipe_chaining_depth",
|
||||
})
|
||||
|
||||
|
||||
|
||||
103
tests/profiler/behave_shell/test_motor_pipe_chaining_depth.py
Normal file
103
tests/profiler/behave_shell/test_motor_pipe_chaining_depth.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Step C.3: ``motor.shell_mastery.pipe_chaining_depth``."""
|
||||
from __future__ import annotations
|
||||
|
||||
from decnet.profiler.behave_shell import extract_session
|
||||
from decnet.profiler.behave_shell._ctx import build_session_context
|
||||
from decnet.profiler.behave_shell._parse import AsciinemaEvent
|
||||
|
||||
PRIMITIVE = "motor.shell_mastery.pipe_chaining_depth"
|
||||
|
||||
|
||||
def _of(observations: list, primitive: str):
|
||||
obs = [o for o in observations if o.primitive == primitive]
|
||||
assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}"
|
||||
return obs[0]
|
||||
|
||||
|
||||
def _command(t0: float, body: str) -> list[AsciinemaEvent]:
|
||||
events: list[AsciinemaEvent] = []
|
||||
t = t0
|
||||
for c in body:
|
||||
events.append((t, "i", c))
|
||||
t += 0.05
|
||||
events.append((t, "i", "\r"))
|
||||
return events
|
||||
|
||||
|
||||
def _session(bodies: list[str], gap: float = 1.0) -> list[AsciinemaEvent]:
|
||||
events: list[AsciinemaEvent] = []
|
||||
t = 0.0
|
||||
for body in bodies:
|
||||
events.extend(_command(t, body))
|
||||
t = events[-1][0] + gap
|
||||
return events
|
||||
|
||||
|
||||
def test_no_commands_no_emission() -> None:
|
||||
out = list(extract_session([(0.0, "i", "ls")], sid="pipe-empty"))
|
||||
assert [o for o in out if o.primitive == PRIMITIVE] == []
|
||||
|
||||
|
||||
def test_no_pipes_emit_shallow() -> None:
|
||||
out = list(extract_session(_session(["ls", "pwd", "id", "uname", "whoami"]),
|
||||
sid="pipe-shallow"))
|
||||
obs = _of(out, PRIMITIVE)
|
||||
assert obs.value == "shallow"
|
||||
assert obs.confidence == 0.70
|
||||
|
||||
|
||||
def test_one_stage_pipeline_emit_shallow() -> None:
|
||||
# median = 1 → shallow.
|
||||
out = list(extract_session(_session(["ls | wc"] * 5), sid="pipe-one"))
|
||||
obs = _of(out, PRIMITIVE)
|
||||
assert obs.value == "shallow"
|
||||
|
||||
|
||||
def test_two_stage_pipeline_emit_moderate() -> None:
|
||||
# median = 2 → moderate.
|
||||
out = list(extract_session(_session(["ls | grep x | wc"] * 5),
|
||||
sid="pipe-moderate"))
|
||||
obs = _of(out, PRIMITIVE)
|
||||
assert obs.value == "moderate"
|
||||
|
||||
|
||||
def test_three_stage_pipeline_emit_deep() -> None:
|
||||
# median = 3 → deep.
|
||||
out = list(extract_session(_session(["ls | grep x | sort | uniq"] * 5),
|
||||
sid="pipe-deep"))
|
||||
obs = _of(out, PRIMITIVE)
|
||||
assert obs.value == "deep"
|
||||
|
||||
|
||||
def test_pasted_pipeline_still_counts() -> None:
|
||||
"""Pipes inside a paste-burst event count toward pipe_count — the
|
||||
operator chose to execute the pipeline, regardless of provenance."""
|
||||
# Single big paste event then \r — one command.
|
||||
events: list[AsciinemaEvent] = [
|
||||
(0.0, "i", "ls | grep x | sort | uniq | wc"),
|
||||
(0.1, "i", "\r"),
|
||||
]
|
||||
# Need ≥5 commands to get past the SHELL_MASTERY_MIN_COMMANDS gate.
|
||||
t = 1.0
|
||||
for _ in range(4):
|
||||
events.append((t, "i", "ls | grep x | sort | uniq | wc"))
|
||||
events.append((t + 0.1, "i", "\r"))
|
||||
t += 1.0
|
||||
out = list(extract_session(events, sid="pipe-pasted"))
|
||||
obs = _of(out, PRIMITIVE)
|
||||
assert obs.value == "deep"
|
||||
|
||||
|
||||
def test_few_commands_drops_confidence() -> None:
|
||||
out = list(extract_session(_session(["ls", "pwd", "id"]),
|
||||
sid="pipe-low-n"))
|
||||
obs = _of(out, PRIMITIVE)
|
||||
assert obs.confidence == 0.40
|
||||
|
||||
|
||||
def test_segmentation_populates_pipe_count() -> None:
|
||||
events = _command(0.0, "ls | grep x | wc") + _command(5.0, "pwd")
|
||||
ctx = build_session_context(events, sid="seg-pipe", source="t")
|
||||
assert len(ctx.commands) == 2
|
||||
assert ctx.commands[0].pipe_count == 2
|
||||
assert ctx.commands[1].pipe_count == 0
|
||||
Reference in New Issue
Block a user