From bce2c1940c0b9b3c349334eba4efa4289f0f0791 Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 17 Jun 2026 17:35:42 -0400 Subject: [PATCH] feat(1.1): supervise cpu group with ProcessPoolExecutor kernel offload Hosts clusterer/campaign-clusterer/attribution/reuse-correlate in one process. The two O(n^2) connected-components kernels (cluster_observations, cluster_identities) offload to ONE shared forkserver pool via decnet.offload.run_kernel, so they run in parallel instead of serialising under the GIL. - offload.run_kernel: pool when installed + offload_if holds, else inline. Standalone workers and all pre-existing tests run inline => behaviour unchanged (424 clustering/correlation tests green). - offload_if gates on input size (>=256) to skip pickle cost on small passes. - forkserver (not fork): supervisor is multithreaded via bus clients. - attribution/reuse co-located but not offloaded yet (lighter; same run_kernel path extends to them if profiling shows contention). - systemd unit Conflicts= the 4 units it replaces; no docker/raw-socket priv. --- decnet/cli/supervise.py | 46 ++++++++++++- .../campaign/impl/connected_components.py | 11 ++- .../clustering/impl/connected_components.py | 11 ++- decnet/offload.py | 52 ++++++++++++++ deploy/decnet-supervise-cpu.service.j2 | 47 +++++++++++++ tests/cli/test_supervise.py | 3 +- tests/test_offload.py | 68 +++++++++++++++++++ 7 files changed, 232 insertions(+), 6 deletions(-) create mode 100644 decnet/offload.py create mode 100644 deploy/decnet-supervise-cpu.service.j2 create mode 100644 tests/test_offload.py diff --git a/decnet/cli/supervise.py b/decnet/cli/supervise.py index 2836252d..36b5b46d 100644 --- a/decnet/cli/supervise.py +++ b/decnet/cli/supervise.py @@ -17,7 +17,7 @@ from .utils import console, log # Groups are intentionally a small static registry, not config — the membership # is an architectural decision, not an operator knob. 
-_GROUPS = ("batch",) +_GROUPS = ("batch", "cpu") async def _build_specs(group: str): @@ -43,6 +43,22 @@ async def _build_specs(group: str): ("orchestrate", lambda: orchestrator_worker(repo, interval=60, llm_enabled=None)), ("mutate", lambda: run_watch_loop(repo)), ] + if group == "cpu": + from decnet.cli.gating import _require_master_mode + from decnet.clustering.campaign.worker import run_campaign_clusterer_loop + from decnet.clustering.worker import run_clusterer_loop + from decnet.correlation.attribution_worker import run_attribution_loop + from decnet.correlation.reuse_worker import run_reuse_loop + from decnet.web.dependencies import repo + + _require_master_mode("supervise cpu") + await repo.initialize() # shared by every cpu worker → one DB pool + return [ + ("clusterer", lambda: run_clusterer_loop(repo, poll_interval_secs=60.0)), + ("campaign-clusterer", lambda: run_campaign_clusterer_loop(repo, poll_interval_secs=60.0)), + ("attribution", lambda: run_attribution_loop(repo, multi_actor_tick_secs=60.0)), + ("reuse-correlate", lambda: run_reuse_loop(repo, poll_interval_secs=60.0, min_targets=2)), + ] raise ValueError(f"unknown supervise group: {group}") @@ -75,8 +91,32 @@ def register(app: typer.Typer) -> None: console.print(f"[bold cyan]Supervisor starting[/] group={group}") async def _run() -> None: - specs = await _build_specs(group) - await run_group(specs) + pool = None + if group == "cpu": + # The CPU workers offload their O(n^2) connected-components + # kernels to ONE shared pool so they run in parallel instead of + # serialising under the GIL. forkserver (not the default fork): + # this process is multithreaded via bus clients, and forking a + # multithreaded process is unsafe. 
+ import multiprocessing as _mp + from concurrent.futures import ProcessPoolExecutor + + from decnet import offload + + pool = ProcessPoolExecutor( + max_workers=2, mp_context=_mp.get_context("forkserver") + ) + offload.set_executor(pool) + log.info("supervise cpu: kernel offload pool ready (max_workers=2)") + try: + specs = await _build_specs(group) + await run_group(specs) + finally: + if pool is not None: + from decnet import offload + + offload.set_executor(None) + pool.shutdown(wait=False, cancel_futures=True) try: asyncio.run(_run()) diff --git a/decnet/clustering/campaign/impl/connected_components.py b/decnet/clustering/campaign/impl/connected_components.py index 28429ef4..e5cdbcc6 100644 --- a/decnet/clustering/campaign/impl/connected_components.py +++ b/decnet/clustering/campaign/impl/connected_components.py @@ -31,11 +31,16 @@ from decnet.clustering.campaign.impl.similarity import ( combined_campaign_weight, ) from decnet.logging import get_logger +from decnet.offload import run_kernel from decnet.util.simhash import from_bytes8 from decnet.web.db.repository import BaseRepository log = get_logger("clustering.campaign.connected_components") +# Below this many identities the O(n^2) pass is cheaper than the pickle +# round-trip to a pool worker, so run inline even when a pool is installed. +_OFFLOAD_MIN_IDENTITIES = 256 + def cluster_identities( features: Iterable[IdentityFeatures], @@ -220,7 +225,11 @@ class ConnectedComponentsCampaignClusterer(CampaignClusterer): row_by_uuid: dict[str, dict[str, Any]] = { r["uuid"]: r for r in active_rows } - labels = cluster_identities(feature_list) + labels = await run_kernel( + cluster_identities, + feature_list, + offload_if=len(feature_list) >= _OFFLOAD_MIN_IDENTITIES, + ) # Group identities by predicted cluster. 
components: dict[str, list[str]] = {} diff --git a/decnet/clustering/impl/connected_components.py b/decnet/clustering/impl/connected_components.py index 3f511f1f..889cbc43 100644 --- a/decnet/clustering/impl/connected_components.py +++ b/decnet/clustering/impl/connected_components.py @@ -42,12 +42,17 @@ from decnet.clustering.impl.similarity import ( combined_edge_weight, ) from decnet.logging import get_logger +from decnet.offload import run_kernel from decnet.profiler.identity_rollup import extract_fp_summaries from decnet.util.simhash import from_bytes8, to_bytes8 from decnet.web.db.repository import BaseRepository log = get_logger("clustering.connected_components") +# Below this many observations the O(n^2) pass is cheaper than the pickle +# round-trip to a pool worker, so run inline even when a pool is installed. +_OFFLOAD_MIN_OBSERVATIONS = 256 + # Per-session SimHash observations of the keystroke-rhythm biometric; the # rollup folds them into one identity-level centroid. _DIGRAPH_PRIMITIVE = "motor.digraph_simhash" @@ -173,7 +178,11 @@ class ConnectedComponentsClusterer(Clusterer): obs = from_attacker_row(r) observations.append(obs) row_by_id[obs.observation_id] = r - labels = cluster_observations(observations) + labels = await run_kernel( + cluster_observations, + observations, + offload_if=len(observations) >= _OFFLOAD_MIN_OBSERVATIONS, + ) # Group observations by predicted cluster. components: dict[str, list[str]] = {} diff --git a/decnet/offload.py b/decnet/offload.py new file mode 100644 index 00000000..54632e3c --- /dev/null +++ b/decnet/offload.py @@ -0,0 +1,52 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Shared CPU-kernel offload — run a pure, picklable function in a process pool +so GIL-bound compute doesn't block the event loop (or its co-hosted workers). + +Used by ``decnet supervise cpu`` (see ``decnet/cli/supervise.py``), which hosts +several CPU-bound workers in one process and installs ONE shared +``ProcessPoolExecutor`` here. 
When no executor is installed — standalone workers
+and tests that install no pool — :func:`run_kernel` runs the kernel inline, so
+behaviour is identical to before this module existed.
+
+Contract for an offloadable kernel: a module-level function (picklable by
+reference) that is pure (no DB / clock / I/O), taking and returning picklable
+values. The clustering connected-components kernels satisfy this.
+"""
+from __future__ import annotations
+
+import asyncio
+from concurrent.futures import ProcessPoolExecutor
+from typing import Any, Callable, TypeVar
+
+_T = TypeVar("_T")
+
+_executor: ProcessPoolExecutor | None = None
+
+
+def set_executor(ex: ProcessPoolExecutor | None) -> None:
+    """Install (``ex``) or clear (``None``) the shared pool. Idempotent."""
+    global _executor
+    _executor = ex
+
+
+def get_executor() -> ProcessPoolExecutor | None:  # None: no pool installed
+    return _executor
+
+
+async def run_kernel(
+    fn: Callable[..., _T], *args: Any, offload_if: bool = True
+) -> _T:
+    """Run ``fn(*args)``, offloading to the shared pool when one is installed
+    and ``offload_if`` holds; otherwise call it inline (blocking the event loop).
+
+    ``offload_if`` lets the caller skip the pickle round-trip for inputs too
+    small to be worth a cross-process hop — the caller knows the problem size,
+    this module does not.
+    """
+    # ponytail: boolean gate, not an auto-tuned threshold. If kernels start
+    # varying wildly in cost, measure and move the decision here.
+    ex = _executor
+    if ex is None or not offload_if:
+        return fn(*args)
+    loop = asyncio.get_running_loop()
+    return await loop.run_in_executor(ex, fn, *args)
diff --git a/deploy/decnet-supervise-cpu.service.j2 b/deploy/decnet-supervise-cpu.service.j2
new file mode 100644
index 00000000..5e932f18
--- /dev/null
+++ b/deploy/decnet-supervise-cpu.service.j2
@@ -0,0 +1,47 @@
+[Unit]
+Description=DECNET CPU Supervisor (clusterer + campaign-clusterer + attribution + reuse-correlate in one process, kernels offloaded to a shared pool)
+Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#supervisor
+After=network-online.target decnet-bus.service
+Wants=network-online.target decnet-bus.service
+# Replaces the individual clusterer / campaign-clusterer / attribution /
+# reuse-correlator units. Do NOT enable those alongside this one.
+Conflicts=decnet-clusterer.service decnet-campaign-clusterer.service decnet-attribution.service decnet-reuse-correlator.service
+
+[Service]
+Type=simple
+User={{ user }}
+Group={{ group }}
+WorkingDirectory={{ install_dir }}
+EnvironmentFile=-{{ install_dir }}/.env.local
+Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.supervise-cpu.log
+ExecStart={{ venv_dir }}/bin/decnet supervise cpu
+StandardOutput=append:/var/log/decnet/decnet.supervise-cpu.log
+StandardError=append:/var/log/decnet/decnet.supervise-cpu.log
+
+# These are read-heavy correlators (DB in, DB out, bus). No docker socket, no
+# raw sockets — so unlike the batch supervisor this carries NO extra privilege
+# beyond DB + network. The forkserver pool spawns short-lived compute children
+# that inherit only this unit's sandbox.
+
+CapabilityBoundingSet=
+AmbientCapabilities=
+
+# Security Hardening
+NoNewPrivileges=yes
+ProtectSystem=full
+ProtectHome=read-only
+PrivateTmp=yes
+ProtectKernelTunables=yes
+ProtectKernelModules=yes
+ProtectControlGroups=yes
+RestrictSUIDSGID=yes
+LockPersonality=yes
+ReadOnlyPaths=/var/lib/decnet
+ReadWritePaths={{ install_dir }} /var/log/decnet
+
+Restart=on-failure
+RestartSec=5
+TimeoutStopSec=20
+
+[Install]
+WantedBy=multi-user.target
diff --git a/tests/cli/test_supervise.py b/tests/cli/test_supervise.py
index 2588a29a..8b70b605 100644
--- a/tests/cli/test_supervise.py
+++ b/tests/cli/test_supervise.py
@@ -22,5 +22,6 @@ def test_unknown_group_exits_2():
     assert "unknown group" in result.stdout
 
 
-def test_batch_group_is_known():
+def test_known_groups():
     assert "batch" in _GROUPS
+    assert "cpu" in _GROUPS
diff --git a/tests/test_offload.py b/tests/test_offload.py
new file mode 100644
index 00000000..7a7f0c0c
--- /dev/null
+++ b/tests/test_offload.py
@@ -0,0 +1,68 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""Tests for the shared CPU-kernel offload (DECNET 1.1 cpu group).
+
+Proves the offloaded result is identical to the inline result — i.e. the kernel
+and its inputs survive the process boundary and the GIL-relief path is correct,
+not just fast.
+"""
+from __future__ import annotations
+
+import multiprocessing as mp
+from concurrent.futures import ProcessPoolExecutor
+
+import pytest
+
+from decnet import offload
+from decnet.clustering.impl.connected_components import cluster_observations
+from decnet.clustering.impl.similarity import Observation
+
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.fixture(autouse=True)
+def _clear_executor():  # reset the module-global pool around every test
+    offload.set_executor(None)
+    yield
+    offload.set_executor(None)
+
+
+async def test_inline_when_no_executor():
+    assert offload.get_executor() is None
+    out = await offload.run_kernel(lambda a, b: a + b, 2, 3)
+    assert out == 5  # a lambda is fine on the inline path (nothing is pickled)
+
+
+async def test_offload_if_false_runs_inline_even_with_pool():
+    with ProcessPoolExecutor(
+        max_workers=1, mp_context=mp.get_context("forkserver")
+    ) as pool:
+        offload.set_executor(pool)
+        # a lambda cannot be pickled — success proves this call stayed inline
+        out = await offload.run_kernel(lambda x: x * 10, 4, offload_if=False)
+    assert out == 40
+
+
+async def test_offloaded_result_equals_inline():
+    obs = [
+        Observation(observation_id="a", ja3="x", hassh=None, asn=1),
+        Observation(observation_id="b", ja3="x", hassh=None, asn=1),
+        Observation(observation_id="c", ja3="y", hassh=None, asn=2),
+    ]
+    inline = cluster_observations(obs)
+
+    with ProcessPoolExecutor(
+        max_workers=2, mp_context=mp.get_context("forkserver")
+    ) as pool:
+        offload.set_executor(pool)
+        offloaded = await offload.run_kernel(cluster_observations, obs)
+
+    assert offloaded == inline  # identical across the process boundary
+
+
+async def test_set_get_executor_roundtrip():
+    assert offload.get_executor() is None
+    with ProcessPoolExecutor(max_workers=1) as pool:  # installed, never submitted to
+        offload.set_executor(pool)
+        assert offload.get_executor() is pool
+        offload.set_executor(None)
+    assert offload.get_executor() is None