Shell-session behavioral observation registry layered on core. SPDX: GPL-3.0-or-later (code) / CC-BY-SA-4.0 (attribution-recipes.md). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
299 lines
18 KiB
Python
299 lines
18 KiB
Python
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
"""BEHAVE primitive registry.
|
|
|
|
Source-of-truth for what `Observation.primitive` may be and what `Observation.value`
|
|
must look like. Mirrors every row in the primitive tables of `scratchpad.md`.
|
|
|
|
Adding a new primitive is a deliberate registry edit. Sensors are expected to fail
|
|
loudly if they construct an `Observation` with an unknown primitive — that is by
|
|
design. Drift between this registry and `scratchpad.md` is a bug; v0.1 keeps the
|
|
registry hand-written so PR review catches drift, v0.2 may auto-extract from the
|
|
markdown if drift becomes a maintenance issue.
|
|
|
|
PII discipline: the value-type specs here describe the SHAPE of the value, not
|
|
its content. Sensors are still bound by the rules in `spec/envelope.py`'s module
|
|
docstring — never put raw keystrokes, command bodies, credentials, or payload
|
|
bytes into a value, regardless of what shape this registry permits.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class ValueKind(str, Enum):
|
|
"""Discriminator for the shape an `Observation.value` must take."""
|
|
|
|
CATEGORICAL = "categorical" # str, must appear in `allowed`
|
|
NUMERIC = "numeric" # int | float, optional min/max bounds
|
|
HASH = "hash" # str — hex / base64 / fingerprint string
|
|
ARRAY = "array" # list, element shape given by `array_of`
|
|
FREE_STRING = "free_string" # arbitrary string (e.g. BCP-47 locale, p0f label)
|
|
BOOL = "bool" # plain boolean
|
|
|
|
|
|
class ValueTypeSpec(BaseModel):
|
|
"""Per-primitive value-type spec.
|
|
|
|
Only the fields relevant to ``kind`` should be populated; the rest stay None.
|
|
Validation in ``Observation`` consults this spec to accept or reject a value
|
|
for a given primitive.
|
|
"""
|
|
|
|
kind: ValueKind
|
|
allowed: Optional[list[str]] = Field(
|
|
default=None, description="CATEGORICAL only — enum of valid string values"
|
|
)
|
|
min_val: Optional[float] = Field(default=None, description="NUMERIC lower bound (inclusive)")
|
|
max_val: Optional[float] = Field(default=None, description="NUMERIC upper bound (inclusive)")
|
|
array_of: Optional[ValueKind] = Field(
|
|
default=None, description="ARRAY only — kind of each element"
|
|
)
|
|
notes: Optional[str] = Field(default=None, description="Free-form note for registry readers")
|
|
|
|
def validate_value(self, value: Any) -> None:
|
|
"""Raise ``ValueError`` if *value* does not conform to this spec."""
|
|
if self.kind is ValueKind.CATEGORICAL:
|
|
if not isinstance(value, str):
|
|
raise ValueError(f"expected categorical string, got {type(value).__name__}")
|
|
if self.allowed is not None and value not in self.allowed:
|
|
raise ValueError(
|
|
f"value {value!r} not in allowed set {self.allowed!r}"
|
|
)
|
|
elif self.kind is ValueKind.NUMERIC:
|
|
if isinstance(value, bool) or not isinstance(value, (int, float)):
|
|
raise ValueError(f"expected numeric, got {type(value).__name__}")
|
|
if self.min_val is not None and value < self.min_val:
|
|
raise ValueError(f"value {value} below min_val {self.min_val}")
|
|
if self.max_val is not None and value > self.max_val:
|
|
raise ValueError(f"value {value} above max_val {self.max_val}")
|
|
elif self.kind is ValueKind.HASH:
|
|
if not isinstance(value, str) or not value:
|
|
raise ValueError("expected non-empty hash string")
|
|
elif self.kind is ValueKind.FREE_STRING:
|
|
if not isinstance(value, str):
|
|
raise ValueError(f"expected string, got {type(value).__name__}")
|
|
elif self.kind is ValueKind.BOOL:
|
|
if not isinstance(value, bool):
|
|
raise ValueError(f"expected bool, got {type(value).__name__}")
|
|
elif self.kind is ValueKind.ARRAY:
|
|
if not isinstance(value, list):
|
|
raise ValueError(f"expected array, got {type(value).__name__}")
|
|
if self.array_of is None:
|
|
return
|
|
element_spec = ValueTypeSpec(kind=self.array_of)
|
|
for i, element in enumerate(value):
|
|
try:
|
|
element_spec.validate_value(element)
|
|
except ValueError as exc:
|
|
raise ValueError(f"array element [{i}]: {exc}") from None
|
|
|
|
|
|
# ─── Convenience constructors (keep the registry table readable) ────────────
|
|
|
|
def _cat(*allowed: str, notes: Optional[str] = None) -> ValueTypeSpec:
|
|
return ValueTypeSpec(kind=ValueKind.CATEGORICAL, allowed=list(allowed), notes=notes)
|
|
|
|
def _num(min_val: Optional[float] = None, max_val: Optional[float] = None, notes: Optional[str] = None) -> ValueTypeSpec:
|
|
return ValueTypeSpec(kind=ValueKind.NUMERIC, min_val=min_val, max_val=max_val, notes=notes)
|
|
|
|
def _hash(notes: Optional[str] = None) -> ValueTypeSpec:
|
|
return ValueTypeSpec(kind=ValueKind.HASH, notes=notes)
|
|
|
|
def _str(notes: Optional[str] = None) -> ValueTypeSpec:
|
|
return ValueTypeSpec(kind=ValueKind.FREE_STRING, notes=notes)
|
|
|
|
def _bool(notes: Optional[str] = None) -> ValueTypeSpec:
|
|
return ValueTypeSpec(kind=ValueKind.BOOL, notes=notes)
|
|
|
|
def _array(of: ValueKind, notes: Optional[str] = None) -> ValueTypeSpec:
|
|
return ValueTypeSpec(kind=ValueKind.ARRAY, array_of=of, notes=notes)
|
|
|
|
|
|
# ─── The registry ───────────────────────────────────────────────────────────
|
|
#
|
|
# Mirrors scratchpad.md row-for-row. If you edit one, edit the other.
|
|
|
|
PRIMITIVE_REGISTRY: dict[str, ValueTypeSpec] = {
|
|
# ── motor.* ────────────────────────────────────────────────────────────
|
|
"motor.keystroke_cadence": _cat("steady", "bursty", "hunt_and_peck", "machine"),
|
|
"motor.motor_stability": _cat("steady", "variable", "tremor"),
|
|
"motor.error_correction": _cat("immediate", "deferred", "absent", "route_around"),
|
|
"motor.command_chunking": _cat("fluent", "fragmented", "single_command"),
|
|
"motor.paste_burst_rate": _cat("none", "occasional", "habitual"),
|
|
"motor.input_modality": _cat(
|
|
"typed", "pasted", "mixed",
|
|
notes="dominant input modality across the session — first-class promotion of the paste-vs-type axis",
|
|
),
|
|
# motor.shell_mastery.*
|
|
"motor.shell_mastery.tab_completion": _cat("none", "occasional", "habitual"),
|
|
"motor.shell_mastery.shortcut_usage": _cat("none", "moderate", "heavy"),
|
|
"motor.shell_mastery.pipe_chaining_depth": _cat("shallow", "moderate", "deep"),
|
|
|
|
# ── cognitive.* ────────────────────────────────────────────────────────
|
|
"cognitive.cognitive_load": _cat("low", "medium", "high"),
|
|
"cognitive.exploration_style": _cat("methodical", "chaotic", "targeted"),
|
|
"cognitive.planning_depth": _cat("deep", "shallow", "reactive"),
|
|
"cognitive.tool_vocabulary": _cat("narrow", "moderate", "broad"),
|
|
"cognitive.inter_command_latency_class": _cat(
|
|
"instant", "typing_speed", "deliberate",
|
|
"llm_lightweight", "llm_heavyweight", "long",
|
|
notes="llm_lightweight = 2-8s (orchestrated agents w/ small models or terse "
|
|
"prompts); llm_heavyweight = 8-30s (reasoning-class agents in tool "
|
|
"loops with text generation between calls); long = >30s (likely "
|
|
"human-supervised LLM workflow). The two LLM bands are the v0.2 "
|
|
"split of the original llm_roundtrip 2-8s band, which conflated "
|
|
"lightweight and reasoning-class operators.",
|
|
),
|
|
"cognitive.inter_command_consistency": _cat(
|
|
"metronomic", "variable", "bimodal",
|
|
notes="dispersion (CV) of inter-command pauses; metronomic = LLM-pure, "
|
|
"variable = human, bimodal = LLM-assisted human (LLM-paced bursts + "
|
|
"human-thinking gaps). v0.1 uses CV thresholds; true bimodal "
|
|
"detection (Hartigan dip / two-peak detection) is v0.2.",
|
|
),
|
|
"cognitive.command_branch_diversity": _cat(
|
|
"linear_playbook", "adaptive_branching", "unknown",
|
|
notes="Content-based (not timing-based) discriminator between scripted "
|
|
"playbook execution and adaptive branching. Computed from the "
|
|
"set of first-token binaries in the session: low repetition "
|
|
"(unique/total ratio near 1) = linear_playbook (each step a "
|
|
"different canonical recon command). High repetition (multiple "
|
|
"invocations of the same tool with different args) = adaptive_"
|
|
"branching (operator iterating on a tool to follow up on a "
|
|
"finding). Empirically (CLAUDE-FF vs CLAUDE-CL on 2026-05-02): "
|
|
"fire-and-forget runs 10 distinct tools, closed-loop runs 5-6 "
|
|
"tools with curl repeated as the operator chases a thread.",
|
|
),
|
|
"cognitive.feedback_loop_engagement": _cat(
|
|
"closed_loop", "fire_and_forget", "unknown",
|
|
notes="Whether the operator's pace correlates with the volume of output "
|
|
"they observed before issuing the next command. closed_loop = "
|
|
"positive Pearson r between preceding output bytes and subsequent "
|
|
"pause (pause grows with output to read/ingest). fire_and_forget = "
|
|
"no correlation (operator paces independently of output, e.g. "
|
|
"scripted recon, prerecorded playbook). unknown = insufficient "
|
|
"samples to compute. CUTS ACROSS the LLM/human axis: humans reading "
|
|
"real output are closed_loop, scripted humans and fire-and-forget "
|
|
"LLM agents are fire_and_forget, closed-loop LLM agents (true plan-"
|
|
"execute-observe) are closed_loop. Replaces the v0.1 "
|
|
"output_pause_correlation primitive — same underlying measurement, "
|
|
"more honest framing.",
|
|
),
|
|
# cognitive.error_resilience.*
|
|
"cognitive.error_resilience.retry_tactic": _cat("rerun", "modify", "switch", "abort"),
|
|
"cognitive.error_resilience.frustration_typing": _cat("low", "moderate", "high"),
|
|
"cognitive.error_resilience.fallback_to_man": _cat("absent", "present"),
|
|
|
|
# ── temporal.* ─────────────────────────────────────────────────────────
|
|
"temporal.session_timing": _cat("diurnal", "nocturnal", "irregular"),
|
|
"temporal.session_duration": _cat("short", "medium", "long", "marathon"),
|
|
"temporal.escalation_pattern": _cat("sustained", "erratic", "bursty"),
|
|
"temporal.persistence": _cat("hit_and_run", "return_visitor", "resident"),
|
|
# temporal.lifecycle_markers.*
|
|
"temporal.lifecycle_markers.landing_ritual": _cat("present", "absent"),
|
|
"temporal.lifecycle_markers.exit_behavior": _cat("graceful", "abrupt", "cleanup"),
|
|
"temporal.lifecycle_markers.idle_periodicity": _cat("random", "periodic"),
|
|
|
|
# ── operational.* ──────────────────────────────────────────────────────
|
|
"operational.opsec_discipline": _cat("careful", "careless", "learning"),
|
|
"operational.cleanup_behavior": _cat("thorough", "partial", "none"),
|
|
"operational.objective": _cat("recon", "exfil", "persistence", "lateral", "destructive"),
|
|
"operational.multi_actor_indicators": _cat("solo", "handoff_detected", "team_coordinated"),
|
|
|
|
# ── environmental.* ────────────────────────────────────────────────────
|
|
"environmental.keyboard_layout": _cat("qwerty", "azerty", "qwertz", "other"),
|
|
"environmental.locale": _str(notes="BCP-47 tag (e.g. 'en-US', 'pt-BR'); free string by deliberate choice"),
|
|
"environmental.numpad_usage": _cat("detected", "not_detected"),
|
|
"environmental.terminal_multiplexer": _cat("none", "tmux", "screen"),
|
|
"environmental.shell_type": _cat("bash", "zsh", "fish", "cmd.exe", "powershell"),
|
|
|
|
# ── cultural.* ─────────────────────────────────────────────────────────
|
|
"cultural.meal_break_gaps": _cat("none_detected", "morning", "midday", "evening", "late_night"),
|
|
"cultural.periodic_micro_pauses": _cat("none_detected", "regular_intervals_detected"),
|
|
"cultural.dst_behavior": _cat("shifts_with_dst", "anchored_to_utc", "unknown"),
|
|
"cultural.weekend_cadence": _cat("fri_sat", "sat_sun", "no_weekend", "irregular"),
|
|
"cultural.holiday_gaps": _cat("none_detected", "specific_dates_detected"),
|
|
|
|
# ── emotional_valence.* ────────────────────────────────────────────────
|
|
"emotional_valence.valence": _cat("positive", "neutral", "negative"),
|
|
"emotional_valence.arousal": _cat("low_calm", "medium_engaged", "high_agitated"),
|
|
"emotional_valence.stress_response": _cat("none", "eustress_positive", "distress_negative"),
|
|
"emotional_valence.frustration_venting": _cat("none", "detected"),
|
|
|
|
# ── toolchain.tls.* ────────────────────────────────────────────────────
|
|
"toolchain.tls.ja3_client": _hash(),
|
|
"toolchain.tls.ja3s_server": _hash(),
|
|
"toolchain.tls.ja4_client": _hash(),
|
|
"toolchain.tls.ja4s_server": _hash(),
|
|
"toolchain.tls.jarm_server": _hash(notes="62-char JARM hash"),
|
|
"toolchain.tls.tls_cert_simhash": _hash(notes="SHA-256 hex of leaf cert"),
|
|
|
|
# ── toolchain.transport.* ──────────────────────────────────────────────
|
|
"toolchain.transport.tcp_stack": _str(notes="p0f label, e.g. 'Linux 5.x'"),
|
|
"toolchain.transport.h2_akamai_fingerprint": _str(notes="HTTP/2 SETTINGS+priority+pseudo-header order hash; status: planned"),
|
|
"toolchain.transport.quic_client": _str(notes="QUIC initial packet fingerprint; status: planned"),
|
|
|
|
# ── toolchain.ssh.* ────────────────────────────────────────────────────
|
|
"toolchain.ssh.hassh_client": _hash(notes="md5"),
|
|
"toolchain.ssh.hassh_server": _hash(notes="md5; status: partial"),
|
|
"toolchain.ssh.ssh_client_banner": _str(notes="RFC 4253 banner string"),
|
|
"toolchain.ssh.kex_algorithm_order": _array(ValueKind.FREE_STRING),
|
|
|
|
# ── toolchain.http.* ───────────────────────────────────────────────────
|
|
"toolchain.http.user_agent_tool_class": _cat(
|
|
"nmap_nse", "sqlmap", "nuclei", "masscan", "curl", "metasploit",
|
|
"ffuf", "gobuster", "feroxbuster", "nikto", "wpscan", "evilwinrm",
|
|
"impacket", "unknown",
|
|
),
|
|
"toolchain.http.header_order_fingerprint": _str(notes="status: planned"),
|
|
"toolchain.http.body_oddities": _array(ValueKind.FREE_STRING, notes="status: planned"),
|
|
|
|
# ── toolchain.c2.* ─────────────────────────────────────────────────────
|
|
"toolchain.c2.beacon_family": _cat(
|
|
"cobalt_strike", "sliver", "havoc", "mythic",
|
|
"merlin", "brc4", "nighthawk", "unknown",
|
|
notes="last 3 = status: planned",
|
|
),
|
|
"toolchain.c2.beacon_interval_ms": _num(min_val=0, notes="median IAT in milliseconds"),
|
|
"toolchain.c2.beacon_jitter_cv": _num(min_val=0, notes="coefficient of variation"),
|
|
"toolchain.c2.sleep_skew": _cat("none", "gaussian", "uniform", "walk", notes="status: partial"),
|
|
"toolchain.c2.c2_callback_endpoint": _str(notes="url or host:port"),
|
|
"toolchain.c2.attack_software_id": _str(notes="MITRE Software ID, e.g. 'S0154'"),
|
|
|
|
# ── toolchain.protocol_abuse.* ─────────────────────────────────────────
|
|
"toolchain.protocol_abuse.dns_exfil_tool": _cat(
|
|
"iodine", "dnscat2", "custom_high_entropy", "none", notes="status: planned",
|
|
),
|
|
"toolchain.protocol_abuse.smb_dialect": _cat(
|
|
"SMB1", "SMB2.0.2", "SMB2.1", "SMB3.0", "SMB3.0.2", "SMB3.1.1",
|
|
notes="status: planned",
|
|
),
|
|
"toolchain.protocol_abuse.kerberos_etype_offer": _hash(notes="status: planned — hash of supported etypes"),
|
|
"toolchain.protocol_abuse.ldap_bind_pattern": _cat(
|
|
"simple", "sasl_gssapi", "ntlm", "ntlmssp_v1", "responder_like",
|
|
notes="status: partial",
|
|
),
|
|
"toolchain.protocol_abuse.responder_signature": _str(
|
|
notes="bool + variant; convention: 'false' or 'true:llmnr', 'true:nbtns', etc.; status: planned",
|
|
),
|
|
"toolchain.protocol_abuse.mitm6_signature": _bool(notes="status: planned"),
|
|
|
|
# ── toolchain.payload.* ────────────────────────────────────────────────
|
|
"toolchain.payload.payload_simhash": _hash(notes="64-bit SimHash, hex string"),
|
|
"toolchain.payload.payload_entropy_class": _cat("low", "medium", "high", "packed", notes="status: planned"),
|
|
"toolchain.payload.loader_family": _cat("donut", "sgn", "pe2sh", "nimcrypt", "unknown", notes="status: planned"),
|
|
}
|
|
|
|
|
|
def is_known(primitive: str) -> bool:
|
|
return primitive in PRIMITIVE_REGISTRY
|
|
|
|
|
|
def get(primitive: str) -> ValueTypeSpec:
|
|
"""Return the value-type spec for *primitive*; raise KeyError if unknown."""
|
|
return PRIMITIVE_REGISTRY[primitive]
|