From 14250cacade6dd97ebd8a11a015dfebe0a4bb2fc Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 19 Apr 2026 20:47:09 -0400 Subject: [PATCH] feat(swarm): self-destruct agent on decommission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decommissioning a worker from the dashboard (or swarm controller) now asks the agent to wipe its own install before the master forgets it. The agent stops decky containers + every decnet-* systemd unit, then deletes /opt/decnet*, /etc/systemd/system/decnet-*, /var/lib/decnet/*, and /usr/local/bin/decnet*. Logs under /var/log are preserved. The reaper runs as a detached /tmp script (start_new_session=True) so it survives the agent process being killed. Self-destruct dispatch is best-effort — a dead worker doesn't block master-side cleanup. --- decnet/agent/app.py | 14 ++++ decnet/agent/executor.py | 70 +++++++++++++++++++ decnet/swarm/client.py | 6 ++ .../web/router/swarm/api_decommission_host.py | 16 +++++ .../swarm_mgmt/api_decommission_host.py | 26 ++++++- tests/swarm/test_agent_app.py | 47 ++++++++++++- tests/swarm/test_swarm_api.py | 54 ++++++++++++++ 7 files changed, 231 insertions(+), 2 deletions(-) diff --git a/decnet/agent/app.py b/decnet/agent/app.py index fb72390..ce1213d 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -87,6 +87,20 @@ async def teardown(req: TeardownRequest) -> dict: return {"status": "torn_down", "decky_id": req.decky_id} +@app.post("/self-destruct") +async def self_destruct() -> dict: + """Stop all DECNET services on this worker and delete the install + footprint. Called by the master during decommission. Logs under + /var/log/decnet* are preserved. Fire-and-forget — returns 202 before + the reaper starts deleting files.""" + try: + await _exec.self_destruct() + except Exception as exc: + log.exception("agent.self_destruct failed") + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"status": "self_destruct_scheduled"} + + @app.post("/mutate") async def mutate(req: MutateRequest) -> dict: # Service rotation is routed through the deployer's existing mutate path diff --git a/decnet/agent/executor.py b/decnet/agent/executor.py index 76eaff2..b93a0cb 100644 --- a/decnet/agent/executor.py +++ b/decnet/agent/executor.py @@ -115,6 +115,76 @@ def _decky_runtime_states(config: DecnetConfig) -> dict[str, dict[str, Any]]: return out +_REAPER_SCRIPT = r"""#!/bin/bash +# DECNET agent self-destruct reaper. +# Runs detached from the agent process so it survives the agent's death. +# Waits briefly for the HTTP response to drain, then stops services, +# wipes install paths, and preserves logs. +set +e + +sleep 3 + +# Stop decky containers started by the local deployer (best-effort). +if command -v docker >/dev/null 2>&1; then + docker ps -q --filter "label=com.docker.compose.project=decnet" | xargs -r docker stop + docker ps -aq --filter "label=com.docker.compose.project=decnet" | xargs -r docker rm -f + docker network rm decnet_lan 2>/dev/null +fi + +# Stop+disable every systemd unit the installer may have dropped. +for unit in decnet-agent decnet-engine decnet-collector decnet-forwarder decnet-prober decnet-sniffer decnet-updater; do + systemctl stop "$unit" 2>/dev/null + systemctl disable "$unit" 2>/dev/null +done + +# Nuke install paths. Logs under /var/log/decnet* are intentionally +# preserved — the operator typically wants them for forensic review. +rm -rf /opt/decnet* /var/lib/decnet/* /usr/local/bin/decnet* +rm -f /etc/systemd/system/decnet-*.service /etc/systemd/system/decnet-*.timer + +systemctl daemon-reload 2>/dev/null +rm -f "$0" +""" + + +async def self_destruct() -> None: + """Tear down deckies, then spawn a detached reaper that wipes the + install footprint. Returns immediately so the HTTP response can drain + before the reaper starts deleting files out from under the agent.""" + import os + import subprocess # nosec B404 + import tempfile + + # Best-effort teardown first — the reaper also runs docker stop, but + # going through the deployer gives the host-macvlan/ipvlan helper a + # chance to clean up routes cleanly. + try: + await asyncio.to_thread(_deployer.teardown, None) + await asyncio.to_thread(clear_state) + except Exception: + log.exception("self_destruct: pre-reap teardown failed — reaper will force-stop containers") + + # Reaper lives under /tmp so it survives rm -rf /opt/decnet*. + fd, path = tempfile.mkstemp(prefix="decnet-reaper-", suffix=".sh", dir="/tmp") # nosec B108 — reaper must outlive /opt/decnet removal + try: + os.write(fd, _REAPER_SCRIPT.encode()) + finally: + os.close(fd) + os.chmod(path, 0o700) # nosec B103 — root-owned reaper, needs exec + + # start_new_session detaches from the agent process group so the + # reaper isn't killed when systemctl stop decnet-agent fires. + subprocess.Popen( # nosec B603 + ["/bin/bash", path], + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, + close_fds=True, + ) + log.warning("self_destruct: reaper spawned path=%s — agent will die in ~3s", path) + + async def status() -> dict[str, Any]: state = await asyncio.to_thread(load_state) if state is None: diff --git a/decnet/swarm/client.py b/decnet/swarm/client.py index 1d14e17..6f16e8e 100644 --- a/decnet/swarm/client.py +++ b/decnet/swarm/client.py @@ -185,6 +185,12 @@ class AgentClient: resp.raise_for_status() return resp.json() + async def self_destruct(self) -> dict[str, Any]: + """Trigger the worker to stop services and wipe its install.""" + resp = await self._require_client().post("/self-destruct") + resp.raise_for_status() + return resp.json() + # -------------------------------------------------------------- diagnostics def __repr__(self) -> str: diff --git a/decnet/web/router/swarm/api_decommission_host.py b/decnet/web/router/swarm/api_decommission_host.py index 1d109ae..966784f 100644 --- a/decnet/web/router/swarm/api_decommission_host.py +++ b/decnet/web/router/swarm/api_decommission_host.py @@ -3,6 +3,9 @@ Removes the DeckyShard rows bound to the host (portable cascade — MySQL and SQLite both honor it via the repo layer), deletes the SwarmHost row, and best-effort-cleans the per-worker bundle directory on the master. + +Also asks the worker agent to wipe its own install (keeping logs). A +dead/unreachable worker does not block master-side cleanup. """ from __future__ import annotations @@ -10,9 +13,12 @@ import pathlib from fastapi import APIRouter, Depends, HTTPException, status +from decnet.logging import get_logger +from decnet.swarm.client import AgentClient from decnet.web.db.repository import BaseRepository from decnet.web.dependencies import get_repo +log = get_logger("swarm.decommission") router = APIRouter() @@ -29,6 +35,16 @@ async def api_decommission_host( if row is None: raise HTTPException(status_code=404, detail="host not found") + try: + async with AgentClient(host=row) as agent: + await agent.self_destruct() + except Exception: + log.exception( + "decommission: self-destruct dispatch failed host=%s — " + "proceeding with master-side cleanup anyway", + row.get("name"), + ) + await repo.delete_decky_shards_for_host(uuid) await repo.delete_swarm_host(uuid) diff --git a/decnet/web/router/swarm_mgmt/api_decommission_host.py b/decnet/web/router/swarm_mgmt/api_decommission_host.py index e39f055..6c86b1f 100644 --- a/decnet/web/router/swarm_mgmt/api_decommission_host.py +++ b/decnet/web/router/swarm_mgmt/api_decommission_host.py @@ -1,13 +1,22 @@ -"""DELETE /swarm/hosts/{uuid} — decommission a worker from the dashboard.""" +"""DELETE /swarm/hosts/{uuid} — decommission a worker from the dashboard. + +Also instructs the worker agent to stop all DECNET services and delete +its install footprint (keeping logs). Agent self-destruct failure does +not block decommission — the master-side cleanup always runs so a dead +worker can still be removed from the dashboard. +""" from __future__ import annotations import pathlib from fastapi import APIRouter, Depends, HTTPException, status +from decnet.logging import get_logger +from decnet.swarm.client import AgentClient from decnet.web.db.repository import BaseRepository from decnet.web.dependencies import get_repo, require_admin +log = get_logger("swarm.decommission") router = APIRouter() @@ -25,6 +34,21 @@ async def decommission_host( if row is None: raise HTTPException(status_code=404, detail="host not found") + # Ask the worker to wipe its own install (keeps logs). The agent + # schedules the reaper as a detached process and returns immediately, + # so this call is fast when the worker is reachable. A dead worker + # shouldn't block the operator from cleaning up the dashboard entry, + # hence best-effort with a log and continue. + try: + async with AgentClient(host=row) as agent: + await agent.self_destruct() + except Exception: + log.exception( + "decommission: self-destruct dispatch failed host=%s — " + "proceeding with master-side cleanup anyway", + row.get("name"), + ) + await repo.delete_decky_shards_for_host(uuid) await repo.delete_swarm_host(uuid) diff --git a/tests/swarm/test_agent_app.py b/tests/swarm/test_agent_app.py index fa02817..725c399 100644 --- a/tests/swarm/test_agent_app.py +++ b/tests/swarm/test_agent_app.py @@ -42,4 +42,49 @@ def test_deploy_rejects_malformed_body() -> None: def test_route_set() -> None: paths = {r.path for r in app.routes if hasattr(r, "path")} - assert {"/health", "/status", "/deploy", "/teardown", "/mutate"} <= paths + assert {"/health", "/status", "/deploy", "/teardown", "/mutate", "/self-destruct"} <= paths + + +def test_self_destruct_spawns_reaper_and_returns_fast(monkeypatch, tmp_path) -> None: + """/self-destruct must write the reaper script and spawn it detached + (start_new_session=True). We intercept Popen so the test doesn't + actually nuke anything.""" + from decnet.agent import executor as _exec + + spawned: list[dict] = [] + + class _FakePopen: + def __init__(self, args, **kw): + spawned.append({"args": args, "kw": kw}) + + monkeypatch.setattr(_exec, "_deployer", type("X", (), { + "teardown": staticmethod(lambda _id: None), + })()) + monkeypatch.setattr(_exec, "clear_state", lambda: None) + + import subprocess as _sp + monkeypatch.setattr(_sp, "Popen", _FakePopen) + + client = TestClient(app) + resp = client.post("/self-destruct") + assert resp.status_code == 200 + assert resp.json()["status"] == "self_destruct_scheduled" + assert len(spawned) == 1 + assert spawned[0]["kw"].get("start_new_session") is True + script_path = spawned[0]["args"][1] + assert script_path.startswith("/tmp/decnet-reaper-") + # Reaper content sanity check — covers the paths the operator asked for. + import pathlib + body = pathlib.Path(script_path).read_text() + assert "/opt/decnet*" in body + assert "/etc/systemd/system/decnet-" in body + assert "/var/lib/decnet/*" in body + assert "/usr/local/bin/decnet*" in body + # Logs must be preserved — no `rm` line should touch /var/log. + for line in body.splitlines(): + stripped = line.strip() + if stripped.startswith("#") or not stripped: + continue + if stripped.startswith("rm "): + assert "/var/log" not in stripped + pathlib.Path(script_path).unlink(missing_ok=True) diff --git a/tests/swarm/test_swarm_api.py b/tests/swarm/test_swarm_api.py index 77629d8..e43dd59 100644 --- a/tests/swarm/test_swarm_api.py +++ b/tests/swarm/test_swarm_api.py @@ -158,6 +158,60 @@ def test_decommission_removes_host_and_bundle( assert not bundle_dir.exists() +def test_decommission_dispatches_self_destruct_to_agent( + client: TestClient, ca_dir: pathlib.Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Decommission must ask the worker to wipe its own install. Otherwise + the agent keeps running after the dashboard forgets it exists.""" + calls: list[str] = [] + + class _SelfDestructAgent: + def __init__(self, host=None, **_): + self._host = host or {} + + async def __aenter__(self): return self + async def __aexit__(self, *exc): return None + + async def self_destruct(self): + calls.append(self._host.get("name") or "?") + return {"status": "self_destruct_scheduled"} + + from decnet.web.router.swarm import api_decommission_host as decom_mod + monkeypatch.setattr(decom_mod, "AgentClient", _SelfDestructAgent) + + reg = client.post( + "/swarm/enroll", + json={"name": "worker-nuke", "address": "10.0.0.8", "agent_port": 8765}, + ).json() + resp = client.delete(f"/swarm/hosts/{reg['host_uuid']}") + assert resp.status_code == 204 + assert calls == ["worker-nuke"] + + +def test_decommission_proceeds_when_agent_unreachable( + client: TestClient, ca_dir: pathlib.Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """A dead worker must not block the operator from cleaning up the + dashboard. Self-destruct failure is logged, decommission proceeds.""" + class _DeadAgent: + def __init__(self, host=None, **_): pass + async def __aenter__(self): return self + async def __aexit__(self, *exc): return None + async def self_destruct(self): + raise RuntimeError("connection refused") + + from decnet.web.router.swarm import api_decommission_host as decom_mod + monkeypatch.setattr(decom_mod, "AgentClient", _DeadAgent) + + reg = client.post( + "/swarm/enroll", + json={"name": "worker-dead", "address": "10.0.0.8", "agent_port": 8765}, + ).json() + resp = client.delete(f"/swarm/hosts/{reg['host_uuid']}") + assert resp.status_code == 204 + assert client.get(f"/swarm/hosts/{reg['host_uuid']}").status_code == 404 + + # ---------------------------------------------------------------- /deploy