feat(correlation/attribution): cross-primitive multi-actor detection (Phase 5)

Add tick_multi_actor() — periodic walk of attribution_state firing
attribution.profile.multi_actor_suspected when an identity carries
>= MULTI_ACTOR_MIN_PRIMITIVES rows in multi_actor state.

* Repo's list_multi_actor_identities() already filters to >= 2
  primitives; the correlator just dispatches.
* In-memory dedup keyed on identity_uuid -> frozenset(primitives):
  same set as last fire -> no re-emit. Set grows -> re-emit.
  Set shrinks below threshold -> evict so a future re-flap re-fires.
  Restart-resets are honest because attribution_state persists; a
  v1 multi_actor_suspect_log table can replace this if needed.
* run_attribution_loop() now supervises three concurrent tasks:
  observation handler, multi_actor tick loop, health/control. Tick
  interval comes from _thresholds.MULTI_ACTOR_TICK_SECS (60s) with
  test override.

Tests: 6 scenarios — single-primitive doesn't fire, two-primitive
co-flag fires, dedup blocks unchanged set, set growth re-fires,
threshold drop re-arms, multiple identities fire independently.
This commit is contained in:
2026-05-09 02:18:42 -04:00
parent dd265d7520
commit e2c7e16793
2 changed files with 340 additions and 3 deletions

View File

@@ -30,6 +30,7 @@ from decnet.bus.publish import (
run_control_listener_signal as _run_control_listener_signal,
run_health_heartbeat as _run_health_heartbeat,
)
from decnet.correlation.attribution import _thresholds as _T
from decnet.correlation.attribution.aggregate import aggregate_observations
from decnet.logging import get_logger
from decnet.web.db.repository import BaseRepository
@@ -55,18 +56,37 @@ async def run_attribution_loop(
repo: BaseRepository,
*,
shutdown: asyncio.Event | None = None,
multi_actor_tick_secs: float | None = None,
) -> None:
"""Run the attribution worker until cancelled.
*shutdown* is an optional external stop signal; the loop also
exits cleanly on ``CancelledError`` and ``KeyboardInterrupt``.
Three concurrent tasks under one supervisor:
1. ``_consume_observations`` — bus subscription on
``attacker.observation.>``; per-event handler upserts state.
2. ``_multi_actor_tick`` — periodic walk of ``attribution_state``
firing ``attribution.profile.multi_actor_suspected`` when an
identity carries ≥ ``MULTI_ACTOR_MIN_PRIMITIVES`` rows in
``multi_actor`` state. Phase 5.
3. Health + control standard channels.
*shutdown* is an optional external stop signal.
*multi_actor_tick_secs* overrides ``_thresholds.MULTI_ACTOR_TICK_SECS``
(tests use this to drive the correlator without sleeping for a
minute).
"""
log.info("attribution worker started pattern=%s", _OBSERVATION_PATTERN)
bus: BaseBus | None = None
sub_task: asyncio.Task | None = None
tick_task: asyncio.Task | None = None
heartbeat_task: asyncio.Task | None = None
control_task: asyncio.Task | None = None
tick_secs = (
multi_actor_tick_secs
if multi_actor_tick_secs is not None
else _T.MULTI_ACTOR_TICK_SECS
)
try:
candidate = get_bus(client_name=f"{_WORKER_NAME}-correlator")
await candidate.connect()
@@ -74,6 +94,9 @@ async def run_attribution_loop(
sub_task = asyncio.create_task(
_consume_observations(bus, repo),
)
tick_task = asyncio.create_task(
_multi_actor_tick_loop(bus, repo, tick_secs),
)
heartbeat_task = asyncio.create_task(
_run_health_heartbeat(bus, _WORKER_NAME),
)
@@ -94,7 +117,7 @@ async def run_attribution_loop(
except (asyncio.CancelledError, KeyboardInterrupt):
log.info("attribution worker stopped")
finally:
for task in (sub_task, heartbeat_task, control_task):
for task in (sub_task, tick_task, heartbeat_task, control_task):
if task is None:
continue
task.cancel()
@@ -275,7 +298,97 @@ def _payload_of(event: Any) -> dict[str, Any]:
return payload if isinstance(payload, dict) else {}
async def _multi_actor_tick_loop(
bus: BaseBus, repo: BaseRepository, interval_secs: float,
) -> None:
"""Walk ``attribution_state`` every *interval_secs* and emit
``attribution.profile.multi_actor_suspected`` for any identity
whose multi_actor primitives changed since the last tick.
Dedupe: in-memory ``last_fired`` map keyed on identity_uuid →
frozenset(primitives). Same primitive set as last fire → no
re-emit. New primitive joining the set → re-emit. Set shrinks
below ``MULTI_ACTOR_MIN_PRIMITIVES`` → drop the entry so it
re-arms.
In-memory dedup is honest for v0 — restart-resets are
acceptable because the underlying ``attribution_state`` rows
persist; on first tick after restart we re-emit the current
set. v1 may persist a ``multi_actor_suspect_log`` table.
"""
last_fired: dict[str, frozenset[str]] = {}
try:
while True:
try:
await tick_multi_actor(bus, repo, last_fired)
except Exception: # noqa: BLE001
log.exception("attribution worker: multi_actor tick failed")
await asyncio.sleep(interval_secs)
except asyncio.CancelledError:
raise
async def tick_multi_actor(
bus: BaseBus | None,
repo: BaseRepository,
last_fired: dict[str, frozenset[str]],
) -> int:
"""One pass of the cross-primitive correlator. Public for tests.
Returns the number of ``multi_actor_suspected`` events emitted.
"""
candidates = await repo.list_multi_actor_identities()
fired = 0
seen_now: set[str] = set()
for entry in candidates:
identity_uuid = str(entry["identity_uuid"])
primitives: list[str] = sorted(entry.get("primitives") or [])
seen_now.add(identity_uuid)
if len(primitives) < _T.MULTI_ACTOR_MIN_PRIMITIVES:
# Repo already filters to >= 2 today; defensive against
# future schema drift.
continue
signature = frozenset(primitives)
if last_fired.get(identity_uuid) == signature:
continue
last_fired[identity_uuid] = signature
if bus is None:
continue
await publish_safely(
bus,
_topics.attribution(_topics.ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED),
{
"identity_uuid": identity_uuid,
"primitives": primitives,
"evidence_summary": (
f"{len(primitives)} primitives flagged multi_actor"
),
"confidence": _T.MULTI_ACTOR_MAX_CONFIDENCE,
"ts": _now(),
},
event_type=_topics.ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED,
)
fired += 1
log.info(
"attribution worker: multi_actor_suspected identity=%s primitives=%s",
identity_uuid, primitives,
)
# Rearm: any identity that was in last_fired but no longer in
# candidates dropped below the threshold; remove so the next
# qualifying flap re-fires.
for stale in [k for k in last_fired if k not in seen_now]:
del last_fired[stale]
return fired
def _now() -> float:
"""Wall-clock seconds. Wrapped so tests can monkeypatch."""
import time
return time.time()
__all__ = [
"run_attribution_loop",
"handle_observation_event",
"tick_multi_actor",
]

View File

@@ -0,0 +1,224 @@
"""Phase 5 — cross-primitive multi_actor correlator.
Periodic tick over attribution_state rows; fires
``attribution.profile.multi_actor_suspected`` when ≥ 2 primitives flag
the same identity. Dedup keeps it from spamming on every tick.
"""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import pytest
from decnet.bus.fake import FakeBus
from decnet.correlation import attribution_worker as _aw
from decnet.web.db.factory import get_repository
@pytest.fixture
async def repo(tmp_path: Path):
r = get_repository(db_path=str(tmp_path / "ma.db"))
await r.initialize()
return r
async def _seed_identity(repo, ip: str = "10.0.0.42") -> str:
now = datetime.now(timezone.utc)
auid = await repo.upsert_attacker({
"ip": ip, "first_seen": now, "last_seen": now,
})
iuid = await repo.ensure_stub_identity_for_attacker(auid)
assert iuid is not None
return iuid
async def _set_state(
repo, identity_uuid: str, primitive: str, state: str,
) -> None:
await repo.upsert_attribution_state({
"identity_uuid": identity_uuid,
"primitive": primitive,
"current_value": "x",
"state": state,
"confidence": 0.55,
"observation_count": 10,
"last_change_ts": 1714000000.0,
"last_observation_ts": 1714000000.0,
})
@pytest.mark.anyio
async def test_no_event_for_single_primitive_multi_actor(
repo, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""One primitive flagged multi_actor on its own is too noisy
(flapping primitive, flaky network). The correlator must not
fire."""
bus = FakeBus(); await bus.connect()
captured: list[dict[str, Any]] = []
async def cap(_b, t, p, *, event_type=""):
captured.append({"topic": t, "payload": p})
monkeypatch.setattr(_aw, "publish_safely", cap)
iuid = await _seed_identity(repo)
await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
fired = await _aw.tick_multi_actor(bus, repo, {})
assert fired == 0
assert captured == []
await bus.close()
@pytest.mark.anyio
async def test_event_fires_when_two_primitives_co_flag(
repo, monkeypatch: pytest.MonkeyPatch,
) -> None:
bus = FakeBus(); await bus.connect()
captured: list[dict[str, Any]] = []
async def cap(_b, t, p, *, event_type=""):
captured.append({"topic": t, "payload": p})
monkeypatch.setattr(_aw, "publish_safely", cap)
iuid = await _seed_identity(repo)
await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
fired = await _aw.tick_multi_actor(bus, repo, {})
assert fired == 1
assert len(captured) == 1
payload = captured[0]["payload"]
assert payload["identity_uuid"] == iuid
assert sorted(payload["primitives"]) == [
"cognitive.feedback_loop_engagement",
"motor.input_modality",
]
assert payload["confidence"] <= 0.6
await bus.close()
@pytest.mark.anyio
async def test_dedup_no_refire_on_unchanged_primitive_set(
repo, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Same identity, same primitive set across two ticks → fire
once. The correlator must dedup so the SIEM channel doesn't
drown in repeats."""
bus = FakeBus(); await bus.connect()
captured: list[dict[str, Any]] = []
async def cap(_b, t, p, *, event_type=""):
captured.append({"topic": t, "payload": p})
monkeypatch.setattr(_aw, "publish_safely", cap)
iuid = await _seed_identity(repo)
await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
last_fired: dict[str, frozenset[str]] = {}
fired1 = await _aw.tick_multi_actor(bus, repo, last_fired)
fired2 = await _aw.tick_multi_actor(bus, repo, last_fired)
fired3 = await _aw.tick_multi_actor(bus, repo, last_fired)
assert fired1 == 1
assert fired2 == 0
assert fired3 == 0
assert len(captured) == 1
await bus.close()
@pytest.mark.anyio
async def test_refires_when_primitive_set_grows(
repo, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""A third primitive joining the multi_actor set is new
information — re-emit so subscribers see the expanded
evidence."""
bus = FakeBus(); await bus.connect()
captured: list[dict[str, Any]] = []
async def cap(_b, t, p, *, event_type=""):
captured.append({"topic": t, "payload": p})
monkeypatch.setattr(_aw, "publish_safely", cap)
iuid = await _seed_identity(repo)
await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
last_fired: dict[str, frozenset[str]] = {}
await _aw.tick_multi_actor(bus, repo, last_fired)
assert len(captured) == 1
# Add a third primitive.
await _set_state(repo, iuid, "temporal.weekend_cadence", "multi_actor")
await _aw.tick_multi_actor(bus, repo, last_fired)
assert len(captured) == 2
# Latest payload carries all three.
assert len(captured[1]["payload"]["primitives"]) == 3
await bus.close()
@pytest.mark.anyio
async def test_rearms_when_primitives_drop_below_threshold(
repo, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""If an identity's multi_actor count falls below 2, the
correlator should evict it from the dedup map so a future
re-flap re-fires."""
bus = FakeBus(); await bus.connect()
captured: list[dict[str, Any]] = []
async def cap(_b, t, p, *, event_type=""):
captured.append({"topic": t, "payload": p})
monkeypatch.setattr(_aw, "publish_safely", cap)
iuid = await _seed_identity(repo)
await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
await _set_state(repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor")
last_fired: dict[str, frozenset[str]] = {}
await _aw.tick_multi_actor(bus, repo, last_fired)
assert len(captured) == 1
assert iuid in last_fired
# One primitive recovers to stable; identity drops below threshold.
await _set_state(repo, iuid, "motor.input_modality", "stable")
await _aw.tick_multi_actor(bus, repo, last_fired)
assert iuid not in last_fired
# Re-flap: same primitives flag again. Dedup should NOT block.
await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
await _aw.tick_multi_actor(bus, repo, last_fired)
assert len(captured) == 2
await bus.close()
@pytest.mark.anyio
async def test_independent_dedup_per_identity(
repo, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Two identities, both co-flagged → both fire on the same tick."""
bus = FakeBus(); await bus.connect()
captured: list[dict[str, Any]] = []
async def cap(_b, t, p, *, event_type=""):
captured.append({"topic": t, "payload": p})
monkeypatch.setattr(_aw, "publish_safely", cap)
iuid_a = await _seed_identity(repo, ip="10.0.0.1")
iuid_b = await _seed_identity(repo, ip="10.0.0.2")
for iuid in (iuid_a, iuid_b):
await _set_state(repo, iuid, "motor.input_modality", "multi_actor")
await _set_state(
repo, iuid, "cognitive.feedback_loop_engagement", "multi_actor",
)
fired = await _aw.tick_multi_actor(bus, repo, {})
assert fired == 2
seen = {c["payload"]["identity_uuid"] for c in captured}
assert seen == {iuid_a, iuid_b}
await bus.close()