Files
DECNET/decnet/prefork.py
anti 74096b6df0 feat(1.2): prefork supervisor primitive + tests (C, CoW gate passed)
CoW measurement on CPython 3.14: a forked idle child keeps ~71MB shared and
dirties only ~1MB private; a working child dirties ~26MB private (its own heap,
not the shared floor). PEP 683 immortal objects keep code/module pages clean,
so gc.freeze() is unnecessary (measurements with and without it were identical).

prefork.run_fleet: master imports the base floor once, forks one child
per worker (own process/GIL, CoW-shared floor), reaps + restarts with
backoff, graceful SIGTERM->SIGKILL shutdown. Not yet wired to a command
(that lands when 1.2 picks the target worker set).
2026-06-18 19:24:15 -04:00

141 lines
5.0 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Prefork supervisor — import the base floor ONCE in a master, then fork one
child process per worker. Children share the ~70 MB import floor via
copy-on-write.
Measured on CPython 3.14 (development/cow_probe.py): an idle forked child keeps
~71 MB shared and dirties only ~1 MB private; a working child dirties ~26 MB
(its own heap, not the floor). PEP 683 immortal objects keep module/code pages
clean, so the classic refcount-dirties-CoW problem does not bite and gc.freeze()
is unnecessary on 3.14.
Contrast with :mod:`decnet.supervisor` (asyncio tasks in ONE process, shared
GIL): use that for cheap co-resident IO workers. Use prefork for workers that
must keep their OWN process / GIL — CPU-heavy or isolation-critical — but
shouldn't each re-import the world.
Each worker spec is a zero-arg callable that BLOCKS running the worker (e.g.
``lambda: asyncio.run(profiler_worker(repo))``). It executes in the forked
child; the master only forks, reaps, and restarts.
"""
from __future__ import annotations

import logging
import os
import signal
import sys
import time
from collections.abc import Callable
# Module logger: every supervisor message is emitted under "decnet.prefork".
log = logging.getLogger(name="decnet.prefork")

# A worker spec: a zero-argument callable that blocks while running the worker.
# run_fleet only ever invokes it inside the forked child.
WorkerEntry = Callable[[], None]
def run_fleet(
    specs: dict[str, WorkerEntry],
    *,
    max_backoff: float = 30.0,
    poll_interval: float = 0.2,
    stop_after: float | None = None,
) -> None:
    """Fork one child per worker and supervise them until SIGTERM/SIGINT.

    A child that exits for any reason -- including a clean exit 0 -- is
    re-forked after exponential backoff (1 s, doubled after each exit, capped
    at ``max_backoff``), i.e. closer to systemd ``Restart=always`` than
    ``Restart=on-failure``. Backoff is tracked per worker and scheduled
    non-blockingly, so one worker's restart delay never stalls reaping of
    another.

    The fleet is shut down (children get SIGTERM, then SIGKILL after a grace
    period) when SIGTERM/SIGINT arrives, when ``stop_after`` expires, or when
    an exception escapes supervision (e.g. ``os.fork`` failing). In the last
    case the exception is re-raised after shutdown. The SIGTERM/SIGINT
    handlers in effect before the call are restored before returning.

    Args:
        specs: worker name -> zero-arg callable that blocks running the
            worker; it is only ever called in the forked child.
        max_backoff: cap, in seconds, on the per-worker restart delay.
        poll_interval: seconds to sleep when no child has exited.
        stop_after: test hook -- cleanly shut the fleet down after this many
            seconds instead of waiting for a signal.
    """
    if not specs:
        return
    stop_signals = (signal.SIGTERM, signal.SIGINT)
    children: dict[int, str] = {}  # pid -> name
    backoff: dict[str, float] = {n: 1.0 for n in specs}
    due: dict[str, float] = {}  # name -> earliest restart time
    stopping = {"flag": False}

    def _request_stop(_signum: int, _frame: object) -> None:
        stopping["flag"] = True

    def spawn(name: str) -> None:
        # Flush first so output still buffered in the master is not written a
        # second time by the child.
        _flush_stdio()
        # Keep the stop signals blocked across fork(). Otherwise a signal that
        # lands in the child before it resets its handlers would run the
        # inherited _request_stop there and be silently lost. The child
        # restores the mask itself once its handlers are in place.
        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, stop_signals)
        try:
            pid = os.fork()
            if pid == 0:  # ---- child ----
                _run_child(name, specs[name], old_mask)  # never returns
        finally:
            signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)
        children[pid] = name  # ---- parent ----
        log.info("prefork: spawned %s pid=%d", name, pid)

    previous = {sig: signal.signal(sig, _request_stop) for sig in stop_signals}
    try:
        log.info("prefork: master pid=%d forking %d workers: %s",
                 os.getpid(), len(specs), ", ".join(specs))
        for name in specs:
            spawn(name)
        deadline = (time.monotonic() + stop_after) if stop_after is not None else None
        while not stopping["flag"]:
            now = time.monotonic()
            if deadline is not None and now >= deadline:
                break
            # Restart any workers whose backoff has elapsed.
            for name in [n for n, t in due.items() if now >= t]:
                del due[name]
                spawn(name)
            # Reap without blocking so concurrent crashes are all handled.
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:  # no children at all right now
                pid = 0
            if pid == 0:
                time.sleep(poll_interval)
                continue
            name = children.pop(pid, None)
            if name is None:
                continue  # pid is not in the fleet's children map
            code = os.waitstatus_to_exitcode(status)
            log.warning("prefork: %s (pid=%d) exited code=%d; restart in %.0fs",
                        name, pid, code, backoff[name])
            due[name] = time.monotonic() + backoff[name]
            backoff[name] = min(backoff[name] * 2.0, max_backoff)
    finally:
        try:
            _shutdown(children)
        finally:
            for sig, handler in previous.items():
                # None means the old handler was not installed from Python and
                # cannot be reinstated through signal.signal().
                if handler is not None:
                    signal.signal(sig, handler)


def _run_child(name: str, entry: WorkerEntry, sigmask: set[signal.Signals]) -> None:
    """Run one worker inside a freshly forked child; never returns.

    Resets signal handling to what a standalone interpreter has and restores
    the signal mask the master blocked around ``fork()``. It then runs
    ``entry`` and always leaves via ``os._exit``, so the child never unwinds
    into the master's stack (its supervise loop or ``finally`` clauses).

    Exit codes:
        0: ``entry`` returned or raised KeyboardInterrupt.
        The ``SystemExit`` code: ``entry`` called ``sys.exit``.
        1: any other exception, which is logged.
    """
    code = 1
    try:
        # Standalone defaults: SIGTERM terminates and SIGINT raises
        # KeyboardInterrupt. asyncio.run() only installs its own SIGINT
        # handling while SIGINT is signal.default_int_handler, so SIG_DFL here
        # would bypass it (and the KeyboardInterrupt branch below).
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.default_int_handler)
        # Unblock inside the try: a SIGINT that arrived while blocked raises
        # KeyboardInterrupt as soon as it is delivered, and must be caught.
        signal.pthread_sigmask(signal.SIG_SETMASK, sigmask)
        entry()
        code = 0
    except KeyboardInterrupt:
        code = 0
    except SystemExit as exc:
        # Mirror the interpreter: None -> 0, int -> itself, anything else -> 1.
        if exc.code is None:
            code = 0
        elif isinstance(exc.code, int):
            code = exc.code
        else:
            code = 1
    except BaseException:  # noqa: BLE001 — last-resort child logging
        log.exception("prefork: worker %s raised", name)
        code = 1
    finally:
        # os._exit skips interpreter cleanup, so flush buffered stdio first.
        _flush_stdio()
        os._exit(code)


def _flush_stdio() -> None:
    """Best-effort flush of ``sys.stdout`` and ``sys.stderr`` around fork/exit."""
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (AttributeError, OSError, ValueError):
            # Stream is None (no console), closed, or its pipe is gone; there
            # is nothing useful left to flush.
            pass
def _shutdown(
    children: dict[int, str],
    *,
    grace: float = 15.0,
    kill_wait: float = 5.0,
) -> None:
    """SIGTERM all children, reap within ``grace``, then SIGKILL and reap stragglers.

    ``children`` (pid -> worker name) is updated in place. Every pid that is
    reaped, or found to be gone already, is removed. On return the map holds
    only children that could not be reaped even after SIGKILL (normally none).

    Args:
        children: unreaped child pids of this process mapped to worker names.
        grace: seconds to wait for children to exit after SIGTERM.
        kill_wait: seconds to wait for SIGKILLed children to be reaped.
            Reaping them avoids leaving zombies when the caller keeps running
            (e.g. after ``run_fleet(stop_after=...)``). The bound only matters
            for a child that cannot die yet, e.g. one stuck in uninterruptible
            sleep.
    """
    _signal_all(children, signal.SIGTERM)
    _reap_until(children, time.monotonic() + grace)
    if children:
        log.warning("prefork: %d worker(s) still alive after %.0fs grace; SIGKILL: %s",
                    len(children), grace, ", ".join(sorted(children.values())))
        _signal_all(children, signal.SIGKILL)
        _reap_until(children, time.monotonic() + kill_wait)
    log.info("prefork: fleet shut down")


def _signal_all(children: dict[int, str], signum: int) -> None:
    """Send ``signum`` to every pid in ``children``, dropping pids that no longer exist."""
    for pid in list(children):
        try:
            os.kill(pid, signum)
        except ProcessLookupError:
            children.pop(pid, None)


def _reap_until(children: dict[int, str], deadline: float) -> None:
    """Reap exited children, removing them from ``children``.

    Stops when none remain or when ``time.monotonic()`` reaches ``deadline``.
    """
    while children and time.monotonic() < deadline:
        try:
            pid, _ = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            # This process has no unreaped children at all, so every pid still
            # listed was already reaped elsewhere. Forget them rather than
            # signal a pid number the OS may since have reused.
            children.clear()
            return
        if pid:
            children.pop(pid, None)
        else:
            time.sleep(0.1)