Files
DECNET/decnet/web/db/models.py

96 lines
3.2 KiB
Python

from datetime import datetime, timezone
from typing import Optional, Any, List, Annotated
from sqlmodel import SQLModel, Field
from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator
from decnet.models import IniContent
def _normalize_null(v: Any) -> Any:
if isinstance(v, str) and v.lower() in ("null", "undefined", ""):
return None
return v
# Reusable annotated types: the raw input is passed through `_normalize_null`
# before Pydantic validates it, so the placeholder strings that function
# recognises arrive at the field as ``None``.
_null_to_none = BeforeValidator(_normalize_null)

NullableDatetime = Annotated[Optional[datetime], _null_to_none]
NullableString = Annotated[Optional[str], _null_to_none]
# --- Database Tables (SQLModel) ---
class User(SQLModel, table=True):
    """Row of the ``users`` table."""

    __tablename__ = "users"

    # Primary key, stored as a string.
    uuid: str = Field(primary_key=True)
    # Indexed and constrained to be unique.
    username: str = Field(unique=True, index=True)
    password_hash: str
    # Defaults to "viewer" when not supplied.
    role: str = Field(default="viewer")
    # Defaults to False when not supplied.
    must_change_password: bool = Field(default=False)
class Log(SQLModel, table=True):
    """Row of the ``logs`` table."""

    __tablename__ = "logs"

    # Integer primary key; None until a value is assigned.
    id: Optional[int] = Field(primary_key=True, default=None)
    # Indexed; defaults to the current time as a timezone-aware UTC datetime.
    timestamp: datetime = Field(
        index=True,
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )

    # Indexed columns.
    decky: str = Field(index=True)
    service: str = Field(index=True)
    event_type: str = Field(index=True)
    attacker_ip: str = Field(index=True)

    # Unindexed text columns.
    raw_line: str
    # NOTE(review): presumably a serialized mapping of parsed fields — confirm
    # the encoding against the writer.
    fields: str
    # Optional; defaults to None.
    msg: Optional[str] = None
class Bounty(SQLModel, table=True):
    """Row of the ``bounty`` table."""

    __tablename__ = "bounty"

    # Integer primary key; None until a value is assigned.
    id: Optional[int] = Field(primary_key=True, default=None)
    # Indexed; defaults to the current time as a timezone-aware UTC datetime.
    timestamp: datetime = Field(
        index=True,
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )

    # Indexed columns.
    decky: str = Field(index=True)
    service: str = Field(index=True)
    attacker_ip: str = Field(index=True)
    bounty_type: str = Field(index=True)

    # Unindexed text column.
    # NOTE(review): format of the stored payload is not visible here — confirm
    # against the code that writes it.
    payload: str
class State(SQLModel, table=True):
    """Row of the ``state`` table: one string value per string key."""

    __tablename__ = "state"

    # Primary key.
    key: str = Field(primary_key=True)
    # Stores JSON serialized DecnetConfig or other state blobs.
    value: str
# --- API Request/Response Models (Pydantic) ---
class Token(BaseModel):
    """Schema with an access token, its type and a password-change flag."""

    access_token: str
    token_type: str
    # Optional in the payload; False when omitted.
    must_change_password: bool = False
class LoginRequest(BaseModel):
    """Schema with a username and a length-limited password."""

    username: str
    # Required; rejected when longer than 72 characters.
    # NOTE(review): max_length counts characters, not encoded bytes — confirm
    # this matches whatever limit the password hasher imposes.
    password: str = PydanticField(
        ...,
        max_length=72,
    )
class ChangePasswordRequest(BaseModel):
    """Schema with an old and a new password, both length-limited."""

    # Both fields are required and rejected when longer than 72 characters.
    old_password: str = PydanticField(
        ...,
        max_length=72,
    )
    new_password: str = PydanticField(
        ...,
        max_length=72,
    )
class LogsResponse(BaseModel):
    """Schema with three integer counters and a list of dict records."""

    total: int
    limit: int
    offset: int
    # Each record is a free-form mapping with string keys.
    data: list[dict[str, Any]]
class BountyResponse(BaseModel):
    """Schema with three integer counters and a list of dict records."""

    total: int
    limit: int
    offset: int
    # Each record is a free-form mapping with string keys.
    data: list[dict[str, Any]]
class StatsResponse(BaseModel):
    """Schema with four required integer counters."""

    total_logs: int
    unique_attackers: int
    active_deckies: int
    deployed_deckies: int
class MutateIntervalRequest(BaseModel):
    """Schema carrying an optional human-readable mutation interval."""

    # Format is <number><unit>: a positive integer with no leading zero,
    # followed by exactly one unit letter — m(inutes), d(ays), M(onths) or
    # y/Y(ears). The smallest unit is the minute; seconds are not accepted.
    # The field may be omitted or null, in which case it is None.
    mutate_interval: Optional[str] = PydanticField(
        default=None,
        pattern=r"^[1-9]\d*[mdMyY]$",
    )
class DeployIniRequest(BaseModel):
    """Schema carrying INI content; unknown extra keys are rejected."""

    model_config = ConfigDict(extra="forbid")

    # Required. Typed as IniContent (imported from decnet.models) so that
    # strict INI structure is enforced while Pydantic initializes the model.
    # The OpenAPI schema still shows the field as a required string.
    ini_content: IniContent = PydanticField(
        ...,
        description="A valid INI formatted string",
    )