diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py
index 04e30527..03461e5a 100644
--- a/decnet/cli/__init__.py
+++ b/decnet/cli/__init__.py
@@ -37,6 +37,7 @@ from . import (
     realism,
     reconciler,
     sniffer,
+    supervise,
     swarm,
     swarmctl,
     topology,
@@ -61,7 +62,7 @@ for _mod in (
     swarm, deploy, lifecycle, workers, inventory, web, profiler,
     orchestrator, realism, reconciler, sniffer, db,
-    topology, bus, geoip, init, webhook, canary, ttp,
+    topology, bus, geoip, init, webhook, canary, ttp, supervise,
 ):
     _mod.register(app)
diff --git a/decnet/cli/supervise.py b/decnet/cli/supervise.py
new file mode 100644
index 00000000..2836252d
--- /dev/null
+++ b/decnet/cli/supervise.py
@@ -0,0 +1,88 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""``decnet supervise <group>`` — host a co-resident group of workers in one
+process, paying the import floor (and the DB connection pool) once instead of
+once per worker. See ``development/RELEASE-1.1.md``.
+
+Each worker keeps its own restart loop (see :mod:`decnet.supervisor`), so this
+trades per-worker systemd granularity for RAM — a worker can always be pulled
+back out to its own ``decnet <worker>`` unit by removing it from the group spec
+below; nothing about the worker's own code changes.
+"""
+from __future__ import annotations
+
+import typer
+
+from . import utils as _utils
+from .utils import console, log
+
+# Groups are intentionally a small static registry, not config — the membership
+# is an architectural decision, not an operator knob.
+_GROUPS = ("batch",)
+
+
+async def _build_specs(group: str):
+    """Return ``[(name, factory), ...]`` for *group*, lazy-importing only the
+    workers it hosts and initializing the shared ``repo`` once.
+
+    Factories return a fresh coroutine each call so :func:`supervise` can restart
+    them. Intervals match the standalone units' defaults.
+
+    Raises :class:`ValueError` if *group* has no spec defined here.
+    """
+    # ponytail: defaults hardcoded to match the per-worker units; add CLI knobs
+    # only if an operator actually needs to retune a consolidated group.
+    if group == "batch":
+        from decnet.fleet.reconciler_worker import fleet_reconciler_worker
+        from decnet.intel.worker import run_intel_loop
+        from decnet.mutator import run_watch_loop
+        from decnet.orchestrator import orchestrator_worker
+        from decnet.web.dependencies import repo
+
+        await repo.initialize()  # shared by every batch worker → one DB pool
+        return [
+            ("reconcile", lambda: fleet_reconciler_worker(repo, interval=30)),
+            ("enrich", lambda: run_intel_loop(repo, poll_interval_secs=60.0, ttl_hours=24)),
+            ("orchestrate", lambda: orchestrator_worker(repo, interval=60, llm_enabled=None)),
+            ("mutate", lambda: run_watch_loop(repo)),
+        ]
+    raise ValueError(f"unknown supervise group: {group}")
+
+
+def register(app: typer.Typer) -> None:
+    """Attach the ``supervise`` command to *app*."""
+    @app.command(name="supervise")
+    def supervise_cmd(
+        group: str = typer.Argument(
+            ..., help=f"Worker group to host. One of: {', '.join(_GROUPS)}"
+        ),
+        daemon: bool = typer.Option(
+            False, "--daemon", "-d", help="Detach to background as a daemon process"
+        ),
+    ) -> None:
+        """Host a co-resident worker group in one process (shared import floor + DB pool)."""
+        # Validate first so a typo fails fast, before the supervisor module is imported.
+        if group not in _GROUPS:
+            console.print(
+                f"[red]unknown group {group!r}; known groups: {', '.join(_GROUPS)}[/]"
+            )
+            raise typer.Exit(2)
+
+        import asyncio
+
+        from decnet.supervisor import run_group
+
+        if daemon:
+            log.info("supervise %s daemonizing", group)
+            _utils._daemonize()
+
+        log.info("supervise group=%s starting", group)
+        console.print(f"[bold cyan]Supervisor starting[/] group={group}")
+
+        async def _run() -> None:
+            specs = await _build_specs(group)
+            await run_group(specs)
+
+        try:
+            asyncio.run(_run())
+        except KeyboardInterrupt:
+            console.print("\n[yellow]Supervisor stopped.[/]")
diff --git a/deploy/decnet-supervise-batch.service.j2 b/deploy/decnet-supervise-batch.service.j2
new file mode 100644
index 00000000..618a5f5e
--- /dev/null
+++ b/deploy/decnet-supervise-batch.service.j2
@@ -0,0 +1,49 @@
+[Unit]
+Description=DECNET Batch Supervisor (hosts reconcile + enrich + orchestrate + mutate in one process)
+Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#supervisor
+After=network-online.target decnet-bus.service
+Wants=network-online.target decnet-bus.service
+# Replaces the individual decnet-reconciler / decnet-enrich / decnet-orchestrator
+# / decnet-mutator units. Do NOT enable those alongside this one.
+Conflicts=decnet-reconciler.service decnet-enrich.service decnet-orchestrator.service decnet-mutator.service
+
+[Service]
+Type=simple
+User={{ user }}
+Group={{ group }}
+WorkingDirectory={{ install_dir }}
+EnvironmentFile=-{{ install_dir }}/.env.local
+Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.supervise-batch.log
+ExecStart={{ venv_dir }}/bin/decnet supervise batch
+StandardOutput=append:/var/log/decnet/decnet.supervise-batch.log
+StandardError=append:/var/log/decnet/decnet.supervise-batch.log
+
+# CONSOLIDATION COST: this unit holds the UNION of its members' privileges —
+# docker socket (reconcile + mutate observe/mutate containers) AND network egress
+# (enrich fans out to threat-intel providers). That is a wider blast radius than
+# any single worker had. Acceptable for the batch group; weigh it before adding
+# a member that needs materially more (e.g. raw sockets).
+SupplementaryGroups=docker
+
+CapabilityBoundingSet=
+AmbientCapabilities=
+
+# Security Hardening
+NoNewPrivileges=yes
+ProtectSystem=full
+ProtectHome=read-only
+PrivateTmp=yes
+ProtectKernelTunables=yes
+ProtectKernelModules=yes
+ProtectControlGroups=yes
+RestrictSUIDSGID=yes
+LockPersonality=yes
+ReadOnlyPaths=/var/lib/decnet
+ReadWritePaths={{ install_dir }} /var/log/decnet
+
+Restart=on-failure
+RestartSec=5
+TimeoutStopSec=20
+
+[Install]
+WantedBy=multi-user.target
diff --git a/tests/cli/test_supervise.py b/tests/cli/test_supervise.py
new file mode 100644
index 00000000..2588a29a
--- /dev/null
+++ b/tests/cli/test_supervise.py
@@ -0,0 +1,26 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+"""CLI surface for ``decnet supervise`` (DECNET 1.1 consolidation)."""
+from __future__ import annotations
+
+from typer.testing import CliRunner
+
+from decnet.cli import app
+from decnet.cli.supervise import _GROUPS
+
+runner = CliRunner()
+
+
+def test_supervise_is_registered():
+    result = runner.invoke(app, ["supervise", "--help"])
+    assert result.exit_code == 0  # command is registered, so --help succeeds
+    assert "group" in result.stdout.lower()  # help text mentions the group argument
+
+
+def test_unknown_group_exits_2():
+    result = runner.invoke(app, ["supervise", "definitely-not-a-group"])
+    assert result.exit_code == 2  # supervise_cmd raises typer.Exit(2) for an unknown group
+    assert "unknown group" in result.stdout  # error message printed before exiting
+
+
+def test_batch_group_is_known():
+    assert "batch" in _GROUPS  # "batch" is the only group the registry lists