diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index a98654b..aec1735 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -103,6 +103,40 @@ class Attacker(SQLModel, table=True): ) +class SwarmHost(SQLModel, table=True): + """A worker host enrolled into a DECNET swarm. + + Rows exist only on the master. Populated by `decnet swarm enroll` and + read by the swarm controller when sharding deckies onto workers. + """ + __tablename__ = "swarm_hosts" + uuid: str = Field(primary_key=True) + name: str = Field(index=True, unique=True) + address: str # IP or hostname reachable by the master + agent_port: int = Field(default=8765) + status: str = Field(default="enrolled", index=True) + # ISO-8601 string of the last successful agent /health probe + last_heartbeat: Optional[datetime] = Field(default=None) + client_cert_fingerprint: str # SHA-256 hex of worker's issued client cert + # Directory on the master where the per-worker cert bundle lives + cert_bundle_path: str + enrolled_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + notes: Optional[str] = Field(default=None, sa_column=Column("notes", Text, nullable=True)) + + +class DeckyShard(SQLModel, table=True): + """Mapping of a single decky to the worker host running it (swarm mode).""" + __tablename__ = "decky_shards" + decky_name: str = Field(primary_key=True) + host_uuid: str = Field(foreign_key="swarm_hosts.uuid", index=True) + # JSON list of service names running on this decky (snapshot of assignment). + services: str = Field(sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")) + state: str = Field(default="pending", index=True) # pending|running|failed|torn_down + last_error: Optional[str] = Field(default=None, sa_column=Column("last_error", Text, nullable=True)) + compose_hash: Optional[str] = Field(default=None) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + class AttackerBehavior(SQLModel, table=True): """ Timing & behavioral profile for an attacker, joined to Attacker by uuid. diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 07f5e8a..1269a06 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -197,3 +197,34 @@ class BaseRepository(ABC): async def get_attacker_artifacts(self, uuid: str) -> list[dict[str, Any]]: """Return `file_captured` log rows for this attacker, newest first.""" pass + + # ------------------------------------------------------------- swarm + # Swarm methods have default no-op / empty implementations so existing + # subclasses and non-swarm deployments continue to work without change. + + async def add_swarm_host(self, data: dict[str, Any]) -> None: + raise NotImplementedError + + async def get_swarm_host_by_name(self, name: str) -> Optional[dict[str, Any]]: + raise NotImplementedError + + async def get_swarm_host_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + raise NotImplementedError + + async def list_swarm_hosts(self, status: Optional[str] = None) -> list[dict[str, Any]]: + raise NotImplementedError + + async def update_swarm_host(self, uuid: str, fields: dict[str, Any]) -> None: + raise NotImplementedError + + async def delete_swarm_host(self, uuid: str) -> bool: + raise NotImplementedError + + async def upsert_decky_shard(self, data: dict[str, Any]) -> None: + raise NotImplementedError + + async def list_decky_shards(self, host_uuid: Optional[str] = None) -> list[dict[str, Any]]: + raise NotImplementedError + + async def delete_decky_shards_for_host(self, host_uuid: str) -> int: + raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index e6cb46f..b7064cc 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -27,7 +27,16 @@ from decnet.config import load_state from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD from decnet.web.auth import get_password_hash from decnet.web.db.repository import BaseRepository -from decnet.web.db.models import User, Log, Bounty, State, Attacker, AttackerBehavior +from decnet.web.db.models import ( + User, + Log, + Bounty, + State, + Attacker, + AttackerBehavior, + SwarmHost, + DeckyShard, +) from contextlib import asynccontextmanager @@ -753,3 +762,102 @@ class SQLModelRepository(BaseRepository): .limit(200) ) return [r.model_dump(mode="json") for r in rows.scalars().all()] + + # ------------------------------------------------------------- swarm + + async def add_swarm_host(self, data: dict[str, Any]) -> None: + async with self._session() as session: + session.add(SwarmHost(**data)) + await session.commit() + + async def get_swarm_host_by_name(self, name: str) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute(select(SwarmHost).where(SwarmHost.name == name)) + row = result.scalar_one_or_none() + return row.model_dump(mode="json") if row else None + + async def get_swarm_host_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute(select(SwarmHost).where(SwarmHost.uuid == uuid)) + row = result.scalar_one_or_none() + return row.model_dump(mode="json") if row else None + + async def list_swarm_hosts(self, status: Optional[str] = None) -> list[dict[str, Any]]: + statement = select(SwarmHost).order_by(asc(SwarmHost.name)) + if status: + statement = statement.where(SwarmHost.status == status) + async with self._session() as session: + result = await session.execute(statement) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def update_swarm_host(self, uuid: str, fields: dict[str, Any]) -> None: + if not fields: + return + async with self._session() as session: + await session.execute( + update(SwarmHost).where(SwarmHost.uuid == uuid).values(**fields) + ) + await session.commit() + + async def delete_swarm_host(self, uuid: str) -> bool: + async with self._session() as session: + # Clean up child shards first (no ON DELETE CASCADE portable across dialects). + await session.execute( + text("DELETE FROM decky_shards WHERE host_uuid = :u"), {"u": uuid} + ) + result = await session.execute( + select(SwarmHost).where(SwarmHost.uuid == uuid) + ) + host = result.scalar_one_or_none() + if not host: + await session.commit() + return False + await session.delete(host) + await session.commit() + return True + + async def upsert_decky_shard(self, data: dict[str, Any]) -> None: + payload = {**data, "updated_at": datetime.now(timezone.utc)} + if isinstance(payload.get("services"), list): + payload["services"] = orjson.dumps(payload["services"]).decode() + async with self._session() as session: + result = await session.execute( + select(DeckyShard).where(DeckyShard.decky_name == payload["decky_name"]) + ) + existing = result.scalar_one_or_none() + if existing: + for k, v in payload.items(): + setattr(existing, k, v) + session.add(existing) + else: + session.add(DeckyShard(**payload)) + await session.commit() + + async def list_decky_shards( + self, host_uuid: Optional[str] = None + ) -> list[dict[str, Any]]: + statement = select(DeckyShard).order_by(asc(DeckyShard.decky_name)) + if host_uuid: + statement = statement.where(DeckyShard.host_uuid == host_uuid) + async with self._session() as session: + result = await session.execute(statement) + out: list[dict[str, Any]] = [] + for r in result.scalars().all(): + d = r.model_dump(mode="json") + raw = d.get("services") + if isinstance(raw, str): + try: + d["services"] = json.loads(raw) + except (json.JSONDecodeError, TypeError): + d["services"] = [] + out.append(d) + return out + + async def delete_decky_shards_for_host(self, host_uuid: str) -> int: + async with self._session() as session: + result = await session.execute( + text("DELETE FROM decky_shards WHERE host_uuid = :u"), + {"u": host_uuid}, + ) + await session.commit() + return result.rowcount or 0