Files
BEHAVE/BEHAVE-TEXT/behave_text/spec/primitives.py
anti 7f585027b3 docs: per-package READMEs with full primitive catalog and registry notes backfill
- core/README.md: envelope contract, field table, PII discipline, quickstart
- BEHAVE-SHELL/README.md: all 76 primitives documented across 9 categories;
  TLS/SSH/C2 fingerprint sections with [DRAFT — verify] markers on uncertain entries
- BEHAVE-TEXT/README.md: all 35 primitives across 6 categories; Rutify calibration
  notes inline; content.* layer marked EXPERIMENTAL throughout
- primitives.py (SHELL): backfilled notes for all previously undocumented primitives
- primitives.py (TEXT): backfilled notes for capitalization_habit, emoji_*, length,
  linebreak_style, sentence_complexity_class, question_formation_style,
  imperative_style, response_latency_class, message_burst_rate

License: CC-BY-SA-4.0 (prose) / GPL-3.0-or-later (code)
2026-05-10 08:33:02 -04:00

354 lines
22 KiB
Python

# SPDX-License-Identifier: GPL-3.0-or-later
"""BEHAVE-TEXT primitive registry.
Source-of-truth for what `Observation.primitive` may be in the text/messaging
domain and what `Observation.value` must look like. Mirrors every row in the
primitive tables of `scratchpad.md`.
PII discipline notice (carried over from behave-core's envelope module):
TEXT-domain observations carry CATEGORICAL LABELS, AGGREGATE RATES, and
HASHES of distributions. Sensors operating on Telegram/messaging text MUST
NOT emit raw message content into BEHAVE-TEXT observations — only derived
features. The `evidence_ref` field points to the underlying message store
held elsewhere; never into the message body itself.
This is a tighter constraint than BEHAVE-SHELL's because the source signal
IS text content. Sensors must hash/aggregate before emitting.
Adding a new primitive is a deliberate registry edit. Drift between this file
and `scratchpad.md` is a bug; v0 keeps the registry hand-written so PR review
catches drift, v0.x may auto-extract from the markdown if drift becomes a
maintenance issue.
Status flags appear in the `notes` field. `EXPERIMENTAL` marks primitives in
the `content.*` layer whose detector implementations are likely brittle; an
attribution engine may choose to weight those at zero until field-validated.
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
class ValueKind(str, Enum):
"""Discriminator for the shape an `Observation.value` must take."""
CATEGORICAL = "categorical"
NUMERIC = "numeric"
HASH = "hash"
ARRAY = "array"
FREE_STRING = "free_string"
BOOL = "bool"
class ValueTypeSpec(BaseModel):
"""Per-primitive value-type spec (mirrors BEHAVE-SHELL's shape)."""
kind: ValueKind
allowed: Optional[list[str]] = Field(default=None)
min_val: Optional[float] = Field(default=None)
max_val: Optional[float] = Field(default=None)
array_of: Optional[ValueKind] = Field(default=None)
notes: Optional[str] = Field(default=None)
def validate_value(self, value: Any) -> None:
if self.kind is ValueKind.CATEGORICAL:
if not isinstance(value, str):
raise ValueError(f"expected categorical string, got {type(value).__name__}")
if self.allowed is not None and value not in self.allowed:
raise ValueError(f"value {value!r} not in allowed set {self.allowed!r}")
elif self.kind is ValueKind.NUMERIC:
if isinstance(value, bool) or not isinstance(value, (int, float)):
raise ValueError(f"expected numeric, got {type(value).__name__}")
if self.min_val is not None and value < self.min_val:
raise ValueError(f"value {value} below min_val {self.min_val}")
if self.max_val is not None and value > self.max_val:
raise ValueError(f"value {value} above max_val {self.max_val}")
elif self.kind is ValueKind.HASH:
if not isinstance(value, str) or not value:
raise ValueError("expected non-empty hash string")
elif self.kind is ValueKind.FREE_STRING:
if not isinstance(value, str):
raise ValueError(f"expected string, got {type(value).__name__}")
elif self.kind is ValueKind.BOOL:
if not isinstance(value, bool):
raise ValueError(f"expected bool, got {type(value).__name__}")
elif self.kind is ValueKind.ARRAY:
if not isinstance(value, list):
raise ValueError(f"expected array, got {type(value).__name__}")
if self.array_of is None:
return
element_spec = ValueTypeSpec(kind=self.array_of)
for i, element in enumerate(value):
try:
element_spec.validate_value(element)
except ValueError as exc:
raise ValueError(f"array element [{i}]: {exc}") from None
# ─── Convenience constructors ───────────────────────────────────────────────
def _cat(*allowed: str, notes: Optional[str] = None) -> ValueTypeSpec:
return ValueTypeSpec(kind=ValueKind.CATEGORICAL, allowed=list(allowed), notes=notes)
def _num(min_val: Optional[float] = None, max_val: Optional[float] = None, notes: Optional[str] = None) -> ValueTypeSpec:
return ValueTypeSpec(kind=ValueKind.NUMERIC, min_val=min_val, max_val=max_val, notes=notes)
def _hash(notes: Optional[str] = None) -> ValueTypeSpec:
return ValueTypeSpec(kind=ValueKind.HASH, notes=notes)
def _str(notes: Optional[str] = None) -> ValueTypeSpec:
return ValueTypeSpec(kind=ValueKind.FREE_STRING, notes=notes)
def _array(of: ValueKind, notes: Optional[str] = None) -> ValueTypeSpec:
return ValueTypeSpec(kind=ValueKind.ARRAY, array_of=of, notes=notes)
# ─── The registry ───────────────────────────────────────────────────────────
#
# 28 primitives across 4 layers. Mirrors scratchpad.md row-for-row.
PRIMITIVE_REGISTRY: dict[str, ValueTypeSpec] = {
# ── stylometric.* (motor analog — 8) ──────────────────────────────────
"stylometric.punctuation_style": _hash(notes="canonical punctuation-pattern fingerprint"),
"stylometric.capitalization_habit": _cat(
"lowercase", "proper", "random_caps", "mixed_i",
notes="Dominant capitalization rule the author applies. lowercase=no capitals except "
"after sentence breaks. proper=standard title/sentence case. random_caps=no "
"consistent rule. mixed_i=author consistently writes 'i' in lowercase even "
"mid-sentence — common in Spanish chat where 'I' is not a standalone word "
"but the habit transfers from the native language's lowercase 'yo'.",
),
"stylometric.emoji_usage": _cat(
"none", "occasional", "frequent", "exclusive",
notes="Rate of emoji use per message. exclusive=messages rarely contain text without "
"emoji. This captures tone and register — heavy emoji use in a criminal-market "
"context is a distinct style trait worth preserving.",
),
"stylometric.emoji_placement": _cat(
"pre_punctuation", "post_punctuation", "no_punctuation", "mixed",
notes="Where emojis appear relative to sentence-ending punctuation. "
"pre_punctuation='Hola 😊.' post_punctuation='Hola. 😊' "
"Individual authors are strikingly consistent in this micro-habit.",
),
"stylometric.message_length_class": _cat(
"short", "medium", "long", "paragraph",
notes="Median message length bucket: short=1-5 words, medium=6-20 words, "
"long=21-50 words, paragraph=>50 words. See also "
"stylometric.message_length_variance_class for the distribution shape.",
),
"stylometric.message_length_variance_class": _cat(
"tight", "varied", "bimodal",
notes="Coefficient of variation of per-message word counts. Captures "
"DISTRIBUTION SHAPE that message_length_class collapses by "
"emitting only the median bucket. Two authors can share the same "
"median length but have wildly different variance: `tight` (CV<0.5) "
"= consistent (always 1-3 words), `varied` (0.5<=CV<1.5) = normal "
"mix, `bimodal` (CV>=1.5) = long-tail (mostly short with occasional "
"rants). Added in v0.2 after Rutify calibration found median-only "
"bucketing discarded most of the per-author variance signal.",
),
"stylometric.linebreak_style": _cat(
"single_thought", "multi_line", "wall_of_text",
notes="Whether the author sends one complete thought per message or breaks a single "
"statement into multiple sequential short messages. multi_line=habitual "
"message-burst style (sends 3-5 short messages in rapid succession instead "
"of one composed message). wall_of_text=rarely uses line breaks, sends dense "
"blocks. Captures a stylistic rhythm that is hard to consciously alter.",
),
"stylometric.typo_signature": _hash(notes="sha256 of canonical persistent-typo set"),
"stylometric.function_word_distribution_top50": _hash(
notes="64-bit simhash over the 50-most-common Spanish function-word frequency "
"vector. Mosteller-Wallace gold standard for English long-form authorship; "
"EMPIRICALLY DOMAIN-FLAWED for Spanish chat-domain — calibrated 2026-05-02 "
"against the Rutify corpus showed within-author and cross-author Hamming "
"distance distributions overlap (within median 8 bits, cross median 10 "
"bits) so this primitive ALONE cannot discriminate authors in chat-style "
"short-message corpora. Engines should weight it low until paired with "
"the larger top-200 variant or composited with character n-gram and "
"distinctive-vocabulary signatures (see siblings below). Kept in v0 for "
"calibration grids and documentary purposes.",
),
"stylometric.function_word_distribution_top200": _hash(
notes="64-bit simhash over the 200-most-common Spanish function-word frequency "
"vector. The wider list reaches into the long tail (rare-but-individual "
"function words like `tampoco`, `aunque`, `mientras`) that carry more "
"discriminating signal in short-message chat domains. NOT YET EMITTED by "
"the v0 prototype extractor; populated when v0.2 calibration is done.",
),
"stylometric.character_ngram_simhash": _hash(
notes="64-bit simhash over a frequency vector of character n-grams (default "
"n=3) from the author's lowercased text corpus. ORTHOGONAL to "
"function-word distributions: captures punctuation tics, accent-"
"stripping habits, typo patterns, and idiom-fragment fingerprints "
"that survive paraphrase. Lowercases input so that capitalization "
"habits — already captured by stylometric.capitalization_habit — "
"do not double-count. Accents PRESERVED because accent-stripping is "
"itself a stylistic tic worth catching. Source label declares n size "
"(e.g. `#char3gram`, `#char4gram`).",
),
"stylometric.distinctive_vocabulary_signature": _hash(
notes="64-bit simhash over a TF-IDF-weighted top-K rare-word vector. "
"COMPLEMENTARY to function-word distributions: where function_word_* "
"captures common-word *style*, this captures the author's distinctive "
"*lexicon* (the words this person uses that other authors in the same "
"corpus do NOT). Strong against context-shift because rare words are "
"where authorial choice lives. Requires the chat corpus for IDF "
"computation, performed once per extraction. Source label declares the "
"top-K size and corpus tag (e.g. `#tfidf-top50`).",
),
# ── lexical.* (cognitive analog — 8) ──────────────────────────────────
"lexical.vocabulary_richness": _num(
min_val=0.0, max_val=1.0,
notes="Moving-Average Type-Token Ratio (MATTR) over a sliding window "
"(default 50 tokens). Volume-independent: each window contributes "
"its own unique/total ratio, the primitive's value is the mean. "
"Avoids the standard TTR bias where larger corpora mechanically "
"score lower. Source label declares the window size.",
),
"lexical.slang_density": _num(min_val=0.0, max_val=1.0,
notes="rate per message; locale-tuned slang corpus"),
"lexical.code_switching_rate": _num(min_val=0.0, max_val=1.0,
notes="switches per N tokens; Solorio & Liu metric"),
"lexical.code_switching_matrix_language": _str(notes="BCP-47 of dominant language"),
"lexical.code_switching_embedded_languages": _array(ValueKind.FREE_STRING,
notes="BCP-47 list of non-matrix languages observed"),
"lexical.sentence_complexity_class": _cat(
"simple", "compound", "complex",
notes="Dominant clause structure. simple=single-clause messages (no conjunctions "
"or subordination). compound=two independent clauses joined by coordinating "
"conjunctions (pero, y, o, ni). complex=dependent clauses and subordination "
"(aunque, porque, cuando, que + verb). Reflects education level and "
"cognitive investment in message composition.",
),
"lexical.question_formation_style": _cat(
"punctuation_only", "lexical", "formal",
notes="How questions are formed. punctuation_only=question mark appended without "
"interrogative words ('¿Cuánto?' or 'Mañana?') — very common in Spanish "
"chat. lexical=explicit interrogatives (¿qué, cómo, cuándo, dónde). "
"formal=inverted subject-verb order or formal register ('¿Podría usted...'). "
"Captures register and education level.",
),
"lexical.imperative_style": _cat(
"informal_directive", "formal_directive", "polite",
notes="How commands and requests are framed. informal_directive=tú/vos imperative "
"('dame', 'hazlo', 'mándame'). formal_directive=usted imperative "
"('hágame el favor', 'envíeme'). polite=conditional or modal softening "
"('¿podría...?', 'me gustaría...'). Stable per-author trait in criminal "
"market contexts where hierarchical and peer relationships are expressed "
"through register choice.",
),
# ── temporal_evolution.* (lifecycle / change-over-time — 1) ───────────
"temporal_evolution.lifecycle_phase": _cat(
"arrival_burst", "stable_member", "fluctuating_member",
"inflection_member", "declining_member", "unknown",
notes="Auto-classified lifecycle stage derived from windowed within-"
"corpus analysis. arrival_burst: tenure < 24hr with first-window "
"volume dominating later windows and high inter-window drift "
"(empirically validated 2026-05-03 against OxPayload's first 12 "
"hours on Rutify). stable_member: low drift between consecutive "
"windows across the whole tenure. fluctuating_member (added v0.3): "
"tenure ≥ 24hr with median drift in [stable_max, inflection_min) "
"and no single window crossing inflection_min — established noisy "
"regulars who don't fit clean stable/inflection classes (e.g. "
"labelled admin lamarabitch, formerly classified unknown). "
"inflection_member: long-tenure actor whose drift spikes in at "
"least one window-pair (a real behavioral shift mid-corpus). "
"declining_member: monotonically decreasing per-window message "
"counts. unknown: insufficient windowed data for classification. "
"Window size adapts to tenure: <24hr → 2h windows, <7d → 12h, "
"<30d → 1d, otherwise 7d.",
),
# ── network.* (governance/role-shape signals — 2, added v0.3) ─────────
"network.is_likely_bot": _cat(
"likely_bot", "not_bot", "unknown",
notes="Heuristic bot detector composited from existing primitives. "
"Classifies as likely_bot when conversation_initiation_rate ≥ 0.95 "
"AND attention_pattern = broadcast AND vocabulary_richness < 0.65. "
"Empirically validated 2026-05-03 against the tdl-labeled Rutify "
"bot SangMata_beta_bot (correctly caught) vs 11 high-volume humans "
"in the same corpus (none false-positive). NOT a verdict — engines "
"should treat as a candidate signal, especially since low-volume "
"bots (e.g. QuotLyBot with 9 messages) sit below the fingerprint "
"threshold and emit nothing here. Source label declares the "
"heuristic version (e.g. #bot-heuristic-v1).",
),
"network.governance_role_signal": _cat(
"admin_pattern", "responder_pattern", "regular", "bot_pattern", "unknown",
notes="Heuristic role-shape composited from interaction primitives + "
"lifecycle_phase. admin_pattern: init_rate ≥ 0.80 AND attn = "
"reciprocal AND non-bot AND not arrival_burst. responder_pattern: "
"init_rate ≤ 0.45 AND attn = reciprocal. bot_pattern: matches "
"network.is_likely_bot likely_bot. regular: everything else above "
"the volume threshold. Empirically caught all 4 high-volume "
"tdl-labeled Rutify admins, sebaImlI as responder, "
"SangMata_beta_bot as bot, OxPayload/bopxcx as regular (their "
"arrival_burst lifecycle overrides the admin-shaped init_rate). "
"NOT a ground-truth admin label — kkaxlazer matches admin_pattern "
"while not formally admin, but the 2026-05-03 reply-graph cohort "
"analysis showed they're operationally embedded in the admin "
"layer (4/4 cohort signal with the top admin), so the heuristic "
"is doing the right thing.",
),
# ── interaction.* (temporal analog — 6) ───────────────────────────────
"interaction.response_latency_class": _cat(
"immediate", "fast", "normal", "slow", "sporadic",
notes="How quickly the actor responds to messages directed at them. "
"immediate=<30s (suggests active monitoring or automated response). "
"fast=30s-5min. normal=5-60min (typical async chat). slow=1-24hr. "
"sporadic=no consistent response latency — appears and disappears.",
),
"interaction.conversation_initiation_rate": _num(min_val=0.0, max_val=1.0,
notes="thread-starting messages / total"),
"interaction.message_burst_rate": _cat(
"single", "occasional", "habitual",
notes="Whether the actor sends multiple messages in rapid sequence within a "
"conversation turn. habitual=almost always bursts (sends 3+ messages "
"before any reply). single=almost always one message per turn. Tied to "
"stylometric.linebreak_style multi_line.",
),
"interaction.active_hours_class": _str(notes="UTC active-hours window summary"),
"interaction.session_duration_class": _cat("short", "medium", "long", "marathon",
notes="REUSED enum from BEHAVE-SHELL temporal.session_duration"),
"interaction.attention_pattern": _cat("broadcast", "focused", "reciprocal",
notes="from reply-graph centrality"),
# ── content.* (operational analog — 6, EXPERIMENTAL) ──────────────────
"content.role_signal": _cat("admin", "seller", "buyer", "lurker", "newbie",
notes="EXPERIMENTAL — locale-tuned role-vocabulary classifier; "
"may be moved to a separate IOC/keyword-detection layer "
"once tested against the Rutify corpus"),
"content.transactional_language": _num(min_val=0.0, max_val=1.0,
notes="EXPERIMENTAL — rate of transactional terms; "
"locale-specific, brittle to vocabulary drift"),
"content.opsec_awareness": _num(min_val=0.0, max_val=1.0,
notes="EXPERIMENTAL — rate of security-conscious phrases; "
"HIGH FALSE-POSITIVE RISK on casual conversation about "
"deleting files / messages"),
"content.targeting_language": _array(ValueKind.FREE_STRING,
notes="EXPERIMENTAL — IOC-shaped target patterns "
"(bank names, government portals, RUT ranges, etc); "
"consider moving to dedicated IOC layer"),
"content.boasting_pattern": _cat("none", "occasional", "frequent",
notes="EXPERIMENTAL — success-claim regex; corpus-dependent"),
"content.conflict_style": _cat("aggressive", "defusing", "appellate",
notes="EXPERIMENTAL — dispute-tone classifier; needs "
"labelled training data"),
}
def is_known(primitive: str) -> bool:
return primitive in PRIMITIVE_REGISTRY
def get(primitive: str) -> ValueTypeSpec:
"""Return the value-type spec for *primitive*; raise KeyError if unknown."""
return PRIMITIVE_REGISTRY[primitive]