fix(swarm): propagate service mutations to worker agent via shard re-dispatch

Add/remove/update_config on a fleet decky living on a swarm worker — and on
an agent-pinned topology — used to run the master's local docker-compose only,
which has no containers for the remote decky. The mutation persisted on master
and silently no-op'd on the worker.

- Fleet swarm: lookup DeckyShard.host_uuid; if found, rebuild a single-host
  shard from master state and call dispatch_decnet_config — same proven path
  as POST /swarm/deploy. Skip local _compose (no containers to touch).
- Topology agent-pinned: call decnet.engine.deployer.resync_agent_topology
  (existing helper) to push the latest hydrated blob to the worker.
- Local-only deckies: behaviour unchanged.
- Tests: 5 new in tests/engine/test_services_live_swarm.py covering all
  three mutations on a swarm fleet decky (no local _compose, dispatch fires
  with the right host's deckies), plus apply=False save-only path (no
  dispatch), plus regression that local-only fleet add still runs local compose.

Bus signal `decky.{name}.service_config_changed` keeps publishing as an
audit trail; it is not the propagation trigger.
This commit is contained in:
2026-04-29 12:51:16 -04:00
parent 94b06ee862
commit eefab020d4
2 changed files with 428 additions and 45 deletions

View File

@@ -82,6 +82,66 @@ def _get_bus():
from decnet.bus.factory import get_bus
return get_bus()
# --------------------------- swarm propagation helpers ---------------------------
#
# Service mutations (add/remove/update_config) on a deployed decky used to run
# the master's local docker-compose only. For swarm fleet deckies the master
# has no containers; for agent-targeted topologies the master only writes a
# compose file the worker never sees. These helpers replay the change to the
# worker so the env actually lands.
#
# Lazy imports keep this module's import graph clean (composer/swarm pull in
# decnet.network → docker, mirroring the pattern used elsewhere in this file).
async def _fleet_decky_host_uuid(repo: BaseRepository, decky_name: str) -> Optional[str]:
"""Return ``host_uuid`` if a fleet decky lives on a swarm worker, else None."""
shards = await repo.list_decky_shards()
for s in shards:
if s.get("decky_name") == decky_name:
return s.get("host_uuid")
return None
async def _redispatch_fleet_shard(repo: BaseRepository, host_uuid: str) -> None:
"""Re-push the host's full shard to its worker agent.
Uses the same code path as POST /swarm/deploy: load master state, filter
to the host's deckies, hand to AgentClient.deploy via dispatch_decnet_config.
The agent regenerates compose and recreates only the changed containers.
Idempotent for unchanged deckies.
"""
from decnet.web.router.swarm.api_deploy_swarm import dispatch_decnet_config
state = _load_state()
if state is None:
log.warning("redispatch_fleet_shard: no fleet state on master; skipping")
return
config, _compose_path = state
host_deckies = [d for d in config.deckies if getattr(d, "host_uuid", None) == host_uuid]
if not host_deckies:
log.warning(
"redispatch_fleet_shard: master state has no deckies for host=%s; skipping",
host_uuid,
)
return
filtered = config.model_copy(update={"deckies": host_deckies})
await dispatch_decnet_config(filtered, repo)
async def _resync_agent_topology(repo: BaseRepository, topology_id: str) -> None:
"""If the topology is agent-pinned, push the latest hydrated blob to the worker."""
from decnet.engine.deployer import resync_agent_topology
hydrated = await hydrate(repo, topology_id)
if hydrated is None:
return
if not hydrated.get("topology", {}).get("target_host_uuid"):
return # unihost topology — local compose is authoritative
await resync_agent_topology(repo, topology_id)
log = get_logger("engine.services_live")
DeckyKind = Literal["fleet", "topology"]
@@ -188,18 +248,30 @@ async def _add_topology_service(
await repo.update_topology_decky(decky["uuid"], update)
compose_path = await _rerender_topology_compose(repo, topology_id)
target = f"{decky_name}-{service_name}"
# Run compose in a worker thread so the API event loop stays
# responsive — same pattern as engine/deployer.deploy_topology.
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--build", target,
compose_file=compose_path,
),
)
if await _topology_is_agent_pinned(repo, topology_id):
# Agent-pinned: the master's local compose has nothing to up.
# Push the new hydrated blob to the worker.
await _resync_agent_topology(repo, topology_id)
else:
target = f"{decky_name}-{service_name}"
# Run compose in a worker thread so the API event loop stays
# responsive — same pattern as engine/deployer.deploy_topology.
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--build", target,
compose_file=compose_path,
),
)
return services
async def _topology_is_agent_pinned(repo: BaseRepository, topology_id: str) -> bool:
hydrated = await hydrate(repo, topology_id)
if hydrated is None:
return False
return bool(hydrated.get("topology", {}).get("target_host_uuid"))
async def _remove_topology_service(
repo: BaseRepository,
topology_id: str,
@@ -215,17 +287,23 @@ async def _remove_topology_service(
services = [s for s in services if s != service_name]
target = f"{decky_name}-{service_name}"
compose_path = _topology_compose_path(topology_id)
# Stop + rm before persisting + re-rendering so a half-completed
# mutation leaves the operator a clear state to retry from
# (container still running; DB still says service is on).
await anyio.to_thread.run_sync(
lambda: _compose("stop", target, compose_file=compose_path),
)
await anyio.to_thread.run_sync(
lambda: _compose("rm", "-f", target, compose_file=compose_path),
)
agent_pinned = await _topology_is_agent_pinned(repo, topology_id)
if not agent_pinned:
# Stop + rm before persisting + re-rendering so a half-completed
# mutation leaves the operator a clear state to retry from
# (container still running; DB still says service is on).
await anyio.to_thread.run_sync(
lambda: _compose("stop", target, compose_file=compose_path),
)
await anyio.to_thread.run_sync(
lambda: _compose("rm", "-f", target, compose_file=compose_path),
)
await repo.update_topology_decky(decky["uuid"], {"services": services})
await _rerender_topology_compose(repo, topology_id)
if agent_pinned:
# Worker tears down the removed service when it diffs the
# incoming hydrated blob against its current state.
await _resync_agent_topology(repo, topology_id)
return services
@@ -291,13 +369,19 @@ async def _add_fleet_service(
sc[service_name] = initial_config
decky.service_config = sc
await _persist_fleet_change(repo, decky, services, compose_path)
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--build", target,
compose_file=compose_path,
),
)
swarm_host_uuid = await _fleet_decky_host_uuid(repo, decky_name)
if swarm_host_uuid:
# Master has no container for this decky — re-push the host's
# shard so the worker materialises the new service.
await _redispatch_fleet_shard(repo, swarm_host_uuid)
else:
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--build", target,
compose_file=compose_path,
),
)
return services
@@ -313,13 +397,21 @@ async def _remove_fleet_service(
)
services = [s for s in services if s != service_name]
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose("stop", target, compose_file=compose_path),
)
await anyio.to_thread.run_sync(
lambda: _compose("rm", "-f", target, compose_file=compose_path),
)
swarm_host_uuid = await _fleet_decky_host_uuid(repo, decky_name)
if not swarm_host_uuid:
# Local: stop+rm before persist so the operator has a clear retry
# state if compose fails halfway. Swarm: skip — the worker's compose
# will handle the removal when the redispatched config drops the
# service from the decky.
await anyio.to_thread.run_sync(
lambda: _compose("stop", target, compose_file=compose_path),
)
await anyio.to_thread.run_sync(
lambda: _compose("rm", "-f", target, compose_file=compose_path),
)
await _persist_fleet_change(repo, decky, services, compose_path)
if swarm_host_uuid:
await _redispatch_fleet_shard(repo, swarm_host_uuid)
return services
@@ -464,13 +556,16 @@ async def _update_topology_service_config(
await repo.update_topology_decky(decky["uuid"], {"decky_config": cfg_blob})
compose_path = await _rerender_topology_compose(repo, topology_id)
if apply:
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--force-recreate", "--build", target,
compose_file=compose_path,
),
)
if await _topology_is_agent_pinned(repo, topology_id):
await _resync_agent_topology(repo, topology_id)
else:
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--force-recreate", "--build", target,
compose_file=compose_path,
),
)
async def _update_fleet_service_config(
@@ -502,13 +597,17 @@ async def _update_fleet_service_config(
"state": "running",
})
if apply:
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--force-recreate", "--build", target,
compose_file=compose_path,
),
)
swarm_host_uuid = await _fleet_decky_host_uuid(repo, decky_name)
if swarm_host_uuid:
await _redispatch_fleet_shard(repo, swarm_host_uuid)
else:
target = f"{decky_name}-{service_name}"
await anyio.to_thread.run_sync(
lambda: _compose(
"up", "-d", "--no-deps", "--force-recreate", "--build", target,
compose_file=compose_path,
),
)
async def remove_service(