feat(db): FleetDecky table mirrors decnet-state.json into the DB

Adds a fleet_deckies table so DB-only consumers (orchestrator, web
dashboard, REST API) can see unihost / MACVLAN / IPVLAN deckies
without reading the JSON state file. Mirrors DeckyShard field-for-field.

Composite PK (host_uuid, name) future-proofs for a mothership that
runs both a local fleet and acts as a swarm master. host_uuid defaults
to the "local" sentinel — no FK to swarm_hosts because the local
mothership isn't enrolled as a worker.

Repo additions: upsert_fleet_decky, delete_fleet_decky,
list_fleet_deckies, list_running_fleet_deckies,
update_fleet_decky_state, plus list_running_deckies which unions
topology + fleet + shard sources for the orchestrator.

Smoke-tested round-trip against MySQL: upsert, list_running, union
view (source="fleet"), delete.
This commit is contained in:
2026-04-26 21:00:01 -04:00
parent 10fa8a84d1
commit 095500ae9a
4 changed files with 260 additions and 0 deletions

View File

@@ -43,6 +43,8 @@ from decnet.web.db.models import (
SmtpTarget,
SwarmHost,
DeckyShard,
FleetDecky,
LOCAL_HOST_SENTINEL,
Topology,
LAN,
TopologyDecky,
@@ -2023,6 +2025,135 @@ class SQLModelRepository(BaseRepository):
await session.commit()
return bool(result.rowcount)
# -------------------------------------------------------------- fleet
async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
payload: dict[str, Any] = {
**data,
"updated_at": datetime.now(timezone.utc),
}
payload.setdefault("host_uuid", LOCAL_HOST_SENTINEL)
if payload.get("host_uuid") is None:
payload["host_uuid"] = LOCAL_HOST_SENTINEL
if isinstance(payload.get("services"), list):
payload["services"] = orjson.dumps(payload["services"]).decode()
if isinstance(payload.get("decky_config"), dict):
payload["decky_config"] = orjson.dumps(payload["decky_config"]).decode()
async with self._session() as session:
result = await session.execute(
select(FleetDecky).where(
FleetDecky.host_uuid == payload["host_uuid"],
FleetDecky.name == payload["name"],
)
)
existing = result.scalar_one_or_none()
if existing:
for k, v in payload.items():
setattr(existing, k, v)
session.add(existing)
else:
session.add(FleetDecky(**payload))
await session.commit()
async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
async with self._session() as session:
await session.execute(
text(
"DELETE FROM fleet_deckies "
"WHERE host_uuid = :h AND name = :n"
),
{"h": host_uuid, "n": name},
)
await session.commit()
async def list_fleet_deckies(
self, *, host_uuid: Optional[str] = None,
) -> list[dict[str, Any]]:
stmt = select(FleetDecky).order_by(asc(FleetDecky.name))
if host_uuid:
stmt = stmt.where(FleetDecky.host_uuid == host_uuid)
async with self._session() as session:
result = await session.execute(stmt)
return [
self._deserialize_json_fields(
r.model_dump(mode="json"), ("services", "decky_config")
)
for r in result.scalars().all()
]
async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
async with self._session() as session:
result = await session.execute(
select(FleetDecky).where(FleetDecky.state == "running")
)
return [
self._deserialize_json_fields(
r.model_dump(mode="json"), ("services", "decky_config")
)
for r in result.scalars().all()
]
async def update_fleet_decky_state(
self, *, host_uuid: str, name: str, state: str,
last_error: Optional[str] = None,
) -> None:
now = datetime.now(timezone.utc)
values: dict[str, Any] = {
"state": state,
"updated_at": now,
"last_seen": now,
}
if last_error is not None:
values["last_error"] = last_error
async with self._session() as session:
await session.execute(
update(FleetDecky)
.where(
FleetDecky.host_uuid == host_uuid,
FleetDecky.name == name,
)
.values(**values)
)
await session.commit()
async def list_running_deckies(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
# MazeNET — already shaped {uuid, name, ip, services}
for d in await self.list_running_topology_deckies():
out.append({
"uuid": d.get("uuid"),
"name": d.get("name"),
"ip": d.get("ip"),
"services": d.get("services") or [],
"source": "topology",
})
# Fleet — column is `decky_ip`, PK is composite (host_uuid, name)
for d in await self.list_running_fleet_deckies():
out.append({
"uuid": f"{d.get('host_uuid')}:{d.get('name')}",
"name": d.get("name"),
"ip": d.get("decky_ip"),
"services": d.get("services") or [],
"source": "fleet",
})
# SWARM — DeckyShard rows in 'running' state on enrolled workers.
async with self._session() as session:
shard_rows = await session.execute(
select(DeckyShard).where(DeckyShard.state == "running")
)
for s in shard_rows.scalars().all():
d = self._deserialize_json_fields(
s.model_dump(mode="json"), ("services", "decky_config")
)
out.append({
"uuid": f"{d.get('host_uuid')}:{d.get('decky_name')}",
"name": d.get("decky_name"),
"ip": d.get("decky_ip"),
"services": d.get("services") or [],
"source": "shard",
})
return out
# ------------------------------------------------------------ mazenet
@staticmethod