# SPDX-License-Identifier: GPL-3.0-or-later """BEHAVE primitive registry. Source-of-truth for what `Observation.primitive` may be and what `Observation.value` must look like. Adding a new primitive is a deliberate registry edit. Sensors are expected to fail loudly if they construct an `Observation` with an unknown primitive — that is by design. PII discipline: the value-type specs here describe the SHAPE of the value, not its content. Sensors are still bound by the rules in `spec/envelope.py`'s module docstring — never put raw keystrokes, command bodies, credentials, or payload bytes into a value, regardless of what shape this registry permits. """ from __future__ import annotations from enum import Enum from typing import Any, Optional from pydantic import BaseModel, Field class ValueKind(str, Enum): """Discriminator for the shape an `Observation.value` must take.""" CATEGORICAL = "categorical" # str, must appear in `allowed` NUMERIC = "numeric" # int | float, optional min/max bounds HASH = "hash" # str — hex / base64 / fingerprint string ARRAY = "array" # list, element shape given by `array_of` FREE_STRING = "free_string" # arbitrary string (e.g. BCP-47 locale, p0f label) BOOL = "bool" # plain boolean class ValueTypeSpec(BaseModel): """Per-primitive value-type spec. Only the fields relevant to ``kind`` should be populated; the rest stay None. Validation in ``Observation`` consults this spec to accept or reject a value for a given primitive. 
""" kind: ValueKind allowed: Optional[list[str]] = Field( default=None, description="CATEGORICAL only — enum of valid string values" ) min_val: Optional[float] = Field(default=None, description="NUMERIC lower bound (inclusive)") max_val: Optional[float] = Field(default=None, description="NUMERIC upper bound (inclusive)") array_of: Optional[ValueKind] = Field( default=None, description="ARRAY only — kind of each element" ) notes: Optional[str] = Field(default=None, description="Free-form note for registry readers") def validate_value(self, value: Any) -> None: """Raise ``ValueError`` if *value* does not conform to this spec.""" if self.kind is ValueKind.CATEGORICAL: if not isinstance(value, str): raise ValueError(f"expected categorical string, got {type(value).__name__}") if self.allowed is not None and value not in self.allowed: raise ValueError( f"value {value!r} not in allowed set {self.allowed!r}" ) elif self.kind is ValueKind.NUMERIC: if isinstance(value, bool) or not isinstance(value, (int, float)): raise ValueError(f"expected numeric, got {type(value).__name__}") if self.min_val is not None and value < self.min_val: raise ValueError(f"value {value} below min_val {self.min_val}") if self.max_val is not None and value > self.max_val: raise ValueError(f"value {value} above max_val {self.max_val}") elif self.kind is ValueKind.HASH: if not isinstance(value, str) or not value: raise ValueError("expected non-empty hash string") elif self.kind is ValueKind.FREE_STRING: if not isinstance(value, str): raise ValueError(f"expected string, got {type(value).__name__}") elif self.kind is ValueKind.BOOL: if not isinstance(value, bool): raise ValueError(f"expected bool, got {type(value).__name__}") elif self.kind is ValueKind.ARRAY: if not isinstance(value, list): raise ValueError(f"expected array, got {type(value).__name__}") if self.array_of is None: return element_spec = ValueTypeSpec(kind=self.array_of) for i, element in enumerate(value): try: 
element_spec.validate_value(element) except ValueError as exc: raise ValueError(f"array element [{i}]: {exc}") from None # ─── Convenience constructors (keep the registry table readable) ──────────── def _cat(*allowed: str, notes: Optional[str] = None) -> ValueTypeSpec: return ValueTypeSpec(kind=ValueKind.CATEGORICAL, allowed=list(allowed), notes=notes) def _num(min_val: Optional[float] = None, max_val: Optional[float] = None, notes: Optional[str] = None) -> ValueTypeSpec: return ValueTypeSpec(kind=ValueKind.NUMERIC, min_val=min_val, max_val=max_val, notes=notes) def _hash(notes: Optional[str] = None) -> ValueTypeSpec: return ValueTypeSpec(kind=ValueKind.HASH, notes=notes) def _str(notes: Optional[str] = None) -> ValueTypeSpec: return ValueTypeSpec(kind=ValueKind.FREE_STRING, notes=notes) def _bool(notes: Optional[str] = None) -> ValueTypeSpec: return ValueTypeSpec(kind=ValueKind.BOOL, notes=notes) def _array(of: ValueKind, notes: Optional[str] = None) -> ValueTypeSpec: return ValueTypeSpec(kind=ValueKind.ARRAY, array_of=of, notes=notes) # ─── The registry ─────────────────────────────────────────────────────────── PRIMITIVE_REGISTRY: dict[str, ValueTypeSpec] = { # ── motor.* ──────────────────────────────────────────────────────────── # Motor primitives capture the physical mechanics of keyboard interaction — # rhythm, precision, and habitual movements that are hard to fake and stable # across sessions even when operators change tools or objectives. "motor.keystroke_cadence": _cat( "steady", "bursty", "hunt_and_peck", "machine", notes="Rhythm of raw key input across the session. steady=metronomic rate " "matching a confident typist. bursty=fast bursts separated by thinking " "pauses. hunt_and_peck=search-first-then-type characteristic of unfamiliar " "keyboard layout or low typing skill. 
machine=mechanically regular cadence " "suggesting scripted or pasted input rather than live typing.", ), "motor.motor_stability": _cat( "steady", "variable", "tremor", notes="Consistency of individual key hold and flight times (dwell/flight). " "steady=low variance, typical of a confident touch-typist. variable=high " "variance, common under cognitive load or on an unfamiliar keyboard. " "tremor=rhythmic instability distinct from cognitive-load variance — may " "indicate physical condition or a non-human input device.", ), "motor.error_correction": _cat( "immediate", "deferred", "absent", "route_around", notes="How the operator corrects typing mistakes. immediate=backspace within ~1s " "of the error (automatic self-monitoring, muscle memory). deferred=correction " "after pausing to read output. absent=no correction — operator proceeds " "despite errors, typical of scripts or operators who know the shell will " "fail loudly. route_around=operator avoids retyping by using history recall " "or rewriting the command differently.", ), "motor.command_chunking": _cat( "fluent", "fragmented", "single_command", notes="Whether commands are typed in a single continuous flow or as fragments. " "fluent=typed in one pass from memory with no mid-command pauses. " "fragmented=typed in chunks with mid-command pauses — operator is composing " "while typing, common when adapting a remembered skeleton to the current " "context. single_command=operator runs exactly one complete command at a " "time and never constructs pipelines inline.", ), "motor.paste_burst_rate": _cat( "none", "occasional", "habitual", notes="Frequency of large clipboard-paste events relative to typed input. " "Distinguishes an operator driving a terminal interactively from a script " "feeding one. 
habitual=operator primarily works by pasting pre-prepared " "command blocks; none=entirely typed.", ), "motor.input_modality": _cat( "typed", "pasted", "mixed", notes="Dominant input modality across the session — first-class promotion of " "the paste-vs-type axis. typed=operator types commands character by " "character. pasted=operator pastes pre-prepared blocks. mixed=substantial " "use of both.", ), # motor.shell_mastery.* "motor.shell_mastery.tab_completion": _cat( "none", "occasional", "habitual", notes="Tab key completion usage across the session. habitual=operator relies on " "it constantly (inferred from the latency pattern: short pause then rapid " "continuation after a partial path or command). none=operator types full " "paths and commands without completion. Strong indicator of shell familiarity.", ), "motor.shell_mastery.shortcut_usage": _cat( "none", "moderate", "heavy", notes="Use of shell keyboard shortcuts (Ctrl+R for history search, Ctrl+A/E for " "line navigation, Ctrl+L for clear, Alt+. for last argument, etc.). Heavy " "usage indicates deep shell muscle memory, reliably stable across sessions.", ), "motor.shell_mastery.pipe_chaining_depth": _cat( "shallow", "moderate", "deep", notes="Maximum depth of pipeline chains observed (cmd | cmd | cmd...). shallow=0-1 " "pipes, moderate=2-3, deep=4+. Reflects preference for composing Unix tools " "rather than running one-off commands. Correlates with cognitive.tool_vocabulary.", ), # ── cognitive.* ──────────────────────────────────────────────────────── # Cognitive primitives capture how the operator thinks and makes decisions — # their planning style, how they respond to uncertainty, and signs that they # are human vs. automated. "cognitive.cognitive_load": _cat( "low", "medium", "high", notes="Inferred mental workload derived from timing patterns, error rate, and " "inter-command variance. high=long pauses before and after commands, " "frequent error-retry cycles, fragmented command chunking. 
Collapses " "multiple temporal and motor signals into a holistic load estimate. " "Useful as a composite feature for downstream attribution rather than " "a standalone signal.", ), "cognitive.exploration_style": _cat( "methodical", "chaotic", "targeted", notes="How the operator navigates an unfamiliar environment. methodical=systematic " "enumeration (ls→cat→id→uname in a logical sequence). chaotic=non-sequential " "jumps between unrelated commands with no visible thread. targeted=operator " "knows exactly what they want and goes straight for it without exploring.", ), "cognitive.planning_depth": _cat( "deep", "shallow", "reactive", notes="Whether the operator works from a pre-formed plan. deep=commands follow a " "visible logical sequence (recon→pivot→exfil) with little backtracking. " "shallow=opportunistic — follows each output where it leads. reactive=operator " "responds only to errors or surprises rather than driving toward an objective.", ), "cognitive.tool_vocabulary": _cat( "narrow", "moderate", "broad", notes="Breadth of distinct tools and commands used across the session. narrow=operator " "relies on a small fixed toolset (e.g. only curl, grep, ls). broad=operator " "reaches for the best tool for each subtask, suggesting deep familiarity with " "the Unix ecosystem or the target environment.", ), "cognitive.inter_command_latency_class": _cat( "instant", "typing_speed", "deliberate", "llm_lightweight", "llm_heavyweight", "long", notes="llm_lightweight = 2-8s (orchestrated agents w/ small models or terse " "prompts); llm_heavyweight = 8-30s (reasoning-class agents in tool " "loops with text generation between calls); long = >30s (likely " "human-supervised LLM workflow). 
The two LLM bands are the v0.2 " "split of the original llm_roundtrip 2-8s band, which conflated " "lightweight and reasoning-class operators.", ), "cognitive.inter_command_consistency": _cat( "metronomic", "variable", "bimodal", notes="Dispersion (CV) of inter-command pauses; metronomic = LLM-pure, " "variable = human, bimodal = LLM-assisted human (LLM-paced bursts + " "human-thinking gaps). v0.1 uses CV thresholds; true bimodal " "detection (Hartigan dip / two-peak detection) is v0.2.", ), "cognitive.command_branch_diversity": _cat( "linear_playbook", "adaptive_branching", "unknown", notes="Content-based (not timing-based) discriminator between scripted " "playbook execution and adaptive branching. Computed from the " "set of first-token binaries in the session: low repetition " "(unique/total ratio near 1) = linear_playbook (each step a " "different canonical recon command). High repetition (multiple " "invocations of the same tool with different args) = adaptive_" "branching (operator iterating on a tool to follow up on a " "finding). Empirically (CLAUDE-FF vs CLAUDE-CL on 2026-05-02): " "fire-and-forget runs 10 distinct tools, closed-loop runs 5-6 " "tools with curl repeated as the operator chases a thread.", ), "cognitive.feedback_loop_engagement": _cat( "closed_loop", "fire_and_forget", "unknown", notes="Whether the operator's pace correlates with the volume of output " "they observed before issuing the next command. closed_loop = " "positive Pearson r between preceding output bytes and subsequent " "pause (pause grows with output to read/ingest). fire_and_forget = " "no correlation (operator paces independently of output, e.g. " "scripted recon, prerecorded playbook). unknown = insufficient " "samples to compute. CUTS ACROSS the LLM/human axis: humans reading " "real output are closed_loop, scripted humans and fire-and-forget " "LLM agents are fire_and_forget, closed-loop LLM agents (true plan-" "execute-observe) are closed_loop. 
Replaces the v0.1 " "output_pause_correlation primitive — same underlying measurement, " "more honest framing.", ), # cognitive.error_resilience.* "cognitive.error_resilience.retry_tactic": _cat( "rerun", "modify", "switch", "abort", notes="What the operator does when a command fails. rerun=identical retry with " "no changes (hoping transient error clears). modify=adjusts the command " "before retrying (flags, paths, arguments). switch=abandons the tool and " "tries a different one for the same goal. abort=gives up on that objective " "and moves on.", ), "cognitive.error_resilience.frustration_typing": _cat( "low", "moderate", "high", notes="Elevated typing speed or error rate immediately after a command failure, " "indicating an emotional response to the setback. high=sharp speed spike " "and error burst post-failure. A behavioral tell that separates emotionally " "reactive humans from scripted operators or composed professionals.", ), "cognitive.error_resilience.fallback_to_man": _cat( "absent", "present", notes="Whether the operator invokes man, --help, or -h when stuck. present is a " "tell for unfamiliarity with the specific tool in use — an operator who " "knows their tools cold rarely needs to. Absent in scripted runs.", ), # ── temporal.* ───────────────────────────────────────────────────────── # Temporal primitives characterize WHEN and HOW LONG an operator works. # Stable across sessions; hard to fake consistently over a campaign. "temporal.session_timing": _cat( "diurnal", "nocturnal", "irregular", notes="Hour-of-day distribution of the operator's activity. diurnal=activity " "peaks align with local business hours (09:00-18:00). nocturnal=peaks in " "local night hours (22:00-06:00). irregular=no discernible daily pattern. 
" "The local timezone must be established separately (see cultural.*) to " "interpret diurnal/nocturnal meaningfully.", ), "temporal.session_duration": _cat( "short", "medium", "long", "marathon", notes="Typical duration of a single continuous session. short=<15min, " "medium=15-90min, long=90min-4hr, marathon=>4hr. Stable individual " "characteristic — some operators always work in short sprints, others " "in long unbroken stretches.", ), "temporal.escalation_pattern": _cat( "sustained", "erratic", "bursty", notes="How activity intensity changes across a session. sustained=constant " "command rate throughout. erratic=unpredictable spikes and lulls. " "bursty=concentrated activity followed by extended quiet — common when " "an operator waits for a long-running process before continuing.", ), "temporal.persistence": _cat( "hit_and_run", "return_visitor", "resident", notes="Cross-session return behavior. hit_and_run=one or very few sessions then " "disappears. return_visitor=returns periodically (e.g. weekly maintenance). " "resident=near-continuous presence, behaves as if the compromised host is " "a persistent workstation.", ), # temporal.lifecycle_markers.* "temporal.lifecycle_markers.landing_ritual": _cat( "present", "absent", notes="Whether the operator runs a recognizable sequence of commands at session " "start (e.g. whoami → id → uname -a → hostname → ip addr). present=a " "fingerprinted landing ritual is detected, suggesting established habit or " "a pre-written checklist. absent=operator jumps straight to objective work.", ), "temporal.lifecycle_markers.exit_behavior": _cat( "graceful", "abrupt", "cleanup", notes="How the session ends. graceful=explicit logout or exit command. " "abrupt=connection drops without cleanup (killed, network failure, or " "scripted timeout). 
cleanup=operator deletes logs, tools, or temp files " "before exiting — the strongest opsec signal in this category.", ), "temporal.lifecycle_markers.idle_periodicity": _cat( "random", "periodic", notes="Whether intra-session pauses (idle gaps >30s) occur at statistically " "regular intervals or at random. periodic=heartbeat-like idle pattern — " "may indicate an LLM polling loop, an automated keepalive, or a human " "following a timed workflow. random=human thinking pauses with no " "detectable rhythm.", ), # ── operational.* ────────────────────────────────────────────────────── # Operational primitives describe WHAT the operator is trying to do and HOW # carefully they're hiding it. These are coarser inferences from command patterns # rather than direct measurements. "operational.opsec_discipline": _cat( "careful", "careless", "learning", notes="How carefully the operator minimizes their forensic footprint. " "careful=history disabled (HISTFILE=/dev/null), tools removed after use, " "proxy/VPN confirmed, log entries tampered. careless=no precautions — " "history on, tools left in /tmp, no timestamp cover. learning=inconsistent " "and improving across sessions, characteristic of an operator developing " "their craft mid-campaign.", ), "operational.cleanup_behavior": _cat( "thorough", "partial", "none", notes="What the operator does with artifacts (uploaded tools, compiled binaries, " "temp files) at session end. thorough=removes everything explicitly, " "including bash history. partial=removes some artifacts but misses others " "(common). none=leaves all artifacts — operator either trusts the implant " "to cover or does not expect forensic review.", ), "operational.objective": _cat( "recon", "exfil", "persistence", "lateral", "destructive", notes="Inferred mission objective from command-pattern analysis. recon=enumeration " "and data collection without exfiltration. exfil=active data transfer out " "of scope. 
persistence=installing mechanisms to survive reboot or session " "end (cron, systemd, ssh key). lateral=pivoting to adjacent hosts. " "destructive=wipe, encrypt, or sabotage commands.", ), "operational.multi_actor_indicators": _cat( "solo", "handoff_detected", "team_coordinated", notes="Whether the session shows signs of more than one person operating. " "handoff_detected=a detectable style break mid-session (motor cadence, " "vocabulary, or latency class changes sharply at a point in time). " "team_coordinated=multiple style signatures interleaved or simultaneous " "activity from the same account across sessions.", ), # ── environmental.* ──────────────────────────────────────────────────── # Environmental primitives describe the physical and software context the # operator works from. Stable per-campaign; often reveals national origin # or infrastructure choices. "environmental.keyboard_layout": _cat( "qwerty", "azerty", "qwertz", "other", notes="Inferred keyboard layout from characteristic key-sequence errors. An " "AZERTY-trained typist on a QWERTY keyboard makes specific substitutions " "(q↔a, z↔w, m→,) that are statistically distinguishable from random " "errors. Reliable when error volume is sufficient (typically >50 errors " "in the session).", ), "environmental.locale": _str( notes="BCP-47 tag (e.g. 'en-US', 'pt-BR'); free string by deliberate choice — " "locale is not a closed enum. Inferred from keyboard layout, cultural " "timing patterns, and command-line character encoding artifacts.", ), "environmental.numpad_usage": _cat( "detected", "not_detected", notes="Whether the operator uses a numeric keypad for digit entry, inferred from " "keycode patterns. 
detected signals a desktop keyboard rather than a laptop, " "which narrows the physical environment.", ), "environmental.terminal_multiplexer": _cat( "none", "tmux", "screen", notes="Presence of tmux or screen, inferred from keybinding escape sequences " "(Ctrl+B or Ctrl+A prefixes) and window-switching patterns. Multiplexer use " "suggests a persistent, organized working style.", ), "environmental.shell_type": _cat( "bash", "zsh", "fish", "cmd.exe", "powershell", notes="Shell environment, inferred from syntax patterns (array syntax, string " "quoting style, builtin names). powershell and cmd.exe immediately flag a " "Windows-native operator, which constraints the likely toolchain.", ), # ── cultural.* ───────────────────────────────────────────────────────── # Cultural primitives exploit the fact that human work patterns are shaped by # local time, religion, and social convention. These signals are hard to sustain # as deception across a long campaign. "cultural.meal_break_gaps": _cat( "none_detected", "morning", "midday", "evening", "late_night", notes="Whether activity gaps align with regional meal times. morning=09:00-10:00 " "local, midday=12:00-14:00, evening=19:00-21:00, late_night=00:00-02:00. " "Absent if the operator works through typical meal windows. Requires " "environmental.locale or a known timezone to interpret.", ), "cultural.periodic_micro_pauses": _cat( "none_detected", "regular_intervals_detected", notes="Short, rhythmic pauses of 5-15 minutes recurring at consistent intervals " "within a session. May correspond to prayer times (Salah — 5 daily, " "spaced ~2-3hr in active hours), smoke breaks, or other cultural micro-" "rituals. regular_intervals_detected means the null hypothesis of random " "pauses is rejected at p<0.05.", ), "cultural.dst_behavior": _cat( "shifts_with_dst", "anchored_to_utc", "unknown", notes="Whether the operator's active-hours window shifts by 1 hour at daylight " "saving transitions. 
shifts_with_dst=schedule follows local civil time " "(the operator lives there). anchored_to_utc=schedule is clock-fixed, " "suggesting automated infrastructure or an operator who deliberately anchors " "to UTC to defeat this analysis.", ), "cultural.weekend_cadence": _cat( "fri_sat", "sat_sun", "no_weekend", "irregular", notes="Which two-day block the operator treats as a weekend (low-activity days). " "fri_sat=Middle Eastern / Israeli weekend pattern. sat_sun=Western / " "East Asian pattern. no_weekend=operator works 7 days at uniform intensity. " "A reliable national-origin signal when observed across multiple weeks.", ), "cultural.holiday_gaps": _cat( "none_detected", "specific_dates_detected", notes="Whether unexplained multi-day inactivity gaps align with known public " "holiday calendars. specific_dates_detected triggers when a gap of >=2 days " "falls within ±1 day of a public holiday in at least one candidate locale. " "Requires a multi-session corpus spanning calendar events.", ), # ── emotional_valence.* ──────────────────────────────────────────────── # Emotional valence primitives infer affective state from TYPING DYNAMICS — # pace, error rate, and aggression in key input. They do NOT read message # content; BEHAVE-SHELL is content-blind. "emotional_valence.valence": _cat( "positive", "neutral", "negative", notes="Overall affective tone inferred from typing dynamics across the session. " "Positive=fluent, low-error, engaged pace. Negative=error-heavy, erratic, " "showing markers of frustration or stress. This is a coarse aggregate; " "see arousal and stress_response for finer-grained breakdown.", ), "emotional_valence.arousal": _cat( "low_calm", "medium_engaged", "high_agitated", notes="How energized or activated the operator appears. low_calm=slow, deliberate " "pace with long inter-command gaps. high_agitated=fast, error-prone bursts " "with short pauses. 
This dimension is orthogonal to valence: a calm " "professional and a calm automated script are both low_calm.", ), "emotional_valence.stress_response": _cat( "none", "eustress_positive", "distress_negative", notes="Whether detected high arousal reflects positive challenge or negative overload. " "eustress_positive=speed-up with low error rate (operator in the zone, engaged " "problem-solving). distress_negative=speed-up accompanied by rising error rate " "and frustration-venting markers (overloaded, panicking). none=arousal is " "insufficient to classify.", ), "emotional_valence.frustration_venting": _cat( "none", "detected", notes="Detectable outburst signal: a sudden spike in typing speed or rapid-fire " "backspace/delete keys immediately following a string of command failures. " "Distinct from sustained high arousal — this is a transient, failure-triggered " "event. Absent in scripted runs; strong human indicator.", ), # ── toolchain.tls.* ──────────────────────────────────────────────────── # TLS fingerprints identify the client and server stacks by their handshake # parameters. Each tool, library, and OS tends to produce a recognizable # fingerprint even when the payload is encrypted. "toolchain.tls.ja3_client": _hash( notes="MD5 hash of TLS ClientHello parameters: SSLVersion, Ciphers, Extensions, " "EllipticCurves, EllipticCurvePointFormats (Salesforce, 2017). Fingerprints " "the client TLS stack — curl, OpenSSL, Metasploit, Cobalt Strike, and most " "offensive tools each produce a distinct hash. Searchable against public " "databases (e.g. ja3er.com).", ), "toolchain.tls.ja3s_server": _hash( notes="MD5 hash of TLS ServerHello parameters: SSLVersion, Cipher, Extensions. " "Fingerprints the server TLS stack. Useful for identifying C2 servers by " "their TLS response even when IP addresses rotate — the server library " "version (e.g. OpenSSL vs. 
WolfSSL) is often stable.", ), "toolchain.tls.ja4_client": _hash( notes="JA4 fingerprint (FoxIO, 2023): replaces JA3 with a sortable, " "human-readable format (e.g. t13d1516h2_8daaf6152771_e5627efa2ab1) that " "is more robust to TLS extension order randomization. Encodes TLS version, " "cipher count, extension count, ALPN, cipher hash, and extension hash in " "three underscore-separated fields.", ), "toolchain.tls.ja4s_server": _hash( notes="JA4 server-side fingerprint: encodes the chosen cipher, extension list, " "and ALPN from the ServerHello. More stable than JA3S when the server " "randomizes cipher ordering — JA4S hashes the sorted cipher list. " ), "toolchain.tls.jarm_server": _hash( notes="62-char JARM hash (Salesforce, 2020). Actively probes the server by " "sending 10 specially crafted TLS ClientHellos and hashing the ServerHello " "responses. Fingerprints the server TLS stack at a deeper level than JA3S — " "detects Cobalt Strike, Metasploit, and major C2 frameworks reliably even " "when they use custom certificates.", ), "toolchain.tls.tls_cert_simhash": _hash( notes="SHA-256 hex of the leaf certificate's DER-encoded bytes. Tracks the " "specific certificate in use, not just the stack. Useful for correlating " "C2 infrastructure that reuses self-signed certs across campaigns.", ), # ── toolchain.transport.* ────────────────────────────────────────────── "toolchain.transport.tcp_stack": _str( notes="p0f label for the TCP/IP stack (e.g. 'Linux 5.x', 'Windows 10'). Inferred " "from TCP header field quirks (TTL, window size, options order, DF bit). " "Reveals the OS of the connecting host even before any application-layer " "protocol is seen.", ), "toolchain.transport.h2_akamai_fingerprint": _str( notes="HTTP/2 SETTINGS frame + priority frame + pseudo-header order hash. " "Different HTTP/2 client libraries produce distinct SETTINGS and priority " "combinations (curl vs. Python requests vs. Go net/http). 
" "status: planned", ), "toolchain.transport.quic_client": _str( notes="QUIC initial packet fingerprint derived from transport parameters and " "connection ID length patterns. Fingerprints the QUIC library in use. " "status: planned", ), # ── toolchain.ssh.* ──────────────────────────────────────────────────── "toolchain.ssh.hassh_client": _hash( notes="MD5 hash of SSH client KEX parameters: kex_algorithms, encryption_algorithms, " "mac_algorithms, compression_algorithms (Salesforce, 2018). Each SSH client " "library (OpenSSH, PuTTY, libssh, Paramiko, Impacket) produces a distinct " "HASSH. Stable across versions within a major release.", ), "toolchain.ssh.hassh_server": _hash( notes="MD5 hash of SSH server KEX parameters (same field set as HASSH client). " "Fingerprints the SSH daemon — useful for identifying honeypots, implants, " "or non-standard SSH servers. status: partial", ), "toolchain.ssh.ssh_client_banner": _str( notes="RFC 4253 protocol version string sent by the SSH client (e.g. " "'SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.6'). Often unmodified even in " "offensive tooling, providing an easy first-pass fingerprint.", ), "toolchain.ssh.kex_algorithm_order": _array( ValueKind.FREE_STRING, notes="Ordered list of key-exchange algorithms offered in the SSH ClientHello " "(e.g. ['curve25519-sha256', 'ecdh-sha2-nistp256', 'diffie-hellman-group14-sha256']). " "Different clients (OpenSSH, PuTTY, Paramiko, Impacket's smbexec) advertise " "distinct KEX orderings, providing a secondary fingerprint beyond HASSH. " ), # ── toolchain.http.* ─────────────────────────────────────────────────── "toolchain.http.user_agent_tool_class": _cat( "nmap_nse", "sqlmap", "nuclei", "masscan", "curl", "metasploit", "ffuf", "gobuster", "feroxbuster", "nikto", "wpscan", "evilwinrm", "impacket", "unknown", notes="Tool classification from User-Agent string and HTTP behavior fingerprint. 
" "Known offensive tools typically use default User-Agent strings or omit the " "header entirely, making them trivially classifiable. unknown=no match in " "the known-tool list.", ), "toolchain.http.header_order_fingerprint": _str( notes="Hash of the HTTP request header name order. Different HTTP client libraries " "emit headers in distinct sequences (Host first vs. last, Accept-Encoding " "presence, etc.). Fingerprints the underlying HTTP library independently of " "the User-Agent. status: planned", ), "toolchain.http.body_oddities": _array( ValueKind.FREE_STRING, notes="List of anomalous body characteristics (e.g. 'multipart_boundary_static', " "'json_key_order_fixed', 'soap_envelope_namespace_style'). Captures " "tool-specific body serialization tics. status: planned", ), # ── toolchain.c2.* ───────────────────────────────────────────────────── # C2 (Command and Control) primitives characterize the beaconing and callback # behavior of implants. Even encrypted C2 traffic leaves timing and structural # fingerprints. "toolchain.c2.beacon_family": _cat( "cobalt_strike", "sliver", "havoc", "mythic", "merlin", "brc4", "nighthawk", "unknown", notes="C2 framework identified from beacon timing, traffic shape, and protocol " "fingerprints. cobalt_strike, sliver, havoc, mythic=well-characterized " "open-source or widely-used commercial frameworks. merlin, brc4, " "nighthawk=status: planned (less common; less training data).", ), "toolchain.c2.beacon_interval_ms": _num( min_val=0, notes="Median inter-arrival time (IAT) between beacon callbacks, in milliseconds. " "Cobalt Strike default is 60000ms (60s). Operators often lower this for " "interactivity. Very short intervals (<1000ms) suggest an interactive shell " "rather than a true beacon.", ), "toolchain.c2.beacon_jitter_cv": _num( min_val=0, notes="Coefficient of variation (std/mean) of beacon IATs. Higher CV means more " "randomized jitter — a deliberate evasion technique to defeat fixed-interval " "detection. 
Cobalt Strike's default jitter is 0% (CV≈0); operators who " "understand detection set it to 20-50%.", ), "toolchain.c2.sleep_skew": _cat( "none", "gaussian", "uniform", "walk", notes="Type of jitter applied to beacon sleep intervals. none=fixed interval " "(detectable by timing analysis). gaussian=normally-distributed jitter " "(common in Cobalt Strike with jitter set). uniform=flat random range. " "walk=random-walk drift (each sleep shifts from the previous). " "status: partial", ), "toolchain.c2.c2_callback_endpoint": _str( notes="URL or host:port of the C2 callback endpoint observed in traffic. " "Plain string — do not store post-decryption content here.", ), "toolchain.c2.attack_software_id": _str( notes="MITRE ATT&CK Software ID (e.g. 'S0154' for Cobalt Strike). Provides a " "stable cross-reference to the MITRE knowledge base for attribution reporting.", ), # ── toolchain.protocol_abuse.* ───────────────────────────────────────── # Protocol abuse primitives capture non-standard or offensive use of standard # protocols — DNS tunneling, SMB negotiation quirks, Kerberos downgrade attempts, # and LLMNR/NBNS poisoning tools. "toolchain.protocol_abuse.dns_exfil_tool": _cat( "iodine", "dnscat2", "custom_high_entropy", "none", notes="DNS tunneling tool identified from query patterns. iodine=base32-encoded " "data in subdomains with TYPE NULL queries. dnscat2=TYPE TXT queries with " "specific length/entropy patterns. custom_high_entropy=high-entropy " "subdomains consistent with tunneling but not matching a known tool signature. " "status: planned", ), "toolchain.protocol_abuse.smb_dialect": _cat( "SMB1", "SMB2.0.2", "SMB2.1", "SMB3.0", "SMB3.0.2", "SMB3.1.1", notes="SMB protocol dialect negotiated by the client. SMB1 use in 2024+ is a " "strong indicator of legacy tooling or deliberate downgrade (EternalBlue-era " "exploits require SMB1). SMB3.1.1 with pre-auth integrity check is the " "modern hardened default. 
status: planned", ), "toolchain.protocol_abuse.kerberos_etype_offer": _hash( notes="Hash of the set of encryption types offered in the Kerberos AS-REQ etype " "list. Clients that offer RC4-HMAC (etype 23) alongside modern etypes are " "candidates for AS-REP roasting or Kerberoasting tooling (Rubeus, Impacket " "GetUserSPNs). The hash captures the exact etype combination without " "storing the cleartext list.", ), "toolchain.protocol_abuse.ldap_bind_pattern": _cat( "simple", "sasl_gssapi", "ntlm", "ntlmssp_v1", "responder_like", notes="LDAP bind mechanism used by the client. simple=cleartext credentials " "(dangerous, immediately suspicious in modern environments). " "sasl_gssapi=Kerberos-backed GSSAPI (normal). ntlm=NTLM challenge-response. " "ntlmssp_v1=downgraded NTLMv1 (Responder target). responder_like=sequence " "of binds matching Responder or similar MITM tools. status: partial", ), "toolchain.protocol_abuse.responder_signature": _str( notes="Boolean + variant string indicating whether Responder (or a compatible tool) " "was detected. Convention: 'false' if absent; 'true:llmnr', 'true:nbtns', " "'true:mdns' for the poisoning protocol detected. Responder poisons LLMNR, " "NBNS, and mDNS broadcasts to capture Net-NTLMv2 hashes. status: planned", ), "toolchain.protocol_abuse.mitm6_signature": _bool( notes="Whether mitm6 (Fox-IT tool) activity is detected. mitm6 abuses IPv6 router " "advertisement messages on predominantly IPv4 networks to force Windows hosts " "to use an attacker-controlled DNS server, enabling credential relay attacks. " "status: planned", ), # ── toolchain.payload.* ──────────────────────────────────────────────── "toolchain.payload.payload_simhash": _hash( notes="64-bit SimHash of the observed payload binary or shellcode. 
def is_known(primitive: str) -> bool:
    """Tell whether *primitive* is a key of ``PRIMITIVE_REGISTRY``."""
    registered = primitive in PRIMITIVE_REGISTRY
    return registered


def get(primitive: str) -> ValueTypeSpec:
    """Look up the value-type spec registered under *primitive*.

    Raises:
        KeyError: if *primitive* is not a key of ``PRIMITIVE_REGISTRY``.
    """
    # Plain subscription (no default): an unknown name surfaces as KeyError.
    spec = PRIMITIVE_REGISTRY[primitive]
    return spec