feat(profiler/behave_shell): G.4 operational.multi_actor_indicators

Compare median intra-command IATs of the two temporal halves of the
session. ≥ MULTI_ACTOR_HALF_MIN_COMMANDS (4) per half required;
relative delta > MULTI_ACTOR_HANDOFF_DELTA (0.5) → handoff_detected.

team_coordinated is Tier B (cross-session); never emitted from a
single session. Confidence 0.55 with both halves ≥ 8 commands; 0.40
otherwise.
This commit is contained in:
2026-05-08 16:33:15 -04:00
parent 17b53dad4d
commit acf8382bcf
3 changed files with 140 additions and 0 deletions

View File

@@ -33,6 +33,7 @@ from decnet.profiler.behave_shell._features.environmental import (
)
from decnet.profiler.behave_shell._features.operational import (
cleanup_behavior,
multi_actor_indicators,
objective,
opsec_discipline,
)
@@ -89,4 +90,5 @@ FEATURES: tuple[FeatureFn, ...] = (
objective,
opsec_discipline,
cleanup_behavior,
multi_actor_indicators,
)

View File

@@ -8,6 +8,7 @@ Step G.4: ``operational.multi_actor_indicators`` (lands later).
from __future__ import annotations
import collections
import statistics
from typing import Iterator
from decnet_behave_core.spec.envelope import Observation
@@ -28,6 +29,9 @@ from decnet.profiler.behave_shell._thresholds import (
INTENT_FULL_CONFIDENCE_MIN,
INTENT_MIN_COMMANDS,
MIN_COMMANDS_FOR_FULL_CONFIDENCE,
MULTI_ACTOR_HALF_MIN_COMMANDS,
MULTI_ACTOR_HANDOFF_DELTA,
MULTI_ACTOR_MIN_COMMANDS,
)
@@ -151,3 +155,64 @@ def cleanup_behavior(ctx: SessionContext) -> Iterator[Observation]:
value=value,
confidence=confidence,
)
def multi_actor_indicators(ctx: SessionContext) -> Iterator[Observation]:
"""Emit ``operational.multi_actor_indicators`` ∈ {solo, handoff_detected}.
Compare first-half vs second-half typing rhythm. ``team_coordinated``
is **never** emitted from a single session — it's Tier B and lands
in the attribution engine.
Algorithm:
* Split commands at the temporal midpoint
(``t_start + duration_s / 2``).
* Flatten ``ctx.intra_command_iats`` per half.
* If both halves have ≥ ``MULTI_ACTOR_HALF_MIN_COMMANDS`` (4)
commands AND
``abs(median_a - median_b) / max(median_a, median_b)`` >
``MULTI_ACTOR_HANDOFF_DELTA`` (0.5) → ``handoff_detected``.
* Else → ``solo``.
Skip emission when fewer than ``MULTI_ACTOR_MIN_COMMANDS`` (8)
total. Confidence 0.40 (single-session is a weak handoff signal);
0.55 when both halves are ≥ 8 commands.
"""
n = len(ctx.commands)
if n < MULTI_ACTOR_MIN_COMMANDS:
return
midpoint = ctx.t_start + ctx.duration_s / 2.0
a_iats: list[float] = []
b_iats: list[float] = []
a_count = 0
b_count = 0
for cmd, iats in zip(ctx.commands, ctx.intra_command_iats):
if cmd.start_ts < midpoint:
a_iats.extend(iats)
a_count += 1
else:
b_iats.extend(iats)
b_count += 1
if a_count < MULTI_ACTOR_HALF_MIN_COMMANDS or b_count < MULTI_ACTOR_HALF_MIN_COMMANDS:
value = "solo"
elif not a_iats or not b_iats:
value = "solo"
else:
median_a = statistics.median(a_iats)
median_b = statistics.median(b_iats)
denom = max(median_a, median_b)
if denom > 0.0:
delta = abs(median_a - median_b) / denom
value = "handoff_detected" if delta > MULTI_ACTOR_HANDOFF_DELTA else "solo"
else:
value = "solo"
if a_count >= 8 and b_count >= 8:
confidence = 0.55
else:
confidence = 0.40
yield make_observation(
ctx,
primitive="operational.multi_actor_indicators",
value=value,
confidence=confidence,
)

View File

@@ -0,0 +1,73 @@
"""Step G.4: ``operational.multi_actor_indicators`` ∈ {solo, handoff_detected}.
``team_coordinated`` is Tier B (cross-session) — never emitted here.
"""
from __future__ import annotations
from decnet.profiler.behave_shell import extract_session
from decnet.profiler.behave_shell._parse import AsciinemaEvent
PRIMITIVE = "operational.multi_actor_indicators"
def _of(observations: list, primitive: str):
obs = [o for o in observations if o.primitive == primitive]
assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}"
return obs[0]
def _typed(text: str, t0: float, dt: float = 0.05) -> list[AsciinemaEvent]:
return [(t0 + i * dt, "i", c) for i, c in enumerate(text)]
def _cmd(token: str, t0: float, dt: float = 0.05) -> list[AsciinemaEvent]:
"""Emit one command (no prompt — output unused for this primitive)."""
events = _typed(f"{token}\r", t0=t0, dt=dt)
return events
def test_too_few_commands_no_emission() -> None:
events: list[AsciinemaEvent] = []
for i in range(5):
events += _cmd("ls", t0=float(i))
out = list(extract_session(events, sid="g4-thin"))
assert [o for o in out if o.primitive == PRIMITIVE] == []
def test_solo_consistent_typing() -> None:
"""Same dt across both halves → small delta → solo."""
events: list[AsciinemaEvent] = []
for i in range(10):
events += _cmd("hostname", t0=float(i * 2), dt=0.10)
obs = _of(list(extract_session(events, sid="g4-solo")), PRIMITIVE)
assert obs.value == "solo"
def test_handoff_detected_speed_jump() -> None:
"""First half slow typing (dt=0.20), second half fast (dt=0.05)."""
events: list[AsciinemaEvent] = []
for i in range(8):
events += _cmd("hostname", t0=float(i * 2), dt=0.20)
for i in range(8):
events += _cmd("hostname", t0=float(20 + i * 2), dt=0.05)
obs = _of(list(extract_session(events, sid="g4-handoff")), PRIMITIVE)
assert obs.value == "handoff_detected"
def test_team_coordinated_never_emitted() -> None:
"""Tier B value must never appear, regardless of input."""
events: list[AsciinemaEvent] = []
for i in range(20):
dt = 0.10 + (0.20 if i % 2 else 0.0)
events += _cmd("hostname", t0=float(i * 2), dt=dt)
obs = _of(list(extract_session(events, sid="g4-no-team")), PRIMITIVE)
assert obs.value in ("solo", "handoff_detected")
def test_high_count_raises_confidence() -> None:
events: list[AsciinemaEvent] = []
for i in range(20):
events += _cmd("hostname", t0=float(i * 2), dt=0.10)
obs = _of(list(extract_session(events, sid="g4-conf")), PRIMITIVE)
assert obs.confidence == 0.55