feat(swarm): persist DeckyConfig snapshot per shard + enrich list API
Dispatch now writes the full serialised DeckyConfig into DeckyShard.decky_config (plus decky_ip as a cheap extract), so the master can render the same rich per-decky card the local-fleet view uses — hostname, distro, archetype, service_config, mutate_interval, last_mutated — without round-tripping to the worker on every page render. DeckyShardView gains the corresponding fields; the repository flattens the snapshot at read time. Pre-migration rows keep working (fields fall through as None/defaults). Columns are additive + nullable so SQLModel.metadata.create_all handles the change on both SQLite and MySQL. Backfill happens organically on the next dispatch or (in a follow-up) agent heartbeat.
This commit is contained in:
@@ -140,9 +140,20 @@ class DeckyShard(SQLModel, table=True):
|
|||||||
host_uuid: str = Field(foreign_key="swarm_hosts.uuid", index=True)
|
host_uuid: str = Field(foreign_key="swarm_hosts.uuid", index=True)
|
||||||
# JSON list of service names running on this decky (snapshot of assignment).
|
# JSON list of service names running on this decky (snapshot of assignment).
|
||||||
services: str = Field(sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]"))
|
services: str = Field(sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]"))
|
||||||
state: str = Field(default="pending", index=True) # pending|running|failed|torn_down
|
# Full serialised DeckyConfig from the most recent dispatch or heartbeat.
|
||||||
|
# Lets the dashboard render the same rich card (hostname/distro/archetype/
|
||||||
|
# service_config/mutate_interval) that the local-fleet view uses, without
|
||||||
|
# needing a live round-trip to the worker for every page render.
|
||||||
|
decky_config: Optional[str] = Field(
|
||||||
|
default=None, sa_column=Column("decky_config", _BIG_TEXT, nullable=True)
|
||||||
|
)
|
||||||
|
decky_ip: Optional[str] = Field(default=None)
|
||||||
|
state: str = Field(default="pending", index=True) # pending|running|failed|torn_down|degraded|tearing_down|teardown_failed
|
||||||
last_error: Optional[str] = Field(default=None, sa_column=Column("last_error", Text, nullable=True))
|
last_error: Optional[str] = Field(default=None, sa_column=Column("last_error", Text, nullable=True))
|
||||||
compose_hash: Optional[str] = Field(default=None)
|
compose_hash: Optional[str] = Field(default=None)
|
||||||
|
# Timestamp of the last heartbeat that echoed this shard; lets the UI
|
||||||
|
# show "stale" deckies whose agent has gone silent.
|
||||||
|
last_seen: Optional[datetime] = Field(default=None)
|
||||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
|
||||||
@@ -343,6 +354,15 @@ class DeckyShardView(BaseModel):
|
|||||||
last_error: Optional[str] = None
|
last_error: Optional[str] = None
|
||||||
compose_hash: Optional[str] = None
|
compose_hash: Optional[str] = None
|
||||||
updated_at: datetime
|
updated_at: datetime
|
||||||
|
# Enriched fields lifted from the stored DeckyConfig snapshot so the
|
||||||
|
# dashboard can render the same card shape as the local-fleet view.
|
||||||
|
hostname: Optional[str] = None
|
||||||
|
distro: Optional[str] = None
|
||||||
|
archetype: Optional[str] = None
|
||||||
|
service_config: dict[str, dict[str, Any]] = {}
|
||||||
|
mutate_interval: Optional[int] = None
|
||||||
|
last_mutated: float = 0.0
|
||||||
|
last_seen: Optional[datetime] = None
|
||||||
|
|
||||||
|
|
||||||
class SwarmDeployRequest(BaseModel):
|
class SwarmDeployRequest(BaseModel):
|
||||||
|
|||||||
@@ -850,6 +850,27 @@ class SQLModelRepository(BaseRepository):
|
|||||||
d["services"] = json.loads(raw)
|
d["services"] = json.loads(raw)
|
||||||
except (json.JSONDecodeError, TypeError):
|
except (json.JSONDecodeError, TypeError):
|
||||||
d["services"] = []
|
d["services"] = []
|
||||||
|
# Flatten the stored DeckyConfig snapshot into the row so
|
||||||
|
# routers can hand it to DeckyShardView without re-parsing.
|
||||||
|
# Rows predating the migration have decky_config=NULL and
|
||||||
|
# fall through with the default (None/{}) view values.
|
||||||
|
cfg_raw = d.get("decky_config")
|
||||||
|
if isinstance(cfg_raw, str):
|
||||||
|
try:
|
||||||
|
cfg = json.loads(cfg_raw)
|
||||||
|
except (json.JSONDecodeError, TypeError):
|
||||||
|
cfg = {}
|
||||||
|
if isinstance(cfg, dict):
|
||||||
|
for k in ("hostname", "distro", "archetype",
|
||||||
|
"service_config", "mutate_interval",
|
||||||
|
"last_mutated"):
|
||||||
|
if k in cfg and d.get(k) is None:
|
||||||
|
d[k] = cfg[k]
|
||||||
|
# Keep decky_ip authoritative from the column (newer
|
||||||
|
# heartbeats overwrite it) but fall back to the
|
||||||
|
# snapshot if the column is still NULL.
|
||||||
|
if not d.get("decky_ip") and cfg.get("ip"):
|
||||||
|
d["decky_ip"] = cfg["ip"]
|
||||||
out.append(d)
|
out.append(d)
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|||||||
@@ -89,6 +89,8 @@ async def dispatch_decnet_config(
|
|||||||
"decky_name": d.name,
|
"decky_name": d.name,
|
||||||
"host_uuid": host_uuid,
|
"host_uuid": host_uuid,
|
||||||
"services": json.dumps(d.services),
|
"services": json.dumps(d.services),
|
||||||
|
"decky_config": d.model_dump_json(),
|
||||||
|
"decky_ip": d.ip,
|
||||||
"state": "running" if not dry_run else "pending",
|
"state": "running" if not dry_run else "pending",
|
||||||
"last_error": None,
|
"last_error": None,
|
||||||
"updated_at": datetime.now(timezone.utc),
|
"updated_at": datetime.now(timezone.utc),
|
||||||
@@ -118,6 +120,8 @@ async def dispatch_decnet_config(
|
|||||||
"decky_name": d.name,
|
"decky_name": d.name,
|
||||||
"host_uuid": host_uuid,
|
"host_uuid": host_uuid,
|
||||||
"services": json.dumps(d.services),
|
"services": json.dumps(d.services),
|
||||||
|
"decky_config": d.model_dump_json(),
|
||||||
|
"decky_ip": d.ip,
|
||||||
"state": "running" if is_up else "failed",
|
"state": "running" if is_up else "failed",
|
||||||
"last_error": None if is_up else str(exc)[:512],
|
"last_error": None if is_up else str(exc)[:512],
|
||||||
"updated_at": datetime.now(timezone.utc),
|
"updated_at": datetime.now(timezone.utc),
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ async def api_list_deckies(
|
|||||||
host = hosts.get(s["host_uuid"], {})
|
host = hosts.get(s["host_uuid"], {})
|
||||||
out.append(DeckyShardView(
|
out.append(DeckyShardView(
|
||||||
decky_name=s["decky_name"],
|
decky_name=s["decky_name"],
|
||||||
|
decky_ip=s.get("decky_ip"),
|
||||||
host_uuid=s["host_uuid"],
|
host_uuid=s["host_uuid"],
|
||||||
host_name=host.get("name") or "<unknown>",
|
host_name=host.get("name") or "<unknown>",
|
||||||
host_address=host.get("address") or "",
|
host_address=host.get("address") or "",
|
||||||
@@ -43,5 +44,12 @@ async def api_list_deckies(
|
|||||||
last_error=s.get("last_error"),
|
last_error=s.get("last_error"),
|
||||||
compose_hash=s.get("compose_hash"),
|
compose_hash=s.get("compose_hash"),
|
||||||
updated_at=s["updated_at"],
|
updated_at=s["updated_at"],
|
||||||
|
hostname=s.get("hostname"),
|
||||||
|
distro=s.get("distro"),
|
||||||
|
archetype=s.get("archetype"),
|
||||||
|
service_config=s.get("service_config") or {},
|
||||||
|
mutate_interval=s.get("mutate_interval"),
|
||||||
|
last_mutated=s.get("last_mutated") or 0.0,
|
||||||
|
last_seen=s.get("last_seen"),
|
||||||
))
|
))
|
||||||
return out
|
return out
|
||||||
|
|||||||
@@ -22,9 +22,8 @@ async def list_deckies(
|
|||||||
shards = await repo.list_decky_shards(host_uuid)
|
shards = await repo.list_decky_shards(host_uuid)
|
||||||
hosts = {h["uuid"]: h for h in await repo.list_swarm_hosts()}
|
hosts = {h["uuid"]: h for h in await repo.list_swarm_hosts()}
|
||||||
|
|
||||||
# IPs live on the stored DecnetConfig, not on the shard row. Resolve by
|
# Pre-heartbeat fallback — older rows without decky_config can still
|
||||||
# decky_name — if the master rebooted without a config, the column falls
|
# surface their IP from the master's deploy state snapshot.
|
||||||
# back to "—" rather than blocking the list.
|
|
||||||
deploy_state = await repo.get_state("deployment") or {}
|
deploy_state = await repo.get_state("deployment") or {}
|
||||||
cfg_deckies = (deploy_state.get("config") or {}).get("deckies") or []
|
cfg_deckies = (deploy_state.get("config") or {}).get("deckies") or []
|
||||||
ip_by_name: dict[str, str] = {
|
ip_by_name: dict[str, str] = {
|
||||||
@@ -38,7 +37,7 @@ async def list_deckies(
|
|||||||
host = hosts.get(s["host_uuid"], {})
|
host = hosts.get(s["host_uuid"], {})
|
||||||
out.append(DeckyShardView(
|
out.append(DeckyShardView(
|
||||||
decky_name=s["decky_name"],
|
decky_name=s["decky_name"],
|
||||||
decky_ip=ip_by_name.get(s["decky_name"]),
|
decky_ip=s.get("decky_ip") or ip_by_name.get(s["decky_name"]),
|
||||||
host_uuid=s["host_uuid"],
|
host_uuid=s["host_uuid"],
|
||||||
host_name=host.get("name") or "<unknown>",
|
host_name=host.get("name") or "<unknown>",
|
||||||
host_address=host.get("address") or "",
|
host_address=host.get("address") or "",
|
||||||
@@ -48,5 +47,12 @@ async def list_deckies(
|
|||||||
last_error=s.get("last_error"),
|
last_error=s.get("last_error"),
|
||||||
compose_hash=s.get("compose_hash"),
|
compose_hash=s.get("compose_hash"),
|
||||||
updated_at=s["updated_at"],
|
updated_at=s["updated_at"],
|
||||||
|
hostname=s.get("hostname"),
|
||||||
|
distro=s.get("distro"),
|
||||||
|
archetype=s.get("archetype"),
|
||||||
|
service_config=s.get("service_config") or {},
|
||||||
|
mutate_interval=s.get("mutate_interval"),
|
||||||
|
last_mutated=s.get("last_mutated") or 0.0,
|
||||||
|
last_seen=s.get("last_seen"),
|
||||||
))
|
))
|
||||||
return out
|
return out
|
||||||
|
|||||||
Reference in New Issue
Block a user