from datetime import datetime, timezone
from typing import Literal, Optional, Any, List, Annotated
from sqlalchemy import Column, Text
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlmodel import SQLModel, Field
from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator
from decnet.models import IniContent, DecnetConfig

# Use on columns that accumulate over an attacker's lifetime (commands,
# fingerprints, state blobs). TEXT on MySQL caps at 64 KiB; MEDIUMTEXT
# stretches to 16 MiB. SQLite has no fixed-width text types so Text()
# stays unchanged there.
_BIG_TEXT = Text().with_variant(MEDIUMTEXT(), "mysql")


def _normalize_null(v: Any) -> Any:
    """Map textual "no value" markers to ``None``.

    Strings equal (case-insensitively) to ``"null"``, ``"undefined"`` or the
    empty string become ``None``. Every other value, including non-string
    values and strings with surrounding whitespace, is returned unchanged.
    """
    if isinstance(v, str) and v.lower() in ("null", "undefined", ""):
        return None
    return v


# Annotated aliases that run _normalize_null before Pydantic's own validation,
# so "null" / "undefined" / "" are treated as an absent value.
NullableDatetime = Annotated[Optional[datetime], BeforeValidator(_normalize_null)]
NullableString = Annotated[Optional[str], BeforeValidator(_normalize_null)]


# --- Database Tables (SQLModel) ---

# Account row; `role` defaults to "viewer" and `username` is unique + indexed.
class User(SQLModel, table=True):
    __tablename__ = "users"
    uuid: str = Field(primary_key=True)
    username: str = Field(index=True, unique=True)
    password_hash: str
    role: str = Field(default="viewer")
    must_change_password: bool = Field(default=False)


# One stored log event. `timestamp` defaults to the current UTC time.
class Log(SQLModel, table=True):
    __tablename__ = "logs"
    id: Optional[int] = Field(default=None, primary_key=True)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True)
    decky: str = Field(index=True)
    service: str = Field(index=True)
    event_type: str = Field(index=True)
    attacker_ip: str = Field(index=True)
    # Long-text columns — use TEXT so MySQL DDL doesn't truncate to VARCHAR(255).
    # TEXT is equivalent to plain text in SQLite.
    raw_line: str = Field(sa_column=Column("raw_line", Text, nullable=False))
    fields: str = Field(sa_column=Column("fields", Text, nullable=False))
    msg: Optional[str] = Field(default=None, sa_column=Column("msg", Text, nullable=True))
    # OTEL trace context — bridges the collector→ingester trace to the SSE
    # read path. Nullable so pre-existing rows and non-traced deployments
    # are unaffected.
    trace_id: Optional[str] = Field(default=None)
    span_id: Optional[str] = Field(default=None)


# One bounty record; `payload` is stored in a TEXT column.
class Bounty(SQLModel, table=True):
    __tablename__ = "bounty"
    id: Optional[int] = Field(default=None, primary_key=True)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True)
    decky: str = Field(index=True)
    service: str = Field(index=True)
    attacker_ip: str = Field(index=True)
    bounty_type: str = Field(index=True)
    payload: str = Field(sa_column=Column("payload", Text, nullable=False))


# Key/value store for serialized state blobs.
class State(SQLModel, table=True):
    __tablename__ = "state"
    key: str = Field(primary_key=True)
    # JSON-serialized DecnetConfig or other state blobs — can be large as
    # deckies/services accumulate. MEDIUMTEXT on MySQL (16 MiB ceiling).
    value: str = Field(sa_column=Column("value", _BIG_TEXT, nullable=False))


# Aggregated per-attacker row: counters plus JSON-encoded list columns.
class Attacker(SQLModel, table=True):
    __tablename__ = "attackers"
    uuid: str = Field(primary_key=True)
    ip: str = Field(index=True)
    first_seen: datetime = Field(index=True)
    last_seen: datetime = Field(index=True)
    event_count: int = Field(default=0)
    service_count: int = Field(default=0)
    decky_count: int = Field(default=0)
    # JSON blobs — these grow over the attacker's lifetime. Use MEDIUMTEXT on
    # MySQL (16 MiB) for the fields that accumulate (fingerprints, commands,
    # and the deckies/services lists that are unbounded in principle).
    # Each JSON column carries both a Python-side and a column-level default.
    services: str = Field(
        default="[]", sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[str]
    deckies: str = Field(
        default="[]", sa_column=Column("deckies", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[str], first-contact ordered
    traversal_path: Optional[str] = Field(
        default=None, sa_column=Column("traversal_path", Text, nullable=True)
    )  # "decky-01 → decky-03 → decky-05"
    is_traversal: bool = Field(default=False)
    bounty_count: int = Field(default=0)
    credential_count: int = Field(default=0)
    fingerprints: str = Field(
        default="[]", sa_column=Column("fingerprints", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[dict] — bounty fingerprints
    commands: str = Field(
        default="[]", sa_column=Column("commands", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[dict] — commands per service/decky
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )


class SwarmHost(SQLModel, table=True):
    """A worker host enrolled into a DECNET swarm.

    Rows exist only on the master. Populated by `decnet swarm enroll` and
    read by the swarm controller when sharding deckies onto workers.
    """
    __tablename__ = "swarm_hosts"
    uuid: str = Field(primary_key=True)
    name: str = Field(index=True, unique=True)
    address: str  # IP or hostname reachable by the master
    agent_port: int = Field(default=8765)
    status: str = Field(default="enrolled", index=True)
    # Time of the last successful agent /health probe. Typed as a datetime
    # (not a string); defaults to None.
    last_heartbeat: Optional[datetime] = Field(default=None)
    client_cert_fingerprint: str  # SHA-256 hex of worker's issued client cert
    # SHA-256 hex of the updater-identity cert, if the host was enrolled
    # with ``--updater`` / ``issue_updater_bundle``. ``None`` for hosts
    # that only have an agent identity.
    updater_cert_fingerprint: Optional[str] = Field(default=None)
    # Directory on the master where the per-worker cert bundle lives
    cert_bundle_path: str
    enrolled_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    notes: Optional[str] = Field(default=None, sa_column=Column("notes", Text, nullable=True))


class DeckyShard(SQLModel, table=True):
    """Mapping of a single decky to the worker host running it (swarm mode)."""
    __tablename__ = "decky_shards"
    decky_name: str = Field(primary_key=True)
    host_uuid: str = Field(foreign_key="swarm_hosts.uuid", index=True)
    # JSON list of service names running on this decky (snapshot of assignment).
    # NOTE(review): only the column-level default="[]" is set here; unlike the
    # JSON columns on Attacker there is no Python-side Field default — confirm
    # this is intentional.
    services: str = Field(sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]"))
    state: str = Field(default="pending", index=True)  # pending|running|failed|torn_down
    last_error: Optional[str] = Field(default=None, sa_column=Column("last_error", Text, nullable=True))
    compose_hash: Optional[str] = Field(default=None)
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class AttackerBehavior(SQLModel, table=True):
    """
    Timing & behavioral profile for an attacker, joined to Attacker by uuid.

    Kept in a separate table so the core Attacker row stays narrow and
    behavior data can be updated independently (e.g. as the sniffer observes
    more packets) without touching the event-count aggregates.
    """
    __tablename__ = "attacker_behavior"
    # Primary key doubles as the foreign key to attackers.uuid.
    attacker_uuid: str = Field(primary_key=True, foreign_key="attackers.uuid")
    # OS / TCP stack fingerprint (rolled up from sniffer events)
    os_guess: Optional[str] = None
    hop_distance: Optional[int] = None
    tcp_fingerprint: str = Field(
        default="{}",
        sa_column=Column("tcp_fingerprint", Text, nullable=False, default="{}"),
    )  # JSON: window, wscale, mss, options_sig
    retransmit_count: int = Field(default=0)
    # Behavioral (derived by the profiler from log-event timing)
    behavior_class: Optional[str] = None  # beaconing | interactive | scanning | brute_force | slow_scan | mixed | unknown
    beacon_interval_s: Optional[float] = None
    beacon_jitter_pct: Optional[float] = None
    tool_guesses: Optional[str] = None  # JSON list[str] — all matched tools
    timing_stats: str = Field(
        default="{}",
        sa_column=Column("timing_stats", Text, nullable=False, default="{}"),
    )  # JSON: mean/median/stdev/min/max IAT
    phase_sequence: str = Field(
        default="{}",
        sa_column=Column("phase_sequence", Text, nullable=False, default="{}"),
    )  # JSON: recon_end/exfil_start/latency
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )


# --- API Request/Response Models (Pydantic) ---

# Token payload; must_change_password defaults to False.
class Token(BaseModel):
    access_token: str
    token_type: str
    must_change_password: bool = False


# NOTE(review): the 72-character password cap used by the request models
# presumably mirrors the hashing backend's input limit — confirm against the
# auth code.
class LoginRequest(BaseModel):
    username: str
    password: str = PydanticField(..., max_length=72)


class ChangePasswordRequest(BaseModel):
    old_password: str = PydanticField(..., max_length=72)
    new_password: str = PydanticField(..., max_length=72)


# The three *Response envelopes below share one shape: total/limit/offset
# counters plus a list of row dicts (presumably one page of results).
class LogsResponse(BaseModel):
    total: int
    limit: int
    offset: int
    data: List[dict[str, Any]]


class BountyResponse(BaseModel):
    total: int
    limit: int
    offset: int
    data: List[dict[str, Any]]


class AttackersResponse(BaseModel):
    total: int
    limit: int
    offset: int
    data: List[dict[str, Any]]


class StatsResponse(BaseModel):
    total_logs: int
    unique_attackers: int
    active_deckies: int
    deployed_deckies: int


class MutateIntervalRequest(BaseModel):
    # Human-readable duration: <number><unit>, where <number> is a positive
    # integer without leading zeros and <unit> is m(inutes), d(ays),
    # M(onths), y/Y(ears).
    # Minimum granularity is 1 minute. Seconds are not accepted.
    mutate_interval: Optional[str] = PydanticField(None, pattern=r"^[1-9]\d*[mdMyY]$")


class DeployIniRequest(BaseModel):
    # Unknown request keys are rejected.
    model_config = ConfigDict(extra="forbid")
    # This field now enforces strict INI structure during Pydantic initialization.
    # The OpenAPI schema correctly shows it as a required string.
    ini_content: IniContent = PydanticField(..., description="A valid INI formatted string")


# --- Configuration Models ---

class CreateUserRequest(BaseModel):
    username: str = PydanticField(..., min_length=1, max_length=64)
    password: str = PydanticField(..., min_length=8, max_length=72)
    role: Literal["admin", "viewer"] = "viewer"


class UpdateUserRoleRequest(BaseModel):
    role: Literal["admin", "viewer"]


class ResetUserPasswordRequest(BaseModel):
    new_password: str = PydanticField(..., min_length=8, max_length=72)


class DeploymentLimitRequest(BaseModel):
    # Accepted range is 1..500 inclusive.
    deployment_limit: int = PydanticField(..., ge=1, le=500)


class GlobalMutationIntervalRequest(BaseModel):
    # Same <number><unit> duration format as MutateIntervalRequest, but required.
    global_mutation_interval: str = PydanticField(..., pattern=r"^[1-9]\d*[mdMyY]$")


# User fields exposed by the API; password_hash is deliberately not part of it.
class UserResponse(BaseModel):
    uuid: str
    username: str
    role: str
    must_change_password: bool


class ConfigResponse(BaseModel):
    role: str
    deployment_limit: int
    global_mutation_interval: str


# ConfigResponse plus the list of users.
class AdminConfigResponse(ConfigResponse):
    users: List[UserResponse]


class ComponentHealth(BaseModel):
    status: Literal["ok", "failing"]
    detail: Optional[str] = None


# Overall status plus a per-component breakdown keyed by component name.
class HealthResponse(BaseModel):
    status: Literal["healthy", "degraded", "unhealthy"]
    components: dict[str, ComponentHealth]


# --- Swarm API DTOs ---
# Request/response contracts for the master-side swarm controller
# (decnet/web/swarm_api.py). The underlying SQLModel tables — SwarmHost and
# DeckyShard — live above; these are the HTTP-facing shapes.
# Enrollment request for a new worker host.
class SwarmEnrollRequest(BaseModel):
    name: str = PydanticField(
        ...,
        min_length=1,
        max_length=128,
    )
    address: str = PydanticField(
        ...,
        description="IP or DNS the master uses to reach the worker",
    )
    # Valid TCP port range; defaults to 8765.
    agent_port: int = PydanticField(
        default=8765,
        ge=1,
        le=65535,
    )
    sans: list[str] = PydanticField(
        default_factory=list,
        description="Extra SANs (IPs / hostnames) to embed in the worker cert",
    )
    notes: Optional[str] = None
    issue_updater_bundle: bool = PydanticField(
        default=False,
        description="If true, also issue an updater cert (CN=updater@) for the remote self-updater",
    )


class SwarmUpdaterBundle(BaseModel):
    """Subset of SwarmEnrolledBundle for the updater identity."""

    fingerprint: str
    updater_cert_pem: str
    updater_key_pem: str


class SwarmEnrolledBundle(BaseModel):
    """Cert bundle returned to the operator — must be delivered to the worker."""

    host_uuid: str
    name: str
    address: str
    agent_port: int
    fingerprint: str
    ca_cert_pem: str
    worker_cert_pem: str
    worker_key_pem: str
    # Left as None when no updater bundle is attached.
    updater: Optional[SwarmUpdaterBundle] = None


# HTTP-facing view of a swarm host row.
class SwarmHostView(BaseModel):
    uuid: str
    name: str
    address: str
    agent_port: int
    status: str
    last_heartbeat: Optional[datetime] = None
    client_cert_fingerprint: str
    updater_cert_fingerprint: Optional[str] = None
    enrolled_at: datetime
    notes: Optional[str] = None


class DeckyShardView(BaseModel):
    """One decky → host mapping, enriched with the host's identity for display."""

    decky_name: str
    host_uuid: str
    host_name: str
    host_address: str
    host_status: str
    # A real list here, not a JSON-encoded string.
    services: list[str]
    state: str
    last_error: Optional[str] = None
    compose_hash: Optional[str] = None
    updated_at: datetime


# Deploy request: a full DecnetConfig plus two opt-in flags (both off by default).
class SwarmDeployRequest(BaseModel):
    config: DecnetConfig
    dry_run: bool = False
    no_cache: bool = False


# Teardown request; both selectors are optional.
class SwarmTeardownRequest(BaseModel):
    host_uuid: Optional[str] = PydanticField(
        default=None,
        description="If set, tear down only this worker; otherwise tear down all hosts",
    )
    decky_id: Optional[str] = None


# Per-host outcome entry; `detail` is free-form and may be omitted.
class SwarmHostResult(BaseModel):
    host_uuid: str
    host_name: str
    ok: bool
    detail: Optional[Any] = None
# Deploy response: one SwarmHostResult per entry.
class SwarmDeployResponse(BaseModel):
    results: list[SwarmHostResult]


# Per-host health entry; `detail` is free-form and may be omitted.
class SwarmHostHealth(BaseModel):
    host_uuid: str
    name: str
    address: str
    reachable: bool
    detail: Optional[Any] = None


# Check response: one SwarmHostHealth per entry.
class SwarmCheckResponse(BaseModel):
    results: list[SwarmHostHealth]