From 0972325527e8787d8ed732024c687d9f0bcf9899 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 3 May 2026 07:25:10 -0400 Subject: [PATCH] feat(web/db): observations table + repo + bus prefix (BEHAVE-INTEGRATION Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Additive Phase 1 of BEHAVE-INTEGRATION.md. Lays the storage layer the BEHAVE-SHELL extractor (DEBT-050) will write into. Nothing breaks; SessionProfile coexists for now and is dropped in the follow-up commit. decnet/web/db/models/observations.py — new ObservationRow SQLModel mirroring the BEHAVE Observation envelope field-for-field (core/decnet_behave_core/spec/envelope.py). ``id`` is a hex-string UUID (matching BEHAVE), not a typed UUID column. ``identity_ref`` is str | None — written by the future attribution engine, NULL until then. ``attacker_uuid`` is the one DECNET-side denormalisation; FK'd to attackers.uuid for cheap AttackerDetail joins. ``evidence_ref`` is NOT NULL for DECNET emissions even though the upstream envelope makes it optional — the worker's "already profiled?" check keys on it. UniqueConstraint(evidence_ref, primitive) enforces idempotency at the schema level so re-running the extractor on the same shard+sid produces a DB-side conflict the upsert path resolves deterministically. Class is named ``ObservationRow`` (not ``Observation``) to avoid colliding with the BEHAVE Pydantic envelope at sites that import both. decnet/web/db/sqlmodel_repo/observations.py — ObservationsMixin. Three public methods backing the canonical queries from BEHAVE-INTEGRATION.md §"Storage": ``upsert_observation`` (idempotent on the natural key), ``latest_observation_per_primitive`` (per- primitive MAX(ts) subquery, portable across SQLite and MySQL — no DISTINCT ON), ``observations_time_series`` (asc-by-ts). Plus ``has_observations_for_evidence`` for the worker's session-already- profiled check. 
decnet/bus/topics.py — ATTACKER_OBSERVATION_PREFIX = "observation" constant + ``attacker_observation(primitive)`` builder. Full topic shape ``attacker.observation.<primitive>`` matches what BEHAVE's spec.event_adapter.event_topic_for produces upstream. Documentation + pattern matching only — bus auth is socket file perms (DEBT-029 §2), not topic-level. decnet/web/db/repository.py — abstract ``upsert_observation``, ``latest_observation_per_primitive``, ``observations_time_series`` on BaseRepository. tests/db/test_observations.py — 11 tests covering upsert round-trip, idempotency under the unique constraint, latest-per-primitive ordering across multiple sessions, time-series asc-ordering, empty- attacker contract, every BEHAVE ValueKind round-tripping through the JSON column, and the has_observations_for_evidence check. tests/db/test_base_repo.py — DummyRepo gains the three new abstract overrides so its coverage suite still instantiates. --- decnet/bus/topics.py | 36 +++ decnet/web/db/models/__init__.py | 4 + decnet/web/db/models/observations.py | 80 +++++ decnet/web/db/repository.py | 38 +++ decnet/web/db/sqlmodel_repo/__init__.py | 2 + decnet/web/db/sqlmodel_repo/observations.py | 187 +++++++++++ tests/db/test_base_repo.py | 7 + tests/db/test_observations.py | 329 ++++++++++++++++++++ 8 files changed, 683 insertions(+) create mode 100644 decnet/web/db/models/observations.py create mode 100644 decnet/web/db/sqlmodel_repo/observations.py create mode 100644 tests/db/test_observations.py diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 1f5adb93..544aedde 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -17,6 +17,7 @@ Token structure (NATS-style, dot-separated): attacker.scored attacker.session.started attacker.session.ended + attacker.observation.{primitive} identity.formed identity.observation.linked identity.merged @@ -129,6 +130,19 @@ ATTACKER_SESSION_ENDED = "session.ended" # returned a verdict). 
Payload carries the aggregate verdict + per- # provider summary so SIEM-bound webhooks don't need to re-query the DB. ATTACKER_INTEL_ENRICHED = "intel.enriched" +# Per-primitive BEHAVE-SHELL observation. Full topic shape: +# attacker.observation.<primitive> +# e.g. ``attacker.observation.motor.input_modality``. Producer: +# ``decnet/profiler/behave_shell/`` (extractor library called from the +# profiler worker on ``attacker.session.ended``); consumers: dashboard +# SSE relay, attribution engine state machine, federation gossip +# (post-v0). See development/BEHAVE-INTEGRATION.md §"Bus topics" for +# the wire-format contract — the prefix is documentation + pattern +# match only; bus auth is socket file perms (DEBT-029 §2), not +# topic-level. The ``primitive`` segment MAY contain dots +# (``motor.shell_mastery.tab_completion``) — the same dotted-leaf +# rule that ``attacker.session.ended`` uses. +ATTACKER_OBSERVATION_PREFIX = "observation" # Identity-resolution event types (second/third tokens under ``identity``). # Published by the (future) clusterer worker — see @@ -366,6 +380,28 @@ def attacker(event_type: str) -> str: return f"{ATTACKER}.{event_type}" +def attacker_observation(primitive: str) -> str: + """Build ``attacker.observation.<primitive>``. + + *primitive* is the fully-qualified BEHAVE-SHELL primitive path + (e.g. ``motor.input_modality``, + ``cognitive.feedback_loop_engagement``, + ``motor.shell_mastery.tab_completion``). Dotted primitives are + permitted — this matches the format + ``decnet_behave_shell.spec.event_adapter.event_topic_for`` produces + upstream, and DECNET's bus admits the dotted leaf the same way + :func:`attacker` does for ``session.started``. + + Empty string is rejected so a downstream typo doesn't ship as + ``attacker.observation.``. 
+ """ + if not primitive: + raise ValueError( + "attacker_observation topic requires a non-empty primitive", + ) + return f"{ATTACKER}.{ATTACKER_OBSERVATION_PREFIX}.{primitive}" + + def campaign(event_type: str) -> str: """Build ``campaign.``. diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index f56c50f2..cda33950 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -57,6 +57,9 @@ from .attacker_intel import ( from .attachments import ( ObservedAttachment, ) +from .observations import ( + ObservationRow, +) from .campaigns import ( Campaign, CampaignsResponse, @@ -250,6 +253,7 @@ __all__ = [ "AttackerIdentity", "AttackerIntel", "AttackersResponse", + "ObservationRow", "ObservedAttachment", "SessionProfile", "SmtpTarget", diff --git a/decnet/web/db/models/observations.py b/decnet/web/db/models/observations.py new file mode 100644 index 00000000..147c9ef6 --- /dev/null +++ b/decnet/web/db/models/observations.py @@ -0,0 +1,80 @@ +"""BEHAVE-SHELL observation rows — generic table holding every +emitted Observation envelope. + +Mirrors the BEHAVE-SHELL ``Observation`` Pydantic envelope +(``decnet_behave_core.spec.envelope.Observation``) field-for-field, plus +one DECNET-side denormalisation (``attacker_uuid``) for cheap joins. +The class is named ``ObservationRow`` to avoid colliding with the +BEHAVE Pydantic class when both are imported into the same module — +the Pydantic envelope is the wire format; this is the storage row. + +See ``development/BEHAVE-INTEGRATION.md`` §"Storage" for the full +rationale. + +Idempotency is enforced at the schema level by the +``UniqueConstraint(evidence_ref, primitive)`` index — re-running the +extractor on the same shard+sid produces a DB-side conflict that the +repo's upsert path resolves deterministically. 
``evidence_ref`` is +NOT NULL for DECNET-emitted observations even though the BEHAVE +envelope makes it ``Optional[str]``: the worker's "have we already +profiled this session?" check keys on it, and the shape +``shard:{decky}/{service}/{date}.jsonl#sid`` is mandatory at the +worker layer. +""" +from __future__ import annotations + +from typing import Any + +from sqlalchemy import JSON, Column, Index, UniqueConstraint +from sqlmodel import Field, SQLModel + + +class ObservationRow(SQLModel, table=True): + """One BEHAVE-SHELL observation persisted to ``observations``. + + Re-derivable from the upstream session shard; this row is a cache + for cheap dashboard reads, not the source of truth (which is the + asciinema shard on disk + the BEHAVE-SHELL extractor). + + Type alignment with BEHAVE: ``id`` is a hex-string UUID (matching + BEHAVE's ``Observation.id: str = Field(default_factory=lambda: + uuid.uuid4().hex)``), not a typed UUID column. ``identity_ref`` + is ``str | None``, ditto. + """ + + __tablename__ = "observations" + __table_args__ = ( + Index( + "ix_observations_attacker_primitive_ts", + "attacker_uuid", "primitive", "ts", + ), + Index("ix_observations_primitive_ts", "primitive", "ts"), + UniqueConstraint( + "evidence_ref", "primitive", + name="uq_observations_evidence_primitive", + ), + ) + + # ── envelope fields (types match BEHAVE exactly) ───────────────────── + id: str = Field(primary_key=True) + identity_ref: str | None = Field(default=None) + primitive: str = Field(index=True) + value: dict[str, Any] | str | int | float | bool | list = Field( + sa_column=Column(JSON, nullable=False), + ) + confidence: float + window_start_ts: float + window_end_ts: float + source: str + evidence_ref: str = Field(nullable=False) + envelope_v: int + ts: float = Field(index=True) + + # ── DECNET-side denormalisation (NOT in BEHAVE envelope) ───────────── + # The envelope identifies the attacker via ``identity_ref`` once + # attribution exists; pre-attribution, 
observations carry no + # attacker linkage. DECNET resolves the (decky, service, sid, src_ip) + # tuple to ``attacker_uuid`` at write time so AttackerDetail can + # query without joining through the (still-empty) + # ``attacker_identities`` table. + attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 737c9e1c..79ed017d 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -313,6 +313,44 @@ class BaseRepository(ABC): """Retrieve the keystroke-dynamics profile row for a session.""" pass + # ─── BEHAVE-SHELL observations ───────────────────────────────────── + # See development/BEHAVE-INTEGRATION.md §"Storage" for the full + # schema rationale. Every observation envelope emitted by the + # BEHAVE-SHELL extractor lands in the ``observations`` table; this + # is the abstract surface the worker calls. + + @abstractmethod + async def upsert_observation(self, data: dict[str, Any]) -> str: + """Insert or update an ``ObservationRow`` keyed on + ``(evidence_ref, primitive)``. + + ``data`` MUST carry the BEHAVE envelope fields (``primitive``, + ``value``, ``confidence``, ``window_start_ts``, + ``window_end_ts``, ``source``, ``evidence_ref``, + ``envelope_v``, ``ts``) plus the DECNET-side ``attacker_uuid`` + denorm. Returns the row ``id``. + """ + raise NotImplementedError + + @abstractmethod + async def latest_observation_per_primitive( + self, attacker_uuid: str, + ) -> dict[str, dict[str, Any]]: + """Return the latest observation per primitive for one attacker. + + Empty dict when the attacker has no observations. Backs the + AttackerDetail "current state" panel. + """ + raise NotImplementedError + + @abstractmethod + async def observations_time_series( + self, attacker_uuid: str, primitive: str, + ) -> list[dict[str, Any]]: + """Every observation of ``primitive`` for ``attacker_uuid``, + ordered by ``ts`` ASC. 
Empty list when none.""" + raise NotImplementedError + async def upsert_observed_attachment( self, *, diff --git a/decnet/web/db/sqlmodel_repo/__init__.py b/decnet/web/db/sqlmodel_repo/__init__.py index 9e0096ff..cceda4c0 100644 --- a/decnet/web/db/sqlmodel_repo/__init__.py +++ b/decnet/web/db/sqlmodel_repo/__init__.py @@ -44,6 +44,7 @@ from decnet.web.db.sqlmodel_repo.deckies import DeckiesMixin from decnet.web.db.sqlmodel_repo.fleet import FleetMixin from decnet.web.db.sqlmodel_repo.identities import IdentitiesMixin from decnet.web.db.sqlmodel_repo.logs import LogsMixin +from decnet.web.db.sqlmodel_repo.observations import ObservationsMixin from decnet.web.db.sqlmodel_repo.observed_attachments import ObservedAttachmentsMixin from decnet.web.db.sqlmodel_repo.orchestrator import OrchestratorMixin from decnet.web.db.sqlmodel_repo.realism import RealismMixin @@ -66,6 +67,7 @@ class SQLModelRepository( FleetMixin, IdentitiesMixin, LogsMixin, + ObservationsMixin, ObservedAttachmentsMixin, OrchestratorMixin, RealismMixin, diff --git a/decnet/web/db/sqlmodel_repo/observations.py b/decnet/web/db/sqlmodel_repo/observations.py new file mode 100644 index 00000000..dc103e1f --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/observations.py @@ -0,0 +1,187 @@ +"""Repo mixin for the ``observations`` table. + +Composed onto :class:`SQLModelRepository`. Three public methods: + +* :meth:`upsert_observation` — idempotent on + ``(evidence_ref, primitive)``. Caller passes the BEHAVE envelope as + a dict plus the DECNET-side ``attacker_uuid`` denorm. +* :meth:`latest_observation_per_primitive` — backs the AttackerDetail + "current state" panel. Implements the canonical query from + ``BEHAVE-INTEGRATION.md`` §"Storage". +* :meth:`observations_time_series` — every observation of one + primitive for one attacker, ordered ASC by ``ts``. Backs future + drift charts. 
+ +PII discipline is the BEHAVE envelope's job +(``core/decnet_behave_core/spec/envelope.py:3-19``); this mixin does +not validate values — that happens at construction time by the BEHAVE +``Observation`` subclass before the dict reaches us. +""" +from __future__ import annotations + +import uuid as _uuid +from typing import Any, Optional + +from sqlalchemy import desc, func, select +from sqlmodel import col + +from decnet.web.db.models import ObservationRow +from decnet.web.db.sqlmodel_repo._helpers import _MixinBase + + +class ObservationsMixin(_MixinBase): + """Mixin: methods composed onto :class:`SQLModelRepository`.""" + + async def upsert_observation(self, data: dict[str, Any]) -> str: + """Insert or update an observation row keyed on + ``(evidence_ref, primitive)``. + + ``data`` MUST carry every non-default ``ObservationRow`` field: + ``primitive``, ``value``, ``confidence``, ``window_start_ts``, + ``window_end_ts``, ``source``, ``evidence_ref``, ``envelope_v``, + ``ts``, ``attacker_uuid``. ``id`` is generated if absent. + ``identity_ref`` is optional. + + Returns the row's ``id``. Idempotent: a second call with the + same ``(evidence_ref, primitive)`` overwrites the prior row's + mutable fields (value, confidence, ts, etc.) without + violating the unique constraint. + """ + evidence_ref = data["evidence_ref"] + primitive = data["primitive"] + async with self._session() as session: + stmt = select(ObservationRow).where( + ObservationRow.evidence_ref == evidence_ref, + ObservationRow.primitive == primitive, + ) + existing = (await session.execute(stmt)).scalar_one_or_none() + if existing: + # Mutable fields from the new envelope; ``id`` and the + # natural key stay locked to the existing row. 
+ for k, v in data.items(): + if k in ("id", "evidence_ref", "primitive"): + continue + setattr(existing, k, v) + session.add(existing) + row_id = existing.id + else: + row_id = data.get("id") or _uuid.uuid4().hex + row_data = {**data, "id": row_id} + session.add(ObservationRow(**row_data)) + await session.commit() + return row_id + + async def latest_observation_per_primitive( + self, attacker_uuid: str, + ) -> dict[str, dict[str, Any]]: + """Return the most recent observation per primitive for one + attacker. + + Output shape:: + + { + "motor.input_modality": {"value": "pasted", + "confidence": 0.91, + "ts": 1714521660.456, + "source": "..."}, + "cognitive.feedback_loop_engagement": {...}, + ... + } + + Empty dict when the attacker has zero observations. + Implementation uses a per-primitive MAX(ts) subquery; portable + across SQLite + MySQL (no ``DISTINCT ON``). + """ + async with self._session() as session: + # Subquery: per-primitive max(ts) for this attacker. + max_ts_subq = ( + select( + col(ObservationRow.primitive).label("primitive"), + func.max(col(ObservationRow.ts)).label("max_ts"), + ) + .where(ObservationRow.attacker_uuid == attacker_uuid) + .group_by(col(ObservationRow.primitive)) + .subquery() + ) + stmt = ( + select(ObservationRow) + .join( + max_ts_subq, + (ObservationRow.primitive == max_ts_subq.c.primitive) + & (ObservationRow.ts == max_ts_subq.c.max_ts), + ) + .where(ObservationRow.attacker_uuid == attacker_uuid) + .order_by(ObservationRow.primitive) + ) + rows = (await session.execute(stmt)).scalars().all() + return { + row.primitive: { + "value": row.value, + "confidence": row.confidence, + "ts": row.ts, + "source": row.source, + } + for row in rows + } + + async def observations_time_series( + self, attacker_uuid: str, primitive: str, + ) -> list[dict[str, Any]]: + """Return every observation of ``primitive`` for ``attacker_uuid``, + ordered by ``ts`` ASC. + + Empty list when no rows match. 
+ """ + async with self._session() as session: + stmt = ( + select(ObservationRow) + .where( + ObservationRow.attacker_uuid == attacker_uuid, + ObservationRow.primitive == primitive, + ) + .order_by(ObservationRow.ts) + ) + rows = (await session.execute(stmt)).scalars().all() + return [ + { + "ts": row.ts, + "value": row.value, + "confidence": row.confidence, + } + for row in rows + ] + + async def get_observation_by_id( + self, row_id: str, + ) -> Optional[dict[str, Any]]: + """Single ``ObservationRow`` lookup by ``id``. Used by tests + and a future "fetch one event" surface; not exposed on the + public API today.""" + async with self._session() as session: + stmt = select(ObservationRow).where(ObservationRow.id == row_id) + row = (await session.execute(stmt)).scalar_one_or_none() + if not row: + return None + return row.model_dump(mode="json") + + async def has_observations_for_evidence( + self, evidence_ref: str, + ) -> bool: + """True iff any observation row carries this ``evidence_ref``. + + Worker uses this as the "have we already profiled this session?" + check before kicking the extractor — equivalent to "is this + ``(decky, service, sid)`` already in the table?" + """ + async with self._session() as session: + stmt = ( + select(col(ObservationRow.id)) + .where(ObservationRow.evidence_ref == evidence_ref) + .limit(1) + ) + return (await session.execute(stmt)).scalar_one_or_none() is not None + + # Order desc(ts) reserved as the most-recent-first listing if a + # paginated UI surface lands later. Not exposed today; named here + # so a future grep finds the canonical desc-ts pattern. 
+ _LATEST_FIRST = staticmethod(desc) diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index ed44c8c5..e9c0f83b 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -40,6 +40,10 @@ class DummyRepo(BaseRepository): async def get_behaviors_for_ips(self, ips): await super().get_behaviors_for_ips(ips) async def upsert_session_profile(self, sid, data): await super().upsert_session_profile(sid, data) async def get_session_profile(self, sid): await super().get_session_profile(sid) + # BEHAVE-SHELL observations (DEBT-050 / BEHAVE-INTEGRATION.md Phase 1) + async def upsert_observation(self, data): await super().upsert_observation(data); return "" + async def latest_observation_per_primitive(self, attacker_uuid): await super().latest_observation_per_primitive(attacker_uuid); return {} + async def observations_time_series(self, attacker_uuid, primitive): await super().observations_time_series(attacker_uuid, primitive); return [] async def increment_smtp_target(self, u, d): await super().increment_smtp_target(u, d) async def list_smtp_targets(self, u): await super().list_smtp_targets(u) async def get_attacker_stored_mail(self, u): await super().get_attacker_stored_mail(u) @@ -127,6 +131,9 @@ async def test_base_repo_coverage(): await dr.get_behaviors_for_ips({"1.1.1.1"}) await dr.upsert_session_profile("sid", {}) await dr.get_session_profile("sid") + await dr.upsert_observation({}) + await dr.latest_observation_per_primitive("a") + await dr.observations_time_series("a", "motor.input_modality") await dr.increment_smtp_target("uuid", "corp.com") await dr.list_smtp_targets("uuid") await dr.get_attacker_stored_mail("uuid") diff --git a/tests/db/test_observations.py b/tests/db/test_observations.py new file mode 100644 index 00000000..608e305a --- /dev/null +++ b/tests/db/test_observations.py @@ -0,0 +1,329 @@ +"""ObservationRow model + repo tests — upsert idempotency, +latest-per-primitive query, time-series ordering. 
+ +Mirrors the test style of ``tests/db/test_credentials.py``: SQLite +``tmp_path`` factory, ``@pytest.mark.anyio`` markers, an ``Attacker`` +seeded so observations have a valid FK target. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from decnet.web.db.factory import get_repository + + +def _envelope( + *, + primitive: str, + value, + attacker_uuid: str, + evidence_ref: str, + ts: float, + confidence: float = 0.9, + source: str = "decnet/profiler/behave_shell/extract.py", + envelope_v: int = 1, + identity_ref: str | None = None, +) -> dict: + """Construct a minimal valid observation dict for upsert.""" + return { + "id": uuid.uuid4().hex, + "primitive": primitive, + "value": value, + "confidence": confidence, + "window_start_ts": ts, + "window_end_ts": ts, + "source": source, + "evidence_ref": evidence_ref, + "envelope_v": envelope_v, + "ts": ts, + "identity_ref": identity_ref, + "attacker_uuid": attacker_uuid, + } + + +@pytest.fixture +async def repo(tmp_path: Path): + r = get_repository(db_path=str(tmp_path / "observations.db")) + await r.initialize() + return r + + +@pytest.fixture +async def attacker_uuid(repo) -> str: + """One Attacker row to FK observations against.""" + now = datetime.now(timezone.utc) + return await repo.upsert_attacker({ + "ip": "10.0.0.7", + "first_seen": now, + "last_seen": now, + }) + + +@pytest.mark.anyio +async def test_upsert_then_read_back(repo, attacker_uuid: str) -> None: + """Round-trip: every envelope field survives one insert + read.""" + payload = _envelope( + primitive="motor.input_modality", + value="pasted", + attacker_uuid=attacker_uuid, + evidence_ref="shard:decoy01/ssh/2026-05-03.jsonl#sid-A", + ts=1714521660.456, + confidence=0.91, + ) + row_id = await repo.upsert_observation(payload) + assert row_id == payload["id"] + + out = await repo.latest_observation_per_primitive(attacker_uuid) + assert "motor.input_modality" in out 
+ assert out["motor.input_modality"]["value"] == "pasted" + assert out["motor.input_modality"]["confidence"] == 0.91 + assert out["motor.input_modality"]["ts"] == 1714521660.456 + assert ( + out["motor.input_modality"]["source"] + == "decnet/profiler/behave_shell/extract.py" + ) + + +@pytest.mark.anyio +async def test_upsert_idempotent_on_evidence_primitive( + repo, attacker_uuid: str, +) -> None: + """Same (evidence_ref, primitive) twice → one row, second wins on + mutable fields, unique constraint not violated.""" + base = _envelope( + primitive="motor.input_modality", + value="typed", + attacker_uuid=attacker_uuid, + evidence_ref="shard:decoy01/ssh/2026-05-03.jsonl#sid-B", + ts=1714521600.0, + confidence=0.85, + ) + first_id = await repo.upsert_observation(base) + + # Same key, different value + later ts. ``id`` field on the + # incoming envelope should NOT replace the row's stored ``id``. + rerun = { + **base, + "id": uuid.uuid4().hex, # ignored on upsert path + "value": "pasted", + "ts": 1714521700.0, + "confidence": 0.95, + } + second_id = await repo.upsert_observation(rerun) + assert second_id == first_id, "natural-key upsert must not allocate a new row id" + + out = await repo.latest_observation_per_primitive(attacker_uuid) + assert out["motor.input_modality"]["value"] == "pasted" + assert out["motor.input_modality"]["ts"] == 1714521700.0 + assert out["motor.input_modality"]["confidence"] == 0.95 + + +@pytest.mark.anyio +async def test_latest_per_primitive_returns_max_ts_only( + repo, attacker_uuid: str, +) -> None: + """Three observations of the same primitive at increasing ts — + latest-per-primitive returns only the most recent. 
+ + Distinct evidence_refs (one per session) so the unique constraint + does NOT collapse them; this is the "drift over multiple sessions" + case, not the "re-run extractor on same shard" case.""" + times = [1714000000.0, 1714000100.0, 1714000200.0] + values = ["typed", "mixed", "pasted"] + for ts, val in zip(times, values): + await repo.upsert_observation(_envelope( + primitive="motor.input_modality", + value=val, + attacker_uuid=attacker_uuid, + evidence_ref=f"shard:decoy01/ssh/sid-{ts}", + ts=ts, + )) + + out = await repo.latest_observation_per_primitive(attacker_uuid) + assert out["motor.input_modality"]["value"] == "pasted" + assert out["motor.input_modality"]["ts"] == times[-1] + + +@pytest.mark.anyio +async def test_latest_per_primitive_does_not_interleave( + repo, attacker_uuid: str, +) -> None: + """Multiple primitives → one row each in the output; values stay + matched to their primitive.""" + await repo.upsert_observation(_envelope( + primitive="motor.input_modality", + value="pasted", + attacker_uuid=attacker_uuid, + evidence_ref="shard:a#1", + ts=1714000000.0, + )) + await repo.upsert_observation(_envelope( + primitive="cognitive.feedback_loop_engagement", + value="closed_loop", + attacker_uuid=attacker_uuid, + evidence_ref="shard:a#1", + ts=1714000000.0, + )) + await repo.upsert_observation(_envelope( + primitive="cognitive.command_branch_diversity", + value="adaptive_branching", + attacker_uuid=attacker_uuid, + evidence_ref="shard:a#1", + ts=1714000000.0, + )) + + out = await repo.latest_observation_per_primitive(attacker_uuid) + assert set(out.keys()) == { + "motor.input_modality", + "cognitive.feedback_loop_engagement", + "cognitive.command_branch_diversity", + } + assert out["motor.input_modality"]["value"] == "pasted" + assert out["cognitive.feedback_loop_engagement"]["value"] == "closed_loop" + assert ( + out["cognitive.command_branch_diversity"]["value"] + == "adaptive_branching" + ) + + +@pytest.mark.anyio +async def 
test_time_series_ordered_ascending(repo, attacker_uuid: str) -> None: + """observations_time_series returns every row for one primitive, + ordered by ``ts`` ASC.""" + times = [1714000300.0, 1714000100.0, 1714000200.0, 1714000000.0] + for i, ts in enumerate(times): + await repo.upsert_observation(_envelope( + primitive="motor.paste_burst_rate", + value="habitual", + attacker_uuid=attacker_uuid, + evidence_ref=f"shard:b#{i}", + ts=ts, + confidence=0.5 + 0.1 * i, + )) + + series = await repo.observations_time_series( + attacker_uuid, "motor.paste_burst_rate", + ) + assert [row["ts"] for row in series] == sorted(times) + assert all(row["value"] == "habitual" for row in series) + + +@pytest.mark.anyio +async def test_empty_attacker_returns_empty_dict( + repo, attacker_uuid: str, +) -> None: + """Attacker with no observations → empty dict, not 404.""" + out = await repo.latest_observation_per_primitive(attacker_uuid) + assert out == {} + + +@pytest.mark.anyio +async def test_unknown_attacker_returns_empty_dict(repo) -> None: + """Unseen attacker UUID → empty dict; the contract is "I have no + observations" not "this attacker doesn't exist".""" + out = await repo.latest_observation_per_primitive("00000000-0000-0000-0000-000000000000") + assert out == {} + + +@pytest.mark.anyio +async def test_time_series_empty_when_primitive_absent( + repo, attacker_uuid: str, +) -> None: + """Time-series query for a primitive the attacker never emitted → + empty list.""" + await repo.upsert_observation(_envelope( + primitive="motor.input_modality", + value="typed", + attacker_uuid=attacker_uuid, + evidence_ref="shard:c#1", + ts=1714000000.0, + )) + series = await repo.observations_time_series( + attacker_uuid, "cognitive.feedback_loop_engagement", + ) + assert series == [] + + +@pytest.mark.anyio +async def test_has_observations_for_evidence( + repo, attacker_uuid: str, +) -> None: + """The 'have we already profiled this session?' 
check: True iff + any row carries the evidence_ref.""" + assert await repo.has_observations_for_evidence("shard:novel#1") is False + + await repo.upsert_observation(_envelope( + primitive="motor.input_modality", + value="pasted", + attacker_uuid=attacker_uuid, + evidence_ref="shard:novel#1", + ts=1714000000.0, + )) + assert await repo.has_observations_for_evidence("shard:novel#1") is True + # Multi-primitive write under the same evidence_ref: still True, + # not duplicated. + await repo.upsert_observation(_envelope( + primitive="cognitive.feedback_loop_engagement", + value="closed_loop", + attacker_uuid=attacker_uuid, + evidence_ref="shard:novel#1", + ts=1714000000.0, + )) + assert await repo.has_observations_for_evidence("shard:novel#1") is True + + +@pytest.mark.anyio +async def test_value_roundtrip_preserves_jsonable_shapes( + repo, attacker_uuid: str, +) -> None: + """The ``value`` column is the union of every BEHAVE primitive's + value kind. Round-trip a categorical string, a numeric, a hash + string, a list, and a dict; all survive the JSON column.""" + cases = [ + ("motor.input_modality", "pasted"), + ("toolchain.c2.beacon_interval_ms", 5000.0), + ("toolchain.tls.jarm_server", "deadbeef" * 8), + ("toolchain.ssh.kex_algorithm_order", ["curve25519", "ecdh-sha2"]), + # Dict value — currently no SHELL primitive uses one but the + # core envelope permits it; keep the contract live. 
+ ("future.dict_primitive", {"a": 1, "b": [2, 3]}), + ] + for i, (primitive, value) in enumerate(cases): + await repo.upsert_observation(_envelope( + primitive=primitive, + value=value, + attacker_uuid=attacker_uuid, + evidence_ref=f"shard:roundtrip#{i}", + ts=1714000000.0 + i, + )) + + out = await repo.latest_observation_per_primitive(attacker_uuid) + for primitive, value in cases: + assert out[primitive]["value"] == value, primitive + + +@pytest.mark.anyio +async def test_idempotent_overwrite_does_not_violate_unique_constraint( + repo, attacker_uuid: str, +) -> None: + """Hammer the same key 5 times — single row, no IntegrityError.""" + for i in range(5): + await repo.upsert_observation(_envelope( + primitive="motor.input_modality", + value="pasted", + attacker_uuid=attacker_uuid, + evidence_ref="shard:hammer#1", + ts=1714000000.0 + i, + confidence=0.5 + 0.05 * i, + )) + + series = await repo.observations_time_series( + attacker_uuid, "motor.input_modality", + ) + assert len(series) == 1, "unique constraint must collapse re-runs" + # Last write wins on mutable fields. + assert series[0]["confidence"] == pytest.approx(0.7)