diff --git a/decnet/supervisor.py b/decnet/supervisor.py new file mode 100644 index 00000000..127f5459 --- /dev/null +++ b/decnet/supervisor.py @@ -0,0 +1,92 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""In-process worker supervision — host several worker coroutines in one process +without losing per-worker fault isolation. + +This is the consolidation primitive for DECNET 1.1 (see +``development/RELEASE-1.1.md``). It deliberately does NOT use +``asyncio.TaskGroup``: TaskGroup cancels every sibling when one task raises, +which is the opposite of worker isolation. Instead each worker runs in its own +``supervise()`` restart loop — the in-process equivalent of systemd +``Restart=on-failure`` with exponential backoff — and the loops are run +concurrently so one crashing worker never takes down the others. +""" +from __future__ import annotations + +import asyncio +import logging +import signal +from collections.abc import Awaitable, Callable + +log = logging.getLogger("decnet.supervisor") + +# A worker is anything that returns a fresh awaitable each time it's (re)started. +WorkerFactory = Callable[[], Awaitable[None]] + + +async def supervise( + name: str, factory: WorkerFactory, *, max_backoff: float = 30.0 +) -> None: + """Run one worker, restarting it with exponential backoff if it crashes. + + - A raised exception → log, sleep (capped backoff), restart. + - A clean return → stop supervising (the worker decided it was done). + - Cancellation (group shutdown) → propagate, do not restart. + """ + backoff = 1.0 + while True: + try: + await factory() + except asyncio.CancelledError: + log.info("worker %s cancelled; stopping", name) + raise + except Exception: + log.exception( + "worker %s crashed; restarting in %.0fs", name, backoff + ) + try: + await asyncio.sleep(backoff) + except asyncio.CancelledError: + raise + backoff = min(backoff * 2.0, max_backoff) + else: + log.info("worker %s exited cleanly; not restarting", name) + return + + +async def run_group( + specs: list[tuple[str, WorkerFactory]], + *, + stop: asyncio.Event | None = None, + install_signals: bool = True, +) -> None: + """Host a group of workers as independently-supervised concurrent tasks. + + Returns when ``stop`` is set (SIGTERM/SIGINT install it by default), at which + point every worker task is cancelled and awaited. A worker that exits or + crashes on its own never cancels its siblings — that is the whole point. + """ + if not specs: + return + if stop is None: + stop = asyncio.Event() + if install_signals: + loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + try: + loop.add_signal_handler(sig, stop.set) + except (NotImplementedError, RuntimeError): # pragma: no cover + pass + + tasks = [ + asyncio.create_task(supervise(name, fac), name=name) + for name, fac in specs + ] + log.info("supervisor: hosting %d workers: %s", len(tasks), + ", ".join(n for n, _ in specs)) + try: + await stop.wait() + finally: + for t in tasks: + t.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + log.info("supervisor: group shut down (%d workers)", len(tasks)) diff --git a/tests/test_supervisor.py b/tests/test_supervisor.py new file mode 100644 index 00000000..70b5e8ee --- /dev/null +++ b/tests/test_supervisor.py @@ -0,0 +1,104 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Tests for the in-process worker supervisor (DECNET 1.1 consolidation).""" +from __future__ import annotations + +import asyncio + +import pytest + +from decnet.supervisor import run_group, supervise + +pytestmark = pytest.mark.asyncio + + +async def test_supervise_restarts_on_crash(): + calls = [] + + async def flaky(): + calls.append(1) + if len(calls) < 3: + raise RuntimeError("boom") + # third start: block until cancelled + await asyncio.Event().wait() + + task = asyncio.create_task(supervise("flaky", flaky, max_backoff=0.01)) + # let it crash-restart its way to the blocking third start + for _ in range(200): + if len(calls) >= 3: + break + await asyncio.sleep(0.005) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + assert len(calls) == 3 # crashed twice, restarted, then stuck on the 3rd + + +async def test_supervise_clean_exit_does_not_restart(): + calls = [] + + async def one_shot(): + calls.append(1) + + await asyncio.wait_for(supervise("once", one_shot), timeout=1.0) + assert calls == [1] # returned cleanly, no restart loop + + +async def test_one_worker_crash_does_not_kill_siblings(): + survivor_ticks = [] + crash_count = [] + + async def survivor(): + while True: + survivor_ticks.append(1) + await asyncio.sleep(0.005) + + async def crasher(): + crash_count.append(1) + raise RuntimeError("crash") + + stop = asyncio.Event() + group = asyncio.create_task( + run_group( + [("survivor", survivor), ("crasher", crasher)], + stop=stop, + install_signals=False, + ) + ) + await asyncio.sleep(0.1) + # survivor kept ticking despite crasher dying — the isolation property. + # (restart/backoff timing is covered by test_supervise_restarts_on_crash) + assert len(survivor_ticks) > 3 + assert len(crash_count) >= 1 + stop.set() + await asyncio.wait_for(group, timeout=1.0) + + +async def test_run_group_shutdown_cancels_all(): + running = {"a": False, "b": False} + + def make(name): + async def worker(): + running[name] = True + try: + await asyncio.Event().wait() + finally: + running[name] = False + return worker + + stop = asyncio.Event() + group = asyncio.create_task( + run_group( + [("a", make("a")), ("b", make("b"))], + stop=stop, + install_signals=False, + ) + ) + await asyncio.sleep(0.05) + assert running == {"a": True, "b": True} + stop.set() + await asyncio.wait_for(group, timeout=1.0) + assert running == {"a": False, "b": False} # finally blocks ran → clean cancel + + +async def test_empty_group_returns_immediately(): + await asyncio.wait_for(run_group([], install_signals=False), timeout=1.0)