From fcaac648a41c8962a0bdd713ad50d7ea7d6598fb Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 22 Apr 2026 14:08:35 -0400 Subject: [PATCH] feat(web): add systemd_control helper for worker unit management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thin async wrapper over `systemctl` — never shell=True, always create_subprocess_exec. Unit names are built from `decnet-.service`; the regex check is defence in depth on top of the router-level KNOWN_WORKERS validation. Exposes start / stop / is_active / list_installed; last is cached for 30s to keep the Workers panel cheap under REFRESH spam. On non-systemd hosts list_installed returns an empty set, so the UI renders with every row marked not-installed instead of 500-ing. --- decnet/web/services/__init__.py | 0 decnet/web/services/systemd_control.py | 136 +++++++++++++++++++++ tests/web/services/__init__.py | 0 tests/web/services/test_systemd_control.py | 133 ++++++++++++++++++++ 4 files changed, 269 insertions(+) create mode 100644 decnet/web/services/__init__.py create mode 100644 decnet/web/services/systemd_control.py create mode 100644 tests/web/services/__init__.py create mode 100644 tests/web/services/test_systemd_control.py diff --git a/decnet/web/services/__init__.py b/decnet/web/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/decnet/web/services/systemd_control.py b/decnet/web/services/systemd_control.py new file mode 100644 index 00000000..2cba388f --- /dev/null +++ b/decnet/web/services/systemd_control.py @@ -0,0 +1,136 @@ +"""Thin async wrapper over ``systemctl`` for DECNET worker units. + +The API process runs as the unprivileged ``decnet`` user and delegates +the actual start/stop to systemd via a scoped polkit rule (see +``deploy/polkit/50-decnet-workers.rules``). This module keeps the shell +surface area minimal: + +* Unit names are always ``decnet-.service`` — callers pass the + bare worker name and the ``.service`` suffix is bolted on here. +* ``asyncio.create_subprocess_exec`` — never ``shell=True``. Worker + names are also validated at the router boundary against + :data:`decnet.web.worker_registry.KNOWN_WORKERS`; the extra regex + check here is defence in depth. +* ``list_installed()`` results are cached for 30 seconds to keep the + status endpoint cheap under repeated REFRESH clicks. +""" +from __future__ import annotations + +import asyncio +import re +import time +from typing import Set + +from decnet.logging import get_logger + +log = get_logger("web.systemd_control") + +_UNIT_NAME_RE = re.compile(r"^[a-z][a-z0-9_-]*$") +_LIST_CACHE_TTL = 30.0 + +_cache: Set[str] | None = None +_cache_ts: float = 0.0 + + +class SystemctlError(RuntimeError): + """Non-zero exit from ``systemctl``. Carries returncode + stderr.""" + + def __init__(self, unit: str, returncode: int, stderr: str) -> None: + self.unit = unit + self.returncode = returncode + self.stderr = stderr + super().__init__( + f"systemctl failed on {unit}: rc={returncode} stderr={stderr!r}" + ) + + +def _unit(name: str) -> str: + if not _UNIT_NAME_RE.match(name): + raise ValueError(f"invalid worker name: {name!r}") + return f"decnet-{name}.service" + + +async def _run(*argv: str) -> tuple[int, str, str]: + proc = await asyncio.create_subprocess_exec( + *argv, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout_b, stderr_b = await proc.communicate() + return ( + proc.returncode if proc.returncode is not None else -1, + stdout_b.decode("utf-8", "replace"), + stderr_b.decode("utf-8", "replace"), + ) + + +async def _systemctl(verb: str, name: str) -> None: + unit = _unit(name) + rc, _, stderr = await _run("systemctl", verb, unit) + if rc != 0: + log.warning("systemctl %s %s failed: rc=%s stderr=%s", verb, unit, rc, stderr.strip()) + raise SystemctlError(unit=unit, returncode=rc, stderr=stderr.strip()) + + +async def start(name: str) -> None: + """Start ``decnet-.service``. Raises :class:`SystemctlError`.""" + await _systemctl("start", name) + + +async def stop(name: str) -> None: + """Stop ``decnet-.service``. Raises :class:`SystemctlError`. + + Unused in v1 (bus-based STOP is authoritative) but kept for parity + so the supervisor contract is symmetric. + """ + await _systemctl("stop", name) + + +async def is_active(name: str) -> bool: + """Return True iff ``systemctl is-active`` reports ``active``. + + ``is-active`` exits non-zero for inactive / failed / unknown units; + that is **not** an error here — it's a signal. + """ + unit = _unit(name) + _, stdout, _ = await _run("systemctl", "is-active", unit) + return stdout.strip() == "active" + + +async def list_installed(*, force: bool = False) -> Set[str]: + """Return the set of worker names with unit files installed. + + Parses ``systemctl list-unit-files 'decnet-*.service' --no-legend`` + and strips the ``decnet-`` prefix + ``.service`` suffix off each + line. Cached for :data:`_LIST_CACHE_TTL` seconds; pass + ``force=True`` to bypass. + """ + global _cache, _cache_ts + now = time.time() + if not force and _cache is not None and (now - _cache_ts) < _LIST_CACHE_TTL: + return set(_cache) + rc, stdout, stderr = await _run( + "systemctl", "list-unit-files", "decnet-*.service", "--no-legend", + ) + if rc != 0: + # systemd missing / non-systemd host — treat as "nothing installed" + # and keep the UI rendering. Cache the empty result so we don't + # hammer the failing binary on every refresh. + log.info("list-unit-files failed (treating as empty): rc=%s stderr=%s", rc, stderr.strip()) + _cache = set() + _cache_ts = now + return set() + names: Set[str] = set() + for line in stdout.splitlines(): + token = line.split(None, 1)[0] if line.strip() else "" + if token.startswith("decnet-") and token.endswith(".service"): + names.add(token[len("decnet-"):-len(".service")]) + _cache = names + _cache_ts = now + return set(names) + + +def reset_cache_for_tests() -> None: + global _cache, _cache_ts + _cache = None + _cache_ts = 0.0 diff --git a/tests/web/services/__init__.py b/tests/web/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/web/services/test_systemd_control.py b/tests/web/services/test_systemd_control.py new file mode 100644 index 00000000..d3e44697 --- /dev/null +++ b/tests/web/services/test_systemd_control.py @@ -0,0 +1,133 @@ +"""Unit tests for :mod:`decnet.web.services.systemd_control`. + +These tests monkeypatch :func:`asyncio.create_subprocess_exec` so we +never touch real ``systemctl``. The contract under test is: + +* argv shape — ``["systemctl", , "decnet-.service"]`` +* non-zero return ⇒ :class:`SystemctlError` with returncode + stderr +* ``list_installed`` parses ``list-unit-files`` output into a name set +* cache honours the 30s TTL +""" +from __future__ import annotations + +import asyncio +from typing import Any, List, Tuple + +import pytest + +from decnet.web.services import systemd_control as sc + + +class _FakeProc: + def __init__(self, returncode: int, stdout: bytes, stderr: bytes) -> None: + self.returncode = returncode + self._stdout = stdout + self._stderr = stderr + + async def communicate(self) -> Tuple[bytes, bytes]: + return self._stdout, self._stderr + + +def _patch_exec(monkeypatch: Any, *, rc: int = 0, stdout: bytes = b"", stderr: bytes = b"") -> List[tuple]: + calls: List[tuple] = [] + + async def fake_exec(*argv: str, **_kwargs: Any) -> _FakeProc: + calls.append(argv) + return _FakeProc(rc, stdout, stderr) + + monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_exec) + return calls + + +@pytest.fixture(autouse=True) +def _reset_cache() -> None: + sc.reset_cache_for_tests() + yield + sc.reset_cache_for_tests() + + +@pytest.mark.asyncio +async def test_start_builds_correct_argv(monkeypatch: Any) -> None: + calls = _patch_exec(monkeypatch, rc=0) + await sc.start("mutator") + assert calls == [("systemctl", "start", "decnet-mutator.service")] + + +@pytest.mark.asyncio +async def test_stop_builds_correct_argv(monkeypatch: Any) -> None: + calls = _patch_exec(monkeypatch, rc=0) + await sc.stop("sniffer") + assert calls == [("systemctl", "stop", "decnet-sniffer.service")] + + +@pytest.mark.asyncio +async def test_start_raises_systemctl_error_on_nonzero(monkeypatch: Any) -> None: + _patch_exec(monkeypatch, rc=5, stderr=b"Unit decnet-mutator.service not found.\n") + with pytest.raises(sc.SystemctlError) as exc_info: + await sc.start("mutator") + err = exc_info.value + assert err.returncode == 5 + assert err.unit == "decnet-mutator.service" + assert "not found" in err.stderr + + +@pytest.mark.asyncio +async def test_is_active_true_when_stdout_active(monkeypatch: Any) -> None: + _patch_exec(monkeypatch, rc=0, stdout=b"active\n") + assert await sc.is_active("bus") is True + + +@pytest.mark.asyncio +async def test_is_active_false_when_inactive(monkeypatch: Any) -> None: + # systemctl exits 3 for "inactive" — is_active must treat that as a + # signal, not an error. + _patch_exec(monkeypatch, rc=3, stdout=b"inactive\n") + assert await sc.is_active("bus") is False + + +@pytest.mark.asyncio +async def test_list_installed_parses_unit_files(monkeypatch: Any) -> None: + stdout = ( + b"decnet-bus.service enabled enabled\n" + b"decnet-api.service enabled enabled\n" + b"decnet-mutator.service disabled enabled\n" + ) + _patch_exec(monkeypatch, rc=0, stdout=stdout) + names = await sc.list_installed() + assert names == {"bus", "api", "mutator"} + + +@pytest.mark.asyncio +async def test_list_installed_returns_empty_on_failure(monkeypatch: Any) -> None: + _patch_exec(monkeypatch, rc=1, stderr=b"systemctl: command not found\n") + names = await sc.list_installed() + assert names == set() + + +@pytest.mark.asyncio +async def test_list_installed_is_cached(monkeypatch: Any) -> None: + stdout = b"decnet-bus.service enabled enabled\n" + calls = _patch_exec(monkeypatch, rc=0, stdout=stdout) + await sc.list_installed() + await sc.list_installed() + await sc.list_installed() + # Three logical calls, one real subprocess invocation. + assert len(calls) == 1 + + +@pytest.mark.asyncio +async def test_list_installed_force_bypasses_cache(monkeypatch: Any) -> None: + stdout = b"decnet-bus.service enabled enabled\n" + calls = _patch_exec(monkeypatch, rc=0, stdout=stdout) + await sc.list_installed() + await sc.list_installed(force=True) + assert len(calls) == 2 + + +def test_invalid_worker_name_rejected() -> None: + with pytest.raises(ValueError): + sc._unit("../etc/passwd") + with pytest.raises(ValueError): + sc._unit("bus.service") + with pytest.raises(ValueError): + sc._unit("")