diff --git a/decnet/mutator/ops.py b/decnet/mutator/ops.py index efc9b296..742cb9bb 100644 --- a/decnet/mutator/ops.py +++ b/decnet/mutator/ops.py @@ -186,6 +186,315 @@ async def _materialise_lan_change( ) +def _decky_targets(decky_name: str, services: list[str]) -> list[str]: + """Compose service names for one decky: base + each per-decky service. + + Skips ``fleet_singleton`` services — those run once fleet-wide and + don't have a per-decky compose entry. Mirrors the same filter + applied at compose-render time + (:mod:`decnet.topology.compose.generate_topology_compose`). + """ + from decnet.services.registry import get_service + targets = [decky_name] + for svc_name in services: + try: + svc = get_service(svc_name) + except KeyError: + # Unknown service — leave it; the compose render won't emit + # a fragment for it, so compose up will simply ignore the + # name with a clear "no such service" error. Surface that + # rather than silently dropping it. + targets.append(f"{decky_name}-{svc_name}") + continue + if svc.fleet_singleton: + continue + targets.append(f"{decky_name}-{svc_name}") + return targets + + +async def _live_topology_or_none( + repo: Any, topology_id: str, +) -> Optional[dict[str, Any]]: + """Return the topology row only when it's eligible for live materialisation. + + Returns None (so callers can skip with a single ``if`` check) when: + + * the topology doesn't exist; + * status is not ``active`` or ``degraded`` (pending topologies get + everything materialised at deploy time); + * the topology is pinned to a swarm agent (cross-host live editing + is its own routing workstream). + """ + topology = await repo.get_topology(topology_id) + if topology is None: + return None + if topology.get("status") not in ("active", "degraded"): + return None + if topology.get("target_host_uuid"): + _log.info( + "live decky op skipped (agent-pinned topology=%s); " + "next agent push will reconcile", + topology_id, + ) + return None + return topology + + +async def _rerender_compose(repo: Any, topology_id: str) -> None: + """Re-render the per-topology compose file from the current DB. + + Called after each materialisation step so the file on disk matches + the topology rows. Soft-fails: a render error is logged but + doesn't poison the DB-side mutation. + """ + from decnet.engine.deployer import _topology_compose_path + from decnet.topology.compose import write_topology_compose + hydrated = await hydrate(repo, topology_id) + if hydrated is None: + return + try: + write_topology_compose(hydrated, _topology_compose_path(topology_id)) + except Exception as exc: # noqa: BLE001 + _log.warning( + "live op: compose re-render failed topology=%s: %s", + topology_id, exc, + ) + + +async def _materialise_decky_spawn( + repo: Any, topology_id: str, decky_name: str, services: list[str], +) -> None: + """compose up -d --no-deps --build for one decky (base + services). + + Re-renders compose first so the file lists the new decky. No-op + when the topology isn't eligible for live materialisation (see + :func:`_live_topology_or_none`). Best-effort: docker failure is + logged, not re-raised — DB row is the source of truth. + """ + if await _live_topology_or_none(repo, topology_id) is None: + return + import anyio + from decnet.engine.deployer import ( + _compose_with_retry, + _topology_compose_path, + ) + await _rerender_compose(repo, topology_id) + targets = _decky_targets(decky_name, services) + compose_path = _topology_compose_path(topology_id) + try: + await anyio.to_thread.run_sync( + lambda: _compose_with_retry( + "up", "-d", "--no-deps", "--build", *targets, + compose_file=compose_path, + ), + ) + except Exception as exc: # noqa: BLE001 + _log.error( + "live add_decky: compose up failed topology=%s decky=%s: %s", + topology_id, decky_name, exc, + ) + + +async def _materialise_decky_remove( + repo: Any, topology_id: str, decky_name: str, services: list[str], +) -> None: + """compose stop + rm -f for one decky's containers, then re-render.""" + if await _live_topology_or_none(repo, topology_id) is None: + return + import anyio + from decnet.engine.deployer import _compose, _topology_compose_path + + targets = _decky_targets(decky_name, services) + compose_path = _topology_compose_path(topology_id) + # Stop + rm BEFORE re-rendering compose; the re-rendered file no + # longer mentions the decky, so a stop run AFTER rendering would + # find no service to act on. + try: + await anyio.to_thread.run_sync( + lambda: _compose("stop", *targets, compose_file=compose_path), + ) + except Exception as exc: # noqa: BLE001 + _log.warning( + "live remove_decky: compose stop failed topology=%s decky=%s: %s", + topology_id, decky_name, exc, + ) + try: + await anyio.to_thread.run_sync( + lambda: _compose("rm", "-f", *targets, compose_file=compose_path), + ) + except Exception as exc: # noqa: BLE001 + _log.warning( + "live remove_decky: compose rm failed topology=%s decky=%s: %s", + topology_id, decky_name, exc, + ) + await _rerender_compose(repo, topology_id) + + +async def _materialise_decky_connect( + repo: Any, topology_id: str, + decky_name: str, lan_name: str, ipv4_address: str, +) -> None: + """SDK ``network.connect`` to multi-home a running base container. + + Service containers share the base's netns via ``network_mode: + service:`` (see :mod:`decnet.topology.compose`), so attaching + the base alone gives every service container the new interface for + free — we don't need to iterate. + """ + if await _live_topology_or_none(repo, topology_id) is None: + return + import docker + from decnet.topology.compose import _container_name, _network_name + + net_name = _network_name(topology_id, lan_name) + container_name = _container_name(topology_id, decky_name) + try: + client = docker.from_env() + net = client.networks.get(net_name) + container = client.containers.get(container_name) + net.connect(container, ipv4_address=ipv4_address) + except docker.errors.APIError as exc: + # Idempotency — already on the network is fine. + msg = str(exc).lower() + if "already" in msg or "endpoint" in msg and "exists" in msg: + _log.info( + "live attach_decky: %s already on network %s — skipping", + container_name, net_name, + ) + else: + _log.error( + "live attach_decky: connect failed topology=%s decky=%s lan=%s: %s", + topology_id, decky_name, lan_name, exc, + ) + except Exception as exc: # noqa: BLE001 + _log.error( + "live attach_decky: SDK call crashed topology=%s decky=%s lan=%s: %s", + topology_id, decky_name, lan_name, exc, + ) + await _rerender_compose(repo, topology_id) + + +async def _materialise_decky_disconnect( + repo: Any, topology_id: str, decky_name: str, lan_name: str, +) -> None: + """SDK ``network.disconnect`` to drop a multi-home edge.""" + if await _live_topology_or_none(repo, topology_id) is None: + return + import docker + from decnet.topology.compose import _container_name, _network_name + + net_name = _network_name(topology_id, lan_name) + container_name = _container_name(topology_id, decky_name) + try: + client = docker.from_env() + net = client.networks.get(net_name) + container = client.containers.get(container_name) + net.disconnect(container) + except docker.errors.APIError as exc: + msg = str(exc).lower() + if "not connected" in msg or "no such" in msg: + _log.info( + "live detach_decky: %s already off network %s — skipping", + container_name, net_name, + ) + else: + _log.error( + "live detach_decky: disconnect failed topology=%s decky=%s lan=%s: %s", + topology_id, decky_name, lan_name, exc, + ) + except Exception as exc: # noqa: BLE001 + _log.error( + "live detach_decky: SDK call crashed topology=%s decky=%s lan=%s: %s", + topology_id, decky_name, lan_name, exc, + ) + await _rerender_compose(repo, topology_id) + + +async def _materialise_decky_services_diff( + repo: Any, topology_id: str, + decky_name: str, + added: list[str], + removed: list[str], +) -> None: + """Add/remove per-service containers without touching siblings. + + Mirrors :mod:`decnet.engine.services_live`'s up/down pattern but + without coupling the mutator to that module — service mutations + routed via the mutator queue publish ``mutation.applied`` while the + direct API publishes ``decky..service_added``; they share + machinery, not control flow. + """ + if not added and not removed: + return + if await _live_topology_or_none(repo, topology_id) is None: + return + import anyio + from decnet.engine.deployer import ( + _compose, _compose_with_retry, _topology_compose_path, + ) + await _rerender_compose(repo, topology_id) + compose_path = _topology_compose_path(topology_id) + add_targets = _decky_targets(decky_name, list(added))[1:] # drop the base + if add_targets: + try: + await anyio.to_thread.run_sync( + lambda: _compose_with_retry( + "up", "-d", "--no-deps", "--build", *add_targets, + compose_file=compose_path, + ), + ) + except Exception as exc: # noqa: BLE001 + _log.error( + "live update_decky add: compose up failed topology=%s decky=%s: %s", + topology_id, decky_name, exc, + ) + rm_targets = _decky_targets(decky_name, list(removed))[1:] + for action_name, args in (("stop", ("stop",)), ("rm", ("rm", "-f"))): + if not rm_targets: + break + try: + await anyio.to_thread.run_sync( + lambda args=args: _compose(*args, *rm_targets, compose_file=compose_path), + ) + except Exception as exc: # noqa: BLE001 + _log.warning( + "live update_decky %s failed topology=%s decky=%s: %s", + action_name, topology_id, decky_name, exc, + ) + + +async def _materialise_decky_recreate_base( + repo: Any, topology_id: str, decky_name: str, +) -> None: + """Force-recreate just the base container (used for forwards_l3 flips). + + DESTRUCTIVE: kills any in-container state on the base. Service + containers re-attach via ``network_mode: service:`` after the + base is rebuilt. Caller is responsible for gating this on an + explicit operator-supplied ``force=true`` flag. + """ + if await _live_topology_or_none(repo, topology_id) is None: + return + import anyio + from decnet.engine.deployer import ( + _compose_with_retry, _topology_compose_path, + ) + await _rerender_compose(repo, topology_id) + compose_path = _topology_compose_path(topology_id) + try: + await anyio.to_thread.run_sync( + lambda: _compose_with_retry( + "up", "-d", "--no-deps", "--force-recreate", decky_name, + compose_file=compose_path, + ), + ) + except Exception as exc: # noqa: BLE001 + _log.error( + "live update_decky recreate_base failed topology=%s decky=%s: %s", + topology_id, decky_name, exc, + ) + + # ------------------------------------------------------------------- ops @@ -308,11 +617,12 @@ async def apply_add_decky( if forwards_l3: decky_config["forwards_l3"] = True + services_list = list(payload.get("services", [])) decky_uuid = await repo.add_topology_decky( { "topology_id": topology_id, "name": name, - "services": list(payload.get("services", [])), + "services": services_list, "decky_config": decky_config, "x": payload.get("x"), "y": payload.get("y"), @@ -327,6 +637,10 @@ async def apply_add_decky( "forwards_l3": forwards_l3, } ) + # Live materialisation: spawn the new decky's containers without + # touching siblings. Skips on pending / agent-pinned topologies — + # see _live_topology_or_none. + await _materialise_decky_spawn(repo, topology_id, name, services_list) await _assert_valid_after(repo, topology_id)