feat(web): add systemd_control helper for worker unit management

Thin async wrapper over `systemctl` — never shell=True, always
create_subprocess_exec. Unit names are built from
`decnet-<validated-name>.service`; the regex check is defence in depth
on top of the router-level KNOWN_WORKERS validation.

Exposes start / stop / is_active / list_installed; the last of these is
cached for 30s to keep the Workers panel cheap under REFRESH spam. On
non-systemd hosts, list_installed returns an empty set, so the UI renders
with every row marked not-installed instead of responding with a 500.
This commit is contained in:
2026-04-22 14:08:35 -04:00
parent a41ef52249
commit fcaac648a4
4 changed files with 269 additions and 0 deletions

View File

View File

@@ -0,0 +1,136 @@
"""Thin async wrapper over ``systemctl`` for DECNET worker units.
The API process runs as the unprivileged ``decnet`` user and delegates
the actual start/stop to systemd via a scoped polkit rule (see
``deploy/polkit/50-decnet-workers.rules``). This module keeps the shell
surface area minimal:
* Unit names are always ``decnet-<name>.service`` — callers pass the
bare worker name and the ``.service`` suffix is bolted on here.
* ``asyncio.create_subprocess_exec`` — never ``shell=True``. Worker
names are also validated at the router boundary against
:data:`decnet.web.worker_registry.KNOWN_WORKERS`; the extra regex
check here is defence in depth.
* ``list_installed()`` results are cached for 30 seconds to keep the
status endpoint cheap under repeated REFRESH clicks.
"""
from __future__ import annotations
import asyncio
import re
import time
from typing import Set
from decnet.logging import get_logger
# Module-level logger for this helper.
log = get_logger("web.systemd_control")
# Accepted bare worker names: one lowercase letter followed by any number of
# lowercase letters, digits, underscores or hyphens.
# NOTE(review): if this pattern is applied with ``.match()``, the trailing
# ``$`` also succeeds just before a final newline, so "name\n" is accepted.
_UNIT_NAME_RE = re.compile(r"^[a-z][a-z0-9_-]*$")
# Number of seconds a ``list_installed()`` result is reused before re-querying.
_LIST_CACHE_TTL = 30.0
# Most recent ``list_installed()`` result; ``None`` until the first lookup and
# again after ``reset_cache_for_tests()``.
_cache: Set[str] | None = None
# Clock reading taken when ``_cache`` was last populated.
_cache_ts: float = 0.0
class SystemctlError(RuntimeError):
    """Raised when a ``systemctl`` invocation exits with a non-zero status.

    Attributes:
        unit: Unit name the command was run against.
        returncode: Exit status of the command.
        stderr: Standard-error text captured from the command.
    """

    def __init__(self, unit: str, returncode: int, stderr: str) -> None:
        message = f"systemctl failed on {unit}: rc={returncode} stderr={stderr!r}"
        super().__init__(message)
        self.unit, self.returncode, self.stderr = unit, returncode, stderr
def _unit(name: str) -> str:
    """Map a bare worker name to its systemd unit name.

    Args:
        name: Worker name without the ``decnet-`` prefix or ``.service``
            suffix, e.g. ``"mutator"``.

    Returns:
        The unit name ``decnet-<name>.service``.

    Raises:
        ValueError: If ``name`` is not matched in full by ``_UNIT_NAME_RE``.
    """
    # fullmatch, not match: with ``match`` the pattern's trailing ``$`` also
    # succeeds just before a final newline, so a name such as "bus\n" would
    # pass validation and carry the newline into the systemctl argument.
    if not _UNIT_NAME_RE.fullmatch(name):
        raise ValueError(f"invalid worker name: {name!r}")
    return f"decnet-{name}.service"
async def _run(*argv: str) -> tuple[int, str, str]:
    """Run ``argv`` as a subprocess (no shell) and capture its output.

    Args:
        *argv: The program followed by its arguments.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as UTF-8,
        undecodable bytes being replaced. If the program cannot be spawned
        because it is missing or not executable, ``(127, "", <error text>)``
        is returned instead of raising, so callers see the same shape as for
        any other failed command.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except (FileNotFoundError, PermissionError) as exc:
        # No usable binary (e.g. a host without systemd). 127 mirrors the
        # conventional "command not found" exit status.
        return 127, "", str(exc)

    stdout_b, stderr_b = await proc.communicate()
    # returncode should be set once communicate() has returned; -1 is a
    # defensive fallback so the tuple always carries an int.
    returncode = proc.returncode if proc.returncode is not None else -1
    return (
        returncode,
        stdout_b.decode("utf-8", "replace"),
        stderr_b.decode("utf-8", "replace"),
    )
async def _systemctl(verb: str, name: str) -> None:
    """Run ``systemctl <verb>`` against the unit derived from ``name``.

    Args:
        verb: The ``systemctl`` sub-command, e.g. ``"start"``.
        name: Bare worker name; converted to a unit name by ``_unit``.

    Raises:
        ValueError: If ``_unit`` rejects ``name``.
        SystemctlError: If the command exits with a non-zero status.
    """
    target = _unit(name)
    exit_code, _out, err_text = await _run("systemctl", verb, target)
    if exit_code == 0:
        return
    detail = err_text.strip()
    log.warning("systemctl %s %s failed: rc=%s stderr=%s", verb, target, exit_code, detail)
    raise SystemctlError(unit=target, returncode=exit_code, stderr=detail)
async def start(name: str) -> None:
    """Ask systemd to start ``decnet-<name>.service``.

    Args:
        name: Bare worker name.

    Raises:
        SystemctlError: If ``systemctl start`` exits with a non-zero status.
    """
    await _systemctl(verb="start", name=name)
async def stop(name: str) -> None:
    """Ask systemd to stop ``decnet-<name>.service``.

    Not used in v1, where the bus-based STOP is authoritative; it exists so
    the supervisor contract stays symmetric with :func:`start`.

    Args:
        name: Bare worker name.

    Raises:
        SystemctlError: If ``systemctl stop`` exits with a non-zero status.
    """
    await _systemctl(verb="stop", name=name)
async def is_active(name: str) -> bool:
    """Report whether ``decnet-<name>.service`` is currently active.

    The exit status of ``systemctl is-active`` is deliberately ignored: it is
    non-zero for inactive, failed and unknown units, which is an answer
    rather than a failure. Only the printed state is compared.

    Args:
        name: Bare worker name.

    Returns:
        ``True`` only when the command prints exactly ``active``.
    """
    _rc, reported_state, _err = await _run("systemctl", "is-active", _unit(name))
    return reported_state.strip() == "active"
async def list_installed(*, force: bool = False) -> Set[str]:
    """Return the set of worker names with unit files installed.

    Runs ``systemctl list-unit-files 'decnet-*.service' --no-legend`` and
    reads the first whitespace-separated column of every output line.
    Entries shaped like ``decnet-<name>.service`` contribute the bare
    ``<name>``; anything else is ignored.

    Results are cached for :data:`_LIST_CACHE_TTL` seconds. A non-zero exit
    from ``systemctl`` is treated as "nothing installed" and that empty
    result is cached as well, so a failing binary is not re-run on every
    refresh.

    Args:
        force: When true, skip the cache and query ``systemctl`` again.

    Returns:
        A new ``set`` on every call, so callers may mutate it without
        affecting the cached copy.
    """
    global _cache, _cache_ts

    # Monotonic clock rather than time.time(): the TTL is an elapsed-time
    # check, and a wall-clock step (NTP correction, manual change) could
    # otherwise expire the cache early or keep a stale entry alive.
    now = time.monotonic()
    if not force and _cache is not None and (now - _cache_ts) < _LIST_CACHE_TTL:
        return set(_cache)

    rc, stdout, stderr = await _run(
        "systemctl", "list-unit-files", "decnet-*.service", "--no-legend",
    )

    names: Set[str] = set()
    if rc != 0:
        # systemd missing / non-systemd host — report nothing installed and
        # keep the UI rendering.
        log.info("list-unit-files failed (treating as empty): rc=%s stderr=%s", rc, stderr.strip())
    else:
        prefix, suffix = "decnet-", ".service"
        for line in stdout.splitlines():
            columns = line.split(None, 1)
            if not columns:
                # Blank or whitespace-only line.
                continue
            token = columns[0]
            if token.startswith(prefix) and token.endswith(suffix):
                names.add(token[len(prefix):-len(suffix)])

    _cache = names
    _cache_ts = now
    return set(names)
def reset_cache_for_tests() -> None:
    """Clear the cached ``list_installed`` result and its timestamp."""
    global _cache, _cache_ts
    _cache, _cache_ts = None, 0.0

View File

View File

@@ -0,0 +1,133 @@
"""Unit tests for :mod:`decnet.web.services.systemd_control`.
These tests monkeypatch :func:`asyncio.create_subprocess_exec` so we
never touch real ``systemctl``. The contract under test is:
* argv shape — ``["systemctl", <verb>, "decnet-<name>.service"]``
* non-zero return ⇒ :class:`SystemctlError` with returncode + stderr
* ``list_installed`` parses ``list-unit-files`` output into a name set
* cache honours the 30s TTL
"""
from __future__ import annotations
import asyncio
from typing import Any, List, Tuple
import pytest
from decnet.web.services import systemd_control as sc
class _FakeProc:
    """Stand-in for a subprocess handle with a canned exit status and output."""

    def __init__(self, returncode: int, stdout: bytes, stderr: bytes) -> None:
        self.returncode = returncode
        self._stdout, self._stderr = stdout, stderr

    async def communicate(self) -> Tuple[bytes, bytes]:
        """Return the canned ``(stdout, stderr)`` pair."""
        canned = (self._stdout, self._stderr)
        return canned
def _patch_exec(monkeypatch: Any, *, rc: int = 0, stdout: bytes = b"", stderr: bytes = b"") -> List[tuple]:
    """Swap ``asyncio.create_subprocess_exec`` for a recording fake.

    Every call to the fake yields a ``_FakeProc`` built from ``rc``,
    ``stdout`` and ``stderr``.

    Returns:
        A list that receives the positional argv tuple of each call made
        through the patched function.
    """
    recorded: List[tuple] = []

    async def _spawn(*argv: str, **_ignored: Any) -> _FakeProc:
        recorded.append(argv)
        return _FakeProc(rc, stdout, stderr)

    monkeypatch.setattr(asyncio, "create_subprocess_exec", _spawn)
    return recorded
@pytest.fixture(autouse=True)
def _reset_cache() -> None:
    """Clear the ``list_installed`` cache before and after every test."""
    clear = sc.reset_cache_for_tests
    clear()
    yield
    clear()
@pytest.mark.asyncio
async def test_start_builds_correct_argv(monkeypatch: Any) -> None:
    """``start`` spawns exactly one ``systemctl start decnet-<name>.service``."""
    recorded = _patch_exec(monkeypatch, rc=0)
    await sc.start("mutator")
    expected_argv = ("systemctl", "start", "decnet-mutator.service")
    assert recorded == [expected_argv]
@pytest.mark.asyncio
async def test_stop_builds_correct_argv(monkeypatch: Any) -> None:
    """``stop`` spawns exactly one ``systemctl stop decnet-<name>.service``."""
    recorded = _patch_exec(monkeypatch, rc=0)
    await sc.stop("sniffer")
    expected_argv = ("systemctl", "stop", "decnet-sniffer.service")
    assert recorded == [expected_argv]
@pytest.mark.asyncio
async def test_start_raises_systemctl_error_on_nonzero(monkeypatch: Any) -> None:
    """A non-zero exit surfaces as ``SystemctlError`` carrying unit, rc and stderr."""
    _patch_exec(monkeypatch, rc=5, stderr=b"Unit decnet-mutator.service not found.\n")
    with pytest.raises(sc.SystemctlError) as caught:
        await sc.start("mutator")
    failure = caught.value
    assert (failure.returncode, failure.unit) == (5, "decnet-mutator.service")
    assert "not found" in failure.stderr
@pytest.mark.asyncio
async def test_is_active_true_when_stdout_active(monkeypatch: Any) -> None:
    """``is_active`` is true when systemctl prints ``active``."""
    _patch_exec(monkeypatch, rc=0, stdout=b"active\n")
    verdict = await sc.is_active("bus")
    assert verdict is True
@pytest.mark.asyncio
async def test_is_active_false_when_inactive(monkeypatch: Any) -> None:
    """``is_active`` is false — and does not raise — for an inactive unit."""
    # Exit status 3 is what systemctl uses for "inactive"; is_active has to
    # read that as an answer rather than as a failure.
    _patch_exec(monkeypatch, rc=3, stdout=b"inactive\n")
    verdict = await sc.is_active("bus")
    assert verdict is False
@pytest.mark.asyncio
async def test_list_installed_parses_unit_files(monkeypatch: Any) -> None:
    """Each ``decnet-<name>.service`` line yields ``<name>`` in the result set."""
    unit_lines = [
        b"decnet-bus.service enabled enabled\n",
        b"decnet-api.service enabled enabled\n",
        b"decnet-mutator.service disabled enabled\n",
    ]
    _patch_exec(monkeypatch, rc=0, stdout=b"".join(unit_lines))
    installed = await sc.list_installed()
    assert installed == {"bus", "api", "mutator"}
@pytest.mark.asyncio
async def test_list_installed_returns_empty_on_failure(monkeypatch: Any) -> None:
    """A failing ``systemctl`` is reported as an empty set, not an error."""
    _patch_exec(monkeypatch, rc=1, stderr=b"systemctl: command not found\n")
    installed = await sc.list_installed()
    assert installed == set()
@pytest.mark.asyncio
async def test_list_installed_is_cached(monkeypatch: Any) -> None:
    """Back-to-back lookups are served from the cache after the first one."""
    recorded = _patch_exec(monkeypatch, rc=0, stdout=b"decnet-bus.service enabled enabled\n")
    for _ in range(3):
        await sc.list_installed()
    # Three lookups were requested, yet only one subprocess was spawned.
    assert len(recorded) == 1
@pytest.mark.asyncio
async def test_list_installed_force_bypasses_cache(monkeypatch: Any) -> None:
    """``force=True`` spawns a new subprocess even when a cached result exists."""
    recorded = _patch_exec(monkeypatch, rc=0, stdout=b"decnet-bus.service enabled enabled\n")
    await sc.list_installed()
    await sc.list_installed(force=True)
    assert len(recorded) == 2
def test_invalid_worker_name_rejected() -> None:
    """Path-like, dotted and empty worker names are all refused by ``_unit``."""
    for bad_name in ("../etc/passwd", "bus.service", ""):
        with pytest.raises(ValueError):
            sc._unit(bad_name)