diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 0ef3cba0..6fdfa03b 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -107,6 +107,11 @@ DECKY_SERVICE_REMOVED = "service_removed" # when the operator hit Apply (container was force-recreated to pick up # the new env), false when they only hit Save (DB-only). DECKY_SERVICE_CONFIG_CHANGED = "service_config_changed" +# Async deploy/mutate operation transitions +# (pending/running/succeeded/failed). Payload: {lifecycle_id, operation, +# status, error?}. UI polling endpoint is the source of truth; this +# fires for live subscribers (dashboard, mutator-side audit, etc). +DECKY_LIFECYCLE = "lifecycle" # Attacker event types (second token under the ``attacker`` root). First # sighting, session boundary transitions, and score-threshold crossings @@ -391,6 +396,12 @@ def decky_mutation(decky_id: str) -> str: return f"{DECKY}.{decky_id}.{DECKY_MUTATION}" +def decky_lifecycle(decky_id: str) -> str: + """Build ``decky..lifecycle``.""" + _reject_tokens(decky_id) + return f"{DECKY}.{decky_id}.{DECKY_LIFECYCLE}" + + def system(event_type: str) -> str: """Build ``system.``. diff --git a/decnet/lifecycle/__init__.py b/decnet/lifecycle/__init__.py new file mode 100644 index 00000000..9e3ccd9e --- /dev/null +++ b/decnet/lifecycle/__init__.py @@ -0,0 +1,15 @@ +"""Async deploy/mutate lifecycle runner. + +The runner is invoked by the master API handlers (deploy + mutate) after +they write ``DeckyLifecycle`` rows and return 202 Accepted to the +caller. It executes the actual docker work off the request thread, +flips lifecycle row status through ``running -> succeeded|failed``, and +emits ``decky..lifecycle`` bus signals on every transition. + +Strategy classes encapsulate transport (local docker on master vs +remote agent over mTLS). ``runner.run_deploy`` / ``run_mutate`` pick +the right strategy from the request context. +""" +from decnet.lifecycle.runner import run_deploy, run_mutate + +__all__ = ["run_deploy", "run_mutate"] diff --git a/decnet/lifecycle/events.py b/decnet/lifecycle/events.py new file mode 100644 index 00000000..e5fcb754 --- /dev/null +++ b/decnet/lifecycle/events.py @@ -0,0 +1,43 @@ +"""Bus emit helper for DeckyLifecycle transitions. + +DB is the source of truth (wizard polls ``GET /deckies/lifecycle?ids=``). +The bus is best-effort live notification — publish failures are logged +and swallowed via ``publish_safely``, never propagated. +""" +from __future__ import annotations + +from typing import Optional + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.publish import publish_safely + + +async def emit_lifecycle( + bus: BaseBus | None, + *, + lifecycle_id: str, + decky_name: str, + operation: str, + status: str, + error: Optional[str] = None, +) -> None: + """Publish ``decky..lifecycle`` with the current transition. + + Payload keys: ``lifecycle_id``, ``operation``, ``status`` and + optionally ``error``. Documented in + ``wiki-checkout/Service-Bus.md``. + """ + payload: dict = { + "lifecycle_id": lifecycle_id, + "operation": operation, + "status": status, + } + if error is not None: + payload["error"] = error + await publish_safely( + bus, + _topics.decky_lifecycle(decky_name), + payload, + event_type=_topics.DECKY_LIFECYCLE, + ) diff --git a/decnet/lifecycle/runner.py b/decnet/lifecycle/runner.py new file mode 100644 index 00000000..e64aa5b1 --- /dev/null +++ b/decnet/lifecycle/runner.py @@ -0,0 +1,96 @@ +"""Async deploy/mutate orchestration entry points. + +Called by the master API handlers right after they create the lifecycle +rows. Picks the right strategy (local vs swarm) and runs it off the +HTTP request thread via ``asyncio.create_task`` at the caller. +""" +from __future__ import annotations + +from pathlib import Path + +from decnet.bus.base import BaseBus +from decnet.config import DecnetConfig, DeckyConfig +from decnet.lifecycle.strategies import ( + LocalDeployStrategy, + SwarmDeployStrategy, + select_deploy_strategy, + select_mutate_strategy, +) +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("lifecycle.runner") + + +async def run_deploy( + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_ids: dict[str, str], + config: DecnetConfig, +) -> None: + """Execute the deploy referenced by *lifecycle_ids* (decky_name -> + lifecycle_id). Never raises — strategy turns errors into failed + rows. Intended to be wrapped in ``asyncio.create_task``. + + In swarm mode the config may contain BOTH worker-resident deckies + (host_uuid set) and master-resident ones (host_uuid is None); we + route each subset through its own strategy. + """ + try: + if config.mode == "swarm": + remote_deckies = [d for d in config.deckies if d.host_uuid is not None] + local_deckies = [d for d in config.deckies if d.host_uuid is None] + if remote_deckies: + remote_ids = { + d.name: lifecycle_ids[d.name] + for d in remote_deckies if d.name in lifecycle_ids + } + remote_cfg = config.model_copy(update={"deckies": remote_deckies}) + await SwarmDeployStrategy().execute( + repo, bus, + lifecycle_ids=remote_ids, config=remote_cfg, + ) + if local_deckies: + local_ids = { + d.name: lifecycle_ids[d.name] + for d in local_deckies if d.name in lifecycle_ids + } + local_cfg = config.model_copy(update={"deckies": local_deckies}) + await LocalDeployStrategy().execute( + repo, bus, + lifecycle_ids=local_ids, config=local_cfg, + ) + else: + strategy = select_deploy_strategy(config) + await strategy.execute( + repo, bus, lifecycle_ids=lifecycle_ids, config=config, + ) + except Exception: # noqa: BLE001 — defense in depth: never crash task + log.exception("lifecycle.run_deploy crashed unexpectedly") + + +async def run_mutate( + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_id: str, + decky: DeckyConfig, + services: list[str], + full_config: DecnetConfig, + compose_path: Path, +) -> None: + """Execute a single-decky mutate. Never raises.""" + try: + strategy = select_mutate_strategy(full_config, decky) + await strategy.execute( + repo, bus, + lifecycle_id=lifecycle_id, decky=decky, + services=services, full_config=full_config, + compose_path=compose_path, + ) + except Exception: # noqa: BLE001 + log.exception("lifecycle.run_mutate crashed unexpectedly") + + +__all__ = ["run_deploy", "run_mutate"] diff --git a/decnet/lifecycle/strategies.py b/decnet/lifecycle/strategies.py new file mode 100644 index 00000000..b950ead6 --- /dev/null +++ b/decnet/lifecycle/strategies.py @@ -0,0 +1,376 @@ +"""Lifecycle execution strategies. + +Each strategy owns the work for one (operation, transport) combo: + +* ``LocalDeployStrategy`` — master-resident deckies: writes a compose + file and runs ``docker compose up -d`` on the master via + ``engine.deployer.deploy`` off the request thread. +* ``SwarmDeployStrategy`` — worker-resident deckies: fans the sharded + config to each worker via ``AgentClient.deploy``. The worker returns + 202 immediately; the worker's next heartbeat drives the terminal + transition (see ``master heartbeat handler accepts lifecycle deltas``). +* ``LocalMutateStrategy`` / ``SwarmMutateStrategy`` — same split, for a + per-decky mutate of services list. + +The runner picks the right concrete class. Strategies update the DB +row + emit bus signals; they never raise back at the runner — they +turn exceptions into ``failed`` rows and return. +""" +from __future__ import annotations + +import abc +from datetime import datetime, timezone + +import anyio + +from decnet.bus.base import BaseBus +from decnet.config import DecnetConfig, DeckyConfig +from decnet.lifecycle.events import emit_lifecycle +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("lifecycle.strategy") + + +# --- base ---------------------------------------------------------------- + +class _StrategyBase(abc.ABC): + """Shared helpers — DB row transitions + bus emit.""" + + async def _mark_running( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_id: str, + decky_name: str, + operation: str, + ) -> None: + await repo.update_lifecycle(lifecycle_id, {"status": "running"}) + await emit_lifecycle( + bus, + lifecycle_id=lifecycle_id, + decky_name=decky_name, + operation=operation, + status="running", + ) + + async def _mark_succeeded( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_id: str, + decky_name: str, + operation: str, + ) -> None: + await repo.update_lifecycle( + lifecycle_id, + { + "status": "succeeded", + "completed_at": datetime.now(timezone.utc), + }, + ) + await emit_lifecycle( + bus, + lifecycle_id=lifecycle_id, + decky_name=decky_name, + operation=operation, + status="succeeded", + ) + + async def _mark_failed( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_id: str, + decky_name: str, + operation: str, + error: str, + ) -> None: + await repo.update_lifecycle( + lifecycle_id, + { + "status": "failed", + "error": error[:2000], + "completed_at": datetime.now(timezone.utc), + }, + ) + await emit_lifecycle( + bus, + lifecycle_id=lifecycle_id, + decky_name=decky_name, + operation=operation, + status="failed", + error=error[:2000], + ) + + +# --- deploy -------------------------------------------------------------- + +class DeployStrategy(_StrategyBase): + """ABC for deploy strategies. Concrete implementations override + :meth:`execute`.""" + + @abc.abstractmethod + async def execute( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_ids: dict[str, str], # decky_name -> lifecycle_id + config: DecnetConfig, + ) -> None: ... + + +class LocalDeployStrategy(DeployStrategy): + """Master-resident deploy via ``engine.deployer.deploy``. + + Coalesces N decky lifecycle rows into one compose-up call (compose + is naturally batched), then flips all rows together. + """ + + async def execute( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_ids: dict[str, str], + config: DecnetConfig, + ) -> None: + from decnet.engine import deployer as _deployer + + for decky_name, lid in lifecycle_ids.items(): + await self._mark_running( + repo, bus, lifecycle_id=lid, + decky_name=decky_name, operation="deploy", + ) + try: + await anyio.to_thread.run_sync( + _deployer.deploy, config, False, False, False, + ) + except Exception as exc: # noqa: BLE001 + err = f"{type(exc).__name__}: {exc}" + log.exception("local deploy failed") + for decky_name, lid in lifecycle_ids.items(): + await self._mark_failed( + repo, bus, lifecycle_id=lid, + decky_name=decky_name, operation="deploy", + error=err, + ) + return + for decky_name, lid in lifecycle_ids.items(): + await self._mark_succeeded( + repo, bus, lifecycle_id=lid, + decky_name=decky_name, operation="deploy", + ) + + +class SwarmDeployStrategy(DeployStrategy): + """Worker-resident deploy via ``AgentClient.deploy``. + + Marks rows ``running`` on dispatch. The worker's /deploy is async + (202); its next heartbeat carries lifecycle deltas that drive the + terminal transition via the master's heartbeat handler. If the + dispatch itself raises (network / mTLS / 5xx), the row is marked + ``failed`` here. + """ + + async def execute( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_ids: dict[str, str], + config: DecnetConfig, + ) -> None: + from decnet.engine.deployer import _resolve_swarm_host + from decnet.swarm.client import AgentClient + + # Shard deckies by host so we can fire one AgentClient.deploy + # per host carrying that host's slice of the config. + shards: dict[str, list[DeckyConfig]] = {} + for decky in config.deckies: + if decky.host_uuid is None: + # Master-resident decky in swarm mode: skip here; runner + # routes those through LocalDeployStrategy at the + # caller's discretion. Defensive guard only. + continue + shards.setdefault(decky.host_uuid, []).append(decky) + + for host_uuid, deckies in shards.items(): + shard_lifecycle = { + d.name: lifecycle_ids[d.name] + for d in deckies if d.name in lifecycle_ids + } + for decky in deckies: + lid = shard_lifecycle.get(decky.name) + if lid is None: + continue + await self._mark_running( + repo, bus, lifecycle_id=lid, + decky_name=decky.name, operation="deploy", + ) + try: + host = await _resolve_swarm_host(repo, host_uuid) + shard_cfg = config.model_copy(update={"deckies": deckies}) + async with AgentClient(host=host) as agent: + await agent.deploy(shard_cfg) + except Exception as exc: # noqa: BLE001 + err = f"{type(exc).__name__}: {exc}" + log.exception( + "swarm deploy dispatch failed host_uuid=%s", host_uuid, + ) + for decky_name, lid in shard_lifecycle.items(): + await self._mark_failed( + repo, bus, lifecycle_id=lid, + decky_name=decky_name, operation="deploy", + error=err, + ) + continue + # Successful dispatch -> rows stay running; worker drives + # the terminal via heartbeat. + + +# --- mutate -------------------------------------------------------------- + +class MutateStrategy(_StrategyBase): + """ABC for mutate strategies.""" + + @abc.abstractmethod + async def execute( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_id: str, + decky: DeckyConfig, + services: list[str], + full_config: DecnetConfig, + compose_path, + ) -> None: ... + + +class LocalMutateStrategy(MutateStrategy): + """Master-local mutate: rewrites compose + ``compose up -d``.""" + + async def execute( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_id: str, + decky: DeckyConfig, + services: list[str], + full_config: DecnetConfig, + compose_path, + ) -> None: + from decnet.composer import write_compose + from decnet.engine import _compose_with_retry + + await self._mark_running( + repo, bus, lifecycle_id=lifecycle_id, + decky_name=decky.name, operation="mutate", + ) + try: + decky.services = list(services) + write_compose(full_config, compose_path) + await anyio.to_thread.run_sync( + lambda: _compose_with_retry( + "up", "-d", "--remove-orphans", + compose_file=compose_path, + ), + ) + except Exception as exc: # noqa: BLE001 + err = f"{type(exc).__name__}: {exc}" + log.exception("local mutate failed decky=%s", decky.name) + await self._mark_failed( + repo, bus, lifecycle_id=lifecycle_id, + decky_name=decky.name, operation="mutate", + error=err, + ) + return + await self._mark_succeeded( + repo, bus, lifecycle_id=lifecycle_id, + decky_name=decky.name, operation="mutate", + ) + + +class SwarmMutateStrategy(MutateStrategy): + """Worker-resident mutate via ``AgentClient.mutate``. + + Same shape as SwarmDeployStrategy: row -> running on dispatch, + worker drives terminal via heartbeat. + """ + + async def execute( + self, + repo: BaseRepository, + bus: BaseBus | None, + *, + lifecycle_id: str, + decky: DeckyConfig, + services: list[str], + full_config: DecnetConfig, + compose_path, + ) -> None: + from decnet.engine.deployer import _resolve_swarm_host + from decnet.swarm.client import AgentClient + + await self._mark_running( + repo, bus, lifecycle_id=lifecycle_id, + decky_name=decky.name, operation="mutate", + ) + if decky.host_uuid is None: + await self._mark_failed( + repo, bus, lifecycle_id=lifecycle_id, + decky_name=decky.name, operation="mutate", + error="swarm mutate strategy invoked for decky with no host_uuid", + ) + return + try: + host = await _resolve_swarm_host(repo, decky.host_uuid) + async with AgentClient(host=host) as agent: + await agent.mutate(decky.name, list(services)) + except Exception as exc: # noqa: BLE001 + err = f"{type(exc).__name__}: {exc}" + log.exception("swarm mutate dispatch failed decky=%s", decky.name) + await self._mark_failed( + repo, bus, lifecycle_id=lifecycle_id, + decky_name=decky.name, operation="mutate", + error=err, + ) + return + # Worker drives terminal via heartbeat. + + +def select_deploy_strategy(config: DecnetConfig) -> DeployStrategy: + """Pick strategy by deployment mode. In swarm mode deckies with + ``host_uuid`` go remote; the caller must route master-resident + swarm deckies (host_uuid=None) through the local strategy + separately.""" + if config.mode == "swarm": + return SwarmDeployStrategy() + return LocalDeployStrategy() + + +def select_mutate_strategy( + config: DecnetConfig, decky: DeckyConfig, +) -> MutateStrategy: + """Pick strategy by decky placement.""" + if config.mode == "swarm" and decky.host_uuid is not None: + return SwarmMutateStrategy() + return LocalMutateStrategy() + + +__all__ = [ + "DeployStrategy", + "LocalDeployStrategy", + "SwarmDeployStrategy", + "MutateStrategy", + "LocalMutateStrategy", + "SwarmMutateStrategy", + "select_deploy_strategy", + "select_mutate_strategy", +] diff --git a/tests/lifecycle/__init__.py b/tests/lifecycle/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/lifecycle/test_runner.py b/tests/lifecycle/test_runner.py new file mode 100644 index 00000000..ad24f289 --- /dev/null +++ b/tests/lifecycle/test_runner.py @@ -0,0 +1,343 @@ +"""decnet.lifecycle: runner + strategy tests. + +All docker calls and AgentClient I/O are mocked; we exercise the +state-machine transitions (pending -> running -> succeeded|failed) and +the routing (swarm vs unihost; per-decky host_uuid). +""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from decnet.config import DeckyConfig, DecnetConfig +from decnet.lifecycle.runner import run_deploy, run_mutate +from decnet.lifecycle.strategies import ( + LocalDeployStrategy, + LocalMutateStrategy, + SwarmDeployStrategy, + SwarmMutateStrategy, + select_deploy_strategy, + select_mutate_strategy, +) + + +def _decky(name="decky-01", host_uuid=None) -> DeckyConfig: + return DeckyConfig( + name=name, ip="10.66.0.10", + services=["ssh"], distro="debian", + base_image="debian:bookworm-slim", hostname=name, + host_uuid=host_uuid, + ) + + +def _config(mode="unihost", deckies=None) -> DecnetConfig: + return DecnetConfig( + mode=mode, interface="eth0", + subnet="10.66.0.0/24", gateway="10.66.0.1", + deckies=deckies or [_decky()], + ) + + +class _RepoStub: + def __init__(self): + self.updates: list[tuple[str, dict]] = [] + + async def update_lifecycle(self, lid, fields): + self.updates.append((lid, fields)) + + +# --- strategy selection -------------------------------------------------- + +def test_select_deploy_unihost_returns_local() -> None: + assert isinstance(select_deploy_strategy(_config()), LocalDeployStrategy) + + +def test_select_deploy_swarm_returns_swarm() -> None: + cfg = _config(mode="swarm", deckies=[_decky(host_uuid="h1")]) + assert isinstance(select_deploy_strategy(cfg), SwarmDeployStrategy) + + +def test_select_mutate_master_resident_returns_local() -> None: + cfg = _config(mode="swarm", deckies=[_decky(host_uuid=None)]) + assert isinstance( + select_mutate_strategy(cfg, cfg.deckies[0]), LocalMutateStrategy, + ) + + +def test_select_mutate_swarm_resident_returns_swarm() -> None: + cfg = _config(mode="swarm", deckies=[_decky(host_uuid="h1")]) + assert isinstance( + select_mutate_strategy(cfg, cfg.deckies[0]), SwarmMutateStrategy, + ) + + +# --- LocalDeployStrategy ------------------------------------------------- + +@pytest.mark.asyncio +async def test_local_deploy_success_flips_all_rows() -> None: + cfg = _config(deckies=[_decky("d1"), _decky("d2")]) + repo = _RepoStub() + with patch("anyio.to_thread.run_sync", new_callable=AsyncMock): + await LocalDeployStrategy().execute( + repo, None, + lifecycle_ids={"d1": "lid-1", "d2": "lid-2"}, + config=cfg, + ) + statuses = [(lid, f["status"]) for lid, f in repo.updates] + # Each decky: running then succeeded + assert ("lid-1", "running") in statuses + assert ("lid-2", "running") in statuses + assert ("lid-1", "succeeded") in statuses + assert ("lid-2", "succeeded") in statuses + + +@pytest.mark.asyncio +async def test_local_deploy_failure_flips_all_rows_failed() -> None: + cfg = _config(deckies=[_decky("d1"), _decky("d2")]) + repo = _RepoStub() + with patch( + "anyio.to_thread.run_sync", + new_callable=AsyncMock, + side_effect=RuntimeError("compose boom"), + ): + await LocalDeployStrategy().execute( + repo, None, + lifecycle_ids={"d1": "lid-1", "d2": "lid-2"}, + config=cfg, + ) + failed = [(lid, f) for lid, f in repo.updates if f["status"] == "failed"] + assert len(failed) == 2 + assert all("compose boom" in f["error"] for _, f in failed) + + +# --- SwarmDeployStrategy ------------------------------------------------- + +@pytest.mark.asyncio +async def test_swarm_deploy_dispatches_per_host_shard() -> None: + cfg = _config( + mode="swarm", + deckies=[ + _decky("d1", host_uuid="h1"), + _decky("d2", host_uuid="h1"), + _decky("d3", host_uuid="h2"), + ], + ) + repo = _RepoStub() + deploy_mock = AsyncMock(return_value={"status": "accepted"}) + agent_ctx = MagicMock() + agent_ctx.__aenter__ = AsyncMock( + return_value=MagicMock(deploy=deploy_mock), + ) + agent_ctx.__aexit__ = AsyncMock(return_value=None) + with patch( + "decnet.engine.deployer._resolve_swarm_host", + new_callable=AsyncMock, + return_value={"uuid": "x", "address": "10.0.0.1"}, + ), patch( + "decnet.swarm.client.AgentClient", return_value=agent_ctx, + ): + await SwarmDeployStrategy().execute( + repo, None, + lifecycle_ids={"d1": "lid-1", "d2": "lid-2", "d3": "lid-3"}, + config=cfg, + ) + # One AgentClient.deploy call per host. + assert deploy_mock.await_count == 2 + # All rows transition to running; none reach terminal (worker drives). + statuses = {(lid, f["status"]) for lid, f in repo.updates} + assert ("lid-1", "running") in statuses + assert ("lid-2", "running") in statuses + assert ("lid-3", "running") in statuses + assert not any(s in ("succeeded", "failed") for _, s in statuses) + + +@pytest.mark.asyncio +async def test_swarm_deploy_dispatch_failure_marks_shard_failed() -> None: + cfg = _config( + mode="swarm", + deckies=[_decky("d1", host_uuid="h1"), _decky("d2", host_uuid="h1")], + ) + repo = _RepoStub() + with patch( + "decnet.engine.deployer._resolve_swarm_host", + new_callable=AsyncMock, + side_effect=ValueError("unknown host"), + ): + await SwarmDeployStrategy().execute( + repo, None, + lifecycle_ids={"d1": "lid-1", "d2": "lid-2"}, + config=cfg, + ) + failed = [(lid, f) for lid, f in repo.updates if f["status"] == "failed"] + assert len(failed) == 2 + assert all("unknown host" in f["error"] for _, f in failed) + + +# --- LocalMutateStrategy / runner -------------------------------------- + +@pytest.mark.asyncio +async def test_local_mutate_success(tmp_path: Path) -> None: + cfg = _config(deckies=[_decky("d1")]) + decky = cfg.deckies[0] + repo = _RepoStub() + with patch("decnet.composer.write_compose"), \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock): + await LocalMutateStrategy().execute( + repo, None, + lifecycle_id="lid-1", + decky=decky, + services=["http"], + full_config=cfg, + compose_path=tmp_path / "c.yml", + ) + statuses = [f["status"] for _, f in repo.updates] + assert "running" in statuses + assert "succeeded" in statuses + # Side effect: decky.services was mutated in place. + assert decky.services == ["http"] + + +@pytest.mark.asyncio +async def test_local_mutate_failure(tmp_path: Path) -> None: + cfg = _config(deckies=[_decky("d1")]) + repo = _RepoStub() + with patch("decnet.composer.write_compose"), \ + patch( + "anyio.to_thread.run_sync", + new_callable=AsyncMock, + side_effect=RuntimeError("docker fail"), + ): + await LocalMutateStrategy().execute( + repo, None, + lifecycle_id="lid-1", + decky=cfg.deckies[0], + services=["http"], + full_config=cfg, + compose_path=tmp_path / "c.yml", + ) + statuses = [f["status"] for _, f in repo.updates] + assert "running" in statuses + assert "failed" in statuses + + +# --- SwarmMutateStrategy ------------------------------------------------- + +@pytest.mark.asyncio +async def test_swarm_mutate_dispatches_via_agent(tmp_path: Path) -> None: + cfg = _config(mode="swarm", deckies=[_decky("d1", host_uuid="h1")]) + repo = _RepoStub() + mutate_mock = AsyncMock(return_value={"status": "accepted"}) + agent_ctx = MagicMock() + agent_ctx.__aenter__ = AsyncMock( + return_value=MagicMock(mutate=mutate_mock), + ) + agent_ctx.__aexit__ = AsyncMock(return_value=None) + with patch( + "decnet.engine.deployer._resolve_swarm_host", + new_callable=AsyncMock, + return_value={"uuid": "h1", "address": "10.0.0.1"}, + ), patch( + "decnet.swarm.client.AgentClient", return_value=agent_ctx, + ): + await SwarmMutateStrategy().execute( + repo, None, + lifecycle_id="lid-1", + decky=cfg.deckies[0], + services=["http"], + full_config=cfg, + compose_path=tmp_path / "c.yml", + ) + mutate_mock.assert_awaited_once() + # Row was flipped to running; worker drives terminal. + statuses = [f["status"] for _, f in repo.updates] + assert "running" in statuses + assert "succeeded" not in statuses + assert "failed" not in statuses + + +# --- runner orchestration ------------------------------------------------ + +@pytest.mark.asyncio +async def test_run_deploy_unihost_uses_local_strategy() -> None: + cfg = _config(deckies=[_decky("d1")]) + repo = _RepoStub() + with patch("anyio.to_thread.run_sync", new_callable=AsyncMock): + await run_deploy(repo, None, lifecycle_ids={"d1": "lid-1"}, config=cfg) + statuses = [f["status"] for _, f in repo.updates] + assert statuses == ["running", "succeeded"] + + +@pytest.mark.asyncio +async def test_run_deploy_swarm_splits_routes() -> None: + """In swarm mode, mixed master-resident + worker-resident deckies + take both strategies.""" + cfg = _config( + mode="swarm", + deckies=[ + _decky("local-one", host_uuid=None), + _decky("remote-one", host_uuid="h1"), + ], + ) + repo = _RepoStub() + deploy_mock = AsyncMock(return_value={"status": "accepted"}) + agent_ctx = MagicMock() + agent_ctx.__aenter__ = AsyncMock( + return_value=MagicMock(deploy=deploy_mock), + ) + agent_ctx.__aexit__ = AsyncMock(return_value=None) + with patch( + "decnet.engine.deployer._resolve_swarm_host", + new_callable=AsyncMock, + return_value={"uuid": "h1", "address": "10.0.0.1"}, + ), patch( + "decnet.swarm.client.AgentClient", return_value=agent_ctx, + ), patch( + "anyio.to_thread.run_sync", new_callable=AsyncMock, + ): + await run_deploy( + repo, None, + lifecycle_ids={"local-one": "lid-L", "remote-one": "lid-R"}, + config=cfg, + ) + # local-one ran end-to-end; remote-one ran -> running only. + by_lid: dict[str, list[str]] = {} + for lid, f in repo.updates: + by_lid.setdefault(lid, []).append(f["status"]) + assert by_lid["lid-L"] == ["running", "succeeded"] + assert by_lid["lid-R"] == ["running"] + deploy_mock.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_run_mutate_local(tmp_path: Path) -> None: + cfg = _config(deckies=[_decky("d1")]) + repo = _RepoStub() + with patch("decnet.composer.write_compose"), \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock): + await run_mutate( + repo, None, + lifecycle_id="lid-1", + decky=cfg.deckies[0], + services=["http"], + full_config=cfg, + compose_path=tmp_path / "c.yml", + ) + statuses = [f["status"] for _, f in repo.updates] + assert statuses == ["running", "succeeded"] + + +@pytest.mark.asyncio +async def test_run_deploy_never_raises_when_strategy_crashes() -> None: + """Defense in depth: a strategy bug must not crash the task and + leave rows wedged in pending.""" + cfg = _config(deckies=[_decky("d1")]) + repo = _RepoStub() + with patch( + "decnet.lifecycle.strategies.LocalDeployStrategy.execute", + new_callable=AsyncMock, + side_effect=RuntimeError("bug"), + ): + # Should not raise. + await run_deploy(repo, None, lifecycle_ids={"d1": "lid-1"}, config=cfg)