diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py
index 9907cde8..f99d396e 100644
--- a/decnet/web/db/models/__init__.py
+++ b/decnet/web/db/models/__init__.py
@@ -49,6 +49,10 @@ from .deploy import (
     MutateIntervalRequest,
     PurgeResponse,
 )
+from .fleet import (
+    LOCAL_HOST_SENTINEL,
+    FleetDecky,
+)
 from .health import (
     ComponentHealth,
     HealthResponse,
@@ -182,6 +186,9 @@ __all__ = [
     "DeployResponse",
     "MutateIntervalRequest",
     "PurgeResponse",
+    # fleet
+    "LOCAL_HOST_SENTINEL",
+    "FleetDecky",
     # health
     "ComponentHealth",
     "HealthResponse",
diff --git a/decnet/web/db/models/fleet.py b/decnet/web/db/models/fleet.py
new file mode 100644
index 00000000..0d17a369
--- /dev/null
+++ b/decnet/web/db/models/fleet.py
@@ -0,0 +1,72 @@
+"""Fleet decky table — DB mirror of ``decnet-state.json``.
+
+The legacy unihost / MACVLAN / IPVLAN deploy path persists fleet state to a
+JSON file (``/var/lib/decnet/decnet-state.json``) via
+:func:`decnet.config.save_state`. That file is consumed directly by
+``decnet status``/``decnet teardown``, the sniffer, and the collector — all
+host-local CLI / worker code that may run on a box without the API daemon.
+
+The FleetDecky table is a *mirror* of that JSON state inside MySQL/SQLite so
+DB-only consumers (the orchestrator, the web dashboard, the REST API) can
+see fleet decoys without touching the filesystem.
+
+Both writers — CLI ``decnet deploy`` (``engine.deployer.deploy``) and the
+web/API deploy path (``web.router.fleet.api_deploy_deckies``) — write to
+*both* surfaces. A reconciler (``decnet.fleet.reconciler``) handles drift.
+
+Schema follows :class:`decnet.web.db.models.swarm.DeckyShard` (name column is
+``decky_name`` there) so the dashboard can reuse the same card shape. The PK is
+composite ``(host_uuid, name)`` to future-proof for multi-host motherships
+(a master that runs its own local fleet AND swarm-shards onto workers). In
+unihost mode ``host_uuid`` defaults to the sentinel
+:data:`LOCAL_HOST_SENTINEL`; we deliberately do NOT FK to ``swarm_hosts``
+because the local mothership is not enrolled as a swarm worker.
+"""
+from datetime import datetime, timezone
+from typing import Optional
+
+from sqlalchemy import Column, Text
+from sqlmodel import Field, SQLModel
+
+from ._base import _BIG_TEXT
+
+
+LOCAL_HOST_SENTINEL = "local"
+
+
+class FleetDecky(SQLModel, table=True):
+    """A unihost / MACVLAN / IPVLAN decky deployed on the local mothership.
+
+    Disjoint from :class:`DeckyShard` (SWARM-only) and :class:`TopologyDecky`
+    (MazeNET-only). Composite PK lets multiple hosts coexist when a future
+    mothership runs both a local fleet and acts as a swarm master.
+    """
+    __tablename__ = "fleet_deckies"
+
+    host_uuid: str = Field(
+        default=LOCAL_HOST_SENTINEL, primary_key=True, index=True,
+    )
+    name: str = Field(primary_key=True)
+    # JSON list of service names on this decky (snapshot of assignment).
+    services: str = Field(
+        sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")
+    )
+    # Full serialised DeckyConfig — lets the dashboard render the same rich
+    # card (hostname/distro/archetype/service_config/mutate_interval) without
+    # round-tripping to load_state() on every page render.
+    decky_config: Optional[str] = Field(
+        default=None, sa_column=Column("decky_config", _BIG_TEXT, nullable=True)
+    )
+    decky_ip: Optional[str] = Field(default=None)
+    # pending|running|failed|torn_down|degraded|tearing_down|teardown_failed
+    state: str = Field(default="pending", index=True)
+    last_error: Optional[str] = Field(
+        default=None, sa_column=Column("last_error", Text, nullable=True),
+    )
+    compose_hash: Optional[str] = Field(default=None)
+    # Last reconciler observation (docker inspect) — lets the dashboard show
+    # "stale" rows whose reconciler hasn't ticked.
+    last_seen: Optional[datetime] = Field(default=None)
+    updated_at: datetime = Field(
+        default_factory=lambda: datetime.now(timezone.utc)
+    )
diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py
index 65864fc9..b4c2d866 100644
--- a/decnet/web/db/repository.py
+++ b/decnet/web/db/repository.py
@@ -858,6 +858,57 @@ class BaseRepository(ABC):
         ``enabled=False`` and stamps ``auto_disabled_at``."""
         raise NotImplementedError
 
+    # ----------------------------------------------------------------- fleet
+
+    async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
+        """Insert-or-update a FleetDecky row keyed by ``(host_uuid, name)``.
+
+        Used by ``engine.deployer.deploy`` and the API deploy path to mirror
+        ``decnet-state.json`` into the DB. Idempotent: calling with the same
+        key updates the existing row's mutable fields.
+        """
+        raise NotImplementedError
+
+    async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
+        """Remove a FleetDecky row. No-op if the row doesn't exist."""
+        raise NotImplementedError
+
+    async def list_fleet_deckies(
+        self, *, host_uuid: Optional[str] = None,
+    ) -> list[dict[str, Any]]:
+        """Return all FleetDecky rows, optionally scoped to a single host."""
+        raise NotImplementedError
+
+    async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
+        """Return every FleetDecky row whose ``state == 'running'``.
+
+        Joined alongside :meth:`list_running_topology_deckies` and the SWARM
+        ``DeckyShard`` view by :meth:`list_running_deckies`.
+        """
+        raise NotImplementedError
+
+    async def update_fleet_decky_state(
+        self, *, host_uuid: str, name: str, state: str,
+        last_error: Optional[str] = None,
+    ) -> None:
+        """Update only ``state``/``last_error``/``last_seen``/``updated_at``.
+
+        Called by the reconciler when ``docker inspect`` reports a fresh
+        container state. Avoids clobbering operator-edited config fields.
+        ``last_error=None`` leaves any previously stored error untouched.
+        """
+        raise NotImplementedError
+
+    async def list_running_deckies(self) -> list[dict[str, Any]]:
+        """Union of running deckies across MazeNET, SWARM, and fleet sources.
+
+        Returns dicts shaped for the orchestrator's scheduler:
+        ``{uuid, name, ip, services: list[str], source}`` where ``source`` is
+        one of ``"topology" | "shard" | "fleet"``. Rows from each source are
+        normalised so the scheduler doesn't need to branch on origin.
+        """
+        raise NotImplementedError
+
     # ---------------------------------------------------------- orchestrator
 
     async def list_running_topology_deckies(self) -> list[dict[str, Any]]:
diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py
index 75f03ae7..b23e46e7 100644
--- a/decnet/web/db/sqlmodel_repo.py
+++ b/decnet/web/db/sqlmodel_repo.py
@@ -43,6 +43,8 @@ from decnet.web.db.models import (
     SmtpTarget,
     SwarmHost,
     DeckyShard,
+    FleetDecky,
+    LOCAL_HOST_SENTINEL,
     Topology,
     LAN,
     TopologyDecky,
@@ -2023,6 +2025,136 @@ class SQLModelRepository(BaseRepository):
             await session.commit()
             return bool(result.rowcount)
 
+    # -------------------------------------------------------------- fleet
+
+    async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
+        payload: dict[str, Any] = {
+            **data,
+            "updated_at": datetime.now(timezone.utc),
+        }
+        # A missing or explicit-None host_uuid both mean the local mothership.
+        if payload.get("host_uuid") is None:
+            payload["host_uuid"] = LOCAL_HOST_SENTINEL
+        if isinstance(payload.get("services"), list):
+            payload["services"] = orjson.dumps(payload["services"]).decode()
+        if isinstance(payload.get("decky_config"), dict):
+            payload["decky_config"] = orjson.dumps(payload["decky_config"]).decode()
+        async with self._session() as session:
+            result = await session.execute(
+                select(FleetDecky).where(
+                    FleetDecky.host_uuid == payload["host_uuid"],
+                    FleetDecky.name == payload["name"],
+                )
+            )
+            existing = result.scalar_one_or_none()
+            if existing is not None:
+                for k, v in payload.items():
+                    setattr(existing, k, v)
+                session.add(existing)
+            else:
+                session.add(FleetDecky(**payload))
+            await session.commit()
+
+    async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
+        async with self._session() as session:
+            await session.execute(
+                text(
+                    "DELETE FROM fleet_deckies "
+                    "WHERE host_uuid = :h AND name = :n"
+                ),
+                {"h": host_uuid, "n": name},
+            )
+            await session.commit()
+
+    async def list_fleet_deckies(
+        self, *, host_uuid: Optional[str] = None,
+    ) -> list[dict[str, Any]]:
+        stmt = select(FleetDecky).order_by(asc(FleetDecky.name))
+        if host_uuid is not None:
+            stmt = stmt.where(FleetDecky.host_uuid == host_uuid)
+        async with self._session() as session:
+            result = await session.execute(stmt)
+            return [
+                self._deserialize_json_fields(
+                    r.model_dump(mode="json"), ("services", "decky_config")
+                )
+                for r in result.scalars().all()
+            ]
+
+    async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
+        async with self._session() as session:
+            result = await session.execute(
+                select(FleetDecky).where(FleetDecky.state == "running")
+            )
+            return [
+                self._deserialize_json_fields(
+                    r.model_dump(mode="json"), ("services", "decky_config")
+                )
+                for r in result.scalars().all()
+            ]
+
+    async def update_fleet_decky_state(
+        self, *, host_uuid: str, name: str, state: str,
+        last_error: Optional[str] = None,
+    ) -> None:
+        now = datetime.now(timezone.utc)
+        values: dict[str, Any] = {
+            "state": state,
+            "updated_at": now,
+            "last_seen": now,
+        }
+        # None means "leave the stored error alone", not "clear it".
+        if last_error is not None:
+            values["last_error"] = last_error
+        async with self._session() as session:
+            await session.execute(
+                update(FleetDecky)
+                .where(
+                    FleetDecky.host_uuid == host_uuid,
+                    FleetDecky.name == name,
+                )
+                .values(**values)
+            )
+            await session.commit()
+
+    async def list_running_deckies(self) -> list[dict[str, Any]]:
+        out: list[dict[str, Any]] = []
+        # MazeNET — already shaped {uuid, name, ip, services}
+        for d in await self.list_running_topology_deckies():
+            out.append({
+                "uuid": d.get("uuid"),
+                "name": d.get("name"),
+                "ip": d.get("ip"),
+                "services": d.get("services") or [],
+                "source": "topology",
+            })
+        # Fleet — column is `decky_ip`, PK is composite (host_uuid, name)
+        for d in await self.list_running_fleet_deckies():
+            out.append({
+                "uuid": f"{d.get('host_uuid')}:{d.get('name')}",
+                "name": d.get("name"),
+                "ip": d.get("decky_ip"),
+                "services": d.get("services") or [],
+                "source": "fleet",
+            })
+        # SWARM — DeckyShard rows in 'running' state on enrolled workers.
+        async with self._session() as session:
+            shard_rows = await session.execute(
+                select(DeckyShard).where(DeckyShard.state == "running")
+            )
+            for s in shard_rows.scalars().all():
+                d = self._deserialize_json_fields(
+                    s.model_dump(mode="json"), ("services", "decky_config")
+                )
+                out.append({
+                    "uuid": f"{d.get('host_uuid')}:{d.get('decky_name')}",
+                    "name": d.get("decky_name"),
+                    "ip": d.get("decky_ip"),
+                    "services": d.get("services") or [],
+                    "source": "shard",
+                })
+        return out
+
     # ------------------------------------------------------------ mazenet
 
     @staticmethod