feat(1.2): decnet fleet — prefork master for the heavy worker tier
Wires the prefork primitive into a CLI command. 'decnet fleet heavy' imports the shared base floor once in the master, then forks profiler + ttp as CoW-sharing child processes (own process/GIL, full isolation, shared ~71MB floor). DB-only tier => systemd unit carries no extra privilege (prefork's privilege-union cost is nil for this fleet). Unit Conflicts= the profiler/ttp units it replaces. Heavy per-worker state (ATT&CK/ML) still loads per-child; warming it in the master to share is deferred until a live RSS measurement shows the big object graph CoW-shares rather than refcount-dirties.
This commit is contained in:
@@ -26,6 +26,7 @@ from . import (
|
||||
canary,
|
||||
db,
|
||||
deploy,
|
||||
fleet,
|
||||
forwarder,
|
||||
geoip,
|
||||
init,
|
||||
@@ -62,7 +63,7 @@ for _mod in (
|
||||
swarm,
|
||||
deploy, lifecycle, workers, inventory,
|
||||
web, profiler, orchestrator, realism, reconciler, sniffer, db,
|
||||
topology, bus, geoip, init, webhook, canary, ttp, supervise,
|
||||
topology, bus, geoip, init, webhook, canary, ttp, supervise, fleet,
|
||||
):
|
||||
_mod.register(app)
|
||||
|
||||
|
||||
94
decnet/cli/fleet.py
Normal file
94
decnet/cli/fleet.py
Normal file
@@ -0,0 +1,94 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""``decnet fleet <name>`` — prefork supervisor (DECNET 1.2).
|
||||
|
||||
Imports the shared base floor ONCE in the master, then forks one child process
|
||||
per worker (see :mod:`decnet.prefork`). Children share the floor via copy-on-write
|
||||
(measured ~71 MB shared / ~1 MB private per idle child on CPython 3.14) while
|
||||
keeping their OWN process and GIL — unlike ``decnet supervise``, which co-hosts
|
||||
workers as asyncio tasks in one shared-GIL process.
|
||||
|
||||
Use ``fleet`` for workers that must stay process-isolated (heavy resident state,
|
||||
sustained CPU) but shouldn't each re-import the world; use ``supervise`` for cheap
|
||||
co-resident IO workers.
|
||||
|
||||
CONSOLIDATION COSTS (same shape as ``supervise``):
|
||||
* Forked children inherit the master's privileges — a fleet's systemd unit
|
||||
carries the UNION of its members' caps. So group by privilege profile, not
|
||||
convenience. The ``heavy`` fleet is DB-only (no docker socket, no raw net).
|
||||
* To share via CoW the master pre-imports each worker's module BEFORE forking,
|
||||
so its RSS is large — but that RSS is the shared floor, not per-child cost.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import typer
|
||||
|
||||
from . import utils as _utils
|
||||
from .utils import console, log
|
||||
|
||||
_FLEETS = ("heavy",)
|
||||
|
||||
|
||||
def _build_fleet(name: str) -> dict:
|
||||
"""Return ``{worker_name: entry_thunk}`` for *name*.
|
||||
|
||||
Imports happen here, in the MASTER, before :func:`run_fleet` forks — that is
|
||||
what lets children share the imported code/objects via copy-on-write. Each
|
||||
thunk blocks running one worker; ``repo`` is initialized inside the child
|
||||
(post-fork) so every child opens its own pool, never a fork-inherited one.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
if name == "heavy":
|
||||
from decnet.profiler import attacker_profile_worker
|
||||
from decnet.ttp.worker import run_ttp_worker_loop
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
# Importing the worker modules here (in the master) is what lets children
|
||||
# share their code via CoW. Heavy per-worker runtime state (ATT&CK bundle,
|
||||
# ML) still loads lazily in each child — warming it in the master to share
|
||||
# it too is a future optimization, gated on a live RSS measurement showing
|
||||
# the big object graph actually CoW-shares rather than refcount-dirtying.
|
||||
def _profiler() -> None:
|
||||
async def _go() -> None:
|
||||
await repo.initialize()
|
||||
await attacker_profile_worker(repo, interval=60)
|
||||
asyncio.run(_go())
|
||||
|
||||
def _ttp() -> None:
|
||||
async def _go() -> None:
|
||||
await repo.initialize()
|
||||
await run_ttp_worker_loop(repo, poll_interval_secs=60.0)
|
||||
asyncio.run(_go())
|
||||
|
||||
return {"profiler": _profiler, "ttp": _ttp}
|
||||
|
||||
raise ValueError(f"unknown fleet: {name}")
|
||||
|
||||
|
||||
def register(app: typer.Typer) -> None:
|
||||
@app.command(name="fleet")
|
||||
def fleet_cmd(
|
||||
name: str = typer.Argument(
|
||||
..., help=f"Worker fleet to fork. One of: {', '.join(_FLEETS)}"
|
||||
),
|
||||
daemon: bool = typer.Option(
|
||||
False, "--daemon", "-d", help="Detach to background as a daemon process"
|
||||
),
|
||||
) -> None:
|
||||
"""Prefork a worker fleet: shared base floor (CoW), one child process per worker."""
|
||||
from decnet.prefork import run_fleet
|
||||
|
||||
if name not in _FLEETS:
|
||||
console.print(
|
||||
f"[red]unknown fleet {name!r}; known fleets: {', '.join(_FLEETS)}[/]"
|
||||
)
|
||||
raise typer.Exit(2)
|
||||
|
||||
if daemon:
|
||||
log.info("fleet %s daemonizing", name)
|
||||
_utils._daemonize()
|
||||
|
||||
log.info("fleet %s starting", name)
|
||||
console.print(f"[bold cyan]Fleet starting[/] {name} (prefork)")
|
||||
specs = _build_fleet(name)
|
||||
run_fleet(specs)
|
||||
48
deploy/decnet-fleet-heavy.service.j2
Normal file
48
deploy/decnet-fleet-heavy.service.j2
Normal file
@@ -0,0 +1,48 @@
|
||||
[Unit]
|
||||
Description=DECNET Heavy Fleet (prefork master forking profiler + ttp as CoW-sharing children)
|
||||
Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#fleet
|
||||
After=network-online.target decnet-bus.service
|
||||
Wants=network-online.target decnet-bus.service
|
||||
# Replaces the individual decnet-profiler / decnet-ttp units. Do NOT enable
|
||||
# those alongside this one.
|
||||
Conflicts=decnet-profiler.service decnet-ttp.service
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User={{ user }}
|
||||
Group={{ group }}
|
||||
WorkingDirectory={{ install_dir }}
|
||||
EnvironmentFile=-{{ install_dir }}/.env.local
|
||||
Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.fleet-heavy.log
|
||||
ExecStart={{ venv_dir }}/bin/decnet fleet heavy
|
||||
StandardOutput=append:/var/log/decnet/decnet.fleet-heavy.log
|
||||
StandardError=append:/var/log/decnet/decnet.fleet-heavy.log
|
||||
|
||||
# Prefork master imports the shared base floor once, then forks one child per
|
||||
# worker; children share the floor via copy-on-write. Both members are DB-only
|
||||
# (no docker socket, no raw sockets) so this unit carries NO extra privilege —
|
||||
# the prefork privilege-union cost is nil for this fleet by construction.
|
||||
CapabilityBoundingSet=
|
||||
AmbientCapabilities=
|
||||
|
||||
# Security Hardening
|
||||
NoNewPrivileges=yes
|
||||
ProtectSystem=full
|
||||
# Dev installs under /home need ProtectHome=read-only: the ttp child reads
|
||||
# ./rules/ttp/ from the project root (read-only suffices — YAML reads only).
|
||||
ProtectHome=read-only
|
||||
PrivateTmp=yes
|
||||
ProtectKernelTunables=yes
|
||||
ProtectKernelModules=yes
|
||||
ProtectControlGroups=yes
|
||||
RestrictSUIDSGID=yes
|
||||
LockPersonality=yes
|
||||
ReadWritePaths={{ install_dir }} /var/log/decnet
|
||||
|
||||
Restart=on-failure
|
||||
RestartSec=5
|
||||
# Master forwards SIGTERM to children and reaps; give it room for both to drain.
|
||||
TimeoutStopSec=25
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
35
tests/cli/test_fleet.py
Normal file
35
tests/cli/test_fleet.py
Normal file
@@ -0,0 +1,35 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""CLI surface for ``decnet fleet`` (DECNET 1.2 prefork). The fork/restart
|
||||
mechanism itself is covered by tests/test_prefork.py."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from decnet.cli import app
|
||||
from decnet.cli.fleet import _FLEETS, _build_fleet
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
def test_fleet_is_registered():
|
||||
result = runner.invoke(app, ["fleet", "--help"])
|
||||
assert result.exit_code == 0
|
||||
assert "fleet" in result.stdout.lower()
|
||||
|
||||
|
||||
def test_unknown_fleet_exits_2():
|
||||
result = runner.invoke(app, ["fleet", "not-a-fleet"])
|
||||
assert result.exit_code == 2
|
||||
assert "unknown fleet" in result.stdout
|
||||
|
||||
|
||||
def test_heavy_fleet_builds_expected_workers():
|
||||
# _build_fleet imports worker modules + builds thunks but runs nothing
|
||||
# (no fork, no repo.initialize) — safe to call in-process.
|
||||
specs = _build_fleet("heavy")
|
||||
assert set(specs) == {"profiler", "ttp"}
|
||||
assert all(callable(t) for t in specs.values())
|
||||
|
||||
|
||||
def test_heavy_is_known():
|
||||
assert "heavy" in _FLEETS
|
||||
Reference in New Issue
Block a user