From 74096b6df016d8b24b66280525195b5b6ac23e79 Mon Sep 17 00:00:00 2001
From: anti
Date: Thu, 18 Jun 2026 18:36:38 -0400
Subject: [PATCH] feat(1.2): prefork supervisor primitive + tests (C, CoW gate passed)

CoW measurement on CPython 3.14: forked idle child keeps ~71MB shared,
dirties ~1MB private; working child ~26MB. PEP 683 immortal objects keep
code/module pages clean so gc.freeze() is unnecessary (freeze==nofreeze).

prefork.run_fleet: master imports the base floor once, forks one child
per worker (own process/GIL, CoW-shared floor), reaps + restarts with
backoff, graceful SIGTERM->SIGKILL shutdown. Not yet wired to a command
(that lands when 1.2 picks the target worker set).
---
 decnet/prefork.py       | 189 ++++++++++++++++++++++++++++++++++++++++
 tests/prefork_driver.py |  55 ++++++++++++
 tests/test_prefork.py   |  40 +++++++++
 3 files changed, 284 insertions(+)
 create mode 100644 decnet/prefork.py
 create mode 100644 tests/prefork_driver.py
 create mode 100644 tests/test_prefork.py

diff --git a/decnet/prefork.py b/decnet/prefork.py
new file mode 100644
--- /dev/null
+++ b/decnet/prefork.py
@@ -0,0 +1,189 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Prefork supervisor — import the base floor ONCE in a master, then fork one
+child process per worker. Children share the ~70 MB import floor via
+copy-on-write.
+
+Measured on CPython 3.14 (development/cow_probe.py): an idle forked child keeps
+~71 MB shared and dirties only ~1 MB private; a working child dirties ~26 MB
+(its own heap, not the floor). PEP 683 immortal objects keep module/code pages
+clean, so the classic refcount-dirties-CoW problem does not bite and gc.freeze()
+is unnecessary on 3.14.
+
+Contrast with :mod:`decnet.supervisor` (asyncio tasks in ONE process, shared
+GIL): use that for cheap co-resident IO workers. Use prefork for workers that
+must keep their OWN process / GIL — CPU-heavy or isolation-critical — but
+shouldn't each re-import the world.
+
+Each worker spec is a zero-arg callable that BLOCKS running the worker (e.g.
+``lambda: asyncio.run(profiler_worker(repo))``). It executes in the forked
+child; the master only forks, reaps, and restarts.
+"""
+from __future__ import annotations
+
+import logging
+import os
+import signal
+import time
+from collections.abc import Callable
+
+log = logging.getLogger("decnet.prefork")
+
+WorkerEntry = Callable[[], None]
+
+
+def run_fleet(
+    specs: dict[str, WorkerEntry],
+    *,
+    max_backoff: float = 30.0,
+    poll_interval: float = 0.2,
+    stop_after: float | None = None,
+) -> None:
+    """Fork one child per worker and supervise them until SIGTERM/SIGINT.
+
+    A dead child is re-forked after exponential backoff (in-process
+    ``Restart=on-failure``). Backoff is tracked per worker and scheduled
+    non-blockingly, so one worker's restart delay never stalls reaping of
+    another. The first delay is ``min(1.0, max_backoff)``; each later one
+    doubles, capped at ``max_backoff``.
+
+    On shutdown — a stop signal, ``stop_after`` elapsing, or the supervision
+    loop itself raising — children get SIGTERM, then SIGKILL after a grace
+    period, and the SIGTERM/SIGINT handlers that were installed before the
+    call are put back.
+
+    ``stop_after`` (seconds) is a test hook: cleanly shut the fleet down after
+    that long instead of waiting for a signal.
+
+    Must be called from the main thread, because it installs signal handlers.
+    """
+    if not specs:
+        return
+
+    children: dict[int, str] = {}  # pid -> name
+    # Clamp the initial delay too, so a ``max_backoff`` below 1s is honoured
+    # for the very first restart instead of only from the second one on.
+    backoff: dict[str, float] = {n: min(1.0, max_backoff) for n in specs}
+    due: dict[str, float] = {}  # name -> earliest restart time
+    stopping = {"flag": False}
+
+    def _request_stop(_signum: int, _frame: object) -> None:
+        stopping["flag"] = True
+
+    def spawn(name: str) -> None:
+        pid = os.fork()
+        if pid == 0:  # ---- child ----
+            # Everything runs under try/finally so the child can never fall
+            # back into the master's supervision loop, whatever goes wrong.
+            exit_code = 1
+            try:
+                # Restore the handlers a freshly started interpreter has, so
+                # the worker behaves as if launched standalone. SIGINT must be
+                # default_int_handler (not SIG_DFL): that is what raises
+                # KeyboardInterrupt, and what asyncio.run() checks for before
+                # installing its own Ctrl-C handling.
+                signal.signal(signal.SIGTERM, signal.SIG_DFL)
+                signal.signal(signal.SIGINT, signal.default_int_handler)
+                specs[name]()
+                exit_code = 0
+            except KeyboardInterrupt:
+                exit_code = 0
+            except SystemExit as exc:
+                # sys.exit() is an ordinary way out: honour its status quietly.
+                if exc.code is None:
+                    exit_code = 0
+                elif isinstance(exc.code, int):
+                    exit_code = exc.code
+            except BaseException:  # noqa: BLE001 — last-resort child logging
+                log.exception("prefork: worker %s raised", name)
+            finally:
+                os._exit(exit_code)
+        children[pid] = name  # ---- parent ----
+        log.info("prefork: spawned %s pid=%d", name, pid)
+
+    previous = {
+        sig: signal.signal(sig, _request_stop)
+        for sig in (signal.SIGTERM, signal.SIGINT)
+    }
+    try:
+        log.info("prefork: master pid=%d forking %d workers: %s",
+                 os.getpid(), len(specs), ", ".join(specs))
+        for name in specs:
+            spawn(name)
+
+        deadline = (
+            time.monotonic() + stop_after if stop_after is not None else None
+        )
+        while not stopping["flag"]:
+            if deadline is not None and time.monotonic() >= deadline:
+                break
+            now = time.monotonic()
+            # Restart any workers whose backoff has elapsed.
+            for name in [n for n, t in due.items() if now >= t]:
+                del due[name]
+                spawn(name)
+            # Reap without blocking so concurrent crashes are all handled.
+            try:
+                pid, status = os.waitpid(-1, os.WNOHANG)
+            except ChildProcessError:
+                pid = 0
+            if pid == 0:
+                time.sleep(poll_interval)
+                continue
+            name = children.pop(pid, None)
+            if name is None:
+                continue
+            code = os.waitstatus_to_exitcode(status)
+            log.warning(
+                "prefork: %s (pid=%d) exited code=%d; restart in %.0fs",
+                name, pid, code, backoff[name],
+            )
+            due[name] = time.monotonic() + backoff[name]
+            backoff[name] = min(backoff[name] * 2.0, max_backoff)
+    finally:
+        # Always tear the fleet down (even if fork()/waitpid() raised), then
+        # hand the process's signal handling back to the caller.
+        _shutdown(children)
+        for sig, handler in previous.items():
+            if handler is not None:  # None: handler was not set from Python
+                signal.signal(sig, handler)
+
+
+def _shutdown(children: dict[int, str], *, grace: float = 15.0) -> None:
+    """SIGTERM all children, reap within ``grace``, SIGKILL stragglers.
+
+    ``children`` (pid -> name) is emptied in place; every child is reaped
+    before returning so no zombies are left behind.
+    """
+    for pid in list(children):
+        try:
+            os.kill(pid, signal.SIGTERM)
+        except ProcessLookupError:
+            children.pop(pid, None)
+    deadline = time.monotonic() + grace
+    while children and time.monotonic() < deadline:
+        try:
+            pid, _ = os.waitpid(-1, os.WNOHANG)
+        except ChildProcessError:
+            # Nothing left to reap: the remaining pids are stale, and must
+            # not be signalled in case the kernel has recycled them.
+            children.clear()
+            break
+        if pid:
+            children.pop(pid, None)
+        else:
+            time.sleep(0.1)
+    stragglers = list(children)
+    for pid in stragglers:
+        try:
+            os.kill(pid, signal.SIGKILL)
+        except ProcessLookupError:
+            pass
+    # SIGKILL cannot be caught, so these blocking waits return promptly;
+    # without them the killed children would linger as zombies.
+    for pid in stragglers:
+        try:
+            os.waitpid(pid, 0)
+        except ChildProcessError:
+            pass
+        children.pop(pid, None)
+    log.info("prefork: fleet shut down")
diff --git a/tests/prefork_driver.py b/tests/prefork_driver.py
new file mode 100644
--- /dev/null
+++ b/tests/prefork_driver.py
@@ -0,0 +1,55 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Standalone driver for the prefork supervisor — runnable directly OR via
+tests/test_prefork.py (which execs it in a subprocess so no fork happens inside
+the pytest/xdist worker).
+
+    python tests/prefork_driver.py OUTDIR
+
+Forks two fake workers under decnet.prefork.run_fleet:
+  * "tick" — append a line every 0.2s forever (proves a worker runs & stays up)
+  * "crasher" — write a marker then exit(1) (proves restart-on-crash)
+Runs for ~2s via stop_after, then shuts the fleet down. Writes results into
+OUTDIR; the caller asserts on them.
+"""
+from __future__ import annotations
+
+import os
+import sys
+import time
+
+# Running this file as a script puts its own dir (tests/) on sys.path[0], which
+# shadows the stdlib `logging` via tests/logging/. Drop it before importing
+# decnet (still importable — it's installed in the venv).
+if sys.path and os.path.basename(sys.path[0]) == "tests":
+    sys.path.pop(0)
+
+from decnet.prefork import run_fleet  # noqa: E402
+
+
+def main(out: str) -> None:
+    tick_log = os.path.join(out, "tick.log")
+    crash_log = os.path.join(out, "crash.log")
+
+    def tick() -> None:
+        while True:
+            with open(tick_log, "a") as f:
+                f.write("t\n")
+            time.sleep(0.2)
+
+    def crasher() -> None:
+        with open(crash_log, "a") as f:
+            f.write("c\n")
+        time.sleep(0.15)
+        os._exit(1)
+
+    # Fast backoff so we observe multiple restarts inside the short window.
+    run_fleet(
+        {"tick": tick, "crasher": crasher},
+        max_backoff=0.2,
+        poll_interval=0.05,
+        stop_after=2.0,
+    )
+
+
+if __name__ == "__main__":
+    main(sys.argv[1] if len(sys.argv) > 1 else ".")
diff --git a/tests/test_prefork.py b/tests/test_prefork.py
new file mode 100644
--- /dev/null
+++ b/tests/test_prefork.py
@@ -0,0 +1,40 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Prefork supervisor behaviour, exercised via a subprocess driver so no fork
+happens inside the pytest/xdist worker (which would be unsafe).
+
+Proves: workers fork and run, a crashing worker is restarted with backoff, and
+the fleet shuts down cleanly (stop_after returns, no orphaned children).
+"""
+from __future__ import annotations
+
+import pathlib
+import subprocess
+import sys
+
+
+def test_prefork_runs_and_restarts(tmp_path: pathlib.Path):
+    driver = pathlib.Path(__file__).parent / "prefork_driver.py"
+    proc = subprocess.run(
+        [sys.executable, str(driver), str(tmp_path)],
+        capture_output=True, text=True, timeout=30,
+    )
+    assert proc.returncode == 0, f"driver failed:\n{proc.stderr}"
+
+    tick = (tmp_path / "tick.log").read_text().splitlines()
+    crash = (tmp_path / "crash.log").read_text().splitlines()
+
+    # tick ran continuously for ~2s at 0.2s cadence → several lines.
+    assert len(tick) >= 5, f"tick worker did not stay up: {len(tick)} lines"
+    # crasher died fast and was restarted repeatedly → many markers.
+    assert len(crash) >= 3, f"crasher was not restarted: {len(crash)} markers"
+
+
+def test_empty_fleet_returns(tmp_path: pathlib.Path):
+    # run_fleet({}) must be a no-op, not hang.
+    code = (
+        "from decnet.prefork import run_fleet; run_fleet({}, stop_after=5)"
+    )
+    proc = subprocess.run(
+        [sys.executable, "-c", code], capture_output=True, text=True, timeout=15
+    )
+    assert proc.returncode == 0, proc.stderr