feat(db): FleetDecky table mirrors decnet-state.json into the DB

Adds a fleet_deckies table so DB-only consumers (orchestrator, web
dashboard, REST API) can see unihost / MACVLAN / IPVLAN deckies
without reading the JSON state file. Mirrors DeckyShard field-for-field.

Composite PK (host_uuid, name) future-proofs for a mothership that
runs both a local fleet and acts as a swarm master. host_uuid defaults
to the "local" sentinel — no FK to swarm_hosts because the local
mothership isn't enrolled as a worker.

Repo additions: upsert_fleet_decky, delete_fleet_decky,
list_fleet_deckies, list_running_fleet_deckies,
update_fleet_decky_state, plus list_running_deckies which unions
topology + fleet + shard sources for the orchestrator.

Smoke-tested round-trip against MySQL: upsert, list_running, union
view (source="fleet"), delete.
This commit is contained in:
2026-04-26 21:00:01 -04:00
parent 10fa8a84d1
commit 095500ae9a
4 changed files with 260 additions and 0 deletions

View File

@@ -49,6 +49,10 @@ from .deploy import (
MutateIntervalRequest,
PurgeResponse,
)
from .fleet import (
LOCAL_HOST_SENTINEL,
FleetDecky,
)
from .health import (
ComponentHealth,
HealthResponse,
@@ -182,6 +186,9 @@ __all__ = [
"DeployResponse",
"MutateIntervalRequest",
"PurgeResponse",
# fleet
"LOCAL_HOST_SENTINEL",
"FleetDecky",
# health
"ComponentHealth",
"HealthResponse",

View File

@@ -0,0 +1,72 @@
"""Fleet decky table — DB mirror of ``decnet-state.json``.
The legacy unihost / MACVLAN / IPVLAN deploy path persists fleet state to a
JSON file (``/var/lib/decnet/decnet-state.json``) via
:func:`decnet.config.save_state`. That file is consumed directly by
``decnet status``/``decnet teardown``, the sniffer, and the collector — all
host-local CLI / worker code that may run on a box without the API daemon.
The FleetDecky table is a *mirror* of that JSON state inside MySQL/SQLite so
DB-only consumers (the orchestrator, the web dashboard, the REST API) can
see fleet decoys without touching the filesystem.
Both writers — CLI ``decnet deploy`` (``engine.deployer.deploy``) and the
web/API deploy path (``web.router.fleet.api_deploy_deckies``) — write to
*both* surfaces. A reconciler (``decnet.fleet.reconciler``) handles drift.
Schema mirrors :class:`decnet.web.db.models.swarm.DeckyShard` field-for-field
so the dashboard can render fleet rows with the same card shape. The PK is
composite ``(host_uuid, name)`` to future-proof for multi-host motherships
(a master that runs its own local fleet AND swarm-shards onto workers). In
unihost mode ``host_uuid`` defaults to the sentinel
:data:`LOCAL_HOST_SENTINEL`; we deliberately do NOT FK to ``swarm_hosts``
because the local mothership is not enrolled as a swarm worker.
"""
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import Column, Text
from sqlmodel import Field, SQLModel
from ._base import _BIG_TEXT
LOCAL_HOST_SENTINEL = "local"
class FleetDecky(SQLModel, table=True):
"""A unihost / MACVLAN / IPVLAN decky deployed on the local mothership.
Disjoint from :class:`DeckyShard` (SWARM-only) and :class:`TopologyDecky`
(MazeNET-only). Composite PK lets multiple hosts coexist when a future
mothership runs both a local fleet and acts as a swarm master.
"""
__tablename__ = "fleet_deckies"
host_uuid: str = Field(
default=LOCAL_HOST_SENTINEL, primary_key=True, index=True,
)
name: str = Field(primary_key=True)
# JSON list of service names on this decky (snapshot of assignment).
services: str = Field(
sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")
)
# Full serialised DeckyConfig — lets the dashboard render the same rich
# card (hostname/distro/archetype/service_config/mutate_interval) without
# round-tripping to load_state() on every page render.
decky_config: Optional[str] = Field(
default=None, sa_column=Column("decky_config", _BIG_TEXT, nullable=True)
)
decky_ip: Optional[str] = Field(default=None)
# pending|running|failed|torn_down|degraded|tearing_down|teardown_failed
state: str = Field(default="pending", index=True)
last_error: Optional[str] = Field(
default=None, sa_column=Column("last_error", Text, nullable=True),
)
compose_hash: Optional[str] = Field(default=None)
# Last reconciler observation (docker inspect) — lets the dashboard show
# "stale" rows whose reconciler hasn't ticked.
last_seen: Optional[datetime] = Field(default=None)
updated_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc)
)

View File

@@ -858,6 +858,56 @@ class BaseRepository(ABC):
``enabled=False`` and stamps ``auto_disabled_at``."""
raise NotImplementedError
# ----------------------------------------------------------------- fleet
async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
"""Insert-or-update a FleetDecky row keyed by ``(host_uuid, name)``.
Used by ``engine.deployer.deploy`` and the API deploy path to mirror
``decnet-state.json`` into the DB. Idempotent: calling with the same
key updates the existing row's mutable fields.
"""
raise NotImplementedError
async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
"""Remove a FleetDecky row. No-op if the row doesn't exist."""
raise NotImplementedError
async def list_fleet_deckies(
self, *, host_uuid: Optional[str] = None,
) -> list[dict[str, Any]]:
"""Return all FleetDecky rows, optionally scoped to a single host."""
raise NotImplementedError
async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
"""Return every FleetDecky row whose ``state == 'running'``.
Joined alongside :meth:`list_running_topology_deckies` and the SWARM
``DeckyShard`` view by :meth:`list_running_deckies`.
"""
raise NotImplementedError
async def update_fleet_decky_state(
self, *, host_uuid: str, name: str, state: str,
last_error: Optional[str] = None,
) -> None:
"""Update only the ``state``/``last_error``/``last_seen`` fields.
Called by the reconciler when ``docker inspect`` reports a fresh
container state. Avoids clobbering operator-edited config fields.
"""
raise NotImplementedError
async def list_running_deckies(self) -> list[dict[str, Any]]:
"""Union of running deckies across MazeNET, SWARM, and fleet sources.
Returns dicts shaped for the orchestrator's scheduler:
``{uuid, name, ip, services: list[str], source}`` where ``source`` is
one of ``"topology" | "shard" | "fleet"``. Rows from each source are
normalised so the scheduler doesn't need to branch on origin.
"""
raise NotImplementedError
# ---------------------------------------------------------- orchestrator
async def list_running_topology_deckies(self) -> list[dict[str, Any]]:

View File

@@ -43,6 +43,8 @@ from decnet.web.db.models import (
SmtpTarget,
SwarmHost,
DeckyShard,
FleetDecky,
LOCAL_HOST_SENTINEL,
Topology,
LAN,
TopologyDecky,
@@ -2023,6 +2025,135 @@ class SQLModelRepository(BaseRepository):
await session.commit()
return bool(result.rowcount)
# -------------------------------------------------------------- fleet
async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
payload: dict[str, Any] = {
**data,
"updated_at": datetime.now(timezone.utc),
}
payload.setdefault("host_uuid", LOCAL_HOST_SENTINEL)
if payload.get("host_uuid") is None:
payload["host_uuid"] = LOCAL_HOST_SENTINEL
if isinstance(payload.get("services"), list):
payload["services"] = orjson.dumps(payload["services"]).decode()
if isinstance(payload.get("decky_config"), dict):
payload["decky_config"] = orjson.dumps(payload["decky_config"]).decode()
async with self._session() as session:
result = await session.execute(
select(FleetDecky).where(
FleetDecky.host_uuid == payload["host_uuid"],
FleetDecky.name == payload["name"],
)
)
existing = result.scalar_one_or_none()
if existing:
for k, v in payload.items():
setattr(existing, k, v)
session.add(existing)
else:
session.add(FleetDecky(**payload))
await session.commit()
async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
async with self._session() as session:
await session.execute(
text(
"DELETE FROM fleet_deckies "
"WHERE host_uuid = :h AND name = :n"
),
{"h": host_uuid, "n": name},
)
await session.commit()
async def list_fleet_deckies(
self, *, host_uuid: Optional[str] = None,
) -> list[dict[str, Any]]:
stmt = select(FleetDecky).order_by(asc(FleetDecky.name))
if host_uuid:
stmt = stmt.where(FleetDecky.host_uuid == host_uuid)
async with self._session() as session:
result = await session.execute(stmt)
return [
self._deserialize_json_fields(
r.model_dump(mode="json"), ("services", "decky_config")
)
for r in result.scalars().all()
]
async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
async with self._session() as session:
result = await session.execute(
select(FleetDecky).where(FleetDecky.state == "running")
)
return [
self._deserialize_json_fields(
r.model_dump(mode="json"), ("services", "decky_config")
)
for r in result.scalars().all()
]
async def update_fleet_decky_state(
self, *, host_uuid: str, name: str, state: str,
last_error: Optional[str] = None,
) -> None:
now = datetime.now(timezone.utc)
values: dict[str, Any] = {
"state": state,
"updated_at": now,
"last_seen": now,
}
if last_error is not None:
values["last_error"] = last_error
async with self._session() as session:
await session.execute(
update(FleetDecky)
.where(
FleetDecky.host_uuid == host_uuid,
FleetDecky.name == name,
)
.values(**values)
)
await session.commit()
async def list_running_deckies(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
# MazeNET — already shaped {uuid, name, ip, services}
for d in await self.list_running_topology_deckies():
out.append({
"uuid": d.get("uuid"),
"name": d.get("name"),
"ip": d.get("ip"),
"services": d.get("services") or [],
"source": "topology",
})
# Fleet — column is `decky_ip`, PK is composite (host_uuid, name)
for d in await self.list_running_fleet_deckies():
out.append({
"uuid": f"{d.get('host_uuid')}:{d.get('name')}",
"name": d.get("name"),
"ip": d.get("decky_ip"),
"services": d.get("services") or [],
"source": "fleet",
})
# SWARM — DeckyShard rows in 'running' state on enrolled workers.
async with self._session() as session:
shard_rows = await session.execute(
select(DeckyShard).where(DeckyShard.state == "running")
)
for s in shard_rows.scalars().all():
d = self._deserialize_json_fields(
s.model_dump(mode="json"), ("services", "decky_config")
)
out.append({
"uuid": f"{d.get('host_uuid')}:{d.get('decky_name')}",
"name": d.get("decky_name"),
"ip": d.get("decky_ip"),
"services": d.get("services") or [],
"source": "shard",
})
return out
# ------------------------------------------------------------ mazenet
@staticmethod