feat(1.1): in-process worker supervision primitive (C5)
supervise(): per-worker restart loop with exponential backoff (in-process Restart=on-failure). run_group(): hosts workers as concurrent independently- supervised tasks — one crash never cancels siblings (deliberately NOT asyncio.TaskGroup, whose all-or-nothing cancel breaks isolation). SIGTERM/ SIGINT → graceful cancel-and-await. Tests cover restart, clean-exit, crash-isolation, shutdown, empty group.
This commit is contained in:
92
decnet/supervisor.py
Normal file
92
decnet/supervisor.py
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
"""In-process worker supervision — host several worker coroutines in one process
|
||||||
|
without losing per-worker fault isolation.
|
||||||
|
|
||||||
|
This is the consolidation primitive for DECNET 1.1 (see
|
||||||
|
``development/RELEASE-1.1.md``). It deliberately does NOT use
|
||||||
|
``asyncio.TaskGroup``: TaskGroup cancels every sibling when one task raises,
|
||||||
|
which is the opposite of worker isolation. Instead each worker runs in its own
|
||||||
|
``supervise()`` restart loop — the in-process equivalent of systemd
|
||||||
|
``Restart=on-failure`` with exponential backoff — and the loops are run
|
||||||
|
concurrently so one crashing worker never takes down the others.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
|
||||||
|
log = logging.getLogger("decnet.supervisor")
|
||||||
|
|
||||||
|
# A worker is anything that returns a fresh awaitable each time it's (re)started.
|
||||||
|
WorkerFactory = Callable[[], Awaitable[None]]
|
||||||
|
|
||||||
|
|
||||||
|
async def supervise(
|
||||||
|
name: str, factory: WorkerFactory, *, max_backoff: float = 30.0
|
||||||
|
) -> None:
|
||||||
|
"""Run one worker, restarting it with exponential backoff if it crashes.
|
||||||
|
|
||||||
|
- A raised exception → log, sleep (capped backoff), restart.
|
||||||
|
- A clean return → stop supervising (the worker decided it was done).
|
||||||
|
- Cancellation (group shutdown) → propagate, do not restart.
|
||||||
|
"""
|
||||||
|
backoff = 1.0
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await factory()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
log.info("worker %s cancelled; stopping", name)
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
log.exception(
|
||||||
|
"worker %s crashed; restarting in %.0fs", name, backoff
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(backoff)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
backoff = min(backoff * 2.0, max_backoff)
|
||||||
|
else:
|
||||||
|
log.info("worker %s exited cleanly; not restarting", name)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
async def run_group(
|
||||||
|
specs: list[tuple[str, WorkerFactory]],
|
||||||
|
*,
|
||||||
|
stop: asyncio.Event | None = None,
|
||||||
|
install_signals: bool = True,
|
||||||
|
) -> None:
|
||||||
|
"""Host a group of workers as independently-supervised concurrent tasks.
|
||||||
|
|
||||||
|
Returns when ``stop`` is set (SIGTERM/SIGINT install it by default), at which
|
||||||
|
point every worker task is cancelled and awaited. A worker that exits or
|
||||||
|
crashes on its own never cancels its siblings — that is the whole point.
|
||||||
|
"""
|
||||||
|
if not specs:
|
||||||
|
return
|
||||||
|
if stop is None:
|
||||||
|
stop = asyncio.Event()
|
||||||
|
if install_signals:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
||||||
|
try:
|
||||||
|
loop.add_signal_handler(sig, stop.set)
|
||||||
|
except (NotImplementedError, RuntimeError): # pragma: no cover
|
||||||
|
pass
|
||||||
|
|
||||||
|
tasks = [
|
||||||
|
asyncio.create_task(supervise(name, fac), name=name)
|
||||||
|
for name, fac in specs
|
||||||
|
]
|
||||||
|
log.info("supervisor: hosting %d workers: %s", len(tasks),
|
||||||
|
", ".join(n for n, _ in specs))
|
||||||
|
try:
|
||||||
|
await stop.wait()
|
||||||
|
finally:
|
||||||
|
for t in tasks:
|
||||||
|
t.cancel()
|
||||||
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
log.info("supervisor: group shut down (%d workers)", len(tasks))
|
||||||
104
tests/test_supervisor.py
Normal file
104
tests/test_supervisor.py
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
"""Tests for the in-process worker supervisor (DECNET 1.1 consolidation)."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.supervisor import run_group, supervise
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
|
||||||
|
async def test_supervise_restarts_on_crash():
|
||||||
|
calls = []
|
||||||
|
|
||||||
|
async def flaky():
|
||||||
|
calls.append(1)
|
||||||
|
if len(calls) < 3:
|
||||||
|
raise RuntimeError("boom")
|
||||||
|
# third start: block until cancelled
|
||||||
|
await asyncio.Event().wait()
|
||||||
|
|
||||||
|
task = asyncio.create_task(supervise("flaky", flaky, max_backoff=0.01))
|
||||||
|
# let it crash-restart its way to the blocking third start
|
||||||
|
for _ in range(200):
|
||||||
|
if len(calls) >= 3:
|
||||||
|
break
|
||||||
|
await asyncio.sleep(0.005)
|
||||||
|
task.cancel()
|
||||||
|
with pytest.raises(asyncio.CancelledError):
|
||||||
|
await task
|
||||||
|
assert len(calls) == 3 # crashed twice, restarted, then stuck on the 3rd
|
||||||
|
|
||||||
|
|
||||||
|
async def test_supervise_clean_exit_does_not_restart():
|
||||||
|
calls = []
|
||||||
|
|
||||||
|
async def one_shot():
|
||||||
|
calls.append(1)
|
||||||
|
|
||||||
|
await asyncio.wait_for(supervise("once", one_shot), timeout=1.0)
|
||||||
|
assert calls == [1] # returned cleanly, no restart loop
|
||||||
|
|
||||||
|
|
||||||
|
async def test_one_worker_crash_does_not_kill_siblings():
|
||||||
|
survivor_ticks = []
|
||||||
|
crash_count = []
|
||||||
|
|
||||||
|
async def survivor():
|
||||||
|
while True:
|
||||||
|
survivor_ticks.append(1)
|
||||||
|
await asyncio.sleep(0.005)
|
||||||
|
|
||||||
|
async def crasher():
|
||||||
|
crash_count.append(1)
|
||||||
|
raise RuntimeError("crash")
|
||||||
|
|
||||||
|
stop = asyncio.Event()
|
||||||
|
group = asyncio.create_task(
|
||||||
|
run_group(
|
||||||
|
[("survivor", survivor), ("crasher", crasher)],
|
||||||
|
stop=stop,
|
||||||
|
install_signals=False,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
# survivor kept ticking despite crasher dying — the isolation property.
|
||||||
|
# (restart/backoff timing is covered by test_supervise_restarts_on_crash)
|
||||||
|
assert len(survivor_ticks) > 3
|
||||||
|
assert len(crash_count) >= 1
|
||||||
|
stop.set()
|
||||||
|
await asyncio.wait_for(group, timeout=1.0)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_run_group_shutdown_cancels_all():
|
||||||
|
running = {"a": False, "b": False}
|
||||||
|
|
||||||
|
def make(name):
|
||||||
|
async def worker():
|
||||||
|
running[name] = True
|
||||||
|
try:
|
||||||
|
await asyncio.Event().wait()
|
||||||
|
finally:
|
||||||
|
running[name] = False
|
||||||
|
return worker
|
||||||
|
|
||||||
|
stop = asyncio.Event()
|
||||||
|
group = asyncio.create_task(
|
||||||
|
run_group(
|
||||||
|
[("a", make("a")), ("b", make("b"))],
|
||||||
|
stop=stop,
|
||||||
|
install_signals=False,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
assert running == {"a": True, "b": True}
|
||||||
|
stop.set()
|
||||||
|
await asyncio.wait_for(group, timeout=1.0)
|
||||||
|
assert running == {"a": False, "b": False} # finally blocks ran → clean cancel
|
||||||
|
|
||||||
|
|
||||||
|
async def test_empty_group_returns_immediately():
|
||||||
|
await asyncio.wait_for(run_group([], install_signals=False), timeout=1.0)
|
||||||
Reference in New Issue
Block a user