feat(deployer): mirror fleet deploy/teardown into fleet_deckies table
CLI deploy now writes both surfaces: decnet-state.json (existing, canonical for offline / no-API hosts) and the new fleet_deckies DB table (visible to orchestrator, web dashboard, REST API). Best-effort: a DB outage logs a warning and returns. The JSON file remains the source of truth for `decnet status`, `decnet teardown`, sniffer, and collector — operators on a CLI-only host keep working. _run_async helper bridges sync deploy() into the async repository. Always uses a fresh thread because the API handler at web.router.fleet.api_deploy_deckies invokes deploy() from inside a FastAPI event loop, which would otherwise break asyncio.run. Verified end-to-end against MySQL: deploy mirror inserts rows, union view (list_running_deckies) returns them with source="fleet", teardown mirror removes them. Works from both sync (CLI) and async (API handler) call sites.
This commit is contained in:
@@ -429,6 +429,83 @@ def _emit_lifecycle_event(
|
||||
decky_name, trigger, exc)
|
||||
|
||||
|
||||
def _run_async(coro_factory) -> None:
|
||||
"""Run an async coroutine from a sync context, even when an event loop
|
||||
is already running on this thread.
|
||||
|
||||
``deploy()`` / ``teardown()`` are sync, but the API handler at
|
||||
``web.router.fleet.api_deploy_deckies`` calls them from inside its own
|
||||
event loop. ``asyncio.run`` refuses to run nested, so we always punt
|
||||
to a fresh thread — small overhead, but deploy is already a heavy op.
|
||||
"""
|
||||
import threading
|
||||
err: list[BaseException] = []
|
||||
|
||||
def _runner() -> None:
|
||||
try:
|
||||
asyncio.run(coro_factory())
|
||||
except BaseException as exc: # noqa: BLE001
|
||||
err.append(exc)
|
||||
|
||||
t = threading.Thread(target=_runner, daemon=False)
|
||||
t.start()
|
||||
t.join()
|
||||
if err:
|
||||
raise err[0]
|
||||
|
||||
|
||||
def _mirror_fleet_deploy_to_db(config: DecnetConfig) -> None:
|
||||
"""Mirror fleet rows into the ``fleet_deckies`` DB table.
|
||||
|
||||
Best-effort: a DB outage on a CLI-only host must not abort deploy.
|
||||
The JSON state file (``decnet-state.json``) remains the canonical
|
||||
artifact for every consumer that runs without the API daemon
|
||||
(``decnet status``, ``decnet teardown``, sniffer, collector).
|
||||
|
||||
State defaults to ``running`` to mirror what the dashboard already
|
||||
assumes about JSON-only fleet rows; the reconciler corrects drift
|
||||
by polling ``docker inspect``.
|
||||
"""
|
||||
try:
|
||||
from decnet.web.db.factory import get_repository
|
||||
from decnet.web.db.models import LOCAL_HOST_SENTINEL
|
||||
repo = get_repository()
|
||||
|
||||
async def _go() -> None:
|
||||
for d in config.deckies:
|
||||
await repo.upsert_fleet_decky({
|
||||
"host_uuid": d.host_uuid or LOCAL_HOST_SENTINEL,
|
||||
"name": d.name,
|
||||
"services": list(d.services),
|
||||
"decky_config": d.model_dump(mode="json"),
|
||||
"decky_ip": d.ip,
|
||||
"state": "running",
|
||||
})
|
||||
|
||||
_run_async(_go)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("fleet DB mirror (deploy) failed (best-effort): %s", exc)
|
||||
|
||||
|
||||
def _mirror_fleet_teardown_to_db(deckies) -> None:
|
||||
"""Remove fleet rows from the DB. Best-effort, same rationale."""
|
||||
try:
|
||||
from decnet.web.db.factory import get_repository
|
||||
from decnet.web.db.models import LOCAL_HOST_SENTINEL
|
||||
repo = get_repository()
|
||||
|
||||
async def _go() -> None:
|
||||
for d in deckies:
|
||||
await repo.delete_fleet_decky(
|
||||
host_uuid=d.host_uuid or LOCAL_HOST_SENTINEL,
|
||||
name=d.name,
|
||||
)
|
||||
|
||||
_run_async(_go)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
log.warning("fleet DB mirror (teardown) failed (best-effort): %s", exc)
|
||||
|
||||
|
||||
@_traced("engine.deploy")
|
||||
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
|
||||
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
|
||||
@@ -476,6 +553,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
|
||||
return
|
||||
|
||||
save_state(config, compose_path)
|
||||
_mirror_fleet_deploy_to_db(config)
|
||||
|
||||
# Emit one creation event per decky so the correlation graph has a
|
||||
# well-formed lifecycle start (old_services=[] ⇒ new_services=<initial>).
|
||||
@@ -546,6 +624,7 @@ def teardown(decky_id: str | None = None) -> None:
|
||||
)
|
||||
_compose("stop", *svc_names, compose_file=compose_path)
|
||||
_compose("rm", "-f", *svc_names, compose_file=compose_path)
|
||||
_mirror_fleet_teardown_to_db([decky])
|
||||
else:
|
||||
for decky in config.deckies:
|
||||
_emit_lifecycle_event(
|
||||
@@ -564,6 +643,7 @@ def teardown(decky_id: str | None = None) -> None:
|
||||
teardown_host_macvlan(decky_range)
|
||||
remove_macvlan_network(client)
|
||||
clear_state()
|
||||
_mirror_fleet_teardown_to_db(config.deckies)
|
||||
|
||||
net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
|
||||
log.info("teardown complete all deckies removed network_driver=%s", net_driver)
|
||||
|
||||
@@ -15,6 +15,19 @@ import pytest
|
||||
from decnet.config import DeckyConfig, DecnetConfig
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _stub_fleet_db_mirror(request):
|
||||
"""The DB-mirror helpers are exercised in :class:`TestMirrorFleetToDb`;
|
||||
every other test in this file mocks filesystem + docker but not the DB,
|
||||
so we no-op the mirrors elsewhere to keep the suite self-contained."""
|
||||
if "MirrorFleetToDb" in request.node.nodeid:
|
||||
yield
|
||||
return
|
||||
with patch("decnet.engine.deployer._mirror_fleet_deploy_to_db"), \
|
||||
patch("decnet.engine.deployer._mirror_fleet_teardown_to_db"):
|
||||
yield
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _decky(name: str = "decky-01", ip: str = "192.168.1.10",
|
||||
@@ -557,3 +570,95 @@ class TestPrintStatus:
|
||||
from decnet.engine.deployer import _print_status
|
||||
config = _config(deckies=[_decky(), _decky("decky-02", "192.168.1.11")])
|
||||
_print_status(config) # should not raise
|
||||
|
||||
|
||||
# ── DB mirror (engine ↔ fleet_deckies) ────────────────────────────────────────
|
||||
|
||||
class TestMirrorFleetToDb:
|
||||
"""The mirror helpers are best-effort: they replicate fleet state into
|
||||
the ``fleet_deckies`` table so DB-only consumers (orchestrator, web,
|
||||
REST API) see the same view as JSON consumers, but a DB failure must
|
||||
never abort a CLI deploy."""
|
||||
|
||||
def _make_repo(self):
|
||||
repo = MagicMock()
|
||||
|
||||
async def _upsert(data):
|
||||
self.upserts.append(data)
|
||||
async def _delete(*, host_uuid, name):
|
||||
self.deletes.append((host_uuid, name))
|
||||
|
||||
repo.upsert_fleet_decky = MagicMock(side_effect=_upsert)
|
||||
repo.delete_fleet_decky = MagicMock(side_effect=_delete)
|
||||
return repo
|
||||
|
||||
def setup_method(self) -> None:
|
||||
self.upserts: list[dict] = []
|
||||
self.deletes: list[tuple[str, str]] = []
|
||||
|
||||
@patch("decnet.web.db.factory.get_repository")
|
||||
def test_deploy_mirror_upserts_each_decky(self, mock_get_repo):
|
||||
from decnet.engine.deployer import _mirror_fleet_deploy_to_db
|
||||
mock_get_repo.return_value = self._make_repo()
|
||||
cfg = _config(deckies=[
|
||||
_decky(name="d1", ip="10.0.0.1", services=["ssh"]),
|
||||
_decky(name="d2", ip="10.0.0.2", services=["http", "ftp"]),
|
||||
])
|
||||
_mirror_fleet_deploy_to_db(cfg)
|
||||
assert len(self.upserts) == 2
|
||||
names = sorted(u["name"] for u in self.upserts)
|
||||
assert names == ["d1", "d2"]
|
||||
u1 = next(u for u in self.upserts if u["name"] == "d1")
|
||||
assert u1["host_uuid"] == "local"
|
||||
assert u1["services"] == ["ssh"]
|
||||
assert u1["state"] == "running"
|
||||
assert u1["decky_ip"] == "10.0.0.1"
|
||||
assert u1["decky_config"]["name"] == "d1"
|
||||
|
||||
@patch("decnet.web.db.factory.get_repository")
|
||||
def test_deploy_mirror_honors_explicit_host_uuid(self, mock_get_repo):
|
||||
from decnet.engine.deployer import _mirror_fleet_deploy_to_db
|
||||
mock_get_repo.return_value = self._make_repo()
|
||||
d = _decky()
|
||||
d.host_uuid = "remote-host-abc"
|
||||
_mirror_fleet_deploy_to_db(_config(deckies=[d]))
|
||||
assert self.upserts[0]["host_uuid"] == "remote-host-abc"
|
||||
|
||||
@patch("decnet.web.db.factory.get_repository")
|
||||
def test_deploy_mirror_swallows_db_failure(self, mock_get_repo):
|
||||
from decnet.engine.deployer import _mirror_fleet_deploy_to_db
|
||||
mock_get_repo.side_effect = RuntimeError("db down")
|
||||
_mirror_fleet_deploy_to_db(_config()) # must not raise
|
||||
|
||||
@patch("decnet.web.db.factory.get_repository")
|
||||
def test_teardown_mirror_deletes_each_decky(self, mock_get_repo):
|
||||
from decnet.engine.deployer import _mirror_fleet_teardown_to_db
|
||||
mock_get_repo.return_value = self._make_repo()
|
||||
deckies = [
|
||||
_decky(name="d1", ip="10.0.0.1"),
|
||||
_decky(name="d2", ip="10.0.0.2"),
|
||||
]
|
||||
_mirror_fleet_teardown_to_db(deckies)
|
||||
assert sorted(self.deletes) == [("local", "d1"), ("local", "d2")]
|
||||
|
||||
@patch("decnet.web.db.factory.get_repository")
|
||||
def test_teardown_mirror_swallows_db_failure(self, mock_get_repo):
|
||||
from decnet.engine.deployer import _mirror_fleet_teardown_to_db
|
||||
mock_get_repo.side_effect = RuntimeError("db down")
|
||||
_mirror_fleet_teardown_to_db([_decky()]) # must not raise
|
||||
|
||||
def test_run_async_works_with_running_loop(self):
|
||||
"""``_run_async`` must work even when the caller is already inside
|
||||
an asyncio loop (the API path calls deploy() from a FastAPI handler)."""
|
||||
import asyncio
|
||||
from decnet.engine.deployer import _run_async
|
||||
|
||||
result: list[int] = []
|
||||
|
||||
async def caller() -> None:
|
||||
async def work() -> None:
|
||||
result.append(42)
|
||||
_run_async(work)
|
||||
|
||||
asyncio.run(caller())
|
||||
assert result == [42]
|
||||
|
||||
Reference in New Issue
Block a user