# SPDX-License-Identifier: GPL-3.0-or-later
"""BEHAVE-TEXT primitive registry.

Source-of-truth for what `Observation.primitive` may be in the
text/messaging domain and what `Observation.value` must look like.
Mirrors every row in the primitive tables of `scratchpad.md`.

PII discipline notice (carried over from decnet-behave-core's envelope
module): TEXT-domain observations carry CATEGORICAL LABELS, AGGREGATE
RATES, and HASHES of distributions. Sensors operating on
Telegram/messaging text MUST NOT emit raw message content into
BEHAVE-TEXT observations — only derived features. The `evidence_ref`
field points to the underlying message store held elsewhere; never into
the message body itself.

This is a tighter constraint than BEHAVE-SHELL's because the source
signal IS text content. Sensors must hash/aggregate before emitting.

Adding a new primitive is a deliberate registry edit. Drift between this
file and `scratchpad.md` is a bug; v0 keeps the registry hand-written so
PR review catches drift, v0.x may auto-extract from the markdown if
drift becomes a maintenance issue.

Status flags appear in the `notes` field. `EXPERIMENTAL` marks
primitives in the `content.*` layer whose detector implementations are
likely brittle; an attribution engine may choose to weight those at zero
until field-validated.
"""
from __future__ import annotations

import math
from enum import Enum
from typing import Any, Optional

from pydantic import BaseModel, Field


class ValueKind(str, Enum):
    """Discriminator for the shape an `Observation.value` must take."""

    CATEGORICAL = "categorical"  # a str, optionally restricted to an allowed set
    NUMERIC = "numeric"          # an int or float (never bool), optionally bounded
    HASH = "hash"                # a non-empty str
    ARRAY = "array"              # a list, optionally with a per-element kind
    FREE_STRING = "free_string"  # any str
    BOOL = "bool"                # a bool


class ValueTypeSpec(BaseModel):
    """Per-primitive value-type spec (mirrors BEHAVE-SHELL's shape).

    Which fields are consulted depends on `kind`:

    * `allowed` — only for CATEGORICAL; `None` means any string passes.
    * `min_val` / `max_val` — only for NUMERIC; inclusive bounds, `None`
      means unbounded on that side.
    * `array_of` — only for ARRAY; kind each element must satisfy, `None`
      means elements are not checked.
    * `notes` — free-form documentation; never used for validation.
    """

    kind: ValueKind
    allowed: Optional[list[str]] = Field(default=None)
    min_val: Optional[float] = Field(default=None)
    max_val: Optional[float] = Field(default=None)
    array_of: Optional[ValueKind] = Field(default=None)
    notes: Optional[str] = Field(default=None)

    def validate_value(self, value: Any) -> None:
        """Check *value* against this spec.

        Returns `None` when the value conforms.

        Raises:
            ValueError: if *value* has the wrong type for `kind`, is not in
                `allowed`, is NaN or falls outside `min_val`/`max_val`, is
                an empty hash string, or (for arrays) contains an element
                that fails the `array_of` kind check.
        """
        if self.kind is ValueKind.CATEGORICAL:
            if not isinstance(value, str):
                raise ValueError(f"expected categorical string, got {type(value).__name__}")
            if self.allowed is not None and value not in self.allowed:
                raise ValueError(f"value {value!r} not in allowed set {self.allowed!r}")
        elif self.kind is ValueKind.NUMERIC:
            # bool is a subclass of int, so it has to be excluded explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError(f"expected numeric, got {type(value).__name__}")
            # NaN compares False against everything, so it would slip past
            # both bound checks below. The isinstance guard keeps very large
            # ints away from math.isnan, which would otherwise overflow.
            if isinstance(value, float) and math.isnan(value):
                raise ValueError("expected numeric, got NaN")
            if self.min_val is not None and value < self.min_val:
                raise ValueError(f"value {value} below min_val {self.min_val}")
            if self.max_val is not None and value > self.max_val:
                raise ValueError(f"value {value} above max_val {self.max_val}")
        elif self.kind is ValueKind.HASH:
            if not isinstance(value, str) or not value:
                raise ValueError("expected non-empty hash string")
        elif self.kind is ValueKind.FREE_STRING:
            if not isinstance(value, str):
                raise ValueError(f"expected string, got {type(value).__name__}")
        elif self.kind is ValueKind.BOOL:
            if not isinstance(value, bool):
                raise ValueError(f"expected bool, got {type(value).__name__}")
        elif self.kind is ValueKind.ARRAY:
            if not isinstance(value, list):
                raise ValueError(f"expected array, got {type(value).__name__}")
            if self.array_of is None:
                return
            # Elements are checked against a bare spec of the element kind:
            # type only, with no allowed set and no numeric bounds.
            element_spec = ValueTypeSpec(kind=self.array_of)
            for i, element in enumerate(value):
                try:
                    element_spec.validate_value(element)
                except ValueError as exc:
                    raise ValueError(f"array element [{i}]: {exc}") from None


# ─── Convenience constructors ───────────────────────────────────────────────


def _cat(*allowed: str, notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a CATEGORICAL spec whose allowed set is the positional arguments."""
    return ValueTypeSpec(kind=ValueKind.CATEGORICAL, allowed=list(allowed), notes=notes)


def _num(min_val: Optional[float] = None, max_val: Optional[float] = None,
         notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a NUMERIC spec with optional inclusive bounds."""
    return ValueTypeSpec(kind=ValueKind.NUMERIC, min_val=min_val, max_val=max_val, notes=notes)


def _hash(notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a HASH spec."""
    return ValueTypeSpec(kind=ValueKind.HASH, notes=notes)


def _str(notes: Optional[str] = None) -> ValueTypeSpec:
    """Build a FREE_STRING spec."""
    return ValueTypeSpec(kind=ValueKind.FREE_STRING, notes=notes)


def _array(of: ValueKind, notes: Optional[str] = None) -> ValueTypeSpec:
    """Build an ARRAY spec whose elements must each be of kind *of*."""
    return ValueTypeSpec(kind=ValueKind.ARRAY, array_of=of, notes=notes)


# ─── The registry ───────────────────────────────────────────────────────────
#
# 35 primitives across 6 layers. Mirrors scratchpad.md row-for-row.

PRIMITIVE_REGISTRY: dict[str, ValueTypeSpec] = {
    # ── stylometric.* (motor analog — 12) ─────────────────────────────────
    "stylometric.punctuation_style": _hash(notes="canonical punctuation-pattern fingerprint"),
    "stylometric.capitalization_habit": _cat("lowercase", "proper", "random_caps", "mixed_i"),
    "stylometric.emoji_usage": _cat("none", "occasional", "frequent", "exclusive"),
    "stylometric.emoji_placement": _cat("pre_punctuation", "post_punctuation", "no_punctuation", "mixed"),
    "stylometric.message_length_class": _cat("short", "medium", "long", "paragraph"),
    "stylometric.message_length_variance_class": _cat(
        "tight", "varied", "bimodal",
        notes="Coefficient of variation of per-message word counts. Captures "
              "DISTRIBUTION SHAPE that message_length_class collapses by "
              "emitting only the median bucket. Two authors can share the same "
              "median length but have wildly different variance: `tight` (CV<0.5) "
              "= consistent (always 1-3 words), `varied` (0.5<=CV<1.5) = normal "
              "mix, `bimodal` (CV>=1.5) = long-tail (mostly short with occasional "
              "rants). Added in v0.2 after Rutify calibration found median-only "
              "bucketing discarded most of the per-author variance signal.",
    ),
    "stylometric.linebreak_style": _cat("single_thought", "multi_line", "wall_of_text"),
    "stylometric.typo_signature": _hash(notes="sha256 of canonical persistent-typo set"),
    "stylometric.function_word_distribution_top50": _hash(
        notes="64-bit simhash over the 50-most-common Spanish function-word frequency "
              "vector. Mosteller-Wallace gold standard for English long-form authorship; "
              "EMPIRICALLY DOMAIN-FLAWED for Spanish chat-domain — calibrated 2026-05-02 "
              "against the Rutify corpus showed within-author and cross-author Hamming "
              "distance distributions overlap (within median 8 bits, cross median 10 "
              "bits) so this primitive ALONE cannot discriminate authors in chat-style "
              "short-message corpora. Engines should weight it low until paired with "
              "the larger top-200 variant or composited with character n-gram and "
              "distinctive-vocabulary signatures (see siblings below). Kept in v0 for "
              "calibration grids and documentary purposes.",
    ),
    "stylometric.function_word_distribution_top200": _hash(
        notes="64-bit simhash over the 200-most-common Spanish function-word frequency "
              "vector. The wider list reaches into the long tail (rare-but-individual "
              "function words like `tampoco`, `aunque`, `mientras`) that carry more "
              "discriminating signal in short-message chat domains. NOT YET EMITTED by "
              "the v0 prototype extractor; populated when v0.2 calibration is done.",
    ),
    "stylometric.character_ngram_simhash": _hash(
        notes="64-bit simhash over a frequency vector of character n-grams (default "
              "n=3) from the author's lowercased text corpus. ORTHOGONAL to "
              "function-word distributions: captures punctuation tics, accent-"
              "stripping habits, typo patterns, and idiom-fragment fingerprints "
              "that survive paraphrase. Lowercases input so that capitalization "
              "habits — already captured by stylometric.capitalization_habit — "
              "do not double-count. Accents PRESERVED because accent-stripping is "
              "itself a stylistic tic worth catching. Source label declares n size "
              "(e.g. `#char3gram`, `#char4gram`).",
    ),
    "stylometric.distinctive_vocabulary_signature": _hash(
        notes="64-bit simhash over a TF-IDF-weighted top-K rare-word vector. "
              "COMPLEMENTARY to function-word distributions: where function_word_* "
              "captures common-word *style*, this captures the author's distinctive "
              "*lexicon* (the words this person uses that other authors in the same "
              "corpus do NOT). Strong against context-shift because rare words are "
              "where authorial choice lives. Requires the chat corpus for IDF "
              "computation, performed once per extraction. Source label declares the "
              "top-K size and corpus tag (e.g. `#tfidf-top50`).",
    ),

    # ── lexical.* (cognitive analog — 8) ──────────────────────────────────
    "lexical.vocabulary_richness": _num(
        min_val=0.0, max_val=1.0,
        notes="Moving-Average Type-Token Ratio (MATTR) over a sliding window "
              "(default 50 tokens). Volume-independent: each window contributes "
              "its own unique/total ratio, the primitive's value is the mean. "
              "Avoids the standard TTR bias where larger corpora mechanically "
              "score lower. Source label declares the window size.",
    ),
    "lexical.slang_density": _num(min_val=0.0, max_val=1.0,
                                  notes="rate per message; locale-tuned slang corpus"),
    "lexical.code_switching_rate": _num(min_val=0.0, max_val=1.0,
                                        notes="switches per N tokens; Solorio & Liu metric"),
    "lexical.code_switching_matrix_language": _str(notes="BCP-47 of dominant language"),
    "lexical.code_switching_embedded_languages": _array(ValueKind.FREE_STRING,
                                                        notes="BCP-47 list of non-matrix languages observed"),
    "lexical.sentence_complexity_class": _cat("simple", "compound", "complex"),
    "lexical.question_formation_style": _cat("punctuation_only", "lexical", "formal"),
    "lexical.imperative_style": _cat("informal_directive", "formal_directive", "polite"),

    # ── temporal_evolution.* (lifecycle / change-over-time — 1) ───────────
    "temporal_evolution.lifecycle_phase": _cat(
        "arrival_burst", "stable_member", "fluctuating_member",
        "inflection_member", "declining_member", "unknown",
        notes="Auto-classified lifecycle stage derived from windowed within-"
              "corpus analysis. arrival_burst: tenure < 24hr with first-window "
              "volume dominating later windows and high inter-window drift "
              "(empirically validated 2026-05-03 against OxPayload's first 12 "
              "hours on Rutify). stable_member: low drift between consecutive "
              "windows across the whole tenure. fluctuating_member (added v0.3): "
              "tenure ≥ 24hr with median drift in [stable_max, inflection_min) "
              "and no single window crossing inflection_min — established noisy "
              "regulars who don't fit clean stable/inflection classes (e.g. "
              "labelled admin lamarabitch, formerly classified unknown). "
              "inflection_member: long-tenure actor whose drift spikes in at "
              "least one window-pair (a real behavioral shift mid-corpus). "
              "declining_member: monotonically decreasing per-window message "
              "counts. unknown: insufficient windowed data for classification. "
              "Window size adapts to tenure: <24hr → 2h windows, <7d → 12h, "
              "<30d → 1d, otherwise 7d.",
    ),

    # ── network.* (governance/role-shape signals — 2, added v0.3) ─────────
    "network.is_likely_bot": _cat(
        "likely_bot", "not_bot", "unknown",
        notes="Heuristic bot detector composited from existing primitives. "
              "Classifies as likely_bot when conversation_initiation_rate ≥ 0.95 "
              "AND attention_pattern = broadcast AND vocabulary_richness < 0.65. "
              "Empirically validated 2026-05-03 against the tdl-labeled Rutify "
              "bot SangMata_beta_bot (correctly caught) vs 11 high-volume humans "
              "in the same corpus (none false-positive). NOT a verdict — engines "
              "should treat as a candidate signal, especially since low-volume "
              "bots (e.g. QuotLyBot with 9 messages) sit below the fingerprint "
              "threshold and emit nothing here. Source label declares the "
              "heuristic version (e.g. #bot-heuristic-v1).",
    ),
    "network.governance_role_signal": _cat(
        "admin_pattern", "responder_pattern", "regular", "bot_pattern", "unknown",
        notes="Heuristic role-shape composited from interaction primitives + "
              "lifecycle_phase. admin_pattern: init_rate ≥ 0.80 AND attn = "
              "reciprocal AND non-bot AND not arrival_burst. responder_pattern: "
              "init_rate ≤ 0.45 AND attn = reciprocal. bot_pattern: matches "
              "network.is_likely_bot likely_bot. regular: everything else above "
              "the volume threshold. Empirically caught all 4 high-volume "
              "tdl-labeled Rutify admins, sebaImlI as responder, "
              "SangMata_beta_bot as bot, OxPayload/bopxcx as regular (their "
              "arrival_burst lifecycle overrides the admin-shaped init_rate). "
              "NOT a ground-truth admin label — kkaxlazer matches admin_pattern "
              "while not formally admin, but the 2026-05-03 reply-graph cohort "
              "analysis showed they're operationally embedded in the admin "
              "layer (4/4 cohort signal with the top admin), so the heuristic "
              "is doing the right thing.",
    ),

    # ── interaction.* (temporal analog — 6) ───────────────────────────────
    "interaction.response_latency_class": _cat("immediate", "fast", "normal", "slow", "sporadic"),
    "interaction.conversation_initiation_rate": _num(min_val=0.0, max_val=1.0,
                                                     notes="thread-starting messages / total"),
    "interaction.message_burst_rate": _cat("single", "occasional", "habitual"),
    "interaction.active_hours_class": _str(notes="UTC active-hours window summary"),
    "interaction.session_duration_class": _cat("short", "medium", "long", "marathon",
                                               notes="REUSED enum from BEHAVE-SHELL temporal.session_duration"),
    "interaction.attention_pattern": _cat("broadcast", "focused", "reciprocal",
                                          notes="from reply-graph centrality"),

    # ── content.* (operational analog — 6, EXPERIMENTAL) ──────────────────
    "content.role_signal": _cat("admin", "seller", "buyer", "lurker", "newbie",
                                notes="EXPERIMENTAL — locale-tuned role-vocabulary classifier; "
                                      "may be moved to a separate IOC/keyword-detection layer "
                                      "once tested against the Rutify corpus"),
    "content.transactional_language": _num(min_val=0.0, max_val=1.0,
                                           notes="EXPERIMENTAL — rate of transactional terms; "
                                                 "locale-specific, brittle to vocabulary drift"),
    "content.opsec_awareness": _num(min_val=0.0, max_val=1.0,
                                    notes="EXPERIMENTAL — rate of security-conscious phrases; "
                                          "HIGH FALSE-POSITIVE RISK on casual conversation about "
                                          "deleting files / messages"),
    "content.targeting_language": _array(ValueKind.FREE_STRING,
                                         notes="EXPERIMENTAL — IOC-shaped target patterns "
                                               "(bank names, government portals, RUT ranges, etc); "
                                               "consider moving to dedicated IOC layer"),
    "content.boasting_pattern": _cat("none", "occasional", "frequent",
                                     notes="EXPERIMENTAL — success-claim regex; corpus-dependent"),
    "content.conflict_style": _cat("aggressive", "defusing", "appellate",
                                   notes="EXPERIMENTAL — dispute-tone classifier; needs "
                                         "labelled training data"),
}


def is_known(primitive: str) -> bool:
    """Return True if *primitive* is a key of `PRIMITIVE_REGISTRY`."""
    return primitive in PRIMITIVE_REGISTRY


def get(primitive: str) -> ValueTypeSpec:
    """Return the value-type spec for *primitive*; raise KeyError if unknown."""
    return PRIMITIVE_REGISTRY[primitive]