feat(swarm): add SwarmHost and DeckyShard tables + repo CRUD
Introduces the master-side persistence layer for swarm mode: - SwarmHost: enrolled worker metadata, cert fingerprint, heartbeat. - DeckyShard: per-decky host assignment, state, last error. Repo methods are added as default-raising on BaseRepository so unihost deployments are untouched; SQLModelRepository implements them (shared between the sqlite and mysql subclasses per the existing pattern).
This commit is contained in:
@@ -103,6 +103,40 @@ class Attacker(SQLModel, table=True):
|
||||
)
|
||||
|
||||
|
||||
class SwarmHost(SQLModel, table=True):
|
||||
"""A worker host enrolled into a DECNET swarm.
|
||||
|
||||
Rows exist only on the master. Populated by `decnet swarm enroll` and
|
||||
read by the swarm controller when sharding deckies onto workers.
|
||||
"""
|
||||
__tablename__ = "swarm_hosts"
|
||||
uuid: str = Field(primary_key=True)
|
||||
name: str = Field(index=True, unique=True)
|
||||
address: str # IP or hostname reachable by the master
|
||||
agent_port: int = Field(default=8765)
|
||||
status: str = Field(default="enrolled", index=True)
|
||||
# ISO-8601 string of the last successful agent /health probe
|
||||
last_heartbeat: Optional[datetime] = Field(default=None)
|
||||
client_cert_fingerprint: str # SHA-256 hex of worker's issued client cert
|
||||
# Directory on the master where the per-worker cert bundle lives
|
||||
cert_bundle_path: str
|
||||
enrolled_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
notes: Optional[str] = Field(default=None, sa_column=Column("notes", Text, nullable=True))
|
||||
|
||||
|
||||
class DeckyShard(SQLModel, table=True):
|
||||
"""Mapping of a single decky to the worker host running it (swarm mode)."""
|
||||
__tablename__ = "decky_shards"
|
||||
decky_name: str = Field(primary_key=True)
|
||||
host_uuid: str = Field(foreign_key="swarm_hosts.uuid", index=True)
|
||||
# JSON list of service names running on this decky (snapshot of assignment).
|
||||
services: str = Field(sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]"))
|
||||
state: str = Field(default="pending", index=True) # pending|running|failed|torn_down
|
||||
last_error: Optional[str] = Field(default=None, sa_column=Column("last_error", Text, nullable=True))
|
||||
compose_hash: Optional[str] = Field(default=None)
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
|
||||
|
||||
class AttackerBehavior(SQLModel, table=True):
|
||||
"""
|
||||
Timing & behavioral profile for an attacker, joined to Attacker by uuid.
|
||||
|
||||
@@ -197,3 +197,34 @@ class BaseRepository(ABC):
|
||||
async def get_attacker_artifacts(self, uuid: str) -> list[dict[str, Any]]:
|
||||
"""Return `file_captured` log rows for this attacker, newest first."""
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------- swarm
|
||||
# Swarm methods have default no-op / empty implementations so existing
|
||||
# subclasses and non-swarm deployments continue to work without change.
|
||||
|
||||
async def add_swarm_host(self, data: dict[str, Any]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
async def get_swarm_host_by_name(self, name: str) -> Optional[dict[str, Any]]:
|
||||
raise NotImplementedError
|
||||
|
||||
async def get_swarm_host_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
|
||||
raise NotImplementedError
|
||||
|
||||
async def list_swarm_hosts(self, status: Optional[str] = None) -> list[dict[str, Any]]:
|
||||
raise NotImplementedError
|
||||
|
||||
async def update_swarm_host(self, uuid: str, fields: dict[str, Any]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
async def delete_swarm_host(self, uuid: str) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
async def upsert_decky_shard(self, data: dict[str, Any]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
async def list_decky_shards(self, host_uuid: Optional[str] = None) -> list[dict[str, Any]]:
|
||||
raise NotImplementedError
|
||||
|
||||
async def delete_decky_shards_for_host(self, host_uuid: str) -> int:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -27,7 +27,16 @@ from decnet.config import load_state
|
||||
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
|
||||
from decnet.web.auth import get_password_hash
|
||||
from decnet.web.db.repository import BaseRepository
|
||||
from decnet.web.db.models import User, Log, Bounty, State, Attacker, AttackerBehavior
|
||||
from decnet.web.db.models import (
|
||||
User,
|
||||
Log,
|
||||
Bounty,
|
||||
State,
|
||||
Attacker,
|
||||
AttackerBehavior,
|
||||
SwarmHost,
|
||||
DeckyShard,
|
||||
)
|
||||
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
@@ -753,3 +762,102 @@ class SQLModelRepository(BaseRepository):
|
||||
.limit(200)
|
||||
)
|
||||
return [r.model_dump(mode="json") for r in rows.scalars().all()]
|
||||
|
||||
# ------------------------------------------------------------- swarm
|
||||
|
||||
async def add_swarm_host(self, data: dict[str, Any]) -> None:
|
||||
async with self._session() as session:
|
||||
session.add(SwarmHost(**data))
|
||||
await session.commit()
|
||||
|
||||
async def get_swarm_host_by_name(self, name: str) -> Optional[dict[str, Any]]:
|
||||
async with self._session() as session:
|
||||
result = await session.execute(select(SwarmHost).where(SwarmHost.name == name))
|
||||
row = result.scalar_one_or_none()
|
||||
return row.model_dump(mode="json") if row else None
|
||||
|
||||
async def get_swarm_host_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
|
||||
async with self._session() as session:
|
||||
result = await session.execute(select(SwarmHost).where(SwarmHost.uuid == uuid))
|
||||
row = result.scalar_one_or_none()
|
||||
return row.model_dump(mode="json") if row else None
|
||||
|
||||
async def list_swarm_hosts(self, status: Optional[str] = None) -> list[dict[str, Any]]:
|
||||
statement = select(SwarmHost).order_by(asc(SwarmHost.name))
|
||||
if status:
|
||||
statement = statement.where(SwarmHost.status == status)
|
||||
async with self._session() as session:
|
||||
result = await session.execute(statement)
|
||||
return [r.model_dump(mode="json") for r in result.scalars().all()]
|
||||
|
||||
async def update_swarm_host(self, uuid: str, fields: dict[str, Any]) -> None:
|
||||
if not fields:
|
||||
return
|
||||
async with self._session() as session:
|
||||
await session.execute(
|
||||
update(SwarmHost).where(SwarmHost.uuid == uuid).values(**fields)
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def delete_swarm_host(self, uuid: str) -> bool:
|
||||
async with self._session() as session:
|
||||
# Clean up child shards first (no ON DELETE CASCADE portable across dialects).
|
||||
await session.execute(
|
||||
text("DELETE FROM decky_shards WHERE host_uuid = :u"), {"u": uuid}
|
||||
)
|
||||
result = await session.execute(
|
||||
select(SwarmHost).where(SwarmHost.uuid == uuid)
|
||||
)
|
||||
host = result.scalar_one_or_none()
|
||||
if not host:
|
||||
await session.commit()
|
||||
return False
|
||||
await session.delete(host)
|
||||
await session.commit()
|
||||
return True
|
||||
|
||||
async def upsert_decky_shard(self, data: dict[str, Any]) -> None:
|
||||
payload = {**data, "updated_at": datetime.now(timezone.utc)}
|
||||
if isinstance(payload.get("services"), list):
|
||||
payload["services"] = orjson.dumps(payload["services"]).decode()
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(DeckyShard).where(DeckyShard.decky_name == payload["decky_name"])
|
||||
)
|
||||
existing = result.scalar_one_or_none()
|
||||
if existing:
|
||||
for k, v in payload.items():
|
||||
setattr(existing, k, v)
|
||||
session.add(existing)
|
||||
else:
|
||||
session.add(DeckyShard(**payload))
|
||||
await session.commit()
|
||||
|
||||
async def list_decky_shards(
|
||||
self, host_uuid: Optional[str] = None
|
||||
) -> list[dict[str, Any]]:
|
||||
statement = select(DeckyShard).order_by(asc(DeckyShard.decky_name))
|
||||
if host_uuid:
|
||||
statement = statement.where(DeckyShard.host_uuid == host_uuid)
|
||||
async with self._session() as session:
|
||||
result = await session.execute(statement)
|
||||
out: list[dict[str, Any]] = []
|
||||
for r in result.scalars().all():
|
||||
d = r.model_dump(mode="json")
|
||||
raw = d.get("services")
|
||||
if isinstance(raw, str):
|
||||
try:
|
||||
d["services"] = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
d["services"] = []
|
||||
out.append(d)
|
||||
return out
|
||||
|
||||
async def delete_decky_shards_for_host(self, host_uuid: str) -> int:
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
text("DELETE FROM decky_shards WHERE host_uuid = :u"),
|
||||
{"u": host_uuid},
|
||||
)
|
||||
await session.commit()
|
||||
return result.rowcount or 0
|
||||
|
||||
Reference in New Issue
Block a user