feat(deployer): mirror fleet deploy/teardown into fleet_deckies table

CLI deploy now writes both surfaces: decnet-state.json (existing,
canonical for offline / no-API hosts) and the new fleet_deckies DB
table (visible to orchestrator, web dashboard, REST API).

The DB mirror is best-effort: on a DB outage it logs a warning and
returns without aborting the deploy or teardown. The JSON file remains
the source of truth for `decnet status`, `decnet teardown`, sniffer,
and collector — operators on a CLI-only host keep working.

_run_async helper bridges sync deploy() into the async repository.
Always uses a fresh thread because the API handler at
web.router.fleet.api_deploy_deckies invokes deploy() from inside a
FastAPI event loop, which would otherwise break asyncio.run.

Verified end-to-end against MySQL: deploy mirror inserts rows, union
view (list_running_deckies) returns them with source="fleet",
teardown mirror removes them. Works from both sync (CLI) and async
(API handler) call sites.
This commit is contained in:
2026-04-26 21:05:50 -04:00
parent 095500ae9a
commit 646aeeca40
2 changed files with 185 additions and 0 deletions

View File

@@ -429,6 +429,83 @@ def _emit_lifecycle_event(
decky_name, trigger, exc)
def _run_async(coro_factory) -> None:
    """Drive ``coro_factory()`` to completion on a dedicated worker thread.

    ``deploy()`` / ``teardown()`` are synchronous, yet the API handler at
    ``web.router.fleet.api_deploy_deckies`` invokes them while its own
    event loop is running. ``asyncio.run`` cannot be nested inside a
    running loop on the same thread, so the coroutine is always executed
    on a fresh thread with its own loop. The extra thread is cheap
    compared with a deploy.

    Args:
        coro_factory: Zero-argument callable returning the coroutine to
            run. It is called on the worker thread, not the caller's.

    Raises:
        BaseException: Whatever the coroutine (or the factory) raised is
            re-raised on the calling thread once the worker has finished.
    """
    import threading

    # The worker hands its outcome back through this list; it holds at
    # most one exception.
    failures: list[BaseException] = []

    def _drive() -> None:
        try:
            asyncio.run(coro_factory())
        except BaseException as caught:  # noqa: BLE001
            failures.append(caught)

    worker = threading.Thread(target=_drive, daemon=False)
    worker.start()
    # Block until the coroutine is done so callers keep sync semantics.
    worker.join()

    if not failures:
        return
    raise failures[0]
def _mirror_fleet_deploy_to_db(config: DecnetConfig) -> None:
    """Replicate every decky in *config* into the ``fleet_deckies`` table.

    This is a best-effort mirror: any ``Exception`` raised while importing
    the repository, building it, or writing rows is logged as a warning
    and swallowed, so a DB outage on a CLI-only host cannot abort a
    deploy. ``decnet-state.json`` stays the canonical artifact for the
    consumers that run without the API daemon (``decnet status``,
    ``decnet teardown``, sniffer, collector).

    Each row is written with ``state="running"``, matching what the
    dashboard already assumes for JSON-only fleet rows; the reconciler is
    expected to correct drift by polling ``docker inspect``.

    Args:
        config: The deployed configuration whose ``deckies`` are mirrored.
    """
    try:
        # Imported lazily, inside the ``try``, so a failure to import or
        # build the repository is treated like any other mirror failure.
        from decnet.web.db.factory import get_repository
        from decnet.web.db.models import LOCAL_HOST_SENTINEL

        repository = get_repository()

        async def _upsert_all() -> None:
            for decky in config.deckies:
                row = {
                    # Deckies without a host fall back to the local sentinel.
                    "host_uuid": decky.host_uuid or LOCAL_HOST_SENTINEL,
                    "name": decky.name,
                    "services": list(decky.services),
                    "decky_config": decky.model_dump(mode="json"),
                    "decky_ip": decky.ip,
                    "state": "running",
                }
                await repository.upsert_fleet_decky(row)

        _run_async(_upsert_all)
    except Exception as exc:  # noqa: BLE001
        log.warning("fleet DB mirror (deploy) failed (best-effort): %s", exc)
def _mirror_fleet_teardown_to_db(deckies) -> None:
    """Delete the ``fleet_deckies`` rows belonging to *deckies*.

    Best-effort: any ``Exception`` raised while importing the repository,
    building it, or deleting rows is logged as a warning and swallowed,
    so a DB failure never aborts a teardown.

    Args:
        deckies: Iterable of decky objects exposing ``host_uuid`` and
            ``name``; one delete is issued per element.
    """
    try:
        # Imported lazily, inside the ``try``, so a failure to import or
        # build the repository is treated like any other mirror failure.
        from decnet.web.db.factory import get_repository
        from decnet.web.db.models import LOCAL_HOST_SENTINEL

        repository = get_repository()

        async def _delete_all() -> None:
            for decky in deckies:
                # Deckies without a host fall back to the local sentinel.
                owner = decky.host_uuid or LOCAL_HOST_SENTINEL
                await repository.delete_fleet_decky(
                    host_uuid=owner,
                    name=decky.name,
                )

        _run_async(_delete_all)
    except Exception as exc:  # noqa: BLE001
        log.warning("fleet DB mirror (teardown) failed (best-effort): %s", exc)
@_traced("engine.deploy")
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
@@ -476,6 +553,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
return
save_state(config, compose_path)
_mirror_fleet_deploy_to_db(config)
# Emit one creation event per decky so the correlation graph has a
# well-formed lifecycle start (old_services=[] ⇒ new_services=<initial>).
@@ -546,6 +624,7 @@ def teardown(decky_id: str | None = None) -> None:
)
_compose("stop", *svc_names, compose_file=compose_path)
_compose("rm", "-f", *svc_names, compose_file=compose_path)
_mirror_fleet_teardown_to_db([decky])
else:
for decky in config.deckies:
_emit_lifecycle_event(
@@ -564,6 +643,7 @@ def teardown(decky_id: str | None = None) -> None:
teardown_host_macvlan(decky_range)
remove_macvlan_network(client)
clear_state()
_mirror_fleet_teardown_to_db(config.deckies)
net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
log.info("teardown complete all deckies removed network_driver=%s", net_driver)

View File

@@ -15,6 +15,19 @@ import pytest
from decnet.config import DeckyConfig, DecnetConfig
@pytest.fixture(autouse=True)
def _stub_fleet_db_mirror(request):
    """No-op the DB-mirror helpers for every test outside the mirror suite.

    Tests whose node id contains ``MirrorFleetToDb`` exercise the real
    helpers and run unpatched. Every other test in this file mocks the
    filesystem and docker but not the DB, so both mirror functions are
    patched out for the duration of the test to keep the suite
    self-contained.
    """
    exercises_real_mirror = "MirrorFleetToDb" in request.node.nodeid
    if not exercises_real_mirror:
        with patch("decnet.engine.deployer._mirror_fleet_deploy_to_db"):
            with patch("decnet.engine.deployer._mirror_fleet_teardown_to_db"):
                yield
    else:
        yield
# ── Helpers ───────────────────────────────────────────────────────────────────
def _decky(name: str = "decky-01", ip: str = "192.168.1.10",
@@ -557,3 +570,95 @@ class TestPrintStatus:
from decnet.engine.deployer import _print_status
config = _config(deckies=[_decky(), _decky("decky-02", "192.168.1.11")])
_print_status(config) # should not raise
# ── DB mirror (engine ↔ fleet_deckies) ────────────────────────────────────────
class TestMirrorFleetToDb:
    """Cover the best-effort mirror between the engine and ``fleet_deckies``.

    The helpers copy fleet state into the DB so that DB-only consumers
    (orchestrator, web, REST API) see the same view as JSON consumers.
    A DB failure must never abort a CLI deploy, so the failure-path tests
    only check that nothing is raised.
    """

    def setup_method(self) -> None:
        # Fresh recorders per test; the fake repository appends to these.
        self.upserts: list[dict] = []
        self.deletes: list[tuple[str, str]] = []

    def _make_repo(self):
        """Build a fake repository whose async methods record their calls."""

        async def _record_upsert(data):
            self.upserts.append(data)

        async def _record_delete(*, host_uuid, name):
            self.deletes.append((host_uuid, name))

        fake_repo = MagicMock()
        # A plain MagicMock with a coroutine-function side effect returns
        # a coroutine when called, which is what the helpers ``await``.
        fake_repo.upsert_fleet_decky = MagicMock(side_effect=_record_upsert)
        fake_repo.delete_fleet_decky = MagicMock(side_effect=_record_delete)
        return fake_repo

    @patch("decnet.web.db.factory.get_repository")
    def test_deploy_mirror_upserts_each_decky(self, mock_get_repo):
        """One upsert per decky, carrying the expected row fields."""
        from decnet.engine.deployer import _mirror_fleet_deploy_to_db

        mock_get_repo.return_value = self._make_repo()
        first = _decky(name="d1", ip="10.0.0.1", services=["ssh"])
        second = _decky(name="d2", ip="10.0.0.2", services=["http", "ftp"])

        _mirror_fleet_deploy_to_db(_config(deckies=[first, second]))

        assert len(self.upserts) == 2
        rows_by_name = {row["name"]: row for row in self.upserts}
        assert sorted(rows_by_name) == ["d1", "d2"]

        d1_row = rows_by_name["d1"]
        assert d1_row["host_uuid"] == "local"
        assert d1_row["services"] == ["ssh"]
        assert d1_row["state"] == "running"
        assert d1_row["decky_ip"] == "10.0.0.1"
        assert d1_row["decky_config"]["name"] == "d1"

    @patch("decnet.web.db.factory.get_repository")
    def test_deploy_mirror_honors_explicit_host_uuid(self, mock_get_repo):
        """An explicit ``host_uuid`` wins over the local sentinel."""
        from decnet.engine.deployer import _mirror_fleet_deploy_to_db

        mock_get_repo.return_value = self._make_repo()
        remote_decky = _decky()
        remote_decky.host_uuid = "remote-host-abc"

        _mirror_fleet_deploy_to_db(_config(deckies=[remote_decky]))

        first_row = self.upserts[0]
        assert first_row["host_uuid"] == "remote-host-abc"

    @patch("decnet.web.db.factory.get_repository")
    def test_deploy_mirror_swallows_db_failure(self, mock_get_repo):
        """A failing repository factory must not escape the deploy mirror."""
        from decnet.engine.deployer import _mirror_fleet_deploy_to_db

        mock_get_repo.side_effect = RuntimeError("db down")

        _mirror_fleet_deploy_to_db(_config())  # must not raise

    @patch("decnet.web.db.factory.get_repository")
    def test_teardown_mirror_deletes_each_decky(self, mock_get_repo):
        """One delete per decky, keyed by (host_uuid, name)."""
        from decnet.engine.deployer import _mirror_fleet_teardown_to_db

        mock_get_repo.return_value = self._make_repo()
        doomed = [
            _decky(name="d1", ip="10.0.0.1"),
            _decky(name="d2", ip="10.0.0.2"),
        ]

        _mirror_fleet_teardown_to_db(doomed)

        expected = [("local", "d1"), ("local", "d2")]
        assert sorted(self.deletes) == expected

    @patch("decnet.web.db.factory.get_repository")
    def test_teardown_mirror_swallows_db_failure(self, mock_get_repo):
        """A failing repository factory must not escape the teardown mirror."""
        from decnet.engine.deployer import _mirror_fleet_teardown_to_db

        mock_get_repo.side_effect = RuntimeError("db down")

        _mirror_fleet_teardown_to_db([_decky()])  # must not raise

    def test_run_async_works_with_running_loop(self):
        """``_run_async`` must work even when the caller is already inside
        an asyncio loop (the API path calls deploy() from a FastAPI handler)."""
        import asyncio
        from decnet.engine.deployer import _run_async

        collected: list[int] = []

        async def _payload() -> None:
            collected.append(42)

        async def _inside_loop() -> None:
            # Called while this loop is running, as in the API handler path.
            _run_async(_payload)

        asyncio.run(_inside_loop())

        assert collected == [42]