From ade8bbe30ae622e23e611e7f75a979fd90f59733 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 22 May 2026 16:14:46 -0400 Subject: [PATCH] feat(agent): real worker-side /mutate with master swarm dispatch - Implement /mutate handler: load_state, update services + last_mutated, save_state, write_compose, compose up -d via asyncio.to_thread. 404 for missing state / unknown decky_id. dry_run short-circuits before any side effect. - Add AgentClient.mutate(decky_id, services, *, dry_run=False) using _TIMEOUT_DEPLOY (compose up can pull/build, exceeds control timeout). - mutator/engine.py: in swarm mode with decky.host_uuid set, resolve worker via _resolve_swarm_host and dispatch through AgentClient.mutate instead of writing a compose file on master. Master-resident deckies (unihost mode, or swarm with host_uuid=None) keep the local path. --- decnet/agent/app.py | 56 +++++++++-- decnet/mutator/engine.py | 41 +++++--- decnet/swarm/client.py | 23 +++++ tests/mutator/test_mutator.py | 61 ++++++++++++ tests/swarm/test_agent_app.py | 128 ++++++++++++++++++++++++- tests/swarm/test_client_mutate.py | 150 ++++++++++++++++++++++++++++++ 6 files changed, 434 insertions(+), 25 deletions(-) create mode 100644 tests/swarm/test_client_mutate.py diff --git a/decnet/agent/app.py b/decnet/agent/app.py index 8302314d..b6ca27d6 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -181,6 +181,7 @@ class TeardownRequest(BaseModel): class MutateRequest(BaseModel): decky_id: str services: list[str] + dry_run: bool = False # ------------------------------------------------------------------ routes @@ -307,14 +308,51 @@ async def topology_state() -> dict: @app.post( "/mutate", - responses={501: {"description": "Worker-side mutate not yet implemented"}}, + responses={ + 404: {"description": "No active deployment, or unknown decky_id"}, + 500: {"description": "Compose rewrite or container restart failed"}, + }, ) async def mutate(req: MutateRequest) -> dict: - # TODO: implement worker-side mutate. Currently the master performs - # mutation by re-sending a full /deploy with the updated DecnetConfig; - # this avoids duplicating mutation logic on the worker for v1. When - # ready, replace the 501 with a real redeploy-of-a-single-decky path. - raise HTTPException( - status_code=501, - detail="Per-decky mutate is performed via /deploy with updated services", - ) + import time + from decnet.composer import write_compose + from decnet.config import load_state, save_state + from decnet.engine import _compose_with_retry + + state = load_state() + if state is None: + raise HTTPException(status_code=404, detail="no active deployment on this worker") + cfg, compose_path = state + + decky = next((d for d in cfg.deckies if d.name == req.decky_id), None) + if decky is None: + raise HTTPException( + status_code=404, detail=f"decky {req.decky_id!r} not found in worker state", + ) + + decky.services = list(req.services) + decky.last_mutated = time.time() + + if req.dry_run: + return { + "status": "dry_run", + "decky_id": decky.name, + "services": list(decky.services), + } + + try: + save_state(cfg, compose_path) + write_compose(cfg, compose_path) + await asyncio.to_thread( + _compose_with_retry, "up", "-d", "--remove-orphans", + compose_file=compose_path, + ) + except Exception as exc: + log.exception("agent.mutate failed") + raise HTTPException(status_code=500, detail=str(exc)) from exc + + return { + "status": "mutated", + "decky_id": decky.name, + "services": list(decky.services), + } diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index d0de3951..e91a9647 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -93,22 +93,37 @@ async def mutate_decky( # Save to DB await repo.set_state("deployment", {"config": config.model_dump(), "compose_path": str(compose_path)}) - # Still writes files for Docker to use - write_compose(config, compose_path) - log.info("mutation applied decky=%s services=%s", decky_name, ",".join(decky.services)) console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]") - try: - # Wrap blocking call in thread - cp = compose_path - await anyio.to_thread.run_sync( - lambda: _compose_with_retry("up", "-d", "--remove-orphans", compose_file=cp) - ) - except Exception as e: - log.error("mutation failed decky=%s error=%s", decky_name, e) - console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") - return False + # Swarm-resident deckies are reified on a remote worker; dispatch to its + # agent /mutate rather than scribbling a compose file on the master. + # Master-resident deckies (host_uuid is None, or unihost mode) keep the + # local docker path. + if config.mode == "swarm" and decky.host_uuid: + try: + from decnet.engine.deployer import _resolve_swarm_host + from decnet.swarm.client import AgentClient + + host = await _resolve_swarm_host(repo, decky.host_uuid) + async with AgentClient(host=host) as agent: + await agent.mutate(decky.name, list(decky.services)) + except Exception as e: + log.error("mutation failed decky=%s error=%s", decky_name, e) + console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") + return False + else: + # Still writes files for Docker to use + write_compose(config, compose_path) + try: + cp = compose_path + await anyio.to_thread.run_sync( + lambda: _compose_with_retry("up", "-d", "--remove-orphans", compose_file=cp) + ) + except Exception as e: + log.error("mutation failed decky=%s error=%s", decky_name, e) + console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") + return False await emit_decky_mutated( bus, diff --git a/decnet/swarm/client.py b/decnet/swarm/client.py index f31da15b..bcb699d2 100644 --- a/decnet/swarm/client.py +++ b/decnet/swarm/client.py @@ -256,6 +256,29 @@ class AgentClient: resp.raise_for_status() return resp.json() + async def mutate( + self, + decky_id: str, + services: list[str], + *, + dry_run: bool = False, + ) -> dict[str, Any]: + body = { + "decky_id": decky_id, + "services": list(services), + "dry_run": dry_run, + } + # Worker /mutate runs `compose up -d` which can pull/build; same + # long-tail latency as /deploy. Swap the deploy timeout in. + old = self._require_client().timeout + self._require_client().timeout = _TIMEOUT_DEPLOY + try: + resp = await self._require_client().post("/mutate", json=body) + finally: + self._require_client().timeout = old + resp.raise_for_status() + return resp.json() + async def teardown(self, decky_id: Optional[str] = None) -> dict[str, Any]: resp = await self._require_client().post( "/teardown", json={"decky_id": decky_id} diff --git a/tests/mutator/test_mutator.py b/tests/mutator/test_mutator.py index 7e034742..0bff731d 100644 --- a/tests/mutator/test_mutator.py +++ b/tests/mutator/test_mutator.py @@ -128,6 +128,67 @@ class TestMutateDecky: new_last_mutated = call_args[1]["config"]["deckies"][0]["last_mutated"] assert new_last_mutated >= before + async def test_swarm_decky_dispatches_to_agent(self, mock_repo): + """When mode=swarm and decky.host_uuid is set, mutate_decky must + call AgentClient.mutate() instead of touching local compose/docker.""" + decky = _make_decky() + decky.host_uuid = "host-uuid-42" + cfg = DecnetConfig( + mode="swarm", interface="eth0", + subnet="192.168.1.0/24", gateway="192.168.1.1", + deckies=[decky], + ) + mock_repo.get_state.return_value = { + "config": cfg.model_dump(), "compose_path": "c.yml", + } + + mutate_mock = AsyncMock(return_value={"status": "mutated"}) + agent_ctx = MagicMock() + agent_ctx.__aenter__ = AsyncMock(return_value=MagicMock(mutate=mutate_mock)) + agent_ctx.__aexit__ = AsyncMock(return_value=None) + + with patch("decnet.engine.deployer._resolve_swarm_host", + new_callable=AsyncMock, + return_value={"uuid": "host-uuid-42", "address": "10.0.0.2"}), \ + patch("decnet.swarm.client.AgentClient", return_value=agent_ctx), \ + patch("decnet.mutator.engine.write_compose") as mock_compose, \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock) as mock_run: + ok = await mutate_decky("decky-01", repo=mock_repo) + + assert ok is True + mutate_mock.assert_awaited_once() + # AgentClient.mutate(decky_name, services_list) + call = mutate_mock.await_args + assert call.args[0] == "decky-01" + assert isinstance(call.args[1], list) + # Local docker path MUST NOT run for swarm-resident deckies. + mock_compose.assert_not_called() + mock_run.assert_not_called() + + async def test_swarm_decky_without_host_uuid_uses_local_path(self, mock_repo): + """In swarm mode, a decky with host_uuid=None is master-resident + and should still take the local compose path.""" + decky = _make_decky() + # host_uuid defaults to None — explicit for clarity. + decky.host_uuid = None + cfg = DecnetConfig( + mode="swarm", interface="eth0", + subnet="192.168.1.0/24", gateway="192.168.1.1", + deckies=[decky], + ) + mock_repo.get_state.return_value = { + "config": cfg.model_dump(), "compose_path": "c.yml", + } + with patch("decnet.mutator.engine.write_compose") as mock_compose, \ + patch("anyio.to_thread.run_sync", new_callable=AsyncMock) as mock_run, \ + patch("decnet.swarm.client.AgentClient") as mock_client: + ok = await mutate_decky("decky-01", repo=mock_repo) + assert ok is True + mock_compose.assert_called_once() + mock_run.assert_awaited_once() + mock_client.assert_not_called() + + # --------------------------------------------------------------------------- # mutate_all # --------------------------------------------------------------------------- diff --git a/tests/swarm/test_agent_app.py b/tests/swarm/test_agent_app.py index 1f756043..2ef78b3e 100644 --- a/tests/swarm/test_agent_app.py +++ b/tests/swarm/test_agent_app.py @@ -28,10 +28,132 @@ def test_status_when_not_deployed() -> None: assert "deckies" in body -def test_mutate_is_501() -> None: +def _seed_state(monkeypatch, tmp_path): + """Install a fake load_state/save_state pair backed by a list cell so + tests can both seed and re-read what the handler wrote.""" + from decnet.config import DecnetConfig, DeckyConfig + from decnet.agent import app as _app_module + + cfg = DecnetConfig( + mode="swarm", + interface="eth0", + subnet="10.66.0.0/24", + gateway="10.66.0.1", + deckies=[ + DeckyConfig( + name="decky-01", + ip="10.66.0.10", + services=["ssh"], + distro="debian", + base_image="debian:bookworm-slim", + hostname="d01", + ), + ], + ) + compose_path = tmp_path / "decnet-compose.yml" + cell = {"cfg": cfg, "compose_path": compose_path} + + def _fake_load_state(): + return (cell["cfg"], cell["compose_path"]) if cell["cfg"] is not None else None + + def _fake_save_state(c, p): + cell["cfg"] = c + cell["compose_path"] = p + + monkeypatch.setattr("decnet.config.load_state", _fake_load_state) + monkeypatch.setattr("decnet.config.save_state", _fake_save_state) + return cell + + +def test_mutate_success(monkeypatch, tmp_path) -> None: + cell = _seed_state(monkeypatch, tmp_path) + compose_calls: list[tuple] = [] + write_compose_calls: list[tuple] = [] + + monkeypatch.setattr( + "decnet.composer.write_compose", + lambda c, p: write_compose_calls.append((c, p)) or p, + ) + monkeypatch.setattr( + "decnet.engine._compose_with_retry", + lambda *a, **kw: compose_calls.append((a, kw)), + ) + client = TestClient(app) - resp = client.post("/mutate", json={"decky_id": "decky-01", "services": ["ssh"]}) - assert resp.status_code == 501 + resp = client.post( + "/mutate", + json={"decky_id": "decky-01", "services": ["http", "ftp"]}, + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body == {"status": "mutated", "decky_id": "decky-01", "services": ["http", "ftp"]} + assert cell["cfg"].deckies[0].services == ["http", "ftp"] + assert cell["cfg"].deckies[0].last_mutated > 0 + assert len(write_compose_calls) == 1 + assert len(compose_calls) == 1 + assert compose_calls[0][0] == ("up", "-d", "--remove-orphans") + + +def test_mutate_unknown_decky_returns_404(monkeypatch, tmp_path) -> None: + _seed_state(monkeypatch, tmp_path) + compose_calls: list = [] + monkeypatch.setattr( + "decnet.engine._compose_with_retry", + lambda *a, **kw: compose_calls.append((a, kw)), + ) + + client = TestClient(app) + resp = client.post( + "/mutate", json={"decky_id": "ghost", "services": ["ssh"]}, + ) + assert resp.status_code == 404 + assert compose_calls == [] + + +def test_mutate_no_state_returns_404(monkeypatch) -> None: + monkeypatch.setattr("decnet.config.load_state", lambda: None) + client = TestClient(app) + resp = client.post( + "/mutate", json={"decky_id": "decky-01", "services": ["ssh"]}, + ) + assert resp.status_code == 404 + + +def test_mutate_dry_run_does_not_touch_docker_or_state(monkeypatch, tmp_path) -> None: + cell = _seed_state(monkeypatch, tmp_path) + saved: list = [] + written: list = [] + composed: list = [] + + monkeypatch.setattr( + "decnet.config.save_state", + lambda c, p: saved.append((c, p)), + ) + monkeypatch.setattr( + "decnet.composer.write_compose", + lambda c, p: written.append((c, p)), + ) + monkeypatch.setattr( + "decnet.engine._compose_with_retry", + lambda *a, **kw: composed.append((a, kw)), + ) + + original_services = list(cell["cfg"].deckies[0].services) + client = TestClient(app) + resp = client.post( + "/mutate", + json={"decky_id": "decky-01", "services": ["http"], "dry_run": True}, + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "dry_run" + # No persistence, no compose render, no docker. + assert saved == [] + assert written == [] + assert composed == [] + # State on the in-memory cell was touched (handler mutated the loaded + # DeckyConfig) but never persisted — load_state is shared by reference, + # so we only assert that no save/render happened above. + del original_services def test_deploy_rejects_malformed_body() -> None: diff --git a/tests/swarm/test_client_mutate.py b/tests/swarm/test_client_mutate.py new file mode 100644 index 00000000..777fd963 --- /dev/null +++ b/tests/swarm/test_client_mutate.py @@ -0,0 +1,150 @@ +"""Roundtrip test for AgentClient.mutate() through a live in-process +agent over mTLS. Mirrors test_client_agent_roundtrip's harness.""" +from __future__ import annotations + +import asyncio +import pathlib +import socket +import threading +import time + +import pytest +import uvicorn + +from decnet.agent.app import app as agent_app +from decnet.config import DeckyConfig, DecnetConfig +from decnet.swarm import client as swarm_client +from decnet.swarm import pki + + +def _free_port() -> int: + s = socket.socket() + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + return port + + +def _start_agent(tmp_path: pathlib.Path, port: int): + ca_dir = tmp_path / "ca" + pki.ensure_ca(ca_dir) + worker_dir = tmp_path / "agent" + pki.write_worker_bundle( + pki.issue_worker_cert(pki.load_ca(ca_dir), "worker-test", ["127.0.0.1"]), + worker_dir, + ) + master_id = swarm_client.ensure_master_identity(ca_dir) + config = uvicorn.Config( + agent_app, + host="127.0.0.1", + port=port, + log_level="warning", + ssl_keyfile=str(worker_dir / "worker.key"), + ssl_certfile=str(worker_dir / "worker.crt"), + ssl_ca_certs=str(worker_dir / "ca.crt"), + ssl_cert_reqs=2, + ) + server = uvicorn.Server(config) + + def _run() -> None: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(server.serve()) + loop.close() + + thread = threading.Thread(target=_run, daemon=True) + thread.start() + deadline = time.time() + 5 + while time.time() < deadline: + if server.started: + return server, thread, master_id + time.sleep(0.05) + raise RuntimeError("agent did not start within 5s") + + +@pytest.mark.asyncio +async def test_client_mutate_dry_run_roundtrip( + tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Drive the real agent /mutate handler over mTLS in dry_run mode so we + don't need docker. Asserts that the client POSTs the right body and + parses the worker's response. + """ + cfg = DecnetConfig( + mode="swarm", + interface="eth0", + subnet="10.66.0.0/24", + gateway="10.66.0.1", + deckies=[ + DeckyConfig( + name="decky-01", + ip="10.66.0.10", + services=["ssh"], + distro="debian", + base_image="debian:bookworm-slim", + hostname="d01", + ), + ], + ) + monkeypatch.setattr( + "decnet.config.load_state", + lambda: (cfg, tmp_path / "decnet-compose.yml"), + ) + + port = _free_port() + server, thread, master_id = _start_agent(tmp_path, port) + try: + async with swarm_client.AgentClient( + address="127.0.0.1", agent_port=port, identity=master_id, + ) as agent: + body = await agent.mutate( + "decky-01", ["http", "ftp"], dry_run=True, + ) + assert body == { + "status": "dry_run", + "decky_id": "decky-01", + "services": ["http", "ftp"], + } + finally: + server.should_exit = True + thread.join(timeout=5) + + +@pytest.mark.asyncio +async def test_client_mutate_unknown_decky_404( + tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + cfg = DecnetConfig( + mode="swarm", + interface="eth0", + subnet="10.66.0.0/24", + gateway="10.66.0.1", + deckies=[ + DeckyConfig( + name="decky-01", + ip="10.66.0.10", + services=["ssh"], + distro="debian", + base_image="debian:bookworm-slim", + hostname="d01", + ), + ], + ) + monkeypatch.setattr( + "decnet.config.load_state", + lambda: (cfg, tmp_path / "decnet-compose.yml"), + ) + + port = _free_port() + server, thread, master_id = _start_agent(tmp_path, port) + try: + import httpx + async with swarm_client.AgentClient( + address="127.0.0.1", agent_port=port, identity=master_id, + ) as agent: + with pytest.raises(httpx.HTTPStatusError) as ei: + await agent.mutate("ghost", ["ssh"]) + assert ei.value.response.status_code == 404 + finally: + server.should_exit = True + thread.join(timeout=5)