feat(deployer): mirror fleet deploy/teardown into fleet_deckies table
CLI deploy now writes both surfaces: decnet-state.json (existing, canonical for offline / no-API hosts) and the new fleet_deckies DB table (visible to orchestrator, web dashboard, REST API). Best-effort: a DB outage logs a warning and returns. The JSON file remains the source of truth for `decnet status`, `decnet teardown`, sniffer, and collector — operators on a CLI-only host keep working. _run_async helper bridges sync deploy() into the async repository. Always uses a fresh thread because the API handler at web.router.fleet.api_deploy_deckies invokes deploy() from inside a FastAPI event loop, which would otherwise break asyncio.run. Verified end-to-end against MySQL: deploy mirror inserts rows, union view (list_running_deckies) returns them with source="fleet", teardown mirror removes them. Works from both sync (CLI) and async (API handler) call sites.
This commit is contained in:
@@ -429,6 +429,83 @@ def _emit_lifecycle_event(
|
|||||||
decky_name, trigger, exc)
|
decky_name, trigger, exc)
|
||||||
|
|
||||||
|
|
||||||
|
def _run_async(coro_factory) -> None:
|
||||||
|
"""Run an async coroutine from a sync context, even when an event loop
|
||||||
|
is already running on this thread.
|
||||||
|
|
||||||
|
``deploy()`` / ``teardown()`` are sync, but the API handler at
|
||||||
|
``web.router.fleet.api_deploy_deckies`` calls them from inside its own
|
||||||
|
event loop. ``asyncio.run`` refuses to run nested, so we always punt
|
||||||
|
to a fresh thread — small overhead, but deploy is already a heavy op.
|
||||||
|
"""
|
||||||
|
import threading
|
||||||
|
err: list[BaseException] = []
|
||||||
|
|
||||||
|
def _runner() -> None:
|
||||||
|
try:
|
||||||
|
asyncio.run(coro_factory())
|
||||||
|
except BaseException as exc: # noqa: BLE001
|
||||||
|
err.append(exc)
|
||||||
|
|
||||||
|
t = threading.Thread(target=_runner, daemon=False)
|
||||||
|
t.start()
|
||||||
|
t.join()
|
||||||
|
if err:
|
||||||
|
raise err[0]
|
||||||
|
|
||||||
|
|
||||||
|
def _mirror_fleet_deploy_to_db(config: DecnetConfig) -> None:
|
||||||
|
"""Mirror fleet rows into the ``fleet_deckies`` DB table.
|
||||||
|
|
||||||
|
Best-effort: a DB outage on a CLI-only host must not abort deploy.
|
||||||
|
The JSON state file (``decnet-state.json``) remains the canonical
|
||||||
|
artifact for every consumer that runs without the API daemon
|
||||||
|
(``decnet status``, ``decnet teardown``, sniffer, collector).
|
||||||
|
|
||||||
|
State defaults to ``running`` to mirror what the dashboard already
|
||||||
|
assumes about JSON-only fleet rows; the reconciler corrects drift
|
||||||
|
by polling ``docker inspect``.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from decnet.web.db.factory import get_repository
|
||||||
|
from decnet.web.db.models import LOCAL_HOST_SENTINEL
|
||||||
|
repo = get_repository()
|
||||||
|
|
||||||
|
async def _go() -> None:
|
||||||
|
for d in config.deckies:
|
||||||
|
await repo.upsert_fleet_decky({
|
||||||
|
"host_uuid": d.host_uuid or LOCAL_HOST_SENTINEL,
|
||||||
|
"name": d.name,
|
||||||
|
"services": list(d.services),
|
||||||
|
"decky_config": d.model_dump(mode="json"),
|
||||||
|
"decky_ip": d.ip,
|
||||||
|
"state": "running",
|
||||||
|
})
|
||||||
|
|
||||||
|
_run_async(_go)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
log.warning("fleet DB mirror (deploy) failed (best-effort): %s", exc)
|
||||||
|
|
||||||
|
|
||||||
|
def _mirror_fleet_teardown_to_db(deckies) -> None:
|
||||||
|
"""Remove fleet rows from the DB. Best-effort, same rationale."""
|
||||||
|
try:
|
||||||
|
from decnet.web.db.factory import get_repository
|
||||||
|
from decnet.web.db.models import LOCAL_HOST_SENTINEL
|
||||||
|
repo = get_repository()
|
||||||
|
|
||||||
|
async def _go() -> None:
|
||||||
|
for d in deckies:
|
||||||
|
await repo.delete_fleet_decky(
|
||||||
|
host_uuid=d.host_uuid or LOCAL_HOST_SENTINEL,
|
||||||
|
name=d.name,
|
||||||
|
)
|
||||||
|
|
||||||
|
_run_async(_go)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
log.warning("fleet DB mirror (teardown) failed (best-effort): %s", exc)
|
||||||
|
|
||||||
|
|
||||||
@_traced("engine.deploy")
|
@_traced("engine.deploy")
|
||||||
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
|
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
|
||||||
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
|
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
|
||||||
@@ -476,6 +553,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
|
|||||||
return
|
return
|
||||||
|
|
||||||
save_state(config, compose_path)
|
save_state(config, compose_path)
|
||||||
|
_mirror_fleet_deploy_to_db(config)
|
||||||
|
|
||||||
# Emit one creation event per decky so the correlation graph has a
|
# Emit one creation event per decky so the correlation graph has a
|
||||||
# well-formed lifecycle start (old_services=[] ⇒ new_services=<initial>).
|
# well-formed lifecycle start (old_services=[] ⇒ new_services=<initial>).
|
||||||
@@ -546,6 +624,7 @@ def teardown(decky_id: str | None = None) -> None:
|
|||||||
)
|
)
|
||||||
_compose("stop", *svc_names, compose_file=compose_path)
|
_compose("stop", *svc_names, compose_file=compose_path)
|
||||||
_compose("rm", "-f", *svc_names, compose_file=compose_path)
|
_compose("rm", "-f", *svc_names, compose_file=compose_path)
|
||||||
|
_mirror_fleet_teardown_to_db([decky])
|
||||||
else:
|
else:
|
||||||
for decky in config.deckies:
|
for decky in config.deckies:
|
||||||
_emit_lifecycle_event(
|
_emit_lifecycle_event(
|
||||||
@@ -564,6 +643,7 @@ def teardown(decky_id: str | None = None) -> None:
|
|||||||
teardown_host_macvlan(decky_range)
|
teardown_host_macvlan(decky_range)
|
||||||
remove_macvlan_network(client)
|
remove_macvlan_network(client)
|
||||||
clear_state()
|
clear_state()
|
||||||
|
_mirror_fleet_teardown_to_db(config.deckies)
|
||||||
|
|
||||||
net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
|
net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
|
||||||
log.info("teardown complete all deckies removed network_driver=%s", net_driver)
|
log.info("teardown complete all deckies removed network_driver=%s", net_driver)
|
||||||
|
|||||||
@@ -15,6 +15,19 @@ import pytest
|
|||||||
from decnet.config import DeckyConfig, DecnetConfig
|
from decnet.config import DeckyConfig, DecnetConfig
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _stub_fleet_db_mirror(request):
|
||||||
|
"""The DB-mirror helpers are exercised in :class:`TestMirrorFleetToDb`;
|
||||||
|
every other test in this file mocks filesystem + docker but not the DB,
|
||||||
|
so we no-op the mirrors elsewhere to keep the suite self-contained."""
|
||||||
|
if "MirrorFleetToDb" in request.node.nodeid:
|
||||||
|
yield
|
||||||
|
return
|
||||||
|
with patch("decnet.engine.deployer._mirror_fleet_deploy_to_db"), \
|
||||||
|
patch("decnet.engine.deployer._mirror_fleet_teardown_to_db"):
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _decky(name: str = "decky-01", ip: str = "192.168.1.10",
|
def _decky(name: str = "decky-01", ip: str = "192.168.1.10",
|
||||||
@@ -557,3 +570,95 @@ class TestPrintStatus:
|
|||||||
from decnet.engine.deployer import _print_status
|
from decnet.engine.deployer import _print_status
|
||||||
config = _config(deckies=[_decky(), _decky("decky-02", "192.168.1.11")])
|
config = _config(deckies=[_decky(), _decky("decky-02", "192.168.1.11")])
|
||||||
_print_status(config) # should not raise
|
_print_status(config) # should not raise
|
||||||
|
|
||||||
|
|
||||||
|
# ── DB mirror (engine ↔ fleet_deckies) ────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestMirrorFleetToDb:
|
||||||
|
"""The mirror helpers are best-effort: they replicate fleet state into
|
||||||
|
the ``fleet_deckies`` table so DB-only consumers (orchestrator, web,
|
||||||
|
REST API) see the same view as JSON consumers, but a DB failure must
|
||||||
|
never abort a CLI deploy."""
|
||||||
|
|
||||||
|
def _make_repo(self):
|
||||||
|
repo = MagicMock()
|
||||||
|
|
||||||
|
async def _upsert(data):
|
||||||
|
self.upserts.append(data)
|
||||||
|
async def _delete(*, host_uuid, name):
|
||||||
|
self.deletes.append((host_uuid, name))
|
||||||
|
|
||||||
|
repo.upsert_fleet_decky = MagicMock(side_effect=_upsert)
|
||||||
|
repo.delete_fleet_decky = MagicMock(side_effect=_delete)
|
||||||
|
return repo
|
||||||
|
|
||||||
|
def setup_method(self) -> None:
|
||||||
|
self.upserts: list[dict] = []
|
||||||
|
self.deletes: list[tuple[str, str]] = []
|
||||||
|
|
||||||
|
@patch("decnet.web.db.factory.get_repository")
|
||||||
|
def test_deploy_mirror_upserts_each_decky(self, mock_get_repo):
|
||||||
|
from decnet.engine.deployer import _mirror_fleet_deploy_to_db
|
||||||
|
mock_get_repo.return_value = self._make_repo()
|
||||||
|
cfg = _config(deckies=[
|
||||||
|
_decky(name="d1", ip="10.0.0.1", services=["ssh"]),
|
||||||
|
_decky(name="d2", ip="10.0.0.2", services=["http", "ftp"]),
|
||||||
|
])
|
||||||
|
_mirror_fleet_deploy_to_db(cfg)
|
||||||
|
assert len(self.upserts) == 2
|
||||||
|
names = sorted(u["name"] for u in self.upserts)
|
||||||
|
assert names == ["d1", "d2"]
|
||||||
|
u1 = next(u for u in self.upserts if u["name"] == "d1")
|
||||||
|
assert u1["host_uuid"] == "local"
|
||||||
|
assert u1["services"] == ["ssh"]
|
||||||
|
assert u1["state"] == "running"
|
||||||
|
assert u1["decky_ip"] == "10.0.0.1"
|
||||||
|
assert u1["decky_config"]["name"] == "d1"
|
||||||
|
|
||||||
|
@patch("decnet.web.db.factory.get_repository")
|
||||||
|
def test_deploy_mirror_honors_explicit_host_uuid(self, mock_get_repo):
|
||||||
|
from decnet.engine.deployer import _mirror_fleet_deploy_to_db
|
||||||
|
mock_get_repo.return_value = self._make_repo()
|
||||||
|
d = _decky()
|
||||||
|
d.host_uuid = "remote-host-abc"
|
||||||
|
_mirror_fleet_deploy_to_db(_config(deckies=[d]))
|
||||||
|
assert self.upserts[0]["host_uuid"] == "remote-host-abc"
|
||||||
|
|
||||||
|
@patch("decnet.web.db.factory.get_repository")
|
||||||
|
def test_deploy_mirror_swallows_db_failure(self, mock_get_repo):
|
||||||
|
from decnet.engine.deployer import _mirror_fleet_deploy_to_db
|
||||||
|
mock_get_repo.side_effect = RuntimeError("db down")
|
||||||
|
_mirror_fleet_deploy_to_db(_config()) # must not raise
|
||||||
|
|
||||||
|
@patch("decnet.web.db.factory.get_repository")
|
||||||
|
def test_teardown_mirror_deletes_each_decky(self, mock_get_repo):
|
||||||
|
from decnet.engine.deployer import _mirror_fleet_teardown_to_db
|
||||||
|
mock_get_repo.return_value = self._make_repo()
|
||||||
|
deckies = [
|
||||||
|
_decky(name="d1", ip="10.0.0.1"),
|
||||||
|
_decky(name="d2", ip="10.0.0.2"),
|
||||||
|
]
|
||||||
|
_mirror_fleet_teardown_to_db(deckies)
|
||||||
|
assert sorted(self.deletes) == [("local", "d1"), ("local", "d2")]
|
||||||
|
|
||||||
|
@patch("decnet.web.db.factory.get_repository")
|
||||||
|
def test_teardown_mirror_swallows_db_failure(self, mock_get_repo):
|
||||||
|
from decnet.engine.deployer import _mirror_fleet_teardown_to_db
|
||||||
|
mock_get_repo.side_effect = RuntimeError("db down")
|
||||||
|
_mirror_fleet_teardown_to_db([_decky()]) # must not raise
|
||||||
|
|
||||||
|
def test_run_async_works_with_running_loop(self):
|
||||||
|
"""``_run_async`` must work even when the caller is already inside
|
||||||
|
an asyncio loop (the API path calls deploy() from a FastAPI handler)."""
|
||||||
|
import asyncio
|
||||||
|
from decnet.engine.deployer import _run_async
|
||||||
|
|
||||||
|
result: list[int] = []
|
||||||
|
|
||||||
|
async def caller() -> None:
|
||||||
|
async def work() -> None:
|
||||||
|
result.append(42)
|
||||||
|
_run_async(work)
|
||||||
|
|
||||||
|
asyncio.run(caller())
|
||||||
|
assert result == [42]
|
||||||
|
|||||||
Reference in New Issue
Block a user