diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 544aedde..c8d83b83 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -29,6 +29,8 @@ Token structure (NATS-style, dot-separated): campaign.unmerged credential.captured credential.reuse.detected + attribution.profile.state_changed + attribution.profile.multi_actor_suspected canary.{token_id}.triggered canary.{token_id}.placed canary.{token_id}.revoked @@ -57,6 +59,7 @@ IDENTITY = "identity" CAMPAIGN = "campaign" SYSTEM = "system" CREDENTIAL = "credential" +ATTRIBUTION = "attribution" ORCHESTRATOR = "orchestrator" CANARY = "canary" SMTP = "smtp" @@ -210,6 +213,42 @@ CAMPAIGN_UNMERGED = "unmerged" CREDENTIAL_CAPTURED = "captured" CREDENTIAL_REUSE_DETECTED = "reuse.detected" +# Attribution-engine event types (second/third tokens under +# ``attribution``). Published by the v0 attribution worker +# (``decnet.correlation.attribution_worker``) which subscribes to +# ``attacker.observation.>`` and runs the per-(identity, primitive) +# state machine. See ``development/ATTRIBUTION-ENGINE.md``. +# +# attribution.profile.state_changed — per-primitive state +# transition (e.g. +# stable → drifting). +# Payload: identity_uuid, +# primitive, old_state, +# new_state, current_value, +# confidence, +# observation_count, ts. +# attribution.profile.multi_actor_suspected — fires when ≥ 2 +# primitives flag the same +# identity as multi_actor +# concurrently. Cross- +# primitive correlator; +# single-primitive +# multi_actor is too noisy +# on its own. Payload: +# identity_uuid, primitives, +# evidence_summary, +# confidence, ts. +# +# These are *derived* signals — distinct from +# ``identity.*`` (clusterer lifecycle, IDENTITY_RESOLUTION.md) and +# ``attacker.observation.*`` (raw extractor envelopes, +# BEHAVE-INTEGRATION.md). The three families compose: observations feed +# the attribution engine, the engine emits derived state, the clusterer +# reads observations + state to form / merge identities. +ATTRIBUTION_PROFILE_PREFIX = "profile" +ATTRIBUTION_PROFILE_STATE_CHANGED = "profile.state_changed" +ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED = "profile.multi_actor_suspected" + # Canary-token event types (third token under ``canary``). # # canary.{token_id}.placed — orchestrator/API successfully planted a @@ -402,6 +441,20 @@ def attacker_observation(primitive: str) -> str: return f"{ATTACKER}.{ATTACKER_OBSERVATION_PREFIX}.{primitive}" +def attribution(event_type: str) -> str: + """Build ``attribution.``. + + *event_type* is typically one of + :data:`ATTRIBUTION_PROFILE_STATE_CHANGED` or + :data:`ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED` — both contain a + dot (``profile.state_changed``) which is permitted under the same + "trailing dotted leaf" rule that ``attacker.session.started`` uses. + """ + if not event_type: + raise ValueError("attribution topic requires a non-empty event_type") + return f"{ATTRIBUTION}.{event_type}" + + def campaign(event_type: str) -> str: """Build ``campaign.``. diff --git a/decnet/correlation/attribution/__init__.py b/decnet/correlation/attribution/__init__.py new file mode 100644 index 00000000..bb58774a --- /dev/null +++ b/decnet/correlation/attribution/__init__.py @@ -0,0 +1,21 @@ +"""DECNET attribution engine — v0 aggregation library. + +Pure library: per-(identity, primitive) state machine over BEHAVE-SHELL +observations. No I/O, no bus, no DB. The bus subscriber and DB writes +live in :mod:`decnet.correlation.attribution_worker` so this package +stays trivially testable with synthetic observation lists. + +See ``development/ATTRIBUTION-ENGINE.md`` for the full design and the +explicit bright line: this engine does NOT do persona classification +(HUMAN/LLM/SCRIPTED), does NOT gate access, does NOT attribute to +named persons. It surfaces *behavioural coherence* and *behavioural +drift*, and stops there. +""" +from __future__ import annotations + +from decnet.correlation.attribution.aggregate import ( + AttributionState, + aggregate_observations, +) + +__all__ = ["AttributionState", "aggregate_observations"] diff --git a/decnet/correlation/attribution/_thresholds.py b/decnet/correlation/attribution/_thresholds.py new file mode 100644 index 00000000..76009287 --- /dev/null +++ b/decnet/correlation/attribution/_thresholds.py @@ -0,0 +1,62 @@ +"""Calibration thresholds for the attribution engine — every magic +number lives here, named, with the calibration source cited. + +v0 values are heuristic. Real calibration ships when red-team +exercises produce labelled trace data +(``ATTRIBUTION-ENGINE.md`` §"Out of scope"). Until then these constants +are the engine's only knobs; aggregate.py never embeds a literal. +""" +from __future__ import annotations + +# ── Categorical merger ──────────────────────────────────────────────── +# Last-N window size for the categorical state machine. 5 calibrates +# against typical session counts (most attackers are observed < 10 +# times before they go quiet — ATTRIBUTION-ENGINE.md §"Open question +# 2"). Operators with long-running attackers will want a wider window +# in v1. +CATEGORICAL_WINDOW_N = 5 + +# Minimum observations before the merger emits anything other than +# ``unknown``. Below this floor the state machine has no signal. +MIN_OBSERVATIONS_FOR_STATE = 3 + +# Categorical merger is one-outlier-tolerant: in a window of N=5, the +# state is ``stable`` if at least ``MAJORITY_THRESHOLD`` agree. +CATEGORICAL_MAJORITY_THRESHOLD = 4 + +# ── Numeric merger ──────────────────────────────────────────────────── +# EWMA smoothing factor for numeric primitives. 0.3 weights recent +# observations enough to surface drift quickly without flapping on +# single outliers. +NUMERIC_EWMA_ALPHA = 0.3 + +# Coefficient-of-variation thresholds: dispersion / |mean|. +NUMERIC_STABLE_DISPERSION_PCT = 0.20 # < 20% of mean → stable +NUMERIC_DRIFT_MEAN_SHIFT_PCT = 0.30 # mean moved > 30% → drifting +NUMERIC_CONFLICT_DISPERSION_PCT = 1.0 # > 100% of mean → conflicted + +# ── Hash merger ─────────────────────────────────────────────────────── +# Rotations within HASH_DRIFT_WINDOW count toward state transitions. +# Below DRIFT_MAX → drifting; above → conflicted. The values mirror the +# DEBT-032 fingerprint-rotation calibration — bumped by one because +# the attribution engine takes one rotation as evidence-of-life, not +# yet evidence-of-drift. +HASH_DRIFT_MAX = 2 +HASH_DRIFT_WINDOW_SECS = 24 * 60 * 60 # 24h + +# ── Multi-actor cap ─────────────────────────────────────────────────── +# multi_actor confidence is capped to keep the dashboard honest about +# how noisy this signal is. ATTRIBUTION-ENGINE.md §"Open question 1": +# flapping primitives on flaky networks look like two operators. +MULTI_ACTOR_MAX_CONFIDENCE = 0.6 + +# ── Cross-primitive correlator (Phase 5) ────────────────────────────── +# Minimum number of primitives that must independently flag +# ``multi_actor`` for the same identity before +# ``attribution.profile.multi_actor_suspected`` fires. +MULTI_ACTOR_MIN_PRIMITIVES = 2 + +# Tick interval for the periodic walk in +# :mod:`decnet.correlation.attribution_worker`. Configurable via env +# var in v1; hardcoded in v0. +MULTI_ACTOR_TICK_SECS = 60.0 diff --git a/decnet/correlation/attribution/aggregate.py b/decnet/correlation/attribution/aggregate.py new file mode 100644 index 00000000..70446bc8 --- /dev/null +++ b/decnet/correlation/attribution/aggregate.py @@ -0,0 +1,87 @@ +"""Per-(identity, primitive) state-machine — the attribution engine's +core merge logic. + +Pure: given a list of BEHAVE observations for one +``(identity_uuid, primitive)`` pair, returns the derived state and +mirror metadata. No DB, no bus, no I/O. The worker +(``decnet.correlation.attribution_worker``) is responsible for loading +the observations and writing the state row. + +State vocabulary is frozen at five values (see +``ATTRIBUTION-ENGINE.md``): + +* ``unknown`` — < 3 observations (insufficient signal) +* ``stable`` — recent N agree +* ``drifting`` — recent N stable but disagree with older N +* ``conflicted`` — recent N split +* ``multi_actor`` — conflicted + cross-session alternation pattern + +Phase 2 ships :func:`_aggregate_categorical`. Phase 3 will add +:func:`_aggregate_numeric` and :func:`_aggregate_hash` and the +ValueKind dispatcher. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Iterable, Sequence + +__all__ = ["AttributionState", "aggregate_observations"] + + +@dataclass(frozen=True) +class AttributionState: + """Output of the merger for one ``(identity, primitive)`` pair. + + The fields map 1:1 onto :class:`AttributionStateRow` columns — + callers compose the final dict for ``upsert_attribution_state`` + by adding ``identity_uuid`` and ``primitive`` (the merger does not + own the natural key). + """ + + current_value: Any + state: str + confidence: float + observation_count: int + last_observation_ts: float + + +def aggregate_observations( + observations: Sequence[dict[str, Any]], +) -> AttributionState: + """Run the merger over *observations* and return the derived state. + + *observations* is a list of dicts with at minimum ``value``, + ``ts``, and ``confidence`` fields (matching the BEHAVE + ``Observation`` envelope shape that + ``ObservationRow.observations_time_series`` returns). They MUST + arrive ordered by ``ts`` ascending; the merger assumes that. + + Phase 2 only supports categorical values. Phase 3 will dispatch + on the BEHAVE primitive's ``ValueKind`` and pick the right merger. + """ + if not observations: + return AttributionState( + current_value=None, + state="unknown", + confidence=0.0, + observation_count=0, + last_observation_ts=0.0, + ) + # Phase 2 stub — categorical only. Phase 3 will inspect + # ``primitive`` (passed in alongside observations) to pick a + # merger; for now defer to the categorical implementation + # (``_aggregate_categorical``) which Phase 2 lands. + raise NotImplementedError( + "aggregate_observations is implemented in Phase 2 (categorical) " + "and Phase 3 (numeric + hash). v0 Phase 1 ships the substrate " + "only; the worker logs without invoking the merger.", + ) + + +def _coerce_obs_iter( + observations: Iterable[dict[str, Any]], +) -> list[dict[str, Any]]: + """Defensive: accept any iterable, return a list. Used by the + worker which pulls observations off the bus + DB into mixed + iterables.""" + return list(observations) diff --git a/decnet/correlation/attribution_worker.py b/decnet/correlation/attribution_worker.py new file mode 100644 index 00000000..653fbccd --- /dev/null +++ b/decnet/correlation/attribution_worker.py @@ -0,0 +1,178 @@ +"""Attribution-engine bus subscriber — v0 Phase 1 skeleton. + +Subscribes to ``attacker.observation.>`` and, for each event, ensures +the source attacker has a stub identity in ``attacker_identities``. +Phase 1 does **not** invoke the merger or write +``attribution_state`` rows; that wiring lands in Phase 4 once the +Phase 2/3 mergers are in. + +Pattern mirrors :mod:`decnet.correlation.reuse_worker`: bus-subscribe +with a wake event, fall back to poll-only if the bus is unavailable, +publish derived events with :func:`publish_safely`, log per-handler +exceptions and continue. + +Trigger isolation: the per-event handler is wrapped in a single +try/except. Any exception is logged and the loop continues with the +next event. This is the same posture BEHAVE-SHELL's +``_handler.handle_session_ended`` adopts. +""" +from __future__ import annotations + +import asyncio +import contextlib +from typing import Any + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + run_control_listener_signal as _run_control_listener_signal, + run_health_heartbeat as _run_health_heartbeat, +) +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("correlation.attribution_worker") + +_WORKER_NAME = "attribution" +_OBSERVATION_PATTERN = f"{_topics.ATTACKER}.{_topics.ATTACKER_OBSERVATION_PREFIX}.>" + + +async def run_attribution_loop( + repo: BaseRepository, + *, + shutdown: asyncio.Event | None = None, +) -> None: + """Run the attribution worker until cancelled. + + *shutdown* is an optional external stop signal; the loop also + exits cleanly on ``CancelledError`` and ``KeyboardInterrupt``. + """ + log.info("attribution worker started pattern=%s", _OBSERVATION_PATTERN) + + bus: BaseBus | None = None + sub_task: asyncio.Task | None = None + heartbeat_task: asyncio.Task | None = None + control_task: asyncio.Task | None = None + try: + candidate = get_bus(client_name=f"{_WORKER_NAME}-correlator") + await candidate.connect() + bus = candidate + sub_task = asyncio.create_task( + _consume_observations(bus, repo), + ) + heartbeat_task = asyncio.create_task( + _run_health_heartbeat(bus, _WORKER_NAME), + ) + control_task = asyncio.create_task( + _run_control_listener_signal(bus, _WORKER_NAME), + ) + except Exception as exc: # noqa: BLE001 + log.warning( + "attribution worker: bus unavailable, idle until bus returns: %s", + exc, + ) + + if shutdown is None: + shutdown = asyncio.Event() + + try: + await shutdown.wait() + except (asyncio.CancelledError, KeyboardInterrupt): + log.info("attribution worker stopped") + finally: + for task in (sub_task, heartbeat_task, control_task): + if task is None: + continue + task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await task + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _consume_observations( + bus: BaseBus, repo: BaseRepository, +) -> None: + """Pull events off ``attacker.observation.>`` and dispatch each + to :func:`handle_observation_event`. + + Per-event exceptions are caught and logged; the subscription + survives bad payloads. If the subscription itself dies (bus + disconnect), the worker idles — the supervisor systemd unit + will restart on a clean exit. + """ + try: + sub = bus.subscribe(_OBSERVATION_PATTERN) + async with sub: + async for event in sub: + try: + await handle_observation_event(bus, repo, event) + except Exception: # noqa: BLE001 + log.exception("attribution worker: handler failed") + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning( + "attribution worker: subscriber for %s died (%s)", + _OBSERVATION_PATTERN, exc, + ) + + +async def handle_observation_event( + bus: BaseBus | None, + repo: BaseRepository, + event: Any, +) -> None: + """Handle one ``attacker.observation.`` event. + + Phase 1: ensure the source attacker has a stub identity, then log + and return. Phase 4 will: load prior state, run merger, upsert + new state, emit ``attribution.profile.state_changed`` on + transition. + + *event* is whatever shape :class:`BaseBus`'s subscription yields — + a ``BusEvent`` with ``payload`` (dict) and ``event_type`` (str) + fields. The payload carries the BEHAVE envelope plus DECNET-side + ``attacker_uuid`` denorm (see + ``decnet.profiler.behave_shell._handler._publish_observation``). + """ + payload = _payload_of(event) + attacker_uuid = payload.get("attacker_uuid") + primitive = payload.get("primitive") + if not attacker_uuid or not primitive: + log.debug( + "attribution worker: skipping malformed event (uuid=%r primitive=%r)", + attacker_uuid, primitive, + ) + return + identity_uuid = await repo.ensure_stub_identity_for_attacker( + str(attacker_uuid), + ) + if identity_uuid is None: + log.info( + "attribution worker: no Attacker row for uuid=%s yet; deferring", + attacker_uuid, + ) + return + # Phase 4 will run the merger here and emit + # ``attribution.profile.state_changed`` on transition. Phase 1 + # ends with stub materialisation only. + log.debug( + "attribution worker: stub identity=%s for attacker=%s primitive=%s", + identity_uuid, attacker_uuid, primitive, + ) + + +def _payload_of(event: Any) -> dict[str, Any]: + """Extract the dict payload from a BusEvent or fall through if + *event* is already a dict (test fixtures may pass either).""" + payload = getattr(event, "payload", event) + return payload if isinstance(payload, dict) else {} + + +__all__ = [ + "run_attribution_loop", + "handle_observation_event", +] diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index ea32a0fc..c1c63635 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -59,6 +59,9 @@ from .attachments import ( from .observations import ( ObservationRow, ) +from .attribution_state import ( + AttributionStateRow, +) from .campaigns import ( Campaign, CampaignsResponse, @@ -252,6 +255,7 @@ __all__ = [ "AttackerIdentity", "AttackerIntel", "AttackersResponse", + "AttributionStateRow", "ObservationRow", "ObservedAttachment", "SmtpTarget", diff --git a/decnet/web/db/models/attribution_state.py b/decnet/web/db/models/attribution_state.py new file mode 100644 index 00000000..39fc9df7 --- /dev/null +++ b/decnet/web/db/models/attribution_state.py @@ -0,0 +1,78 @@ +"""Per-(identity, primitive) attribution state — v0 of the +attribution engine. + +Materialised view of the state machine in +``decnet.correlation.attribution.aggregate``. Re-derivable from +``observations`` + the DEBT-032 fingerprint-rotation log; this row is +a cache for cheap dashboard reads, not a source of truth. + +Keyed on ``identity_uuid``, not ``attacker_uuid``: pre-clusterer, +every Attacker maps 1:1 to a stub row in ``attacker_identities`` +(``merged_into_uuid = NULL``) so the key is stable across the v0 / v1 +boundary. When v1's clusterer eventually merges identities, the loser +row's state is recomputed from the union of observations under the +winner — no schema change, no column-rename migration. + +This deviates from ``development/ATTRIBUTION-ENGINE.md`` §"Subject of +attribution in v0" (which resolved on ``attacker_uuid``); the doc gets +a deviation note in the same commit that ships this file. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from sqlalchemy import JSON, Column, Index +from sqlmodel import Field, SQLModel + + +class AttributionStateRow(SQLModel, table=True): + """One state row per (identity, primitive). At most one row per + pair — composite PK enforces it. + """ + + __tablename__ = "attribution_state" + __table_args__ = ( + Index("ix_attribution_state_state", "state"), + Index("ix_attribution_state_last_change", "last_change_ts"), + Index( + "ix_attribution_state_identity_state", + "identity_uuid", "state", + ), + ) + + # ── key ──────────────────────────────────────────────────────────── + identity_uuid: str = Field( + foreign_key="attacker_identities.uuid", primary_key=True, + ) + primitive: str = Field(primary_key=True) + + # ── derived state ────────────────────────────────────────────────── + # Mirrors the BEHAVE Observation ``value`` column shape so the + # frontend can render the merger output the same way it renders raw + # latest-wins values today (BEHAVE-INTEGRATION.md Q3). + current_value: dict[str, Any] | str | int | float | bool | list = Field( + sa_column=Column(JSON, nullable=False), + ) + # 'unknown' | 'stable' | 'drifting' | 'conflicted' | 'multi_actor'. + # Five states, frozen — see ATTRIBUTION-ENGINE.md §"State machine". + state: str + # Engine's confidence in the *state assertion*, not in any verdict + # about the attacker. ``multi_actor`` is capped at 0.6 by + # convention; other states use the merger's per-ValueKind formula. + confidence: float + # How many observations underlie this row. Used by the API to gate + # ``unknown`` (< 3 obs) without re-querying ``observations``. + observation_count: int = Field(default=0) + # When ``state`` last flipped. Equals ``updated_at`` on insert. + last_change_ts: float + # Most recent observation that fed this row. Used by the merger to + # detect drift windows without a full observation re-scan. + last_observation_ts: float + + # ── audit ────────────────────────────────────────────────────────── + # Mirrors AttackerIdentity convention (federation gossip in v2). + schema_version: int = Field(default=1) + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + ) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 8ffc86d0..29b6e4a9 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -1492,3 +1492,86 @@ class BaseRepository(ABC): SQLModel TTP mixin. """ return [] + + # ─── Attribution engine (v0 — aggregation only) ──────────────────── + # See development/ATTRIBUTION-ENGINE.md. The engine consumes + # ``attacker.observation.*`` events and writes per-(identity, + # primitive) state rows. Pre-clusterer, every Attacker maps 1:1 + # to a stub AttackerIdentity row so the keying is stable across + # the v0 / v1 boundary. + + @abstractmethod + async def ensure_stub_identity_for_attacker( + self, attacker_uuid: str, + ) -> Optional[str]: + """Return the ``identity_uuid`` for *attacker_uuid*, creating a + degenerate 1:1 stub in ``attacker_identities`` if the attacker + does not yet have one. + + Returns ``None`` if the attacker row itself is missing (the + worker treats that as "defer" — the profiler tick has not yet + materialised the Attacker; same posture as + ``_handler.handle_session_ended`` in BEHAVE-SHELL). + + Idempotent under concurrent calls: the second caller sees the + first caller's stamp and returns the same uuid. Implementations + are responsible for serialising the read-then-insert against + the bus's at-least-once delivery. + + The third return value (boolean) signalling "newly created" is + deliberately omitted — the worker emits ``identity.formed`` on + a transition observed via the row's absence on its first call, + not via a flag from the repo. Keeps the repo idempotent and + flag-free. + """ + raise NotImplementedError + + @abstractmethod + async def upsert_attribution_state(self, data: dict[str, Any]) -> None: + """Insert or update an :class:`AttributionStateRow` keyed on + ``(identity_uuid, primitive)``. + + ``data`` MUST carry: ``identity_uuid``, ``primitive``, + ``current_value``, ``state``, ``confidence``, + ``observation_count``, ``last_change_ts``, + ``last_observation_ts``. ``schema_version`` defaults to 1. + """ + raise NotImplementedError + + @abstractmethod + async def get_attribution_state_for_identity( + self, identity_uuid: str, + ) -> list[dict[str, Any]]: + """Return every attribution-state row for *identity_uuid*. + + Empty list when the identity has no derived state yet (e.g. + observations have arrived but the engine has not run, or the + engine has not produced ≥ 3 observations per primitive). The + attribution API surface and AttackerDetail badge renderer both + consume this projection. + """ + raise NotImplementedError + + @abstractmethod + async def get_attribution_state( + self, identity_uuid: str, primitive: str, + ) -> Optional[dict[str, Any]]: + """Return one ``(identity_uuid, primitive)`` row, or ``None``. + + Used by the attribution worker on each inbound observation to + load the prior state before running the merger. ``None`` means + "no prior state — initialise from this observation alone". + """ + raise NotImplementedError + + @abstractmethod + async def list_multi_actor_identities( + self, + ) -> list[dict[str, Any]]: + """List ``{identity_uuid, primitives}`` for identities that + currently have ≥ 2 primitives flagged ``multi_actor``. + + Backs the cross-primitive correlator (Phase 5). Empty list when + no identity is co-flagged. + """ + raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo/__init__.py b/decnet/web/db/sqlmodel_repo/__init__.py index 6944cd86..9585df36 100644 --- a/decnet/web/db/sqlmodel_repo/__init__.py +++ b/decnet/web/db/sqlmodel_repo/__init__.py @@ -35,6 +35,7 @@ from decnet.web.db.sqlmodel_repo._helpers import ( # noqa: F401 (re-exported f ) from decnet.web.db.sqlmodel_repo.attacker_intel import AttackerIntelMixin from decnet.web.db.sqlmodel_repo.attackers import AttackersMixin +from decnet.web.db.sqlmodel_repo.attribution import AttributionMixin from decnet.web.db.sqlmodel_repo.auth import AuthMixin from decnet.web.db.sqlmodel_repo.bounties import BountiesMixin from decnet.web.db.sqlmodel_repo.campaigns import CampaignsMixin @@ -58,6 +59,7 @@ from decnet.web.db.sqlmodel_repo.webhooks import WebhooksMixin class SQLModelRepository( AttackerIntelMixin, AttackersMixin, + AttributionMixin, AuthMixin, BountiesMixin, CampaignsMixin, diff --git a/decnet/web/db/sqlmodel_repo/attribution.py b/decnet/web/db/sqlmodel_repo/attribution.py new file mode 100644 index 00000000..f89bca9f --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/attribution.py @@ -0,0 +1,215 @@ +"""Repo mixin for the ``attribution_state`` table + identity stub +materialisation. + +Composed onto :class:`SQLModelRepository`. Five public methods, all +serving the v0 attribution engine +(``decnet.correlation.attribution_worker``): + +* :meth:`ensure_stub_identity_for_attacker` — pre-clusterer 1:1 stub + identity creation. Idempotent under concurrent observation bursts. +* :meth:`upsert_attribution_state` — keyed on + ``(identity_uuid, primitive)``. +* :meth:`get_attribution_state` / :meth:`get_attribution_state_for_identity` + — single-row and per-identity reads. +* :meth:`list_multi_actor_identities` — feeds the Phase 5 cross- + primitive correlator. + +See ``development/ATTRIBUTION-ENGINE.md`` for the full design. +""" +from __future__ import annotations + +import uuid as _uuid +from datetime import datetime, timezone +from typing import Any, Optional + +from sqlalchemy import func, select +from sqlmodel import col + +from decnet.web.db.models import ( + Attacker, + AttackerIdentity, + AttributionStateRow, +) +from decnet.web.db.sqlmodel_repo._helpers import _MixinBase + + +class AttributionMixin(_MixinBase): + """Mixin: methods composed onto :class:`SQLModelRepository`.""" + + async def ensure_stub_identity_for_attacker( + self, attacker_uuid: str, + ) -> Optional[str]: + """Return the ``identity_uuid`` for *attacker_uuid*, creating a + degenerate 1:1 stub in ``attacker_identities`` if absent. + + Returns ``None`` when the Attacker row itself is missing — the + attribution worker treats that as "defer" (mirrors the + ``_handler.handle_session_ended`` posture in BEHAVE-SHELL). + + Idempotent: the second caller for the same attacker reads the + ``identity_id`` stamped by the first caller and returns it + without inserting again. Race: two concurrent first-callers + could both see ``identity_id = NULL`` and both create stubs; + the loser's commit would leave a dangling AttackerIdentity row + with no Attacker referencing it. Acceptable in v0 (rare; rows + are tiny; gc'd in v1 when the clusterer runs). The + single-writer attribution worker plus the bus's per-identity + ordering make even that race vanishingly rare in practice. + """ + async with self._session() as session: + attacker_row = ( + await session.execute( + select(Attacker).where(Attacker.uuid == attacker_uuid) + ) + ).scalar_one_or_none() + if attacker_row is None: + return None + if attacker_row.identity_id: + return attacker_row.identity_id + new_uuid = _uuid.uuid4().hex + now = datetime.now(timezone.utc) + session.add( + AttackerIdentity( + uuid=new_uuid, + schema_version=1, + first_seen_at=attacker_row.first_seen, + last_seen_at=attacker_row.last_seen, + created_at=now, + updated_at=now, + observation_count=1, + ) + ) + attacker_row.identity_id = new_uuid + session.add(attacker_row) + await session.commit() + return new_uuid + + async def upsert_attribution_state( + self, data: dict[str, Any], + ) -> None: + """Insert or update one ``(identity_uuid, primitive)`` row. + + ``data`` MUST carry: ``identity_uuid``, ``primitive``, + ``current_value``, ``state``, ``confidence``, + ``observation_count``, ``last_change_ts``, + ``last_observation_ts``. ``schema_version`` and ``updated_at`` + are managed here. + """ + identity_uuid = data["identity_uuid"] + primitive = data["primitive"] + async with self._session() as session: + existing = ( + await session.execute( + select(AttributionStateRow).where( + AttributionStateRow.identity_uuid == identity_uuid, + AttributionStateRow.primitive == primitive, + ) + ) + ).scalar_one_or_none() + now = datetime.now(timezone.utc) + if existing is not None: + for k, v in data.items(): + if k in ("identity_uuid", "primitive"): + continue + setattr(existing, k, v) + existing.updated_at = now + session.add(existing) + else: + session.add( + AttributionStateRow( + **{**data, "schema_version": 1, "updated_at": now} + ) + ) + await session.commit() + + async def get_attribution_state( + self, identity_uuid: str, primitive: str, + ) -> Optional[dict[str, Any]]: + """Single-row lookup. ``None`` when the merger has not yet run + for this ``(identity_uuid, primitive)`` pair.""" + async with self._session() as session: + row = ( + await session.execute( + select(AttributionStateRow).where( + AttributionStateRow.identity_uuid == identity_uuid, + AttributionStateRow.primitive == primitive, + ) + ) + ).scalar_one_or_none() + return None if row is None else row.model_dump(mode="json") + + async def get_attribution_state_for_identity( + self, identity_uuid: str, + ) -> list[dict[str, Any]]: + """All attribution-state rows for one identity, primitive- + ordered for deterministic API output.""" + async with self._session() as session: + rows = ( + await session.execute( + select(AttributionStateRow) + .where(AttributionStateRow.identity_uuid == identity_uuid) + .order_by(AttributionStateRow.primitive) + ) + ).scalars().all() + return [r.model_dump(mode="json") for r in rows] + + async def list_multi_actor_identities( + self, + ) -> list[dict[str, Any]]: + """Identities with ≥ 2 primitives currently in ``multi_actor``. + + Output shape:: + + [{"identity_uuid": "...", "primitives": ["motor.input_modality", + "cognitive.feedback_loop_engagement"]}, + ...] + + Empty list when no identity is co-flagged. Used by the Phase 5 + cross-primitive correlator — single-primitive ``multi_actor`` + is too noisy to alarm on, two independent primitives is the + threshold for ``attribution.profile.multi_actor_suspected``. + """ + async with self._session() as session: + # First pass: identities with ≥ 2 multi_actor rows. + count_stmt = ( + select( + col(AttributionStateRow.identity_uuid), + func.count().label("ct"), + ) + .where(AttributionStateRow.state == "multi_actor") + .group_by(col(AttributionStateRow.identity_uuid)) + .having(func.count() >= 2) + ) + co_flagged = [ + row.identity_uuid + for row in (await session.execute(count_stmt)).all() + ] + if not co_flagged: + return [] + # Second pass: collect the primitive list per co-flagged + # identity. Two queries beat one wide one because the + # first query's count-having filter prunes the second + # query's row set without a self-join. + detail_stmt = ( + select( + col(AttributionStateRow.identity_uuid), + col(AttributionStateRow.primitive), + ) + .where( + AttributionStateRow.state == "multi_actor", + col(AttributionStateRow.identity_uuid).in_(co_flagged), + ) + .order_by( + col(AttributionStateRow.identity_uuid), + col(AttributionStateRow.primitive), + ) + ) + grouped: dict[str, list[str]] = {} + for row in (await session.execute(detail_stmt)).all(): + grouped.setdefault(row.identity_uuid, []).append( + row.primitive, + ) + return [ + {"identity_uuid": iuuid, "primitives": prims} + for iuuid, prims in grouped.items() + ] diff --git a/development/ATTRIBUTION-ENGINE.md b/development/ATTRIBUTION-ENGINE.md index 4902d51c..497c96c5 100644 --- a/development/ATTRIBUTION-ENGINE.md +++ b/development/ATTRIBUTION-ENGINE.md @@ -506,6 +506,17 @@ v0. Five states, no more (resist the urge to grow the enum). - **Subject of attribution in v0.** RESOLVED: `attacker_uuid`, not `identity_uuid`. v1 widens. + - **Deviation (Phase 1 implementation):** the engine actually keys + state rows on `identity_uuid` from day one, materialising a 1:1 + stub `attacker_identities` row per Attacker on first observation. + Rationale: re-keying state rows when the v1 clusterer eventually + merges attackers is exactly the migration debt v0 should not + bake in. With identity-keyed state from the start, the v1 + clusterer becomes a natural rollup operation (merge B's stub + identity into A's identity, recompute the union once on the + merge event) instead of a column-rename. No polymorphic + `subject_uuid` column. ANTI sign-off in conversation; saved as + memory `feedback_attribution_keys_identity`. ## Real open questions diff --git a/tests/bus/test_topics.py b/tests/bus/test_topics.py index 958b473e..db385b35 100644 --- a/tests/bus/test_topics.py +++ b/tests/bus/test_topics.py @@ -87,3 +87,19 @@ def test_identity_builder() -> None: def test_identity_builder_rejects_empty() -> None: with pytest.raises(ValueError): topics.identity("") + + +def test_attribution_builder() -> None: + assert ( + topics.attribution(topics.ATTRIBUTION_PROFILE_STATE_CHANGED) + == "attribution.profile.state_changed" + ) + assert ( + topics.attribution(topics.ATTRIBUTION_PROFILE_MULTI_ACTOR_SUSPECTED) + == "attribution.profile.multi_actor_suspected" + ) + + +def test_attribution_builder_rejects_empty() -> None: + with pytest.raises(ValueError): + topics.attribution("") diff --git a/tests/correlation/attribution/__init__.py b/tests/correlation/attribution/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/correlation/attribution/test_attribution_worker_phase1.py b/tests/correlation/attribution/test_attribution_worker_phase1.py new file mode 100644 index 00000000..2ed2f72e --- /dev/null +++ b/tests/correlation/attribution/test_attribution_worker_phase1.py @@ -0,0 +1,169 @@ +"""Phase 1 attribution worker — wiring smoke tests. + +The Phase 1 worker subscribes to ``attacker.observation.>`` and, for +each event, ensures the source attacker has a stub identity row. +That's it — no merger, no state writes, no derived events. These +tests pin the wiring + the stub-materialisation contract. + +Phase 4 will extend with end-to-end state-row + transition-event +assertions. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import pytest + +from decnet.bus.fake import FakeBus +from decnet.correlation import attribution_worker as _aw +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path: Path): + r = get_repository(db_path=str(tmp_path / "attribution_wiring.db")) + await r.initialize() + return r + + +@pytest.fixture +async def attacker_uuid(repo) -> str: + now = datetime.now(timezone.utc) + return await repo.upsert_attacker({ + "ip": "10.0.0.42", + "first_seen": now, + "last_seen": now, + }) + + +def _make_event(payload: dict[str, Any]) -> Any: + """Light Event-shaped object — the handler reads ``.payload`` + via ``getattr`` and falls back to dicts. We pass a dict directly + because that's what tests give the BEHAVE handler too.""" + return payload + + +@pytest.mark.anyio +async def test_handle_event_creates_stub_for_known_attacker( + repo, attacker_uuid: str, +) -> None: + """First observation for an attacker → stub identity created and + stamped onto the Attacker row.""" + bus = FakeBus() + await bus.connect() + payload = { + "attacker_uuid": attacker_uuid, + "primitive": "motor.input_modality", + "value": "pasted", + "ts": 1714000000.0, + "confidence": 0.9, + } + await _aw.handle_observation_event(bus, repo, _make_event(payload)) + + attacker = await repo.get_attacker_by_uuid(attacker_uuid) + assert attacker is not None + assert attacker["identity_id"] is not None + + # Second event re-uses the same stub. + await _aw.handle_observation_event(bus, repo, _make_event(payload)) + attacker_again = await repo.get_attacker_by_uuid(attacker_uuid) + assert attacker_again["identity_id"] == attacker["identity_id"] + await bus.close() + + +@pytest.mark.anyio +async def test_handle_event_defers_for_missing_attacker(repo) -> None: + """No Attacker row yet → handler returns without raising and + without inserting an orphan identity (the worker treats this as + 'profiler hasn't materialised the attacker, defer').""" + bus = FakeBus() + await bus.connect() + payload = { + "attacker_uuid": "00000000000000000000000000000000", + "primitive": "motor.input_modality", + "value": "pasted", + "ts": 1714000000.0, + "confidence": 0.9, + } + # Should NOT raise. + await _aw.handle_observation_event(bus, repo, _make_event(payload)) + # No identities materialised. + identities = await repo.list_all_identities() + assert identities == [] + await bus.close() + + +@pytest.mark.anyio +async def test_handle_event_skips_malformed_payload( + repo, attacker_uuid: str, +) -> None: + """Missing attacker_uuid or primitive → log + continue, never + raise. Bus delivery is at-least-once; bad payloads must not + poison the consumer.""" + bus = FakeBus() + await bus.connect() + for bad in ( + {"primitive": "motor.input_modality"}, # missing attacker_uuid + {"attacker_uuid": attacker_uuid}, # missing primitive + {}, # both missing + ): + await _aw.handle_observation_event(bus, repo, _make_event(bad)) + + # No identity materialised because every payload was rejected + # before the stub helper ran. + attacker = await repo.get_attacker_by_uuid(attacker_uuid) + assert attacker is not None + assert attacker["identity_id"] is None + await bus.close() + + +@pytest.mark.anyio +async def test_handle_event_idempotent_per_observation( + repo, attacker_uuid: str, +) -> None: + """Hammer the same payload N times — one stub identity, no + duplicate rows, no exception.""" + bus = FakeBus() + await bus.connect() + payload = { + "attacker_uuid": attacker_uuid, + "primitive": "motor.input_modality", + "value": "pasted", + "ts": 1714000000.0, + "confidence": 0.9, + } + for _ in range(5): + await _aw.handle_observation_event(bus, repo, _make_event(payload)) + + identities = await repo.list_all_identities() + assert len(identities) == 1 + await bus.close() + + +@pytest.mark.anyio +async def test_event_object_payload_attribute( + repo, attacker_uuid: str, +) -> None: + """Real bus events carry payload on ``.payload``; the handler + must follow the attribute, not assume the event itself is the + dict.""" + class _Evt: + def __init__(self, payload: dict[str, Any]) -> None: + self.payload = payload + + bus = FakeBus() + await bus.connect() + payload = { + "attacker_uuid": attacker_uuid, + "primitive": "cognitive.feedback_loop_engagement", + "value": "closed_loop", + "ts": 1714000000.0, + "confidence": 0.85, + } + await _aw.handle_observation_event(bus, repo, _Evt(payload)) + attacker = await repo.get_attacker_by_uuid(attacker_uuid) + assert attacker is not None + assert attacker["identity_id"] is not None + await bus.close() diff --git a/tests/db/test_attribution_state.py b/tests/db/test_attribution_state.py new file mode 100644 index 00000000..24578a9b --- /dev/null +++ b/tests/db/test_attribution_state.py @@ -0,0 +1,224 @@ +"""AttributionStateRow + identity-stub repo tests — Phase 1 substrate. + +Mirrors ``tests/db/test_observations.py``: SQLite ``tmp_path`` factory, +``@pytest.mark.anyio`` markers, an ``Attacker`` seeded so the stub- +materialisation path has a valid FK. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path: Path): + r = get_repository(db_path=str(tmp_path / "attribution.db")) + await r.initialize() + return r + + +@pytest.fixture +async def attacker_uuid(repo) -> str: + now = datetime.now(timezone.utc) + return await repo.upsert_attacker({ + "ip": "10.0.0.7", + "first_seen": now, + "last_seen": now, + }) + + +@pytest.mark.anyio +async def test_ensure_stub_creates_identity_for_new_attacker( + repo, attacker_uuid: str, +) -> None: + """First call: Attacker has no identity_id → stub created and + stamped onto the Attacker row.""" + identity_uuid = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + assert identity_uuid is not None + assert isinstance(identity_uuid, str) + + attacker = await repo.get_attacker_by_uuid(attacker_uuid) + assert attacker is not None + assert attacker["identity_id"] == identity_uuid + + identity = await repo.get_identity_by_uuid(identity_uuid) + assert identity is not None + assert identity["uuid"] == identity_uuid + assert identity["merged_into_uuid"] is None + assert identity["schema_version"] == 1 + + +@pytest.mark.anyio +async def test_ensure_stub_idempotent(repo, attacker_uuid: str) -> None: + """Second call returns the same identity_uuid; no second insert.""" + first = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + second = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + third = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + assert first == second == third + + +@pytest.mark.anyio +async def test_ensure_stub_returns_none_for_missing_attacker(repo) -> None: + """Worker treats missing-Attacker as 'defer' — repo returns None + without raising or inserting an orphan identity.""" + out = await repo.ensure_stub_identity_for_attacker( + "00000000000000000000000000000000", + ) + assert out is None + + +@pytest.mark.anyio +async def test_upsert_and_read_back_state(repo, attacker_uuid: str) -> None: + """Round-trip: every column on the state row survives one + insert + read.""" + identity_uuid = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + assert identity_uuid is not None + + await repo.upsert_attribution_state({ + "identity_uuid": identity_uuid, + "primitive": "motor.input_modality", + "current_value": "pasted", + "state": "stable", + "confidence": 0.91, + "observation_count": 5, + "last_change_ts": 1714521660.456, + "last_observation_ts": 1714521660.456, + }) + + out = await repo.get_attribution_state( + identity_uuid, "motor.input_modality", + ) + assert out is not None + assert out["state"] == "stable" + assert out["confidence"] == 0.91 + assert out["current_value"] == "pasted" + assert out["observation_count"] == 5 + assert out["last_change_ts"] == 1714521660.456 + + +@pytest.mark.anyio +async def test_upsert_idempotent_on_natural_key( + repo, attacker_uuid: str, +) -> None: + """Same (identity_uuid, primitive) twice → one row, second wins + on mutable fields.""" + identity_uuid = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + assert identity_uuid is not None + + base = { + "identity_uuid": identity_uuid, + "primitive": "motor.input_modality", + "current_value": "typed", + "state": "stable", + "confidence": 0.7, + "observation_count": 3, + "last_change_ts": 1714000000.0, + "last_observation_ts": 1714000000.0, + } + await repo.upsert_attribution_state(base) + await repo.upsert_attribution_state({ + **base, + "current_value": "pasted", + "state": "drifting", + "confidence": 0.85, + "observation_count": 8, + "last_change_ts": 1714000300.0, + "last_observation_ts": 1714000400.0, + }) + + rows = await repo.get_attribution_state_for_identity(identity_uuid) + assert len(rows) == 1 + assert rows[0]["state"] == "drifting" + assert rows[0]["confidence"] == 0.85 + assert rows[0]["current_value"] == "pasted" + + +@pytest.mark.anyio +async def test_get_state_for_identity_orders_by_primitive( + repo, attacker_uuid: str, +) -> None: + """Multiple primitives → one row each, primitive-ordered for + deterministic API output.""" + identity_uuid = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + assert identity_uuid is not None + primitives = [ + "motor.input_modality", + "cognitive.feedback_loop_engagement", + "temporal.weekend_cadence", + ] + for i, p in enumerate(primitives): + await repo.upsert_attribution_state({ + "identity_uuid": identity_uuid, + "primitive": p, + "current_value": "x", + "state": "stable", + "confidence": 0.8, + "observation_count": 5, + "last_change_ts": 1714000000.0 + i, + "last_observation_ts": 1714000000.0 + i, + }) + + rows = await repo.get_attribution_state_for_identity(identity_uuid) + assert [r["primitive"] for r in rows] == sorted(primitives) + + +@pytest.mark.anyio +async def test_list_multi_actor_requires_two_primitives( + repo, attacker_uuid: str, +) -> None: + """Single-primitive multi_actor flag is too noisy. Correlator + only fires on ≥ 2 primitives independently flagging the same + identity.""" + identity_uuid = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + assert identity_uuid is not None + + # One multi_actor row → no co-flag yet. + await repo.upsert_attribution_state({ + "identity_uuid": identity_uuid, + "primitive": "motor.input_modality", + "current_value": "conflicted", + "state": "multi_actor", + "confidence": 0.55, + "observation_count": 10, + "last_change_ts": 1714000000.0, + "last_observation_ts": 1714000000.0, + }) + assert await repo.list_multi_actor_identities() == [] + + # Add a second multi_actor row → identity surfaces with both + # primitives. + await repo.upsert_attribution_state({ + "identity_uuid": identity_uuid, + "primitive": "cognitive.feedback_loop_engagement", + "current_value": "conflicted", + "state": "multi_actor", + "confidence": 0.6, + "observation_count": 8, + "last_change_ts": 1714000100.0, + "last_observation_ts": 1714000100.0, + }) + out = await repo.list_multi_actor_identities() + assert len(out) == 1 + assert out[0]["identity_uuid"] == identity_uuid + assert sorted(out[0]["primitives"]) == [ + "cognitive.feedback_loop_engagement", + "motor.input_modality", + ] + + +@pytest.mark.anyio +async def test_get_state_returns_none_for_unknown_pair( + repo, attacker_uuid: str, +) -> None: + """Worker uses None as 'no prior state, initialise from this + observation' — surface the contract directly.""" + identity_uuid = await repo.ensure_stub_identity_for_attacker(attacker_uuid) + assert identity_uuid is not None + out = await repo.get_attribution_state( + identity_uuid, "motor.input_modality", + ) + assert out is None