feat(mutator): live spawn for apply_add_decky + shared materialisation helpers

Adds _materialise_decky_{spawn,remove,connect,disconnect,services_diff,recreate_base}
helpers alongside the existing _materialise_lan_change.  Each follows
the same skip rules: bail when topology is not active/degraded, when
agent-pinned, or when docker calls fail (logged, not re-raised — DB
remains source of truth).

apply_add_decky now calls _materialise_decky_spawn after the DB writes.
The helper:

* re-renders the per-topology compose so it lists the new decky;
* runs 'compose up -d --no-deps --build <decky_base> <decky>-<svc>...'
  in a worker thread (matches engine/services_live's pattern).

Service container targets are filtered through get_service() so
fleet_singleton services are skipped — they don't have per-decky
compose entries.  Gateway (forwards_l3=True) deckies need no
special-case here; the compose generator already emits the host
'ports:' block for them.

Subsequent commits wire the other apply_* ops to the matching
helpers.  Tests for the full set ship in the workstream's last
commit.
This commit is contained in:
2026-04-29 00:14:18 -04:00
parent 578cdf9e2e
commit 8c06190e69

View File

@@ -186,6 +186,315 @@ async def _materialise_lan_change(
)
def _decky_targets(decky_name: str, services: list[str]) -> list[str]:
"""Compose service names for one decky: base + each per-decky service.
Skips ``fleet_singleton`` services — those run once fleet-wide and
don't have a per-decky compose entry. Mirrors the same filter
applied at compose-render time
(:mod:`decnet.topology.compose.generate_topology_compose`).
"""
from decnet.services.registry import get_service
targets = [decky_name]
for svc_name in services:
try:
svc = get_service(svc_name)
except KeyError:
# Unknown service — leave it; the compose render won't emit
# a fragment for it, so compose up will simply ignore the
# name with a clear "no such service" error. Surface that
# rather than silently dropping it.
targets.append(f"{decky_name}-{svc_name}")
continue
if svc.fleet_singleton:
continue
targets.append(f"{decky_name}-{svc_name}")
return targets
async def _live_topology_or_none(
repo: Any, topology_id: str,
) -> Optional[dict[str, Any]]:
"""Return the topology row only when it's eligible for live materialisation.
Returns None (so callers can skip with a single ``if`` check) when:
* the topology doesn't exist;
* status is not ``active`` or ``degraded`` (pending topologies get
everything materialised at deploy time);
* the topology is pinned to a swarm agent (cross-host live editing
is its own routing workstream).
"""
topology = await repo.get_topology(topology_id)
if topology is None:
return None
if topology.get("status") not in ("active", "degraded"):
return None
if topology.get("target_host_uuid"):
_log.info(
"live decky op skipped (agent-pinned topology=%s); "
"next agent push will reconcile",
topology_id,
)
return None
return topology
async def _rerender_compose(repo: Any, topology_id: str) -> None:
"""Re-render the per-topology compose file from the current DB.
Called after each materialisation step so the file on disk matches
the topology rows. Soft-fails: a render error is logged but
doesn't poison the DB-side mutation.
"""
from decnet.engine.deployer import _topology_compose_path
from decnet.topology.compose import write_topology_compose
hydrated = await hydrate(repo, topology_id)
if hydrated is None:
return
try:
write_topology_compose(hydrated, _topology_compose_path(topology_id))
except Exception as exc: # noqa: BLE001
_log.warning(
"live op: compose re-render failed topology=%s: %s",
topology_id, exc,
)
async def _materialise_decky_spawn(
repo: Any, topology_id: str, decky_name: str, services: list[str],
) -> None:
"""compose up -d --no-deps --build for one decky (base + services).
Re-renders compose first so the file lists the new decky. No-op
when the topology isn't eligible for live materialisation (see
:func:`_live_topology_or_none`). Best-effort: docker failure is
logged, not re-raised — DB row is the source of truth.
"""
if await _live_topology_or_none(repo, topology_id) is None:
return
import anyio
from decnet.engine.deployer import (
_compose_with_retry,
_topology_compose_path,
)
await _rerender_compose(repo, topology_id)
targets = _decky_targets(decky_name, services)
compose_path = _topology_compose_path(topology_id)
try:
await anyio.to_thread.run_sync(
lambda: _compose_with_retry(
"up", "-d", "--no-deps", "--build", *targets,
compose_file=compose_path,
),
)
except Exception as exc: # noqa: BLE001
_log.error(
"live add_decky: compose up failed topology=%s decky=%s: %s",
topology_id, decky_name, exc,
)
async def _materialise_decky_remove(
repo: Any, topology_id: str, decky_name: str, services: list[str],
) -> None:
"""compose stop + rm -f for one decky's containers, then re-render."""
if await _live_topology_or_none(repo, topology_id) is None:
return
import anyio
from decnet.engine.deployer import _compose, _topology_compose_path
targets = _decky_targets(decky_name, services)
compose_path = _topology_compose_path(topology_id)
# Stop + rm BEFORE re-rendering compose; the re-rendered file no
# longer mentions the decky, so a stop run AFTER rendering would
# find no service to act on.
try:
await anyio.to_thread.run_sync(
lambda: _compose("stop", *targets, compose_file=compose_path),
)
except Exception as exc: # noqa: BLE001
_log.warning(
"live remove_decky: compose stop failed topology=%s decky=%s: %s",
topology_id, decky_name, exc,
)
try:
await anyio.to_thread.run_sync(
lambda: _compose("rm", "-f", *targets, compose_file=compose_path),
)
except Exception as exc: # noqa: BLE001
_log.warning(
"live remove_decky: compose rm failed topology=%s decky=%s: %s",
topology_id, decky_name, exc,
)
await _rerender_compose(repo, topology_id)
async def _materialise_decky_connect(
repo: Any, topology_id: str,
decky_name: str, lan_name: str, ipv4_address: str,
) -> None:
"""SDK ``network.connect`` to multi-home a running base container.
Service containers share the base's netns via ``network_mode:
service:<base>`` (see :mod:`decnet.topology.compose`), so attaching
the base alone gives every service container the new interface for
free — we don't need to iterate.
"""
if await _live_topology_or_none(repo, topology_id) is None:
return
import docker
from decnet.topology.compose import _container_name, _network_name
net_name = _network_name(topology_id, lan_name)
container_name = _container_name(topology_id, decky_name)
try:
client = docker.from_env()
net = client.networks.get(net_name)
container = client.containers.get(container_name)
net.connect(container, ipv4_address=ipv4_address)
except docker.errors.APIError as exc:
# Idempotency — already on the network is fine.
msg = str(exc).lower()
if "already" in msg or "endpoint" in msg and "exists" in msg:
_log.info(
"live attach_decky: %s already on network %s — skipping",
container_name, net_name,
)
else:
_log.error(
"live attach_decky: connect failed topology=%s decky=%s lan=%s: %s",
topology_id, decky_name, lan_name, exc,
)
except Exception as exc: # noqa: BLE001
_log.error(
"live attach_decky: SDK call crashed topology=%s decky=%s lan=%s: %s",
topology_id, decky_name, lan_name, exc,
)
await _rerender_compose(repo, topology_id)
async def _materialise_decky_disconnect(
repo: Any, topology_id: str, decky_name: str, lan_name: str,
) -> None:
"""SDK ``network.disconnect`` to drop a multi-home edge."""
if await _live_topology_or_none(repo, topology_id) is None:
return
import docker
from decnet.topology.compose import _container_name, _network_name
net_name = _network_name(topology_id, lan_name)
container_name = _container_name(topology_id, decky_name)
try:
client = docker.from_env()
net = client.networks.get(net_name)
container = client.containers.get(container_name)
net.disconnect(container)
except docker.errors.APIError as exc:
msg = str(exc).lower()
if "not connected" in msg or "no such" in msg:
_log.info(
"live detach_decky: %s already off network %s — skipping",
container_name, net_name,
)
else:
_log.error(
"live detach_decky: disconnect failed topology=%s decky=%s lan=%s: %s",
topology_id, decky_name, lan_name, exc,
)
except Exception as exc: # noqa: BLE001
_log.error(
"live detach_decky: SDK call crashed topology=%s decky=%s lan=%s: %s",
topology_id, decky_name, lan_name, exc,
)
await _rerender_compose(repo, topology_id)
async def _materialise_decky_services_diff(
repo: Any, topology_id: str,
decky_name: str,
added: list[str],
removed: list[str],
) -> None:
"""Add/remove per-service containers without touching siblings.
Mirrors :mod:`decnet.engine.services_live`'s up/down pattern but
without coupling the mutator to that module — service mutations
routed via the mutator queue publish ``mutation.applied`` while the
direct API publishes ``decky.<name>.service_added``; they share
machinery, not control flow.
"""
if not added and not removed:
return
if await _live_topology_or_none(repo, topology_id) is None:
return
import anyio
from decnet.engine.deployer import (
_compose, _compose_with_retry, _topology_compose_path,
)
await _rerender_compose(repo, topology_id)
compose_path = _topology_compose_path(topology_id)
add_targets = _decky_targets(decky_name, list(added))[1:] # drop the base
if add_targets:
try:
await anyio.to_thread.run_sync(
lambda: _compose_with_retry(
"up", "-d", "--no-deps", "--build", *add_targets,
compose_file=compose_path,
),
)
except Exception as exc: # noqa: BLE001
_log.error(
"live update_decky add: compose up failed topology=%s decky=%s: %s",
topology_id, decky_name, exc,
)
rm_targets = _decky_targets(decky_name, list(removed))[1:]
for action_name, args in (("stop", ("stop",)), ("rm", ("rm", "-f"))):
if not rm_targets:
break
try:
await anyio.to_thread.run_sync(
lambda args=args: _compose(*args, *rm_targets, compose_file=compose_path),
)
except Exception as exc: # noqa: BLE001
_log.warning(
"live update_decky %s failed topology=%s decky=%s: %s",
action_name, topology_id, decky_name, exc,
)
async def _materialise_decky_recreate_base(
repo: Any, topology_id: str, decky_name: str,
) -> None:
"""Force-recreate just the base container (used for forwards_l3 flips).
DESTRUCTIVE: kills any in-container state on the base. Service
containers re-attach via ``network_mode: service:<base>`` after the
base is rebuilt. Caller is responsible for gating this on an
explicit operator-supplied ``force=true`` flag.
"""
if await _live_topology_or_none(repo, topology_id) is None:
return
import anyio
from decnet.engine.deployer import (
_compose_with_retry, _topology_compose_path,
)
await _rerender_compose(repo, topology_id)
compose_path = _topology_compose_path(topology_id)
try:
await anyio.to_thread.run_sync(
lambda: _compose_with_retry(
"up", "-d", "--no-deps", "--force-recreate", decky_name,
compose_file=compose_path,
),
)
except Exception as exc: # noqa: BLE001
_log.error(
"live update_decky recreate_base failed topology=%s decky=%s: %s",
topology_id, decky_name, exc,
)
# ------------------------------------------------------------------- ops
@@ -308,11 +617,12 @@ async def apply_add_decky(
if forwards_l3:
decky_config["forwards_l3"] = True
services_list = list(payload.get("services", []))
decky_uuid = await repo.add_topology_decky(
{
"topology_id": topology_id,
"name": name,
"services": list(payload.get("services", [])),
"services": services_list,
"decky_config": decky_config,
"x": payload.get("x"),
"y": payload.get("y"),
@@ -327,6 +637,10 @@ async def apply_add_decky(
"forwards_l3": forwards_l3,
}
)
# Live materialisation: spawn the new decky's containers without
# touching siblings. Skips on pending / agent-pinned topologies —
# see _live_topology_or_none.
await _materialise_decky_spawn(repo, topology_id, name, services_list)
await _assert_valid_after(repo, topology_id)