Files
DECNET/decnet/bus/topics.py
anti ce4be68501 feat(creds): cred-reuse foundation + vectorstore scaffold
Lays the storage and bus substrate for the "credential reuse patterns"
task in DEVELOPMENT.md and scaffolds decnet/vectorstore/ as the future
substrate for statistical attacker re-identification over behavioral
fingerprints. No correlator, profiler, API, or dashboard wiring in
this commit — see TODO.md for the handoff.

Schema:
  - Credential.attacker_uuid (nullable FK to attackers.uuid),
    backfilled by the profiler post-write to avoid coupling the
    capture path to the profiler's ordering.
  - CredentialReuse table — UUID PK, JSON list columns for the
    accumulating attacker_uuids/ips/deckies/services, target_count
    (the discriminative scalar), confidence reserved for a future
    fuzzy-credential pass.

Repo:
  - upsert_credential_reuse / list_credential_reuses /
    get_credential_reuse_by_id / update_credential_attacker_uuid.
  - Renamed pre-existing get_credential_reuse(secret_sha256) to
    get_credential_attempts_for_secret(secret_sha256) — the new
    findings table needs the cleaner name.

Bus topics:
  - credential.captured (one per Credential upsert)
  - credential.reuse.detected (correlator-emitted on insert/grow)

Vectorstore subpackage (decnet/vectorstore/, flat layout mirroring
decnet/bus/):
  - BaseVectorStore ABC keyed by (kind, id) — kind discriminator
    means new feature families are additive, no schema migration.
  - FakeVectorStore (in-memory L2 KNN), NullVectorStore (no-op for
    DECNET_VECTORSTORE_ENABLED=false), SqliteVecVectorStore (lazy
    sqlite_vec extension load, one vec0 virtual table per kind).
  - get_vectorstore() env-driven dispatch with graceful fallback
    to FakeVectorStore when the sqlite-vec extension isn't on the
    host, so workers don't crash on a missing optional dep.

Tests: 26 new (11 cred-reuse repo, 15 vectorstore). Existing
credentials and base-repo tests updated for the rename. Total: 34
passing on the touched files.
2026-04-26 03:18:34 -04:00

228 lines
8.8 KiB
Python

"""Canonical topic hierarchy for the DECNET ServiceBus.
Locked early so consumers can subscribe with stable wildcard patterns.
Adding new topic families is fine; **renaming** existing ones is a breaking
change for every subscriber and requires a coordinated rollout.
Token structure (NATS-style, dot-separated):
topology.{topology_id}.mutation.{state}
topology.{topology_id}.status
decky.{decky_id}.state
decky.{decky_id}.traffic
decky.{decky_id}.mutation
decky.{decky_id}.mutate_request
attacker.observed
attacker.scored
attacker.fingerprinted
attacker.session.started
attacker.session.ended
credential.captured
credential.reuse.detected
system.log
system.bus.health
system.{worker}.health
system.{worker}.control
system.webhook.subscriptions_changed
Wildcards (per :func:`decnet.bus.base.matches`):
* ``*`` matches exactly one token.
* ``>`` matches one-or-more trailing tokens (so ``topology.>`` matches
``topology.abc.status`` but not the bare root ``topology``).
"""
from __future__ import annotations
# ─── Root prefixes ───────────────────────────────────────────────────────────
# First token of every topic; the builder functions in this module prepend
# these to the caller-supplied segments.
TOPOLOGY = "topology"
DECKY = "decky"
ATTACKER = "attacker"
SYSTEM = "system"
CREDENTIAL = "credential"
# ─── Leaf event-type constants (the last segment of each topic) ──────────────
# Topology mutation lifecycle states — keep in sync with TopologyMutation.state
# in decnet/web/db/models.py; the bus topic mirrors the DB state machine.
MUTATION_ENQUEUED = "enqueued"
MUTATION_APPLYING = "applying"
MUTATION_APPLIED = "applied"
MUTATION_FAILED = "failed"
# Topology-level status transitions (topology.{id}.status): fires when the
# topology row's status column changes (pending/deploying/active/degraded/failed).
TOPOLOGY_STATUS = "status"
# Decky-level event types — the token that follows the decky id in
# ``decky.<id>.<event_type>`` (see :func:`decky`).
DECKY_STATE = "state"
DECKY_TRAFFIC = "traffic"
# On-demand mutation request — published by the API/CLI/UI, consumed by
# the mutator's watch loop to force an immediate mutation of one decky
# without waiting for its scheduled interval. Underscored (not dotted)
# to stay a single NATS token so the builder's validator accepts it.
DECKY_MUTATE_REQUEST = "mutate_request"
# Mutation transition event — distinct from DECKY_STATE ("current
# shape") because a mutation is a *transition* that carries old/new
# services + trigger + timing. Correlator consumes these (via the
# syslog sidechannel too) to interleave substrate-change markers into
# attacker traversals.
DECKY_MUTATION = "mutation"
# Attacker event types (everything after the ``attacker`` root; the
# dotted ``session.*`` leaves span two tokens). First sighting, session
# boundary transitions, and score-threshold crossings published by
# correlator + profiler. Consumers typically subscribe to the wildcard
# ``attacker.>``.
ATTACKER_OBSERVED = "observed"
ATTACKER_SCORED = "scored"
# Published once per successful active probe result (JARM/HASSH/TCPfp).
# Distinct from ``observed`` which is the correlator's first-sight signal —
# a fingerprint is additional evidence about an already-observed attacker.
ATTACKER_FINGERPRINTED = "fingerprinted"
ATTACKER_SESSION_STARTED = "session.started"
ATTACKER_SESSION_ENDED = "session.ended"
# Credential event types (second/third tokens under ``credential``).
# ``credential.captured`` fires once per upserted Credential row — the
# correlator listens for it and runs the cred-reuse query in response,
# so reuse detection latency is sub-second after a fresh capture.
# ``credential.reuse.detected`` fires when the correlator inserts a new
# CredentialReuse row or grows an existing one (added decky/service/IP).
CREDENTIAL_CAPTURED = "captured"
CREDENTIAL_REUSE_DETECTED = "reuse.detected"
# System event types.
SYSTEM_LOG = "log"
SYSTEM_BUS_HEALTH = "bus.health"
# Worker-health leaf — built per-worker as ``system.<worker>.health`` via
# :func:`system_health`. The leaf constant stays the same across workers;
# the worker name goes in the middle token.
SYSTEM_HEALTH = "health"
# Worker-control leaf — built per-worker as ``system.<worker>.control`` via
# :func:`system_control`. Admin-originated stop intents travel on this
# topic; each worker subscribes to its own.
SYSTEM_CONTROL = "control"
# Control payload ``action`` values — the wire vocabulary. Only ``stop`` is
# handled in v1; ``start`` is reserved because a stopped worker has no
# subscriber, so starting requires external supervision (systemd).
WORKER_CONTROL_STOP = "stop"
WORKER_CONTROL_START = "start"
# Webhook subscription-set changed — published by the CRUD router after any
# create / update / delete on WebhookSubscription so the webhook worker can
# reload its in-memory subscription list and re-subscribe to the new union
# of patterns. Payload is currently empty; consumers only need the signal.
# NOTE: unlike the leaf constants above, this value is already a complete
# topic (it starts with the ``system.`` root); passing it to :func:`system`
# would produce a doubled ``system.system.…`` prefix.
WEBHOOK_SUBSCRIPTIONS_CHANGED = "system.webhook.subscriptions_changed"
# ─── Builders ────────────────────────────────────────────────────────────────
def topology_mutation(topology_id: str, state: str) -> str:
    """Return the topic ``topology.<id>.mutation.<state>``.

    *topology_id* and *state* are each checked by :func:`_reject_tokens`
    before the topic is assembled, so an empty value or one containing
    ``.``, ``*``, ``>``, or whitespace raises :class:`ValueError`.
    *state* is expected to be one of the ``MUTATION_*`` constants.
    """
    _reject_tokens(topology_id, state)
    # Assemble in two steps: the per-topology scope, then the mutation leaf.
    scope = f"{TOPOLOGY}.{topology_id}"
    return f"{scope}.mutation.{state}"
def topology_status(topology_id: str) -> str:
    """Return the topic ``topology.<id>.status``.

    *topology_id* must pass :func:`_reject_tokens` (single, non-empty
    segment); otherwise :class:`ValueError` propagates to the caller.
    """
    _reject_tokens(topology_id)
    topic = f"{TOPOLOGY}.{topology_id}.{TOPOLOGY_STATUS}"
    return topic
def decky(decky_id: str, event_type: str) -> str:
    """Return the topic ``decky.<id>.<event_type>``.

    *event_type* is usually ``DECKY_STATE`` or ``DECKY_TRAFFIC``. Both
    arguments go through :func:`_reject_tokens`, so *event_type* has to
    be a single token — a dotted leaf raises :class:`ValueError` here.
    """
    _reject_tokens(decky_id, event_type)
    # Per-decky scope first, event type appended as the final token.
    scope = f"{DECKY}.{decky_id}"
    return f"{scope}.{event_type}"
def decky_mutation(decky_id: str) -> str:
    """Return the topic ``decky.<id>.mutation``.

    *decky_id* is validated by :func:`_reject_tokens`; the leaf is always
    the ``DECKY_MUTATION`` constant.
    """
    _reject_tokens(decky_id)
    topic = f"{DECKY}.{decky_id}.{DECKY_MUTATION}"
    return topic
def system(event_type: str) -> str:
    """Return the topic ``system.<event_type>``.

    The only check performed is that *event_type* is non-empty; it is
    otherwise prefixed verbatim. Dots are therefore allowed, which is
    what lets multi-token leaves such as ``bus.health`` through.
    Raises :class:`ValueError` when *event_type* is empty.
    """
    if event_type:
        return f"{SYSTEM}.{event_type}"
    raise ValueError("system topic requires a non-empty event_type")
def credential(event_type: str) -> str:
    """Return the topic ``credential.<event_type>``.

    *event_type* is usually :data:`CREDENTIAL_CAPTURED` or
    :data:`CREDENTIAL_REUSE_DETECTED`. Only emptiness is checked, so a
    dotted leaf like ``reuse.detected`` is passed through unchanged.
    Raises :class:`ValueError` when *event_type* is empty.
    """
    if event_type:
        return f"{CREDENTIAL}.{event_type}"
    raise ValueError("credential topic requires a non-empty event_type")
def attacker(event_type: str) -> str:
    """Return the topic ``attacker.<event_type>``.

    *event_type* is usually one of the ``ATTACKER_*`` leaf constants
    (``ATTACKER_OBSERVED``, ``ATTACKER_SCORED``,
    ``ATTACKER_SESSION_STARTED``, ``ATTACKER_SESSION_ENDED``). Only
    emptiness is checked, so dotted leaves such as ``session.started``
    are passed through unchanged. Raises :class:`ValueError` when
    *event_type* is empty.
    """
    if event_type:
        return f"{ATTACKER}.{event_type}"
    raise ValueError("attacker topic requires a non-empty event_type")
def system_health(worker: str) -> str:
    """Return the topic ``system.<worker>.health``.

    The worker name sits in the middle token, so a subscriber can use
    ``system.*.health`` to follow every worker or, for example,
    ``system.mutator.health`` to follow just one. *worker* is validated
    by :func:`_reject_tokens`: it must be non-empty and free of dots,
    wildcards, and whitespace, else :class:`ValueError` is raised.
    """
    _reject_tokens(worker)
    topic = f"{SYSTEM}.{worker}.{SYSTEM_HEALTH}"
    return topic
def system_control(worker: str) -> str:
    """Return the topic ``system.<worker>.control``.

    This is the address on which admin-originated stop (and, eventually,
    start) intents for one worker are published; that worker subscribes
    to its own address and reacts.

    Payload shape::

        {"action": "stop", "requested_by": "<username>", "ts": <unix>}

    *action* must be :data:`WORKER_CONTROL_STOP` or
    :data:`WORKER_CONTROL_START`; the listener ignores any other value.
    *worker* follows the same segment rules as :func:`system_health`
    (validated by :func:`_reject_tokens`).
    """
    _reject_tokens(worker)
    topic = f"{SYSTEM}.{worker}.{SYSTEM_CONTROL}"
    return topic
def _reject_tokens(*parts: str) -> None:
"""Reject topic segments that would break NATS-style tokenization.
Dots, wildcards, whitespace, and empty strings in a *segment* would
silently corrupt the hierarchy (e.g. ``topology.a.b.status`` for a
``topology_id`` of ``"a.b"``). Raise early at the builder instead of
shipping a malformed topic to the wire.
"""
for p in parts:
if not p:
raise ValueError("topic segment must not be empty")
if "." in p or "*" in p or ">" in p or any(c.isspace() for c in p):
raise ValueError(
f"topic segment {p!r} may not contain '.', '*', '>', or whitespace"
)