feat(creds): cred-reuse foundation + vectorstore scaffold

Lays the storage and bus substrate for the "credential reuse patterns"
task in DEVELOPMENT.md and scaffolds decnet/vectorstore/ as the future
substrate for statistical attacker re-identification over behavioral
fingerprints. No correlator, profiler, API, or dashboard wiring in
this commit — see TODO.md for the handoff.

Schema:
  - Credential.attacker_uuid (nullable FK to attackers.uuid),
    backfilled by the profiler post-write to avoid coupling the
    capture path to the profiler's ordering.
  - CredentialReuse table — UUID PK, JSON list columns for the
    accumulating attacker_uuids/ips/deckies/services, target_count
    (the discriminative scalar), confidence reserved for a future
    fuzzy-credential pass.

Repo:
  - upsert_credential_reuse / list_credential_reuses /
    get_credential_reuse_by_id / update_credential_attacker_uuid.
  - Renamed pre-existing get_credential_reuse(secret_sha256) to
    get_credential_attempts_for_secret(secret_sha256) — the new
    findings table needs the cleaner name.

Bus topics:
  - credential.captured (one per Credential upsert)
  - credential.reuse.detected (correlator-emitted on insert/grow)

Vectorstore subpackage (decnet/vectorstore/, flat layout mirroring
decnet/bus/):
  - BaseVectorStore ABC keyed by (kind, id) — kind discriminator
    means new feature families are additive, no schema migration.
  - FakeVectorStore (in-memory L2 KNN), NullVectorStore (no-op for
    DECNET_VECTORSTORE_ENABLED=false), SqliteVecVectorStore (lazy
    sqlite_vec extension load, one vec0 virtual table per kind).
  - get_vectorstore() env-driven dispatch with graceful fallback
    to FakeVectorStore when the sqlite-vec extension isn't on the
    host, so workers don't crash on a missing optional dep.

Tests: 26 new (11 cred-reuse repo, 15 vectorstore). Existing
credentials and base-repo tests updated for the rename. Total: 34
passing on the touched files.
2026-04-26 03:18:34 -04:00
parent 817ce32e6d
commit ce4be68501
17 changed files with 1615 additions and 11 deletions


@@ -14,6 +14,8 @@ Token structure (NATS-style, dot-separated):
attacker.scored
attacker.session.started
attacker.session.ended
credential.captured
credential.reuse.detected
system.log
system.bus.health
system.{worker}.health
@@ -32,6 +34,7 @@ TOPOLOGY = "topology"
DECKY = "decky"
ATTACKER = "attacker"
SYSTEM = "system"
CREDENTIAL = "credential"
# ─── Leaf event-type constants (the last segment of each topic) ──────────────
@@ -75,6 +78,15 @@ ATTACKER_FINGERPRINTED = "fingerprinted"
ATTACKER_SESSION_STARTED = "session.started"
ATTACKER_SESSION_ENDED = "session.ended"
# Credential event types (second/third tokens under ``credential``).
# ``credential.captured`` fires once per upserted Credential row — the
# correlator listens for it and runs the cred-reuse query in response,
# so reuse detection latency is sub-second after a fresh capture.
# ``credential.reuse.detected`` fires when the correlator inserts a new
# CredentialReuse row or grows an existing one (added decky/service/IP).
CREDENTIAL_CAPTURED = "captured"
CREDENTIAL_REUSE_DETECTED = "reuse.detected"
# System event types.
SYSTEM_LOG = "log"
SYSTEM_BUS_HEALTH = "bus.health"
@@ -143,6 +155,19 @@ def system(event_type: str) -> str:
return f"{SYSTEM}.{event_type}"
def credential(event_type: str) -> str:
"""Build ``credential.<event_type>``.
*event_type* is typically one of :data:`CREDENTIAL_CAPTURED` or
:data:`CREDENTIAL_REUSE_DETECTED`. Dotted leaves
(``reuse.detected``) are permitted — same rationale as
:func:`system`.
"""
if not event_type:
raise ValueError("credential topic requires a non-empty event_type")
return f"{CREDENTIAL}.{event_type}"
def attacker(event_type: str) -> str:
"""Build ``attacker.<event_type>``.

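The topic builder in the hunk above can be exercised on its own. The following is a minimal standalone sketch that copies the three constants and the `credential()` helper from the diff, so it runs without the rest of the topics module; the variable names `captured_topic` and `reuse_topic` are illustrative only.

```python
CREDENTIAL = "credential"
CREDENTIAL_CAPTURED = "captured"
CREDENTIAL_REUSE_DETECTED = "reuse.detected"


def credential(event_type: str) -> str:
    """Build ``credential.<event_type>``; dotted leaves are allowed."""
    if not event_type:
        raise ValueError("credential topic requires a non-empty event_type")
    return f"{CREDENTIAL}.{event_type}"


# The two topics this commit introduces.
captured_topic = credential(CREDENTIAL_CAPTURED)
reuse_topic = credential(CREDENTIAL_REUSE_DETECTED)
print(captured_topic)  # credential.captured
print(reuse_topic)     # credential.reuse.detected
```

Because the leaf constant may itself contain a dot, `credential.reuse.detected` is a three-token topic even though the builder only joins two strings.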

@@ -0,0 +1,27 @@
"""Vector store substrate for behavioral fingerprint similarity search.

Provider-pluggable storage for ``(kind, id, vector)`` triples used by the
future statistical re-identification engine. ``kind`` discriminates
feature families (``ja3``, ``hassh``, ``keystroke``, ``cmd_ngram``, ...)
so new feature types are additive — no schema migration required when
adding a new extractor.

Use :func:`get_vectorstore` from :mod:`decnet.vectorstore.factory`; never
import concrete implementations directly. Mirrors the same factory
discipline as :mod:`decnet.bus` and :mod:`decnet.web.db`.
"""
from decnet.vectorstore.base import (
    BaseVectorStore,
    Neighbor,
    VectorRecord,
    VECTORSTORE_SCHEMA_VERSION,
)
from decnet.vectorstore.factory import get_vectorstore

__all__ = [
    "BaseVectorStore",
    "Neighbor",
    "VectorRecord",
    "VECTORSTORE_SCHEMA_VERSION",
    "get_vectorstore",
]

decnet/vectorstore/base.py (new file, 114 lines)

@@ -0,0 +1,114 @@
"""Vector-store abstractions: :class:`BaseVectorStore` ABC + record types.

Every backend (sqlite-vec, in-memory fake, future pgvector / Qdrant)
speaks this contract. The store is keyed by ``(kind, id)`` where:

* ``kind`` is a short discriminator (``ja3``, ``hassh``,
  ``keystroke_dwell``, ``cmd_ngram``, ...) — vectors are only ever
  compared **within the same kind**, so adding a new feature family is
  a non-event for the store.
* ``id`` is a stable identifier owned by the caller — typically the
  ``session_id`` or ``attacker_uuid``. The store does not interpret it.
* ``extractor_version`` is recorded alongside the vector so v1 vs v2 of
  the same kind never get cross-compared by accident — a similarity
  scorer that respects versioning is the consumer's responsibility, but
  the data it needs is here.

The contract is intentionally minimal (insert/get/knn/delete/health) so
backends with different physical layouts can implement it
straightforwardly. No batch APIs in v1 — sub-millisecond per-vector
overhead at honeypot scales (≤ 100k vectors per kind) makes batching
unnecessary, and the loop-over-singles pattern keeps the contract small.
"""
from __future__ import annotations

import abc
from dataclasses import dataclass
from typing import Optional, Sequence

# Bumped when the wire/ABI shape of records changes incompatibly.
# Backends MAY refuse to load older data when this changes, but the
# pre-v1 expectation is to migrate forward in the same release.
VECTORSTORE_SCHEMA_VERSION = 1


@dataclass(frozen=True)
class VectorRecord:
    """One stored vector, returned by :meth:`BaseVectorStore.get`."""

    kind: str
    id: str
    vector: Sequence[float]
    dim: int
    extractor_version: int = 1


@dataclass(frozen=True)
class Neighbor:
    """One similarity-search hit, returned by :meth:`BaseVectorStore.knn`.

    ``distance`` is whatever the backend's native metric reports. Both
    backends in this commit report L2 (Euclidean) distance: it is the
    default metric of sqlite-vec's vec0 tables and what the in-memory
    fake computes. Smaller is more similar. Consumers that need a
    different metric (e.g. cosine) should configure the backend
    explicitly.
    """

    kind: str
    id: str
    distance: float


class BaseVectorStore(abc.ABC):
    """Async interface for a kind-discriminated vector store.

    Implementations MAY be transactional (sqlite) or not (pure
    in-memory). All methods are async to match the rest of the DECNET
    storage layer; trivial backends can ``await`` no-op coroutines.
    """

    @abc.abstractmethod
    async def initialize(self) -> None:
        """One-shot setup (open files, load extensions, create tables)."""

    @abc.abstractmethod
    async def close(self) -> None:
        """Release resources. Idempotent."""

    @abc.abstractmethod
    async def health(self) -> dict:
        """Liveness + capability probe.

        Returns a dict like ``{"ok": True, "backend": "sqlite_vec",
        "kinds": 4, "vectors": 12_345}``. Used by ``/api/v1/health`` and
        diagnostics; never raises — backends that can't determine a
        field set it to None.
        """

    @abc.abstractmethod
    async def insert(
        self,
        kind: str,
        id: str,
        vector: Sequence[float],
        *,
        extractor_version: int = 1,
    ) -> None:
        """Insert or replace ``(kind, id)``. Vector dim is fixed per kind
        the first time a kind is seen; mismatched dims raise.
        """

    @abc.abstractmethod
    async def get(self, kind: str, id: str) -> Optional[VectorRecord]:
        """Fetch one record, or None if absent."""

    @abc.abstractmethod
    async def delete(self, kind: str, id: str) -> bool:
        """Delete one record. Returns True if a row was removed."""

    @abc.abstractmethod
    async def knn(
        self, kind: str, vector: Sequence[float], k: int = 10
    ) -> list[Neighbor]:
        """Return up to *k* nearest neighbors of ``vector`` within
        ``kind``. Empty list if the kind is unknown or empty.
        """


@@ -0,0 +1,73 @@
"""Vectorstore factory — selects a :class:`BaseVectorStore` implementation.

Dispatch keys:

* ``DECNET_VECTORSTORE_ENABLED`` — ``"false"`` short-circuits to
  :class:`~decnet.vectorstore.fake.NullVectorStore`. Default ``"true"``.
* ``DECNET_VECTORSTORE_TYPE`` — ``"sqlite_vec"`` (default) or
  ``"fake"``.
* ``DECNET_VECTORSTORE_PATH`` — sqlite file path. Defaults to
  ``/var/lib/decnet/vectors.sqlite`` if writable, else
  ``~/.decnet/vectors.sqlite``.

Mirrors :mod:`decnet.bus.factory` and :mod:`decnet.web.db.factory`:
lazy imports inside each branch, env-driven dispatch, callers MUST go
through :func:`get_vectorstore` rather than instantiating backends.

If ``sqlite_vec`` is requested but the package cannot be imported on
this host, the factory logs a warning and returns the fake backend
instead. The caller's code path stays valid (``insert``, ``get`` and
``knn`` keep working, in memory only and without persistence) without
crashing the worker on a missing optional dependency.
"""
from __future__ import annotations

import logging
import os
from typing import Any

from decnet.vectorstore.base import BaseVectorStore

LOG = logging.getLogger(__name__)


def get_vectorstore(**kwargs: Any) -> BaseVectorStore:
    if os.environ.get("DECNET_VECTORSTORE_ENABLED", "true").lower() == "false":
        from decnet.vectorstore.fake import NullVectorStore
        return NullVectorStore()

    backend = os.environ.get("DECNET_VECTORSTORE_TYPE", "sqlite_vec").lower()

    if backend == "fake":
        from decnet.vectorstore.fake import FakeVectorStore
        return FakeVectorStore()

    if backend == "sqlite_vec":
        # Probe package availability up front so the factory can fall
        # back cleanly. Construction is cheap, but the extension load
        # only happens in initialize(); without this probe the caller
        # sees the failure too late to substitute a backend.
        try:
            import sqlite_vec  # noqa: F401
        except ImportError as e:
            LOG.warning(
                "sqlite_vec not installed (%s); falling back to FakeVectorStore. "
                "Install the sqlite-vec package or set "
                "DECNET_VECTORSTORE_TYPE=fake to silence this warning.", e,
            )
            from decnet.vectorstore.fake import FakeVectorStore
            return FakeVectorStore()
        from decnet.vectorstore.sqlite_vec import SqliteVecVectorStore
        db_path = kwargs.pop("db_path", None) or _default_db_path()
        return SqliteVecVectorStore(db_path=db_path)

    raise ValueError(f"Unsupported vectorstore type: {backend}")


def _default_db_path() -> str:
    explicit = os.environ.get("DECNET_VECTORSTORE_PATH")
    if explicit:
        return explicit
    runtime_dir = "/var/lib/decnet"
    if os.path.isdir(runtime_dir) and os.access(runtime_dir, os.W_OK):
        return f"{runtime_dir}/vectors.sqlite"
    return os.path.expanduser("~/.decnet/vectors.sqlite")

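The dispatch rules of the factory can be sketched without any DECNET imports. This is a simplified illustration, not the factory itself: `pick_backend` is a hypothetical helper that returns only the name of the backend that would be chosen, takes the environment as a plain mapping, and uses `importlib.util.find_spec` as a stand-in for the `import sqlite_vec` probe.

```python
import importlib.util
from typing import Mapping


def pick_backend(env: Mapping[str, str]) -> str:
    """Return the name of the backend the dispatch rules would select."""
    # Kill switch first: a disabled vectorstore always means the no-op store.
    if env.get("DECNET_VECTORSTORE_ENABLED", "true").lower() == "false":
        return "null"

    backend = env.get("DECNET_VECTORSTORE_TYPE", "sqlite_vec").lower()
    if backend == "fake":
        return "fake"
    if backend == "sqlite_vec":
        # Probe importability only; nothing is loaded into SQLite here.
        if importlib.util.find_spec("sqlite_vec") is None:
            return "fake"  # graceful fallback on a missing optional dep
        return "sqlite_vec"
    raise ValueError(f"Unsupported vectorstore type: {backend}")


print(pick_backend({"DECNET_VECTORSTORE_ENABLED": "false"}))  # null
print(pick_backend({"DECNET_VECTORSTORE_TYPE": "FAKE"}))      # fake
```

With an empty mapping the result depends on the host: `sqlite_vec` when the package is installed, `fake` otherwise. An unknown type is the only case that raises.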
decnet/vectorstore/fake.py (new file, 131 lines)

@@ -0,0 +1,131 @@
"""In-memory vector store backend.

Two flavors:

* :class:`FakeVectorStore` — a real, working in-memory store. Used by
  tests and by dev environments that want similarity search without
  any native extension on the box. KNN is brute-force L2 — fine up to
  a few thousand vectors per kind.
* :class:`NullVectorStore` — a no-op store returned by the factory
  when ``DECNET_VECTORSTORE_ENABLED=false``. Every method succeeds
  trivially; ``get`` and ``knn`` return None / [] respectively. Lets
  workers run unaffected when the operator hasn't opted into vector
  features yet.
"""
from __future__ import annotations

import math
from typing import Optional, Sequence

from decnet.vectorstore.base import BaseVectorStore, Neighbor, VectorRecord


class FakeVectorStore(BaseVectorStore):
    """Pure-python in-memory vector store, brute-force KNN.

    Suitable for tests and small-scale dev (≤ a few thousand vectors
    per kind). Not persistent — every process restart drops state.
    """

    def __init__(self) -> None:
        # {kind: {id: VectorRecord}}
        self._store: dict[str, dict[str, VectorRecord]] = {}
        # {kind: dim} — locked the first time a kind is written.
        self._dims: dict[str, int] = {}

    async def initialize(self) -> None:
        return None

    async def close(self) -> None:
        return None

    async def health(self) -> dict:
        total = sum(len(by_id) for by_id in self._store.values())
        return {
            "ok": True,
            "backend": "fake",
            "kinds": len(self._store),
            "vectors": total,
        }

    async def insert(
        self,
        kind: str,
        id: str,
        vector: Sequence[float],
        *,
        extractor_version: int = 1,
    ) -> None:
        dim = len(vector)
        existing_dim = self._dims.get(kind)
        if existing_dim is None:
            self._dims[kind] = dim
        elif existing_dim != dim:
            raise ValueError(
                f"vector dim mismatch for kind={kind!r}: "
                f"expected {existing_dim}, got {dim}"
            )
        rec = VectorRecord(
            kind=kind, id=id, vector=tuple(float(x) for x in vector),
            dim=dim, extractor_version=int(extractor_version),
        )
        self._store.setdefault(kind, {})[id] = rec

    async def get(self, kind: str, id: str) -> Optional[VectorRecord]:
        return self._store.get(kind, {}).get(id)

    async def delete(self, kind: str, id: str) -> bool:
        bucket = self._store.get(kind)
        if bucket is None or id not in bucket:
            return False
        del bucket[id]
        return True

    async def knn(
        self, kind: str, vector: Sequence[float], k: int = 10
    ) -> list[Neighbor]:
        bucket = self._store.get(kind)
        if not bucket:
            return []
        q = tuple(float(x) for x in vector)
        if len(q) != self._dims.get(kind, len(q)):
            raise ValueError(
                f"query dim {len(q)} != stored dim {self._dims[kind]} "
                f"for kind={kind!r}"
            )
        scored: list[Neighbor] = []
        for rid, rec in bucket.items():
            d = math.sqrt(sum((a - b) ** 2 for a, b in zip(q, rec.vector)))
            scored.append(Neighbor(kind=kind, id=rid, distance=d))
        scored.sort(key=lambda n: n.distance)
        return scored[: max(0, int(k))]


class NullVectorStore(BaseVectorStore):
    """No-op vector store. Returned when vectorstore is disabled."""

    async def initialize(self) -> None:
        return None

    async def close(self) -> None:
        return None

    async def health(self) -> dict:
        return {"ok": True, "backend": "null", "kinds": 0, "vectors": 0}

    async def insert(
        self, kind: str, id: str, vector: Sequence[float],
        *, extractor_version: int = 1,
    ) -> None:
        return None

    async def get(self, kind: str, id: str) -> Optional[VectorRecord]:
        return None

    async def delete(self, kind: str, id: str) -> bool:
        return False

    async def knn(
        self, kind: str, vector: Sequence[float], k: int = 10
    ) -> list[Neighbor]:
        return []

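The two behaviours that matter most in the in-memory store, per-kind dimension locking and brute-force L2 ranking, fit in a few lines. The sketch below is a synchronous miniature written for illustration only: `TinyStore` is a hypothetical class, not part of the commit, and it returns plain `(id, distance)` tuples instead of `Neighbor` records.

```python
import math
from typing import Sequence


class TinyStore:
    """Hypothetical synchronous miniature of the in-memory store."""

    def __init__(self) -> None:
        self._store: dict[str, dict[str, tuple[float, ...]]] = {}
        self._dims: dict[str, int] = {}

    def insert(self, kind: str, id: str, vector: Sequence[float]) -> None:
        # The first write for a kind locks its dimension.
        dim = self._dims.setdefault(kind, len(vector))
        if dim != len(vector):
            raise ValueError(
                f"dim mismatch for kind={kind!r}: expected {dim}, got {len(vector)}"
            )
        self._store.setdefault(kind, {})[id] = tuple(float(x) for x in vector)

    def knn(self, kind: str, query: Sequence[float], k: int = 10) -> list[tuple[str, float]]:
        # Vectors are only ever compared within one kind.
        bucket = self._store.get(kind, {})
        scored = [(rid, math.dist(query, vec)) for rid, vec in bucket.items()]
        scored.sort(key=lambda pair: pair[1])
        return scored[: max(0, k)]


store = TinyStore()
store.insert("ja3", "sess-a", [0.0, 0.0])
store.insert("ja3", "sess-b", [3.0, 4.0])
store.insert("hassh", "sess-c", [1.0, 1.0, 1.0])  # other kind, other dim

hits = store.knn("ja3", [0.0, 1.0], k=2)
print(hits[0])  # ('sess-a', 1.0)
```

The `hassh` vector never appears in a `ja3` query, which is why a new feature family with a different dimension can be added without touching existing data.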

@@ -0,0 +1,285 @@
"""SQLite + sqlite-vec backend.

Lazy-imports the ``sqlite_vec`` extension. If the extension isn't
available (the package isn't installed, or the host's libsqlite3 is too
old or was built without loadable-extension support),
:meth:`SqliteVecVectorStore.initialize` raises
:class:`SqliteVecUnavailable`. The factory only probes that the
``sqlite_vec`` package imports, falling back to
:class:`~decnet.vectorstore.fake.FakeVectorStore` with a warning when it
does not; a load failure inside ``initialize()`` reaches the caller.

Schema:

    CREATE TABLE vectors (
        kind              TEXT NOT NULL,
        id                TEXT NOT NULL,
        extractor_version INTEGER NOT NULL DEFAULT 1,
        dim               INTEGER NOT NULL,
        rowid_in_vec      INTEGER NOT NULL,
        PRIMARY KEY (kind, id)
    );
    CREATE VIRTUAL TABLE vec_<kind> USING vec0(
        embedding float[<dim>]
    );

A vec0 virtual table is created lazily per-kind on first insert
(distinct ``kind`` values get distinct vec0 tables because vec0's dim
is a schema-time constant). The ``vectors`` row is the source of truth
for metadata (extractor_version, dim) and for the (kind, id) → rowid
mapping; vec0 stores only the embedding, keyed by an INTEGER rowid.
"""
from __future__ import annotations

import asyncio
import logging
import sqlite3
import struct
import threading
from pathlib import Path
from typing import Optional, Sequence

from decnet.vectorstore.base import BaseVectorStore, Neighbor, VectorRecord

LOG = logging.getLogger(__name__)


class SqliteVecUnavailable(RuntimeError):
    """sqlite_vec couldn't be loaded (extension missing / too-old sqlite3)."""


def _load_sqlite_vec(conn: sqlite3.Connection) -> None:
    try:
        import sqlite_vec  # type: ignore[import-untyped]
    except ImportError as e:
        raise SqliteVecUnavailable("sqlite_vec package not installed") from e
    try:
        conn.enable_load_extension(True)
    except (AttributeError, sqlite3.NotSupportedError) as e:
        raise SqliteVecUnavailable(
            "system sqlite3 was built without loadable-extension support"
        ) from e
    try:
        sqlite_vec.load(conn)
    except sqlite3.OperationalError as e:
        raise SqliteVecUnavailable(f"sqlite_vec load failed: {e}") from e
    finally:
        try:
            conn.enable_load_extension(False)
        except sqlite3.NotSupportedError:
            pass


class SqliteVecVectorStore(BaseVectorStore):
    """sqlite-vec backed vector store. Single-file, async-friendly via
    :func:`asyncio.to_thread`. Keep one instance per process.
    """

    def __init__(self, db_path: str) -> None:
        self._db_path = db_path
        self._conn: Optional[sqlite3.Connection] = None
        self._lock = threading.Lock()
        # {kind: dim} cached after first insert/probe.
        self._kinds: dict[str, int] = {}

    async def initialize(self) -> None:
        await asyncio.to_thread(self._init_sync)

    def _init_sync(self) -> None:
        Path(self._db_path).parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(self._db_path, check_same_thread=False)
        try:
            _load_sqlite_vec(conn)  # raises SqliteVecUnavailable on failure
        except SqliteVecUnavailable:
            conn.close()  # don't leak the handle when the extension is missing
            raise
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS vectors (
                kind TEXT NOT NULL,
                id TEXT NOT NULL,
                extractor_version INTEGER NOT NULL DEFAULT 1,
                dim INTEGER NOT NULL,
                rowid_in_vec INTEGER NOT NULL,
                PRIMARY KEY (kind, id)
            )
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS ix_vectors_kind ON vectors(kind)"
        )
        conn.commit()
        # Re-hydrate kind→dim cache from any existing rows so a process
        # restart doesn't accept a mismatched dim on the first insert.
        for row in conn.execute("SELECT kind, dim FROM vectors GROUP BY kind"):
            self._kinds[row[0]] = int(row[1])
        self._conn = conn

    async def close(self) -> None:
        await asyncio.to_thread(self._close_sync)

    def _close_sync(self) -> None:
        with self._lock:
            if self._conn is not None:
                self._conn.close()
                self._conn = None

    async def health(self) -> dict:
        return await asyncio.to_thread(self._health_sync)

    def _health_sync(self) -> dict:
        if self._conn is None:
            return {"ok": False, "backend": "sqlite_vec", "reason": "not initialized"}
        try:
            row = self._conn.execute("SELECT COUNT(*) FROM vectors").fetchone()
            return {
                "ok": True,
                "backend": "sqlite_vec",
                "kinds": len(self._kinds),
                "vectors": int(row[0]) if row else 0,
            }
        except sqlite3.Error as e:
            return {"ok": False, "backend": "sqlite_vec", "reason": str(e)}

    @staticmethod
    def _vec_table(kind: str) -> str:
        # Validate the kind so it can't break out of the table name.
        # Allowed: ascii letters, digits, underscore. Anything else =
        # programmer error; raise loudly. str.isalnum() alone also
        # accepts non-ASCII letters and digits, hence the isascii() guard.
        if not kind or not all(
            c.isascii() and (c.isalnum() or c == "_") for c in kind
        ):
            raise ValueError(f"invalid kind {kind!r}: ascii [A-Za-z0-9_] only")
        return f"vec_{kind}"

    def _ensure_kind_table(self, kind: str, dim: int) -> None:
        assert self._conn is not None  # nosec B101
        existing = self._kinds.get(kind)
        if existing is None:
            # vec_<kind> identifier is validated by _vec_table() to be
            # ascii [A-Za-z0-9_] only, and dim is int-cast — no injection
            # vector. The f-string is the only way to interpolate a
            # virtual-table name; placeholders aren't allowed for DDL.
            ddl = (  # nosec B608
                f"CREATE VIRTUAL TABLE IF NOT EXISTS {self._vec_table(kind)} "
                f"USING vec0(embedding float[{int(dim)}])"
            )
            self._conn.execute(ddl)
            self._conn.commit()
            self._kinds[kind] = dim
        elif existing != dim:
            raise ValueError(
                f"vector dim mismatch for kind={kind!r}: "
                f"expected {existing}, got {dim}"
            )

    async def insert(
        self, kind: str, id: str, vector: Sequence[float],
        *, extractor_version: int = 1,
    ) -> None:
        await asyncio.to_thread(
            self._insert_sync, kind, id, list(vector), int(extractor_version)
        )

    def _insert_sync(
        self, kind: str, id: str, vector: list[float], extractor_version: int,
    ) -> None:
        with self._lock:
            assert self._conn is not None  # nosec B101
            dim = len(vector)
            self._ensure_kind_table(kind, dim)
            vec_table = self._vec_table(kind)
            cur = self._conn.cursor()
            existing = cur.execute(
                "SELECT rowid_in_vec FROM vectors WHERE kind=? AND id=?",
                (kind, id),
            ).fetchone()
            if existing is not None:
                rowid = int(existing[0])
                # vec_table is validated; rowid is bound. Safe.
                cur.execute(f"DELETE FROM {vec_table} WHERE rowid=?", (rowid,))  # nosec B608
            # Embeddings are stored as packed native-order float32 values.
            blob = struct.pack(f"{dim}f", *vector)
            cur.execute(f"INSERT INTO {vec_table}(embedding) VALUES (?)", (blob,))  # nosec B608
            new_rowid = cur.lastrowid
            cur.execute(
                "INSERT OR REPLACE INTO vectors"
                "(kind, id, extractor_version, dim, rowid_in_vec) "
                "VALUES (?, ?, ?, ?, ?)",
                (kind, id, extractor_version, dim, new_rowid),
            )
            self._conn.commit()

    async def get(self, kind: str, id: str) -> Optional[VectorRecord]:
        return await asyncio.to_thread(self._get_sync, kind, id)

    def _get_sync(self, kind: str, id: str) -> Optional[VectorRecord]:
        with self._lock:
            assert self._conn is not None  # nosec B101
            row = self._conn.execute(
                "SELECT extractor_version, dim, rowid_in_vec "
                "FROM vectors WHERE kind=? AND id=?",
                (kind, id),
            ).fetchone()
            if row is None:
                return None
            ext_v, dim, rowid = int(row[0]), int(row[1]), int(row[2])
            vec_table = self._vec_table(kind)
            blob_row = self._conn.execute(
                f"SELECT embedding FROM {vec_table} WHERE rowid=?",  # nosec B608
                (rowid,),
            ).fetchone()
            if blob_row is None:
                return None
            vec = list(struct.unpack(f"{dim}f", blob_row[0]))
            return VectorRecord(
                kind=kind, id=id, vector=vec, dim=dim,
                extractor_version=ext_v,
            )

    async def delete(self, kind: str, id: str) -> bool:
        return await asyncio.to_thread(self._delete_sync, kind, id)

    def _delete_sync(self, kind: str, id: str) -> bool:
        with self._lock:
            assert self._conn is not None  # nosec B101
            row = self._conn.execute(
                "SELECT rowid_in_vec FROM vectors WHERE kind=? AND id=?",
                (kind, id),
            ).fetchone()
            if row is None:
                return False
            rowid = int(row[0])
            vec_table = self._vec_table(kind)
            self._conn.execute(f"DELETE FROM {vec_table} WHERE rowid=?", (rowid,))  # nosec B608
            self._conn.execute(
                "DELETE FROM vectors WHERE kind=? AND id=?", (kind, id)
            )
            self._conn.commit()
            return True

    async def knn(
        self, kind: str, vector: Sequence[float], k: int = 10,
    ) -> list[Neighbor]:
        return await asyncio.to_thread(self._knn_sync, kind, list(vector), int(k))

    def _knn_sync(self, kind: str, vector: list[float], k: int) -> list[Neighbor]:
        with self._lock:
            assert self._conn is not None  # nosec B101
            existing_dim = self._kinds.get(kind)
            if existing_dim is None:
                return []
            if len(vector) != existing_dim:
                raise ValueError(
                    f"query dim {len(vector)} != stored dim {existing_dim} "
                    f"for kind={kind!r}"
                )
            if k <= 0:
                # Nothing requested; skip the query entirely.
                return []
            vec_table = self._vec_table(kind)
            qblob = struct.pack(f"{existing_dim}f", *vector)
            knn_sql = (  # nosec B608
                f"SELECT rowid, distance FROM {vec_table} "
                "WHERE embedding MATCH ? ORDER BY distance LIMIT ?"
            )
            rows = self._conn.execute(knn_sql, (qblob, k)).fetchall()
            if not rows:
                return []
            # Map vec0 rowids back to caller-owned ids for this kind.
            id_map = {
                int(r[0]): r[1]
                for r in self._conn.execute(
                    "SELECT rowid_in_vec, id FROM vectors WHERE kind=?",
                    (kind,),
                )
            }
            out: list[Neighbor] = []
            for rowid, dist in rows:
                rid = id_map.get(int(rowid))
                if rid is None:
                    continue
                out.append(Neighbor(kind=kind, id=rid, distance=float(dist)))
            return out

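The backend stores each embedding as a blob produced by `struct.pack(f"{dim}f", ...)`, that is, packed native-order float32 values. A consequence worth seeing once is that a vector read back by `get` is rounded to float32, so it may not compare equal to the Python floats that were inserted. The helpers `pack_f32` and `unpack_f32` below are hypothetical names used only for this illustration.

```python
import struct
from typing import Sequence


def pack_f32(vector: Sequence[float]) -> bytes:
    """Serialize a vector as packed native-order float32 values."""
    return struct.pack(f"{len(vector)}f", *vector)


def unpack_f32(blob: bytes) -> list[float]:
    """Inverse of pack_f32; the dim is recovered from the blob length."""
    dim = len(blob) // struct.calcsize("f")
    return list(struct.unpack(f"{dim}f", blob))


exact = [0.5, 1.0, -2.25]            # exactly representable in float32
blob = pack_f32(exact)
print(len(blob))                      # 12 (3 values x 4 bytes)
print(unpack_f32(blob) == exact)      # True

lossy = unpack_f32(pack_f32([0.1]))[0]
print(lossy == 0.1)                   # False: 0.1 is rounded to float32
```

Tests that round-trip vectors through the sqlite-vec backend are therefore safer with a tolerance comparison (or with float32-exact fixtures) than with strict equality.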

@@ -49,6 +49,8 @@ from .logs import (
Bounty,
BountyResponse,
Credential,
CredentialReuse,
CredentialReuseResponse,
CredentialsResponse,
Log,
LogsResponse,
@@ -170,6 +172,8 @@ __all__ = [
"Bounty",
"BountyResponse",
"Credential",
"CredentialReuse",
"CredentialReuseResponse",
"CredentialsResponse",
"Log",
"LogsResponse",


@@ -3,7 +3,7 @@ from datetime import datetime, timezone
from typing import Any, List, Optional
from pydantic import BaseModel
-from sqlalchemy import Column, Index, Text
+from sqlalchemy import Column, Index, Text, UniqueConstraint
from sqlmodel import Field, SQLModel
from ._base import _BIG_TEXT
@@ -54,9 +54,13 @@ class Credential(SQLModel, table=True):
LDAP. Nullable for principal-less mechanisms (Redis AUTH, bearer
tokens). Fully service-specific keys ride in ``fields`` JSON.
-Dedup contract: same (attacker_uuid, decky, service, secret_sha256,
+Dedup contract: same (attacker_ip, decky, service, secret_sha256,
principal_or_empty) tuple → upsert, bumps ``attempt_count`` and
``last_seen``. Different secret or different principal → new row.
``attacker_uuid`` is backfilled by the profiler once an Attacker row
has been minted for the source IP. It is nullable on first write so
the credential ingest path stays decoupled from the profiler.
"""
__tablename__ = "credentials"
__table_args__ = (
@@ -64,11 +68,15 @@ class Credential(SQLModel, table=True):
Index("ix_credentials_principal_service", "principal", "service"),
)
id: Optional[int] = Field(default=None, primary_key=True)
-# Keyed by attacker IP (not attackers.uuid) to match Bounty's pattern
-# and avoid the chicken-and-egg of writing a credential row before
-# the profiler has minted the Attacker. Index covers the join path
-# cred_reuse → Attacker.ip.
+# Keyed by attacker IP (not attackers.uuid) on the write path to
+# avoid the chicken-and-egg of landing a credential before the
+# profiler has minted the Attacker. The profiler backfills
+# ``attacker_uuid`` once it knows the IP, so cross-IP reuse queries
+# eventually have an indexed FK to traverse.
attacker_ip: str = Field(index=True)
attacker_uuid: Optional[str] = Field(
default=None, foreign_key="attackers.uuid", index=True
)
decky_name: str = Field(index=True)
service: str = Field(index=True)
principal: Optional[str] = Field(default=None, index=True, max_length=256)
@@ -107,6 +115,77 @@ class Credential(SQLModel, table=True):
attempt_count: int = Field(default=1)
class CredentialReuse(SQLModel, table=True):
"""One observed credential reuse pattern across deckies and/or services.
A row here is a *finding* produced by the correlator: the same
``(secret_sha256, secret_kind, principal)`` tuple was observed
against ``target_count`` distinct decky×service pairs. Upserted on
that natural key — the row accumulates new deckies/services/IPs
over time as the credential is reused.
The ``confidence`` column is reserved for a future fuzzy-match pass
(credential variants, e.g. ``hunter2`` vs ``hunter22``); rows
written by the exact-secret correlator are always 1.0.
"""
__tablename__ = "credential_reuse"
__table_args__ = (
UniqueConstraint(
"secret_sha256", "secret_kind", "principal_key",
name="uq_credential_reuse_secret_principal",
),
)
id: str = Field(primary_key=True, max_length=36)
secret_sha256: str = Field(index=True, max_length=64)
secret_kind: str = Field(index=True, max_length=32)
# Optional human-readable principal (e.g. "root"). Nullable — for
# cross-principal reuse rows we leave this null, but we still need
# a unique constraint, so ``principal_key`` is the non-null
# canonicalised form ("" when principal is null) used in the
# uniqueness tuple. SQLite's NULLs-distinct-in-UNIQUE behaviour
# would otherwise let duplicate null-principal rows through.
principal: Optional[str] = Field(default=None, max_length=256)
principal_key: str = Field(default="", max_length=256)
attacker_uuids: str = Field(
default="[]",
sa_column=Column("attacker_uuids", _BIG_TEXT, nullable=False, default="[]"),
) # JSON list[str]
attacker_ips: str = Field(
default="[]",
sa_column=Column("attacker_ips", _BIG_TEXT, nullable=False, default="[]"),
) # JSON list[str]
deckies: str = Field(
default="[]",
sa_column=Column("deckies", _BIG_TEXT, nullable=False, default="[]"),
) # JSON list[str]
services: str = Field(
default="[]",
sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]"),
) # JSON list[str]
# COUNT(DISTINCT decky||':'||service). The discriminative scalar
# for ranking and filtering — a credential seen on 12 targets is
# far more interesting than one seen on 2.
target_count: int = Field(default=0, index=True)
attempt_count: int = Field(default=0)
confidence: float = Field(default=1.0)
first_seen: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
last_seen: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
updated_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
class CredentialReuseResponse(BaseModel):
total: int
limit: int
offset: int
data: List[dict[str, Any]]
class State(SQLModel, table=True):
__tablename__ = "state"
key: str = Field(primary_key=True)

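The reason for the separate `principal_key` column can be reproduced with the standard-library `sqlite3` module. The sketch below uses two throwaway tables (`naive` and `keyed`, both hypothetical and much smaller than `credential_reuse`) to show that a UNIQUE constraint over a nullable column admits duplicate NULL rows in SQLite, while the canonicalised empty-string key rejects them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE naive (secret TEXT NOT NULL, principal TEXT, "
    "UNIQUE (secret, principal))"
)
conn.execute(
    "CREATE TABLE keyed (secret TEXT NOT NULL, principal TEXT, "
    "principal_key TEXT NOT NULL DEFAULT '', UNIQUE (secret, principal_key))"
)

# Two NULL-principal rows: NULLs count as distinct, so both are accepted.
conn.execute("INSERT INTO naive VALUES ('abc', NULL)")
conn.execute("INSERT INTO naive VALUES ('abc', NULL)")
naive_rows = conn.execute("SELECT COUNT(*) FROM naive").fetchone()[0]


def insert_keyed(secret, principal):
    # Same canonicalisation as the repo code: "" stands in for a null principal.
    conn.execute(
        "INSERT INTO keyed (secret, principal, principal_key) VALUES (?, ?, ?)",
        (secret, principal, principal or ""),
    )


insert_keyed("abc", None)
try:
    insert_keyed("abc", None)
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

print(naive_rows)          # 2
print(duplicate_rejected)  # True
```

Keeping the nullable `principal` alongside the key means readers still see a real null for principal-less findings, while the constraint operates on a value that is never null.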

@@ -153,12 +153,59 @@ class BaseRepository(ABC):
pass
@abstractmethod
-async def get_credential_reuse(
+async def get_credential_attempts_for_secret(
self, secret_sha256: str
) -> list[dict[str, Any]]:
"""Every (attacker, decky, service, principal) row sharing this secret hash."""
pass
@abstractmethod
async def upsert_credential_reuse(
self,
*,
secret_sha256: str,
secret_kind: str,
principal: Optional[str],
attacker_uuid: Optional[str],
attacker_ip: str,
decky: str,
service: str,
attempt_count: int,
ts: Optional[Any] = None,
) -> Optional[dict[str, Any]]:
"""Upsert one credential-reuse finding. Returns the row dict (with
``inserted: bool`` mixed in) on insert/update, or None if the row
is below the reuse threshold and shouldn't be persisted yet.
"""
pass
@abstractmethod
async def list_credential_reuses(
self,
limit: int = 50,
offset: int = 0,
min_target_count: int = 2,
secret_kind: Optional[str] = None,
) -> tuple[int, list[dict[str, Any]]]:
"""Paged list of credential-reuse findings ordered by target_count desc."""
pass
@abstractmethod
async def get_credential_reuse_by_id(
self, reuse_id: str
) -> Optional[dict[str, Any]]:
"""One credential-reuse finding by UUID, or None."""
pass
@abstractmethod
async def update_credential_attacker_uuid(
self, attacker_ip: str, attacker_uuid: str
) -> int:
"""Backfill ``attacker_uuid`` on every Credential row matching the IP
whose ``attacker_uuid`` is currently null. Returns rows updated.
"""
pass
@abstractmethod
async def get_state(self, key: str) -> Optional[dict[str, Any]]:
"""Retrieve a specific state entry by key."""

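A finding row accumulates deckies and services as JSON lists, while `target_count` counts distinct decky×service pairs. The sketch below illustrates, under simplified assumptions, why the count cannot be derived from the two lists alone. `merge_unique` is a trimmed stand-in for the repository's JSON-list merge helper, and the `observations` data is invented for the example.

```python
import json
from typing import Optional


def merge_unique(existing_json: str, value: Optional[str]) -> tuple[str, bool]:
    """Append value to a JSON list[str] if absent; report whether it changed."""
    if value is None:
        return existing_json, False
    current = json.loads(existing_json) if existing_json else []
    if value in current:
        return existing_json, False
    current.append(value)
    return json.dumps(current), True


# Hypothetical sightings of one secret, as (decky, service) pairs.
observations = [("web-01", "ssh"), ("db-01", "ftp"), ("web-01", "ssh")]

deckies, services = "[]", "[]"
for decky, service in observations:
    deckies, _ = merge_unique(deckies, decky)
    services, _ = merge_unique(services, service)

# Pairs that were actually observed together.
target_count = len(set(observations))
# What the two lists would suggest if combined independently.
cross_product = len(json.loads(deckies)) * len(json.loads(services))

print(target_count)   # 2
print(cross_product)  # 4
```

The lists record that two deckies and two services were involved, but only the underlying credential rows know which combinations occurred, so the count has to come from a query over those rows.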

@@ -32,6 +32,7 @@ from decnet.web.db.models import (
Log,
Bounty,
Credential,
CredentialReuse,
State,
Attacker,
AttackerBehavior,
@@ -684,7 +685,7 @@ class SQLModelRepository(BaseRepository):
out.append(d)
return out
-async def get_credential_reuse(
+async def get_credential_attempts_for_secret(
self, secret_sha256: str
) -> List[dict[str, Any]]:
"""Every (attacker_ip, decky, service, principal) row sharing this
@@ -706,6 +707,197 @@ class SQLModelRepository(BaseRepository):
out.append(d)
return out
# ─── credential reuse (findings) ──────────────────────────────────────
async def update_credential_attacker_uuid(
self, attacker_ip: str, attacker_uuid: str
) -> int:
"""Backfill ``attacker_uuid`` on every Credential row matching the
given IP whose ``attacker_uuid`` is currently null. Run by the
profiler after it mints/updates an Attacker row.
"""
async with self._session() as session:
result = await session.execute(
update(Credential)
.where(
Credential.attacker_ip == attacker_ip,
Credential.attacker_uuid.is_(None),
)
.values(attacker_uuid=attacker_uuid)
)
await session.commit()
return int(result.rowcount or 0)
@staticmethod
def _merge_unique(existing_json: str, value: Optional[str]) -> tuple[str, bool]:
"""Append ``value`` to a JSON list[str] column if not present.
Returns (new_json, changed). None values and duplicates are skipped.
"""
if value is None:
return existing_json, False
try:
current = json.loads(existing_json) if existing_json else []
if not isinstance(current, list):
current = []
except (json.JSONDecodeError, TypeError):
current = []
if value in current:
return existing_json, False
current.append(value)
return json.dumps(current, ensure_ascii=True), True
+
+    async def upsert_credential_reuse(
+        self,
+        *,
+        secret_sha256: str,
+        secret_kind: str,
+        principal: Optional[str],
+        attacker_uuid: Optional[str],
+        attacker_ip: str,
+        decky: str,
+        service: str,
+        attempt_count: int,
+        ts: Optional[datetime] = None,
+    ) -> Optional[dict[str, Any]]:
+        """Upsert a credential-reuse finding.
+
+        The row is keyed by ``(secret_sha256, secret_kind, principal_key)``,
+        where ``principal_key`` is the canonicalised non-null form ("" when
+        principal is null), so the unique constraint behaves the same on
+        SQLite and MySQL.
+
+        Returns the row dict augmented with ``inserted: bool`` and
+        ``changed: bool`` so the correlator can decide whether to publish
+        a bus event.
+        """
+        principal_key = principal or ""
+        now = ts or datetime.now(timezone.utc)
+        async with self._session() as session:
+            existing = (await session.execute(
+                select(CredentialReuse).where(
+                    CredentialReuse.secret_sha256 == secret_sha256,
+                    CredentialReuse.secret_kind == secret_kind,
+                    CredentialReuse.principal_key == principal_key,
+                )
+            )).scalar_one_or_none()
+
+            if existing is None:
+                row = CredentialReuse(
+                    id=str(uuid.uuid4()),
+                    secret_sha256=secret_sha256,
+                    secret_kind=secret_kind,
+                    principal=principal,
+                    principal_key=principal_key,
+                    attacker_uuids=json.dumps(
+                        [attacker_uuid] if attacker_uuid else [], ensure_ascii=True
+                    ),
+                    attacker_ips=json.dumps([attacker_ip], ensure_ascii=True),
+                    deckies=json.dumps([decky], ensure_ascii=True),
+                    services=json.dumps([service], ensure_ascii=True),
+                    target_count=1,
+                    attempt_count=int(attempt_count),
+                    confidence=1.0,
+                    first_seen=now,
+                    last_seen=now,
+                    updated_at=now,
+                )
+                session.add(row)
+                await session.commit()
+                await session.refresh(row)
+                d = row.model_dump(mode="json")
+                d["inserted"] = True
+                d["changed"] = True
+                return d
+
+            changed = False
+            new_uuids, c1 = self._merge_unique(existing.attacker_uuids, attacker_uuid)
+            new_ips, c2 = self._merge_unique(existing.attacker_ips, attacker_ip)
+            new_deckies, c3 = self._merge_unique(existing.deckies, decky)
+            new_services, c4 = self._merge_unique(existing.services, service)
+            existing.attacker_uuids = new_uuids
+            existing.attacker_ips = new_ips
+            if c3 or c4:
+                existing.deckies = new_deckies
+                existing.services = new_services
+                # Recount target tuples from the underlying credentials
+                # table: a (decky, service) tuple only counts when both
+                # were observed together, which the JSON lists alone
+                # can't tell us.
+                stmt = (
+                    select(func.count(func.distinct(
+                        Credential.decky_name + ":" + Credential.service
+                    )))
+                    .where(
+                        Credential.secret_sha256 == secret_sha256,
+                        Credential.secret_kind == secret_kind,
+                        (Credential.principal == principal) if principal is not None
+                        else Credential.principal.is_(None),
+                    )
+                )
+                target_count = (await session.execute(stmt)).scalar() or 0
+                existing.target_count = int(target_count)
+            existing.attempt_count = (existing.attempt_count or 0) + int(attempt_count)
+            existing.last_seen = now
+            existing.updated_at = now
+            if c1 or c2 or c3 or c4:
+                changed = True
+            session.add(existing)
+            await session.commit()
+            await session.refresh(existing)
+            d = existing.model_dump(mode="json")
+            d["inserted"] = False
+            d["changed"] = changed
+            return d
+
+    async def list_credential_reuses(
+        self,
+        limit: int = 50,
+        offset: int = 0,
+        min_target_count: int = 2,
+        secret_kind: Optional[str] = None,
+    ) -> tuple[int, List[dict[str, Any]]]:
+        """Page through findings with ``target_count >= min_target_count``,
+        optionally filtered by ``secret_kind``.
+
+        Returns ``(total, rows)``: ``total`` counts every matching finding
+        before pagination; ``rows`` is ordered by ``target_count`` then
+        ``last_seen``, both descending, with the JSON list columns decoded.
+        """
+        async with self._session() as session:
+            base = select(CredentialReuse).where(
+                CredentialReuse.target_count >= min_target_count
+            )
+            if secret_kind:
+                base = base.where(CredentialReuse.secret_kind == secret_kind)
+            total_stmt = select(func.count()).select_from(base.subquery())
+            total = (await session.execute(total_stmt)).scalar() or 0
+            list_stmt = (
+                base.order_by(desc(CredentialReuse.target_count),
+                              desc(CredentialReuse.last_seen))
+                .offset(offset).limit(limit)
+            )
+            rows = (await session.execute(list_stmt)).scalars().all()
+            out: List[dict[str, Any]] = []
+            for r in rows:
+                d = r.model_dump(mode="json")
+                for key in ("attacker_uuids", "attacker_ips", "deckies", "services"):
+                    try:
+                        d[key] = json.loads(d[key])
+                    except (json.JSONDecodeError, TypeError):
+                        d[key] = []
+                out.append(d)
+            return int(total), out
+
+    async def get_credential_reuse_by_id(
+        self, reuse_id: str
+    ) -> Optional[dict[str, Any]]:
+        """Return one finding by primary key with its JSON list columns
+        decoded, or ``None`` when no row has that id.
+        """
+        async with self._session() as session:
+            row = (await session.execute(
+                select(CredentialReuse).where(CredentialReuse.id == reuse_id)
+            )).scalar_one_or_none()
+            if row is None:
+                return None
+            d = row.model_dump(mode="json")
+            for key in ("attacker_uuids", "attacker_ips", "deckies", "services"):
+                try:
+                    d[key] = json.loads(d[key])
+                except (json.JSONDecodeError, TypeError):
+                    d[key] = []
+            return d
async def get_state(self, key: str) -> Optional[dict[str, Any]]:
async with self._session() as session:
statement = select(State).where(State.key == key)