# SPDX-License-Identifier: GPL-3.0-or-later """BEHAVE observation envelope. PII discipline (non-negotiable, lifted from DECNET decnet/web/db/models/attackers.py:268-285,308-311): BEHAVE observations carry CATEGORICAL LABELS, TIMING AGGREGATES, and HASHES only. They MUST NOT carry: * raw keystroke content * command bodies or argument values * passwords, tokens, session keys, or any authentication material * file contents or payload bytes The `evidence_ref` field is a POINTER to underlying evidence held elsewhere (e.g. session tape, packet capture, sensor-side blob store). Never the evidence itself. Sensors that cannot satisfy this constraint must not emit BEHAVE observations. Intended use: sensor → bus envelope for behavioral observations consumed by attribution engines, analytics, and federation gossip. Explicitly NOT for: identity attribution to named natural persons; access or admission decisions; biometric login; ML-driven user identification. Those framings push into legal/ethics territory the project will not walk into by accident. Schema version is non-negotiable from day one — federation gossip will share observation streams across operators in v2; bumping field shapes without a version field silently poisons receivers (see DECNET attackers.py:117-120,267 for the same rationale). """ from __future__ import annotations import time import uuid from typing import Any, Optional, Union from pydantic import BaseModel, ConfigDict, Field, model_validator OBSERVATION_SCHEMA_VERSION: int = 1 # Broad-enough union for every primitive's value shape. Per-primitive validation # is the consumer's job — see the registry-aware Observation subclasses in each # sibling package (BEHAVE-SHELL, BEHAVE-TEXT). The core envelope is intentionally # registry-agnostic so it can be shared across frameworks with different # primitive vocabularies. 
# Deliberately permissive alias: the core envelope accepts any of these shapes
# and leaves per-primitive typing to registry-aware subclasses.
ObservationValue = Union[
    str, int, float, bool, list[str], list[int], list[float], dict[str, Any]
]


class Window(BaseModel):
    """Measurement window. For point observations, ``start_ts == end_ts``.

    Both fields are epoch seconds (float). Distinct from ``Observation.ts``
    (the emission time), because a sensor may compute an observation over a
    window in the past and emit it later.

    Instances are immutable (``frozen=True``). Construction raises a pydantic
    validation error when either bound is missing, is NaN or infinite, or when
    ``end_ts`` precedes ``start_ts``.
    """

    model_config = ConfigDict(frozen=True)

    # Non-finite bounds are rejected up front: NaN compares False against
    # everything, so it would otherwise slip past the ordering check below.
    start_ts: float = Field(
        ...,
        allow_inf_nan=False,
        description="Window start, epoch seconds",
    )
    end_ts: float = Field(
        ...,
        allow_inf_nan=False,
        description="Window end, epoch seconds (>= start_ts)",
    )

    @model_validator(mode="after")
    def _end_after_start(self) -> "Window":
        """Enforce ``end_ts >= start_ts`` once both fields have been parsed.

        Returns:
            The validated instance, unchanged.

        Raises:
            ValueError: if ``end_ts`` is strictly earlier than ``start_ts``.
        """
        if self.end_ts < self.start_ts:
            raise ValueError(f"end_ts ({self.end_ts}) must be >= start_ts ({self.start_ts})")
        return self


class Observation(BaseModel):
    """A single BEHAVE observation. See module docstring for PII discipline.

    Wire-format alignment with DECNET's ``Event`` (decnet/bus/base.py:26):

        topic   = "attacker.observation." + observation.primitive
        payload = observation.model_dump(exclude={"id", "ts", "v"})
        type    = observation.primitive
        v       = observation.v
        ts      = observation.ts
        id      = observation.id

    See ``spec.event_adapter`` for the helpers that perform this projection.

    Unknown fields are rejected (``extra="forbid"``). ``ts``, ``id`` and ``v``
    are filled in automatically when omitted; ``evidence_ref`` and
    ``identity_ref`` default to ``None``.
    """

    model_config = ConfigDict(extra="forbid")

    primitive: str = Field(
        ...,
        description="Fully-qualified primitive path, e.g. 'motor.keystroke_cadence'",
    )
    value: ObservationValue = Field(
        ...,
        description="Value typed by the primitive's registry entry; see spec.primitives",
    )
    # NaN is rejected explicitly rather than relying on how the [0, 1] bound
    # checks treat it.
    confidence: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        allow_inf_nan=False,
        description="Sensor's confidence in this measurement (not in any downstream verdict)",
    )
    window: Window = Field(..., description="Measurement window")
    source: str = Field(
        ...,
        min_length=1,
        description="Canonical sensor identifier, e.g. 'decnet/sniffer/timing.py'",
    )
    evidence_ref: Optional[str] = Field(
        default=None,
        description="Pointer to underlying raw evidence; NEVER the evidence itself",
    )
    identity_ref: Optional[str] = Field(
        default=None,
        description="AttackerIdentity UUID if the observation is pre-attributed",
    )
    # Emission time defaults to "now" at construction; a supplied value must be
    # a finite number, same as the window bounds.
    ts: float = Field(
        default_factory=time.time,
        allow_inf_nan=False,
        description="Emission timestamp, epoch seconds",
    )
    # A fresh random UUID (32 hex chars, no dashes) per instance unless given.
    id: str = Field(
        default_factory=lambda: uuid.uuid4().hex,
        description="UUID for dedup",
    )
    v: int = Field(
        default=OBSERVATION_SCHEMA_VERSION,
        description="Envelope schema version",
    )

    # Note: this base class does NOT validate `primitive` against any registry,
    # nor `value` against per-primitive type specs. Sibling packages (BEHAVE-SHELL,
    # BEHAVE-TEXT) provide registry-aware subclasses that add those checks via an
    # additional model_validator. The core class enforces only structural
    # invariants (window ordering, finite timestamps, confidence bounds,
    # required fields, no extras).