diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index c1c63635..c62bf52d 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -82,6 +82,15 @@ from .decky import ( ServiceConfigFieldDTO, ServiceSchemaResponse, ) +from .decky_lifecycle import ( + DeckyLifecycle, + DeckyLifecycleListResponse, + DeckyLifecycleView, + LifecycleAcceptedResponse, + LifecycleDelta, + LifecycleOperation, + LifecycleStatus, +) from .fleet import ( LOCAL_HOST_SENTINEL, FleetDecky, @@ -278,6 +287,14 @@ __all__ = [ "FleetDecky", "ServiceConfigFieldDTO", "ServiceSchemaResponse", + # decky_lifecycle + "DeckyLifecycle", + "DeckyLifecycleListResponse", + "DeckyLifecycleView", + "LifecycleAcceptedResponse", + "LifecycleDelta", + "LifecycleOperation", + "LifecycleStatus", # health "ComponentHealth", "HealthResponse", diff --git a/decnet/web/db/models/decky_lifecycle.py b/decnet/web/db/models/decky_lifecycle.py new file mode 100644 index 00000000..23572ffc --- /dev/null +++ b/decnet/web/db/models/decky_lifecycle.py @@ -0,0 +1,87 @@ +"""DeckyLifecycle table + DTOs. + +Tracks one row per (decky, operation) attempt — `deploy` or `mutate` — +so the API can return 202 Accepted immediately and the wizard can poll +state instead of holding an open HTTP request open for minutes. + +State machine: ``pending`` (row created, runner not yet started) → +``running`` (runner picked it up) → terminal ``succeeded`` | ``failed`` +(+ ``error`` text). Rows are immutable after terminal status; a retry +writes a new row. + +Sibling of DeckyShard rather than a rework — DeckyShard tracks runtime +container state observed via heartbeat, this tracks operation lifecycle. +Per ``feedback_uuid_over_natural_keys``: new use case, new table, UUID PK. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import Literal, Optional + +from pydantic import BaseModel, Field as PydanticField +from sqlalchemy import Column, Text +from sqlmodel import Field, SQLModel + +LifecycleOperation = Literal["deploy", "mutate"] +LifecycleStatus = Literal["pending", "running", "succeeded", "failed"] + + +def _now_utc() -> datetime: + return datetime.now(timezone.utc) + + +class DeckyLifecycle(SQLModel, table=True): + """One row per (decky, operation) attempt.""" + + __tablename__ = "decky_lifecycle" + + id: str = Field( + primary_key=True, + default_factory=lambda: str(uuid.uuid4()), + ) + decky_name: str = Field(index=True) + # None for unihost / master-resident deckies. + host_uuid: Optional[str] = Field(default=None, index=True) + operation: str = Field(index=True) # LifecycleOperation + status: str = Field(default="pending", index=True) # LifecycleStatus + error: Optional[str] = Field( + default=None, sa_column=Column("error", Text, nullable=True), + ) + started_at: datetime = Field(default_factory=_now_utc) + updated_at: datetime = Field(default_factory=_now_utc) + completed_at: Optional[datetime] = Field(default=None) + + +# --- HTTP DTOs --- + +class DeckyLifecycleView(BaseModel): + """One lifecycle row, serialised for the wizard polling loop.""" + id: str + decky_name: str + host_uuid: Optional[str] = None + operation: str + status: str + error: Optional[str] = None + started_at: datetime + updated_at: datetime + completed_at: Optional[datetime] = None + + +class DeckyLifecycleListResponse(BaseModel): + rows: list[DeckyLifecycleView] = PydanticField(default_factory=list) + + +class LifecycleAcceptedResponse(BaseModel): + """Returned by 202 deploy/mutate endpoints — lets the client subscribe + to the matching DeckyLifecycle rows via the polling endpoint.""" + lifecycle_ids: list[str] + + +class LifecycleDelta(BaseModel): + """One per-decky completion record in a worker → master heartbeat.""" + decky_name: str + operation: str + status: str # one of LifecycleStatus, typically "succeeded" | "failed" + error: Optional[str] = None + completed_at: Optional[datetime] = None diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index fc8d7c04..fbe9a703 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -872,6 +872,71 @@ class BaseRepository(ABC): async def delete_decky_shard(self, decky_name: str) -> bool: raise NotImplementedError + # ----------------------------------------------------------- lifecycle + # Per-(decky, operation) attempt tracking for the async deploy/mutate + # state machine. Rows are append-only after terminal status; retries + # write new rows. + + async def create_lifecycle(self, data: dict[str, Any]) -> str: + """Insert a new lifecycle row in ``pending`` status. + + ``data`` must include ``decky_name``, ``operation``; ``host_uuid`` + and ``id`` are optional. Returns the row's ``id``. + """ + raise NotImplementedError + + async def update_lifecycle( + self, + lifecycle_id: str, + fields: dict[str, Any], + ) -> None: + """Partial update of an open row (status / error / timestamps). + + Callers must bump ``updated_at`` themselves, or rely on the impl + to stamp it. Terminal rows are not protected at this layer — the + runner / heartbeat handler must enforce the immutability rule. + """ + raise NotImplementedError + + async def get_lifecycle_by_ids( + self, lifecycle_ids: list[str], + ) -> list[dict[str, Any]]: + """Fetch lifecycle rows by id, in undefined order. + + Used by the wizard polling endpoint; missing ids are silently + omitted from the result rather than raising. + """ + raise NotImplementedError + + async def find_open_lifecycle( + self, + decky_name: str, + operation: str, + host_uuid: Optional[str] = None, + ) -> Optional[dict[str, Any]]: + """Return the most-recently-started row whose status is + ``pending`` or ``running`` for (decky_name, operation, + host_uuid). ``None`` if none open. + + The worker-heartbeat path uses this to match deltas to the + master's view of an in-flight operation. + """ + raise NotImplementedError + + async def sweep_stale_lifecycle( + self, + older_than: datetime, + reason: str, + ) -> int: + """Mark every non-terminal row started before ``older_than`` as + ``failed`` with ``error=reason``. Returns the row count. + + Called on master boot to flush orphans from a previous crash. + Pre-v1 substitute for a durable queue per + ``feedback_prev1_no_user_problems``. + """ + raise NotImplementedError + # ----------------------------------------------------------- mazenet # MazeNET topology persistence. Default no-op / NotImplementedError so # non-default backends stay functional; SQLModelRepository provides the diff --git a/decnet/web/db/sqlmodel_repo/__init__.py b/decnet/web/db/sqlmodel_repo/__init__.py index 9585df36..68b6af01 100644 --- a/decnet/web/db/sqlmodel_repo/__init__.py +++ b/decnet/web/db/sqlmodel_repo/__init__.py @@ -42,6 +42,7 @@ from decnet.web.db.sqlmodel_repo.campaigns import CampaignsMixin from decnet.web.db.sqlmodel_repo.canary import CanaryMixin from decnet.web.db.sqlmodel_repo.credentials import CredentialsMixin from decnet.web.db.sqlmodel_repo.deckies import DeckiesMixin +from decnet.web.db.sqlmodel_repo.decky_lifecycle import LifecycleMixin from decnet.web.db.sqlmodel_repo.fleet import FleetMixin from decnet.web.db.sqlmodel_repo.identities import IdentitiesMixin from decnet.web.db.sqlmodel_repo.logs import LogsMixin @@ -66,6 +67,7 @@ class SQLModelRepository( CanaryMixin, CredentialsMixin, DeckiesMixin, + LifecycleMixin, FleetMixin, IdentitiesMixin, LogsMixin, diff --git a/decnet/web/db/sqlmodel_repo/decky_lifecycle.py b/decnet/web/db/sqlmodel_repo/decky_lifecycle.py new file mode 100644 index 00000000..377534a5 --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/decky_lifecycle.py @@ -0,0 +1,106 @@ +"""DeckyLifecycle CRUD + sweep. + +One row per (decky, operation) attempt. States: pending → running → +succeeded | failed. Mixed into ``SQLModelRepository`` for both SQLite +and MySQL via MRO composition. +""" +from __future__ import annotations + +import uuid as _uuid +from datetime import datetime, timezone +from typing import Any, Optional + +from sqlalchemy import asc, select, update + +from decnet.web.db.models import DeckyLifecycle +from decnet.web.db.sqlmodel_repo._helpers import _MixinBase + + +_TERMINAL = ("succeeded", "failed") + + +class LifecycleMixin(_MixinBase): + """Mixin: composed onto ``SQLModelRepository``.""" + + async def create_lifecycle(self, data: dict[str, Any]) -> str: + payload = dict(data) + payload.setdefault("id", str(_uuid.uuid4())) + payload.setdefault("status", "pending") + now = datetime.now(timezone.utc) + payload.setdefault("started_at", now) + payload["updated_at"] = now + async with self._session() as session: + session.add(DeckyLifecycle(**payload)) + await session.commit() + return str(payload["id"]) + + async def update_lifecycle( + self, + lifecycle_id: str, + fields: dict[str, Any], + ) -> None: + payload = dict(fields) + payload["updated_at"] = datetime.now(timezone.utc) + if payload.get("status") in _TERMINAL and "completed_at" not in payload: + payload["completed_at"] = payload["updated_at"] + async with self._session() as session: + await session.execute( + update(DeckyLifecycle) + .where(DeckyLifecycle.id == lifecycle_id) + .values(**payload) + ) + await session.commit() + + async def get_lifecycle_by_ids( + self, lifecycle_ids: list[str], + ) -> list[dict[str, Any]]: + if not lifecycle_ids: + return [] + async with self._session() as session: + result = await session.execute( + select(DeckyLifecycle) + .where(DeckyLifecycle.id.in_(lifecycle_ids)) # type: ignore[attr-defined] + .order_by(asc(DeckyLifecycle.started_at)) + ) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def find_open_lifecycle( + self, + decky_name: str, + operation: str, + host_uuid: Optional[str] = None, + ) -> Optional[dict[str, Any]]: + stmt = ( + select(DeckyLifecycle) + .where(DeckyLifecycle.decky_name == decky_name) + .where(DeckyLifecycle.operation == operation) + .where(DeckyLifecycle.status.in_(("pending", "running"))) # type: ignore[attr-defined] + .order_by(DeckyLifecycle.started_at.desc()) # type: ignore[attr-defined] + ) + if host_uuid is not None: + stmt = stmt.where(DeckyLifecycle.host_uuid == host_uuid) + async with self._session() as session: + result = await session.execute(stmt) + row = result.scalars().first() + return row.model_dump(mode="json") if row else None + + async def sweep_stale_lifecycle( + self, + older_than: datetime, + reason: str, + ) -> int: + now = datetime.now(timezone.utc) + async with self._session() as session: + result = await session.execute( + update(DeckyLifecycle) + .where(DeckyLifecycle.status.in_(("pending", "running"))) # type: ignore[attr-defined] + .where(DeckyLifecycle.started_at < older_than) + .values( + status="failed", + error=reason, + updated_at=now, + completed_at=now, + ) + ) + await session.commit() + return result.rowcount or 0 diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index d3e2fbcd..f0c586da 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -153,6 +153,17 @@ class DummyRepo(BaseRepository): async def iter_canary_triggers_since(self, since): return yield + # DeckyLifecycle surface + async def create_lifecycle(self, data): + await super().create_lifecycle(data); return "" + async def update_lifecycle(self, lifecycle_id, fields): + await super().update_lifecycle(lifecycle_id, fields) + async def get_lifecycle_by_ids(self, ids): + await super().get_lifecycle_by_ids(ids); return [] + async def find_open_lifecycle(self, decky_name, operation, host_uuid=None): + await super().find_open_lifecycle(decky_name, operation, host_uuid); return None + async def sweep_stale_lifecycle(self, older_than, reason): + await super().sweep_stale_lifecycle(older_than, reason); return 0 @pytest.mark.asyncio async def test_base_repo_coverage(): @@ -319,6 +330,11 @@ async def test_base_repo_coverage(): (dr.upsert_decky_shard, ({},)), (dr.list_decky_shards, ()), (dr.delete_decky_shards_for_host, ("u",)), + (dr.create_lifecycle, ({"decky_name": "d", "operation": "deploy"},)), + (dr.update_lifecycle, ("id", {})), + (dr.get_lifecycle_by_ids, (["id"],)), + (dr.find_open_lifecycle, ("d", "deploy")), + (dr.sweep_stale_lifecycle, (datetime.now(timezone.utc), "reason")), (dr.create_topology, ({},)), (dr.get_topology, ("t",)), (dr.list_topologies, ()), diff --git a/tests/db/test_decky_lifecycle.py b/tests/db/test_decky_lifecycle.py new file mode 100644 index 00000000..801b8cfa --- /dev/null +++ b/tests/db/test_decky_lifecycle.py @@ -0,0 +1,158 @@ +"""DeckyLifecycle repo CRUD + sweep tests. + +State machine: pending → running → succeeded | failed. Rows are +append-only after terminal; retries write a new row. +""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pytest + +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path: Path): + r = get_repository(db_path=str(tmp_path / "lifecycle.db")) + await r.initialize() + return r + + +@pytest.mark.anyio +async def test_create_lifecycle_returns_id_and_defaults_pending(repo) -> None: + lid = await repo.create_lifecycle( + {"decky_name": "decky-01", "operation": "deploy"}, + ) + assert isinstance(lid, str) and lid + rows = await repo.get_lifecycle_by_ids([lid]) + assert len(rows) == 1 + row = rows[0] + assert row["decky_name"] == "decky-01" + assert row["operation"] == "deploy" + assert row["status"] == "pending" + assert row["error"] is None + assert row["completed_at"] is None + assert row["started_at"] is not None + assert row["updated_at"] is not None + + +@pytest.mark.anyio +async def test_update_lifecycle_terminal_stamps_completed_at(repo) -> None: + lid = await repo.create_lifecycle( + {"decky_name": "decky-01", "operation": "mutate"}, + ) + await repo.update_lifecycle(lid, {"status": "running"}) + rows = await repo.get_lifecycle_by_ids([lid]) + assert rows[0]["status"] == "running" + assert rows[0]["completed_at"] is None + + await repo.update_lifecycle( + lid, {"status": "succeeded"}, + ) + rows = await repo.get_lifecycle_by_ids([lid]) + assert rows[0]["status"] == "succeeded" + assert rows[0]["completed_at"] is not None + + +@pytest.mark.anyio +async def test_update_lifecycle_failure_carries_error(repo) -> None: + lid = await repo.create_lifecycle( + {"decky_name": "decky-01", "operation": "deploy"}, + ) + await repo.update_lifecycle( + lid, {"status": "failed", "error": "compose blew up"}, + ) + rows = await repo.get_lifecycle_by_ids([lid]) + assert rows[0]["status"] == "failed" + assert rows[0]["error"] == "compose blew up" + assert rows[0]["completed_at"] is not None + + +@pytest.mark.anyio +async def test_get_lifecycle_by_ids_empty_list_returns_empty(repo) -> None: + assert await repo.get_lifecycle_by_ids([]) == [] + + +@pytest.mark.anyio +async def test_get_lifecycle_by_ids_unknown_id_silently_omitted(repo) -> None: + lid = await repo.create_lifecycle( + {"decky_name": "d", "operation": "deploy"}, + ) + rows = await repo.get_lifecycle_by_ids([lid, "no-such-id"]) + assert len(rows) == 1 + assert rows[0]["id"] == lid + + +@pytest.mark.anyio +async def test_find_open_lifecycle_matches_pending_and_running(repo) -> None: + p = await repo.create_lifecycle( + {"decky_name": "decky-01", "operation": "deploy"}, + ) + found = await repo.find_open_lifecycle("decky-01", "deploy") + assert found is not None + assert found["id"] == p + + await repo.update_lifecycle(p, {"status": "running"}) + found = await repo.find_open_lifecycle("decky-01", "deploy") + assert found is not None + assert found["status"] == "running" + + +@pytest.mark.anyio +async def test_find_open_lifecycle_skips_terminal_rows(repo) -> None: + lid = await repo.create_lifecycle( + {"decky_name": "decky-01", "operation": "deploy"}, + ) + await repo.update_lifecycle(lid, {"status": "succeeded"}) + assert await repo.find_open_lifecycle("decky-01", "deploy") is None + + +@pytest.mark.anyio +async def test_find_open_lifecycle_host_uuid_filter(repo) -> None: + a = await repo.create_lifecycle( + {"decky_name": "d", "operation": "deploy", "host_uuid": "h1"}, + ) + await repo.create_lifecycle( + {"decky_name": "d", "operation": "deploy", "host_uuid": "h2"}, + ) + found = await repo.find_open_lifecycle("d", "deploy", host_uuid="h1") + assert found is not None + assert found["id"] == a + + +@pytest.mark.anyio +async def test_sweep_marks_stale_rows_failed(repo) -> None: + # Stale: started_at well in the past, still pending. + stale_id = await repo.create_lifecycle( + {"decky_name": "old", "operation": "deploy"}, + ) + # Force its started_at backwards via update (sweep relies on it). + long_ago = datetime.now(timezone.utc) - timedelta(hours=2) + await repo.update_lifecycle(stale_id, {"started_at": long_ago}) + + # Fresh: just-created, must NOT be swept. + fresh_id = await repo.create_lifecycle( + {"decky_name": "new", "operation": "deploy"}, + ) + + cutoff = datetime.now(timezone.utc) - timedelta(hours=1) + swept = await repo.sweep_stale_lifecycle( + cutoff, reason="master restarted during operation", + ) + assert swept == 1 + + rows = await repo.get_lifecycle_by_ids([stale_id, fresh_id]) + by_id = {r["id"]: r for r in rows} + assert by_id[stale_id]["status"] == "failed" + assert by_id[stale_id]["error"] == "master restarted during operation" + assert by_id[stale_id]["completed_at"] is not None + assert by_id[fresh_id]["status"] == "pending" + + +@pytest.mark.anyio +async def test_sweep_no_op_when_no_stale_rows(repo) -> None: + await repo.create_lifecycle({"decky_name": "d", "operation": "deploy"}) + cutoff = datetime.now(timezone.utc) - timedelta(hours=1) + assert await repo.sweep_stale_lifecycle(cutoff, reason="x") == 0