feat(db): add DeckyLifecycle table for async deploy/mutate tracking

One row per (decky, operation) attempt. State machine:
pending -> running -> succeeded | failed (+ error text). Rows are
append-only after terminal; retries write a new row.

Sibling of DeckyShard rather than a rework -- DeckyShard tracks
runtime container state observed via heartbeat, this tracks
operation lifecycle. New table, UUID PK.

Adds BaseRepository abstract methods (create_lifecycle,
update_lifecycle, get_lifecycle_by_ids, find_open_lifecycle,
sweep_stale_lifecycle) with SQLModelRepository mixin impl.
Backbone for the upcoming 202-Accepted async API.
This commit is contained in:
2026-05-22 16:20:00 -04:00
parent ade8bbe30a
commit 05c0721a51
7 changed files with 451 additions and 0 deletions

View File

@@ -82,6 +82,15 @@ from .decky import (
ServiceConfigFieldDTO,
ServiceSchemaResponse,
)
from .decky_lifecycle import (
DeckyLifecycle,
DeckyLifecycleListResponse,
DeckyLifecycleView,
LifecycleAcceptedResponse,
LifecycleDelta,
LifecycleOperation,
LifecycleStatus,
)
from .fleet import (
LOCAL_HOST_SENTINEL,
FleetDecky,
@@ -278,6 +287,14 @@ __all__ = [
"FleetDecky",
"ServiceConfigFieldDTO",
"ServiceSchemaResponse",
# decky_lifecycle
"DeckyLifecycle",
"DeckyLifecycleListResponse",
"DeckyLifecycleView",
"LifecycleAcceptedResponse",
"LifecycleDelta",
"LifecycleOperation",
"LifecycleStatus",
# health
"ComponentHealth",
"HealthResponse",

View File

@@ -0,0 +1,87 @@
"""DeckyLifecycle table + DTOs.
Tracks one row per (decky, operation) attempt — `deploy` or `mutate` —
so the API can return 202 Accepted immediately and the wizard can poll
state instead of holding an open HTTP request open for minutes.
State machine: ``pending`` (row created, runner not yet started) →
``running`` (runner picked it up) → terminal ``succeeded`` | ``failed``
(+ ``error`` text). Rows are immutable after terminal status; a retry
writes a new row.
Sibling of DeckyShard rather than a rework — DeckyShard tracks runtime
container state observed via heartbeat, this tracks operation lifecycle.
Per ``feedback_uuid_over_natural_keys``: new use case, new table, UUID PK.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import Literal, Optional
from pydantic import BaseModel, Field as PydanticField
from sqlalchemy import Column, Text
from sqlmodel import Field, SQLModel
LifecycleOperation = Literal["deploy", "mutate"]
LifecycleStatus = Literal["pending", "running", "succeeded", "failed"]
def _now_utc() -> datetime:
return datetime.now(timezone.utc)
class DeckyLifecycle(SQLModel, table=True):
"""One row per (decky, operation) attempt."""
__tablename__ = "decky_lifecycle"
id: str = Field(
primary_key=True,
default_factory=lambda: str(uuid.uuid4()),
)
decky_name: str = Field(index=True)
# None for unihost / master-resident deckies.
host_uuid: Optional[str] = Field(default=None, index=True)
operation: str = Field(index=True) # LifecycleOperation
status: str = Field(default="pending", index=True) # LifecycleStatus
error: Optional[str] = Field(
default=None, sa_column=Column("error", Text, nullable=True),
)
started_at: datetime = Field(default_factory=_now_utc)
updated_at: datetime = Field(default_factory=_now_utc)
completed_at: Optional[datetime] = Field(default=None)
# --- HTTP DTOs ---
class DeckyLifecycleView(BaseModel):
"""One lifecycle row, serialised for the wizard polling loop."""
id: str
decky_name: str
host_uuid: Optional[str] = None
operation: str
status: str
error: Optional[str] = None
started_at: datetime
updated_at: datetime
completed_at: Optional[datetime] = None
class DeckyLifecycleListResponse(BaseModel):
rows: list[DeckyLifecycleView] = PydanticField(default_factory=list)
class LifecycleAcceptedResponse(BaseModel):
"""Returned by 202 deploy/mutate endpoints — lets the client subscribe
to the matching DeckyLifecycle rows via the polling endpoint."""
lifecycle_ids: list[str]
class LifecycleDelta(BaseModel):
"""One per-decky completion record in a worker → master heartbeat."""
decky_name: str
operation: str
status: str # one of LifecycleStatus, typically "succeeded" | "failed"
error: Optional[str] = None
completed_at: Optional[datetime] = None

View File

@@ -872,6 +872,71 @@ class BaseRepository(ABC):
async def delete_decky_shard(self, decky_name: str) -> bool:
raise NotImplementedError
# ----------------------------------------------------------- lifecycle
# Per-(decky, operation) attempt tracking for the async deploy/mutate
# state machine. Rows are append-only after terminal status; retries
# write new rows.
async def create_lifecycle(self, data: dict[str, Any]) -> str:
"""Insert a new lifecycle row in ``pending`` status.
``data`` must include ``decky_name``, ``operation``; ``host_uuid``
and ``id`` are optional. Returns the row's ``id``.
"""
raise NotImplementedError
async def update_lifecycle(
self,
lifecycle_id: str,
fields: dict[str, Any],
) -> None:
"""Partial update of an open row (status / error / timestamps).
Callers must bump ``updated_at`` themselves, or rely on the impl
to stamp it. Terminal rows are not protected at this layer — the
runner / heartbeat handler must enforce the immutability rule.
"""
raise NotImplementedError
async def get_lifecycle_by_ids(
self, lifecycle_ids: list[str],
) -> list[dict[str, Any]]:
"""Fetch lifecycle rows by id, in undefined order.
Used by the wizard polling endpoint; missing ids are silently
omitted from the result rather than raising.
"""
raise NotImplementedError
async def find_open_lifecycle(
self,
decky_name: str,
operation: str,
host_uuid: Optional[str] = None,
) -> Optional[dict[str, Any]]:
"""Return the most-recently-started row whose status is
``pending`` or ``running`` for (decky_name, operation,
host_uuid). ``None`` if none open.
The worker-heartbeat path uses this to match deltas to the
master's view of an in-flight operation.
"""
raise NotImplementedError
async def sweep_stale_lifecycle(
self,
older_than: datetime,
reason: str,
) -> int:
"""Mark every non-terminal row started before ``older_than`` as
``failed`` with ``error=reason``. Returns the row count.
Called on master boot to flush orphans from a previous crash.
Pre-v1 substitute for a durable queue per
``feedback_prev1_no_user_problems``.
"""
raise NotImplementedError
# ----------------------------------------------------------- mazenet
# MazeNET topology persistence. Default no-op / NotImplementedError so
# non-default backends stay functional; SQLModelRepository provides the

View File

@@ -42,6 +42,7 @@ from decnet.web.db.sqlmodel_repo.campaigns import CampaignsMixin
from decnet.web.db.sqlmodel_repo.canary import CanaryMixin
from decnet.web.db.sqlmodel_repo.credentials import CredentialsMixin
from decnet.web.db.sqlmodel_repo.deckies import DeckiesMixin
from decnet.web.db.sqlmodel_repo.decky_lifecycle import LifecycleMixin
from decnet.web.db.sqlmodel_repo.fleet import FleetMixin
from decnet.web.db.sqlmodel_repo.identities import IdentitiesMixin
from decnet.web.db.sqlmodel_repo.logs import LogsMixin
@@ -66,6 +67,7 @@ class SQLModelRepository(
CanaryMixin,
CredentialsMixin,
DeckiesMixin,
LifecycleMixin,
FleetMixin,
IdentitiesMixin,
LogsMixin,

View File

@@ -0,0 +1,106 @@
"""DeckyLifecycle CRUD + sweep.
One row per (decky, operation) attempt. States: pending → running →
succeeded | failed. Mixed into ``SQLModelRepository`` for both SQLite
and MySQL via MRO composition.
"""
from __future__ import annotations
import uuid as _uuid
from datetime import datetime, timezone
from typing import Any, Optional
from sqlalchemy import asc, select, update
from decnet.web.db.models import DeckyLifecycle
from decnet.web.db.sqlmodel_repo._helpers import _MixinBase
_TERMINAL = ("succeeded", "failed")
class LifecycleMixin(_MixinBase):
"""Mixin: composed onto ``SQLModelRepository``."""
async def create_lifecycle(self, data: dict[str, Any]) -> str:
payload = dict(data)
payload.setdefault("id", str(_uuid.uuid4()))
payload.setdefault("status", "pending")
now = datetime.now(timezone.utc)
payload.setdefault("started_at", now)
payload["updated_at"] = now
async with self._session() as session:
session.add(DeckyLifecycle(**payload))
await session.commit()
return str(payload["id"])
async def update_lifecycle(
self,
lifecycle_id: str,
fields: dict[str, Any],
) -> None:
payload = dict(fields)
payload["updated_at"] = datetime.now(timezone.utc)
if payload.get("status") in _TERMINAL and "completed_at" not in payload:
payload["completed_at"] = payload["updated_at"]
async with self._session() as session:
await session.execute(
update(DeckyLifecycle)
.where(DeckyLifecycle.id == lifecycle_id)
.values(**payload)
)
await session.commit()
async def get_lifecycle_by_ids(
self, lifecycle_ids: list[str],
) -> list[dict[str, Any]]:
if not lifecycle_ids:
return []
async with self._session() as session:
result = await session.execute(
select(DeckyLifecycle)
.where(DeckyLifecycle.id.in_(lifecycle_ids)) # type: ignore[attr-defined]
.order_by(asc(DeckyLifecycle.started_at))
)
return [r.model_dump(mode="json") for r in result.scalars().all()]
async def find_open_lifecycle(
self,
decky_name: str,
operation: str,
host_uuid: Optional[str] = None,
) -> Optional[dict[str, Any]]:
stmt = (
select(DeckyLifecycle)
.where(DeckyLifecycle.decky_name == decky_name)
.where(DeckyLifecycle.operation == operation)
.where(DeckyLifecycle.status.in_(("pending", "running"))) # type: ignore[attr-defined]
.order_by(DeckyLifecycle.started_at.desc()) # type: ignore[attr-defined]
)
if host_uuid is not None:
stmt = stmt.where(DeckyLifecycle.host_uuid == host_uuid)
async with self._session() as session:
result = await session.execute(stmt)
row = result.scalars().first()
return row.model_dump(mode="json") if row else None
async def sweep_stale_lifecycle(
self,
older_than: datetime,
reason: str,
) -> int:
now = datetime.now(timezone.utc)
async with self._session() as session:
result = await session.execute(
update(DeckyLifecycle)
.where(DeckyLifecycle.status.in_(("pending", "running"))) # type: ignore[attr-defined]
.where(DeckyLifecycle.started_at < older_than)
.values(
status="failed",
error=reason,
updated_at=now,
completed_at=now,
)
)
await session.commit()
return result.rowcount or 0

View File

@@ -153,6 +153,17 @@ class DummyRepo(BaseRepository):
async def iter_canary_triggers_since(self, since):
return
yield
# DeckyLifecycle surface
async def create_lifecycle(self, data):
await super().create_lifecycle(data); return ""
async def update_lifecycle(self, lifecycle_id, fields):
await super().update_lifecycle(lifecycle_id, fields)
async def get_lifecycle_by_ids(self, ids):
await super().get_lifecycle_by_ids(ids); return []
async def find_open_lifecycle(self, decky_name, operation, host_uuid=None):
await super().find_open_lifecycle(decky_name, operation, host_uuid); return None
async def sweep_stale_lifecycle(self, older_than, reason):
await super().sweep_stale_lifecycle(older_than, reason); return 0
@pytest.mark.asyncio
async def test_base_repo_coverage():
@@ -319,6 +330,11 @@ async def test_base_repo_coverage():
(dr.upsert_decky_shard, ({},)),
(dr.list_decky_shards, ()),
(dr.delete_decky_shards_for_host, ("u",)),
(dr.create_lifecycle, ({"decky_name": "d", "operation": "deploy"},)),
(dr.update_lifecycle, ("id", {})),
(dr.get_lifecycle_by_ids, (["id"],)),
(dr.find_open_lifecycle, ("d", "deploy")),
(dr.sweep_stale_lifecycle, (datetime.now(timezone.utc), "reason")),
(dr.create_topology, ({},)),
(dr.get_topology, ("t",)),
(dr.list_topologies, ()),

View File

@@ -0,0 +1,158 @@
"""DeckyLifecycle repo CRUD + sweep tests.
State machine: pending → running → succeeded | failed. Rows are
append-only after terminal; retries write a new row.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from decnet.web.db.factory import get_repository
@pytest.fixture
async def repo(tmp_path: Path):
r = get_repository(db_path=str(tmp_path / "lifecycle.db"))
await r.initialize()
return r
@pytest.mark.anyio
async def test_create_lifecycle_returns_id_and_defaults_pending(repo) -> None:
lid = await repo.create_lifecycle(
{"decky_name": "decky-01", "operation": "deploy"},
)
assert isinstance(lid, str) and lid
rows = await repo.get_lifecycle_by_ids([lid])
assert len(rows) == 1
row = rows[0]
assert row["decky_name"] == "decky-01"
assert row["operation"] == "deploy"
assert row["status"] == "pending"
assert row["error"] is None
assert row["completed_at"] is None
assert row["started_at"] is not None
assert row["updated_at"] is not None
@pytest.mark.anyio
async def test_update_lifecycle_terminal_stamps_completed_at(repo) -> None:
lid = await repo.create_lifecycle(
{"decky_name": "decky-01", "operation": "mutate"},
)
await repo.update_lifecycle(lid, {"status": "running"})
rows = await repo.get_lifecycle_by_ids([lid])
assert rows[0]["status"] == "running"
assert rows[0]["completed_at"] is None
await repo.update_lifecycle(
lid, {"status": "succeeded"},
)
rows = await repo.get_lifecycle_by_ids([lid])
assert rows[0]["status"] == "succeeded"
assert rows[0]["completed_at"] is not None
@pytest.mark.anyio
async def test_update_lifecycle_failure_carries_error(repo) -> None:
lid = await repo.create_lifecycle(
{"decky_name": "decky-01", "operation": "deploy"},
)
await repo.update_lifecycle(
lid, {"status": "failed", "error": "compose blew up"},
)
rows = await repo.get_lifecycle_by_ids([lid])
assert rows[0]["status"] == "failed"
assert rows[0]["error"] == "compose blew up"
assert rows[0]["completed_at"] is not None
@pytest.mark.anyio
async def test_get_lifecycle_by_ids_empty_list_returns_empty(repo) -> None:
assert await repo.get_lifecycle_by_ids([]) == []
@pytest.mark.anyio
async def test_get_lifecycle_by_ids_unknown_id_silently_omitted(repo) -> None:
lid = await repo.create_lifecycle(
{"decky_name": "d", "operation": "deploy"},
)
rows = await repo.get_lifecycle_by_ids([lid, "no-such-id"])
assert len(rows) == 1
assert rows[0]["id"] == lid
@pytest.mark.anyio
async def test_find_open_lifecycle_matches_pending_and_running(repo) -> None:
p = await repo.create_lifecycle(
{"decky_name": "decky-01", "operation": "deploy"},
)
found = await repo.find_open_lifecycle("decky-01", "deploy")
assert found is not None
assert found["id"] == p
await repo.update_lifecycle(p, {"status": "running"})
found = await repo.find_open_lifecycle("decky-01", "deploy")
assert found is not None
assert found["status"] == "running"
@pytest.mark.anyio
async def test_find_open_lifecycle_skips_terminal_rows(repo) -> None:
lid = await repo.create_lifecycle(
{"decky_name": "decky-01", "operation": "deploy"},
)
await repo.update_lifecycle(lid, {"status": "succeeded"})
assert await repo.find_open_lifecycle("decky-01", "deploy") is None
@pytest.mark.anyio
async def test_find_open_lifecycle_host_uuid_filter(repo) -> None:
a = await repo.create_lifecycle(
{"decky_name": "d", "operation": "deploy", "host_uuid": "h1"},
)
await repo.create_lifecycle(
{"decky_name": "d", "operation": "deploy", "host_uuid": "h2"},
)
found = await repo.find_open_lifecycle("d", "deploy", host_uuid="h1")
assert found is not None
assert found["id"] == a
@pytest.mark.anyio
async def test_sweep_marks_stale_rows_failed(repo) -> None:
# Stale: started_at well in the past, still pending.
stale_id = await repo.create_lifecycle(
{"decky_name": "old", "operation": "deploy"},
)
# Force its started_at backwards via update (sweep relies on it).
long_ago = datetime.now(timezone.utc) - timedelta(hours=2)
await repo.update_lifecycle(stale_id, {"started_at": long_ago})
# Fresh: just-created, must NOT be swept.
fresh_id = await repo.create_lifecycle(
{"decky_name": "new", "operation": "deploy"},
)
cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
swept = await repo.sweep_stale_lifecycle(
cutoff, reason="master restarted during operation",
)
assert swept == 1
rows = await repo.get_lifecycle_by_ids([stale_id, fresh_id])
by_id = {r["id"]: r for r in rows}
assert by_id[stale_id]["status"] == "failed"
assert by_id[stale_id]["error"] == "master restarted during operation"
assert by_id[stale_id]["completed_at"] is not None
assert by_id[fresh_id]["status"] == "pending"
@pytest.mark.anyio
async def test_sweep_no_op_when_no_stale_rows(repo) -> None:
await repo.create_lifecycle({"decky_name": "d", "operation": "deploy"})
cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
assert await repo.sweep_stale_lifecycle(cutoff, reason="x") == 0