# SPDX-License-Identifier: GPL-3.0-or-later
"""BEHAVE primitive registry.

Source-of-truth for what `Observation.primitive` may be and what
`Observation.value` must look like. Mirrors every row in the primitive
tables of `scratchpad.md`.

Adding a new primitive is a deliberate registry edit. Sensors are expected
to fail loudly if they construct an `Observation` with an unknown primitive
— that is by design.

Drift between this registry and `scratchpad.md` is a bug; v0.1 keeps the
registry hand-written so PR review catches drift, v0.2 may auto-extract
from the markdown if drift becomes a maintenance issue.

PII discipline: the value-type specs here describe the SHAPE of the value,
not its content. Sensors are still bound by the rules in
`spec/envelope.py`'s module docstring — never put raw keystrokes, command
bodies, credentials, or payload bytes into a value, regardless of what
shape this registry permits.
"""

from __future__ import annotations

import math
from enum import Enum
from typing import Any, Optional

from pydantic import BaseModel, Field


class ValueKind(str, Enum):
    """Discriminator for the shape an `Observation.value` must take."""

    CATEGORICAL = "categorical"  # str, must appear in `allowed`
    NUMERIC = "numeric"  # int | float, optional min/max bounds
    HASH = "hash"  # str — hex / base64 / fingerprint string
    ARRAY = "array"  # list, element shape given by `array_of`
    FREE_STRING = "free_string"  # arbitrary string (e.g. BCP-47 locale, p0f label)
    BOOL = "bool"  # plain boolean


class ValueTypeSpec(BaseModel):
    """Per-primitive value-type spec.

    Only the fields relevant to ``kind`` should be populated; the rest stay
    None. Validation in ``Observation`` consults this spec to accept or
    reject a value for a given primitive.
    """

    kind: ValueKind
    allowed: Optional[list[str]] = Field(
        default=None, description="CATEGORICAL only — enum of valid string values"
    )
    min_val: Optional[float] = Field(default=None, description="NUMERIC lower bound (inclusive)")
    max_val: Optional[float] = Field(default=None, description="NUMERIC upper bound (inclusive)")
    array_of: Optional[ValueKind] = Field(
        default=None, description="ARRAY only — kind of each element"
    )
    notes: Optional[str] = Field(default=None, description="Free-form note for registry readers")

    def validate_value(self, value: Any) -> None:
        """Raise ``ValueError`` if *value* does not conform to this spec.

        Checks performed per ``kind``:

        * CATEGORICAL — must be a ``str``; when ``allowed`` is set, must be
          one of its members.
        * NUMERIC — must be ``int`` or ``float`` but not ``bool``; must lie
          within the inclusive ``min_val`` / ``max_val`` bounds that are set.
          NaN is rejected whenever at least one bound is set, because NaN
          compares False against everything and would otherwise slip past
          both bound checks. Specs with no bounds accept NaN.
        * HASH — must be a non-empty ``str``.
        * FREE_STRING — must be a ``str``.
        * BOOL — must be a ``bool``.
        * ARRAY — must be a ``list``; when ``array_of`` is set, every element
          is validated against a bare spec of that kind (no ``allowed`` list,
          bounds, or nested ``array_of``).

        Returns:
            None. Success is signalled by not raising.

        Raises:
            ValueError: *value* does not match the spec. For arrays the
                message is prefixed with the index of the offending element.
        """
        if self.kind is ValueKind.CATEGORICAL:
            if not isinstance(value, str):
                raise ValueError(f"expected categorical string, got {type(value).__name__}")
            if self.allowed is not None and value not in self.allowed:
                raise ValueError(
                    f"value {value!r} not in allowed set {self.allowed!r}"
                )
        elif self.kind is ValueKind.NUMERIC:
            # bool is a subclass of int, so it has to be excluded explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError(f"expected numeric, got {type(value).__name__}")
            has_bound = self.min_val is not None or self.max_val is not None
            # `nan < x` and `nan > x` are both False, so without this guard a
            # NaN would pass any configured bound. The isinstance check keeps
            # arbitrarily large ints away from math.isnan, which would raise
            # OverflowError when converting them to float.
            if has_bound and isinstance(value, float) and math.isnan(value):
                raise ValueError(
                    f"value is NaN; cannot satisfy bounds "
                    f"(min_val={self.min_val}, max_val={self.max_val})"
                )
            if self.min_val is not None and value < self.min_val:
                raise ValueError(f"value {value} below min_val {self.min_val}")
            if self.max_val is not None and value > self.max_val:
                raise ValueError(f"value {value} above max_val {self.max_val}")
        elif self.kind is ValueKind.HASH:
            if not isinstance(value, str) or not value:
                raise ValueError("expected non-empty hash string")
        elif self.kind is ValueKind.FREE_STRING:
            if not isinstance(value, str):
                raise ValueError(f"expected string, got {type(value).__name__}")
        elif self.kind is ValueKind.BOOL:
            if not isinstance(value, bool):
                raise ValueError(f"expected bool, got {type(value).__name__}")
        elif self.kind is ValueKind.ARRAY:
            if not isinstance(value, list):
                raise ValueError(f"expected array, got {type(value).__name__}")
            if self.array_of is None:
                # No element kind declared: any list is accepted as-is.
                return
            element_spec = ValueTypeSpec(kind=self.array_of)
            for i, element in enumerate(value):
                try:
                    element_spec.validate_value(element)
                except ValueError as exc:
                    # `from None` drops the inner traceback; the index prefix
                    # already tells the caller which element failed.
                    raise ValueError(f"array element [{i}]: {exc}") from None


# ─── Convenience constructors (keep the registry table readable) ────────────


def _cat(*allowed: str, notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a CATEGORICAL spec whose allowed values are the positional args."""
    return ValueTypeSpec(kind=ValueKind.CATEGORICAL, allowed=list(allowed), notes=notes)


def _num(min_val: Optional[float] = None, max_val: Optional[float] = None,
         notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a NUMERIC spec with optional inclusive lower/upper bounds."""
    return ValueTypeSpec(kind=ValueKind.NUMERIC, min_val=min_val, max_val=max_val, notes=notes)


def _hash(notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a HASH spec (non-empty string)."""
    return ValueTypeSpec(kind=ValueKind.HASH, notes=notes)


def _str(notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a FREE_STRING spec (any string)."""
    return ValueTypeSpec(kind=ValueKind.FREE_STRING, notes=notes)


def _bool(notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a BOOL spec."""
    return ValueTypeSpec(kind=ValueKind.BOOL, notes=notes)


def _array(of: ValueKind, notes: Optional[str] = None) -> ValueTypeSpec:
    """Build an ARRAY spec whose elements must each be of kind *of*."""
    return ValueTypeSpec(kind=ValueKind.ARRAY, array_of=of, notes=notes)


# ─── The registry ───────────────────────────────────────────────────────────
#
# Mirrors scratchpad.md row-for-row. If you edit one, edit the other.
# Maps each fully-qualified primitive name ("<family>.<name>" or
# "<family>.<group>.<name>") to the spec its value must satisfy. Entries are
# grouped by family; the `notes` strings are carried on the spec objects and
# are therefore runtime data, not comments.
PRIMITIVE_REGISTRY: dict[str, ValueTypeSpec] = {
    # ── motor.* ────────────────────────────────────────────────────────────
    "motor.keystroke_cadence": _cat("steady", "bursty", "hunt_and_peck", "machine"),
    "motor.motor_stability": _cat("steady", "variable", "tremor"),
    "motor.error_correction": _cat("immediate", "deferred", "absent", "route_around"),
    "motor.command_chunking": _cat("fluent", "fragmented", "single_command"),
    "motor.paste_burst_rate": _cat("none", "occasional", "habitual"),
    "motor.input_modality": _cat(
        "typed", "pasted", "mixed",
        notes="dominant input modality across the session — first-class promotion of the paste-vs-type axis",
    ),
    # motor.shell_mastery.*
    "motor.shell_mastery.tab_completion": _cat("none", "occasional", "habitual"),
    "motor.shell_mastery.shortcut_usage": _cat("none", "moderate", "heavy"),
    "motor.shell_mastery.pipe_chaining_depth": _cat("shallow", "moderate", "deep"),
    # ── cognitive.* ────────────────────────────────────────────────────────
    "cognitive.cognitive_load": _cat("low", "medium", "high"),
    "cognitive.exploration_style": _cat("methodical", "chaotic", "targeted"),
    "cognitive.planning_depth": _cat("deep", "shallow", "reactive"),
    "cognitive.tool_vocabulary": _cat("narrow", "moderate", "broad"),
    "cognitive.inter_command_latency_class": _cat(
        "instant", "typing_speed", "deliberate", "llm_lightweight", "llm_heavyweight", "long",
        notes="llm_lightweight = 2-8s (orchestrated agents w/ small models or terse "
              "prompts); llm_heavyweight = 8-30s (reasoning-class agents in tool "
              "loops with text generation between calls); long = >30s (likely "
              "human-supervised LLM workflow). The two LLM bands are the v0.2 "
              "split of the original llm_roundtrip 2-8s band, which conflated "
              "lightweight and reasoning-class operators.",
    ),
    "cognitive.inter_command_consistency": _cat(
        "metronomic", "variable", "bimodal",
        notes="dispersion (CV) of inter-command pauses; metronomic = LLM-pure, "
              "variable = human, bimodal = LLM-assisted human (LLM-paced bursts + "
              "human-thinking gaps). v0.1 uses CV thresholds; true bimodal "
              "detection (Hartigan dip / two-peak detection) is v0.2.",
    ),
    "cognitive.command_branch_diversity": _cat(
        "linear_playbook", "adaptive_branching", "unknown",
        notes="Content-based (not timing-based) discriminator between scripted "
              "playbook execution and adaptive branching. Computed from the "
              "set of first-token binaries in the session: low repetition "
              "(unique/total ratio near 1) = linear_playbook (each step a "
              "different canonical recon command). High repetition (multiple "
              "invocations of the same tool with different args) = adaptive_"
              "branching (operator iterating on a tool to follow up on a "
              "finding). Empirically (CLAUDE-FF vs CLAUDE-CL on 2026-05-02): "
              "fire-and-forget runs 10 distinct tools, closed-loop runs 5-6 "
              "tools with curl repeated as the operator chases a thread.",
    ),
    "cognitive.feedback_loop_engagement": _cat(
        "closed_loop", "fire_and_forget", "unknown",
        notes="Whether the operator's pace correlates with the volume of output "
              "they observed before issuing the next command. closed_loop = "
              "positive Pearson r between preceding output bytes and subsequent "
              "pause (pause grows with output to read/ingest). fire_and_forget = "
              "no correlation (operator paces independently of output, e.g. "
              "scripted recon, prerecorded playbook). unknown = insufficient "
              "samples to compute. CUTS ACROSS the LLM/human axis: humans reading "
              "real output are closed_loop, scripted humans and fire-and-forget "
              "LLM agents are fire_and_forget, closed-loop LLM agents (true plan-"
              "execute-observe) are closed_loop. Replaces the v0.1 "
              "output_pause_correlation primitive — same underlying measurement, "
              "more honest framing.",
    ),
    # cognitive.error_resilience.*
    "cognitive.error_resilience.retry_tactic": _cat("rerun", "modify", "switch", "abort"),
    "cognitive.error_resilience.frustration_typing": _cat("low", "moderate", "high"),
    "cognitive.error_resilience.fallback_to_man": _cat("absent", "present"),
    # ── temporal.* ─────────────────────────────────────────────────────────
    "temporal.session_timing": _cat("diurnal", "nocturnal", "irregular"),
    "temporal.session_duration": _cat("short", "medium", "long", "marathon"),
    "temporal.escalation_pattern": _cat("sustained", "erratic", "bursty"),
    "temporal.persistence": _cat("hit_and_run", "return_visitor", "resident"),
    # temporal.lifecycle_markers.*
    "temporal.lifecycle_markers.landing_ritual": _cat("present", "absent"),
    "temporal.lifecycle_markers.exit_behavior": _cat("graceful", "abrupt", "cleanup"),
    "temporal.lifecycle_markers.idle_periodicity": _cat("random", "periodic"),
    # ── operational.* ──────────────────────────────────────────────────────
    "operational.opsec_discipline": _cat("careful", "careless", "learning"),
    "operational.cleanup_behavior": _cat("thorough", "partial", "none"),
    "operational.objective": _cat("recon", "exfil", "persistence", "lateral", "destructive"),
    "operational.multi_actor_indicators": _cat("solo", "handoff_detected", "team_coordinated"),
    # ── environmental.* ────────────────────────────────────────────────────
    "environmental.keyboard_layout": _cat("qwerty", "azerty", "qwertz", "other"),
    "environmental.locale": _str(notes="BCP-47 tag (e.g. 'en-US', 'pt-BR'); free string by deliberate choice"),
    "environmental.numpad_usage": _cat("detected", "not_detected"),
    "environmental.terminal_multiplexer": _cat("none", "tmux", "screen"),
    "environmental.shell_type": _cat("bash", "zsh", "fish", "cmd.exe", "powershell"),
    # ── cultural.* ─────────────────────────────────────────────────────────
    "cultural.meal_break_gaps": _cat("none_detected", "morning", "midday", "evening", "late_night"),
    "cultural.periodic_micro_pauses": _cat("none_detected", "regular_intervals_detected"),
    "cultural.dst_behavior": _cat("shifts_with_dst", "anchored_to_utc", "unknown"),
    "cultural.weekend_cadence": _cat("fri_sat", "sat_sun", "no_weekend", "irregular"),
    "cultural.holiday_gaps": _cat("none_detected", "specific_dates_detected"),
    # ── emotional_valence.* ────────────────────────────────────────────────
    "emotional_valence.valence": _cat("positive", "neutral", "negative"),
    "emotional_valence.arousal": _cat("low_calm", "medium_engaged", "high_agitated"),
    "emotional_valence.stress_response": _cat("none", "eustress_positive", "distress_negative"),
    "emotional_valence.frustration_venting": _cat("none", "detected"),
    # ── toolchain.tls.* ────────────────────────────────────────────────────
    "toolchain.tls.ja3_client": _hash(),
    "toolchain.tls.ja3s_server": _hash(),
    "toolchain.tls.ja4_client": _hash(),
    "toolchain.tls.ja4s_server": _hash(),
    "toolchain.tls.jarm_server": _hash(notes="62-char JARM hash"),
    "toolchain.tls.tls_cert_simhash": _hash(notes="SHA-256 hex of leaf cert"),
    # ── toolchain.transport.* ──────────────────────────────────────────────
    "toolchain.transport.tcp_stack": _str(notes="p0f label, e.g. 'Linux 5.x'"),
    "toolchain.transport.h2_akamai_fingerprint": _str(notes="HTTP/2 SETTINGS+priority+pseudo-header order hash; status: planned"),
    "toolchain.transport.quic_client": _str(notes="QUIC initial packet fingerprint; status: planned"),
    # ── toolchain.ssh.* ────────────────────────────────────────────────────
    "toolchain.ssh.hassh_client": _hash(notes="md5"),
    "toolchain.ssh.hassh_server": _hash(notes="md5; status: partial"),
    "toolchain.ssh.ssh_client_banner": _str(notes="RFC 4253 banner string"),
    # Array whose elements are declared as free strings.
    "toolchain.ssh.kex_algorithm_order": _array(ValueKind.FREE_STRING),
    # ── toolchain.http.* ───────────────────────────────────────────────────
    "toolchain.http.user_agent_tool_class": _cat(
        "nmap_nse", "sqlmap", "nuclei", "masscan", "curl", "metasploit",
        "ffuf", "gobuster", "feroxbuster", "nikto", "wpscan", "evilwinrm",
        "impacket", "unknown",
    ),
    "toolchain.http.header_order_fingerprint": _str(notes="status: planned"),
    "toolchain.http.body_oddities": _array(ValueKind.FREE_STRING, notes="status: planned"),
    # ── toolchain.c2.* ─────────────────────────────────────────────────────
    "toolchain.c2.beacon_family": _cat(
        "cobalt_strike", "sliver", "havoc", "mythic", "merlin", "brc4", "nighthawk", "unknown",
        notes="last 3 = status: planned",
    ),
    # The two numeric primitives set only a lower bound of 0; no max_val is given.
    "toolchain.c2.beacon_interval_ms": _num(min_val=0, notes="median IAT in milliseconds"),
    "toolchain.c2.beacon_jitter_cv": _num(min_val=0, notes="coefficient of variation"),
    "toolchain.c2.sleep_skew": _cat("none", "gaussian", "uniform", "walk", notes="status: partial"),
    "toolchain.c2.c2_callback_endpoint": _str(notes="url or host:port"),
    "toolchain.c2.attack_software_id": _str(notes="MITRE Software ID, e.g. 'S0154'"),
    # ── toolchain.protocol_abuse.* ─────────────────────────────────────────
    "toolchain.protocol_abuse.dns_exfil_tool": _cat(
        "iodine", "dnscat2", "custom_high_entropy", "none",
        notes="status: planned",
    ),
    "toolchain.protocol_abuse.smb_dialect": _cat(
        "SMB1", "SMB2.0.2", "SMB2.1", "SMB3.0", "SMB3.0.2", "SMB3.1.1",
        notes="status: planned",
    ),
    "toolchain.protocol_abuse.kerberos_etype_offer": _hash(notes="status: planned — hash of supported etypes"),
    "toolchain.protocol_abuse.ldap_bind_pattern": _cat(
        "simple", "sasl_gssapi", "ntlm", "ntlmssp_v1", "responder_like",
        notes="status: partial",
    ),
    # Registered as a free string (not BOOL); the encoding convention lives in
    # the note text only and is not enforced by the spec.
    "toolchain.protocol_abuse.responder_signature": _str(
        notes="bool + variant; convention: 'false' or 'true:llmnr', 'true:nbtns', etc.; status: planned",
    ),
    "toolchain.protocol_abuse.mitm6_signature": _bool(notes="status: planned"),
    # ── toolchain.payload.* ────────────────────────────────────────────────
    "toolchain.payload.payload_simhash": _hash(notes="64-bit SimHash, hex string"),
    "toolchain.payload.payload_entropy_class": _cat("low", "medium", "high", "packed", notes="status: planned"),
    "toolchain.payload.loader_family": _cat("donut", "sgn", "pe2sh", "nimcrypt", "unknown", notes="status: planned"),
}


def is_known(primitive: str) -> bool:
    """Return True if *primitive* is a key of ``PRIMITIVE_REGISTRY``."""
    return primitive in PRIMITIVE_REGISTRY


def get(primitive: str) -> ValueTypeSpec:
    """Return the value-type spec for *primitive*; raise KeyError if unknown.

    The KeyError comes straight from the dict lookup; no default is
    substituted for unregistered names.
    """
    return PRIMITIVE_REGISTRY[primitive]