feat(core): initial decnet_behave_core spec + tests

Shared observation envelope and schema contract. GPLv3.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-10 06:17:25 -04:00
parent 96dad4b52c
commit 73c1eabf10
5 changed files with 278 additions and 0 deletions

View File

View File

@@ -0,0 +1,28 @@
# SPDX-License-Identifier: GPL-3.0-or-later
"""BEHAVE shared observation envelope — package surface.

Importable as::

    from decnet_behave_core.spec.envelope import Observation, Window, OBSERVATION_SCHEMA_VERSION
    # or, equivalently, from this top-level re-export:
    from decnet_behave_core.spec import Observation, Window, OBSERVATION_SCHEMA_VERSION

Both BEHAVE-SHELL and BEHAVE-TEXT depend on this package as their single source of
truth for the wire-format envelope. JSON Schema artifacts in each sibling package
are generated from THIS Pydantic model — they should always be byte-identical
modulo the ``$id`` URL.
"""
# Re-export the envelope names so callers can import them from the package
# itself instead of reaching into the ``envelope`` submodule.
from .envelope import (
    OBSERVATION_SCHEMA_VERSION,
    Observation,
    ObservationValue,
    Window,
)

# Public API of this package: exactly the four names re-exported above.
__all__ = [
    "OBSERVATION_SCHEMA_VERSION",
    "Observation",
    "ObservationValue",
    "Window",
]

View File

@@ -0,0 +1,137 @@
# SPDX-License-Identifier: GPL-3.0-or-later
"""BEHAVE observation envelope.

PII discipline (non-negotiable, lifted from
DECNET decnet/web/db/models/attackers.py:268-285,308-311):

    BEHAVE observations carry CATEGORICAL LABELS, TIMING AGGREGATES, and
    HASHES only. They MUST NOT carry:

      * raw keystroke content
      * command bodies or argument values
      * passwords, tokens, session keys, or any authentication material
      * file contents or payload bytes

    The `evidence_ref` field is a POINTER to underlying evidence held
    elsewhere (e.g. session tape, packet capture, sensor-side blob store).
    It is never the evidence itself.

    Sensors that cannot satisfy this constraint must not emit BEHAVE
    observations.

Intended use: sensor → bus envelope for behavioral observations consumed
by attribution engines, analytics, and federation gossip.

Explicitly NOT for:
    identity attribution to named natural persons; access or
    admission decisions; biometric login; ML-driven user
    identification. Those framings push into legal/ethics
    territory the project will not walk into by accident.

Schema version is non-negotiable from day one — federation gossip will
share observation streams across operators in v2; bumping field shapes
without a version field silently poisons receivers
(see DECNET attackers.py:117-120,267 for the same rationale).
"""
from __future__ import annotations
import time
import uuid
from typing import Any, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, model_validator
# Version of the envelope shape. Used as the default for ``Observation.v`` so
# every emitted observation carries the schema version it was built against.
OBSERVATION_SCHEMA_VERSION: int = 1

# Broad-enough union for every primitive's value shape. Per-primitive validation
# is the consumer's job — see the registry-aware Observation subclasses in each
# sibling package (BEHAVE-SHELL, BEHAVE-TEXT). The core envelope is intentionally
# registry-agnostic so it can be shared across frameworks with different
# primitive vocabularies.
ObservationValue = Union[
    str, int, float, bool, list[str], list[int], list[float], dict[str, Any]
]
class Window(BaseModel):
    """Measurement window. For point observations, ``start_ts == end_ts``.

    Both fields are epoch seconds (float). Distinct from ``Observation.ts``
    (the emission time), because a sensor may compute an observation over
    a window in the past and emit it later.

    Instances are immutable (``frozen=True``).

    Raises:
        pydantic.ValidationError: if either bound is NaN or +/-infinity, or
            if ``end_ts`` is earlier than ``start_ts``.
    """

    model_config = ConfigDict(frozen=True)

    # Non-finite bounds are rejected up front. Every comparison against NaN is
    # False, so a NaN bound would otherwise slip past the ordering check below,
    # and an infinite bound is not a usable epoch timestamp.
    start_ts: float = Field(
        ...,
        allow_inf_nan=False,
        description="Window start, epoch seconds",
    )
    end_ts: float = Field(
        ...,
        allow_inf_nan=False,
        description="Window end, epoch seconds (>= start_ts)",
    )

    @model_validator(mode="after")
    def _end_after_start(self) -> "Window":
        """Reject windows that end before they start; equal bounds are allowed."""
        if self.end_ts < self.start_ts:
            raise ValueError(f"end_ts ({self.end_ts}) must be >= start_ts ({self.start_ts})")
        return self
class Observation(BaseModel):
    """A single BEHAVE observation. See module docstring for PII discipline.

    Wire-format alignment with DECNET's ``Event`` (decnet/bus/base.py:26):

        topic   = "attacker.observation." + observation.primitive
        payload = observation.model_dump(exclude={"id", "ts", "v"})
        type    = observation.primitive
        v       = observation.v
        ts      = observation.ts
        id      = observation.id

    See ``spec.event_adapter`` for the helpers that perform this projection.

    ``ts``, ``id`` and ``v`` are filled in automatically when omitted (current
    time, a fresh UUID4 hex string, and ``OBSERVATION_SCHEMA_VERSION``).

    Raises:
        pydantic.ValidationError: if a required field is missing, an unknown
            field is supplied (``extra="forbid"``), ``confidence`` is outside
            ``[0.0, 1.0]`` or non-finite, ``source`` is empty, ``ts`` is
            non-finite, or ``window`` is invalid.
    """

    model_config = ConfigDict(extra="forbid")

    primitive: str = Field(
        ...,
        description="Fully-qualified primitive path, e.g. 'motor.keystroke_cadence'",
    )
    value: ObservationValue = Field(
        ...,
        description="Value typed by the primitive's registry entry; see spec.primitives",
    )
    # NaN compares False against every bound, so the finiteness requirement is
    # stated explicitly instead of relying on the ge/le range checks alone.
    confidence: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        allow_inf_nan=False,
        description="Sensor's confidence in this measurement (not in any downstream verdict)",
    )
    window: Window = Field(..., description="Measurement window")
    source: str = Field(
        ...,
        min_length=1,
        description="Canonical sensor identifier, e.g. 'decnet/sniffer/timing.py'",
    )
    evidence_ref: Optional[str] = Field(
        default=None,
        description="Pointer to underlying raw evidence; NEVER the evidence itself",
    )
    identity_ref: Optional[str] = Field(
        default=None,
        description="AttackerIdentity UUID if the observation is pre-attributed",
    )
    # A non-finite emission time is not a usable epoch timestamp, so an
    # explicitly supplied NaN/inf is rejected. The time.time default is unaffected.
    ts: float = Field(
        default_factory=time.time,
        allow_inf_nan=False,
        description="Emission timestamp, epoch seconds",
    )
    id: str = Field(
        default_factory=lambda: uuid.uuid4().hex,
        description="UUID for dedup",
    )
    v: int = Field(
        default=OBSERVATION_SCHEMA_VERSION,
        description="Envelope schema version",
    )

    # Note: this base class does NOT validate `primitive` against any registry,
    # nor `value` against per-primitive type specs. Sibling packages (BEHAVE-SHELL,
    # BEHAVE-TEXT) provide registry-aware subclasses that add those checks via an
    # additional model_validator. The core class enforces only structural
    # invariants (window ordering, confidence bounds, required fields, no extras).

25
core/pyproject.toml Normal file
View File

@@ -0,0 +1,25 @@
# Packaging is built with setuptools.
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "decnet-behave-core"
version = "0.1.0"
description = "BEHAVE shared observation envelope — schema contract used by BEHAVE-SHELL and BEHAVE-TEXT"
requires-python = ">=3.11"
license = { text = "GPL-3.0-or-later" }
authors = [{ name = "ANTI" }]
# The envelope models are written against the pydantic v2 API
# (ConfigDict, model_validator), hence the 2.x floor.
dependencies = ["pydantic>=2.6"]

# Extra tooling for running the test suite and linting; not needed at runtime.
[project.optional-dependencies]
dev = ["pytest>=8", "pytest-cov", "ruff"]

[project.urls]
"Source" = "https://git.resacachile.cl/anti/BEHAVE"

# Only packages whose name matches decnet_behave_core* are picked up.
[tool.setuptools.packages.find]
include = ["decnet_behave_core*"]

# Tests are collected from tests/; -q selects quiet output and the importlib
# import mode is chosen explicitly.
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-q --import-mode=importlib"

View File

@@ -0,0 +1,88 @@
# SPDX-License-Identifier: GPL-3.0-or-later
"""Structural validation tests for the shared Observation envelope.
These tests are STRUCTURAL ONLY — window ordering, confidence bounds, schema
version, round-trip serialization, extra-field rejection. Registry-aware
validation (unknown primitives, categorical-allowed-set, numeric-min-bound,
etc.) lives in each sibling package's own test_envelope.py because the
registry IS the sibling-specific concern.
"""
from __future__ import annotations
import pytest
from pydantic import ValidationError
from decnet_behave_core.spec import OBSERVATION_SCHEMA_VERSION, Observation, Window
def _make(primitive: str = "motor.example", value="x", **kwargs) -> Observation:
    """Build a valid Observation for tests.

    ``primitive`` and ``value`` have defaults; any other field may be supplied
    or overridden through ``kwargs``, which take precedence over the defaults.
    """
    defaults = {
        "primitive": primitive,
        "value": value,
        "confidence": 0.8,
        "window": Window(start_ts=1.0, end_ts=2.0),
        "source": "test/sensor",
    }
    return Observation(**{**defaults, **kwargs})
def test_minimal_observation_round_trips():
    """Dumping to JSON and validating that JSON yields an equal Observation."""
    original = _make()
    wire = original.model_dump_json()
    restored = Observation.model_validate_json(wire)
    assert original == restored
def test_schema_version_pinned_to_one():
    """The exported constant and a fresh observation's ``v`` are both 1."""
    assert OBSERVATION_SCHEMA_VERSION == 1
    assert _make().v == 1
def test_window_end_must_be_after_start():
    """A window whose end precedes its start fails validation."""
    reversed_bounds = {"start_ts": 2.0, "end_ts": 1.0}
    with pytest.raises(ValidationError):
        Window(**reversed_bounds)
def test_window_point_event_allowed():
    """A zero-length window (start equal to end) is accepted."""
    instant = 5.0
    point = Window(start_ts=instant, end_ts=instant)
    assert point.start_ts == point.end_ts
def test_confidence_must_be_in_unit_interval():
    """Confidence just below 0 or just above 1 fails validation."""
    for out_of_range in (-0.01, 1.01):
        with pytest.raises(ValidationError):
            _make(confidence=out_of_range)
def test_extra_fields_forbidden():
    """An otherwise valid payload with an undeclared field fails validation."""
    payload = {
        "primitive": "motor.example",
        "value": "x",
        "confidence": 0.5,
        "window": Window(start_ts=1.0, end_ts=2.0),
        "source": "test/sensor",
        "unknown_field": "oops",
    }
    with pytest.raises(ValidationError):
        Observation(**payload)
def test_id_and_ts_auto_default():
    """Omitted ``id`` and ``ts`` are generated: ids differ, ts is positive."""
    first, second = _make(), _make()
    assert first.id != second.id
    assert first.ts > 0
def test_core_envelope_is_registry_agnostic():
    """The base Observation accepts any primitive string; sibling subclasses validate."""
    arbitrary_primitive = "anything.goes.here"
    arbitrary_value = "anything"
    accepted = _make(primitive=arbitrary_primitive, value=arbitrary_value)
    assert accepted.primitive == "anything.goes.here"
    assert accepted.value == "anything"
def test_source_must_be_nonempty():
    """An empty ``source`` string fails validation."""
    empty_source = ""
    with pytest.raises(ValidationError):
        _make(source=empty_source)