diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index 03461e5a..63de545c 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -26,6 +26,7 @@ from . import ( canary, db, deploy, + fleet, forwarder, geoip, init, @@ -62,7 +63,7 @@ for _mod in ( swarm, deploy, lifecycle, workers, inventory, web, profiler, orchestrator, realism, reconciler, sniffer, db, - topology, bus, geoip, init, webhook, canary, ttp, supervise, + topology, bus, geoip, init, webhook, canary, ttp, supervise, fleet, ): _mod.register(app) diff --git a/decnet/cli/fleet.py b/decnet/cli/fleet.py new file mode 100644 index 00000000..3b720719 --- /dev/null +++ b/decnet/cli/fleet.py @@ -0,0 +1,94 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""``decnet fleet `` — prefork supervisor (DECNET 1.2). + +Imports the shared base floor ONCE in the master, then forks one child process +per worker (see :mod:`decnet.prefork`). Children share the floor via copy-on-write +(measured ~71 MB shared / ~1 MB private per idle child on CPython 3.14) while +keeping their OWN process and GIL — unlike ``decnet supervise``, which co-hosts +workers as asyncio tasks in one shared-GIL process. + +Use ``fleet`` for workers that must stay process-isolated (heavy resident state, +sustained CPU) but shouldn't each re-import the world; use ``supervise`` for cheap +co-resident IO workers. + +CONSOLIDATION COSTS (same shape as ``supervise``): + * Forked children inherit the master's privileges — a fleet's systemd unit + carries the UNION of its members' caps. So group by privilege profile, not + convenience. The ``heavy`` fleet is DB-only (no docker socket, no raw net). + * To share via CoW the master pre-imports each worker's module BEFORE forking, + so its RSS is large — but that RSS is the shared floor, not per-child cost. +""" +from __future__ import annotations + +import typer + +from . import utils as _utils +from .utils import console, log + +_FLEETS = ("heavy",) + + +def _build_fleet(name: str) -> dict: + """Return ``{worker_name: entry_thunk}`` for *name*. + + Imports happen here, in the MASTER, before :func:`run_fleet` forks — that is + what lets children share the imported code/objects via copy-on-write. Each + thunk blocks running one worker; ``repo`` is initialized inside the child + (post-fork) so every child opens its own pool, never a fork-inherited one. + """ + import asyncio + + if name == "heavy": + from decnet.profiler import attacker_profile_worker + from decnet.ttp.worker import run_ttp_worker_loop + from decnet.web.dependencies import repo + + # Importing the worker modules here (in the master) is what lets children + # share their code via CoW. Heavy per-worker runtime state (ATT&CK bundle, + # ML) still loads lazily in each child — warming it in the master to share + # it too is a future optimization, gated on a live RSS measurement showing + # the big object graph actually CoW-shares rather than refcount-dirtying. + def _profiler() -> None: + async def _go() -> None: + await repo.initialize() + await attacker_profile_worker(repo, interval=60) + asyncio.run(_go()) + + def _ttp() -> None: + async def _go() -> None: + await repo.initialize() + await run_ttp_worker_loop(repo, poll_interval_secs=60.0) + asyncio.run(_go()) + + return {"profiler": _profiler, "ttp": _ttp} + + raise ValueError(f"unknown fleet: {name}") + + +def register(app: typer.Typer) -> None: + @app.command(name="fleet") + def fleet_cmd( + name: str = typer.Argument( + ..., help=f"Worker fleet to fork. One of: {', '.join(_FLEETS)}" + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", help="Detach to background as a daemon process" + ), + ) -> None: + """Prefork a worker fleet: shared base floor (CoW), one child process per worker.""" + from decnet.prefork import run_fleet + + if name not in _FLEETS: + console.print( + f"[red]unknown fleet {name!r}; known fleets: {', '.join(_FLEETS)}[/]" + ) + raise typer.Exit(2) + + if daemon: + log.info("fleet %s daemonizing", name) + _utils._daemonize() + + log.info("fleet %s starting", name) + console.print(f"[bold cyan]Fleet starting[/] {name} (prefork)") + specs = _build_fleet(name) + run_fleet(specs) diff --git a/deploy/decnet-fleet-heavy.service.j2 b/deploy/decnet-fleet-heavy.service.j2 new file mode 100644 index 00000000..607f65a5 --- /dev/null +++ b/deploy/decnet-fleet-heavy.service.j2 @@ -0,0 +1,48 @@ +[Unit] +Description=DECNET Heavy Fleet (prefork master forking profiler + ttp as CoW-sharing children) +Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#fleet +After=network-online.target decnet-bus.service +Wants=network-online.target decnet-bus.service +# Replaces the individual decnet-profiler / decnet-ttp units. Do NOT enable +# those alongside this one. +Conflicts=decnet-profiler.service decnet-ttp.service + +[Service] +Type=simple +User={{ user }} +Group={{ group }} +WorkingDirectory={{ install_dir }} +EnvironmentFile=-{{ install_dir }}/.env.local +Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.fleet-heavy.log +ExecStart={{ venv_dir }}/bin/decnet fleet heavy +StandardOutput=append:/var/log/decnet/decnet.fleet-heavy.log +StandardError=append:/var/log/decnet/decnet.fleet-heavy.log + +# Prefork master imports the shared base floor once, then forks one child per +# worker; children share the floor via copy-on-write. Both members are DB-only +# (no docker socket, no raw sockets) so this unit carries NO extra privilege — +# the prefork privilege-union cost is nil for this fleet by construction. +CapabilityBoundingSet= +AmbientCapabilities= + +# Security Hardening +NoNewPrivileges=yes +ProtectSystem=full +# Dev installs under /home need ProtectHome=read-only: the ttp child reads +# ./rules/ttp/ from the project root (read-only suffices — YAML reads only). +ProtectHome=read-only +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes +RestrictSUIDSGID=yes +LockPersonality=yes +ReadWritePaths={{ install_dir }} /var/log/decnet + +Restart=on-failure +RestartSec=5 +# Master forwards SIGTERM to children and reaps; give it room for both to drain. +TimeoutStopSec=25 + +[Install] +WantedBy=multi-user.target diff --git a/tests/cli/test_fleet.py b/tests/cli/test_fleet.py new file mode 100644 index 00000000..baadf85b --- /dev/null +++ b/tests/cli/test_fleet.py @@ -0,0 +1,35 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""CLI surface for ``decnet fleet`` (DECNET 1.2 prefork). The fork/restart +mechanism itself is covered by tests/test_prefork.py.""" +from __future__ import annotations + +from typer.testing import CliRunner + +from decnet.cli import app +from decnet.cli.fleet import _FLEETS, _build_fleet + +runner = CliRunner() + + +def test_fleet_is_registered(): + result = runner.invoke(app, ["fleet", "--help"]) + assert result.exit_code == 0 + assert "fleet" in result.stdout.lower() + + +def test_unknown_fleet_exits_2(): + result = runner.invoke(app, ["fleet", "not-a-fleet"]) + assert result.exit_code == 2 + assert "unknown fleet" in result.stdout + + +def test_heavy_fleet_builds_expected_workers(): + # _build_fleet imports worker modules + builds thunks but runs nothing + # (no fork, no repo.initialize) — safe to call in-process. + specs = _build_fleet("heavy") + assert set(specs) == {"profiler", "ttp"} + assert all(callable(t) for t in specs.values()) + + +def test_heavy_is_known(): + assert "heavy" in _FLEETS