feat(creds): future-proof Credential storage model

Replaces the opaque Bounty.bounty_type='credential' path with a
dedicated `credentials` table whose schema is forward-compatible
across every auth-bearing service in the fleet. Hoisted indexed
columns (secret_sha256, principal, service, attacker_ip) carry the
universal reuse-analytics signal; service-specific JSON keys ride
in `fields`. Cross-service reuse queries become an indexed lookup
on secret_sha256 instead of JSON_EXTRACT scans.

Schema decisions baked in (per ANTI):
- New `Credential` table, not extension to Bounty
- Hoisted `principal` column for cross-service principal-reuse
- Standardized JSON keys: every payload carries secret_b64 +
  secret_printable + principal universally; service-specific extras
  (user, domain, dn, mech, …) ride alongside

The auth-helper SD-block emits the new shape natively. The ingester
forks at _extract_bounty:
- Native shape (SSH/Telnet, future emitters): secret_b64 present →
  direct upsert_credential
- Legacy shape (FTP/POP3/IMAP/SMTP today): username + password →
  adapter synthesizes secret_{b64,sha256,printable} on the fly,
  upserts into the same Credential table. Tracked as DEBT-039;
  one-shot bridge until those service templates migrate.

Defense-in-depth across five layers (input validation):
- C helper: bytes outside [0x20, 0x7f) collapse to '?', RFC 5424
  escape rules for \\, ", ]; b64 preserves exact bytes
- Ingester native branch: rejects malformed secret_b64 (regex), drops
  the credential row but keeps the underlying Log
- Ingester legacy adapter: same printable-ASCII filter as the C
  code; sha256 + b64 over the original utf-8 bytes (lossless, even
  when secret_printable is sanitized)
- DB column caps with truncation warning; sha256 always over the
  full pre-truncation bytes so reuse queries match across truncation
- JSON serialized with ensure_ascii=True so utf8mb4 columns stay
  safe even with non-ASCII service-specific keys

Bounty.bounty_type='credential' is no longer written. Pre-v1: no
historical backfill; existing rows stay untouched but unused.

595 tests pass; new tests cover the model + repo (upsert dedup,
null-principal independence, cross-service reuse, filters), both
ingester branches, b64 validation, sanitization preserving the
fingerprinting signal in b64.
This commit is contained in:
2026-04-25 05:29:26 -04:00
parent 50c12d9e16
commit 2f47f67eef
12 changed files with 760 additions and 63 deletions

View File

@@ -48,6 +48,8 @@ from .health import (
from .logs import (
Bounty,
BountyResponse,
Credential,
CredentialsResponse,
Log,
LogsResponse,
State,
@@ -167,6 +169,8 @@ __all__ = [
# logs
"Bounty",
"BountyResponse",
"Credential",
"CredentialsResponse",
"Log",
"LogsResponse",
"State",

View File

@@ -1,9 +1,9 @@
"""Log / Bounty / State tables + their list-response DTOs."""
"""Log / Bounty / Credential / State tables + their list-response DTOs."""
from datetime import datetime, timezone
from typing import Any, List, Optional
from pydantic import BaseModel
from sqlalchemy import Column, Text
from sqlalchemy import Column, Index, Text
from sqlmodel import Field, SQLModel
from ._base import _BIG_TEXT
@@ -40,6 +40,58 @@ class Bounty(SQLModel, table=True):
payload: str = Field(sa_column=Column("payload", Text, nullable=False))
class Credential(SQLModel, table=True):
"""One observed credential attempt against a decky service.
Forward-compatible across every auth-bearing service in the fleet:
SSH user+pass, Telnet user+pass, SMTP domain+pass, LDAP dn+pass,
Redis password-only, etc. The two universal lossless representations
(``secret_b64`` + ``secret_sha256``) hoist to indexed columns so
cross-service reuse queries don't scan opaque JSON.
Per-service identity (the human-meaningful "who's authenticating")
lives in ``principal`` — username for SSH, domain for SMTP, dn for
LDAP. Nullable for principal-less mechanisms (Redis AUTH, bearer
tokens). Fully service-specific keys ride in ``fields`` JSON.
Dedup contract: same (attacker_uuid, decky, service, secret_sha256,
principal_or_empty) tuple → upsert, bumps ``attempt_count`` and
``last_seen``. Different secret or different principal → new row.
"""
__tablename__ = "credentials"
__table_args__ = (
Index("ix_credentials_secret_service", "secret_sha256", "service"),
Index("ix_credentials_principal_service", "principal", "service"),
)
id: Optional[int] = Field(default=None, primary_key=True)
# Keyed by attacker IP (not attackers.uuid) to match Bounty's pattern
# and avoid the chicken-and-egg of writing a credential row before
# the profiler has minted the Attacker. Index covers the join path
# cred_reuse → Attacker.ip.
attacker_ip: str = Field(index=True)
decky_name: str = Field(index=True)
service: str = Field(index=True)
principal: Optional[str] = Field(default=None, index=True, max_length=256)
# Universal lossless secret representations.
secret_sha256: str = Field(index=True, max_length=64)
secret_b64: Optional[str] = Field(default=None, max_length=2048)
# Best-effort printable form — non-printable bytes collapsed to '?'
# by either auth-helper.c (SSH/Telnet) or the ingester's legacy
# adapter (FTP/POP3/IMAP/SMTP). May be lossy on non-UTF8.
secret_printable: Optional[str] = Field(default=None, max_length=512)
outcome: Optional[str] = Field(default=None, max_length=16) # success|failure|observed
fields: str = Field(
sa_column=Column("fields", _BIG_TEXT, nullable=False, default="{}")
)
first_seen: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
last_seen: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
attempt_count: int = Field(default=1)
class State(SQLModel, table=True):
__tablename__ = "state"
key: str = Field(primary_key=True)
@@ -62,6 +114,13 @@ class BountyResponse(BaseModel):
data: List[dict[str, Any]]
class CredentialsResponse(BaseModel):
total: int
limit: int
offset: int
data: List[dict[str, Any]]
class StatsResponse(BaseModel):
total_logs: int
unique_attackers: int