"""Attribution-engine bus subscriber — v0 Phase 1 skeleton.

Subscribes to ``attacker.observation.>`` and, for each event, ensures
the source attacker has a stub identity in ``attacker_identities``.
Phase 1 does **not** invoke the merger or write ``attribution_state``
rows; that wiring lands in Phase 4 once the Phase 2/3 mergers are in.

Pattern mirrors :mod:`decnet.correlation.reuse_worker`: bus-subscribe
with a wake event, fall back to poll-only if the bus is unavailable,
publish derived events with :func:`publish_safely`, log per-handler
exceptions and continue.

Note that this Phase 1 skeleton has no poll fallback and publishes
nothing yet: when the bus cannot be reached,
:func:`run_attribution_loop` logs a warning and idles until it is
stopped.

Trigger isolation: the per-event handler is wrapped in a single
try/except. Any exception is logged and the loop continues with the
next event. This is the same posture BEHAVE-SHELL's
``_handler.handle_session_ended`` adopts.
"""
from __future__ import annotations

import asyncio
import contextlib
from typing import Any

from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
    run_control_listener_signal as _run_control_listener_signal,
    run_health_heartbeat as _run_health_heartbeat,
)
from decnet.logging import get_logger
from decnet.web.db.repository import BaseRepository

log = get_logger("correlation.attribution_worker")

# Name passed to the heartbeat / control-listener helpers and used to
# build the bus client name.
_WORKER_NAME = "attribution"

# Subscription subject: every event under the attacker-observation prefix.
_OBSERVATION_PATTERN = f"{_topics.ATTACKER}.{_topics.ATTACKER_OBSERVATION_PREFIX}.>"


async def run_attribution_loop(
    repo: BaseRepository,
    *,
    shutdown: asyncio.Event | None = None,
) -> None:
    """Run the attribution worker until cancelled.

    Connects to the bus and starts three background tasks (observation
    consumer, health heartbeat, control listener), then blocks until a
    stop signal arrives. On the way out every background task is
    cancelled and awaited, and the bus is closed.

    Args:
        repo: Repository handed to the observation consumer.
        shutdown: Optional external stop signal. When omitted, a private
            event is created that nothing ever sets, so the loop only
            ends through cancellation.

    The loop also exits cleanly on ``CancelledError`` and
    ``KeyboardInterrupt``: both are absorbed here rather than re-raised.
    """
    log.info("attribution worker started pattern=%s", _OBSERVATION_PATTERN)

    bus: BaseBus | None = None
    tasks: list[asyncio.Task] = []

    try:
        candidate = get_bus(client_name=f"{_WORKER_NAME}-correlator")
        await candidate.connect()
        # Only remember the bus once the connection succeeded, so the
        # teardown below never closes a bus that was never connected.
        bus = candidate
        tasks.append(asyncio.create_task(_consume_observations(bus, repo)))
        tasks.append(
            asyncio.create_task(_run_health_heartbeat(bus, _WORKER_NAME)),
        )
        tasks.append(
            asyncio.create_task(
                _run_control_listener_signal(bus, _WORKER_NAME),
            ),
        )
    except Exception as exc:  # noqa: BLE001
        # NOTE: nothing in this function retries the connection; the
        # worker simply stays idle until it is stopped or restarted.
        log.warning(
            "attribution worker: bus unavailable, idle until bus returns: %s",
            exc,
        )

    if shutdown is None:
        shutdown = asyncio.Event()

    try:
        # Cancellation and Ctrl-C are treated as a normal stop request.
        with contextlib.suppress(asyncio.CancelledError, KeyboardInterrupt):
            await shutdown.wait()
    finally:
        # Logged on every exit path (shutdown event, cancellation or
        # interrupt) so each "started" line has a matching "stopped".
        log.info("attribution worker stopped")
        await _cancel_and_drain(tasks)
        if bus is not None:
            # Best effort: a failing close must not mask the exit.
            with contextlib.suppress(Exception):
                await bus.close()


async def _cancel_and_drain(tasks: list[asyncio.Task]) -> None:
    """Cancel each background task and wait for it to finish.

    Teardown is best effort: nothing raised by a task propagates to the
    caller. A task that ended with a real error (as opposed to the
    expected cancellation) is reported at debug level instead of being
    dropped silently.
    """
    for task in tasks:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() above.
            continue
        except Exception as exc:  # noqa: BLE001
            log.debug(
                "attribution worker: background task ended with error "
                "during shutdown: %r",
                exc,
            )


async def _consume_observations(
    bus: BaseBus,
    repo: BaseRepository,
) -> None:
    """Pull events off ``attacker.observation.>`` and dispatch each to
    :func:`handle_observation_event`.

    Per-event exceptions are caught and logged; the subscription
    survives bad payloads. If the subscription itself dies (bus
    disconnect), the failure is logged as a warning and this coroutine
    returns, leaving the worker idle. Per the original design note, the
    supervising systemd unit restarts the worker on a clean exit.

    Args:
        bus: Connected bus to subscribe on; also forwarded to the handler.
        repo: Repository forwarded to the handler.

    Raises:
        asyncio.CancelledError: Re-raised so task cancellation works.
    """
    try:
        sub = bus.subscribe(_OBSERVATION_PATTERN)
        async with sub:
            async for event in sub:
                # Isolate each event: one bad payload must not end the
                # subscription.
                try:
                    await handle_observation_event(bus, repo, event)
                except Exception:  # noqa: BLE001
                    log.exception("attribution worker: handler failed")
    except asyncio.CancelledError:
        raise
    except Exception as exc:  # noqa: BLE001
        log.warning(
            "attribution worker: subscriber for %s died (%s)",
            _OBSERVATION_PATTERN,
            exc,
        )


async def handle_observation_event(
    bus: BaseBus | None,
    repo: BaseRepository,
    event: Any,
) -> None:
    """Handle one ``attacker.observation.`` event.

    Phase 1: ensure the source attacker has a stub identity, then log
    and return. Phase 4 will: load prior state, run merger, upsert new
    state, emit ``attribution.profile.state_changed`` on transition.

    *event* is whatever shape :class:`BaseBus`'s subscription yields —
    a ``BusEvent`` with ``payload`` (dict) and ``event_type`` (str)
    fields. The payload carries the BEHAVE envelope plus DECNET-side
    ``attacker_uuid`` denorm (see
    ``decnet.profiler.behave_shell._handler._publish_observation``).

    Args:
        bus: Bus the event arrived on. Unused in Phase 1; nothing is
            published yet.
        repo: Repository used to materialise the stub identity.
        event: Bus event object, or a plain dict payload.

    Events whose payload lacks ``attacker_uuid`` or ``primitive`` are
    skipped with a debug log. If the repository returns ``None`` for
    the attacker, the event is logged as deferred and dropped.
    """
    payload = _payload_of(event)
    attacker_uuid = payload.get("attacker_uuid")
    primitive = payload.get("primitive")

    if not attacker_uuid or not primitive:
        log.debug(
            "attribution worker: skipping malformed event (uuid=%r primitive=%r)",
            attacker_uuid,
            primitive,
        )
        return

    identity_uuid = await repo.ensure_stub_identity_for_attacker(
        str(attacker_uuid),
    )
    if identity_uuid is None:
        log.info(
            "attribution worker: no Attacker row for uuid=%s yet; deferring",
            attacker_uuid,
        )
        return

    # Phase 4 will run the merger here and emit
    # ``attribution.profile.state_changed`` on transition. Phase 1
    # ends with stub materialisation only.
    log.debug(
        "attribution worker: stub identity=%s for attacker=%s primitive=%s",
        identity_uuid,
        attacker_uuid,
        primitive,
    )


def _payload_of(event: Any) -> dict[str, Any]:
    """Extract the dict payload from a BusEvent or fall through if
    *event* is already a dict (test fixtures may pass either).

    Returns an empty dict when neither *event* nor its ``payload``
    attribute is a dict.
    """
    payload = getattr(event, "payload", event)
    return payload if isinstance(payload, dict) else {}


__all__ = [
    "run_attribution_loop",
    "handle_observation_event",
]