diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py
index 20a62765..fd0c91eb 100644
--- a/decnet/engine/deployer.py
+++ b/decnet/engine/deployer.py
@@ -429,6 +429,83 @@ def _emit_lifecycle_event(
                     decky_name, trigger, exc)
 
 
+def _run_async(coro_factory) -> None:
+    """Run ``coro_factory()`` to completion on a fresh thread and event loop.
+
+    ``deploy()`` / ``teardown()`` are sync, but the API handler at
+    ``web.router.fleet.api_deploy_deckies`` calls them from inside its own
+    event loop. ``asyncio.run`` refuses to run nested, so we always punt to
+    a fresh thread (small overhead, but deploy is already a heavy op), block
+    until it finishes, and re-raise whatever the coroutine raised.
+    """
+    import threading
+    err: list[BaseException] = []  # filled by the worker thread on failure
+
+    def _runner() -> None:
+        try:
+            asyncio.run(coro_factory())
+        except BaseException as exc:  # noqa: BLE001
+            err.append(exc)
+
+    t = threading.Thread(target=_runner, daemon=False)
+    t.start()
+    t.join()  # synchronous wait: the calling thread is blocked meanwhile
+    if err:
+        raise err[0]  # surface the worker's failure in the calling thread
+
+
+def _mirror_fleet_deploy_to_db(config: DecnetConfig) -> None:
+    """Upsert one ``fleet_deckies`` DB row per decky in ``config.deckies``.
+
+    Best-effort: any ``Exception`` is logged and swallowed, so a DB outage on
+    a CLI-only host must not abort deploy. ``decnet-state.json`` remains the
+    canonical artifact for every consumer that runs without the API daemon
+    (``decnet status``, ``decnet teardown``, sniffer, collector).
+
+    A falsy ``host_uuid`` falls back to ``LOCAL_HOST_SENTINEL``. State is set
+    to ``running`` to mirror what the dashboard already assumes about JSON-only
+    fleet rows; the reconciler corrects drift by polling ``docker inspect``.
+    """
+    try:
+        from decnet.web.db.factory import get_repository
+        from decnet.web.db.models import LOCAL_HOST_SENTINEL
+        repo = get_repository()
+
+        async def _go() -> None:
+            for d in config.deckies:
+                await repo.upsert_fleet_decky({
+                    "host_uuid": d.host_uuid or LOCAL_HOST_SENTINEL,
+                    "name": d.name,
+                    "services": list(d.services),
+                    "decky_config": d.model_dump(mode="json"),
+                    "decky_ip": d.ip,
+                    "state": "running",
+                })
+
+        _run_async(_go)  # NOTE(review): repo is awaited on a fresh loop — confirm it allows that
+    except Exception as exc:  # noqa: BLE001
+        log.warning("fleet DB mirror (deploy) failed (best-effort): %s", exc)
+
+
+def _mirror_fleet_teardown_to_db(deckies) -> None:
+    """Delete each decky's fleet row from the DB; any ``Exception`` is logged, not raised."""
+    try:
+        from decnet.web.db.factory import get_repository
+        from decnet.web.db.models import LOCAL_HOST_SENTINEL
+        repo = get_repository()
+
+        async def _go() -> None:
+            for d in deckies:
+                await repo.delete_fleet_decky(
+                    host_uuid=d.host_uuid or LOCAL_HOST_SENTINEL,
+                    name=d.name,
+                )
+
+        _run_async(_go)  # NOTE(review): repo is awaited on a fresh loop — confirm it allows that
+    except Exception as exc:  # noqa: BLE001
+        log.warning("fleet DB mirror (teardown) failed (best-effort): %s", exc)
+
+
 @_traced("engine.deploy")
 def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
     log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
@@ -476,6 +553,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
         return
 
     save_state(config, compose_path)
+    _mirror_fleet_deploy_to_db(config)
 
     # Emit one creation event per decky so the correlation graph has a
     # well-formed lifecycle start (old_services=[] ⇒ new_services=).
@@ -546,6 +624,7 @@ def teardown(decky_id: str | None = None) -> None:
         )
         _compose("stop", *svc_names, compose_file=compose_path)
         _compose("rm", "-f", *svc_names, compose_file=compose_path)
+        _mirror_fleet_teardown_to_db([decky])
     else:
         for decky in config.deckies:
             _emit_lifecycle_event(
@@ -564,6 +643,7 @@ def teardown(decky_id: str | None = None) -> None:
             teardown_host_macvlan(decky_range)
         remove_macvlan_network(client)
         clear_state()
+        _mirror_fleet_teardown_to_db(config.deckies)
 
         net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
         log.info("teardown complete all deckies removed network_driver=%s", net_driver)
diff --git a/tests/fleet/test_deployer.py b/tests/fleet/test_deployer.py
index 8caf98dc..8068e9b4 100644
--- a/tests/fleet/test_deployer.py
+++ b/tests/fleet/test_deployer.py
@@ -15,6 +15,19 @@ import pytest
 from decnet.config import DeckyConfig, DecnetConfig
 
 
+@pytest.fixture(autouse=True)
+def _stub_fleet_db_mirror(request):
+    """No-op both DB-mirror helpers for every test in this file except
+    :class:`TestMirrorFleetToDb` (matched by node id), since the other tests
+    mock filesystem + docker but not the DB."""
+    if "MirrorFleetToDb" in request.node.nodeid:
+        yield
+        return
+    with patch("decnet.engine.deployer._mirror_fleet_deploy_to_db"), \
+         patch("decnet.engine.deployer._mirror_fleet_teardown_to_db"):
+        yield
+
+
 # ── Helpers ───────────────────────────────────────────────────────────────────
 
 def _decky(name: str = "decky-01", ip: str = "192.168.1.10",
@@ -557,3 +570,95 @@ class TestPrintStatus:
         from decnet.engine.deployer import _print_status
         config = _config(deckies=[_decky(), _decky("decky-02", "192.168.1.11")])
         _print_status(config)  # should not raise
+
+
+# ── DB mirror (engine ↔ fleet_deckies) ────────────────────────────────────────
+
+class TestMirrorFleetToDb:
+    """The mirror helpers are best-effort: they replicate fleet state into
+    the ``fleet_deckies`` table so DB-only consumers (orchestrator, web,
+    REST API) see the same view as JSON consumers, but a DB failure must
+    never abort a CLI deploy."""
+
+    def _make_repo(self):  # repo double whose async methods record their arguments
+        repo = MagicMock()
+
+        async def _upsert(data):
+            self.upserts.append(data)
+        async def _delete(*, host_uuid, name):
+            self.deletes.append((host_uuid, name))
+
+        repo.upsert_fleet_decky = MagicMock(side_effect=_upsert)
+        repo.delete_fleet_decky = MagicMock(side_effect=_delete)
+        return repo
+
+    def setup_method(self) -> None:  # fresh recorders for every test
+        self.upserts: list[dict] = []
+        self.deletes: list[tuple[str, str]] = []
+
+    @patch("decnet.web.db.factory.get_repository")
+    def test_deploy_mirror_upserts_each_decky(self, mock_get_repo):
+        from decnet.engine.deployer import _mirror_fleet_deploy_to_db
+        mock_get_repo.return_value = self._make_repo()
+        cfg = _config(deckies=[
+            _decky(name="d1", ip="10.0.0.1", services=["ssh"]),
+            _decky(name="d2", ip="10.0.0.2", services=["http", "ftp"]),
+        ])
+        _mirror_fleet_deploy_to_db(cfg)
+        assert len(self.upserts) == 2
+        names = sorted(u["name"] for u in self.upserts)
+        assert names == ["d1", "d2"]
+        u1 = next(u for u in self.upserts if u["name"] == "d1")
+        assert u1["host_uuid"] == "local"  # fallback used when the decky has no host_uuid
+        assert u1["services"] == ["ssh"]
+        assert u1["state"] == "running"
+        assert u1["decky_ip"] == "10.0.0.1"
+        assert u1["decky_config"]["name"] == "d1"
+
+    @patch("decnet.web.db.factory.get_repository")
+    def test_deploy_mirror_honors_explicit_host_uuid(self, mock_get_repo):
+        from decnet.engine.deployer import _mirror_fleet_deploy_to_db
+        mock_get_repo.return_value = self._make_repo()
+        d = _decky()
+        d.host_uuid = "remote-host-abc"
+        _mirror_fleet_deploy_to_db(_config(deckies=[d]))
+        assert self.upserts[0]["host_uuid"] == "remote-host-abc"
+
+    @patch("decnet.web.db.factory.get_repository")
+    def test_deploy_mirror_swallows_db_failure(self, mock_get_repo):
+        from decnet.engine.deployer import _mirror_fleet_deploy_to_db
+        mock_get_repo.side_effect = RuntimeError("db down")
+        _mirror_fleet_deploy_to_db(_config())  # must not raise
+
+    @patch("decnet.web.db.factory.get_repository")
+    def test_teardown_mirror_deletes_each_decky(self, mock_get_repo):
+        from decnet.engine.deployer import _mirror_fleet_teardown_to_db
+        mock_get_repo.return_value = self._make_repo()
+        deckies = [
+            _decky(name="d1", ip="10.0.0.1"),
+            _decky(name="d2", ip="10.0.0.2"),
+        ]
+        _mirror_fleet_teardown_to_db(deckies)
+        assert sorted(self.deletes) == [("local", "d1"), ("local", "d2")]
+
+    @patch("decnet.web.db.factory.get_repository")
+    def test_teardown_mirror_swallows_db_failure(self, mock_get_repo):
+        from decnet.engine.deployer import _mirror_fleet_teardown_to_db
+        mock_get_repo.side_effect = RuntimeError("db down")
+        _mirror_fleet_teardown_to_db([_decky()])  # must not raise
+
+    def test_run_async_works_with_running_loop(self):
+        """``_run_async`` must work even when the caller is already inside
+        an asyncio loop (the API path calls deploy() from a FastAPI handler)."""
+        import asyncio
+        from decnet.engine.deployer import _run_async
+
+        result: list[int] = []
+
+        async def caller() -> None:
+            async def work() -> None:
+                result.append(42)
+            _run_async(work)  # sync call made while caller()'s loop is running
+
+        asyncio.run(caller())
+        assert result == [42]