diff --git a/TODO.md b/TODO.md new file mode 100644 index 00000000..d76332f9 --- /dev/null +++ b/TODO.md @@ -0,0 +1,210 @@ +# TODO — credential reuse + vectorstore (handoff) + +This document hands off in-progress work on the **credential reuse +patterns** task from `development/DEVELOPMENT.md` (under *Service-Level +Behavioral Profiling*) plus the **`decnet/vectorstore/`** scaffolding +that prepares the substrate for a future statistical re-identification +engine over behavioral fingerprints. See +`/home/anti/.claude/plans/ah-excellent-alright-claude-vivid-thimble.md` +for the full approved plan and motivation. + +## Done in the previous session + +Foundation is shipped + tested (26 new tests passing, no regressions): + +- **Schema** — `decnet/web/db/models/logs.py` + - `Credential.attacker_uuid: Optional[str]` FK to `attackers.uuid`, + nullable. Backfilled by the profiler post-write. + - `CredentialReuse` table (UUID PK; JSON list columns for + `attacker_uuids`, `attacker_ips`, `deckies`, `services`; + `target_count`, `attempt_count`, `confidence` reserved for future + fuzzy matching). Unique key: `(secret_sha256, secret_kind, + principal_key)`. + - `CredentialReuseResponse` Pydantic DTO. +- **Repo** — `decnet/web/db/sqlmodel_repo.py` + `repository.py` + - `upsert_credential_reuse(...)`, + `list_credential_reuses(limit, offset, min_target_count, secret_kind)`, + `get_credential_reuse_by_id(id)`, + `update_credential_attacker_uuid(attacker_ip, attacker_uuid) -> int`. + - **Rename**: pre-existing `get_credential_reuse(secret_sha256)` → + `get_credential_attempts_for_secret(secret_sha256)`. All callers + updated. +- **Bus topics** — `decnet/bus/topics.py` + - `CREDENTIAL_CAPTURED = "captured"` (one per Credential upsert). + - `CREDENTIAL_REUSE_DETECTED = "reuse.detected"` (correlator emits + on insert/grow). + - `credential(event_type)` builder. +- **Vectorstore** — `decnet/vectorstore/` (NEW; flat layout mirroring + `decnet/bus/`) + - `base.py` — `BaseVectorStore` ABC, `VectorRecord`, `Neighbor`, + `VECTORSTORE_SCHEMA_VERSION`. Methods: `initialize`, `close`, + `health`, `insert`, `get`, `delete`, `knn`. Keyed by `(kind, id)`. + - `fake.py` — `FakeVectorStore` (in-memory, brute-force L2 KNN) + + `NullVectorStore` (no-op when `DECNET_VECTORSTORE_ENABLED=false`). + - `sqlite_vec.py` — `SqliteVecVectorStore`; lazy-loads the + `sqlite_vec` extension; one `vec0` virtual table per `kind` so + new feature families don't require schema migration. Per-kind + dim is locked on first insert. + - `factory.py` — `get_vectorstore()` env-driven dispatch + (`DECNET_VECTORSTORE_TYPE` ∈ {sqlite_vec, fake}; + `DECNET_VECTORSTORE_ENABLED`; `DECNET_VECTORSTORE_PATH`). On + missing `sqlite_vec` extension: logs a warning and returns + `FakeVectorStore` so workers don't crash. +- **Tests** + - `tests/db/test_credential_reuse.py` — 11 tests (upsert idempotency, + list filters/pagination, FK backfill semantics, null-principal + uniqueness, JSON-list merging). + - `tests/vectorstore/test_factory.py` (6) + + `tests/vectorstore/test_fake.py` (9) — factory dispatch + fallback, + round-trip, dim-mismatch raises, KNN ordering, NullStore no-op. + - Updated `tests/db/test_base_repo.py` and + `tests/db/test_credentials.py` for the rename. + +## Not yet done — what the next agent should pick up + +Tasks below are roughly in dependency order. Backend first, dashboard +last (it's the largest unknown and benefits from a fresh context). + +### 1. Profiler backfill of `Credential.attacker_uuid` + +Smallest task; do this first to validate the FK column end-to-end. + +- File: `decnet/profiler/` — find the spot where the profiler + mints/updates an `Attacker` row from observed events. There's + likely an `upsert_attacker(...)` call that produces the `(ip, uuid)` + pair. +- Add immediately after a successful upsert: + ```python + await repo.update_credential_attacker_uuid(ip, uuid) + ``` +- Test in `tests/profiler/` (whatever the existing test file is) that + after the profiler processes events for an IP, all `Credential` + rows for that IP have their `attacker_uuid` populated. Use the + pattern from `tests/db/test_credential_reuse.py:: + test_update_credential_attacker_uuid_backfills_only_nulls`. + +### 2. Correlator engine + worker wiring + +- File: `decnet/correlation/engine.py` — add + `correlate_credential_reuse(min_targets: int = 2)` to + `CorrelationEngine`. Signature suggested in the plan: + ```sql + SELECT secret_sha256, secret_kind, principal, + COUNT(DISTINCT decky_name||':'||service) AS target_count + FROM credentials + GROUP BY secret_sha256, secret_kind, principal + HAVING target_count >= :min_targets + ``` + For each group, fetch the underlying credential rows and call + `repo.upsert_credential_reuse(...)` per row. The repo upsert + recomputes `target_count` from the `credentials` table on each + update, so you don't need to pass aggregates in. +- On insert/grow (`out["inserted"] is True or out["changed"] is True`), + publish `bus.publish(topics.credential(topics.CREDENTIAL_REUSE_DETECTED), {...})` + with payload `{id, secret_kind, target_count, attacker_uuids, + attacker_ips, deckies, services}`. +- Worker file: `decnet/correlation/main.py` (or wherever + `CorrelationEngine` is loop-driven). Subscribe to: + - `attacker.observed` — re-runs reuse pass for that IP. + - `credential.captured` — re-runs reuse pass for that secret. + - Heartbeat tick every 60s as a fallback (mirror the mutator's + bus-wake + slow-tick pattern). +- Where is `credential.captured` emitted? Find the credential ingest + path — probably `decnet/collector/` or wherever + `repo.upsert_credential(...)` is called. Add a `bus.publish( + topics.credential(topics.CREDENTIAL_CAPTURED), {secret_sha256, + secret_kind, attacker_ip, decky, service})` after a successful + upsert. Bus is fire-and-forget — don't block on it. +- Tests: + - `tests/correlation/test_credential_reuse.py` — engine emits the + right `CredentialReuse` rows from synthetic credentials; bus + event published exactly once per insert/grow. + - Use `decnet.bus.fake.FakeBus` in tests; collect published + events for assertion. + +### 3. API routes — `GET /api/v1/credential-reuse` + +- File: probably `decnet/web/api/routes/` — see how existing + credentials routes are organized (recent commit + `feat(api): GET /credentials endpoint` → `4566146`). +- Endpoints: + - `GET /api/v1/credential-reuse?limit=50&offset=0&min_target_count=2&secret_kind=plaintext` + → `CredentialReuseResponse` (already in models). + - `GET /api/v1/credential-reuse/{id}` → single row dict, 404 if + missing. +- JWT-gated like all other routes. Use the existing dependency. +- No POST/PUT/PATCH — read-only this release. Per the + `feedback_schemathesis_400` memory there's no 400 contract to + document since there's no body parsing. +- Tests: `tests/api/test_credential_reuse_routes.py` — JWT gate, + pagination, filters, 404 for missing id. + +### 4. Dashboard — Credentials Reuse tab + drawer + +The big unknown. Next agent should: + +1. Survey `decnet/web/dashboard/` (React app) — how the existing + Credentials view is structured (commit `4ea4b0b feat(web): + Credentials view + inspector`). +2. Add a "Reuse" tab/filter that lists `CredentialReuse` rows sorted + by `target_count desc`. +3. Drawer on row-click showing decky×service breakdown, + `attacker_uuid` list (link to `/attackers/:id`), timeline. Reuse + the existing drawer pattern (see `feedback_react_stop_propagation_native_delegation` + memory — backdrop click closes via `target===currentTarget`, + never `stopPropagation`). +4. On the existing Credentials list, add a "seen on N targets" + badge when a credential has a corresponding `CredentialReuse` + row, so the connection is bidirectional. + +### 5. DEVELOPMENT.md + +Tick `[x] Credential reuse patterns` under *Service-Level Behavioral +Profiling*. Add a one-liner under *Attacker Intelligence Collection* +noting `decnet/vectorstore/` is scaffolded for the future statistical +re-ID engine (no behavioural change yet). + +## Architectural decisions worth knowing + +These came out of the design conversation that produced the plan; the +next agent should respect them: + +- **Classical statistics, not ML**, for attacker re-identification. + Cosine/Mahalanobis/KS-test over per-kind feature vectors, weighted + voting, versioned thresholds. Reproducible, explainable, no model + drift. ML is reserved for a future *advisory* layer behind the + factory, never primary. +- **Provider factory pattern is mandatory** for any new pluggable + backend (storage, transport, similarity). Mirror `decnet/web/db/` + and `decnet/bus/` — never let workers import concrete backends. +- **`kind` discriminator is the extension point** for new feature + families. Adding `kind="cmd_ngram"` later does not require schema + changes — the `vec_` table is created lazily on first insert. +- **`Credential.attacker_uuid` is nullable on write** by design — the + credential capture path runs before the profiler mints `Attacker`, + so coupling them would create a chicken-and-egg ordering bug. The + profiler backfills. +- **`CredentialReuse.confidence` is always 1.0 today** (exact-secret + match). The column exists so a future fuzzy-credential pass + (`hunter2` ≈ `hunter22`) can write 0.x rows without schema work. + +## Verification checklist for the next agent + +After finishing each chunk: + +- `pytest tests/ --timeout=30 --timeout-method=thread` — must + be green before moving on. +- Don't run fuzz/bench/live/stress in the dev loop (memory: + `feedback_skip_heavy_tests`). +- Don't pre-clear with custom bandit/ruff flags (memory: + `feedback_trust_git_hooks`) — the pre-commit hook is authoritative. +- Commit per task, not batched (memory: `feedback_commit_per_task`). + Don't add Co-Authored-By to commit messages. + +## Open questions to surface to ANTI before tackling §4 + +- Should the dashboard "Reuse" surface live as a tab on the existing + Credentials page, or as a sibling page? (The plan said tab, but + worth confirming once you've seen the code.) +- Pagination size for the reuse list — match the existing Credentials + view default, or use a smaller page since the rows are wider? diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 9c4566af..ccd9d7a0 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -14,6 +14,8 @@ Token structure (NATS-style, dot-separated): attacker.scored attacker.session.started attacker.session.ended + credential.captured + credential.reuse.detected system.log system.bus.health system.{worker}.health @@ -32,6 +34,7 @@ TOPOLOGY = "topology" DECKY = "decky" ATTACKER = "attacker" SYSTEM = "system" +CREDENTIAL = "credential" # ─── Leaf event-type constants (the last segment of each topic) ────────────── @@ -75,6 +78,15 @@ ATTACKER_FINGERPRINTED = "fingerprinted" ATTACKER_SESSION_STARTED = "session.started" ATTACKER_SESSION_ENDED = "session.ended" +# Credential event types (second/third tokens under ``credential``). +# ``credential.captured`` fires once per upserted Credential row — the +# correlator listens for it and runs the cred-reuse query in response, +# so reuse detection latency is sub-second after a fresh capture. +# ``credential.reuse.detected`` fires when the correlator inserts a new +# CredentialReuse row or grows an existing one (added decky/service/IP). +CREDENTIAL_CAPTURED = "captured" +CREDENTIAL_REUSE_DETECTED = "reuse.detected" + # System event types. SYSTEM_LOG = "log" SYSTEM_BUS_HEALTH = "bus.health" @@ -143,6 +155,19 @@ def system(event_type: str) -> str: return f"{SYSTEM}.{event_type}" +def credential(event_type: str) -> str: + """Build ``credential.``. + + *event_type* is typically one of :data:`CREDENTIAL_CAPTURED` or + :data:`CREDENTIAL_REUSE_DETECTED`. Dotted leaves + (``reuse.detected``) are permitted — same rationale as + :func:`system`. + """ + if not event_type: + raise ValueError("credential topic requires a non-empty event_type") + return f"{CREDENTIAL}.{event_type}" + + def attacker(event_type: str) -> str: """Build ``attacker.``. diff --git a/decnet/vectorstore/__init__.py b/decnet/vectorstore/__init__.py new file mode 100644 index 00000000..0a1f0f86 --- /dev/null +++ b/decnet/vectorstore/__init__.py @@ -0,0 +1,27 @@ +"""Vector store substrate for behavioral fingerprint similarity search. + +Provider-pluggable storage for ``(kind, id, vector)`` triples used by the +future statistical re-identification engine. ``kind`` discriminates +feature families (``ja3``, ``hassh``, ``keystroke``, ``cmd_ngram``, ...) +so new feature types are additive — no schema migration required when +adding a new extractor. + +Use :func:`get_vectorstore` from :mod:`decnet.vectorstore.factory`; never +import concrete implementations directly. Mirrors the same factory +discipline as :mod:`decnet.bus` and :mod:`decnet.web.db`. +""" +from decnet.vectorstore.base import ( + BaseVectorStore, + Neighbor, + VectorRecord, + VECTORSTORE_SCHEMA_VERSION, +) +from decnet.vectorstore.factory import get_vectorstore + +__all__ = [ + "BaseVectorStore", + "Neighbor", + "VectorRecord", + "VECTORSTORE_SCHEMA_VERSION", + "get_vectorstore", +] diff --git a/decnet/vectorstore/base.py b/decnet/vectorstore/base.py new file mode 100644 index 00000000..ad6d27fa --- /dev/null +++ b/decnet/vectorstore/base.py @@ -0,0 +1,114 @@ +"""Vector-store abstractions: :class:`BaseVectorStore` ABC + record types. + +Every backend (sqlite-vec, in-memory fake, future pgvector / Qdrant) +speaks this contract. The store is keyed by ``(kind, id)`` where: + +* ``kind`` is a short discriminator (``ja3``, ``hassh``, + ``keystroke_dwell``, ``cmd_ngram``, ...) — vectors are only ever + compared **within the same kind**, so adding a new feature family is + a non-event for the store. +* ``id`` is a stable identifier owned by the caller — typically the + ``session_id`` or ``attacker_uuid``. The store does not interpret it. +* ``extractor_version`` is recorded alongside the vector so v1 vs v2 of + the same kind never get cross-compared by accident — a similarity + scorer that respects versioning is the consumer's responsibility, but + the data it needs is here. + +The contract is intentionally minimal (insert/get/knn/delete/health) so +backends with different physical layouts can implement it +straightforwardly. No batch APIs in v1 — sub-millisecond per-vector +overhead at honeypot scales (≤ 100k vectors per kind) makes batching +unnecessary, and the loop-over-singles pattern keeps the contract small. +""" +from __future__ import annotations + +import abc +from dataclasses import dataclass +from typing import Optional, Sequence + +# Bumped when the wire/ABI shape of records changes incompatibly. +# Backends MAY refuse to load older data when this changes, but the +# pre-v1 expectation is to migrate forward in the same release. +VECTORSTORE_SCHEMA_VERSION = 1 + + +@dataclass(frozen=True) +class VectorRecord: + """One stored vector, returned by :meth:`BaseVectorStore.get`.""" + + kind: str + id: str + vector: Sequence[float] + dim: int + extractor_version: int = 1 + + +@dataclass(frozen=True) +class Neighbor: + """One similarity-search hit, returned by :meth:`BaseVectorStore.knn`. + + ``distance`` is whatever the backend's native metric reports — + cosine distance for sqlite-vec's default index, L2 for the in-memory + fake. Smaller is more similar in both cases. Consumers that need + a uniform metric should configure the backend explicitly. + """ + + kind: str + id: str + distance: float + + +class BaseVectorStore(abc.ABC): + """Async interface for a kind-discriminated vector store. + + Implementations MAY be transactional (sqlite) or not (pure + in-memory). All methods are async to match the rest of the DECNET + storage layer; trivial backends can ``await`` no-op coroutines. + """ + + @abc.abstractmethod + async def initialize(self) -> None: + """One-shot setup (open files, load extensions, create tables).""" + + @abc.abstractmethod + async def close(self) -> None: + """Release resources. Idempotent.""" + + @abc.abstractmethod + async def health(self) -> dict: + """Liveness + capability probe. + + Returns a dict like ``{"ok": True, "backend": "sqlite_vec", + "kinds": 4, "vectors": 12_345}``. Used by ``/api/v1/health`` and + diagnostics; never raises — backends that can't determine a + field set it to None. + """ + + @abc.abstractmethod + async def insert( + self, + kind: str, + id: str, + vector: Sequence[float], + *, + extractor_version: int = 1, + ) -> None: + """Insert or replace ``(kind, id)``. Vector dim is fixed per kind + the first time a kind is seen; mismatched dims raise. + """ + + @abc.abstractmethod + async def get(self, kind: str, id: str) -> Optional[VectorRecord]: + """Fetch one record, or None if absent.""" + + @abc.abstractmethod + async def delete(self, kind: str, id: str) -> bool: + """Delete one record. Returns True if a row was removed.""" + + @abc.abstractmethod + async def knn( + self, kind: str, vector: Sequence[float], k: int = 10 + ) -> list[Neighbor]: + """Return up to *k* nearest neighbors of ``vector`` within + ``kind``. Empty list if the kind is unknown or empty. + """ diff --git a/decnet/vectorstore/factory.py b/decnet/vectorstore/factory.py new file mode 100644 index 00000000..a75ee1fb --- /dev/null +++ b/decnet/vectorstore/factory.py @@ -0,0 +1,73 @@ +"""Vectorstore factory — selects a :class:`BaseVectorStore` implementation. + +Dispatch keys: + +* ``DECNET_VECTORSTORE_ENABLED`` — ``"false"`` short-circuits to + :class:`~decnet.vectorstore.fake.NullVectorStore`. Default ``"true"``. +* ``DECNET_VECTORSTORE_TYPE`` — ``"sqlite_vec"`` (default) or + ``"fake"``. +* ``DECNET_VECTORSTORE_PATH`` — sqlite file path. Defaults to + ``/var/lib/decnet/vectors.sqlite`` if writable, else + ``~/.decnet/vectors.sqlite``. + +Mirrors :mod:`decnet.bus.factory` and :mod:`decnet.web.db.factory`: +lazy imports inside each branch, env-driven dispatch, callers MUST go +through :func:`get_vectorstore` rather than instantiating backends. + +If ``sqlite_vec`` is requested but the extension is unavailable on +this host, the factory logs a warning and returns the fake backend +instead — the caller's code path stays valid (``insert`` no-ops, etc.) +without crashing the worker on a missing optional dependency. +""" +from __future__ import annotations + +import logging +import os +from typing import Any + +from decnet.vectorstore.base import BaseVectorStore + +LOG = logging.getLogger(__name__) + + +def get_vectorstore(**kwargs: Any) -> BaseVectorStore: + if os.environ.get("DECNET_VECTORSTORE_ENABLED", "true").lower() == "false": + from decnet.vectorstore.fake import NullVectorStore + return NullVectorStore() + + backend = os.environ.get("DECNET_VECTORSTORE_TYPE", "sqlite_vec").lower() + + if backend == "fake": + from decnet.vectorstore.fake import FakeVectorStore + return FakeVectorStore() + + if backend == "sqlite_vec": + # Probe extension availability up front so the factory can fall + # back cleanly. Construction is cheap, but the extension load + # only happens in initialize(); without this probe the caller + # sees the failure too late to substitute a backend. + try: + import sqlite_vec # noqa: F401 + except ImportError as e: + LOG.warning( + "sqlite_vec not installed (%s); falling back to FakeVectorStore. " + "Install the sqlite-vec package or set " + "DECNET_VECTORSTORE_TYPE=fake to silence this warning.", e, + ) + from decnet.vectorstore.fake import FakeVectorStore + return FakeVectorStore() + from decnet.vectorstore.sqlite_vec import SqliteVecVectorStore + db_path = kwargs.pop("db_path", None) or _default_db_path() + return SqliteVecVectorStore(db_path=db_path) + + raise ValueError(f"Unsupported vectorstore type: {backend}") + + +def _default_db_path() -> str: + explicit = os.environ.get("DECNET_VECTORSTORE_PATH") + if explicit: + return explicit + runtime_dir = "/var/lib/decnet" + if os.path.isdir(runtime_dir) and os.access(runtime_dir, os.W_OK): + return f"{runtime_dir}/vectors.sqlite" + return os.path.expanduser("~/.decnet/vectors.sqlite") diff --git a/decnet/vectorstore/fake.py b/decnet/vectorstore/fake.py new file mode 100644 index 00000000..93c64694 --- /dev/null +++ b/decnet/vectorstore/fake.py @@ -0,0 +1,131 @@ +"""In-memory vector store backend. + +Two flavors: + +* :class:`FakeVectorStore` — a real, working in-memory store. Used by + tests and by dev environments that want similarity search without + any native extension on the box. KNN is brute-force L2 — fine up to + a few thousand vectors per kind. +* :class:`NullVectorStore` — a no-op store returned by the factory + when ``DECNET_VECTORSTORE_ENABLED=false``. Every method succeeds + trivially; ``get`` and ``knn`` return None / [] respectively. Lets + workers run unaffected when the operator hasn't opted into vector + features yet. +""" +from __future__ import annotations + +import math +from typing import Optional, Sequence + +from decnet.vectorstore.base import BaseVectorStore, Neighbor, VectorRecord + + +class FakeVectorStore(BaseVectorStore): + """Pure-python in-memory vector store, brute-force KNN. + + Suitable for tests and small-scale dev (≤ a few thousand vectors + per kind). Not persistent — every process restart drops state. + """ + + def __init__(self) -> None: + # {kind: {id: VectorRecord}} + self._store: dict[str, dict[str, VectorRecord]] = {} + # {kind: dim} — locked the first time a kind is written. + self._dims: dict[str, int] = {} + + async def initialize(self) -> None: + return None + + async def close(self) -> None: + return None + + async def health(self) -> dict: + total = sum(len(by_id) for by_id in self._store.values()) + return { + "ok": True, + "backend": "fake", + "kinds": len(self._store), + "vectors": total, + } + + async def insert( + self, + kind: str, + id: str, + vector: Sequence[float], + *, + extractor_version: int = 1, + ) -> None: + dim = len(vector) + existing_dim = self._dims.get(kind) + if existing_dim is None: + self._dims[kind] = dim + elif existing_dim != dim: + raise ValueError( + f"vector dim mismatch for kind={kind!r}: " + f"expected {existing_dim}, got {dim}" + ) + rec = VectorRecord( + kind=kind, id=id, vector=tuple(float(x) for x in vector), + dim=dim, extractor_version=int(extractor_version), + ) + self._store.setdefault(kind, {})[id] = rec + + async def get(self, kind: str, id: str) -> Optional[VectorRecord]: + return self._store.get(kind, {}).get(id) + + async def delete(self, kind: str, id: str) -> bool: + bucket = self._store.get(kind) + if bucket is None or id not in bucket: + return False + del bucket[id] + return True + + async def knn( + self, kind: str, vector: Sequence[float], k: int = 10 + ) -> list[Neighbor]: + bucket = self._store.get(kind) + if not bucket: + return [] + q = tuple(float(x) for x in vector) + if len(q) != self._dims.get(kind, len(q)): + raise ValueError( + f"query dim {len(q)} != stored dim {self._dims[kind]} " + f"for kind={kind!r}" + ) + scored: list[Neighbor] = [] + for rid, rec in bucket.items(): + d = math.sqrt(sum((a - b) ** 2 for a, b in zip(q, rec.vector))) + scored.append(Neighbor(kind=kind, id=rid, distance=d)) + scored.sort(key=lambda n: n.distance) + return scored[: max(0, int(k))] + + +class NullVectorStore(BaseVectorStore): + """No-op vector store. Returned when vectorstore is disabled.""" + + async def initialize(self) -> None: + return None + + async def close(self) -> None: + return None + + async def health(self) -> dict: + return {"ok": True, "backend": "null", "kinds": 0, "vectors": 0} + + async def insert( + self, kind: str, id: str, vector: Sequence[float], + *, extractor_version: int = 1, + ) -> None: + return None + + async def get(self, kind: str, id: str) -> Optional[VectorRecord]: + return None + + async def delete(self, kind: str, id: str) -> bool: + return False + + async def knn( + self, kind: str, vector: Sequence[float], k: int = 10 + ) -> list[Neighbor]: + return [] diff --git a/decnet/vectorstore/sqlite_vec.py b/decnet/vectorstore/sqlite_vec.py new file mode 100644 index 00000000..b38c414d --- /dev/null +++ b/decnet/vectorstore/sqlite_vec.py @@ -0,0 +1,285 @@ +"""SQLite + sqlite-vec backend. + +Lazy-imports the ``sqlite_vec`` extension. If the extension isn't +available (the package isn't installed, or the host's libsqlite3 is too +old to load loadable extensions), construction raises +:class:`SqliteVecUnavailable`; the factory catches that and falls back +to :class:`~decnet.vectorstore.fake.FakeVectorStore` with a warning. + +Schema: + + CREATE TABLE vectors ( + kind TEXT NOT NULL, + id TEXT NOT NULL, + extractor_version INTEGER NOT NULL DEFAULT 1, + dim INTEGER NOT NULL, + PRIMARY KEY (kind, id) + ); + CREATE VIRTUAL TABLE vec_ USING vec0( + embedding float[] + ); + +A vec0 virtual table is created lazily per-kind on first insert +(distinct ``kind`` values get distinct vec0 tables because vec0's dim +is a schema-time constant). The ``vectors`` row is the source of truth +for metadata (extractor_version, dim) and for the (kind, id) → rowid +mapping; vec0 stores only the embedding, keyed by an INTEGER rowid. +""" +from __future__ import annotations + +import asyncio +import logging +import sqlite3 +import threading +from pathlib import Path +from typing import Optional, Sequence + +from decnet.vectorstore.base import BaseVectorStore, Neighbor, VectorRecord + +LOG = logging.getLogger(__name__) + + +class SqliteVecUnavailable(RuntimeError): + """sqlite_vec couldn't be loaded (extension missing / too-old sqlite3).""" + + +def _load_sqlite_vec(conn: sqlite3.Connection) -> None: + try: + import sqlite_vec # type: ignore[import-untyped] + except ImportError as e: + raise SqliteVecUnavailable("sqlite_vec package not installed") from e + try: + conn.enable_load_extension(True) + except (AttributeError, sqlite3.NotSupportedError) as e: + raise SqliteVecUnavailable( + "system sqlite3 was built without loadable-extension support" + ) from e + try: + sqlite_vec.load(conn) + except sqlite3.OperationalError as e: + raise SqliteVecUnavailable(f"sqlite_vec load failed: {e}") from e + finally: + try: + conn.enable_load_extension(False) + except sqlite3.NotSupportedError: + pass + + +class SqliteVecVectorStore(BaseVectorStore): + """sqlite-vec backed vector store. Single-file, async-friendly via + :func:`asyncio.to_thread`. Keep one instance per process. + """ + + def __init__(self, db_path: str) -> None: + self._db_path = db_path + self._conn: Optional[sqlite3.Connection] = None + self._lock = threading.Lock() + # {kind: dim} cached after first insert/probe. + self._kinds: dict[str, int] = {} + + async def initialize(self) -> None: + await asyncio.to_thread(self._init_sync) + + def _init_sync(self) -> None: + Path(self._db_path).parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(self._db_path, check_same_thread=False) + _load_sqlite_vec(conn) # raises SqliteVecUnavailable on failure + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS vectors ( + kind TEXT NOT NULL, + id TEXT NOT NULL, + extractor_version INTEGER NOT NULL DEFAULT 1, + dim INTEGER NOT NULL, + rowid_in_vec INTEGER NOT NULL, + PRIMARY KEY (kind, id) + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS ix_vectors_kind ON vectors(kind)" + ) + conn.commit() + # Re-hydrate kind→dim cache from any existing rows so a process + # restart doesn't accept a mismatched dim on the first insert. + for row in conn.execute("SELECT kind, dim FROM vectors GROUP BY kind"): + self._kinds[row[0]] = int(row[1]) + self._conn = conn + + async def close(self) -> None: + await asyncio.to_thread(self._close_sync) + + def _close_sync(self) -> None: + with self._lock: + if self._conn is not None: + self._conn.close() + self._conn = None + + async def health(self) -> dict: + return await asyncio.to_thread(self._health_sync) + + def _health_sync(self) -> dict: + if self._conn is None: + return {"ok": False, "backend": "sqlite_vec", "reason": "not initialized"} + try: + row = self._conn.execute("SELECT COUNT(*) FROM vectors").fetchone() + return { + "ok": True, + "backend": "sqlite_vec", + "kinds": len(self._kinds), + "vectors": int(row[0]) if row else 0, + } + except sqlite3.Error as e: + return {"ok": False, "backend": "sqlite_vec", "reason": str(e)} + + @staticmethod + def _vec_table(kind: str) -> str: + # Validate the kind so it can't break out of the table name. + # Allowed: ascii letters, digits, underscore. Anything else = + # programmer error; raise loudly. + if not kind or not all(c.isalnum() or c == "_" for c in kind): + raise ValueError(f"invalid kind {kind!r}: ascii [a-z0-9_] only") + return f"vec_{kind}" + + def _ensure_kind_table(self, kind: str, dim: int) -> None: + assert self._conn is not None # nosec B101 + existing = self._kinds.get(kind) + if existing is None: + # vec_ identifier is validated by _vec_table() to be + # ascii [a-z0-9_] only, and dim is int-cast — no injection + # vector. The f-string is the only way to interpolate a + # virtual-table name; placeholders aren't allowed for DDL. + ddl = ( # nosec B608 + f"CREATE VIRTUAL TABLE IF NOT EXISTS {self._vec_table(kind)} " + f"USING vec0(embedding float[{int(dim)}])" + ) + self._conn.execute(ddl) + self._conn.commit() + self._kinds[kind] = dim + elif existing != dim: + raise ValueError( + f"vector dim mismatch for kind={kind!r}: " + f"expected {existing}, got {dim}" + ) + + async def insert( + self, kind: str, id: str, vector: Sequence[float], + *, extractor_version: int = 1, + ) -> None: + await asyncio.to_thread( + self._insert_sync, kind, id, list(vector), int(extractor_version) + ) + + def _insert_sync( + self, kind: str, id: str, vector: list[float], extractor_version: int, + ) -> None: + with self._lock: + assert self._conn is not None # nosec B101 + dim = len(vector) + self._ensure_kind_table(kind, dim) + vec_table = self._vec_table(kind) + cur = self._conn.cursor() + existing = cur.execute( + "SELECT rowid_in_vec FROM vectors WHERE kind=? AND id=?", + (kind, id), + ).fetchone() + if existing is not None: + rowid = int(existing[0]) + # vec_table is validated; rowid is bound. Safe. + cur.execute(f"DELETE FROM {vec_table} WHERE rowid=?", (rowid,)) # nosec B608 + import struct + blob = struct.pack(f"{dim}f", *vector) + cur.execute(f"INSERT INTO {vec_table}(embedding) VALUES (?)", (blob,)) # nosec B608 + new_rowid = cur.lastrowid + cur.execute( + "INSERT OR REPLACE INTO vectors" + "(kind, id, extractor_version, dim, rowid_in_vec) " + "VALUES (?, ?, ?, ?, ?)", + (kind, id, extractor_version, dim, new_rowid), + ) + self._conn.commit() + + async def get(self, kind: str, id: str) -> Optional[VectorRecord]: + return await asyncio.to_thread(self._get_sync, kind, id) + + def _get_sync(self, kind: str, id: str) -> Optional[VectorRecord]: + with self._lock: + assert self._conn is not None # nosec B101 + row = self._conn.execute( + "SELECT extractor_version, dim, rowid_in_vec " + "FROM vectors WHERE kind=? AND id=?", + (kind, id), + ).fetchone() + if row is None: + return None + ext_v, dim, rowid = int(row[0]), int(row[1]), int(row[2]) + vec_table = self._vec_table(kind) + blob_row = self._conn.execute(f"SELECT embedding FROM {vec_table} WHERE rowid=?", (rowid,)).fetchone() # nosec B608 + if blob_row is None: + return None + import struct + vec = list(struct.unpack(f"{dim}f", blob_row[0])) + return VectorRecord( + kind=kind, id=id, vector=vec, dim=dim, + extractor_version=ext_v, + ) + + async def delete(self, kind: str, id: str) -> bool: + return await asyncio.to_thread(self._delete_sync, kind, id) + + def _delete_sync(self, kind: str, id: str) -> bool: + with self._lock: + assert self._conn is not None # nosec B101 + row = self._conn.execute( + "SELECT rowid_in_vec FROM vectors WHERE kind=? AND id=?", + (kind, id), + ).fetchone() + if row is None: + return False + rowid = int(row[0]) + vec_table = self._vec_table(kind) + self._conn.execute(f"DELETE FROM {vec_table} WHERE rowid=?", (rowid,)) # nosec B608 + self._conn.execute( + "DELETE FROM vectors WHERE kind=? AND id=?", (kind, id) + ) + self._conn.commit() + return True + + async def knn( + self, kind: str, vector: Sequence[float], k: int = 10, + ) -> list[Neighbor]: + return await asyncio.to_thread(self._knn_sync, kind, list(vector), int(k)) + + def _knn_sync(self, kind: str, vector: list[float], k: int) -> list[Neighbor]: + with self._lock: + assert self._conn is not None # nosec B101 + existing_dim = self._kinds.get(kind) + if existing_dim is None: + return [] + if len(vector) != existing_dim: + raise ValueError( + f"query dim {len(vector)} != stored dim {existing_dim} " + f"for kind={kind!r}" + ) + vec_table = self._vec_table(kind) + import struct + qblob = struct.pack(f"{existing_dim}f", *vector) + knn_sql = f"SELECT rowid, distance FROM {vec_table} WHERE embedding MATCH ? ORDER BY distance LIMIT ?" # nosec B608 + rows = self._conn.execute(knn_sql, (qblob, max(0, k))).fetchall() + if not rows: + return [] + id_map = { + int(r[0]): r[1] + for r in self._conn.execute( + "SELECT rowid_in_vec, id FROM vectors WHERE kind=?", + (kind,), + ) + } + out: list[Neighbor] = [] + for rowid, dist in rows: + rid = id_map.get(int(rowid)) + if rid is None: + continue + out.append(Neighbor(kind=kind, id=rid, distance=float(dist))) + return out diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index 6de83d61..c55f99ac 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -49,6 +49,8 @@ from .logs import ( Bounty, BountyResponse, Credential, + CredentialReuse, + CredentialReuseResponse, CredentialsResponse, Log, LogsResponse, @@ -170,6 +172,8 @@ __all__ = [ "Bounty", "BountyResponse", "Credential", + "CredentialReuse", + "CredentialReuseResponse", "CredentialsResponse", "Log", "LogsResponse", diff --git a/decnet/web/db/models/logs.py b/decnet/web/db/models/logs.py index af5ce93d..779819cd 100644 --- a/decnet/web/db/models/logs.py +++ b/decnet/web/db/models/logs.py @@ -3,7 +3,7 @@ from datetime import datetime, timezone from typing import Any, List, Optional from pydantic import BaseModel -from sqlalchemy import Column, Index, Text +from sqlalchemy import Column, Index, Text, UniqueConstraint from sqlmodel import Field, SQLModel from ._base import _BIG_TEXT @@ -54,9 +54,13 @@ class Credential(SQLModel, table=True): LDAP. Nullable for principal-less mechanisms (Redis AUTH, bearer tokens). Fully service-specific keys ride in ``fields`` JSON. - Dedup contract: same (attacker_uuid, decky, service, secret_sha256, + Dedup contract: same (attacker_ip, decky, service, secret_sha256, principal_or_empty) tuple → upsert, bumps ``attempt_count`` and ``last_seen``. Different secret or different principal → new row. + + ``attacker_uuid`` is backfilled by the profiler once an Attacker row + has been minted for the source IP. It is nullable on first write so + the credential ingest path stays decoupled from the profiler. """ __tablename__ = "credentials" __table_args__ = ( @@ -64,11 +68,15 @@ class Credential(SQLModel, table=True): Index("ix_credentials_principal_service", "principal", "service"), ) id: Optional[int] = Field(default=None, primary_key=True) - # Keyed by attacker IP (not attackers.uuid) to match Bounty's pattern - # and avoid the chicken-and-egg of writing a credential row before - # the profiler has minted the Attacker. Index covers the join path - # cred_reuse → Attacker.ip. + # Keyed by attacker IP (not attackers.uuid) on the write path to + # avoid the chicken-and-egg of landing a credential before the + # profiler has minted the Attacker. The profiler backfills + # ``attacker_uuid`` once it knows the IP, so cross-IP reuse queries + # eventually have an indexed FK to traverse. attacker_ip: str = Field(index=True) + attacker_uuid: Optional[str] = Field( + default=None, foreign_key="attackers.uuid", index=True + ) decky_name: str = Field(index=True) service: str = Field(index=True) principal: Optional[str] = Field(default=None, index=True, max_length=256) @@ -107,6 +115,77 @@ class Credential(SQLModel, table=True): attempt_count: int = Field(default=1) +class CredentialReuse(SQLModel, table=True): + """One observed credential reuse pattern across deckies and/or services. + + A row here is a *finding* produced by the correlator: the same + ``(secret_sha256, secret_kind, principal)`` tuple was observed + against ``target_count`` distinct decky×service pairs. Upserted on + that natural key — the row accumulates new deckies/services/IPs + over time as the credential is reused. + + The ``confidence`` column is reserved for a future fuzzy-match pass + (credential variants, e.g. ``hunter2`` vs ``hunter22``); rows + written by the exact-secret correlator are always 1.0. + """ + __tablename__ = "credential_reuse" + __table_args__ = ( + UniqueConstraint( + "secret_sha256", "secret_kind", "principal_key", + name="uq_credential_reuse_secret_principal", + ), + ) + id: str = Field(primary_key=True, max_length=36) + secret_sha256: str = Field(index=True, max_length=64) + secret_kind: str = Field(index=True, max_length=32) + # Optional human-readable principal (e.g. "root"). Nullable — for + # cross-principal reuse rows we leave this null, but we still need + # a unique constraint, so ``principal_key`` is the non-null + # canonicalised form ("" when principal is null) used in the + # uniqueness tuple. SQLite's NULLs-distinct-in-UNIQUE behaviour + # would otherwise let duplicate null-principal rows through. + principal: Optional[str] = Field(default=None, max_length=256) + principal_key: str = Field(default="", max_length=256) + attacker_uuids: str = Field( + default="[]", + sa_column=Column("attacker_uuids", _BIG_TEXT, nullable=False, default="[]"), + ) # JSON list[str] + attacker_ips: str = Field( + default="[]", + sa_column=Column("attacker_ips", _BIG_TEXT, nullable=False, default="[]"), + ) # JSON list[str] + deckies: str = Field( + default="[]", + sa_column=Column("deckies", _BIG_TEXT, nullable=False, default="[]"), + ) # JSON list[str] + services: str = Field( + default="[]", + sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]"), + ) # JSON list[str] + # COUNT(DISTINCT decky||':'||service). The discriminative scalar + # for ranking and filtering — a credential seen on 12 targets is + # far more interesting than one seen on 2. + target_count: int = Field(default=0, index=True) + attempt_count: int = Field(default=0) + confidence: float = Field(default=1.0) + first_seen: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + last_seen: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + + +class CredentialReuseResponse(BaseModel): + total: int + limit: int + offset: int + data: List[dict[str, Any]] + + class State(SQLModel, table=True): __tablename__ = "state" key: str = Field(primary_key=True) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 22d0ea02..fcb0c030 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -153,12 +153,59 @@ class BaseRepository(ABC): pass @abstractmethod - async def get_credential_reuse( + async def get_credential_attempts_for_secret( self, secret_sha256: str ) -> list[dict[str, Any]]: """Every (attacker, decky, service, principal) row sharing this secret hash.""" pass + @abstractmethod + async def upsert_credential_reuse( + self, + *, + secret_sha256: str, + secret_kind: str, + principal: Optional[str], + attacker_uuid: Optional[str], + attacker_ip: str, + decky: str, + service: str, + attempt_count: int, + ts: Optional[Any] = None, + ) -> Optional[dict[str, Any]]: + """Upsert one credential-reuse finding. Returns the row dict (with + ``inserted: bool`` mixed in) on insert/update, or None if the row + is below the reuse threshold and shouldn't be persisted yet. + """ + pass + + @abstractmethod + async def list_credential_reuses( + self, + limit: int = 50, + offset: int = 0, + min_target_count: int = 2, + secret_kind: Optional[str] = None, + ) -> tuple[int, list[dict[str, Any]]]: + """Paged list of credential-reuse findings ordered by target_count desc.""" + pass + + @abstractmethod + async def get_credential_reuse_by_id( + self, reuse_id: str + ) -> Optional[dict[str, Any]]: + """One credential-reuse finding by UUID, or None.""" + pass + + @abstractmethod + async def update_credential_attacker_uuid( + self, attacker_ip: str, attacker_uuid: str + ) -> int: + """Backfill ``attacker_uuid`` on every Credential row matching the IP + whose ``attacker_uuid`` is currently null. Returns rows updated. + """ + pass + @abstractmethod async def get_state(self, key: str) -> Optional[dict[str, Any]]: """Retrieve a specific state entry by key.""" diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 256959bc..f4b1e964 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -32,6 +32,7 @@ from decnet.web.db.models import ( Log, Bounty, Credential, + CredentialReuse, State, Attacker, AttackerBehavior, @@ -684,7 +685,7 @@ class SQLModelRepository(BaseRepository): out.append(d) return out - async def get_credential_reuse( + async def get_credential_attempts_for_secret( self, secret_sha256: str ) -> List[dict[str, Any]]: """Every (attacker_ip, decky, service, principal) row sharing this @@ -706,6 +707,197 @@ class SQLModelRepository(BaseRepository): out.append(d) return out + # ─── credential reuse (findings) ────────────────────────────────────── + + async def update_credential_attacker_uuid( + self, attacker_ip: str, attacker_uuid: str + ) -> int: + """Backfill ``attacker_uuid`` on every Credential row matching the + given IP whose ``attacker_uuid`` is currently null. Run by the + profiler after it mints/updates an Attacker row. + """ + async with self._session() as session: + result = await session.execute( + update(Credential) + .where( + Credential.attacker_ip == attacker_ip, + Credential.attacker_uuid.is_(None), + ) + .values(attacker_uuid=attacker_uuid) + ) + await session.commit() + return int(result.rowcount or 0) + + @staticmethod + def _merge_unique(existing_json: str, value: Optional[str]) -> tuple[str, bool]: + """Append ``value`` to a JSON list[str] column if not present. + Returns (new_json, changed). None values and duplicates are skipped. + """ + if value is None: + return existing_json, False + try: + current = json.loads(existing_json) if existing_json else [] + if not isinstance(current, list): + current = [] + except (json.JSONDecodeError, TypeError): + current = [] + if value in current: + return existing_json, False + current.append(value) + return json.dumps(current, ensure_ascii=True), True + + async def upsert_credential_reuse( + self, + *, + secret_sha256: str, + secret_kind: str, + principal: Optional[str], + attacker_uuid: Optional[str], + attacker_ip: str, + decky: str, + service: str, + attempt_count: int, + ts: Optional[datetime] = None, + ) -> Optional[dict[str, Any]]: + """Upsert a credential-reuse finding. + + The row is keyed by ``(secret_sha256, secret_kind, principal_key)`` + — ``principal_key`` is the canonicalised non-null form ("" when + principal is null) so the unique constraint behaves the same on + SQLite and MySQL. + + Returns the row dict augmented with ``inserted: bool`` and + ``changed: bool`` so the correlator can decide whether to publish + a bus event. + """ + principal_key = principal or "" + now = ts or datetime.now(timezone.utc) + async with self._session() as session: + existing = (await session.execute( + select(CredentialReuse).where( + CredentialReuse.secret_sha256 == secret_sha256, + CredentialReuse.secret_kind == secret_kind, + CredentialReuse.principal_key == principal_key, + ) + )).scalar_one_or_none() + + if existing is None: + row = CredentialReuse( + id=str(uuid.uuid4()), + secret_sha256=secret_sha256, + secret_kind=secret_kind, + principal=principal, + principal_key=principal_key, + attacker_uuids=json.dumps( + [attacker_uuid] if attacker_uuid else [], ensure_ascii=True + ), + attacker_ips=json.dumps([attacker_ip], ensure_ascii=True), + deckies=json.dumps([decky], ensure_ascii=True), + services=json.dumps([service], ensure_ascii=True), + target_count=1, + attempt_count=int(attempt_count), + confidence=1.0, + first_seen=now, + last_seen=now, + updated_at=now, + ) + session.add(row) + await session.commit() + await session.refresh(row) + d = row.model_dump(mode="json") + d["inserted"] = True + d["changed"] = True + return d + + changed = False + new_uuids, c1 = self._merge_unique(existing.attacker_uuids, attacker_uuid) + new_ips, c2 = self._merge_unique(existing.attacker_ips, attacker_ip) + new_deckies, c3 = self._merge_unique(existing.deckies, decky) + new_services, c4 = self._merge_unique(existing.services, service) + existing.attacker_uuids = new_uuids + existing.attacker_ips = new_ips + if c3 or c4: + existing.deckies = new_deckies + existing.services = new_services + # Recount target tuples from the underlying credentials + # table — a (decky, service) tuple only counts when both + # were observed together, which the JSON lists alone + # can't tell us. + stmt = ( + select(func.count(func.distinct( + Credential.decky_name + ":" + Credential.service + ))) + .where( + Credential.secret_sha256 == secret_sha256, + Credential.secret_kind == secret_kind, + (Credential.principal == principal) if principal is not None + else Credential.principal.is_(None), + ) + ) + target_count = (await session.execute(stmt)).scalar() or 0 + existing.target_count = int(target_count) + existing.attempt_count = (existing.attempt_count or 0) + int(attempt_count) + existing.last_seen = now + existing.updated_at = now + if c1 or c2 or c3 or c4: + changed = True + session.add(existing) + await session.commit() + await session.refresh(existing) + d = existing.model_dump(mode="json") + d["inserted"] = False + d["changed"] = changed + return d + + async def list_credential_reuses( + self, + limit: int = 50, + offset: int = 0, + min_target_count: int = 2, + secret_kind: Optional[str] = None, + ) -> tuple[int, List[dict[str, Any]]]: + async with self._session() as session: + base = select(CredentialReuse).where( + CredentialReuse.target_count >= min_target_count + ) + if secret_kind: + base = base.where(CredentialReuse.secret_kind == secret_kind) + total_stmt = select(func.count()).select_from(base.subquery()) + total = (await session.execute(total_stmt)).scalar() or 0 + list_stmt = ( + base.order_by(desc(CredentialReuse.target_count), + desc(CredentialReuse.last_seen)) + .offset(offset).limit(limit) + ) + rows = (await session.execute(list_stmt)).scalars().all() + out: List[dict[str, Any]] = [] + for r in rows: + d = r.model_dump(mode="json") + for key in ("attacker_uuids", "attacker_ips", "deckies", "services"): + try: + d[key] = json.loads(d[key]) + except (json.JSONDecodeError, TypeError): + d[key] = [] + out.append(d) + return int(total), out + + async def get_credential_reuse_by_id( + self, reuse_id: str + ) -> Optional[dict[str, Any]]: + async with self._session() as session: + row = (await session.execute( + select(CredentialReuse).where(CredentialReuse.id == reuse_id) + )).scalar_one_or_none() + if row is None: + return None + d = row.model_dump(mode="json") + for key in ("attacker_uuids", "attacker_ips", "deckies", "services"): + try: + d[key] = json.loads(d[key]) + except (json.JSONDecodeError, TypeError): + d[key] = [] + return d + async def get_state(self, key: str) -> Optional[dict[str, Any]]: async with self._session() as session: statement = select(State).where(State.key == key) diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index 52121d68..c4c74c71 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -23,7 +23,11 @@ class DummyRepo(BaseRepository): async def get_credentials(self, **kw): await super().get_credentials(**kw) async def get_total_credentials(self, **kw): await super().get_total_credentials(**kw) async def get_credentials_for_attacker(self, ip): await super().get_credentials_for_attacker(ip) - async def get_credential_reuse(self, h): await super().get_credential_reuse(h) + async def get_credential_attempts_for_secret(self, h): await super().get_credential_attempts_for_secret(h) + async def upsert_credential_reuse(self, **kw): await super().upsert_credential_reuse(**kw); return None + async def list_credential_reuses(self, **kw): await super().list_credential_reuses(**kw); return (0, []) + async def get_credential_reuse_by_id(self, i): await super().get_credential_reuse_by_id(i) + async def update_credential_attacker_uuid(self, ip, u): await super().update_credential_attacker_uuid(ip, u); return 0 async def get_state(self, k): await super().get_state(k) async def set_state(self, k, v): await super().set_state(k, v) async def get_max_log_id(self): await super().get_max_log_id() @@ -73,7 +77,15 @@ async def test_base_repo_coverage(): await dr.get_credentials() await dr.get_total_credentials() await dr.get_credentials_for_attacker("1.2.3.4") - await dr.get_credential_reuse("abc") + await dr.get_credential_attempts_for_secret("abc") + await dr.upsert_credential_reuse( + secret_sha256="x", secret_kind="plaintext", principal=None, + attacker_uuid=None, attacker_ip="1.2.3.4", decky="d", service="ssh", + attempt_count=1, ts=None, + ) + await dr.list_credential_reuses() + await dr.get_credential_reuse_by_id("a") + await dr.update_credential_attacker_uuid("1.2.3.4", "u") await dr.get_state("k") await dr.set_state("k", "v") await dr.get_max_log_id() diff --git a/tests/db/test_credential_reuse.py b/tests/db/test_credential_reuse.py new file mode 100644 index 00000000..4c857032 --- /dev/null +++ b/tests/db/test_credential_reuse.py @@ -0,0 +1,226 @@ +"""CredentialReuse repo tests — upsert idempotency, list pagination, FK backfill.""" +from __future__ import annotations + +import hashlib +from pathlib import Path + +import pytest + +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path: Path): + r = get_repository(db_path=str(tmp_path / "reuse.db")) + await r.initialize() + return r + + +def _sha256(s: str) -> str: + return hashlib.sha256(s.encode("utf-8")).hexdigest() + + +async def _seed_credential(repo, **overrides): + base = { + "attacker_ip": "10.0.0.5", + "decky_name": "decky-01", + "service": "ssh", + "principal": "root", + "secret_sha256": _sha256("hunter2"), + "secret_b64": "aHVudGVyMg==", + "secret_printable": "hunter2", + "fields": {}, + } + base.update(overrides) + return await repo.upsert_credential(base) + + +@pytest.mark.anyio +async def test_upsert_inserts_first_observation(repo) -> None: + sha = _sha256("hunter2") + out = await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="10.0.0.5", + decky="decky-01", service="ssh", attempt_count=1, + ) + assert out is not None + assert out["inserted"] is True + assert out["target_count"] == 1 + assert out["confidence"] == 1.0 + + +@pytest.mark.anyio +async def test_upsert_grows_target_count_across_services(repo) -> None: + """Same secret on two distinct (decky, service) pairs → target_count=2. + + target_count is recomputed from the credentials table, so the test + must seed actual Credential rows first. + """ + sha = _sha256("p4ssw0rd") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d2", service="ftp") + + await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="10.0.0.5", + decky="d1", service="ssh", attempt_count=1, + ) + out = await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="10.0.0.5", + decky="d2", service="ftp", attempt_count=1, + ) + assert out["inserted"] is False + assert out["changed"] is True + assert out["target_count"] == 2 + + +@pytest.mark.anyio +async def test_upsert_dedups_same_decky_service(repo) -> None: + """Repeated upserts for the same (decky, service) don't grow target_count.""" + sha = _sha256("samepw") + await _seed_credential(repo, secret_sha256=sha) + for _ in range(3): + await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="10.0.0.5", + decky="decky-01", service="ssh", attempt_count=1, + ) + rows = (await repo.list_credential_reuses(min_target_count=1))[1] + assert len(rows) == 1 + assert rows[0]["target_count"] == 1 + assert rows[0]["attempt_count"] == 3 + + +@pytest.mark.anyio +async def test_upsert_merges_attacker_lists(repo) -> None: + """Distinct attacker_uuid/ip values accumulate into the JSON lists.""" + sha = _sha256("shared") + await _seed_credential(repo, secret_sha256=sha, attacker_ip="1.1.1.1") + await _seed_credential( + repo, secret_sha256=sha, attacker_ip="2.2.2.2", decky_name="d2", + ) + await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid="uuid-A", attacker_ip="1.1.1.1", + decky="decky-01", service="ssh", attempt_count=1, + ) + await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid="uuid-B", attacker_ip="2.2.2.2", + decky="d2", service="ssh", attempt_count=1, + ) + rows = (await repo.list_credential_reuses(min_target_count=1))[1] + assert set(rows[0]["attacker_uuids"]) == {"uuid-A", "uuid-B"} + assert set(rows[0]["attacker_ips"]) == {"1.1.1.1", "2.2.2.2"} + + +@pytest.mark.anyio +async def test_null_principal_uniqueness(repo) -> None: + """Two upserts with principal=None go to the same row, not two rows.""" + sha = _sha256("redis-auth") + await _seed_credential(repo, secret_sha256=sha, service="redis", principal=None) + for _ in range(2): + await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal=None, + attacker_uuid=None, attacker_ip="1.1.1.1", + decky="decky-01", service="redis", attempt_count=1, + ) + rows = (await repo.list_credential_reuses(min_target_count=1))[1] + assert len(rows) == 1 + assert rows[0]["principal"] is None + + +@pytest.mark.anyio +async def test_list_filters_by_min_target_count(repo) -> None: + """min_target_count=2 hides 1-target findings.""" + sha = _sha256("only-once") + await _seed_credential(repo, secret_sha256=sha) + await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="1.1.1.1", + decky="decky-01", service="ssh", attempt_count=1, + ) + total, rows = await repo.list_credential_reuses(min_target_count=2) + assert total == 0 + assert rows == [] + total, _ = await repo.list_credential_reuses(min_target_count=1) + assert total == 1 + + +@pytest.mark.anyio +async def test_list_pagination_orders_by_target_count_desc(repo) -> None: + sha_a = _sha256("a") + sha_b = _sha256("b") + # secret a → 1 target + await _seed_credential(repo, secret_sha256=sha_a) + await repo.upsert_credential_reuse( + secret_sha256=sha_a, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="1.1.1.1", + decky="d1", service="ssh", attempt_count=1, + ) + # secret b → 2 targets + await _seed_credential(repo, secret_sha256=sha_b, service="ssh") + await _seed_credential(repo, secret_sha256=sha_b, service="ftp", decky_name="d2") + await repo.upsert_credential_reuse( + secret_sha256=sha_b, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="1.1.1.1", + decky="decky-01", service="ssh", attempt_count=1, + ) + await repo.upsert_credential_reuse( + secret_sha256=sha_b, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="1.1.1.1", + decky="d2", service="ftp", attempt_count=1, + ) + total, rows = await repo.list_credential_reuses(min_target_count=1) + assert total == 2 + assert rows[0]["secret_sha256"] == sha_b # higher target_count first + + +@pytest.mark.anyio +async def test_get_by_id_roundtrip(repo) -> None: + sha = _sha256("rt") + await _seed_credential(repo, secret_sha256=sha) + out = await repo.upsert_credential_reuse( + secret_sha256=sha, secret_kind="plaintext", principal="root", + attacker_uuid=None, attacker_ip="1.1.1.1", + decky="decky-01", service="ssh", attempt_count=1, + ) + fetched = await repo.get_credential_reuse_by_id(out["id"]) + assert fetched is not None + assert fetched["id"] == out["id"] + assert fetched["secret_sha256"] == sha + assert isinstance(fetched["deckies"], list) + + +@pytest.mark.anyio +async def test_get_by_id_missing_returns_none(repo) -> None: + assert await repo.get_credential_reuse_by_id("nope") is None + + +@pytest.mark.anyio +async def test_update_credential_attacker_uuid_backfills_only_nulls(repo) -> None: + """The profiler hook must backfill attacker_uuid only on rows where it + is currently null — pre-existing UUIDs must not be overwritten.""" + sha = _sha256("backfill") + await _seed_credential(repo, secret_sha256=sha, attacker_ip="9.9.9.9") + await _seed_credential( + repo, secret_sha256=sha, attacker_ip="9.9.9.9", + service="ftp", decky_name="d2", + ) + # Backfill: both null, both should update. + n = await repo.update_credential_attacker_uuid("9.9.9.9", "uuid-9") + assert n == 2 + + # Second call: both already set, nothing should change. + n2 = await repo.update_credential_attacker_uuid("9.9.9.9", "uuid-other") + assert n2 == 0 + + rows = await repo.get_credentials_for_attacker("9.9.9.9") + assert all(r["attacker_uuid"] == "uuid-9" for r in rows) + + +@pytest.mark.anyio +async def test_update_credential_attacker_uuid_no_match(repo) -> None: + n = await repo.update_credential_attacker_uuid("0.0.0.0", "uuid-x") + assert n == 0 diff --git a/tests/db/test_credentials.py b/tests/db/test_credentials.py index 958959df..f426ea0b 100644 --- a/tests/db/test_credentials.py +++ b/tests/db/test_credentials.py @@ -101,7 +101,7 @@ async def test_cross_service_reuse_query(repo) -> None: "secret_printable": secret, "fields": {}, }) - reuse = await repo.get_credential_reuse(sha) + reuse = await repo.get_credential_attempts_for_secret(sha) assert {r["service"] for r in reuse} == {"ssh", "ftp", "smtp"} diff --git a/tests/vectorstore/__init__.py b/tests/vectorstore/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/vectorstore/test_factory.py b/tests/vectorstore/test_factory.py new file mode 100644 index 00000000..5cdbc053 --- /dev/null +++ b/tests/vectorstore/test_factory.py @@ -0,0 +1,66 @@ +"""Tests for :func:`decnet.vectorstore.factory.get_vectorstore` dispatch.""" +from __future__ import annotations + +import os + +import pytest + +from decnet.vectorstore.factory import _default_db_path, get_vectorstore +from decnet.vectorstore.fake import FakeVectorStore, NullVectorStore + + +def test_disabled_returns_null(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_VECTORSTORE_ENABLED", "false") + monkeypatch.setenv("DECNET_VECTORSTORE_TYPE", "sqlite_vec") # ignored when disabled + s = get_vectorstore() + assert isinstance(s, NullVectorStore) + + +def test_fake_dispatch(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_VECTORSTORE_ENABLED", "true") + monkeypatch.setenv("DECNET_VECTORSTORE_TYPE", "fake") + s = get_vectorstore() + assert isinstance(s, FakeVectorStore) + + +def test_sqlite_vec_falls_back_to_fake_when_extension_missing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The factory must degrade gracefully when sqlite_vec isn't installed: + log a warning, return FakeVectorStore. Workers stay alive instead of + crashing on a missing optional dep.""" + monkeypatch.setenv("DECNET_VECTORSTORE_ENABLED", "true") + monkeypatch.setenv("DECNET_VECTORSTORE_TYPE", "sqlite_vec") + # Force the import to fail regardless of what's actually installed, + # so this test is deterministic on dev boxes that have the extension. + import builtins + real_import = builtins.__import__ + + def _fake_import(name, *a, **kw): # noqa: ANN001 + if name == "sqlite_vec": + raise ImportError("forced") + return real_import(name, *a, **kw) + + monkeypatch.setattr(builtins, "__import__", _fake_import) + s = get_vectorstore() + assert isinstance(s, FakeVectorStore) + + +def test_unknown_type_raises(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_VECTORSTORE_ENABLED", "true") + monkeypatch.setenv("DECNET_VECTORSTORE_TYPE", "qdrant") + with pytest.raises(ValueError, match="Unsupported vectorstore type"): + get_vectorstore() + + +def test_default_db_path_honors_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_VECTORSTORE_PATH", "/tmp/explicit.sqlite") + assert _default_db_path() == "/tmp/explicit.sqlite" + + +def test_default_db_path_falls_back_to_home(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("DECNET_VECTORSTORE_PATH", raising=False) + monkeypatch.setattr("os.path.isdir", lambda p: False) + p = _default_db_path() + assert p.endswith(".decnet/vectors.sqlite") + assert p.startswith(os.path.expanduser("~")) diff --git a/tests/vectorstore/test_fake.py b/tests/vectorstore/test_fake.py new file mode 100644 index 00000000..4c132823 --- /dev/null +++ b/tests/vectorstore/test_fake.py @@ -0,0 +1,113 @@ +"""Tests for :class:`FakeVectorStore` and :class:`NullVectorStore`. + +The fake is the reference implementation of the BaseVectorStore +contract — every behavior assertion here doubles as a contract test +that any future backend must satisfy. +""" +from __future__ import annotations + +import pytest + +from decnet.vectorstore.fake import FakeVectorStore, NullVectorStore + + +@pytest.mark.anyio +async def test_fake_round_trip() -> None: + s = FakeVectorStore() + await s.initialize() + await s.insert("ja3", "sess-1", [1.0, 0.0, 0.0]) + await s.insert("ja3", "sess-2", [0.9, 0.1, 0.0]) + await s.insert("ja3", "sess-3", [0.0, 1.0, 0.0]) + + rec = await s.get("ja3", "sess-1") + assert rec is not None + assert rec.kind == "ja3" + assert rec.id == "sess-1" + assert rec.dim == 3 + assert tuple(rec.vector) == (1.0, 0.0, 0.0) + + +@pytest.mark.anyio +async def test_fake_knn_orders_by_distance() -> None: + s = FakeVectorStore() + await s.initialize() + await s.insert("ja3", "near", [1.0, 0.0]) + await s.insert("ja3", "far", [0.0, 1.0]) + await s.insert("ja3", "exact", [0.99, 0.01]) + + n = await s.knn("ja3", [1.0, 0.0], k=3) + assert [x.id for x in n] == ["near", "exact", "far"] + assert n[0].distance == 0.0 + assert n[2].distance > n[1].distance + + +@pytest.mark.anyio +async def test_fake_knn_unknown_kind_returns_empty() -> None: + s = FakeVectorStore() + await s.initialize() + assert await s.knn("never_seen", [0.1, 0.2]) == [] + + +@pytest.mark.anyio +async def test_fake_dim_mismatch_raises() -> None: + s = FakeVectorStore() + await s.initialize() + await s.insert("hassh", "a", [1.0, 2.0, 3.0]) + with pytest.raises(ValueError, match="dim mismatch"): + await s.insert("hassh", "b", [1.0, 2.0]) + + +@pytest.mark.anyio +async def test_fake_knn_query_dim_mismatch_raises() -> None: + s = FakeVectorStore() + await s.initialize() + await s.insert("kd", "a", [0.1, 0.2, 0.3]) + with pytest.raises(ValueError): + await s.knn("kd", [0.1, 0.2]) + + +@pytest.mark.anyio +async def test_fake_replace_existing_id() -> None: + s = FakeVectorStore() + await s.initialize() + await s.insert("k", "id1", [1.0, 0.0]) + await s.insert("k", "id1", [0.0, 1.0]) + rec = await s.get("k", "id1") + assert tuple(rec.vector) == (0.0, 1.0) + + +@pytest.mark.anyio +async def test_fake_delete() -> None: + s = FakeVectorStore() + await s.initialize() + await s.insert("k", "id1", [1.0]) + assert await s.delete("k", "id1") is True + assert await s.delete("k", "id1") is False + assert await s.get("k", "id1") is None + + +@pytest.mark.anyio +async def test_fake_health_reports_counts() -> None: + s = FakeVectorStore() + await s.initialize() + h = await s.health() + assert h == {"ok": True, "backend": "fake", "kinds": 0, "vectors": 0} + await s.insert("a", "1", [1.0]) + await s.insert("a", "2", [2.0]) + await s.insert("b", "1", [3.0, 4.0]) + h = await s.health() + assert h["kinds"] == 2 + assert h["vectors"] == 3 + + +@pytest.mark.anyio +async def test_null_store_is_inert() -> None: + s = NullVectorStore() + await s.initialize() + await s.insert("k", "id", [1.0, 2.0]) # no-op + assert await s.get("k", "id") is None + assert await s.knn("k", [1.0, 2.0]) == [] + assert await s.delete("k", "id") is False + h = await s.health() + assert h["backend"] == "null" + await s.close()