From eefab020d4e8fd857cdcb3c154f37b48a9704999 Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 29 Apr 2026 12:51:16 -0400 Subject: [PATCH] fix(swarm): propagate service mutations to worker agent via shard re-dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add/remove/update_config on a fleet decky living on a swarm worker — and on an agent-pinned topology — used to run the master's local docker-compose only, which has no containers for the remote decky. The mutation persisted on master and silently no-op'd on the worker. - Fleet swarm: look up DeckyShard.host_uuid; if found, rebuild a single-host shard from master state and call dispatch_decnet_config — same proven path as POST /swarm/deploy. Skip local _compose (no containers to touch). - Topology agent-pinned: call decnet.engine.deployer.resync_agent_topology (existing helper) to push the latest hydrated blob to the worker. - Local-only deckies: behaviour unchanged. - Tests: 5 new in tests/engine/test_services_live_swarm.py covering all three mutations on a swarm fleet decky (no local _compose, dispatch fires with the right host's deckies), plus the apply=False save-only path (no dispatch), plus a regression test that local-only fleet add still runs local compose. Bus signal `decky.{name}.service_config_changed` keeps publishing as an audit trail; it is not the propagation trigger. 
--- decnet/engine/services_live.py | 189 +++++++++++---- tests/engine/test_services_live_swarm.py | 284 +++++++++++++++++++++++ 2 files changed, 428 insertions(+), 45 deletions(-) create mode 100644 tests/engine/test_services_live_swarm.py diff --git a/decnet/engine/services_live.py b/decnet/engine/services_live.py index ffa05968..936c7c63 100644 --- a/decnet/engine/services_live.py +++ b/decnet/engine/services_live.py @@ -82,6 +82,66 @@ def _get_bus(): from decnet.bus.factory import get_bus return get_bus() + +# --------------------------- swarm propagation helpers --------------------------- +# +# Service mutations (add/remove/update_config) on a deployed decky used to run +# the master's local docker-compose only. For swarm fleet deckies the master +# has no containers; for agent-targeted topologies the master only writes a +# compose file the worker never sees. These helpers replay the change to the +# worker so the env actually lands. +# +# Lazy imports keep this module's import graph clean (composer/swarm pull in +# decnet.network → docker, mirroring the pattern used elsewhere in this file). + + +async def _fleet_decky_host_uuid(repo: BaseRepository, decky_name: str) -> Optional[str]: + """Return ``host_uuid`` if a fleet decky lives on a swarm worker, else None.""" + shards = await repo.list_decky_shards() + for s in shards: + if s.get("decky_name") == decky_name: + return s.get("host_uuid") + return None + + +async def _redispatch_fleet_shard(repo: BaseRepository, host_uuid: str) -> None: + """Re-push the host's full shard to its worker agent. + + Uses the same code path as POST /swarm/deploy: load master state, filter + to the host's deckies, hand to AgentClient.deploy via dispatch_decnet_config. + The agent regenerates compose and recreates only the changed containers. + Idempotent for unchanged deckies. 
+ """ + from decnet.web.router.swarm.api_deploy_swarm import dispatch_decnet_config + + state = _load_state() + if state is None: + log.warning("redispatch_fleet_shard: no fleet state on master; skipping") + return + config, _compose_path = state + host_deckies = [d for d in config.deckies if getattr(d, "host_uuid", None) == host_uuid] + if not host_deckies: + log.warning( + "redispatch_fleet_shard: master state has no deckies for host=%s; skipping", + host_uuid, + ) + return + filtered = config.model_copy(update={"deckies": host_deckies}) + await dispatch_decnet_config(filtered, repo) + + +async def _resync_agent_topology(repo: BaseRepository, topology_id: str) -> None: + """If the topology is agent-pinned, push the latest hydrated blob to the worker.""" + from decnet.engine.deployer import resync_agent_topology + + hydrated = await hydrate(repo, topology_id) + if hydrated is None: + return + if not hydrated.get("topology", {}).get("target_host_uuid"): + return # unihost topology — local compose is authoritative + await resync_agent_topology(repo, topology_id) + + log = get_logger("engine.services_live") DeckyKind = Literal["fleet", "topology"] @@ -188,18 +248,30 @@ async def _add_topology_service( await repo.update_topology_decky(decky["uuid"], update) compose_path = await _rerender_topology_compose(repo, topology_id) - target = f"{decky_name}-{service_name}" - # Run compose in a worker thread so the API event loop stays - # responsive — same pattern as engine/deployer.deploy_topology. - await anyio.to_thread.run_sync( - lambda: _compose( - "up", "-d", "--no-deps", "--build", target, - compose_file=compose_path, - ), - ) + if await _topology_is_agent_pinned(repo, topology_id): + # Agent-pinned: the master's local compose has nothing to up. + # Push the new hydrated blob to the worker. 
+ await _resync_agent_topology(repo, topology_id) + else: + target = f"{decky_name}-{service_name}" + # Run compose in a worker thread so the API event loop stays + # responsive — same pattern as engine/deployer.deploy_topology. + await anyio.to_thread.run_sync( + lambda: _compose( + "up", "-d", "--no-deps", "--build", target, + compose_file=compose_path, + ), + ) return services +async def _topology_is_agent_pinned(repo: BaseRepository, topology_id: str) -> bool: + hydrated = await hydrate(repo, topology_id) + if hydrated is None: + return False + return bool(hydrated.get("topology", {}).get("target_host_uuid")) + + async def _remove_topology_service( repo: BaseRepository, topology_id: str, @@ -215,17 +287,23 @@ async def _remove_topology_service( services = [s for s in services if s != service_name] target = f"{decky_name}-{service_name}" compose_path = _topology_compose_path(topology_id) - # Stop + rm before persisting + re-rendering so a half-completed - # mutation leaves the operator a clear state to retry from - # (container still running; DB still says service is on). - await anyio.to_thread.run_sync( - lambda: _compose("stop", target, compose_file=compose_path), - ) - await anyio.to_thread.run_sync( - lambda: _compose("rm", "-f", target, compose_file=compose_path), - ) + agent_pinned = await _topology_is_agent_pinned(repo, topology_id) + if not agent_pinned: + # Stop + rm before persisting + re-rendering so a half-completed + # mutation leaves the operator a clear state to retry from + # (container still running; DB still says service is on). 
+ await anyio.to_thread.run_sync( + lambda: _compose("stop", target, compose_file=compose_path), + ) + await anyio.to_thread.run_sync( + lambda: _compose("rm", "-f", target, compose_file=compose_path), + ) await repo.update_topology_decky(decky["uuid"], {"services": services}) await _rerender_topology_compose(repo, topology_id) + if agent_pinned: + # Worker tears down the removed service when it diffs the + # incoming hydrated blob against its current state. + await _resync_agent_topology(repo, topology_id) return services @@ -291,13 +369,19 @@ async def _add_fleet_service( sc[service_name] = initial_config decky.service_config = sc await _persist_fleet_change(repo, decky, services, compose_path) - target = f"{decky_name}-{service_name}" - await anyio.to_thread.run_sync( - lambda: _compose( - "up", "-d", "--no-deps", "--build", target, - compose_file=compose_path, - ), - ) + swarm_host_uuid = await _fleet_decky_host_uuid(repo, decky_name) + if swarm_host_uuid: + # Master has no container for this decky — re-push the host's + # shard so the worker materialises the new service. + await _redispatch_fleet_shard(repo, swarm_host_uuid) + else: + target = f"{decky_name}-{service_name}" + await anyio.to_thread.run_sync( + lambda: _compose( + "up", "-d", "--no-deps", "--build", target, + compose_file=compose_path, + ), + ) return services @@ -313,13 +397,21 @@ async def _remove_fleet_service( ) services = [s for s in services if s != service_name] target = f"{decky_name}-{service_name}" - await anyio.to_thread.run_sync( - lambda: _compose("stop", target, compose_file=compose_path), - ) - await anyio.to_thread.run_sync( - lambda: _compose("rm", "-f", target, compose_file=compose_path), - ) + swarm_host_uuid = await _fleet_decky_host_uuid(repo, decky_name) + if not swarm_host_uuid: + # Local: stop+rm before persist so the operator has a clear retry + # state if compose fails halfway. 
Swarm: skip — the worker's compose + # will handle the removal when the redispatched config drops the + # service from the decky. + await anyio.to_thread.run_sync( + lambda: _compose("stop", target, compose_file=compose_path), + ) + await anyio.to_thread.run_sync( + lambda: _compose("rm", "-f", target, compose_file=compose_path), + ) await _persist_fleet_change(repo, decky, services, compose_path) + if swarm_host_uuid: + await _redispatch_fleet_shard(repo, swarm_host_uuid) return services @@ -464,13 +556,16 @@ async def _update_topology_service_config( await repo.update_topology_decky(decky["uuid"], {"decky_config": cfg_blob}) compose_path = await _rerender_topology_compose(repo, topology_id) if apply: - target = f"{decky_name}-{service_name}" - await anyio.to_thread.run_sync( - lambda: _compose( - "up", "-d", "--no-deps", "--force-recreate", "--build", target, - compose_file=compose_path, - ), - ) + if await _topology_is_agent_pinned(repo, topology_id): + await _resync_agent_topology(repo, topology_id) + else: + target = f"{decky_name}-{service_name}" + await anyio.to_thread.run_sync( + lambda: _compose( + "up", "-d", "--no-deps", "--force-recreate", "--build", target, + compose_file=compose_path, + ), + ) async def _update_fleet_service_config( @@ -502,13 +597,17 @@ async def _update_fleet_service_config( "state": "running", }) if apply: - target = f"{decky_name}-{service_name}" - await anyio.to_thread.run_sync( - lambda: _compose( - "up", "-d", "--no-deps", "--force-recreate", "--build", target, - compose_file=compose_path, - ), - ) + swarm_host_uuid = await _fleet_decky_host_uuid(repo, decky_name) + if swarm_host_uuid: + await _redispatch_fleet_shard(repo, swarm_host_uuid) + else: + target = f"{decky_name}-{service_name}" + await anyio.to_thread.run_sync( + lambda: _compose( + "up", "-d", "--no-deps", "--force-recreate", "--build", target, + compose_file=compose_path, + ), + ) async def remove_service( diff --git a/tests/engine/test_services_live_swarm.py 
b/tests/engine/test_services_live_swarm.py new file mode 100644 index 00000000..9c4f246c --- /dev/null +++ b/tests/engine/test_services_live_swarm.py @@ -0,0 +1,284 @@ +"""Swarm propagation coverage for services_live. + +Three mutation paths (add / remove / update_config) need to re-dispatch the +host's shard via ``AgentClient.deploy`` instead of running master-local +docker-compose, because the master has no containers for swarm deckies. + +These tests stub ``_load_state``, ``_compose``, and ``dispatch_decnet_config`` +so we can assert the routing decisions without spinning up a real worker. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Any, AsyncIterator +from datetime import datetime, timezone + +import pytest +import pytest_asyncio + +from decnet.bus.fake import FakeBus +from decnet.engine import services_live +from decnet.models import DecnetConfig, DeckyConfig +from decnet.web.db.sqlite.repository import SQLiteRepository +import decnet.web.db.models # noqa: F401 — register tables + + +@pytest_asyncio.fixture +async def repo(tmp_path: Path) -> AsyncIterator[SQLiteRepository]: + r = SQLiteRepository(str(tmp_path / "p.db")) + await r.initialize() + yield r + + +@pytest_asyncio.fixture +async def fake_bus(monkeypatch) -> AsyncIterator[FakeBus]: + bus = FakeBus() + await bus.connect() + from decnet.bus import factory + monkeypatch.setattr(factory, "get_bus", lambda: bus) + yield bus + await bus.close() + + +def _make_decky(name: str, host_uuid: str | None) -> DeckyConfig: + """Build a minimally-valid DeckyConfig for state-fixture purposes.""" + return DeckyConfig( + name=name, + ip="10.0.0.5", + services=["ssh"], + distro="debian", + base_image="debian:bookworm-slim", + hostname=name, + host_uuid=host_uuid, + ) + + +def _make_state(decky: DeckyConfig) -> tuple[DecnetConfig, Path]: + cfg = DecnetConfig( + mode="swarm" if decky.host_uuid else "unihost", + interface="eth0", + subnet="10.0.0.0/24", + gateway="10.0.0.1", + 
deckies=[decky], + ) + return cfg, Path("/tmp/decnet-compose.yml") + + +@pytest_asyncio.fixture +async def swarm_fleet_decky(repo: SQLiteRepository) -> dict: + """Persist one SwarmHost + one DeckyShard for tests to mutate.""" + host_uuid = "host-uuid-1" + await repo.add_swarm_host({ + "uuid": host_uuid, + "name": "worker-01", + "address": "10.99.0.5", + "agent_port": 8765, + "status": "active", + "client_cert_fingerprint": "deadbeef" * 8, + "cert_bundle_path": "/tmp/bundle", + "enrolled_at": datetime.now(timezone.utc), + }) + decky = _make_decky("web1", host_uuid) + await repo.upsert_decky_shard({ + "decky_name": decky.name, + "host_uuid": host_uuid, + "services": '["ssh"]', + "decky_config": decky.model_dump_json(), + "decky_ip": decky.ip, + "state": "running", + "updated_at": datetime.now(timezone.utc), + }) + return {"host_uuid": host_uuid, "decky": decky} + + +@pytest_asyncio.fixture +async def local_fleet_decky() -> DeckyConfig: + """Decky without a host_uuid — purely local.""" + return _make_decky("local1", None) + + +def _patch_no_state_writes(monkeypatch) -> list[tuple]: + """Stub the disk-touching helpers so tests don't write a real state file.""" + captured_compose: list[tuple] = [] + monkeypatch.setattr(services_live, "_save_state", lambda *a, **kw: None) + monkeypatch.setattr(services_live, "_write_compose", lambda *a, **kw: None) + monkeypatch.setattr( + services_live, "_compose", + lambda *a, **kw: captured_compose.append(a), + ) + return captured_compose + + +# --------------------------- swarm fleet add -------------------------------- + + +@pytest.mark.asyncio +async def test_swarm_fleet_add_service_redispatches_and_skips_local_compose( + repo: SQLiteRepository, swarm_fleet_decky: dict, fake_bus: FakeBus, + monkeypatch, +) -> None: + captured_compose = _patch_no_state_writes(monkeypatch) + decky = swarm_fleet_decky["decky"] + state = _make_state(decky) + monkeypatch.setattr(services_live, "_load_state", lambda: state) + + dispatched: 
list[DecnetConfig] = [] + + async def fake_dispatch(config, repo_, dry_run=False, no_cache=False): + dispatched.append(config) + from decnet.web.db.models import SwarmDeployResponse + return SwarmDeployResponse(results=[]) + + # services_live imports lazily inside _redispatch_fleet_shard. + monkeypatch.setattr( + "decnet.web.router.swarm.api_deploy_swarm.dispatch_decnet_config", + fake_dispatch, + ) + + await services_live.add_service( + repo, decky_kind="fleet", + decky_name=decky.name, service_name="rdp", + ) + # Local _compose was NOT called for the swarm decky. + assert captured_compose == [] + # Dispatch was called with a config containing only the host's deckies. + assert len(dispatched) == 1 + sent = dispatched[0] + assert all(d.host_uuid == swarm_fleet_decky["host_uuid"] for d in sent.deckies) + assert any(d.name == decky.name for d in sent.deckies) + + +# --------------------------- swarm fleet remove ----------------------------- + + +@pytest.mark.asyncio +async def test_swarm_fleet_remove_service_redispatches_and_skips_local_compose( + repo: SQLiteRepository, swarm_fleet_decky: dict, fake_bus: FakeBus, + monkeypatch, +) -> None: + captured_compose = _patch_no_state_writes(monkeypatch) + decky = swarm_fleet_decky["decky"] + state = _make_state(decky) + monkeypatch.setattr(services_live, "_load_state", lambda: state) + + dispatched: list[Any] = [] + + async def fake_dispatch(config, repo_, dry_run=False, no_cache=False): + dispatched.append(config) + from decnet.web.db.models import SwarmDeployResponse + return SwarmDeployResponse(results=[]) + + monkeypatch.setattr( + "decnet.web.router.swarm.api_deploy_swarm.dispatch_decnet_config", + fake_dispatch, + ) + + await services_live.remove_service( + repo, decky_kind="fleet", + decky_name=decky.name, service_name="ssh", # currently on + ) + # No master-local stop / rm — those would no-op anyway, save the syscalls. 
+ assert captured_compose == [] + assert len(dispatched) == 1 + + +# --------------------------- swarm fleet update_config ---------------------- + + +@pytest.mark.asyncio +async def test_swarm_fleet_update_config_apply_redispatches( + repo: SQLiteRepository, swarm_fleet_decky: dict, fake_bus: FakeBus, + monkeypatch, +) -> None: + captured_compose = _patch_no_state_writes(monkeypatch) + decky = swarm_fleet_decky["decky"] + state = _make_state(decky) + monkeypatch.setattr(services_live, "_load_state", lambda: state) + + dispatched: list[Any] = [] + + async def fake_dispatch(config, repo_, dry_run=False, no_cache=False): + dispatched.append(config) + from decnet.web.db.models import SwarmDeployResponse + return SwarmDeployResponse(results=[]) + + monkeypatch.setattr( + "decnet.web.router.swarm.api_deploy_swarm.dispatch_decnet_config", + fake_dispatch, + ) + + await services_live.update_service_config( + repo, decky_kind="fleet", + decky_name=decky.name, service_name="ssh", + cfg={"password": "hunter2"}, apply=True, + ) + assert captured_compose == [] + assert len(dispatched) == 1 + + +@pytest.mark.asyncio +async def test_swarm_fleet_update_config_save_only_does_not_redispatch( + repo: SQLiteRepository, swarm_fleet_decky: dict, fake_bus: FakeBus, + monkeypatch, +) -> None: + """``apply=False`` means Save: persist but don't recreate. 
No redispatch + either — the worker keeps its current containers running their old env.""" + _patch_no_state_writes(monkeypatch) + decky = swarm_fleet_decky["decky"] + state = _make_state(decky) + monkeypatch.setattr(services_live, "_load_state", lambda: state) + + dispatched: list[Any] = [] + + async def fake_dispatch(*a, **kw): + dispatched.append(a) + from decnet.web.db.models import SwarmDeployResponse + return SwarmDeployResponse(results=[]) + + monkeypatch.setattr( + "decnet.web.router.swarm.api_deploy_swarm.dispatch_decnet_config", + fake_dispatch, + ) + + await services_live.update_service_config( + repo, decky_kind="fleet", + decky_name=decky.name, service_name="ssh", + cfg={"password": "hunter2"}, apply=False, + ) + assert dispatched == [] + + +# --------------------------- local-only path stays local ------------------- + + +@pytest.mark.asyncio +async def test_local_fleet_add_service_runs_local_compose_no_dispatch( + repo: SQLiteRepository, local_fleet_decky: DeckyConfig, fake_bus: FakeBus, + monkeypatch, +) -> None: + """Decky with no host_uuid → no DeckyShard row → master keeps running + docker-compose locally and skips the dispatch path entirely.""" + captured_compose = _patch_no_state_writes(monkeypatch) + state = _make_state(local_fleet_decky) + monkeypatch.setattr(services_live, "_load_state", lambda: state) + + dispatched: list[Any] = [] + + async def fake_dispatch(*a, **kw): + dispatched.append(a) + from decnet.web.db.models import SwarmDeployResponse + return SwarmDeployResponse(results=[]) + + monkeypatch.setattr( + "decnet.web.router.swarm.api_deploy_swarm.dispatch_decnet_config", + fake_dispatch, + ) + + await services_live.add_service( + repo, decky_kind="fleet", + decky_name=local_fleet_decky.name, service_name="rdp", + ) + # Local compose ran (up -d --no-deps --build local1-rdp). + assert any("up" in c and f"{local_fleet_decky.name}-rdp" in c for c in captured_compose) + # AgentClient was NOT called. + assert dispatched == []