feat(swarm): self-destruct agent on decommission
Decommissioning a worker from the dashboard (or swarm controller) now asks the agent to wipe its own install before the master forgets it. The agent stops decky containers + every decnet-* systemd unit, then deletes /opt/decnet*, /etc/systemd/system/decnet-*, /var/lib/decnet/*, and /usr/local/bin/decnet*. Logs under /var/log are preserved. The reaper runs as a detached /tmp script (start_new_session=True) so it survives the agent process being killed. Self-destruct dispatch is best-effort — a dead worker doesn't block master-side cleanup.
This commit is contained in:
@@ -87,6 +87,20 @@ async def teardown(req: TeardownRequest) -> dict:
|
|||||||
return {"status": "torn_down", "decky_id": req.decky_id}
|
return {"status": "torn_down", "decky_id": req.decky_id}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/self-destruct")
|
||||||
|
async def self_destruct() -> dict:
|
||||||
|
"""Stop all DECNET services on this worker and delete the install
|
||||||
|
footprint. Called by the master during decommission. Logs under
|
||||||
|
/var/log/decnet* are preserved. Fire-and-forget — returns 202 before
|
||||||
|
the reaper starts deleting files."""
|
||||||
|
try:
|
||||||
|
await _exec.self_destruct()
|
||||||
|
except Exception as exc:
|
||||||
|
log.exception("agent.self_destruct failed")
|
||||||
|
raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||||||
|
return {"status": "self_destruct_scheduled"}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/mutate")
|
@app.post("/mutate")
|
||||||
async def mutate(req: MutateRequest) -> dict:
|
async def mutate(req: MutateRequest) -> dict:
|
||||||
# Service rotation is routed through the deployer's existing mutate path
|
# Service rotation is routed through the deployer's existing mutate path
|
||||||
|
|||||||
@@ -115,6 +115,76 @@ def _decky_runtime_states(config: DecnetConfig) -> dict[str, dict[str, Any]]:
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
_REAPER_SCRIPT = r"""#!/bin/bash
|
||||||
|
# DECNET agent self-destruct reaper.
|
||||||
|
# Runs detached from the agent process so it survives the agent's death.
|
||||||
|
# Waits briefly for the HTTP response to drain, then stops services,
|
||||||
|
# wipes install paths, and preserves logs.
|
||||||
|
set +e
|
||||||
|
|
||||||
|
sleep 3
|
||||||
|
|
||||||
|
# Stop decky containers started by the local deployer (best-effort).
|
||||||
|
if command -v docker >/dev/null 2>&1; then
|
||||||
|
docker ps -q --filter "label=com.docker.compose.project=decnet" | xargs -r docker stop
|
||||||
|
docker ps -aq --filter "label=com.docker.compose.project=decnet" | xargs -r docker rm -f
|
||||||
|
docker network rm decnet_lan 2>/dev/null
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Stop+disable every systemd unit the installer may have dropped.
|
||||||
|
for unit in decnet-agent decnet-engine decnet-collector decnet-forwarder decnet-prober decnet-sniffer decnet-updater; do
|
||||||
|
systemctl stop "$unit" 2>/dev/null
|
||||||
|
systemctl disable "$unit" 2>/dev/null
|
||||||
|
done
|
||||||
|
|
||||||
|
# Nuke install paths. Logs under /var/log/decnet* are intentionally
|
||||||
|
# preserved — the operator typically wants them for forensic review.
|
||||||
|
rm -rf /opt/decnet* /var/lib/decnet/* /usr/local/bin/decnet*
|
||||||
|
rm -f /etc/systemd/system/decnet-*.service /etc/systemd/system/decnet-*.timer
|
||||||
|
|
||||||
|
systemctl daemon-reload 2>/dev/null
|
||||||
|
rm -f "$0"
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
async def self_destruct() -> None:
|
||||||
|
"""Tear down deckies, then spawn a detached reaper that wipes the
|
||||||
|
install footprint. Returns immediately so the HTTP response can drain
|
||||||
|
before the reaper starts deleting files out from under the agent."""
|
||||||
|
import os
|
||||||
|
import subprocess # nosec B404
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
# Best-effort teardown first — the reaper also runs docker stop, but
|
||||||
|
# going through the deployer gives the host-macvlan/ipvlan helper a
|
||||||
|
# chance to clean up routes cleanly.
|
||||||
|
try:
|
||||||
|
await asyncio.to_thread(_deployer.teardown, None)
|
||||||
|
await asyncio.to_thread(clear_state)
|
||||||
|
except Exception:
|
||||||
|
log.exception("self_destruct: pre-reap teardown failed — reaper will force-stop containers")
|
||||||
|
|
||||||
|
# Reaper lives under /tmp so it survives rm -rf /opt/decnet*.
|
||||||
|
fd, path = tempfile.mkstemp(prefix="decnet-reaper-", suffix=".sh", dir="/tmp") # nosec B108 — reaper must outlive /opt/decnet removal
|
||||||
|
try:
|
||||||
|
os.write(fd, _REAPER_SCRIPT.encode())
|
||||||
|
finally:
|
||||||
|
os.close(fd)
|
||||||
|
os.chmod(path, 0o700) # nosec B103 — root-owned reaper, needs exec
|
||||||
|
|
||||||
|
# start_new_session detaches from the agent process group so the
|
||||||
|
# reaper isn't killed when systemctl stop decnet-agent fires.
|
||||||
|
subprocess.Popen( # nosec B603
|
||||||
|
["/bin/bash", path],
|
||||||
|
stdin=subprocess.DEVNULL,
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
start_new_session=True,
|
||||||
|
close_fds=True,
|
||||||
|
)
|
||||||
|
log.warning("self_destruct: reaper spawned path=%s — agent will die in ~3s", path)
|
||||||
|
|
||||||
|
|
||||||
async def status() -> dict[str, Any]:
|
async def status() -> dict[str, Any]:
|
||||||
state = await asyncio.to_thread(load_state)
|
state = await asyncio.to_thread(load_state)
|
||||||
if state is None:
|
if state is None:
|
||||||
|
|||||||
@@ -185,6 +185,12 @@ class AgentClient:
|
|||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
return resp.json()
|
return resp.json()
|
||||||
|
|
||||||
|
async def self_destruct(self) -> dict[str, Any]:
|
||||||
|
"""Trigger the worker to stop services and wipe its install."""
|
||||||
|
resp = await self._require_client().post("/self-destruct")
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.json()
|
||||||
|
|
||||||
# -------------------------------------------------------------- diagnostics
|
# -------------------------------------------------------------- diagnostics
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
|
|||||||
@@ -3,6 +3,9 @@
|
|||||||
Removes the DeckyShard rows bound to the host (portable cascade — MySQL
|
Removes the DeckyShard rows bound to the host (portable cascade — MySQL
|
||||||
and SQLite both honor it via the repo layer), deletes the SwarmHost row,
|
and SQLite both honor it via the repo layer), deletes the SwarmHost row,
|
||||||
and best-effort-cleans the per-worker bundle directory on the master.
|
and best-effort-cleans the per-worker bundle directory on the master.
|
||||||
|
|
||||||
|
Also asks the worker agent to wipe its own install (keeping logs). A
|
||||||
|
dead/unreachable worker does not block master-side cleanup.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -10,9 +13,12 @@ import pathlib
|
|||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException, status
|
from fastapi import APIRouter, Depends, HTTPException, status
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.swarm.client import AgentClient
|
||||||
from decnet.web.db.repository import BaseRepository
|
from decnet.web.db.repository import BaseRepository
|
||||||
from decnet.web.dependencies import get_repo
|
from decnet.web.dependencies import get_repo
|
||||||
|
|
||||||
|
log = get_logger("swarm.decommission")
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
@@ -29,6 +35,16 @@ async def api_decommission_host(
|
|||||||
if row is None:
|
if row is None:
|
||||||
raise HTTPException(status_code=404, detail="host not found")
|
raise HTTPException(status_code=404, detail="host not found")
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with AgentClient(host=row) as agent:
|
||||||
|
await agent.self_destruct()
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
"decommission: self-destruct dispatch failed host=%s — "
|
||||||
|
"proceeding with master-side cleanup anyway",
|
||||||
|
row.get("name"),
|
||||||
|
)
|
||||||
|
|
||||||
await repo.delete_decky_shards_for_host(uuid)
|
await repo.delete_decky_shards_for_host(uuid)
|
||||||
await repo.delete_swarm_host(uuid)
|
await repo.delete_swarm_host(uuid)
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,22 @@
|
|||||||
"""DELETE /swarm/hosts/{uuid} — decommission a worker from the dashboard."""
|
"""DELETE /swarm/hosts/{uuid} — decommission a worker from the dashboard.
|
||||||
|
|
||||||
|
Also instructs the worker agent to stop all DECNET services and delete
|
||||||
|
its install footprint (keeping logs). Agent self-destruct failure does
|
||||||
|
not block decommission — the master-side cleanup always runs so a dead
|
||||||
|
worker can still be removed from the dashboard.
|
||||||
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException, status
|
from fastapi import APIRouter, Depends, HTTPException, status
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.swarm.client import AgentClient
|
||||||
from decnet.web.db.repository import BaseRepository
|
from decnet.web.db.repository import BaseRepository
|
||||||
from decnet.web.dependencies import get_repo, require_admin
|
from decnet.web.dependencies import get_repo, require_admin
|
||||||
|
|
||||||
|
log = get_logger("swarm.decommission")
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
@@ -25,6 +34,21 @@ async def decommission_host(
|
|||||||
if row is None:
|
if row is None:
|
||||||
raise HTTPException(status_code=404, detail="host not found")
|
raise HTTPException(status_code=404, detail="host not found")
|
||||||
|
|
||||||
|
# Ask the worker to wipe its own install (keeps logs). The agent
|
||||||
|
# schedules the reaper as a detached process and returns immediately,
|
||||||
|
# so this call is fast when the worker is reachable. A dead worker
|
||||||
|
# shouldn't block the operator from cleaning up the dashboard entry,
|
||||||
|
# hence best-effort with a log and continue.
|
||||||
|
try:
|
||||||
|
async with AgentClient(host=row) as agent:
|
||||||
|
await agent.self_destruct()
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
"decommission: self-destruct dispatch failed host=%s — "
|
||||||
|
"proceeding with master-side cleanup anyway",
|
||||||
|
row.get("name"),
|
||||||
|
)
|
||||||
|
|
||||||
await repo.delete_decky_shards_for_host(uuid)
|
await repo.delete_decky_shards_for_host(uuid)
|
||||||
await repo.delete_swarm_host(uuid)
|
await repo.delete_swarm_host(uuid)
|
||||||
|
|
||||||
|
|||||||
@@ -42,4 +42,49 @@ def test_deploy_rejects_malformed_body() -> None:
|
|||||||
|
|
||||||
def test_route_set() -> None:
|
def test_route_set() -> None:
|
||||||
paths = {r.path for r in app.routes if hasattr(r, "path")}
|
paths = {r.path for r in app.routes if hasattr(r, "path")}
|
||||||
assert {"/health", "/status", "/deploy", "/teardown", "/mutate"} <= paths
|
assert {"/health", "/status", "/deploy", "/teardown", "/mutate", "/self-destruct"} <= paths
|
||||||
|
|
||||||
|
|
||||||
|
def test_self_destruct_spawns_reaper_and_returns_fast(monkeypatch, tmp_path) -> None:
|
||||||
|
"""/self-destruct must write the reaper script and spawn it detached
|
||||||
|
(start_new_session=True). We intercept Popen so the test doesn't
|
||||||
|
actually nuke anything."""
|
||||||
|
from decnet.agent import executor as _exec
|
||||||
|
|
||||||
|
spawned: list[dict] = []
|
||||||
|
|
||||||
|
class _FakePopen:
|
||||||
|
def __init__(self, args, **kw):
|
||||||
|
spawned.append({"args": args, "kw": kw})
|
||||||
|
|
||||||
|
monkeypatch.setattr(_exec, "_deployer", type("X", (), {
|
||||||
|
"teardown": staticmethod(lambda _id: None),
|
||||||
|
})())
|
||||||
|
monkeypatch.setattr(_exec, "clear_state", lambda: None)
|
||||||
|
|
||||||
|
import subprocess as _sp
|
||||||
|
monkeypatch.setattr(_sp, "Popen", _FakePopen)
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
resp = client.post("/self-destruct")
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["status"] == "self_destruct_scheduled"
|
||||||
|
assert len(spawned) == 1
|
||||||
|
assert spawned[0]["kw"].get("start_new_session") is True
|
||||||
|
script_path = spawned[0]["args"][1]
|
||||||
|
assert script_path.startswith("/tmp/decnet-reaper-")
|
||||||
|
# Reaper content sanity check — covers the paths the operator asked for.
|
||||||
|
import pathlib
|
||||||
|
body = pathlib.Path(script_path).read_text()
|
||||||
|
assert "/opt/decnet*" in body
|
||||||
|
assert "/etc/systemd/system/decnet-" in body
|
||||||
|
assert "/var/lib/decnet/*" in body
|
||||||
|
assert "/usr/local/bin/decnet*" in body
|
||||||
|
# Logs must be preserved — no `rm` line should touch /var/log.
|
||||||
|
for line in body.splitlines():
|
||||||
|
stripped = line.strip()
|
||||||
|
if stripped.startswith("#") or not stripped:
|
||||||
|
continue
|
||||||
|
if stripped.startswith("rm "):
|
||||||
|
assert "/var/log" not in stripped
|
||||||
|
pathlib.Path(script_path).unlink(missing_ok=True)
|
||||||
|
|||||||
@@ -158,6 +158,60 @@ def test_decommission_removes_host_and_bundle(
|
|||||||
assert not bundle_dir.exists()
|
assert not bundle_dir.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_decommission_dispatches_self_destruct_to_agent(
|
||||||
|
client: TestClient, ca_dir: pathlib.Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
"""Decommission must ask the worker to wipe its own install. Otherwise
|
||||||
|
the agent keeps running after the dashboard forgets it exists."""
|
||||||
|
calls: list[str] = []
|
||||||
|
|
||||||
|
class _SelfDestructAgent:
|
||||||
|
def __init__(self, host=None, **_):
|
||||||
|
self._host = host or {}
|
||||||
|
|
||||||
|
async def __aenter__(self): return self
|
||||||
|
async def __aexit__(self, *exc): return None
|
||||||
|
|
||||||
|
async def self_destruct(self):
|
||||||
|
calls.append(self._host.get("name") or "?")
|
||||||
|
return {"status": "self_destruct_scheduled"}
|
||||||
|
|
||||||
|
from decnet.web.router.swarm import api_decommission_host as decom_mod
|
||||||
|
monkeypatch.setattr(decom_mod, "AgentClient", _SelfDestructAgent)
|
||||||
|
|
||||||
|
reg = client.post(
|
||||||
|
"/swarm/enroll",
|
||||||
|
json={"name": "worker-nuke", "address": "10.0.0.8", "agent_port": 8765},
|
||||||
|
).json()
|
||||||
|
resp = client.delete(f"/swarm/hosts/{reg['host_uuid']}")
|
||||||
|
assert resp.status_code == 204
|
||||||
|
assert calls == ["worker-nuke"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_decommission_proceeds_when_agent_unreachable(
|
||||||
|
client: TestClient, ca_dir: pathlib.Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
"""A dead worker must not block the operator from cleaning up the
|
||||||
|
dashboard. Self-destruct failure is logged, decommission proceeds."""
|
||||||
|
class _DeadAgent:
|
||||||
|
def __init__(self, host=None, **_): pass
|
||||||
|
async def __aenter__(self): return self
|
||||||
|
async def __aexit__(self, *exc): return None
|
||||||
|
async def self_destruct(self):
|
||||||
|
raise RuntimeError("connection refused")
|
||||||
|
|
||||||
|
from decnet.web.router.swarm import api_decommission_host as decom_mod
|
||||||
|
monkeypatch.setattr(decom_mod, "AgentClient", _DeadAgent)
|
||||||
|
|
||||||
|
reg = client.post(
|
||||||
|
"/swarm/enroll",
|
||||||
|
json={"name": "worker-dead", "address": "10.0.0.8", "agent_port": 8765},
|
||||||
|
).json()
|
||||||
|
resp = client.delete(f"/swarm/hosts/{reg['host_uuid']}")
|
||||||
|
assert resp.status_code == 204
|
||||||
|
assert client.get(f"/swarm/hosts/{reg['host_uuid']}").status_code == 404
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------- /deploy
|
# ---------------------------------------------------------------- /deploy
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user