feat(shell): initial decnet_behave_shell spec + tests

Shell-session behavioral observation registry layered on core.
SPDX: GPL-3.0-or-later (code) / CC-BY-SA-4.0 (attribution-recipes.md).
This commit is contained in:
2026-05-10 06:17:28 -04:00
parent abe7dde778
commit bf654b9aed
12 changed files with 1337 additions and 0 deletions

View File

@@ -0,0 +1,89 @@
# SPDX-License-Identifier: GPL-3.0-or-later
"""DECNET interop tests for the event adapter."""
from __future__ import annotations
import pytest
from decnet_behave_shell.spec import (
Observation,
Window,
event_topic_for,
from_event_payload,
to_event_payload,
)
def _obs(**kwargs) -> Observation:
base = dict(
primitive="motor.keystroke_cadence",
value="steady",
confidence=0.8,
window=Window(start_ts=1.0, end_ts=2.0),
source="test/sensor",
)
base.update(kwargs)
return Observation(**base)
def test_topic_derivation_uses_attacker_observation_prefix():
topic = event_topic_for("motor.keystroke_cadence")
assert topic == "attacker.observation.motor.keystroke_cadence"
def test_topic_handles_deeply_nested_primitive():
topic = event_topic_for("toolchain.protocol_abuse.smb_dialect")
assert topic == "attacker.observation.toolchain.protocol_abuse.smb_dialect"
def test_payload_excludes_envelope_level_fields():
obs = _obs()
payload = to_event_payload(obs)
# These fields ride at the DECNET Event envelope, not in the payload body.
assert "id" not in payload
assert "ts" not in payload
assert "v" not in payload
# These remain in the payload body.
assert payload["primitive"] == "motor.keystroke_cadence"
assert payload["value"] == "steady"
assert payload["confidence"] == 0.8
assert payload["source"] == "test/sensor"
def test_round_trip_through_event_payload():
obs = _obs(
evidence_ref="session_X/keystrokes[0:42]",
identity_ref="00000000000000000000000000000001",
)
payload = to_event_payload(obs)
reconstructed = from_event_payload("motor.keystroke_cadence", payload)
# id and ts will differ (auto-generated on reconstruct), v defaults match.
assert reconstructed.primitive == obs.primitive
assert reconstructed.value == obs.value
assert reconstructed.confidence == obs.confidence
assert reconstructed.window == obs.window
assert reconstructed.source == obs.source
assert reconstructed.evidence_ref == obs.evidence_ref
assert reconstructed.identity_ref == obs.identity_ref
assert reconstructed.v == obs.v
def test_from_event_payload_rejects_topic_payload_mismatch():
obs = _obs()
payload = to_event_payload(obs)
# payload still carries primitive="motor.keystroke_cadence"; reconstructing
# under a different topic-derived primitive must refuse rather than silently
# adopt the wire-side value (see decnet/bus/base.py:60-76 for the same anti-
# spoofing discipline).
with pytest.raises(ValueError, match="does not match"):
from_event_payload("toolchain.tls.ja3_client", payload)
def test_payload_is_json_serializable():
import json
obs = _obs(primitive="toolchain.ssh.kex_algorithm_order", value=["a", "b"])
payload = to_event_payload(obs)
serialized = json.dumps(payload)
deserialized = json.loads(serialized)
assert deserialized["value"] == ["a", "b"]