feat(workers): add start + start-all endpoints (systemd supervisor)
POST /api/v1/workers/{name}/start — 202 on acceptance, 404 unknown
worker, 503 if the unit file is not installed, 502 if systemctl
returns non-zero (stderr snippet in detail, full stack logged).
Admin only.
POST /api/v1/workers/start-all — best-effort: walks the worker list
in dependency order (bus → api → data-plane), skips already-active
and uninstalled units, aggregates outcomes into
{started, already_running, failed[]}. Returns 200 even on partial
failure; the caller reads the three lists.
Both endpoints delegate to the systemd_control helper, so the attack
surface for "what gets executed" is locked to `decnet-<validated-name>
.service` at two layers (router KNOWN_WORKERS + helper regex).
This commit is contained in:
@@ -24,6 +24,8 @@ from .config.api_reinit import router as config_reinit_router
|
||||
from .health.api_get_health import router as health_router
|
||||
from .workers.api_list_workers import router as workers_list_router
|
||||
from .workers.api_control_worker import router as workers_control_router
|
||||
from .workers.api_start_worker import router as workers_start_router
|
||||
from .workers.api_start_all_workers import router as workers_start_all_router
|
||||
from .artifacts.api_get_artifact import router as artifacts_router
|
||||
from .swarm_updates import swarm_updates_router
|
||||
from .swarm_mgmt import swarm_mgmt_router
|
||||
@@ -73,6 +75,8 @@ api_router.include_router(stream_router)
|
||||
api_router.include_router(health_router)
|
||||
api_router.include_router(workers_list_router)
|
||||
api_router.include_router(workers_control_router)
|
||||
api_router.include_router(workers_start_router)
|
||||
api_router.include_router(workers_start_all_router)
|
||||
|
||||
# Configuration
|
||||
api_router.include_router(config_get_router)
|
||||
|
||||
95
decnet/web/router/workers/api_start_all_workers.py
Normal file
95
decnet/web/router/workers/api_start_all_workers.py
Normal file
@@ -0,0 +1,95 @@
|
||||
from fastapi import APIRouter, Depends
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.web.db.models import StartAllResponse, StartFailure
|
||||
from decnet.web.dependencies import require_admin
|
||||
from decnet.web.services import systemd_control
|
||||
from decnet.web.worker_registry import KNOWN_WORKERS
|
||||
|
||||
log = get_logger("api")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# Order matters — bus comes up first so subsequent workers have a place
|
||||
# to publish their heartbeats; then the API, then the data-plane set.
|
||||
# Anything unknown in KNOWN_WORKERS but not here gets appended at the
|
||||
# end so new worker names still get started even if we forget to place
|
||||
# them explicitly.
|
||||
_PREFERRED_ORDER: tuple[str, ...] = (
|
||||
"bus",
|
||||
"api",
|
||||
"collector",
|
||||
"profiler",
|
||||
"sniffer",
|
||||
"prober",
|
||||
"mutator",
|
||||
)
|
||||
|
||||
|
||||
def _ordered() -> list[str]:
|
||||
seen: set[str] = set()
|
||||
out: list[str] = []
|
||||
for name in _PREFERRED_ORDER:
|
||||
if name in KNOWN_WORKERS and name not in seen:
|
||||
out.append(name)
|
||||
seen.add(name)
|
||||
for name in KNOWN_WORKERS:
|
||||
if name not in seen:
|
||||
out.append(name)
|
||||
seen.add(name)
|
||||
return out
|
||||
|
||||
|
||||
@router.post(
|
||||
"/workers/start-all",
|
||||
response_model=StartAllResponse,
|
||||
tags=["Observability"],
|
||||
responses={
|
||||
401: {"description": "Could not validate credentials"},
|
||||
403: {"description": "Insufficient permissions"},
|
||||
},
|
||||
)
|
||||
@_traced("api.start_all_workers")
|
||||
async def start_all_workers(
|
||||
admin: dict = Depends(require_admin),
|
||||
) -> StartAllResponse:
|
||||
"""Best-effort: bring up every installed worker unit in order.
|
||||
|
||||
Workers already ``active`` are counted in ``already_running`` and
|
||||
skipped. Workers without a unit file (common on dev boxes) are
|
||||
silently skipped — the UI already renders them as not-installed.
|
||||
Returns 200 even on partial failure; the caller reads the three
|
||||
lists. Started sequentially, not in parallel: systemd dependency
|
||||
ordering (bus → api → data-plane) matters.
|
||||
"""
|
||||
installed = await systemd_control.list_installed()
|
||||
started: list[str] = []
|
||||
already_running: list[str] = []
|
||||
failed: list[StartFailure] = []
|
||||
|
||||
for name in _ordered():
|
||||
if name not in installed:
|
||||
continue
|
||||
try:
|
||||
if await systemd_control.is_active(name):
|
||||
already_running.append(name)
|
||||
continue
|
||||
await systemd_control.start(name)
|
||||
started.append(name)
|
||||
except systemd_control.SystemctlError as exc:
|
||||
snippet = (exc.stderr.splitlines() or ["systemctl failed"])[0][:200]
|
||||
failed.append(StartFailure(name=name, reason=snippet))
|
||||
log.warning("start-all: %s failed: %s", name, snippet)
|
||||
|
||||
log.info(
|
||||
"workers: start-all by=%s started=%d already=%d failed=%d",
|
||||
admin.get("username") or admin.get("sub") or "admin",
|
||||
len(started), len(already_running), len(failed),
|
||||
)
|
||||
return StartAllResponse(
|
||||
started=started,
|
||||
already_running=already_running,
|
||||
failed=failed,
|
||||
)
|
||||
72
decnet/web/router/workers/api_start_worker.py
Normal file
72
decnet/web/router/workers/api_start_worker.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.responses import ORJSONResponse
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.web.db.models import WorkerControlResponse
|
||||
from decnet.web.dependencies import require_admin
|
||||
from decnet.web.services import systemd_control
|
||||
from decnet.web.worker_registry import KNOWN_WORKERS
|
||||
|
||||
log = get_logger("api")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post(
|
||||
"/workers/{name}/start",
|
||||
tags=["Observability"],
|
||||
responses={
|
||||
202: {"model": WorkerControlResponse, "description": "Start issued via systemd"},
|
||||
401: {"description": "Could not validate credentials"},
|
||||
403: {"description": "Insufficient permissions"},
|
||||
404: {"description": "Unknown worker"},
|
||||
502: {"description": "systemctl returned non-zero"},
|
||||
503: {"description": "Unit file not installed on this host"},
|
||||
},
|
||||
)
|
||||
@_traced("api.start_worker")
|
||||
async def start_worker(
|
||||
name: str,
|
||||
admin: dict = Depends(require_admin),
|
||||
) -> ORJSONResponse:
|
||||
"""Start ``decnet-<name>.service`` via systemd.
|
||||
|
||||
Unlike STOP (which is bus-based — the worker signals itself), START
|
||||
has to come from *outside* the worker since a stopped worker has no
|
||||
subscriber. The API shells out to ``systemctl`` via a scoped polkit
|
||||
rule. Returns 202 on acceptance; the UI then waits for the next
|
||||
REFRESH to see the heartbeat flip the row to OK.
|
||||
"""
|
||||
if name not in KNOWN_WORKERS:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Unknown worker: {name!r}",
|
||||
)
|
||||
|
||||
installed = await systemd_control.list_installed()
|
||||
if name not in installed:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail=f"unit file not installed: decnet-{name}.service",
|
||||
)
|
||||
|
||||
try:
|
||||
await systemd_control.start(name)
|
||||
except systemd_control.SystemctlError as exc:
|
||||
log.exception("systemctl start %s failed: %s", name, exc.stderr)
|
||||
snippet = exc.stderr.splitlines()[0] if exc.stderr else "systemctl failed"
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_502_BAD_GATEWAY,
|
||||
detail=snippet[:200],
|
||||
) from exc
|
||||
|
||||
log.info(
|
||||
"workers: start requested worker=%s by=%s",
|
||||
name, admin.get("username") or admin.get("sub") or "admin",
|
||||
)
|
||||
body = WorkerControlResponse(accepted=True, worker=name, action="start")
|
||||
return ORJSONResponse(
|
||||
content=body.model_dump(),
|
||||
status_code=status.HTTP_202_ACCEPTED,
|
||||
)
|
||||
179
tests/api/workers/test_start_workers.py
Normal file
179
tests/api/workers/test_start_workers.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""Tests for ``POST /api/v1/workers/{name}/start`` + ``start-all``.
|
||||
|
||||
Uses the shared ``client`` / ``auth_token`` / ``viewer_token`` fixtures
|
||||
from ``tests/api/conftest.py``. Stubs out ``systemd_control`` so tests
|
||||
never touch real systemctl.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Set
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from decnet.web.router.workers import api_list_workers as _list
|
||||
from decnet.web.router.workers import api_start_all_workers as _start_all
|
||||
from decnet.web.router.workers import api_start_worker as _start
|
||||
from decnet.web.services import systemd_control as _sc
|
||||
|
||||
|
||||
def _patch_installed(monkeypatch: Any, names: Set[str]) -> None:
|
||||
async def _stub() -> Set[str]:
|
||||
return set(names)
|
||||
|
||||
# Each module imported `systemd_control` directly; patch on the
|
||||
# module-level attribute so all three endpoints see the stub.
|
||||
for mod in (_start, _start_all, _list):
|
||||
monkeypatch.setattr(mod.systemd_control, "list_installed", _stub)
|
||||
|
||||
|
||||
def _patch_start(monkeypatch: Any, *, raises: _sc.SystemctlError | None = None) -> list[str]:
|
||||
calls: list[str] = []
|
||||
|
||||
async def _stub(name: str) -> None:
|
||||
calls.append(name)
|
||||
if raises is not None:
|
||||
raise raises
|
||||
|
||||
monkeypatch.setattr(_sc, "start", _stub)
|
||||
return calls
|
||||
|
||||
|
||||
def _patch_is_active(monkeypatch: Any, active: Set[str]) -> None:
|
||||
async def _stub(name: str) -> bool:
|
||||
return name in active
|
||||
|
||||
monkeypatch.setattr(_sc, "is_active", _stub)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_worker_admin_happy_path(
|
||||
client: httpx.AsyncClient, auth_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, {"mutator", "bus"})
|
||||
calls = _patch_start(monkeypatch)
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/mutator/start",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 202
|
||||
body = resp.json()
|
||||
assert body == {"accepted": True, "worker": "mutator", "action": "start"}
|
||||
assert calls == ["mutator"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_worker_viewer_forbidden(
|
||||
client: httpx.AsyncClient, viewer_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, {"mutator"})
|
||||
_patch_start(monkeypatch)
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/mutator/start",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_worker_unknown_name_404(
|
||||
client: httpx.AsyncClient, auth_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, {"mutator"})
|
||||
_patch_start(monkeypatch)
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/nosuch/start",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_worker_not_installed_503(
|
||||
client: httpx.AsyncClient, auth_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, set()) # nothing installed
|
||||
_patch_start(monkeypatch)
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/mutator/start",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 503
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_worker_systemctl_failure_502(
|
||||
client: httpx.AsyncClient, auth_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, {"mutator"})
|
||||
err = _sc.SystemctlError(
|
||||
unit="decnet-mutator.service",
|
||||
returncode=1,
|
||||
stderr="Failed to start decnet-mutator.service: unit not found",
|
||||
)
|
||||
_patch_start(monkeypatch, raises=err)
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/mutator/start",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 502
|
||||
body = resp.json()
|
||||
assert "not found" in body["detail"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_all_aggregates_success_running_and_failure(
|
||||
client: httpx.AsyncClient, auth_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, {"bus", "api", "mutator"})
|
||||
_patch_is_active(monkeypatch, {"bus"}) # bus is already running
|
||||
|
||||
async def _stub_start(name: str) -> None:
|
||||
if name == "mutator":
|
||||
raise _sc.SystemctlError(
|
||||
unit="decnet-mutator.service",
|
||||
returncode=1,
|
||||
stderr="Unit decnet-mutator.service is masked.",
|
||||
)
|
||||
# api starts cleanly
|
||||
|
||||
monkeypatch.setattr(_sc, "start", _stub_start)
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/start-all",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["already_running"] == ["bus"]
|
||||
assert body["started"] == ["api"]
|
||||
assert len(body["failed"]) == 1
|
||||
assert body["failed"][0]["name"] == "mutator"
|
||||
assert "masked" in body["failed"][0]["reason"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_all_viewer_forbidden(
|
||||
client: httpx.AsyncClient, viewer_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, {"bus"})
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/start-all",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_all_skips_uninstalled(
|
||||
client: httpx.AsyncClient, auth_token: str, monkeypatch,
|
||||
) -> None:
|
||||
_patch_installed(monkeypatch, set()) # no units installed
|
||||
_patch_is_active(monkeypatch, set())
|
||||
_patch_start(monkeypatch)
|
||||
resp = await client.post(
|
||||
"/api/v1/workers/start-all",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json() == {"started": [], "already_running": [], "failed": []}
|
||||
Reference in New Issue
Block a user