# SPDX-License-Identifier: GPL-3.0-or-later """DECNET interop tests for the event adapter.""" from __future__ import annotations import pytest from behave_shell.spec import ( Observation, Window, event_topic_for, from_event_payload, to_event_payload, ) def _obs(**kwargs) -> Observation: base = dict( primitive="motor.keystroke_cadence", value="steady", confidence=0.8, window=Window(start_ts=1.0, end_ts=2.0), source="test/sensor", ) base.update(kwargs) return Observation(**base) def test_topic_derivation_uses_attacker_observation_prefix(): topic = event_topic_for("motor.keystroke_cadence") assert topic == "attacker.observation.motor.keystroke_cadence" def test_topic_handles_deeply_nested_primitive(): topic = event_topic_for("toolchain.protocol_abuse.smb_dialect") assert topic == "attacker.observation.toolchain.protocol_abuse.smb_dialect" def test_payload_excludes_envelope_level_fields(): obs = _obs() payload = to_event_payload(obs) # These fields ride at the DECNET Event envelope, not in the payload body. assert "id" not in payload assert "ts" not in payload assert "v" not in payload # These remain in the payload body. assert payload["primitive"] == "motor.keystroke_cadence" assert payload["value"] == "steady" assert payload["confidence"] == 0.8 assert payload["source"] == "test/sensor" def test_round_trip_through_event_payload(): obs = _obs( evidence_ref="session_X/keystrokes[0:42]", identity_ref="00000000000000000000000000000001", ) payload = to_event_payload(obs) reconstructed = from_event_payload("motor.keystroke_cadence", payload) # id and ts will differ (auto-generated on reconstruct), v defaults match. assert reconstructed.primitive == obs.primitive assert reconstructed.value == obs.value assert reconstructed.confidence == obs.confidence assert reconstructed.window == obs.window assert reconstructed.source == obs.source assert reconstructed.evidence_ref == obs.evidence_ref assert reconstructed.identity_ref == obs.identity_ref assert reconstructed.v == obs.v def test_from_event_payload_rejects_topic_payload_mismatch(): obs = _obs() payload = to_event_payload(obs) # payload still carries primitive="motor.keystroke_cadence"; reconstructing # under a different topic-derived primitive must refuse rather than silently # adopt the wire-side value (see decnet/bus/base.py:60-76 for the same anti- # spoofing discipline). with pytest.raises(ValueError, match="does not match"): from_event_payload("toolchain.tls.ja3_client", payload) def test_payload_is_json_serializable(): import json obs = _obs(primitive="toolchain.ssh.kex_algorithm_order", value=["a", "b"]) payload = to_event_payload(obs) serialized = json.dumps(payload) deserialized = json.loads(serialized) assert deserialized["value"] == ["a", "b"]