From 5116023bf7239655f6e41c9d16092d07732b6355 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 8 May 2026 20:18:32 -0400 Subject: [PATCH] feat(profiler/behave_shell): stamp attacker_uuid on bus payload (Phase 5 prep) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The profiler worker's per-observation publish now re-merges attacker_uuid into the bus payload alongside id/ts/v. Same shape as the existing DECNET-side deviation from BEHAVE's wire-format docstring (BEHAVE-INTEGRATION.md §339-366) — widens the deviation by one DECNET denorm field. Phase 5's per-attacker SSE route can now filter attacker.observation.* events to one attacker in O(1) without a repo round-trip per event. identity_ref stays None today (until the attribution engine ships); attacker_uuid is independent. Two test changes: * test_happy_path_persists_and_publishes asserts attacker_uuid is in every published payload. * New test_attacker_uuid_in_payload_for_filter pins the field explicitly and confirms it doesn't conflate with identity_ref. --- decnet/profiler/behave_shell/_handler.py | 16 +++++++++---- .../test_handler_session_ended.py | 23 +++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/decnet/profiler/behave_shell/_handler.py b/decnet/profiler/behave_shell/_handler.py index 00bf2ed4..777a8aa0 100644 --- a/decnet/profiler/behave_shell/_handler.py +++ b/decnet/profiler/behave_shell/_handler.py @@ -96,15 +96,23 @@ def _flatten_observation(obs: Observation, attacker_uuid: str) -> dict[str, Any] } -def _publish_observation(publish: Optional[PublishFn], obs: Observation) -> None: - """Best-effort publish; never raise. Re-merges id/ts/v into payload - per BEHAVE-INTEGRATION.md §339-366 deviation note.""" +def _publish_observation( + publish: Optional[PublishFn], + obs: Observation, + attacker_uuid: str, +) -> None: + """Best-effort publish; never raise. Re-merges id/ts/v plus + DECNET-side ``attacker_uuid`` denorm into payload per + BEHAVE-INTEGRATION.md §339-366 deviation note. The ``attacker_uuid`` + stamp gives the per-attacker SSE route an O(1) filter without a + repo round-trip per event (Phase 5).""" if publish is None: return payload = to_event_payload(obs) | { "id": obs.id, "ts": obs.ts, "v": obs.v, + "attacker_uuid": attacker_uuid, } try: publish(event_topic_for(obs.primitive), payload, obs.primitive) @@ -202,7 +210,7 @@ async def handle_session_ended( # 7. Publish — fire-and-forget, never raises out. for obs in observations: - _publish_observation(publish, obs) + _publish_observation(publish, obs, attacker_uuid) log.info( "behave_handler: persisted=%d primitives sid=%s attacker_ip=%s", diff --git a/tests/profiler/behave_shell/test_handler_session_ended.py b/tests/profiler/behave_shell/test_handler_session_ended.py index 9a058177..d3fa6f33 100644 --- a/tests/profiler/behave_shell/test_handler_session_ended.py +++ b/tests/profiler/behave_shell/test_handler_session_ended.py @@ -98,6 +98,9 @@ async def test_happy_path_persists_and_publishes(tmp_path) -> None: assert topic.startswith("attacker.observation.") # Adapter excludes id/ts/v from payload body; handler re-merges. assert "id" in payload and "ts" in payload and "v" in payload + # Phase 5 amendment: attacker_uuid is also re-merged so the + # per-attacker SSE route can filter in O(1). + assert payload["attacker_uuid"] == _ATTACKER_UUID async def test_missing_session_id_skipped(tmp_path) -> None: @@ -175,3 +178,23 @@ async def test_publish_none_is_silent(tmp_path) -> None: repo = _make_repo() n = await handle_session_ended(repo, _payload(shard_path), None) assert n > 0 + + +async def test_attacker_uuid_in_payload_for_filter(tmp_path) -> None: + """Phase 5 amendment: every published observation carries the + DECNET-side ``attacker_uuid`` denorm (NOT the BEHAVE + ``identity_ref``, which stays None until attribution exists).""" + shard_path = _shard_with_typing_session(tmp_path) + repo = _make_repo() + published: list[tuple[str, dict[str, Any], str]] = [] + publish = lambda topic, payload, etype: published.append((topic, payload, etype)) + + n = await handle_session_ended(repo, _payload(shard_path), publish) + + assert n > 0 + for _topic, payload, _etype in published: + assert payload["attacker_uuid"] == _ATTACKER_UUID + # identity_ref ride-along comes from the BEHAVE adapter's + # to_event_payload — None today, that's fine. The point is the + # attacker_uuid is INDEPENDENT of identity_ref. + assert payload.get("identity_ref") is None