feat(db): FleetDecky table mirrors decnet-state.json into the DB
Adds a fleet_deckies table so DB-only consumers (orchestrator, web dashboard, REST API) can see unihost / MACVLAN / IPVLAN deckies without reading the JSON state file. Mirrors DeckyShard field-for-field. Composite PK (host_uuid, name) future-proofs for a mothership that runs both a local fleet and acts as a swarm master. host_uuid defaults to the "local" sentinel — no FK to swarm_hosts because the local mothership isn't enrolled as a worker. Repo additions: upsert_fleet_decky, delete_fleet_decky, list_fleet_deckies, list_running_fleet_deckies, update_fleet_decky_state, plus list_running_deckies which unions topology + fleet + shard sources for the orchestrator. Smoke-tested round-trip against MySQL: upsert, list_running, union view (source="fleet"), delete.
This commit is contained in:
@@ -49,6 +49,10 @@ from .deploy import (
|
||||
MutateIntervalRequest,
|
||||
PurgeResponse,
|
||||
)
|
||||
from .fleet import (
|
||||
LOCAL_HOST_SENTINEL,
|
||||
FleetDecky,
|
||||
)
|
||||
from .health import (
|
||||
ComponentHealth,
|
||||
HealthResponse,
|
||||
@@ -182,6 +186,9 @@ __all__ = [
|
||||
"DeployResponse",
|
||||
"MutateIntervalRequest",
|
||||
"PurgeResponse",
|
||||
# fleet
|
||||
"LOCAL_HOST_SENTINEL",
|
||||
"FleetDecky",
|
||||
# health
|
||||
"ComponentHealth",
|
||||
"HealthResponse",
|
||||
|
||||
72
decnet/web/db/models/fleet.py
Normal file
72
decnet/web/db/models/fleet.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""Fleet decky table — DB mirror of ``decnet-state.json``.
|
||||
|
||||
The legacy unihost / MACVLAN / IPVLAN deploy path persists fleet state to a
|
||||
JSON file (``/var/lib/decnet/decnet-state.json``) via
|
||||
:func:`decnet.config.save_state`. That file is consumed directly by
|
||||
``decnet status``/``decnet teardown``, the sniffer, and the collector — all
|
||||
host-local CLI / worker code that may run on a box without the API daemon.
|
||||
|
||||
The FleetDecky table is a *mirror* of that JSON state inside MySQL/SQLite so
|
||||
DB-only consumers (the orchestrator, the web dashboard, the REST API) can
|
||||
see fleet decoys without touching the filesystem.
|
||||
|
||||
Both writers — CLI ``decnet deploy`` (``engine.deployer.deploy``) and the
|
||||
web/API deploy path (``web.router.fleet.api_deploy_deckies``) — write to
|
||||
*both* surfaces. A reconciler (``decnet.fleet.reconciler``) handles drift.
|
||||
|
||||
Schema mirrors :class:`decnet.web.db.models.swarm.DeckyShard` field-for-field
|
||||
so the dashboard can render fleet rows with the same card shape. The PK is
|
||||
composite ``(host_uuid, name)`` to future-proof for multi-host motherships
|
||||
(a master that runs its own local fleet AND swarm-shards onto workers). In
|
||||
unihost mode ``host_uuid`` defaults to the sentinel
|
||||
:data:`LOCAL_HOST_SENTINEL`; we deliberately do NOT FK to ``swarm_hosts``
|
||||
because the local mothership is not enrolled as a swarm worker.
|
||||
"""
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import Column, Text
|
||||
from sqlmodel import Field, SQLModel
|
||||
|
||||
from ._base import _BIG_TEXT
|
||||
|
||||
|
||||
LOCAL_HOST_SENTINEL = "local"
|
||||
|
||||
|
||||
class FleetDecky(SQLModel, table=True):
|
||||
"""A unihost / MACVLAN / IPVLAN decky deployed on the local mothership.
|
||||
|
||||
Disjoint from :class:`DeckyShard` (SWARM-only) and :class:`TopologyDecky`
|
||||
(MazeNET-only). Composite PK lets multiple hosts coexist when a future
|
||||
mothership runs both a local fleet and acts as a swarm master.
|
||||
"""
|
||||
__tablename__ = "fleet_deckies"
|
||||
|
||||
host_uuid: str = Field(
|
||||
default=LOCAL_HOST_SENTINEL, primary_key=True, index=True,
|
||||
)
|
||||
name: str = Field(primary_key=True)
|
||||
# JSON list of service names on this decky (snapshot of assignment).
|
||||
services: str = Field(
|
||||
sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")
|
||||
)
|
||||
# Full serialised DeckyConfig — lets the dashboard render the same rich
|
||||
# card (hostname/distro/archetype/service_config/mutate_interval) without
|
||||
# round-tripping to load_state() on every page render.
|
||||
decky_config: Optional[str] = Field(
|
||||
default=None, sa_column=Column("decky_config", _BIG_TEXT, nullable=True)
|
||||
)
|
||||
decky_ip: Optional[str] = Field(default=None)
|
||||
# pending|running|failed|torn_down|degraded|tearing_down|teardown_failed
|
||||
state: str = Field(default="pending", index=True)
|
||||
last_error: Optional[str] = Field(
|
||||
default=None, sa_column=Column("last_error", Text, nullable=True),
|
||||
)
|
||||
compose_hash: Optional[str] = Field(default=None)
|
||||
# Last reconciler observation (docker inspect) — lets the dashboard show
|
||||
# "stale" rows whose reconciler hasn't ticked.
|
||||
last_seen: Optional[datetime] = Field(default=None)
|
||||
updated_at: datetime = Field(
|
||||
default_factory=lambda: datetime.now(timezone.utc)
|
||||
)
|
||||
@@ -858,6 +858,56 @@ class BaseRepository(ABC):
|
||||
``enabled=False`` and stamps ``auto_disabled_at``."""
|
||||
raise NotImplementedError
|
||||
|
||||
# ----------------------------------------------------------------- fleet
|
||||
|
||||
async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
|
||||
"""Insert-or-update a FleetDecky row keyed by ``(host_uuid, name)``.
|
||||
|
||||
Used by ``engine.deployer.deploy`` and the API deploy path to mirror
|
||||
``decnet-state.json`` into the DB. Idempotent: calling with the same
|
||||
key updates the existing row's mutable fields.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
|
||||
"""Remove a FleetDecky row. No-op if the row doesn't exist."""
|
||||
raise NotImplementedError
|
||||
|
||||
async def list_fleet_deckies(
|
||||
self, *, host_uuid: Optional[str] = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Return all FleetDecky rows, optionally scoped to a single host."""
|
||||
raise NotImplementedError
|
||||
|
||||
async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
|
||||
"""Return every FleetDecky row whose ``state == 'running'``.
|
||||
|
||||
Joined alongside :meth:`list_running_topology_deckies` and the SWARM
|
||||
``DeckyShard`` view by :meth:`list_running_deckies`.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
async def update_fleet_decky_state(
|
||||
self, *, host_uuid: str, name: str, state: str,
|
||||
last_error: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Update only the ``state``/``last_error``/``last_seen`` fields.
|
||||
|
||||
Called by the reconciler when ``docker inspect`` reports a fresh
|
||||
container state. Avoids clobbering operator-edited config fields.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
async def list_running_deckies(self) -> list[dict[str, Any]]:
|
||||
"""Union of running deckies across MazeNET, SWARM, and fleet sources.
|
||||
|
||||
Returns dicts shaped for the orchestrator's scheduler:
|
||||
``{uuid, name, ip, services: list[str], source}`` where ``source`` is
|
||||
one of ``"topology" | "shard" | "fleet"``. Rows from each source are
|
||||
normalised so the scheduler doesn't need to branch on origin.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
# ---------------------------------------------------------- orchestrator
|
||||
|
||||
async def list_running_topology_deckies(self) -> list[dict[str, Any]]:
|
||||
|
||||
@@ -43,6 +43,8 @@ from decnet.web.db.models import (
|
||||
SmtpTarget,
|
||||
SwarmHost,
|
||||
DeckyShard,
|
||||
FleetDecky,
|
||||
LOCAL_HOST_SENTINEL,
|
||||
Topology,
|
||||
LAN,
|
||||
TopologyDecky,
|
||||
@@ -2023,6 +2025,135 @@ class SQLModelRepository(BaseRepository):
|
||||
await session.commit()
|
||||
return bool(result.rowcount)
|
||||
|
||||
# -------------------------------------------------------------- fleet
|
||||
|
||||
async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
|
||||
payload: dict[str, Any] = {
|
||||
**data,
|
||||
"updated_at": datetime.now(timezone.utc),
|
||||
}
|
||||
payload.setdefault("host_uuid", LOCAL_HOST_SENTINEL)
|
||||
if payload.get("host_uuid") is None:
|
||||
payload["host_uuid"] = LOCAL_HOST_SENTINEL
|
||||
if isinstance(payload.get("services"), list):
|
||||
payload["services"] = orjson.dumps(payload["services"]).decode()
|
||||
if isinstance(payload.get("decky_config"), dict):
|
||||
payload["decky_config"] = orjson.dumps(payload["decky_config"]).decode()
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(FleetDecky).where(
|
||||
FleetDecky.host_uuid == payload["host_uuid"],
|
||||
FleetDecky.name == payload["name"],
|
||||
)
|
||||
)
|
||||
existing = result.scalar_one_or_none()
|
||||
if existing:
|
||||
for k, v in payload.items():
|
||||
setattr(existing, k, v)
|
||||
session.add(existing)
|
||||
else:
|
||||
session.add(FleetDecky(**payload))
|
||||
await session.commit()
|
||||
|
||||
async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
|
||||
async with self._session() as session:
|
||||
await session.execute(
|
||||
text(
|
||||
"DELETE FROM fleet_deckies "
|
||||
"WHERE host_uuid = :h AND name = :n"
|
||||
),
|
||||
{"h": host_uuid, "n": name},
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def list_fleet_deckies(
|
||||
self, *, host_uuid: Optional[str] = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
stmt = select(FleetDecky).order_by(asc(FleetDecky.name))
|
||||
if host_uuid:
|
||||
stmt = stmt.where(FleetDecky.host_uuid == host_uuid)
|
||||
async with self._session() as session:
|
||||
result = await session.execute(stmt)
|
||||
return [
|
||||
self._deserialize_json_fields(
|
||||
r.model_dump(mode="json"), ("services", "decky_config")
|
||||
)
|
||||
for r in result.scalars().all()
|
||||
]
|
||||
|
||||
async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(FleetDecky).where(FleetDecky.state == "running")
|
||||
)
|
||||
return [
|
||||
self._deserialize_json_fields(
|
||||
r.model_dump(mode="json"), ("services", "decky_config")
|
||||
)
|
||||
for r in result.scalars().all()
|
||||
]
|
||||
|
||||
async def update_fleet_decky_state(
|
||||
self, *, host_uuid: str, name: str, state: str,
|
||||
last_error: Optional[str] = None,
|
||||
) -> None:
|
||||
now = datetime.now(timezone.utc)
|
||||
values: dict[str, Any] = {
|
||||
"state": state,
|
||||
"updated_at": now,
|
||||
"last_seen": now,
|
||||
}
|
||||
if last_error is not None:
|
||||
values["last_error"] = last_error
|
||||
async with self._session() as session:
|
||||
await session.execute(
|
||||
update(FleetDecky)
|
||||
.where(
|
||||
FleetDecky.host_uuid == host_uuid,
|
||||
FleetDecky.name == name,
|
||||
)
|
||||
.values(**values)
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def list_running_deckies(self) -> list[dict[str, Any]]:
|
||||
out: list[dict[str, Any]] = []
|
||||
# MazeNET — already shaped {uuid, name, ip, services}
|
||||
for d in await self.list_running_topology_deckies():
|
||||
out.append({
|
||||
"uuid": d.get("uuid"),
|
||||
"name": d.get("name"),
|
||||
"ip": d.get("ip"),
|
||||
"services": d.get("services") or [],
|
||||
"source": "topology",
|
||||
})
|
||||
# Fleet — column is `decky_ip`, PK is composite (host_uuid, name)
|
||||
for d in await self.list_running_fleet_deckies():
|
||||
out.append({
|
||||
"uuid": f"{d.get('host_uuid')}:{d.get('name')}",
|
||||
"name": d.get("name"),
|
||||
"ip": d.get("decky_ip"),
|
||||
"services": d.get("services") or [],
|
||||
"source": "fleet",
|
||||
})
|
||||
# SWARM — DeckyShard rows in 'running' state on enrolled workers.
|
||||
async with self._session() as session:
|
||||
shard_rows = await session.execute(
|
||||
select(DeckyShard).where(DeckyShard.state == "running")
|
||||
)
|
||||
for s in shard_rows.scalars().all():
|
||||
d = self._deserialize_json_fields(
|
||||
s.model_dump(mode="json"), ("services", "decky_config")
|
||||
)
|
||||
out.append({
|
||||
"uuid": f"{d.get('host_uuid')}:{d.get('decky_name')}",
|
||||
"name": d.get("decky_name"),
|
||||
"ip": d.get("decky_ip"),
|
||||
"services": d.get("services") or [],
|
||||
"source": "shard",
|
||||
})
|
||||
return out
|
||||
|
||||
# ------------------------------------------------------------ mazenet
|
||||
|
||||
@staticmethod
|
||||
|
||||
Reference in New Issue
Block a user