Files
BEHAVE/BEHAVE-SHELL/tests/test_event_adapter.py
anti 22b57307cf refactor: drop decnet- prefix — BEHAVE is now standalone
Rename packages and imports:
  decnet-behave-core  → behave-core
  decnet-behave-shell → behave-shell
  decnet-behave-text  → behave-text
  decnet_behave_*     → behave_*

BEHAVE is no longer a DECNET sub-project.
2026-05-10 06:20:01 -04:00

90 lines
3.0 KiB
Python

# SPDX-License-Identifier: GPL-3.0-or-later
"""DECNET interop tests for the event adapter."""
from __future__ import annotations
import pytest
from behave_shell.spec import (
Observation,
Window,
event_topic_for,
from_event_payload,
to_event_payload,
)
def _obs(**kwargs) -> Observation:
    """Build an ``Observation`` for tests from defaults plus overrides.

    Every keyword argument replaces the default field of the same name (or
    is passed through as an extra field) before ``Observation`` is
    constructed.
    """
    fields = {
        "primitive": "motor.keystroke_cadence",
        "value": "steady",
        "confidence": 0.8,
        "window": Window(start_ts=1.0, end_ts=2.0),
        "source": "test/sensor",
        # Caller overrides go last so they win over the defaults above.
        **kwargs,
    }
    return Observation(**fields)
def test_topic_derivation_uses_attacker_observation_prefix():
    """The topic for a primitive is prefixed with ``attacker.observation.``."""
    primitive = "motor.keystroke_cadence"
    assert event_topic_for(primitive) == "attacker.observation.motor.keystroke_cadence"
def test_topic_handles_deeply_nested_primitive():
    """A primitive with several dotted segments keeps all of them in the topic."""
    expected = "attacker.observation.toolchain.protocol_abuse.smb_dialect"
    assert event_topic_for("toolchain.protocol_abuse.smb_dialect") == expected
def test_payload_excludes_envelope_level_fields():
    """The payload body omits ``id``/``ts``/``v`` and keeps the observation data."""
    body = to_event_payload(_obs())

    # These fields ride at the DECNET Event envelope, not in the payload body.
    for envelope_key in ("id", "ts", "v"):
        assert envelope_key not in body

    # These remain in the payload body.
    kept_fields = {
        "primitive": "motor.keystroke_cadence",
        "value": "steady",
        "confidence": 0.8,
        "source": "test/sensor",
    }
    for key, expected in kept_fields.items():
        assert body[key] == expected
def test_round_trip_through_event_payload():
    """Serializing to a payload and rebuilding preserves the compared fields."""
    original = _obs(
        evidence_ref="session_X/keystrokes[0:42]",
        identity_ref="00000000000000000000000000000001",
    )
    wire = to_event_payload(original)
    restored = from_event_payload("motor.keystroke_cadence", wire)

    # id and ts are deliberately not compared: they will differ because they
    # are auto-generated on reconstruct. v is compared since defaults match.
    compared_attrs = (
        "primitive",
        "value",
        "confidence",
        "window",
        "source",
        "evidence_ref",
        "identity_ref",
        "v",
    )
    for attr in compared_attrs:
        assert getattr(restored, attr) == getattr(original, attr)
def test_from_event_payload_rejects_topic_payload_mismatch():
    """Rebuilding under a primitive that differs from the payload's must raise."""
    wire_payload = to_event_payload(_obs())

    # The payload still carries primitive="motor.keystroke_cadence". Asking for
    # reconstruction under a different topic-derived primitive has to be
    # refused instead of silently adopting the wire-side value (the same
    # anti-spoofing discipline as decnet/bus/base.py:60-76).
    with pytest.raises(ValueError, match="does not match"):
        from_event_payload("toolchain.tls.ja3_client", wire_payload)
def test_payload_is_json_serializable():
    """A payload with a list value survives a ``json`` dumps/loads round trip."""
    import json

    payload = to_event_payload(
        _obs(primitive="toolchain.ssh.kex_algorithm_order", value=["a", "b"])
    )
    round_tripped = json.loads(json.dumps(payload))
    assert round_tripped["value"] == ["a", "b"]