feat(deployer): mirror fleet deploy/teardown into fleet_deckies table

CLI deploy now writes both surfaces: decnet-state.json (existing,
canonical for offline / no-API hosts) and the new fleet_deckies DB
table (visible to orchestrator, web dashboard, REST API).

Best-effort: a DB outage logs a warning and returns. The JSON file
remains the source of truth for `decnet status`, `decnet teardown`,
sniffer, and collector — operators on a CLI-only host keep working.

The _run_async helper bridges the sync deploy()/teardown() paths into
the async repository. It always runs the coroutine on a fresh thread,
because the API handler at web.router.fleet.api_deploy_deckies invokes
deploy() from inside a running FastAPI event loop, where calling
asyncio.run directly would raise RuntimeError.

Verified end-to-end against MySQL: deploy mirror inserts rows, union
view (list_running_deckies) returns them with source="fleet",
teardown mirror removes them. Works from both sync (CLI) and async
(API handler) call sites.
This commit is contained in:
2026-04-26 21:05:50 -04:00
parent 095500ae9a
commit 646aeeca40
2 changed files with 185 additions and 0 deletions

View File

@@ -429,6 +429,83 @@ def _emit_lifecycle_event(
decky_name, trigger, exc)
def _run_async(coro_factory) -> None:
"""Run an async coroutine from a sync context, even when an event loop
is already running on this thread.
``deploy()`` / ``teardown()`` are sync, but the API handler at
``web.router.fleet.api_deploy_deckies`` calls them from inside its own
event loop. ``asyncio.run`` refuses to run nested, so we always punt
to a fresh thread — small overhead, but deploy is already a heavy op.
"""
import threading
err: list[BaseException] = []
def _runner() -> None:
try:
asyncio.run(coro_factory())
except BaseException as exc: # noqa: BLE001
err.append(exc)
t = threading.Thread(target=_runner, daemon=False)
t.start()
t.join()
if err:
raise err[0]
def _mirror_fleet_deploy_to_db(config: DecnetConfig) -> None:
    """Upsert one ``fleet_deckies`` row for every decky in *config*.

    Strictly best-effort: any ``Exception`` raised while importing the
    repository layer or writing a row is logged as a warning and
    swallowed, so a DB outage on a CLI-only host cannot abort deploy.
    The JSON state file (``decnet-state.json``) stays the canonical
    artifact for consumers that run without the API daemon
    (``decnet status``, ``decnet teardown``, sniffer, collector).

    Every row is written with state ``running``, matching what the
    dashboard already assumes for JSON-only fleet rows; the reconciler
    corrects drift by polling ``docker inspect``.
    """
    try:
        # Imported lazily, inside the guard, so a failure to import the
        # web/DB layer is also treated as a best-effort miss.
        from decnet.web.db.factory import get_repository
        from decnet.web.db.models import LOCAL_HOST_SENTINEL

        # NOTE(review): the repository is obtained on the calling thread
        # but awaited on the helper thread's event loop — confirm the DB
        # driver tolerates that.
        repository = get_repository()

        async def _upsert_all() -> None:
            # Rows are written one at a time; the first failure ends the
            # loop and surfaces through the warning below.
            for decky in config.deckies:
                row = {
                    "host_uuid": decky.host_uuid or LOCAL_HOST_SENTINEL,
                    "name": decky.name,
                    "services": list(decky.services),
                    "decky_config": decky.model_dump(mode="json"),
                    "decky_ip": decky.ip,
                    "state": "running",
                }
                await repository.upsert_fleet_decky(row)

        _run_async(_upsert_all)
    except Exception as exc:  # noqa: BLE001
        log.warning("fleet DB mirror (deploy) failed (best-effort): %s", exc)
def _mirror_fleet_teardown_to_db(deckies) -> None:
    """Delete the ``fleet_deckies`` rows for the given deckies.

    *deckies* is an iterable of objects exposing ``host_uuid`` and
    ``name``. Best-effort, like the deploy mirror: any ``Exception``
    from the import, the repository, or a delete is logged as a warning
    and swallowed rather than propagated to the teardown caller.
    """
    try:
        # Lazy import inside the guard: a missing web/DB layer only
        # produces the warning below.
        from decnet.web.db.factory import get_repository
        from decnet.web.db.models import LOCAL_HOST_SENTINEL

        repository = get_repository()

        async def _delete_all() -> None:
            for decky in deckies:
                # A falsy host_uuid is stored under the local sentinel.
                owner = decky.host_uuid or LOCAL_HOST_SENTINEL
                await repository.delete_fleet_decky(
                    host_uuid=owner,
                    name=decky.name,
                )

        _run_async(_delete_all)
    except Exception as exc:  # noqa: BLE001
        log.warning("fleet DB mirror (teardown) failed (best-effort): %s", exc)
@_traced("engine.deploy")
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
@@ -476,6 +553,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
return
save_state(config, compose_path)
_mirror_fleet_deploy_to_db(config)
# Emit one creation event per decky so the correlation graph has a
# well-formed lifecycle start (old_services=[] ⇒ new_services=<initial>).
@@ -546,6 +624,7 @@ def teardown(decky_id: str | None = None) -> None:
)
_compose("stop", *svc_names, compose_file=compose_path)
_compose("rm", "-f", *svc_names, compose_file=compose_path)
_mirror_fleet_teardown_to_db([decky])
else:
for decky in config.deckies:
_emit_lifecycle_event(
@@ -564,6 +643,7 @@ def teardown(decky_id: str | None = None) -> None:
teardown_host_macvlan(decky_range)
remove_macvlan_network(client)
clear_state()
_mirror_fleet_teardown_to_db(config.deckies)
net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
log.info("teardown complete all deckies removed network_driver=%s", net_driver)