feat(db): FleetDecky table mirrors decnet-state.json into the DB
Adds a fleet_deckies table so DB-only consumers (orchestrator, web dashboard, REST API) can see unihost / MACVLAN / IPVLAN deckies without reading the JSON state file. Mirrors DeckyShard field-for-field. Composite PK (host_uuid, name) future-proofs for a mothership that runs both a local fleet and acts as a swarm master. host_uuid defaults to the "local" sentinel — no FK to swarm_hosts because the local mothership isn't enrolled as a worker. Repo additions: upsert_fleet_decky, delete_fleet_decky, list_fleet_deckies, list_running_fleet_deckies, update_fleet_decky_state, plus list_running_deckies which unions topology + fleet + shard sources for the orchestrator. Smoke-tested round-trip against MySQL: upsert, list_running, union view (source="fleet"), delete.
This commit is contained in:
@@ -49,6 +49,10 @@ from .deploy import (
|
|||||||
MutateIntervalRequest,
|
MutateIntervalRequest,
|
||||||
PurgeResponse,
|
PurgeResponse,
|
||||||
)
|
)
|
||||||
|
from .fleet import (
|
||||||
|
LOCAL_HOST_SENTINEL,
|
||||||
|
FleetDecky,
|
||||||
|
)
|
||||||
from .health import (
|
from .health import (
|
||||||
ComponentHealth,
|
ComponentHealth,
|
||||||
HealthResponse,
|
HealthResponse,
|
||||||
@@ -182,6 +186,9 @@ __all__ = [
|
|||||||
"DeployResponse",
|
"DeployResponse",
|
||||||
"MutateIntervalRequest",
|
"MutateIntervalRequest",
|
||||||
"PurgeResponse",
|
"PurgeResponse",
|
||||||
|
# fleet
|
||||||
|
"LOCAL_HOST_SENTINEL",
|
||||||
|
"FleetDecky",
|
||||||
# health
|
# health
|
||||||
"ComponentHealth",
|
"ComponentHealth",
|
||||||
"HealthResponse",
|
"HealthResponse",
|
||||||
|
|||||||
72
decnet/web/db/models/fleet.py
Normal file
72
decnet/web/db/models/fleet.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
"""Fleet decky table — DB mirror of ``decnet-state.json``.
|
||||||
|
|
||||||
|
The legacy unihost / MACVLAN / IPVLAN deploy path persists fleet state to a
|
||||||
|
JSON file (``/var/lib/decnet/decnet-state.json``) via
|
||||||
|
:func:`decnet.config.save_state`. That file is consumed directly by
|
||||||
|
``decnet status``/``decnet teardown``, the sniffer, and the collector — all
|
||||||
|
host-local CLI / worker code that may run on a box without the API daemon.
|
||||||
|
|
||||||
|
The FleetDecky table is a *mirror* of that JSON state inside MySQL/SQLite so
|
||||||
|
DB-only consumers (the orchestrator, the web dashboard, the REST API) can
|
||||||
|
see fleet decoys without touching the filesystem.
|
||||||
|
|
||||||
|
Both writers — CLI ``decnet deploy`` (``engine.deployer.deploy``) and the
|
||||||
|
web/API deploy path (``web.router.fleet.api_deploy_deckies``) — write to
|
||||||
|
*both* surfaces. A reconciler (``decnet.fleet.reconciler``) handles drift.
|
||||||
|
|
||||||
|
Schema mirrors :class:`decnet.web.db.models.swarm.DeckyShard` field-for-field
|
||||||
|
so the dashboard can render fleet rows with the same card shape. The PK is
|
||||||
|
composite ``(host_uuid, name)`` to future-proof for multi-host motherships
|
||||||
|
(a master that runs its own local fleet AND swarm-shards onto workers). In
|
||||||
|
unihost mode ``host_uuid`` defaults to the sentinel
|
||||||
|
:data:`LOCAL_HOST_SENTINEL`; we deliberately do NOT FK to ``swarm_hosts``
|
||||||
|
because the local mothership is not enrolled as a swarm worker.
|
||||||
|
"""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from sqlalchemy import Column, Text
|
||||||
|
from sqlmodel import Field, SQLModel
|
||||||
|
|
||||||
|
from ._base import _BIG_TEXT
|
||||||
|
|
||||||
|
|
||||||
|
LOCAL_HOST_SENTINEL = "local"
|
||||||
|
|
||||||
|
|
||||||
|
class FleetDecky(SQLModel, table=True):
|
||||||
|
"""A unihost / MACVLAN / IPVLAN decky deployed on the local mothership.
|
||||||
|
|
||||||
|
Disjoint from :class:`DeckyShard` (SWARM-only) and :class:`TopologyDecky`
|
||||||
|
(MazeNET-only). Composite PK lets multiple hosts coexist when a future
|
||||||
|
mothership runs both a local fleet and acts as a swarm master.
|
||||||
|
"""
|
||||||
|
__tablename__ = "fleet_deckies"
|
||||||
|
|
||||||
|
host_uuid: str = Field(
|
||||||
|
default=LOCAL_HOST_SENTINEL, primary_key=True, index=True,
|
||||||
|
)
|
||||||
|
name: str = Field(primary_key=True)
|
||||||
|
# JSON list of service names on this decky (snapshot of assignment).
|
||||||
|
services: str = Field(
|
||||||
|
sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")
|
||||||
|
)
|
||||||
|
# Full serialised DeckyConfig — lets the dashboard render the same rich
|
||||||
|
# card (hostname/distro/archetype/service_config/mutate_interval) without
|
||||||
|
# round-tripping to load_state() on every page render.
|
||||||
|
decky_config: Optional[str] = Field(
|
||||||
|
default=None, sa_column=Column("decky_config", _BIG_TEXT, nullable=True)
|
||||||
|
)
|
||||||
|
decky_ip: Optional[str] = Field(default=None)
|
||||||
|
# pending|running|failed|torn_down|degraded|tearing_down|teardown_failed
|
||||||
|
state: str = Field(default="pending", index=True)
|
||||||
|
last_error: Optional[str] = Field(
|
||||||
|
default=None, sa_column=Column("last_error", Text, nullable=True),
|
||||||
|
)
|
||||||
|
compose_hash: Optional[str] = Field(default=None)
|
||||||
|
# Last reconciler observation (docker inspect) — lets the dashboard show
|
||||||
|
# "stale" rows whose reconciler hasn't ticked.
|
||||||
|
last_seen: Optional[datetime] = Field(default=None)
|
||||||
|
updated_at: datetime = Field(
|
||||||
|
default_factory=lambda: datetime.now(timezone.utc)
|
||||||
|
)
|
||||||
@@ -858,6 +858,56 @@ class BaseRepository(ABC):
|
|||||||
``enabled=False`` and stamps ``auto_disabled_at``."""
|
``enabled=False`` and stamps ``auto_disabled_at``."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------- fleet
|
||||||
|
|
||||||
|
async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
|
||||||
|
"""Insert-or-update a FleetDecky row keyed by ``(host_uuid, name)``.
|
||||||
|
|
||||||
|
Used by ``engine.deployer.deploy`` and the API deploy path to mirror
|
||||||
|
``decnet-state.json`` into the DB. Idempotent: calling with the same
|
||||||
|
key updates the existing row's mutable fields.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
|
||||||
|
"""Remove a FleetDecky row. No-op if the row doesn't exist."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def list_fleet_deckies(
|
||||||
|
self, *, host_uuid: Optional[str] = None,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""Return all FleetDecky rows, optionally scoped to a single host."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
|
||||||
|
"""Return every FleetDecky row whose ``state == 'running'``.
|
||||||
|
|
||||||
|
Joined alongside :meth:`list_running_topology_deckies` and the SWARM
|
||||||
|
``DeckyShard`` view by :meth:`list_running_deckies`.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def update_fleet_decky_state(
|
||||||
|
self, *, host_uuid: str, name: str, state: str,
|
||||||
|
last_error: Optional[str] = None,
|
||||||
|
) -> None:
|
||||||
|
"""Update only the ``state``/``last_error``/``last_seen`` fields.
|
||||||
|
|
||||||
|
Called by the reconciler when ``docker inspect`` reports a fresh
|
||||||
|
container state. Avoids clobbering operator-edited config fields.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def list_running_deckies(self) -> list[dict[str, Any]]:
|
||||||
|
"""Union of running deckies across MazeNET, SWARM, and fleet sources.
|
||||||
|
|
||||||
|
Returns dicts shaped for the orchestrator's scheduler:
|
||||||
|
``{uuid, name, ip, services: list[str], source}`` where ``source`` is
|
||||||
|
one of ``"topology" | "shard" | "fleet"``. Rows from each source are
|
||||||
|
normalised so the scheduler doesn't need to branch on origin.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
# ---------------------------------------------------------- orchestrator
|
# ---------------------------------------------------------- orchestrator
|
||||||
|
|
||||||
async def list_running_topology_deckies(self) -> list[dict[str, Any]]:
|
async def list_running_topology_deckies(self) -> list[dict[str, Any]]:
|
||||||
|
|||||||
@@ -43,6 +43,8 @@ from decnet.web.db.models import (
|
|||||||
SmtpTarget,
|
SmtpTarget,
|
||||||
SwarmHost,
|
SwarmHost,
|
||||||
DeckyShard,
|
DeckyShard,
|
||||||
|
FleetDecky,
|
||||||
|
LOCAL_HOST_SENTINEL,
|
||||||
Topology,
|
Topology,
|
||||||
LAN,
|
LAN,
|
||||||
TopologyDecky,
|
TopologyDecky,
|
||||||
@@ -2023,6 +2025,135 @@ class SQLModelRepository(BaseRepository):
|
|||||||
await session.commit()
|
await session.commit()
|
||||||
return bool(result.rowcount)
|
return bool(result.rowcount)
|
||||||
|
|
||||||
|
# -------------------------------------------------------------- fleet
|
||||||
|
|
||||||
|
async def upsert_fleet_decky(self, data: dict[str, Any]) -> None:
|
||||||
|
payload: dict[str, Any] = {
|
||||||
|
**data,
|
||||||
|
"updated_at": datetime.now(timezone.utc),
|
||||||
|
}
|
||||||
|
payload.setdefault("host_uuid", LOCAL_HOST_SENTINEL)
|
||||||
|
if payload.get("host_uuid") is None:
|
||||||
|
payload["host_uuid"] = LOCAL_HOST_SENTINEL
|
||||||
|
if isinstance(payload.get("services"), list):
|
||||||
|
payload["services"] = orjson.dumps(payload["services"]).decode()
|
||||||
|
if isinstance(payload.get("decky_config"), dict):
|
||||||
|
payload["decky_config"] = orjson.dumps(payload["decky_config"]).decode()
|
||||||
|
async with self._session() as session:
|
||||||
|
result = await session.execute(
|
||||||
|
select(FleetDecky).where(
|
||||||
|
FleetDecky.host_uuid == payload["host_uuid"],
|
||||||
|
FleetDecky.name == payload["name"],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
existing = result.scalar_one_or_none()
|
||||||
|
if existing:
|
||||||
|
for k, v in payload.items():
|
||||||
|
setattr(existing, k, v)
|
||||||
|
session.add(existing)
|
||||||
|
else:
|
||||||
|
session.add(FleetDecky(**payload))
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
async def delete_fleet_decky(self, *, host_uuid: str, name: str) -> None:
|
||||||
|
async with self._session() as session:
|
||||||
|
await session.execute(
|
||||||
|
text(
|
||||||
|
"DELETE FROM fleet_deckies "
|
||||||
|
"WHERE host_uuid = :h AND name = :n"
|
||||||
|
),
|
||||||
|
{"h": host_uuid, "n": name},
|
||||||
|
)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
async def list_fleet_deckies(
|
||||||
|
self, *, host_uuid: Optional[str] = None,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
stmt = select(FleetDecky).order_by(asc(FleetDecky.name))
|
||||||
|
if host_uuid:
|
||||||
|
stmt = stmt.where(FleetDecky.host_uuid == host_uuid)
|
||||||
|
async with self._session() as session:
|
||||||
|
result = await session.execute(stmt)
|
||||||
|
return [
|
||||||
|
self._deserialize_json_fields(
|
||||||
|
r.model_dump(mode="json"), ("services", "decky_config")
|
||||||
|
)
|
||||||
|
for r in result.scalars().all()
|
||||||
|
]
|
||||||
|
|
||||||
|
async def list_running_fleet_deckies(self) -> list[dict[str, Any]]:
|
||||||
|
async with self._session() as session:
|
||||||
|
result = await session.execute(
|
||||||
|
select(FleetDecky).where(FleetDecky.state == "running")
|
||||||
|
)
|
||||||
|
return [
|
||||||
|
self._deserialize_json_fields(
|
||||||
|
r.model_dump(mode="json"), ("services", "decky_config")
|
||||||
|
)
|
||||||
|
for r in result.scalars().all()
|
||||||
|
]
|
||||||
|
|
||||||
|
async def update_fleet_decky_state(
|
||||||
|
self, *, host_uuid: str, name: str, state: str,
|
||||||
|
last_error: Optional[str] = None,
|
||||||
|
) -> None:
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
values: dict[str, Any] = {
|
||||||
|
"state": state,
|
||||||
|
"updated_at": now,
|
||||||
|
"last_seen": now,
|
||||||
|
}
|
||||||
|
if last_error is not None:
|
||||||
|
values["last_error"] = last_error
|
||||||
|
async with self._session() as session:
|
||||||
|
await session.execute(
|
||||||
|
update(FleetDecky)
|
||||||
|
.where(
|
||||||
|
FleetDecky.host_uuid == host_uuid,
|
||||||
|
FleetDecky.name == name,
|
||||||
|
)
|
||||||
|
.values(**values)
|
||||||
|
)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
async def list_running_deckies(self) -> list[dict[str, Any]]:
|
||||||
|
out: list[dict[str, Any]] = []
|
||||||
|
# MazeNET — already shaped {uuid, name, ip, services}
|
||||||
|
for d in await self.list_running_topology_deckies():
|
||||||
|
out.append({
|
||||||
|
"uuid": d.get("uuid"),
|
||||||
|
"name": d.get("name"),
|
||||||
|
"ip": d.get("ip"),
|
||||||
|
"services": d.get("services") or [],
|
||||||
|
"source": "topology",
|
||||||
|
})
|
||||||
|
# Fleet — column is `decky_ip`, PK is composite (host_uuid, name)
|
||||||
|
for d in await self.list_running_fleet_deckies():
|
||||||
|
out.append({
|
||||||
|
"uuid": f"{d.get('host_uuid')}:{d.get('name')}",
|
||||||
|
"name": d.get("name"),
|
||||||
|
"ip": d.get("decky_ip"),
|
||||||
|
"services": d.get("services") or [],
|
||||||
|
"source": "fleet",
|
||||||
|
})
|
||||||
|
# SWARM — DeckyShard rows in 'running' state on enrolled workers.
|
||||||
|
async with self._session() as session:
|
||||||
|
shard_rows = await session.execute(
|
||||||
|
select(DeckyShard).where(DeckyShard.state == "running")
|
||||||
|
)
|
||||||
|
for s in shard_rows.scalars().all():
|
||||||
|
d = self._deserialize_json_fields(
|
||||||
|
s.model_dump(mode="json"), ("services", "decky_config")
|
||||||
|
)
|
||||||
|
out.append({
|
||||||
|
"uuid": f"{d.get('host_uuid')}:{d.get('decky_name')}",
|
||||||
|
"name": d.get("decky_name"),
|
||||||
|
"ip": d.get("decky_ip"),
|
||||||
|
"services": d.get("services") or [],
|
||||||
|
"source": "shard",
|
||||||
|
})
|
||||||
|
return out
|
||||||
|
|
||||||
# ------------------------------------------------------------ mazenet
|
# ------------------------------------------------------------ mazenet
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user