diff --git a/decnet/cli.py b/decnet/cli.py index d6e7083..e490a00 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -92,6 +92,7 @@ def api( import os import signal + _require_master_mode("api") if daemon: log.info("API daemonizing host=%s port=%d workers=%d", host, port, workers) _daemonize() @@ -136,6 +137,7 @@ def swarmctl( import os import signal + _require_master_mode("swarmctl") if daemon: log.info("swarmctl daemonizing host=%s port=%d", host, port) _daemonize() @@ -782,6 +784,7 @@ def deploy( """Deploy deckies to the LAN.""" import os + _require_master_mode("deploy") if daemon: log.info("deploy daemonizing mode=%s deckies=%s", mode, deckies) _daemonize() @@ -1223,6 +1226,7 @@ def teardown( id_: Optional[str] = typer.Option(None, "--id", help="Tear down a specific decky by name"), ) -> None: """Stop and remove deckies.""" + _require_master_mode("teardown") if not all_ and not id_: console.print("[red]Specify --all or --id .[/]") raise typer.Exit(1) @@ -1624,5 +1628,68 @@ def db_reset( raise typer.Exit(1) from e +# ─────────────────────────────────────────────────────────────────────────── +# Role-based CLI gating. +# +# MAINTAINERS: when you add a new Typer command (or add_typer group) that is +# master-only, register its name in MASTER_ONLY_COMMANDS / MASTER_ONLY_GROUPS +# below. The gate is the only thing that: +# (a) hides the command from `decnet --help` on worker hosts, and +# (b) prevents a misconfigured worker from invoking master-side logic. +# Forgetting to register a new command is a role-boundary bug. Grep for +# MASTER_ONLY when touching command registration. +# +# Worker-legitimate commands (NOT in these sets): agent, updater, forwarder. +# ─────────────────────────────────────────────────────────────────────────── +MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ + "api", "swarmctl", "deploy", "redeploy", "teardown", + "probe", "collect", "mutate", "listener", "status", + "services", "distros", "correlate", "archetypes", "web", + "profiler", "sniffer", "db-reset", +}) +MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm"}) + + +def _agent_mode_active() -> bool: + """True when the host is configured as an agent AND master commands are + disallowed (the default for agents). Workers overriding this explicitly + set DECNET_DISALLOW_MASTER=false to opt into hybrid use.""" + import os + mode = os.environ.get("DECNET_MODE", "master").lower() + disallow = os.environ.get("DECNET_DISALLOW_MASTER", "true").lower() == "true" + return mode == "agent" and disallow + + +def _require_master_mode(command_name: str) -> None: + """Defence-in-depth: called at the top of every master-only command body. + + The registration-time gate in _gate_commands_by_mode() already hides + these commands from Typer's dispatch table, but this check protects + against direct function imports (e.g. from tests or third-party tools) + that would bypass Typer entirely.""" + if _agent_mode_active(): + console.print( + f"[red]`decnet {command_name}` is a master-only command; this host " + f"is configured as an agent (DECNET_MODE=agent).[/]" + ) + raise typer.Exit(1) + + +def _gate_commands_by_mode(_app: typer.Typer) -> None: + if not _agent_mode_active(): + return + _app.registered_commands = [ + c for c in _app.registered_commands + if (c.name or c.callback.__name__) not in MASTER_ONLY_COMMANDS + ] + _app.registered_groups = [ + g for g in _app.registered_groups + if g.name not in MASTER_ONLY_GROUPS + ] + + +_gate_commands_by_mode(app) + + if __name__ == '__main__': # pragma: no cover app() diff --git a/tests/test_mode_gating.py b/tests/test_mode_gating.py new file mode 100644 index 0000000..c81ad70 --- /dev/null +++ b/tests/test_mode_gating.py @@ -0,0 +1,87 @@ +"""CLI mode gating — master-only commands hidden when DECNET_MODE=agent.""" +from __future__ import annotations + +import os +import pathlib +import subprocess +import sys + +import pytest + + +REPO = pathlib.Path(__file__).resolve().parent.parent +DECNET_BIN = REPO / ".venv" / "bin" / "decnet" + + +def _clean_env(**overrides: str) -> dict[str, str]: + """Env with no DECNET_* / PYTEST_* leakage from the parent test run. + + Keeps only PATH so subprocess can locate the interpreter. HOME is + stubbed below so .env.local from the user's home doesn't leak in.""" + base = {"PATH": os.environ["PATH"], "HOME": "/nonexistent-for-test"} + base.update(overrides) + # Ensure no stale DECNET_CONFIG pointing at some fixture INI + base["DECNET_CONFIG"] = "/nonexistent/decnet.ini" + # decnet.web.auth needs a JWT secret to import; provide one so + # `decnet --help` can walk the command tree. + base.setdefault("DECNET_JWT_SECRET", "x" * 32) + return base + + +def _help_text(env: dict[str, str]) -> str: + result = subprocess.run( + [str(DECNET_BIN), "--help"], + env=env, cwd=str(REPO), + capture_output=True, text=True, timeout=20, + ) + assert result.returncode == 0, result.stderr + return result.stdout + + +def test_master_mode_lists_master_commands(): + out = _help_text(_clean_env(DECNET_MODE="master")) + for cmd in ("api", "swarmctl", "swarm", "deploy", "teardown"): + assert cmd in out, f"expected '{cmd}' in master-mode --help" + # Agent commands are also visible on master (dual-use hosts). + for cmd in ("agent", "forwarder", "updater"): + assert cmd in out + + +def test_agent_mode_hides_master_commands(): + out = _help_text(_clean_env(DECNET_MODE="agent", DECNET_DISALLOW_MASTER="true")) + for cmd in ("api", "swarmctl", "deploy", "teardown", "listener"): + assert cmd not in out, f"'{cmd}' leaked into agent-mode --help" + # The `swarm` subcommand group must also disappear — identify it by its + # unique help string (plain 'swarm' appears in other command descriptions). + assert "Manage swarm workers" not in out + # Worker-legitimate commands must remain. + for cmd in ("agent", "forwarder", "updater"): + assert cmd in out + + +def test_agent_mode_can_opt_in_to_master_via_disallow_false(): + """A hybrid dev host sets DECNET_DISALLOW_MASTER=false and keeps + full access even though DECNET_MODE=agent. This is the escape hatch + for single-machine development.""" + out = _help_text(_clean_env( + DECNET_MODE="agent", DECNET_DISALLOW_MASTER="false", + )) + assert "api" in out + assert "swarmctl" in out + + +def test_defence_in_depth_direct_call_fails_in_agent_mode(monkeypatch): + """Typer's dispatch table hides the command in agent mode, but if + something imports the command function directly it must still bail. + _require_master_mode('api') is the belt-and-braces guard.""" + monkeypatch.setenv("DECNET_MODE", "agent") + monkeypatch.setenv("DECNET_DISALLOW_MASTER", "true") + # Re-import cli so the module-level gate re-runs (harmless here; + # we're exercising the in-function guard). + for mod in list(sys.modules): + if mod == "decnet.cli": + sys.modules.pop(mod) + from decnet.cli import _require_master_mode + import typer + with pytest.raises(typer.Exit): + _require_master_mode("api")