From 13ea9169432d459054cae0b7663b5406d6b67cd6 Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 22 Apr 2026 14:12:29 -0400 Subject: [PATCH] feat(workers): add start + start-all endpoints (systemd supervisor) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /api/v1/workers/{name}/start — 202 on acceptance, 404 unknown worker, 503 if the unit file is not installed, 502 if systemctl returns non-zero (stderr snippet in detail, full stack logged). Admin only. POST /api/v1/workers/start-all — best-effort: walks the worker list in dependency order (bus → api → data-plane), skips already-active and uninstalled units, aggregates outcomes into {started, already_running, failed[]}. Returns 200 even on partial failure; the caller reads the three lists. Both endpoints delegate to the systemd_control helper, so the attack surface for "what gets executed" is locked to `decnet- .service` at two layers (router KNOWN_WORKERS + helper regex). --- decnet/web/router/__init__.py | 4 + .../router/workers/api_start_all_workers.py | 95 ++++++++++ decnet/web/router/workers/api_start_worker.py | 72 +++++++ tests/api/workers/test_start_workers.py | 179 ++++++++++++++++++ 4 files changed, 350 insertions(+) create mode 100644 decnet/web/router/workers/api_start_all_workers.py create mode 100644 decnet/web/router/workers/api_start_worker.py create mode 100644 tests/api/workers/test_start_workers.py diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 6cbff07c..ab4fbb5c 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -24,6 +24,8 @@ from .config.api_reinit import router as config_reinit_router from .health.api_get_health import router as health_router from .workers.api_list_workers import router as workers_list_router from .workers.api_control_worker import router as workers_control_router +from .workers.api_start_worker import router as workers_start_router +from .workers.api_start_all_workers import router as workers_start_all_router from .artifacts.api_get_artifact import router as artifacts_router from .swarm_updates import swarm_updates_router from .swarm_mgmt import swarm_mgmt_router @@ -73,6 +75,8 @@ api_router.include_router(stream_router) api_router.include_router(health_router) api_router.include_router(workers_list_router) api_router.include_router(workers_control_router) +api_router.include_router(workers_start_router) +api_router.include_router(workers_start_all_router) # Configuration api_router.include_router(config_get_router) diff --git a/decnet/web/router/workers/api_start_all_workers.py b/decnet/web/router/workers/api_start_all_workers.py new file mode 100644 index 00000000..0323a1e0 --- /dev/null +++ b/decnet/web/router/workers/api_start_all_workers.py @@ -0,0 +1,95 @@ +from fastapi import APIRouter, Depends + +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.db.models import StartAllResponse, StartFailure +from decnet.web.dependencies import require_admin +from decnet.web.services import systemd_control +from decnet.web.worker_registry import KNOWN_WORKERS + +log = get_logger("api") + +router = APIRouter() + + +# Order matters — bus comes up first so subsequent workers have a place +# to publish their heartbeats; then the API, then the data-plane set. +# Anything unknown in KNOWN_WORKERS but not here gets appended at the +# end so new worker names still get started even if we forget to place +# them explicitly. +_PREFERRED_ORDER: tuple[str, ...] = ( + "bus", + "api", + "collector", + "profiler", + "sniffer", + "prober", + "mutator", +) + + +def _ordered() -> list[str]: + seen: set[str] = set() + out: list[str] = [] + for name in _PREFERRED_ORDER: + if name in KNOWN_WORKERS and name not in seen: + out.append(name) + seen.add(name) + for name in KNOWN_WORKERS: + if name not in seen: + out.append(name) + seen.add(name) + return out + + +@router.post( + "/workers/start-all", + response_model=StartAllResponse, + tags=["Observability"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + }, +) +@_traced("api.start_all_workers") +async def start_all_workers( + admin: dict = Depends(require_admin), +) -> StartAllResponse: + """Best-effort: bring up every installed worker unit in order. + + Workers already ``active`` are counted in ``already_running`` and + skipped. Workers without a unit file (common on dev boxes) are + silently skipped — the UI already renders them as not-installed. + Returns 200 even on partial failure; the caller reads the three + lists. Started sequentially, not in parallel: systemd dependency + ordering (bus → api → data-plane) matters. + """ + installed = await systemd_control.list_installed() + started: list[str] = [] + already_running: list[str] = [] + failed: list[StartFailure] = [] + + for name in _ordered(): + if name not in installed: + continue + try: + if await systemd_control.is_active(name): + already_running.append(name) + continue + await systemd_control.start(name) + started.append(name) + except systemd_control.SystemctlError as exc: + snippet = (exc.stderr.splitlines() or ["systemctl failed"])[0][:200] + failed.append(StartFailure(name=name, reason=snippet)) + log.warning("start-all: %s failed: %s", name, snippet) + + log.info( + "workers: start-all by=%s started=%d already=%d failed=%d", + admin.get("username") or admin.get("sub") or "admin", + len(started), len(already_running), len(failed), + ) + return StartAllResponse( + started=started, + already_running=already_running, + failed=failed, + ) diff --git a/decnet/web/router/workers/api_start_worker.py b/decnet/web/router/workers/api_start_worker.py new file mode 100644 index 00000000..1a470a44 --- /dev/null +++ b/decnet/web/router/workers/api_start_worker.py @@ -0,0 +1,72 @@ +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.responses import ORJSONResponse + +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.db.models import WorkerControlResponse +from decnet.web.dependencies import require_admin +from decnet.web.services import systemd_control +from decnet.web.worker_registry import KNOWN_WORKERS + +log = get_logger("api") + +router = APIRouter() + + +@router.post( + "/workers/{name}/start", + tags=["Observability"], + responses={ + 202: {"model": WorkerControlResponse, "description": "Start issued via systemd"}, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Unknown worker"}, + 502: {"description": "systemctl returned non-zero"}, + 503: {"description": "Unit file not installed on this host"}, + }, +) +@_traced("api.start_worker") +async def start_worker( + name: str, + admin: dict = Depends(require_admin), +) -> ORJSONResponse: + """Start ``decnet-.service`` via systemd. + + Unlike STOP (which is bus-based — the worker signals itself), START + has to come from *outside* the worker since a stopped worker has no + subscriber. The API shells out to ``systemctl`` via a scoped polkit + rule. Returns 202 on acceptance; the UI then waits for the next + REFRESH to see the heartbeat flip the row to OK. + """ + if name not in KNOWN_WORKERS: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Unknown worker: {name!r}", + ) + + installed = await systemd_control.list_installed() + if name not in installed: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"unit file not installed: decnet-{name}.service", + ) + + try: + await systemd_control.start(name) + except systemd_control.SystemctlError as exc: + log.exception("systemctl start %s failed: %s", name, exc.stderr) + snippet = exc.stderr.splitlines()[0] if exc.stderr else "systemctl failed" + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=snippet[:200], + ) from exc + + log.info( + "workers: start requested worker=%s by=%s", + name, admin.get("username") or admin.get("sub") or "admin", + ) + body = WorkerControlResponse(accepted=True, worker=name, action="start") + return ORJSONResponse( + content=body.model_dump(), + status_code=status.HTTP_202_ACCEPTED, + ) diff --git a/tests/api/workers/test_start_workers.py b/tests/api/workers/test_start_workers.py new file mode 100644 index 00000000..6eb9633f --- /dev/null +++ b/tests/api/workers/test_start_workers.py @@ -0,0 +1,179 @@ +"""Tests for ``POST /api/v1/workers/{name}/start`` + ``start-all``. + +Uses the shared ``client`` / ``auth_token`` / ``viewer_token`` fixtures +from ``tests/api/conftest.py``. Stubs out ``systemd_control`` so tests +never touch real systemctl. +""" +from __future__ import annotations + +from typing import Any, Set + +import httpx +import pytest + +from decnet.web.router.workers import api_list_workers as _list +from decnet.web.router.workers import api_start_all_workers as _start_all +from decnet.web.router.workers import api_start_worker as _start +from decnet.web.services import systemd_control as _sc + + +def _patch_installed(monkeypatch: Any, names: Set[str]) -> None: + async def _stub() -> Set[str]: + return set(names) + + # Each module imported `systemd_control` directly; patch on the + # module-level attribute so all three endpoints see the stub. + for mod in (_start, _start_all, _list): + monkeypatch.setattr(mod.systemd_control, "list_installed", _stub) + + +def _patch_start(monkeypatch: Any, *, raises: _sc.SystemctlError | None = None) -> list[str]: + calls: list[str] = [] + + async def _stub(name: str) -> None: + calls.append(name) + if raises is not None: + raise raises + + monkeypatch.setattr(_sc, "start", _stub) + return calls + + +def _patch_is_active(monkeypatch: Any, active: Set[str]) -> None: + async def _stub(name: str) -> bool: + return name in active + + monkeypatch.setattr(_sc, "is_active", _stub) + + +@pytest.mark.asyncio +async def test_start_worker_admin_happy_path( + client: httpx.AsyncClient, auth_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, {"mutator", "bus"}) + calls = _patch_start(monkeypatch) + resp = await client.post( + "/api/v1/workers/mutator/start", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 202 + body = resp.json() + assert body == {"accepted": True, "worker": "mutator", "action": "start"} + assert calls == ["mutator"] + + +@pytest.mark.asyncio +async def test_start_worker_viewer_forbidden( + client: httpx.AsyncClient, viewer_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, {"mutator"}) + _patch_start(monkeypatch) + resp = await client.post( + "/api/v1/workers/mutator/start", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_start_worker_unknown_name_404( + client: httpx.AsyncClient, auth_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, {"mutator"}) + _patch_start(monkeypatch) + resp = await client.post( + "/api/v1/workers/nosuch/start", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_start_worker_not_installed_503( + client: httpx.AsyncClient, auth_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, set()) # nothing installed + _patch_start(monkeypatch) + resp = await client.post( + "/api/v1/workers/mutator/start", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 503 + + +@pytest.mark.asyncio +async def test_start_worker_systemctl_failure_502( + client: httpx.AsyncClient, auth_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, {"mutator"}) + err = _sc.SystemctlError( + unit="decnet-mutator.service", + returncode=1, + stderr="Failed to start decnet-mutator.service: unit not found", + ) + _patch_start(monkeypatch, raises=err) + resp = await client.post( + "/api/v1/workers/mutator/start", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 502 + body = resp.json() + assert "not found" in body["detail"] + + +@pytest.mark.asyncio +async def test_start_all_aggregates_success_running_and_failure( + client: httpx.AsyncClient, auth_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, {"bus", "api", "mutator"}) + _patch_is_active(monkeypatch, {"bus"}) # bus is already running + + async def _stub_start(name: str) -> None: + if name == "mutator": + raise _sc.SystemctlError( + unit="decnet-mutator.service", + returncode=1, + stderr="Unit decnet-mutator.service is masked.", + ) + # api starts cleanly + + monkeypatch.setattr(_sc, "start", _stub_start) + + resp = await client.post( + "/api/v1/workers/start-all", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + body = resp.json() + assert body["already_running"] == ["bus"] + assert body["started"] == ["api"] + assert len(body["failed"]) == 1 + assert body["failed"][0]["name"] == "mutator" + assert "masked" in body["failed"][0]["reason"] + + +@pytest.mark.asyncio +async def test_start_all_viewer_forbidden( + client: httpx.AsyncClient, viewer_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, {"bus"}) + resp = await client.post( + "/api/v1/workers/start-all", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_start_all_skips_uninstalled( + client: httpx.AsyncClient, auth_token: str, monkeypatch, +) -> None: + _patch_installed(monkeypatch, set()) # no units installed + _patch_is_active(monkeypatch, set()) + _patch_start(monkeypatch) + resp = await client.post( + "/api/v1/workers/start-all", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + assert resp.json() == {"started": [], "already_running": [], "failed": []}