Adds /api/v1/swarm-updates/{hosts,push,push-self,rollback} behind
require_admin. Reuses the existing UpdaterClient + tar_working_tree + the
per-host asyncio.gather pattern from api_deploy_swarm.py; tarball is
built exactly once per /push request and fanned out to every selected
worker. /hosts filters out decommissioned hosts and agent-only
enrollments (a host without an updater bundle is not an update target).
Connection drops during /update-self are treated as success — the
updater re-execs itself mid-response, so httpx always raises.
Pydantic models live in decnet/web/db/models.py (single source of
truth). 24 tests cover happy paths, rollback, transport failures,
include_self ordering (skip on rolled-back agents), validation, and
RBAC gating.
445 lines
16 KiB
Python
from datetime import datetime, timezone
|
|
from typing import Literal, Optional, Any, List, Annotated
|
|
from sqlalchemy import Column, Text
|
|
from sqlalchemy.dialects.mysql import MEDIUMTEXT
|
|
from sqlmodel import SQLModel, Field
|
|
from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator
|
|
from decnet.models import IniContent, DecnetConfig
|
|
|
|
# Use on columns that accumulate over an attacker's lifetime (commands,
# fingerprints, state blobs). TEXT on MySQL caps at 64 KiB; MEDIUMTEXT
# stretches to 16 MiB. SQLite has no fixed-width text types so Text()
# stays unchanged there.
# with_variant() substitutes MEDIUMTEXT only for the "mysql" dialect; every
# other backend keeps the plain Text() type.
_BIG_TEXT = Text().with_variant(MEDIUMTEXT(), "mysql")
|
|
|
|
def _normalize_null(v: Any) -> Any:
|
|
if isinstance(v, str) and v.lower() in ("null", "undefined", ""):
|
|
return None
|
|
return v
|
|
|
|
# Optional field types that run _normalize_null *before* Pydantic's own
# validation, so the placeholder strings "null", "undefined" and "" (any
# case) become None rather than reaching the datetime / str validators.
NullableDatetime = Annotated[Optional[datetime], BeforeValidator(_normalize_null)]
NullableString = Annotated[Optional[str], BeforeValidator(_normalize_null)]
|
|
|
|
# --- Database Tables (SQLModel) ---
|
|
|
|
class User(SQLModel, table=True):
    """An account row in the ``users`` table."""

    __tablename__ = "users"

    uuid: str = Field(primary_key=True)
    # Unique + indexed: at most one row per username.
    username: str = Field(index=True, unique=True)
    # Only a hash column exists; this model stores no plaintext password.
    password_hash: str
    # Plain string at the DB level; the request models in this module
    # (CreateUserRequest / UpdateUserRoleRequest) restrict it to
    # "admin" | "viewer".
    role: str = Field(default="viewer")
    # Presumably set to require a password change on next login — the same
    # flag is surfaced on Token and UserResponse.
    must_change_password: bool = Field(default=False)
|
|
|
|
class Log(SQLModel, table=True):
    """One log event row in the ``logs`` table."""

    __tablename__ = "logs"

    # None until assigned on INSERT (integer primary key).
    id: Optional[int] = Field(default=None, primary_key=True)
    # Defaults to the current UTC time when the row object is created.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True)
    decky: str = Field(index=True)
    service: str = Field(index=True)
    event_type: str = Field(index=True)
    attacker_ip: str = Field(index=True)
    # Long-text columns — use TEXT so MySQL DDL doesn't truncate to VARCHAR(255).
    # TEXT is equivalent to plain text in SQLite.
    # NOTE: these use plain Text (64 KiB cap on MySQL), not _BIG_TEXT.
    raw_line: str = Field(sa_column=Column("raw_line", Text, nullable=False))
    # Presumably a JSON-encoded dict of parsed event fields — confirm with the ingester.
    fields: str = Field(sa_column=Column("fields", Text, nullable=False))
    msg: Optional[str] = Field(default=None, sa_column=Column("msg", Text, nullable=True))
    # OTEL trace context — bridges the collector→ingester trace to the SSE
    # read path. Nullable so pre-existing rows and non-traced deployments
    # are unaffected.
    trace_id: Optional[str] = Field(default=None)
    span_id: Optional[str] = Field(default=None)
|
|
|
|
class Bounty(SQLModel, table=True):
    """One row of the ``bounty`` table, tagged with decky, service and attacker IP."""

    __tablename__ = "bounty"

    # None until assigned on INSERT (integer primary key).
    id: Optional[int] = Field(default=None, primary_key=True)
    # Defaults to the current UTC time when the row object is created.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True)
    decky: str = Field(index=True)
    service: str = Field(index=True)
    attacker_ip: str = Field(index=True)
    bounty_type: str = Field(index=True)
    # TEXT column (64 KiB cap on MySQL) so payloads are not limited to VARCHAR(255).
    payload: str = Field(sa_column=Column("payload", Text, nullable=False))
|
|
|
|
|
|
class State(SQLModel, table=True):
    """Key/value rows holding serialized state blobs (``state`` table)."""

    __tablename__ = "state"

    key: str = Field(primary_key=True)
    # JSON-serialized DecnetConfig or other state blobs — can be large as
    # deckies/services accumulate. MEDIUMTEXT on MySQL (16 MiB ceiling).
    value: str = Field(sa_column=Column("value", _BIG_TEXT, nullable=False))
|
|
|
|
|
|
class Attacker(SQLModel, table=True):
    """Aggregated per-attacker profile (``attackers`` table).

    Counters and JSON list columns summarise everything seen for one source
    IP; timing/behavioral data lives separately in ``attacker_behavior``.
    """

    __tablename__ = "attackers"

    uuid: str = Field(primary_key=True)
    # Indexed but not unique at the DB level.
    ip: str = Field(index=True)
    first_seen: datetime = Field(index=True)
    last_seen: datetime = Field(index=True)
    event_count: int = Field(default=0)
    service_count: int = Field(default=0)
    decky_count: int = Field(default=0)
    # JSON blobs — these grow over the attacker's lifetime. Use MEDIUMTEXT on
    # MySQL (16 MiB) for the fields that accumulate (fingerprints, commands,
    # and the deckies/services lists that are unbounded in principle).
    # Each sets default="[]" on both the model field and the column so an
    # unsaved instance and a fresh DB row agree.
    services: str = Field(
        default="[]", sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[str]
    deckies: str = Field(
        default="[]", sa_column=Column("deckies", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[str], first-contact ordered
    traversal_path: Optional[str] = Field(
        default=None, sa_column=Column("traversal_path", Text, nullable=True)
    )  # "decky-01 → decky-03 → decky-05"
    is_traversal: bool = Field(default=False)
    bounty_count: int = Field(default=0)
    credential_count: int = Field(default=0)
    fingerprints: str = Field(
        default="[]", sa_column=Column("fingerprints", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[dict] — bounty fingerprints
    commands: str = Field(
        default="[]", sa_column=Column("commands", _BIG_TEXT, nullable=False, default="[]")
    )  # JSON list[dict] — commands per service/decky
    # Set to the current UTC time when the instance is created; nothing in
    # this model refreshes it on later updates.
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )
|
|
|
|
|
|
class SwarmHost(SQLModel, table=True):
    """A worker host enrolled into a DECNET swarm.

    Rows exist only on the master. Populated by `decnet swarm enroll` and
    read by the swarm controller when sharding deckies onto workers.
    """

    __tablename__ = "swarm_hosts"

    uuid: str = Field(primary_key=True)
    name: str = Field(index=True, unique=True)
    address: str  # IP or hostname reachable by the master
    agent_port: int = Field(default=8765)
    # Lifecycle status; starts as "enrolled". Other values are assigned
    # outside this model.
    status: str = Field(default="enrolled", index=True)
    # Timestamp (datetime, not a string) of the last successful agent
    # /health probe; None until the first one succeeds.
    last_heartbeat: Optional[datetime] = Field(default=None)
    client_cert_fingerprint: str  # SHA-256 hex of worker's issued client cert
    # SHA-256 hex of the updater-identity cert, if the host was enrolled
    # with ``--updater`` / ``issue_updater_bundle``. ``None`` for hosts
    # that only have an agent identity.
    updater_cert_fingerprint: Optional[str] = Field(default=None)
    # Directory on the master where the per-worker cert bundle lives
    cert_bundle_path: str
    # Defaults to the current UTC time when the row object is created.
    enrolled_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    notes: Optional[str] = Field(default=None, sa_column=Column("notes", Text, nullable=True))
|
|
|
|
|
|
class DeckyShard(SQLModel, table=True):
    """Mapping of a single decky to the worker host running it (swarm mode).

    One row per decky (``decky_name`` is the primary key); ``host_uuid`` is a
    foreign key to ``swarm_hosts.uuid``.
    """

    __tablename__ = "decky_shards"

    decky_name: str = Field(primary_key=True)
    host_uuid: str = Field(foreign_key="swarm_hosts.uuid", index=True)
    # JSON list of service names running on this decky (snapshot of assignment).
    # The model-level default matches the column default, as Attacker's JSON
    # list columns do, so an instance built without ``services`` already holds
    # "[]" instead of only receiving it when SQLAlchemy applies the column
    # default at INSERT time.
    services: str = Field(
        default="[]",
        sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]"),
    )
    state: str = Field(default="pending", index=True)  # pending|running|failed|torn_down
    last_error: Optional[str] = Field(default=None, sa_column=Column("last_error", Text, nullable=True))
    compose_hash: Optional[str] = Field(default=None)
    # Set to the current UTC time when the instance is created; this model
    # does not refresh it on later updates.
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
|
|
|
|
|
class AttackerBehavior(SQLModel, table=True):
    """
    Timing & behavioral profile for an attacker, joined to Attacker by uuid.

    Kept in a separate table so the core Attacker row stays narrow and
    behavior data can be updated independently (e.g. as the sniffer observes
    more packets) without touching the event-count aggregates.
    """

    __tablename__ = "attacker_behavior"

    # Primary key and foreign key at once: at most one behavior row per attacker.
    attacker_uuid: str = Field(primary_key=True, foreign_key="attackers.uuid")
    # OS / TCP stack fingerprint (rolled up from sniffer events)
    os_guess: Optional[str] = None
    hop_distance: Optional[int] = None
    tcp_fingerprint: str = Field(
        default="{}",
        sa_column=Column("tcp_fingerprint", Text, nullable=False, default="{}"),
    )  # JSON: window, wscale, mss, options_sig
    retransmit_count: int = Field(default=0)
    # Behavioral (derived by the profiler from log-event timing)
    behavior_class: Optional[str] = None  # beaconing | interactive | scanning | brute_force | slow_scan | mixed | unknown
    beacon_interval_s: Optional[float] = None
    beacon_jitter_pct: Optional[float] = None
    # NOTE(review): no sa_column here, so (per the note on Log) this maps to
    # VARCHAR(255) on MySQL; a long JSON list of tools could overflow it —
    # confirm whether it should use Text/_BIG_TEXT.
    tool_guesses: Optional[str] = None  # JSON list[str] — all matched tools
    timing_stats: str = Field(
        default="{}",
        sa_column=Column("timing_stats", Text, nullable=False, default="{}"),
    )  # JSON: mean/median/stdev/min/max IAT
    phase_sequence: str = Field(
        default="{}",
        sa_column=Column("phase_sequence", Text, nullable=False, default="{}"),
    )  # JSON: recon_end/exfil_start/latency
    # Set to the current UTC time when the instance is created.
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )
|
|
|
|
# --- API Request/Response Models (Pydantic) ---
|
|
|
|
class Token(BaseModel):
    """Access-token response body."""

    access_token: str
    token_type: str
    # Same flag as User.must_change_password; presumably lets the client
    # prompt for a password change right after login.
    must_change_password: bool = False
|
|
|
|
class LoginRequest(BaseModel):
    """Username/password credentials submitted to log in."""

    username: str
    # NOTE(review): max_length counts characters, not bytes. If 72 is meant to
    # match bcrypt's 72-*byte* input limit, multi-byte UTF-8 passwords can
    # still exceed it — confirm against the hashing code.
    password: str = PydanticField(..., max_length=72)
|
|
|
|
class ChangePasswordRequest(BaseModel):
    """Request body carrying the current and the replacement password.

    ``old_password`` is only length-capped: it has to match whatever is
    already stored, which may predate the current minimum. ``new_password``
    gets the same 8..72 character bounds as ``CreateUserRequest.password``
    and ``ResetUserPasswordRequest.new_password``, so this path cannot be
    used to set a password shorter than the one enforced on create/reset.
    """

    old_password: str = PydanticField(..., max_length=72)
    new_password: str = PydanticField(..., min_length=8, max_length=72)
|
|
|
|
class LogsResponse(BaseModel):
    """Paginated log listing.

    ``data`` holds the serialized rows for this response; ``total``,
    ``limit`` and ``offset`` presumably describe the pagination window.
    """

    total: int
    limit: int
    offset: int
    data: list[dict[str, Any]]
|
|
|
|
class BountyResponse(BaseModel):
    """Paginated bounty listing.

    ``data`` holds the serialized rows for this response; ``total``,
    ``limit`` and ``offset`` presumably describe the pagination window.
    """

    total: int
    limit: int
    offset: int
    data: list[dict[str, Any]]
|
|
|
|
class AttackersResponse(BaseModel):
    """Paginated attacker listing.

    ``data`` holds the serialized rows for this response; ``total``,
    ``limit`` and ``offset`` presumably describe the pagination window.
    """

    total: int
    limit: int
    offset: int
    data: list[dict[str, Any]]
|
|
|
|
class StatsResponse(BaseModel):
    """Headline counters for the dashboard overview (all plain integers)."""

    total_logs: int
    unique_attackers: int
    active_deckies: int
    deployed_deckies: int
|
|
|
|
class MutateIntervalRequest(BaseModel):
    """Body carrying an optional mutation interval.

    Format is ``<positive integer><unit>`` where the unit is ``m`` (minutes),
    ``d`` (days), ``M`` (months) or ``y``/``Y`` (years). Seconds are not
    accepted, so one minute is the finest granularity. The field may be
    omitted or null.
    """

    mutate_interval: str | None = PydanticField(
        default=None,
        pattern=r"^[1-9]\d*[mdMyY]$",
    )
|
|
|
|
class DeployIniRequest(BaseModel):
    """Deployment request carrying a raw INI document."""

    # Unknown keys are rejected rather than silently ignored.
    model_config = ConfigDict(extra="forbid")

    # IniContent (from decnet.models) enforces strict INI structure while the
    # model is being constructed; the OpenAPI schema shows it as a required
    # string.
    ini_content: IniContent = PydanticField(..., description="A valid INI formatted string")
|
|
|
|
|
|
# --- Configuration Models ---
|
|
|
|
class CreateUserRequest(BaseModel):
    """Body for creating a user account."""

    username: str = PydanticField(..., min_length=1, max_length=64)
    # 8..72 characters (character count, not bytes).
    password: str = PydanticField(..., min_length=8, max_length=72)
    # Only these two roles are accepted; defaults to the less-privileged one.
    role: Literal["admin", "viewer"] = "viewer"
|
|
|
|
class UpdateUserRoleRequest(BaseModel):
    """Body for changing a user's role; only "admin" or "viewer" are accepted."""

    role: Literal["admin", "viewer"]
|
|
|
|
class ResetUserPasswordRequest(BaseModel):
    """Body for resetting a user's password (same 8..72 bounds as account creation)."""

    new_password: str = PydanticField(..., min_length=8, max_length=72)
|
|
|
|
class DeploymentLimitRequest(BaseModel):
    """Body for setting the deployment limit; must be within 1..500 inclusive."""

    deployment_limit: int = PydanticField(..., ge=1, le=500)
|
|
|
|
class GlobalMutationIntervalRequest(BaseModel):
    """Body for setting the global mutation interval.

    Uses the same ``<positive integer><m|d|M|y|Y>`` format as
    MutateIntervalRequest, but here the value is required.
    """

    global_mutation_interval: str = PydanticField(..., pattern=r"^[1-9]\d*[mdMyY]$")
|
|
|
|
class UserResponse(BaseModel):
    """Public view of a User row (no password hash)."""

    uuid: str
    username: str
    role: str
    must_change_password: bool
|
|
|
|
class ConfigResponse(BaseModel):
    """Configuration values returned to a caller, together with the caller's role."""

    role: str
    deployment_limit: int
    global_mutation_interval: str
|
|
|
|
class AdminConfigResponse(ConfigResponse):
    """ConfigResponse extended with the full user list (admin view)."""

    users: list[UserResponse]
|
|
|
|
|
|
class ComponentHealth(BaseModel):
    """Health of a single component: "ok" or "failing", plus an optional detail string."""

    status: Literal["ok", "failing"]
    detail: str | None = None
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Overall health status plus a per-component breakdown keyed by component name."""

    status: Literal["healthy", "degraded", "unhealthy"]
    components: dict[str, ComponentHealth]
|
|
|
|
|
|
# --- Swarm API DTOs ---
|
|
# Request/response contracts for the master-side swarm controller
|
|
# (decnet/web/swarm_api.py). The underlying SQLModel tables — SwarmHost and
|
|
# DeckyShard — live above; these are the HTTP-facing shapes.
|
|
|
|
class SwarmEnrollRequest(BaseModel):
    """Operator request describing a worker host to enroll.

    ``sans`` and ``issue_updater_bundle`` shape the certificates issued for
    the worker; everything else identifies how the master reaches it.
    """

    name: str = PydanticField(..., min_length=1, max_length=128)
    address: str = PydanticField(..., description="IP or DNS the master uses to reach the worker")
    agent_port: int = PydanticField(default=8765, ge=1, le=65535)
    sans: list[str] = PydanticField(
        default_factory=list,
        description="Extra SANs (IPs / hostnames) to embed in the worker cert",
    )
    notes: str | None = None
    issue_updater_bundle: bool = PydanticField(
        default=False,
        description="If true, also issue an updater cert (CN=updater@<name>) for the remote self-updater",
    )
|
|
|
|
|
|
class SwarmUpdaterBundle(BaseModel):
    """Subset of SwarmEnrolledBundle for the updater identity.

    NOTE(review): ``updater_key_pem`` presumably holds a private key — avoid
    logging this payload.
    """

    fingerprint: str
    updater_cert_pem: str
    updater_key_pem: str
|
|
|
|
|
|
class SwarmEnrolledBundle(BaseModel):
    """Cert bundle returned to the operator — must be delivered to the worker.

    Carries the CA certificate plus the worker's own certificate and key.
    ``updater`` is presumably filled only when an updater identity was
    issued as well. NOTE(review): ``*_key_pem`` fields presumably hold
    private keys — avoid logging or persisting this payload.
    """

    host_uuid: str
    name: str
    address: str
    agent_port: int
    fingerprint: str
    ca_cert_pem: str
    worker_cert_pem: str
    worker_key_pem: str
    updater: SwarmUpdaterBundle | None = None
|
|
|
|
|
|
class SwarmHostView(BaseModel):
    """API-facing projection of a SwarmHost row (omits ``cert_bundle_path``)."""

    uuid: str
    name: str
    address: str
    agent_port: int
    status: str
    last_heartbeat: datetime | None = None
    client_cert_fingerprint: str
    updater_cert_fingerprint: str | None = None
    enrolled_at: datetime
    notes: str | None = None
|
|
|
|
|
|
class DeckyShardView(BaseModel):
    """A decky → host mapping, joined with the owning host's identity for display.

    ``services`` is a real list here (the table stores it as a JSON string).
    """

    decky_name: str
    host_uuid: str
    host_name: str
    host_address: str
    host_status: str
    services: list[str]
    state: str
    last_error: str | None = None
    compose_hash: str | None = None
    updated_at: datetime
|
|
|
|
|
|
class SwarmDeployRequest(BaseModel):
    """Swarm deploy request: a full DecnetConfig plus two boolean switches (both off by default)."""

    config: DecnetConfig
    dry_run: bool = False
    no_cache: bool = False
|
|
|
|
|
|
class SwarmTeardownRequest(BaseModel):
    """Teardown scope: one worker (``host_uuid``) or every host when it is unset.

    ``decky_id`` optionally narrows the teardown further; both fields
    default to None.
    """

    host_uuid: str | None = PydanticField(
        default=None,
        description="If set, tear down only this worker; otherwise tear down all hosts",
    )
    decky_id: str | None = None
|
|
|
|
|
|
class SwarmHostResult(BaseModel):
    """Outcome of a swarm operation on one host."""

    host_uuid: str
    host_name: str
    ok: bool
    # Deliberately untyped (Any): whatever detail the operation produced.
    detail: Any | None = None
|
|
|
|
|
|
class SwarmDeployResponse(BaseModel):
    """Per-host results of a swarm deploy."""

    results: list[SwarmHostResult]
|
|
|
|
|
|
class SwarmHostHealth(BaseModel):
    """Reachability check result for one worker host."""

    host_uuid: str
    name: str
    address: str
    reachable: bool
    # Deliberately untyped (Any): whatever detail the check produced.
    detail: Any | None = None
|
|
|
|
|
|
class SwarmCheckResponse(BaseModel):
    """Per-host results of a swarm health check."""

    results: list[SwarmHostHealth]
|
|
|
|
|
|
# --- Remote Updates (master → worker /updater) DTOs ---
|
|
# Powers the dashboard's Remote Updates page. The master dashboard calls
|
|
# these (auth-gated) endpoints; internally they fan out to each worker's
|
|
# updater daemon over mTLS via UpdaterClient.
|
|
|
|
class HostReleaseInfo(BaseModel):
    """Release state reported by one worker's updater, or why it couldn't be read."""

    host_uuid: str
    host_name: str
    address: str
    reachable: bool
    # Everything below mirrors the updater's /health payload when the host is
    # reachable. All of it is optional so an unreachable host still
    # serializes cleanly.
    agent_status: str | None = None
    current_sha: str | None = None
    previous_sha: str | None = None
    releases: list[dict[str, Any]] = PydanticField(default_factory=list)
    detail: str | None = None  # populated when unreachable
|
|
|
|
|
|
class HostReleasesResponse(BaseModel):
    """Release info for each listed host."""

    hosts: list[HostReleaseInfo]
|
|
|
|
|
|
class PushUpdateRequest(BaseModel):
    """Request body selecting which worker hosts receive an update push.

    Targets come from ``host_uuids`` or ``all``. NOTE(review): the "mutually
    exclusive" rule stated in the field description is not enforced by this
    model (it has no validator); presumably the route handler rejects
    requests that set both or neither — confirm.
    """

    host_uuids: Optional[list[str]] = PydanticField(
        default=None,
        description="Target specific hosts; mutually exclusive with 'all'.",
    )
    # Shadows the builtin ``all`` only inside this class body; the name is part
    # of the JSON contract, so it is kept as-is.
    all: bool = PydanticField(default=False, description="Target every non-decommissioned host with an updater bundle.")
    include_self: bool = PydanticField(
        default=False,
        description="After a successful /update, also push /update-self to upgrade the updater itself.",
    )
    exclude: list[str] = PydanticField(
        default_factory=list,
        description="Additional tarball exclude globs (on top of the built-in defaults).",
    )
|
|
|
|
|
|
class PushUpdateResult(BaseModel):
    """Outcome of pushing an update to one host."""

    host_uuid: str
    host_name: str
    # updated = /update 200. rolled-back = /update 409 (auto-recovered).
    # failed = transport error or non-200/409 response. self-updated = /update-self succeeded.
    # self-failed = presumably the /update-self step failed — confirm in the route.
    status: Literal["updated", "rolled-back", "failed", "self-updated", "self-failed"]
    http_status: Optional[int] = None
    sha: Optional[str] = None
    detail: Optional[str] = None
    stderr: Optional[str] = None
|
|
|
|
|
|
class PushUpdateResponse(BaseModel):
    """Result of one push: the pushed ``sha``, the tarball size in bytes, and per-host outcomes."""

    sha: str
    tarball_bytes: int
    results: list[PushUpdateResult]
|
|
|
|
|
|
class RollbackRequest(BaseModel):
    """Request to roll a single host back (required ``host_uuid``)."""

    host_uuid: str = PydanticField(..., description="Host to roll back to its previous release slot.")
|
|
|
|
|
|
class RollbackResponse(BaseModel):
    """Outcome of a rollback on one host: "rolled-back" or "failed"."""

    host_uuid: str
    host_name: str
    status: Literal["rolled-back", "failed"]
    http_status: int | None = None
    detail: str | None = None
|