# ---------------------------------------------------------------------------
# `decnet swarm ...` — master-side operator commands (HTTP to local swarmctl)
# ---------------------------------------------------------------------------

swarm_app = typer.Typer(
    name="swarm",
    help="Manage swarm workers (enroll, list, decommission). Requires `decnet swarmctl` running.",
    no_args_is_help=True,
)
app.add_typer(swarm_app, name="swarm")


_DEFAULT_SWARMCTL_URL = "http://127.0.0.1:8770"


def _swarmctl_base_url(url: Optional[str]) -> str:
    """Resolve the swarm controller base URL.

    Precedence: explicit ``url`` argument, then ``$DECNET_SWARMCTL_URL``,
    then the built-in default. Empty values at any level fall through to
    the next one, and trailing slashes are stripped so callers can append
    ``"/swarm/..."`` without producing ``//``.
    """
    import os as _os
    base = url or _os.environ.get("DECNET_SWARMCTL_URL") or _DEFAULT_SWARMCTL_URL
    return base.rstrip("/")


def _http_request(method: str, url: str, *, json_body: Optional[dict] = None, timeout: float = 30.0):
    """Tiny sync wrapper around httpx; avoids leaking async into the CLI.

    Returns the httpx response on any status below 400.

    Raises:
        typer.Exit(2): the request could not be sent (transport failure or
            a URL httpx cannot parse).
        typer.Exit(1): the controller answered with a status >= 400; the
            response's ``detail`` field (or raw text) is printed first.
    """
    import httpx
    try:
        resp = httpx.request(method, url, json=json_body, timeout=timeout)
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # InvalidURL is not an HTTPError subclass; without it a malformed
        # --url / $DECNET_SWARMCTL_URL would surface as a raw traceback.
        console.print(f"[red]Could not reach swarm controller at {url}: {exc}[/]")
        console.print("[dim]Is `decnet swarmctl` running?[/]")
        raise typer.Exit(2) from exc
    if resp.status_code >= 400:
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # Body is not JSON, or is JSON but not an object — show it raw.
            detail = resp.text
        console.print(f"[red]{method} {url} failed: {resp.status_code} — {detail}[/]")
        raise typer.Exit(1)
    return resp


def _deploy_swarm(config: "DecnetConfig", *, dry_run: bool, no_cache: bool) -> None:
    """Shard deckies round-robin across enrolled workers and POST to swarmctl.

    Workers in status ``enrolled`` or ``active`` are dispatch targets. Each
    decky gets a ``host_uuid`` assigned in round-robin order, the resulting
    config is POSTed to ``/swarm/deploy``, and the per-worker results are
    rendered as a table.

    Raises:
        typer.Exit(1): no dispatch targets exist, or at least one worker
            result is not ok.
    """
    base = _swarmctl_base_url(None)

    # The two status queries are separate requests, so a worker that flips
    # enrolled -> active in between could be returned twice. Key by uuid
    # (first occurrence wins, order preserved) so it only gets one share.
    workers_by_uuid: dict = {}
    for status in ("enrolled", "active"):
        resp = _http_request("GET", f"{base}/swarm/hosts?host_status={status}")
        for host in resp.json():
            workers_by_uuid.setdefault(host["uuid"], host)
    workers = list(workers_by_uuid.values())
    if not workers:
        console.print("[red]No enrolled workers — run `decnet swarm enroll ...` first.[/]")
        raise typer.Exit(1)

    # Round-robin assign deckies to workers by host_uuid. Nothing is mutated
    # in place: each decky and the config itself are replaced via model_copy.
    assigned = [
        d.model_copy(update={"host_uuid": workers[idx % len(workers)]["uuid"]})
        for idx, d in enumerate(config.deckies)
    ]
    config = config.model_copy(update={"deckies": assigned})

    body = {"config": config.model_dump(mode="json"), "dry_run": dry_run, "no_cache": no_cache}
    console.print(f"[cyan]Dispatching {len(config.deckies)} deckies across {len(workers)} worker(s)...[/]")
    # Swarm deploy can be slow (image pulls on each worker) — give it plenty.
    resp = _http_request("POST", base + "/swarm/deploy", json_body=body, timeout=900.0)
    results = resp.json().get("results", [])

    table = Table(title="SWARM deploy results")
    for col in ("worker", "host_uuid", "ok", "detail"):
        table.add_column(col)
    any_failed = False
    for r in results:
        ok = bool(r.get("ok"))
        if not ok:
            any_failed = True
        detail = r.get("detail")
        if isinstance(detail, dict):
            # Only fall back to "ok" for successful results; a failed result
            # without a status shows its raw detail instead of a false "ok".
            detail = detail.get("status") or ("ok" if ok else detail)
        table.add_row(
            str(r.get("host_name") or ""),
            str(r.get("host_uuid") or ""),
            "[green]yes[/]" if ok else "[red]no[/]",
            "" if detail is None else str(detail)[:80],
        )
    console.print(table)
    if any_failed:
        raise typer.Exit(1)


def _write_private_file(path, content: str) -> None:
    """Write ``content`` to ``path`` so it is never readable by group/other.

    The file is created with mode 0600. If it already existed with wider
    permissions, they are tightened before any content is written. A
    failure to tighten is reported as a warning rather than aborting.
    """
    import os as _os
    fd = _os.open(path, _os.O_WRONLY | _os.O_CREAT | _os.O_TRUNC, 0o600)
    with _os.fdopen(fd, "w") as fh:
        try:
            # The mode passed to os.open only applies when the file is created.
            _os.chmod(path, 0o600)
        except OSError as exc:
            console.print(f"[yellow]Could not restrict permissions on {path}: {exc}[/]")
        fh.write(content)


@swarm_app.command("enroll")
def swarm_enroll(
    name: str = typer.Option(..., "--name", help="Short hostname for the worker (also the cert CN)"),
    address: str = typer.Option(..., "--address", help="IP or DNS the master uses to reach the worker"),
    agent_port: int = typer.Option(8765, "--agent-port", help="Worker agent TCP port"),
    sans: Optional[str] = typer.Option(None, "--sans", help="Comma-separated extra SANs for the worker cert"),
    notes: Optional[str] = typer.Option(None, "--notes", help="Free-form operator notes"),
    out_dir: Optional[str] = typer.Option(None, "--out-dir", help="Write the bundle (ca.crt/worker.crt/worker.key) to this dir for scp"),
    url: Optional[str] = typer.Option(None, "--url", help="Override swarm controller URL (default: 127.0.0.1:8770)"),
) -> None:
    """Issue a mTLS bundle for a new worker and register it in the swarm."""
    # POSTs /swarm/enroll, prints the returned name/uuid/fingerprint and,
    # when --out-dir is given, writes the three PEM files into it.
    import pathlib as _pathlib

    body: dict = {"name": name, "address": address, "agent_port": agent_port}
    if sans:
        # Drop empty entries so "a,,b" or a trailing comma does not yield "".
        body["sans"] = [s.strip() for s in sans.split(",") if s.strip()]
    if notes:
        body["notes"] = notes

    resp = _http_request("POST", _swarmctl_base_url(url) + "/swarm/enroll", json_body=body)
    data = resp.json()

    console.print(f"[green]Enrolled worker:[/] {data['name']} "
                  f"[dim]uuid=[/]{data['host_uuid']} "
                  f"[dim]fingerprint=[/]{data['fingerprint']}")

    if out_dir:
        target = _pathlib.Path(out_dir).expanduser()
        target.mkdir(parents=True, exist_ok=True)
        (target / "ca.crt").write_text(data["ca_cert_pem"])
        (target / "worker.crt").write_text(data["worker_cert_pem"])
        # The private key must never exist on disk with default permissions,
        # not even briefly between write and chmod.
        _write_private_file(target / "worker.key", data["worker_key_pem"])
        console.print(f"[cyan]Bundle written to[/] {target}")
        console.print("[dim]Ship this directory to the worker at ~/.decnet/agent/ (or wherever `decnet agent --agent-dir` points).[/]")
    else:
        console.print("[yellow]No --out-dir given — bundle PEMs are in the JSON response; persist them before leaving this shell.[/]")


@swarm_app.command("list")
def swarm_list(
    host_status: Optional[str] = typer.Option(None, "--status", help="Filter by status (enrolled|active|unreachable|decommissioned)"),
    url: Optional[str] = typer.Option(None, "--url", help="Override swarm controller URL"),
) -> None:
    """List enrolled workers."""
    from urllib.parse import urlencode as _urlencode

    # Encode the filter so a value containing '&', '#', spaces, ... cannot
    # alter the query string.
    q = ("?" + _urlencode({"host_status": host_status})) if host_status else ""
    resp = _http_request("GET", _swarmctl_base_url(url) + "/swarm/hosts" + q)
    rows = resp.json()
    if not rows:
        console.print("[dim]No workers enrolled.[/]")
        return
    table = Table(title="DECNET swarm workers")
    for col in ("name", "address", "port", "status", "last heartbeat", "enrolled"):
        table.add_column(col)
    for r in rows:
        table.add_row(
            r.get("name") or "",
            r.get("address") or "",
            str(r.get("agent_port") or ""),
            r.get("status") or "",
            str(r.get("last_heartbeat") or "—"),
            str(r.get("enrolled_at") or "—"),
        )
    console.print(table)


@swarm_app.command("decommission")
def swarm_decommission(
    name: Optional[str] = typer.Option(None, "--name", help="Worker hostname"),
    uuid: Optional[str] = typer.Option(None, "--uuid", help="Worker UUID (skip lookup)"),
    url: Optional[str] = typer.Option(None, "--url", help="Override swarm controller URL"),
    yes: bool = typer.Option(False, "--yes", "-y", help="Skip interactive confirmation"),
) -> None:
    """Remove a worker from the swarm (cascades decky shard rows)."""
    from urllib.parse import quote as _quote

    if not (name or uuid):
        console.print("[red]Supply --name or --uuid.[/]")
        raise typer.Exit(2)

    base = _swarmctl_base_url(url)
    # Treat an empty --uuid like an absent one so `--uuid "" --name x` falls
    # back to the name lookup instead of issuing DELETE /swarm/hosts/.
    target_uuid = uuid or None
    target_name = name
    if target_uuid is None:
        resp = _http_request("GET", base + "/swarm/hosts")
        rows = resp.json()
        match = next((r for r in rows if r.get("name") == name), None)
        if match is None:
            console.print(f"[red]No enrolled worker named '{name}'.[/]")
            raise typer.Exit(1)
        target_uuid = match["uuid"]
        target_name = match.get("name") or target_name

    if not yes:
        confirm = typer.confirm(f"Decommission worker {target_name!r} ({target_uuid})?", default=False)
        if not confirm:
            console.print("[dim]Aborted.[/]")
            raise typer.Exit(0)

    # Percent-encode the identifier so it always stays a single path segment.
    _http_request("DELETE", f"{base}/swarm/hosts/{_quote(str(target_uuid), safe='')}")
    console.print(f"[green]Decommissioned {target_name or target_uuid}.[/]")
log.info("deploy: swarm deployment complete deckies=%d", len(config.deckies)) + return + from decnet.engine import deploy as _deploy _deploy(config, dry_run=dry_run, no_cache=no_cache, parallel=parallel) if dry_run: diff --git a/tests/swarm/test_cli_swarm.py b/tests/swarm/test_cli_swarm.py new file mode 100644 index 0000000..840dadd --- /dev/null +++ b/tests/swarm/test_cli_swarm.py @@ -0,0 +1,191 @@ +"""CLI `decnet swarm {enroll,list,decommission}` + `deploy --mode swarm`. + +Controller HTTP is stubbed via monkeypatching `_http_request`; we aren't +testing the controller (that has its own test file) or httpx itself. We +*are* testing: arg parsing, URL construction, round-robin sharding of +deckies, bundle file output, error paths when the controller rejects. +""" +from __future__ import annotations + +import json +import pathlib +from typing import Any + +import pytest +from typer.testing import CliRunner + +from decnet import cli as cli_mod +from decnet.cli import app + + +runner = CliRunner() + + +class _FakeResp: + def __init__(self, payload: Any, status: int = 200): + self._payload = payload + self.status_code = status + self.text = json.dumps(payload) if not isinstance(payload, str) else payload + + def json(self) -> Any: + return self._payload + + +class _HttpStub(list): + """Both a call log and a scripted-reply registry.""" + def __init__(self) -> None: + super().__init__() + self.script: dict[tuple[str, str], _FakeResp] = {} + + +@pytest.fixture +def http_stub(monkeypatch: pytest.MonkeyPatch) -> _HttpStub: + calls = _HttpStub() + + def _fake(method, url, *, json_body=None, timeout=30.0): + calls.append((method, url, json_body)) + for (m, suffix), resp in calls.script.items(): + if m == method and url.endswith(suffix): + return resp + raise AssertionError(f"Unscripted HTTP call: {method} {url}") + + monkeypatch.setattr(cli_mod, "_http_request", _fake) + return calls + + +# ------------------------------------------------------------- swarm list + + +def 
test_swarm_list_empty(http_stub) -> None: + http_stub.script[("GET", "/swarm/hosts")] = _FakeResp([]) + result = runner.invoke(app, ["swarm", "list"]) + assert result.exit_code == 0 + assert "No workers" in result.output + + +def test_swarm_list_with_rows(http_stub) -> None: + http_stub.script[("GET", "/swarm/hosts")] = _FakeResp([ + {"uuid": "u1", "name": "decky01", "address": "10.0.0.1", + "agent_port": 8765, "status": "active", "last_heartbeat": None, + "enrolled_at": "2026-04-18T00:00:00Z", "notes": None, + "client_cert_fingerprint": "ab:cd"}, + ]) + result = runner.invoke(app, ["swarm", "list"]) + assert result.exit_code == 0 + assert "decky01" in result.output + assert "10.0.0.1" in result.output + + +def test_swarm_list_passes_status_filter(http_stub) -> None: + http_stub.script[("GET", "/swarm/hosts?host_status=active")] = _FakeResp([]) + result = runner.invoke(app, ["swarm", "list", "--status", "active"]) + assert result.exit_code == 0 + # last call URL ended with the filter suffix + assert http_stub[-1][1].endswith("/swarm/hosts?host_status=active") + + +# ------------------------------------------------------------- swarm enroll + + +def test_swarm_enroll_writes_bundle(http_stub, tmp_path: pathlib.Path) -> None: + http_stub.script[("POST", "/swarm/enroll")] = _FakeResp({ + "host_uuid": "u-123", "name": "decky01", "address": "10.0.0.1", + "agent_port": 8765, "fingerprint": "de:ad:be:ef", + "ca_cert_pem": "CA-PEM", "worker_cert_pem": "CRT-PEM", + "worker_key_pem": "KEY-PEM", + }) + out = tmp_path / "bundle" + result = runner.invoke(app, [ + "swarm", "enroll", + "--name", "decky01", "--address", "10.0.0.1", + "--sans", "decky01.lan,10.0.0.1", + "--out-dir", str(out), + ]) + assert result.exit_code == 0, result.output + assert (out / "ca.crt").read_text() == "CA-PEM" + assert (out / "worker.crt").read_text() == "CRT-PEM" + assert (out / "worker.key").read_text() == "KEY-PEM" + # SANs were forwarded in the JSON body. 
+ _, _, body = http_stub[0] + assert body["sans"] == ["decky01.lan", "10.0.0.1"] + + +# ------------------------------------------------------------- swarm decommission + + +def test_swarm_decommission_by_name_looks_up_uuid(http_stub) -> None: + http_stub.script[("GET", "/swarm/hosts")] = _FakeResp([ + {"uuid": "u-x", "name": "decky02"}, + ]) + http_stub.script[("DELETE", "/swarm/hosts/u-x")] = _FakeResp({}, status=204) + result = runner.invoke(app, ["swarm", "decommission", "--name", "decky02", "--yes"]) + assert result.exit_code == 0, result.output + methods = [c[0] for c in http_stub] + assert methods == ["GET", "DELETE"] + + +def test_swarm_decommission_name_not_found(http_stub) -> None: + http_stub.script[("GET", "/swarm/hosts")] = _FakeResp([]) + result = runner.invoke(app, ["swarm", "decommission", "--name", "ghost", "--yes"]) + assert result.exit_code == 1 + assert "No enrolled worker" in result.output + + +def test_swarm_decommission_requires_identifier() -> None: + result = runner.invoke(app, ["swarm", "decommission", "--yes"]) + assert result.exit_code == 2 + + +# ------------------------------------------------------------- deploy --mode swarm + + +def test_deploy_swarm_round_robins_and_posts(http_stub, monkeypatch: pytest.MonkeyPatch) -> None: + """deploy --mode swarm fetches hosts, assigns host_uuid round-robin, + POSTs to /swarm/deploy with the sharded config.""" + # Two enrolled workers, zero active. 
+ http_stub.script[("GET", "/swarm/hosts?host_status=enrolled")] = _FakeResp([ + {"uuid": "u-a", "name": "A", "address": "10.0.0.1", "agent_port": 8765, + "status": "enrolled"}, + {"uuid": "u-b", "name": "B", "address": "10.0.0.2", "agent_port": 8765, + "status": "enrolled"}, + ]) + http_stub.script[("GET", "/swarm/hosts?host_status=active")] = _FakeResp([]) + http_stub.script[("POST", "/swarm/deploy")] = _FakeResp({ + "results": [ + {"host_uuid": "u-a", "host_name": "A", "ok": True, "detail": {"status": "ok"}}, + {"host_uuid": "u-b", "host_name": "B", "ok": True, "detail": {"status": "ok"}}, + ], + }) + + # Stub network detection so we don't need root / real NICs. + monkeypatch.setattr(cli_mod, "detect_interface", lambda: "eth0") + monkeypatch.setattr(cli_mod, "detect_subnet", lambda _iface: ("10.0.0.0/24", "10.0.0.254")) + monkeypatch.setattr(cli_mod, "get_host_ip", lambda _iface: "10.0.0.100") + + result = runner.invoke(app, [ + "deploy", "--mode", "swarm", "--deckies", "3", + "--services", "ssh", "--dry-run", + ]) + assert result.exit_code == 0, result.output + + # Find the POST /swarm/deploy body and confirm round-robin sharding. 
+ post = next(c for c in http_stub if c[0] == "POST" and c[1].endswith("/swarm/deploy")) + body = post[2] + uuids = [d["host_uuid"] for d in body["config"]["deckies"]] + assert uuids == ["u-a", "u-b", "u-a"] + assert body["dry_run"] is True + + +def test_deploy_swarm_fails_if_no_workers(http_stub, monkeypatch: pytest.MonkeyPatch) -> None: + http_stub.script[("GET", "/swarm/hosts?host_status=enrolled")] = _FakeResp([]) + http_stub.script[("GET", "/swarm/hosts?host_status=active")] = _FakeResp([]) + monkeypatch.setattr(cli_mod, "detect_interface", lambda: "eth0") + monkeypatch.setattr(cli_mod, "detect_subnet", lambda _iface: ("10.0.0.0/24", "10.0.0.254")) + monkeypatch.setattr(cli_mod, "get_host_ip", lambda _iface: "10.0.0.100") + + result = runner.invoke(app, [ + "deploy", "--mode", "swarm", "--deckies", "2", + "--services", "ssh", "--dry-run", + ]) + assert result.exit_code == 1 + assert "No enrolled workers" in result.output