diff --git a/decnet/correlation/attribution_worker.py b/decnet/correlation/attribution_worker.py
index 38487c88..2bba4c0a 100644
--- a/decnet/correlation/attribution_worker.py
+++ b/decnet/correlation/attribution_worker.py
@@ -30,6 +30,7 @@ from decnet.bus.publish import (
     run_control_listener_signal as _run_control_listener_signal,
     run_health_heartbeat as _run_health_heartbeat,
 )
+from decnet.correlation.attribution import _thresholds as _T
 from decnet.correlation.attribution.aggregate import aggregate_observations
 from decnet.logging import get_logger
 from decnet.web.db.repository import BaseRepository
@@ -55,18 +56,37 @@ async def run_attribution_loop(
     repo: BaseRepository,
     *,
     shutdown: asyncio.Event | None = None,
+    multi_actor_tick_secs: float | None = None,
 ) -> None:
     """Run the attribution worker until cancelled.
 
-    *shutdown* is an optional external stop signal; the loop also
-    exits cleanly on ``CancelledError`` and ``KeyboardInterrupt``.
+    Three concurrent tasks under one supervisor:
+
+    1. ``_consume_observations`` — bus subscription on
+       ``attacker.observation.>``; per-event handler upserts state.
+    2. ``_multi_actor_tick_loop`` — periodic walk of ``attribution_state``
+       firing ``attribution.profile.multi_actor_suspected`` when an
+       identity carries ≥ ``MULTI_ACTOR_MIN_PRIMITIVES`` rows in
+       ``multi_actor`` state. Phase 5.
+    3. Health + control standard channels.
+
+    *shutdown* is an optional external stop signal.
+    *multi_actor_tick_secs* overrides ``_thresholds.MULTI_ACTOR_TICK_SECS``
+    (tests use this to drive the correlator without sleeping for a
+    minute).
     """
     log.info("attribution worker started pattern=%s", _OBSERVATION_PATTERN)
 
     bus: BaseBus | None = None
     sub_task: asyncio.Task | None = None
+    tick_task: asyncio.Task | None = None
     heartbeat_task: asyncio.Task | None = None
     control_task: asyncio.Task | None = None
+    tick_secs = (
+        multi_actor_tick_secs
+        if multi_actor_tick_secs is not None
+        else _T.MULTI_ACTOR_TICK_SECS
+    )
     try:
         candidate = get_bus(client_name=f"{_WORKER_NAME}-correlator")
         await candidate.connect()
@@ -74,6 +94,9 @@ async def run_attribution_loop(
         sub_task = asyncio.create_task(
             _consume_observations(bus, repo),
         )
+        tick_task = asyncio.create_task(
+            _multi_actor_tick_loop(bus, repo, tick_secs),
+        )
         heartbeat_task = asyncio.create_task(
             _run_health_heartbeat(bus, _WORKER_NAME),
         )
@@ -94,7 +117,7 @@ async def run_attribution_loop(
     except (asyncio.CancelledError, KeyboardInterrupt):
         log.info("attribution worker stopped")
     finally:
-        for task in (sub_task, heartbeat_task, control_task):
+        for task in (sub_task, tick_task, heartbeat_task, control_task):
             if task is None:
                 continue
             task.cancel()
@@ -275,7 +298,105 @@ def _payload_of(event: Any) -> dict[str, Any]:
     return payload if isinstance(payload, dict) else {}
 
 
+async def _multi_actor_tick_loop(
+    bus: BaseBus, repo: BaseRepository, interval_secs: float,
+) -> None:
+    """Walk ``attribution_state`` every *interval_secs* and emit
+    ``attribution.profile.multi_actor_suspected`` for any identity
+    whose multi_actor primitives changed since the last tick.
+
+    Dedupe: in-memory ``last_fired`` map keyed on identity_uuid →
+    frozenset(primitives). Same primitive set as last fire → no
+    re-emit. New primitive joining the set → re-emit. Set shrinks
+    below ``MULTI_ACTOR_MIN_PRIMITIVES`` → drop the entry so it
+    re-arms.
+
+    In-memory dedup is honest for v0 — restart-resets are
+    acceptable because the underlying ``attribution_state`` rows
+    persist; on first tick after restart we re-emit the current
+    set. v1 may persist a ``multi_actor_suspect_log`` table.
+
+    Loops until cancelled. A failing tick is logged and the loop
+    carries on after the usual sleep.
+    """
+    last_fired: dict[str, frozenset[str]] = {}
+    while True:
+        try:
+            await tick_multi_actor(bus, repo, last_fired)
+        except Exception:  # noqa: BLE001
+            # Best-effort: one bad tick must not kill the correlator.
+            log.exception("attribution worker: multi_actor tick failed")
+        await asyncio.sleep(interval_secs)
+
+
+async def tick_multi_actor(
+    bus: BaseBus | None,
+    repo: BaseRepository,
+    last_fired: dict[str, frozenset[str]],
+) -> int:
+    """One pass of the cross-primitive correlator. Public for tests.
+
+    *last_fired* is the dedup map (identity_uuid → last emitted
+    primitive set); it is mutated in place, so callers reuse the same
+    dict across ticks. With *bus* set to ``None`` the map is still
+    updated but nothing is published or counted.
+
+    Returns the number of ``multi_actor_suspected`` events emitted.
+    """
+    candidates = await repo.list_multi_actor_identities()
+    fired = 0
+    seen_now: set[str] = set()
+    for entry in candidates:
+        identity_uuid = str(entry["identity_uuid"])
+        primitives: list[str] = sorted(entry.get("primitives") or [])
+        if len(primitives) < _T.MULTI_ACTOR_MIN_PRIMITIVES:
+            # Repo already filters to >= 2 today; defensive against
+            # future schema drift. Deliberately not added to
+            # ``seen_now``: a sub-threshold identity must be evicted
+            # from ``last_fired`` below so it re-arms.
+            continue
+        seen_now.add(identity_uuid)
+        signature = frozenset(primitives)
+        if last_fired.get(identity_uuid) == signature:
+            continue
+        last_fired[identity_uuid] = signature
+        if bus is None:
+            continue
+        await publish_safely(
+            bus,
+            _topics.attribution(_topics.ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED),
+            {
+                "identity_uuid": identity_uuid,
+                "primitives": primitives,
+                "evidence_summary": (
+                    f"{len(primitives)} primitives flagged multi_actor"
+                ),
+                "confidence": _T.MULTI_ACTOR_MAX_CONFIDENCE,
+                "ts": _now(),
+            },
+            event_type=_topics.ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED,
+        )
+        fired += 1
+        log.info(
+            "attribution worker: multi_actor_suspected identity=%s primitives=%s",
+            identity_uuid, primitives,
+        )
+    # Rearm: any identity that was in last_fired but did not qualify
+    # this tick dropped below the threshold; remove so the next
+    # qualifying flap re-fires.
+    for stale in [k for k in last_fired if k not in seen_now]:
+        del last_fired[stale]
+    return fired
+
+
+def _now() -> float:
+    """Wall-clock seconds. Wrapped so tests can monkeypatch."""
+    import time
+    return time.time()
+
+
 __all__ = [
     "run_attribution_loop",
     "handle_observation_event",
+    "tick_multi_actor",
 ]
diff --git a/tests/correlation/attribution/test_multi_actor_correlator.py b/tests/correlation/attribution/test_multi_actor_correlator.py
new file mode 100644
index 00000000..7b3d1c47
--- /dev/null
+++ b/tests/correlation/attribution/test_multi_actor_correlator.py
@@ -0,0 +1,253 @@
+"""Phase 5 — cross-primitive multi_actor correlator.
+
+Periodic tick over attribution_state rows; fires
+``attribution.profile.multi_actor_suspected`` when ≥ 2 primitives flag
+the same identity. Dedup keeps it from spamming on every tick.
+"""
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import pytest
+
+from decnet.bus.fake import FakeBus
+from decnet.correlation import attribution_worker as _aw
+from decnet.web.db.factory import get_repository
+
+
+@pytest.fixture
+async def repo(tmp_path: Path):
+    r = get_repository(db_path=str(tmp_path / "ma.db"))
+    await r.initialize()
+    return r
+
+
+async def _seed_identity(repo, ip: str = "10.0.0.42") -> str:
+    now = datetime.now(timezone.utc)
+    auid = await repo.upsert_attacker({
+        "ip": ip, "first_seen": now, "last_seen": now,
+    })
+    iuid = await repo.ensure_stub_identity_for_attacker(auid)
+    assert iuid is not None
+    return iuid
+
+
+async def _set_state(
+    repo, identity_uuid: str, primitive: str, state: str,
+) -> None:
+    await repo.upsert_attribution_state({
+        "identity_uuid": identity_uuid,
+        "primitive": primitive,
+        "current_value": "x",
+        "state": state,
+        "confidence": 0.55,
+        "observation_count": 10,
+        "last_change_ts": 1714000000.0,
+        "last_observation_ts": 1714000000.0,
+    })
+
+
+@pytest.mark.anyio
+async def test_no_event_for_single_primitive_multi_actor(
+    repo, monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """One primitive flagged multi_actor on its own is too noisy
+    (flapping primitive, flaky network). The correlator must not
+    fire."""
+    bus = FakeBus(); await bus.connect()
+    captured: list[dict[str, Any]] = []
+
+    async def cap(_b, t, p, *, event_type=""):
+        captured.append({"topic": t, "payload": p})
+    monkeypatch.setattr(_aw, "publish_safely", cap)
+
+    iuid = await _seed_identity(repo)
+    await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
+
+    fired = await _aw.tick_multi_actor(bus, repo, {})
+    assert fired == 0
+    assert captured == []
+    await bus.close()
+
+
+@pytest.mark.anyio
+async def test_event_fires_when_two_primitives_co_flag(
+    repo, monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    bus = FakeBus(); await bus.connect()
+    captured: list[dict[str, Any]] = []
+
+    async def cap(_b, t, p, *, event_type=""):
+        captured.append({"topic": t, "payload": p})
+    monkeypatch.setattr(_aw, "publish_safely", cap)
+
+    iuid = await _seed_identity(repo)
+    await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
+    await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
+
+    fired = await _aw.tick_multi_actor(bus, repo, {})
+    assert fired == 1
+    assert len(captured) == 1
+    payload = captured[0]["payload"]
+    assert payload["identity_uuid"] == iuid
+    assert sorted(payload["primitives"]) == [
+        "cognitive.feedback_loop_engagement",
+        "motor.input_modality",
+    ]
+    assert payload["confidence"] <= 0.6
+    await bus.close()
+
+
+@pytest.mark.anyio
+async def test_dedup_no_refire_on_unchanged_primitive_set(
+    repo, monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Same identity, same primitive set across two ticks → fire
+    once. The correlator must dedup so the SIEM channel doesn't
+    drown in repeats."""
+    bus = FakeBus(); await bus.connect()
+    captured: list[dict[str, Any]] = []
+
+    async def cap(_b, t, p, *, event_type=""):
+        captured.append({"topic": t, "payload": p})
+    monkeypatch.setattr(_aw, "publish_safely", cap)
+
+    iuid = await _seed_identity(repo)
+    await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
+    await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
+
+    last_fired: dict[str, frozenset[str]] = {}
+    fired1 = await _aw.tick_multi_actor(bus, repo, last_fired)
+    fired2 = await _aw.tick_multi_actor(bus, repo, last_fired)
+    fired3 = await _aw.tick_multi_actor(bus, repo, last_fired)
+
+    assert fired1 == 1
+    assert fired2 == 0
+    assert fired3 == 0
+    assert len(captured) == 1
+    await bus.close()
+
+
+@pytest.mark.anyio
+async def test_refires_when_primitive_set_grows(
+    repo, monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """A third primitive joining the multi_actor set is new
+    information — re-emit so subscribers see the expanded
+    evidence."""
+    bus = FakeBus(); await bus.connect()
+    captured: list[dict[str, Any]] = []
+
+    async def cap(_b, t, p, *, event_type=""):
+        captured.append({"topic": t, "payload": p})
+    monkeypatch.setattr(_aw, "publish_safely", cap)
+
+    iuid = await _seed_identity(repo)
+    await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
+    await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
+
+    last_fired: dict[str, frozenset[str]] = {}
+    await _aw.tick_multi_actor(bus, repo, last_fired)
+    assert len(captured) == 1
+
+    # Add a third primitive.
+    await _set_state(repo, iuid, "temporal.weekend_cadence", "multi_actor")
+    await _aw.tick_multi_actor(bus, repo, last_fired)
+
+    assert len(captured) == 2
+    # Latest payload carries all three.
+    assert len(captured[1]["payload"]["primitives"]) == 3
+    await bus.close()
+
+
+@pytest.mark.anyio
+async def test_rearms_when_primitives_drop_below_threshold(
+    repo, monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """If an identity's multi_actor count falls below 2, the
+    correlator should evict it from the dedup map so a future
+    re-flap re-fires."""
+    bus = FakeBus(); await bus.connect()
+    captured: list[dict[str, Any]] = []
+
+    async def cap(_b, t, p, *, event_type=""):
+        captured.append({"topic": t, "payload": p})
+    monkeypatch.setattr(_aw, "publish_safely", cap)
+
+    iuid = await _seed_identity(repo)
+    await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
+    await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
+
+    last_fired: dict[str, frozenset[str]] = {}
+    await _aw.tick_multi_actor(bus, repo, last_fired)
+    assert len(captured) == 1
+    assert iuid in last_fired
+
+    # One primitive recovers to stable; identity drops below threshold.
+    await _set_state(repo, iuid, "motor.input_modality", "stable")
+    await _aw.tick_multi_actor(bus, repo, last_fired)
+    assert iuid not in last_fired
+
+    # Re-flap: same primitives flag again. Dedup should NOT block.
+    await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
+    await _aw.tick_multi_actor(bus, repo, last_fired)
+    assert len(captured) == 2
+    await bus.close()
+
+
+@pytest.mark.anyio
+async def test_independent_dedup_per_identity(
+    repo, monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Two identities, both co-flagged → both fire on the same tick."""
+    bus = FakeBus(); await bus.connect()
+    captured: list[dict[str, Any]] = []
+
+    async def cap(_b, t, p, *, event_type=""):
+        captured.append({"topic": t, "payload": p})
+    monkeypatch.setattr(_aw, "publish_safely", cap)
+
+    iuid_a = await _seed_identity(repo, ip="10.0.0.1")
+    iuid_b = await _seed_identity(repo, ip="10.0.0.2")
+    for iuid in (iuid_a, iuid_b):
+        await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
+        await _set_state(
+            repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor",
+        )
+
+    fired = await _aw.tick_multi_actor(bus, repo, {})
+    assert fired == 2
+    seen = {c["payload"]["identity_uuid"] for c in captured}
+    assert seen == {iuid_a, iuid_b}
+    await bus.close()
+
+
+class _CannedRepo:
+    """Repo stand-in returning whatever rows the test sets, so
+    sub-threshold rows can reach the correlator."""
+
+    def __init__(self) -> None:
+        self.rows: list[dict[str, Any]] = []
+
+    async def list_multi_actor_identities(self) -> list[dict[str, Any]]:
+        return self.rows
+
+
+@pytest.mark.anyio
+async def test_rearms_when_repo_returns_sub_threshold_row() -> None:
+    """A row carrying fewer than the minimum primitives must still
+    evict the identity from the dedup map (defensive branch)."""
+    minimum = _aw._T.MULTI_ACTOR_MIN_PRIMITIVES
+    qualifying = [f"p{i}" for i in range(minimum)]
+    canned = _CannedRepo()
+    last_fired: dict[str, frozenset[str]] = {}
+
+    canned.rows = [{"identity_uuid": "id-1", "primitives": qualifying}]
+    await _aw.tick_multi_actor(None, canned, last_fired)
+    assert "id-1" in last_fired
+
+    canned.rows = [{"identity_uuid": "id-1", "primitives": qualifying[:-1]}]
+    await _aw.tick_multi_actor(None, canned, last_fired)
+    assert "id-1" not in last_fired