feat(cli): gate master-only commands when DECNET_MODE=agent

- MASTER_ONLY_COMMANDS / MASTER_ONLY_GROUPS frozensets enumerate every
  command a worker host must not see. Comment block at the declaration
  puts the maintenance obligation in front of anyone touching command
  registration.
- _gate_commands_by_mode() filters both app.registered_commands (for
  @app.command() registrations) and app.registered_groups (for
  add_typer sub-apps) so the 'swarm' group disappears along with
  'api', 'swarmctl', 'deploy', etc. on agent hosts.
- _require_master_mode() is the belt-and-braces in-function guard,
  added to the four highest-risk commands (api, swarmctl, deploy,
  teardown). Protects against direct function imports that would
  bypass Typer.
- DECNET_DISALLOW_MASTER=false is the escape hatch for hybrid dev
  hosts that legitimately play both roles.

tests/test_mode_gating.py exercises help-text listings via subprocess
and the defence-in-depth guard via direct import.
This commit is contained in:
2026-04-19 03:20:48 -04:00
parent 2b1b962849
commit 3223bec615
2 changed files with 154 additions and 0 deletions

View File

@@ -92,6 +92,7 @@ def api(
import os import os
import signal import signal
_require_master_mode("api")
if daemon: if daemon:
log.info("API daemonizing host=%s port=%d workers=%d", host, port, workers) log.info("API daemonizing host=%s port=%d workers=%d", host, port, workers)
_daemonize() _daemonize()
@@ -136,6 +137,7 @@ def swarmctl(
import os import os
import signal import signal
_require_master_mode("swarmctl")
if daemon: if daemon:
log.info("swarmctl daemonizing host=%s port=%d", host, port) log.info("swarmctl daemonizing host=%s port=%d", host, port)
_daemonize() _daemonize()
@@ -782,6 +784,7 @@ def deploy(
"""Deploy deckies to the LAN.""" """Deploy deckies to the LAN."""
import os import os
_require_master_mode("deploy")
if daemon: if daemon:
log.info("deploy daemonizing mode=%s deckies=%s", mode, deckies) log.info("deploy daemonizing mode=%s deckies=%s", mode, deckies)
_daemonize() _daemonize()
@@ -1223,6 +1226,7 @@ def teardown(
id_: Optional[str] = typer.Option(None, "--id", help="Tear down a specific decky by name"), id_: Optional[str] = typer.Option(None, "--id", help="Tear down a specific decky by name"),
) -> None: ) -> None:
"""Stop and remove deckies.""" """Stop and remove deckies."""
_require_master_mode("teardown")
if not all_ and not id_: if not all_ and not id_:
console.print("[red]Specify --all or --id <name>.[/]") console.print("[red]Specify --all or --id <name>.[/]")
raise typer.Exit(1) raise typer.Exit(1)
@@ -1624,5 +1628,68 @@ def db_reset(
raise typer.Exit(1) from e raise typer.Exit(1) from e
# ───────────────────────────────────────────────────────────────────────────
# Role-based CLI gating.
#
# MAINTAINERS: when you add a new Typer command (or add_typer group) that is
# master-only, register its name in MASTER_ONLY_COMMANDS / MASTER_ONLY_GROUPS
# below. The gate is the only thing that:
# (a) hides the command from `decnet --help` on worker hosts, and
# (b) prevents a misconfigured worker from invoking master-side logic.
# Forgetting to register a new command is a role-boundary bug. Grep for
# MASTER_ONLY when touching command registration.
#
# Worker-legitimate commands (NOT in these sets): agent, updater, forwarder.
# ───────────────────────────────────────────────────────────────────────────
MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({
"api", "swarmctl", "deploy", "redeploy", "teardown",
"probe", "collect", "mutate", "listener", "status",
"services", "distros", "correlate", "archetypes", "web",
"profiler", "sniffer", "db-reset",
})
MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm"})
def _agent_mode_active() -> bool:
"""True when the host is configured as an agent AND master commands are
disallowed (the default for agents). Workers overriding this explicitly
set DECNET_DISALLOW_MASTER=false to opt into hybrid use."""
import os
mode = os.environ.get("DECNET_MODE", "master").lower()
disallow = os.environ.get("DECNET_DISALLOW_MASTER", "true").lower() == "true"
return mode == "agent" and disallow
def _require_master_mode(command_name: str) -> None:
"""Defence-in-depth: called at the top of every master-only command body.
The registration-time gate in _gate_commands_by_mode() already hides
these commands from Typer's dispatch table, but this check protects
against direct function imports (e.g. from tests or third-party tools)
that would bypass Typer entirely."""
if _agent_mode_active():
console.print(
f"[red]`decnet {command_name}` is a master-only command; this host "
f"is configured as an agent (DECNET_MODE=agent).[/]"
)
raise typer.Exit(1)
def _gate_commands_by_mode(_app: typer.Typer) -> None:
if not _agent_mode_active():
return
_app.registered_commands = [
c for c in _app.registered_commands
if (c.name or c.callback.__name__) not in MASTER_ONLY_COMMANDS
]
_app.registered_groups = [
g for g in _app.registered_groups
if g.name not in MASTER_ONLY_GROUPS
]
_gate_commands_by_mode(app)
if __name__ == '__main__': # pragma: no cover if __name__ == '__main__': # pragma: no cover
app() app()

87
tests/test_mode_gating.py Normal file
View File

@@ -0,0 +1,87 @@
"""CLI mode gating — master-only commands hidden when DECNET_MODE=agent."""
from __future__ import annotations
import os
import pathlib
import subprocess
import sys
import pytest
REPO = pathlib.Path(__file__).resolve().parent.parent
DECNET_BIN = REPO / ".venv" / "bin" / "decnet"
def _clean_env(**overrides: str) -> dict[str, str]:
"""Env with no DECNET_* / PYTEST_* leakage from the parent test run.
Keeps only PATH so subprocess can locate the interpreter. HOME is
stubbed below so .env.local from the user's home doesn't leak in."""
base = {"PATH": os.environ["PATH"], "HOME": "/nonexistent-for-test"}
base.update(overrides)
# Ensure no stale DECNET_CONFIG pointing at some fixture INI
base["DECNET_CONFIG"] = "/nonexistent/decnet.ini"
# decnet.web.auth needs a JWT secret to import; provide one so
# `decnet --help` can walk the command tree.
base.setdefault("DECNET_JWT_SECRET", "x" * 32)
return base
def _help_text(env: dict[str, str]) -> str:
result = subprocess.run(
[str(DECNET_BIN), "--help"],
env=env, cwd=str(REPO),
capture_output=True, text=True, timeout=20,
)
assert result.returncode == 0, result.stderr
return result.stdout
def test_master_mode_lists_master_commands():
out = _help_text(_clean_env(DECNET_MODE="master"))
for cmd in ("api", "swarmctl", "swarm", "deploy", "teardown"):
assert cmd in out, f"expected '{cmd}' in master-mode --help"
# Agent commands are also visible on master (dual-use hosts).
for cmd in ("agent", "forwarder", "updater"):
assert cmd in out
def test_agent_mode_hides_master_commands():
out = _help_text(_clean_env(DECNET_MODE="agent", DECNET_DISALLOW_MASTER="true"))
for cmd in ("api", "swarmctl", "deploy", "teardown", "listener"):
assert cmd not in out, f"'{cmd}' leaked into agent-mode --help"
# The `swarm` subcommand group must also disappear — identify it by its
# unique help string (plain 'swarm' appears in other command descriptions).
assert "Manage swarm workers" not in out
# Worker-legitimate commands must remain.
for cmd in ("agent", "forwarder", "updater"):
assert cmd in out
def test_agent_mode_can_opt_in_to_master_via_disallow_false():
"""A hybrid dev host sets DECNET_DISALLOW_MASTER=false and keeps
full access even though DECNET_MODE=agent. This is the escape hatch
for single-machine development."""
out = _help_text(_clean_env(
DECNET_MODE="agent", DECNET_DISALLOW_MASTER="false",
))
assert "api" in out
assert "swarmctl" in out
def test_defence_in_depth_direct_call_fails_in_agent_mode(monkeypatch):
"""Typer's dispatch table hides the command in agent mode, but if
something imports the command function directly it must still bail.
_require_master_mode('api') is the belt-and-braces guard."""
monkeypatch.setenv("DECNET_MODE", "agent")
monkeypatch.setenv("DECNET_DISALLOW_MASTER", "true")
# Re-import cli so the module-level gate re-runs (harmless here;
# we're exercising the in-function guard).
for mod in list(sys.modules):
if mod == "decnet.cli":
sys.modules.pop(mod)
from decnet.cli import _require_master_mode
import typer
with pytest.raises(typer.Exit):
_require_master_mode("api")