test(profiler/behave_shell): Phase 6 smoke harness + live-decky runbook
Two-half deliverable per BEHAVE-INTEGRATION.md §587-594: * scripts/behave_shell/replay_calibration.py — Python helper that drives the production handler against one asciinema shard, mints a temp SQLite repo + an Attacker per session, captures bus emissions in-process. Exits non-zero on zero-observation sessions. * scripts/behave_shell/smoke.sh — bash entry that replays all five 2026-05-02 calibration shards (HUMAN / YOU-sim / LW-sim / CLAUDE-FF / CLAUDE-CL). Auto-activates .311 venv, forces DECNET_DB_TYPE=sqlite, prints per-class summary. Suitable for CI. * scripts/behave_shell/README.md — runbook covering both halves. Pins the manual live-decky procedure (one SSH session per class against a deployed smoke-decky, expected dominant primitives table, SQL verification query, AttackerDetail panel check, pass criteria). * BEHAVE-INTEGRATION.md — Phase 6 completion log appended with current corpus results table (15 sessions, 424 observations across the five classes) and a note that the v0 tag (drop -pre) is gated on the manual live-decky round-trip and lands as a separate commit. Live-decky run is intentionally NOT scripted — the integration doc calls for manual SSH sessions per class so an operator confirms the bus / collector / disk-reach plumbing under real PTY conditions.
This commit is contained in:
110
scripts/behave_shell/README.md
Normal file
110
scripts/behave_shell/README.md
Normal file
@@ -0,0 +1,110 @@
|
||||
# BEHAVE-SHELL — Phase 6 smoke
|
||||
|
||||
Two halves:
|
||||
|
||||
1. **Offline replay** — `smoke.sh` replays the five 2026-05-02
|
||||
calibration shards through the production handler. Exercises the
|
||||
engine + storage layer end-to-end without a live PTY. Suitable for
|
||||
CI.
|
||||
2. **Live decky round-trip** — manual procedure below. Confirms the
|
||||
bus / collector / disk-reach plumbing on a real session.
|
||||
|
||||
## 1. Offline replay
|
||||
|
||||
```sh
|
||||
$ scripts/behave_shell/smoke.sh # auto-discovers ../BEHAVE/prototype_extractors/shell
|
||||
$ scripts/behave_shell/smoke.sh /path/to/calibration/dir # explicit dir
|
||||
```
|
||||
|
||||
Expected output (15 sessions across 5 classes, 424 total observations
|
||||
on the current corpus):
|
||||
|
||||
```
|
||||
[HUMAN] sessions=1 observations=34 distinct_primitives=34
|
||||
[YOU-sim] sessions=2 observations=59 distinct_primitives=34
|
||||
[LW-sim] sessions=5 observations=136 distinct_primitives=34
|
||||
[CLAUDE-FF] sessions=3 observations=84 distinct_primitives=34
|
||||
[CLAUDE-CL] sessions=4 observations=111 distinct_primitives=34
|
||||
smoke: OK — all classes emit observations end-to-end
|
||||
```
|
||||
|
||||
Exit codes: `0` full pass, `1` any class regressed, `2` argument /
|
||||
IO error.
|
||||
|
||||
The replay drives `decnet.profiler.behave_shell._handler.handle_session_ended`
|
||||
directly against a temp SQLite DB seeded with one Attacker per
|
||||
session. Bus emission is captured by an in-process publisher; no
|
||||
real bus is required.
|
||||
|
||||
## 2. Live decky round-trip (manual)
|
||||
|
||||
End-to-end confirmation. Run **once** before tagging v0 and **after**
|
||||
any change to the bus / collector / disk-reach layer.
|
||||
|
||||
### Setup
|
||||
|
||||
1. Init a fresh DECNET host (see `decnet init`).
|
||||
2. `decnet bus` worker is up (systemd unit
|
||||
`decnet-bus.service` or `scripts/bus/smoke.sh`).
|
||||
3. `decnet-profiler.service` is up — it owns the
|
||||
`attacker.session.ended` subscription and the BEHAVE-SHELL handler.
|
||||
4. `decnet-collector.service` is up — it publishes
|
||||
`attacker.session.ended` from `session_recorded` log events.
|
||||
5. Web API is up; you have a viewer JWT in your browser localStorage.
|
||||
6. Deploy a single `ssh` decky:
|
||||
```sh
|
||||
$ decnet decky deploy --service ssh --decky smoke-decky
|
||||
```
|
||||
The decky's sessrec wrapper appends to
|
||||
`/var/lib/decnet/artifacts/smoke-decky/ssh/transcripts/sessions-<UTC-DAY>.jsonl`.
|
||||
|
||||
### Run one session per calibration class
|
||||
|
||||
For each class, SSH into the decky and reproduce the canonical
|
||||
workload. Log out via the documented exit path so the
|
||||
`session_recorded` event fires. The collector aggregates the session
|
||||
and publishes `attacker.session.ended`; the profiler worker
|
||||
disk-reaches the shard, runs `extract_session()`, persists rows,
|
||||
publishes one `attacker.observation.<primitive>` per emission.
|
||||
|
||||
| Class | Workload sketch | Expected dominant primitives |
|
||||
|---|---|---|
|
||||
| HUMAN | Type each command live; correct typos; pause to read output. | `motor.input_modality=typed`, `cognitive.feedback_loop_engagement=closed_loop` |
|
||||
| YOU-sim | Paste short pre-canned commands at typing speed; minimal repeats. | `motor.input_modality=pasted`, `motor.paste_burst_rate=occasional`, `cognitive.command_branch_diversity=linear_playbook` |
|
||||
| LW-sim | Paste a recon sweep generated by a small LLM; ~2-8s between pastes. | `cognitive.inter_command_latency_class=llm_lightweight` |
|
||||
| CLAUDE-FF | Paste outputs from a fire-and-forget reasoning agent; ~8-30s gaps. | `cognitive.inter_command_latency_class=llm_heavyweight`, `cognitive.feedback_loop_engagement=fire_and_forget` |
|
||||
| CLAUDE-CL | Drive a closed-loop plan-execute-observe agent; >30s pauses on long output. | `cognitive.inter_command_latency_class=long`, `cognitive.feedback_loop_engagement=closed_loop` |
|
||||
|
||||
### Verify
|
||||
|
||||
For each class, after disconnecting:
|
||||
|
||||
1. **DB row landing** — within ~30s
|
||||
(the profiler tick interval), `observations` carries one row per
|
||||
primitive for the new attacker:
|
||||
```sh
|
||||
$ sqlite3 /var/lib/decnet/decnet.db \
|
||||
"SELECT primitive, value, confidence FROM observations \
|
||||
WHERE evidence_ref LIKE 'shard:smoke-decky/%' ORDER BY ts DESC LIMIT 40;"
|
||||
```
|
||||
2. **Bus events** — tail the bus worker log; you should see one
|
||||
`attacker.observation.<primitive>` per emitted row, plus the
|
||||
originating `attacker.session.ended`.
|
||||
3. **AttackerDetail panel** — open
|
||||
`/attackers/<uuid>` in the browser. The Behavioural primitives
|
||||
section should hydrate from the REST snapshot and live-update
|
||||
each time you replay the session
|
||||
(the SSE route forwards the new emissions in real time).
|
||||
|
||||
### Pass criteria
|
||||
|
||||
* All 5 classes produce ≥ 27 distinct primitives in
|
||||
`observations` (the per-shard hard gate from
|
||||
`tests/profiler/behave_shell/test_calibration_grid.py`).
|
||||
* The four day-one priority primitives appear in the panel and carry
|
||||
the expected values per class (table above).
|
||||
* No collector / profiler / web errors in the journal during the
|
||||
round-trip.
|
||||
|
||||
If any class regresses: rollback the last commit and run the offline
|
||||
replay (`smoke.sh`) to localise — same handler, no transport noise.
|
||||
169
scripts/behave_shell/replay_calibration.py
Normal file
169
scripts/behave_shell/replay_calibration.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""Replay one calibration-corpus shard through the BEHAVE-SHELL handler.
|
||||
|
||||
Phase 6 smoke helper. Drives the production handler
|
||||
(``decnet.profiler.behave_shell._handler.handle_session_ended``)
|
||||
against an asciinema shard from
|
||||
``BEHAVE/prototype_extractors/shell/`` *without* a live decky.
|
||||
Mints a temp SQLite repo, an Attacker row, and an
|
||||
``attacker.session.ended``-shape payload, then calls the handler
|
||||
exactly the way the worker does.
|
||||
|
||||
This is **not** a substitute for the manual decky run described in
|
||||
``scripts/behave_shell/README.md`` — the integration doc's Phase 6
|
||||
calls for a real PTY round-trip. This helper exercises the handler +
|
||||
storage layer end-to-end without the worker loop, so a failure here
|
||||
points at the engine and not at the bus / collector / disk-reach
|
||||
plumbing.
|
||||
|
||||
Usage::
|
||||
|
||||
python scripts/behave_shell/replay_calibration.py \\
|
||||
--shard /path/to/sessions-2026-05-02.jsonl \\
|
||||
--label HUMAN
|
||||
|
||||
Exit codes:
|
||||
0 every session in the shard produced ≥ 1 observation
|
||||
1 zero observations produced for at least one session
|
||||
2 argument / IO error
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import collections
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from decnet.profiler.behave_shell._handler import handle_session_ended
|
||||
from decnet.web.db.factory import get_repository
|
||||
|
||||
|
||||
def _sids_in_shard(shard: Path) -> list[str]:
|
||||
sids: list[str] = []
|
||||
seen: set[str] = set()
|
||||
with shard.open() as f:
|
||||
for line in f:
|
||||
try:
|
||||
rec = json.loads(line)
|
||||
except (ValueError, json.JSONDecodeError):
|
||||
continue
|
||||
if not isinstance(rec, dict):
|
||||
continue
|
||||
sid = rec.get("sid")
|
||||
if not isinstance(sid, str) or sid in seen:
|
||||
continue
|
||||
seen.add(sid)
|
||||
sids.append(sid)
|
||||
return sids
|
||||
|
||||
|
||||
async def _seed_attacker(repo: Any, ip: str) -> str:
|
||||
return await repo.upsert_attacker({
|
||||
"ip": ip,
|
||||
"first_seen": datetime.now(timezone.utc),
|
||||
"last_seen": datetime.now(timezone.utc),
|
||||
"event_count": 1,
|
||||
"service_count": 1,
|
||||
"decky_count": 1,
|
||||
"services": "[\"ssh\"]",
|
||||
"deckies": "[\"smoke-decky\"]",
|
||||
"traversal_path": None,
|
||||
"is_traversal": False,
|
||||
"bounty_count": 0,
|
||||
"credential_count": 0,
|
||||
"fingerprints": "[]",
|
||||
"commands": "[]",
|
||||
"country_code": None,
|
||||
"country_source": None,
|
||||
"asn": None,
|
||||
"as_name": None,
|
||||
"asn_source": None,
|
||||
"updated_at": datetime.now(timezone.utc),
|
||||
})
|
||||
|
||||
|
||||
def _payload_for(shard: Path, sid: str, ip: str) -> dict[str, Any]:
|
||||
return {
|
||||
"session_id": sid,
|
||||
"attacker_uuid": None,
|
||||
"attacker_ip": ip,
|
||||
"decky_id": "smoke-decky",
|
||||
"service": "ssh",
|
||||
"ended_at": datetime.now(timezone.utc).isoformat(),
|
||||
"duration_s": 0.0,
|
||||
"commands": [],
|
||||
"shard_path": str(shard),
|
||||
}
|
||||
|
||||
|
||||
async def _replay(shard: Path, label: str) -> int:
|
||||
sids = _sids_in_shard(shard)
|
||||
if not sids:
|
||||
print(f"[{label}] FAIL — no sids found in shard", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="behave-smoke.") as tmp:
|
||||
db_path = Path(tmp) / "smoke.db"
|
||||
repo = get_repository(db_path=str(db_path))
|
||||
await repo.initialize()
|
||||
|
||||
bus_events: list[tuple[str, dict[str, Any], str]] = []
|
||||
|
||||
def _publish(topic: str, payload: dict[str, Any], event_type: str) -> None:
|
||||
bus_events.append((topic, payload, event_type))
|
||||
|
||||
per_sid_counts: dict[str, int] = {}
|
||||
per_sid_primitives: dict[str, collections.Counter] = {}
|
||||
for sid in sids:
|
||||
ip = f"10.{abs(hash(sid)) % 256}.{abs(hash(sid + label)) % 256}.5"
|
||||
await _seed_attacker(repo, ip)
|
||||
n = await handle_session_ended(repo, _payload_for(shard, sid, ip), _publish)
|
||||
per_sid_counts[sid] = n
|
||||
per_sid_primitives[sid] = collections.Counter()
|
||||
|
||||
# Snapshot the observations table for each sid via evidence_ref.
|
||||
all_primitives: collections.Counter[str] = collections.Counter()
|
||||
for topic, payload, _etype in bus_events:
|
||||
primitive = payload.get("primitive") or topic.split(".", 2)[2]
|
||||
all_primitives[primitive] += 1
|
||||
|
||||
total_obs = sum(per_sid_counts.values())
|
||||
empty_sids = [sid for sid, n in per_sid_counts.items() if n == 0]
|
||||
|
||||
print(f"[{label}] sessions={len(sids)} observations={total_obs} "
|
||||
f"distinct_primitives={len(all_primitives)} bus_events={len(bus_events)}")
|
||||
if empty_sids:
|
||||
print(f"[{label}] FAIL — {len(empty_sids)}/{len(sids)} sessions emitted "
|
||||
f"zero observations", file=sys.stderr)
|
||||
for sid in empty_sids[:3]:
|
||||
print(f"[{label}] empty sid={sid}", file=sys.stderr)
|
||||
return 1
|
||||
# One-line top-5 primitive sample for visual sanity.
|
||||
top = ", ".join(
|
||||
f"{p}={c}" for p, c in all_primitives.most_common(5)
|
||||
)
|
||||
print(f"[{label}] top: {top}")
|
||||
return 0
|
||||
|
||||
|
||||
async def _main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--shard", required=True, type=Path,
|
||||
help="Path to a sessions-YYYY-MM-DD.jsonl shard")
|
||||
parser.add_argument("--label", required=True,
|
||||
help="Calibration class label (HUMAN / YOU-sim / "
|
||||
"LW-sim / CLAUDE-FF / CLAUDE-CL)")
|
||||
args = parser.parse_args()
|
||||
if not args.shard.is_file():
|
||||
print(f"shard not a file: {args.shard}", file=sys.stderr)
|
||||
return 2
|
||||
return await _replay(args.shard, args.label)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(asyncio.run(_main()))
|
||||
96
scripts/behave_shell/smoke.sh
Executable file
96
scripts/behave_shell/smoke.sh
Executable file
@@ -0,0 +1,96 @@
|
||||
#!/usr/bin/env bash
|
||||
### Usage: scripts/behave_shell/smoke.sh [BEHAVE_CALIBRATION_DIR]
|
||||
#
|
||||
# BEHAVE-INTEGRATION Phase 6 — offline replay smoke test.
|
||||
#
|
||||
# Runs the production handler
|
||||
# (`decnet.profiler.behave_shell._handler.handle_session_ended`) against
|
||||
# each of the five 2026-05-02 calibration shards, asserts every session
|
||||
# in every shard produces ≥ 1 observation, and prints a per-class
|
||||
# summary.
|
||||
#
|
||||
# This is the **offline** half of Phase 6. The **live-decky** half is
|
||||
# documented in `scripts/behave_shell/README.md` — that one needs a
|
||||
# real PTY round-trip and stays manual.
|
||||
#
|
||||
# Argument:
|
||||
# $1 Optional path to the directory holding
|
||||
# sessions-2026-05-02-*.jsonl. Defaults to
|
||||
# ../BEHAVE/prototype_extractors/shell relative to this repo.
|
||||
#
|
||||
# Exits 0 on full pass, 1 on any class regression, 2 on bad input.
|
||||
set -euo pipefail
|
||||
|
||||
HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
REPO_ROOT="$(cd "${HERE}/../.." && pwd)"
|
||||
DEFAULT_DIR="${REPO_ROOT}/../BEHAVE/prototype_extractors/shell"
|
||||
CALIB_DIR="${1:-${DEFAULT_DIR}}"
|
||||
|
||||
if [[ ! -d "${CALIB_DIR}" ]]; then
|
||||
echo "smoke: FAIL — calibration dir not found: ${CALIB_DIR}" >&2
|
||||
echo "smoke: pass it as \$1 or symlink it next to DECNET/" >&2
|
||||
exit 2
|
||||
fi
|
||||
|
||||
# Auto-activate the project venv so the script works whether or not
|
||||
# the caller already sourced it (mirrors the .311 convention from the
|
||||
# pre-commit hook).
|
||||
if [[ -d "${REPO_ROOT}/.311" ]]; then
|
||||
# shellcheck disable=SC1091
|
||||
source "${REPO_ROOT}/.311/bin/activate"
|
||||
fi
|
||||
|
||||
# Force sqlite so the smoke doesn't depend on a running mysql.
|
||||
export DECNET_DB_TYPE="sqlite"
|
||||
|
||||
# Suppress the verbose decnet logger so the per-class summary lines
|
||||
# stay readable. ANTI's developer log has DEBUG enabled via env; mute
|
||||
# at the smoke entrypoint.
|
||||
export DECNET_LOG_LEVEL="${DECNET_LOG_LEVEL:-WARNING}"
|
||||
unset DECNET_DEVELOPER_MODE 2>/dev/null || true
|
||||
|
||||
declare -a SHARDS=(
|
||||
"sessions-2026-05-02.jsonl|HUMAN"
|
||||
"sessions-2026-05-02-with-llm.jsonl|YOU-sim"
|
||||
"sessions-2026-05-02-new.jsonl|LW-sim"
|
||||
"sessions-2026-05-02-with-claude.jsonl|CLAUDE-FF"
|
||||
"sessions-2026-05-02-closed-loop.jsonl|CLAUDE-CL"
|
||||
)
|
||||
|
||||
LOGDIR="$(mktemp -d -t behave-smoke.XXXXXX)"
|
||||
trap 'rm -rf "${LOGDIR}"' EXIT
|
||||
|
||||
echo "smoke: replaying ${#SHARDS[@]} calibration classes from ${CALIB_DIR}"
|
||||
echo "smoke: per-class logs in ${LOGDIR}"
|
||||
echo
|
||||
|
||||
failed=0
|
||||
for entry in "${SHARDS[@]}"; do
|
||||
fn="${entry%%|*}"
|
||||
label="${entry##*|}"
|
||||
shard="${CALIB_DIR}/${fn}"
|
||||
if [[ ! -f "${shard}" ]]; then
|
||||
echo "[${label}] SKIP — shard not present: ${shard}" >&2
|
||||
continue
|
||||
fi
|
||||
log="${LOGDIR}/${label}.log"
|
||||
set +e
|
||||
python "${HERE}/replay_calibration.py" \
|
||||
--shard "${shard}" --label "${label}" >"${log}" 2>&1
|
||||
rc=$?
|
||||
set -e
|
||||
# Surface the summary lines (everything starting with '['). They go
|
||||
# to stdout in the python tool; stderr noise stays in the log file.
|
||||
grep -E '^\[' "${log}" || true
|
||||
if [[ "${rc}" -ne 0 ]]; then
|
||||
failed=$((failed + 1))
|
||||
echo "[${label}] (full log: ${log})" >&2
|
||||
fi
|
||||
done
|
||||
|
||||
echo
|
||||
if [[ "${failed}" -gt 0 ]]; then
|
||||
echo "smoke: FAIL — ${failed} class(es) regressed" >&2
|
||||
exit 1
|
||||
fi
|
||||
echo "smoke: OK — all classes emit observations end-to-end"
|
||||
Reference in New Issue
Block a user