feat(core): initial decnet_behave_core spec + tests
Shared observation envelope and schema contract. GPLv3.
This commit is contained in:
0
core/decnet_behave_core/__init__.py
Normal file
0
core/decnet_behave_core/__init__.py
Normal file
28
core/decnet_behave_core/spec/__init__.py
Normal file
28
core/decnet_behave_core/spec/__init__.py
Normal file
@@ -0,0 +1,28 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
"""BEHAVE shared observation envelope — package surface.
|
||||
|
||||
Importable as::
|
||||
|
||||
from decnet_behave_core.spec.envelope import Observation, Window, OBSERVATION_SCHEMA_VERSION
|
||||
# or, equivalently, from the ``spec`` subpackage's re-export:
|
||||
from decnet_behave_core.spec import Observation, Window, OBSERVATION_SCHEMA_VERSION
|
||||
|
||||
Both BEHAVE-SHELL and BEHAVE-TEXT depend on this package as their single source of
|
||||
truth for the wire-format envelope. JSON Schema artifacts in each sibling package
|
||||
are generated from THIS Pydantic model — they should always be byte-identical
|
||||
modulo the ``$id`` URL.
|
||||
"""
|
||||
|
||||
from .envelope import (
|
||||
OBSERVATION_SCHEMA_VERSION,
|
||||
Observation,
|
||||
ObservationValue,
|
||||
Window,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"OBSERVATION_SCHEMA_VERSION",
|
||||
"Observation",
|
||||
"ObservationValue",
|
||||
"Window",
|
||||
]
|
||||
137
core/decnet_behave_core/spec/envelope.py
Normal file
137
core/decnet_behave_core/spec/envelope.py
Normal file
@@ -0,0 +1,137 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
"""BEHAVE observation envelope.
|
||||
|
||||
PII discipline (non-negotiable, lifted from
|
||||
DECNET decnet/web/db/models/attackers.py:268-285,308-311):
|
||||
|
||||
BEHAVE observations carry CATEGORICAL LABELS, TIMING AGGREGATES, and
|
||||
HASHES only. They MUST NOT carry:
|
||||
* raw keystroke content
|
||||
* command bodies or argument values
|
||||
* passwords, tokens, session keys, or any authentication material
|
||||
* file contents or payload bytes
|
||||
|
||||
The `evidence_ref` field is a POINTER to underlying evidence held
|
||||
elsewhere (e.g. session tape, packet capture, sensor-side blob store).
|
||||
Never the evidence itself.
|
||||
|
||||
Sensors that cannot satisfy this constraint must not emit BEHAVE
|
||||
observations.
|
||||
|
||||
Intended use: sensor → bus envelope for behavioral observations consumed
|
||||
by attribution engines, analytics, and federation gossip.
|
||||
Explicitly NOT for:
|
||||
identity attribution to named natural persons; access or
|
||||
admission decisions; biometric login; ML-driven user
|
||||
identification. Those framings push into legal/ethics
|
||||
territory the project will not walk into by accident.
|
||||
|
||||
Schema version is non-negotiable from day one — federation gossip will
|
||||
share observation streams across operators in v2; bumping field shapes
|
||||
without a version field silently poisons receivers
|
||||
(see DECNET attackers.py:117-120,267 for the same rationale).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
import uuid
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, model_validator
|
||||
|
||||
OBSERVATION_SCHEMA_VERSION: int = 1
|
||||
|
||||
# Deliberately permissive union covering the value shape of every primitive.
# The core envelope knows nothing about primitive registries, which lets it be
# shared by frameworks with different primitive vocabularies. Checking a value
# against its primitive's type is left to the consumer — see the registry-aware
# Observation subclasses in the sibling packages (BEHAVE-SHELL, BEHAVE-TEXT).
ObservationValue = Union[
    str,
    int,
    float,
    bool,
    list[str],
    list[int],
    list[float],
    dict[str, Any],
]
|
||||
|
||||
|
||||
class Window(BaseModel):
    """Measurement window over which an observation was computed.

    Both bounds are epoch seconds (float). A point observation is expressed
    as ``start_ts == end_ts``. The window is distinct from ``Observation.ts``
    (the emission time), because a sensor may compute an observation over
    a window in the past and emit it later.

    Instances are frozen (``frozen=True``), so the bounds cannot be
    reassigned after validation.

    Raises:
        pydantic.ValidationError: if either bound is NaN or infinite, or if
            ``end_ts`` is earlier than ``start_ts``.
    """

    model_config = ConfigDict(frozen=True)

    # allow_inf_nan=False: NaN compares false against everything, so without
    # it a NaN bound would slip past the ordering check below; non-finite
    # timestamps are also not representable in strict JSON.
    start_ts: float = Field(
        ...,
        allow_inf_nan=False,
        description="Window start, epoch seconds",
    )
    end_ts: float = Field(
        ...,
        allow_inf_nan=False,
        description="Window end, epoch seconds (>= start_ts)",
    )

    @model_validator(mode="after")
    def _end_after_start(self) -> "Window":
        """Reject a window whose end precedes its start; equality is allowed."""
        if self.end_ts < self.start_ts:
            raise ValueError(f"end_ts ({self.end_ts}) must be >= start_ts ({self.start_ts})")
        return self
|
||||
|
||||
|
||||
class Observation(BaseModel):
    """A single BEHAVE observation. See module docstring for PII discipline.

    Wire-format alignment with DECNET's ``Event`` (decnet/bus/base.py:26):

        topic   = "attacker.observation." + observation.primitive
        payload = observation.model_dump(exclude={"id", "ts", "v"})
        type    = observation.primitive
        v       = observation.v
        ts      = observation.ts
        id      = observation.id

    See ``spec.event_adapter`` for the helpers that perform this projection.

    Structural invariants enforced here:

    * unknown fields are rejected (``extra="forbid"``);
    * ``primitive`` and ``source`` must be non-empty strings;
    * ``confidence`` must be a finite number in ``[0.0, 1.0]``;
    * ``ts`` must be finite;
    * ``window`` must satisfy the ``Window`` model's own validation.

    ``ts``, ``id`` and ``v`` are filled in automatically when omitted.

    Raises:
        pydantic.ValidationError: if any invariant above is violated or a
            required field is missing.
    """

    model_config = ConfigDict(extra="forbid")

    # min_length=1 mirrors `source`: an empty primitive would project to the
    # bare topic prefix "attacker.observation." on the bus.
    primitive: str = Field(
        ...,
        min_length=1,
        description="Fully-qualified primitive path, e.g. 'motor.keystroke_cadence'",
    )
    value: ObservationValue = Field(
        ...,
        description="Value typed by the primitive's registry entry; see spec.primitives",
    )
    # NaN is excluded explicitly rather than relying on the ordered bounds,
    # since NaN compares false against any bound.
    confidence: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        allow_inf_nan=False,
        description="Sensor's confidence in this measurement (not in any downstream verdict)",
    )
    window: Window = Field(..., description="Measurement window")
    source: str = Field(
        ...,
        min_length=1,
        description="Canonical sensor identifier, e.g. 'decnet/sniffer/timing.py'",
    )
    evidence_ref: Optional[str] = Field(
        default=None,
        description="Pointer to underlying raw evidence; NEVER the evidence itself",
    )
    identity_ref: Optional[str] = Field(
        default=None,
        description="AttackerIdentity UUID if the observation is pre-attributed",
    )
    # Non-finite emission times are not representable in strict JSON.
    ts: float = Field(
        default_factory=time.time,
        allow_inf_nan=False,
        description="Emission timestamp, epoch seconds",
    )
    id: str = Field(
        default_factory=lambda: uuid.uuid4().hex,
        description="UUID for dedup",
    )
    v: int = Field(
        default=OBSERVATION_SCHEMA_VERSION,
        description="Envelope schema version",
    )
|
||||
|
||||
# Note: this base class does NOT validate `primitive` against any registry,
|
||||
# nor `value` against per-primitive type specs. Sibling packages (BEHAVE-SHELL,
|
||||
# BEHAVE-TEXT) provide registry-aware subclasses that add those checks via an
|
||||
# additional model_validator. The core class enforces only structural
|
||||
# invariants (window ordering, confidence bounds, required fields, no extras).
|
||||
25
core/pyproject.toml
Normal file
25
core/pyproject.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=68", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "decnet-behave-core"
|
||||
version = "0.1.0"
|
||||
description = "BEHAVE shared observation envelope — schema contract used by BEHAVE-SHELL and BEHAVE-TEXT"
|
||||
requires-python = ">=3.11"
|
||||
license = { text = "GPL-3.0-or-later" }
|
||||
authors = [{ name = "ANTI" }]
|
||||
dependencies = ["pydantic>=2.6"]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = ["pytest>=8", "pytest-cov", "ruff"]
|
||||
|
||||
[project.urls]
|
||||
"Source" = "https://git.resacachile.cl/anti/BEHAVE"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
include = ["decnet_behave_core*"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
addopts = "-q --import-mode=importlib"
|
||||
88
core/tests/test_envelope.py
Normal file
88
core/tests/test_envelope.py
Normal file
@@ -0,0 +1,88 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
"""Structural validation tests for the shared Observation envelope.
|
||||
|
||||
These tests are STRUCTURAL ONLY — window ordering, confidence bounds, schema
|
||||
version, round-trip serialization, extra-field rejection. Registry-aware
|
||||
validation (unknown primitives, categorical-allowed-set, numeric-min-bound,
|
||||
etc.) lives in each sibling package's own test_envelope.py because the
|
||||
registry IS the sibling-specific concern.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from decnet_behave_core.spec import OBSERVATION_SCHEMA_VERSION, Observation, Window
|
||||
|
||||
|
||||
def _make(primitive: str = "motor.example", value="x", **kwargs) -> Observation:
    """Build a valid Observation; keyword overrides replace any default field."""
    fields = {
        "primitive": primitive,
        "value": value,
        "confidence": 0.8,
        "window": Window(start_ts=1.0, end_ts=2.0),
        "source": "test/sensor",
        **kwargs,
    }
    return Observation(**fields)
|
||||
|
||||
|
||||
def test_minimal_observation_round_trips():
    """Dumping to JSON and validating that JSON yields an equal model."""
    original = _make()
    wire = original.model_dump_json()
    restored = Observation.model_validate_json(wire)
    assert original == restored
|
||||
|
||||
|
||||
def test_schema_version_pinned_to_one():
    """The schema constant is 1 and a default-built observation carries it."""
    expected = 1
    assert OBSERVATION_SCHEMA_VERSION == expected
    built = _make()
    assert built.v == expected
|
||||
|
||||
|
||||
def test_window_end_must_be_after_start():
    """A window whose end precedes its start fails validation."""
    inverted = {"start_ts": 2.0, "end_ts": 1.0}
    with pytest.raises(ValidationError):
        Window(**inverted)
|
||||
|
||||
|
||||
def test_window_point_event_allowed():
    """A zero-length window (start equal to end) is accepted."""
    instant = 5.0
    point = Window(start_ts=instant, end_ts=instant)
    assert point.start_ts == point.end_ts
|
||||
|
||||
|
||||
def test_confidence_must_be_in_unit_interval():
    """Confidence just below 0.0 or just above 1.0 is rejected."""
    for out_of_range in (-0.01, 1.01):
        with pytest.raises(ValidationError):
            _make(confidence=out_of_range)
|
||||
|
||||
|
||||
def test_extra_fields_forbidden():
    """An unrecognised keyword raises a validation error."""
    payload = {
        "primitive": "motor.example",
        "value": "x",
        "confidence": 0.5,
        "window": Window(start_ts=1.0, end_ts=2.0),
        "source": "test/sensor",
        "unknown_field": "oops",
    }
    with pytest.raises(ValidationError):
        Observation(**payload)
|
||||
|
||||
|
||||
def test_id_and_ts_auto_default():
    """Omitted ``id`` values differ between instances and ``ts`` is positive."""
    first, second = _make(), _make()
    assert first.id != second.id
    assert first.ts > 0
|
||||
|
||||
|
||||
def test_core_envelope_is_registry_agnostic():
    """The base Observation accepts any primitive string; sibling subclasses validate."""
    name = "anything.goes.here"
    payload = "anything"
    obs = _make(primitive=name, value=payload)
    assert obs.primitive == name
    assert obs.value == payload
|
||||
|
||||
|
||||
def test_source_must_be_nonempty():
    """An empty ``source`` string is rejected."""
    empty_source = ""
    with pytest.raises(ValidationError):
        _make(source=empty_source)
|
||||
Reference in New Issue
Block a user