Files
DECNET/decnet/cli/gating.py
anti 6936a1426c feat(clustering): campaign-clusterer worker + bus topics + CLI
The campaign clusterer worker mirrors the identity-side worker shell
(bus connect, heartbeat, control listener, slow-tick fallback) but
wakes on identity.> instead of attacker.> — campaign-level work is
gated on identity-layer changes, not raw observations.

The connected-components implementation reads identities via
list_identities_for_clustering, projects them with from_identity_row,
runs union-find over combined_campaign_weight, writes campaigns rows,
sets attacker_identities.campaign_id, and runs the same revocable-
merge pass as the identity layer (a merged-out campaign whose
identities no longer co-cluster with the winner gets revoked).

Bus: adds campaign.> family (formed / identity.assigned / merged /
unmerged) plus the cross-family identity.campaign.assigned so
existing identity-stream subscribers see the badge update without
having to subscribe to campaign.>. Wiki Service-Bus.md updated in
wiki-checkout in the same wave per the project's bus-signals
discipline.

CLI: decnet campaign-clusterer registered as master-only via
MASTER_ONLY_COMMANDS; --poll-interval / --daemon mirror the identity
clusterer command surface.
2026-04-26 09:04:00 -04:00

72 lines
2.8 KiB
Python

"""Role-based CLI gating.
MAINTAINERS: when you add a new Typer command (or add_typer group) that is
master-only, register its name in MASTER_ONLY_COMMANDS / MASTER_ONLY_GROUPS
below. The gate is the only thing that:
(a) hides the command from `decnet --help` on worker hosts, and
(b) prevents a misconfigured worker from invoking master-side logic.
Forgetting to register a new command is a role-boundary bug. Grep for
MASTER_ONLY when touching command registration.
Worker-legitimate commands (NOT in these sets): agent, updater, forwarder,
status, collect, probe, sniffer. Agents run deckies locally and should be
able to inspect them + run the per-host microservices (collector streams
container logs, prober characterizes attackers hitting this host, sniffer
captures traffic). Mutator and Profiler stay master-only: the mutator
orchestrates respawns across the swarm; the profiler rebuilds attacker
profiles against the master DB (no per-host DB exists).
"""
from __future__ import annotations
import os
import typer
from .utils import console
MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({
"api", "swarmctl", "deploy", "redeploy", "teardown",
"mutate", "listener", "profiler",
"services", "distros", "correlate", "archetypes", "web",
"db-reset", "init", "webhook", "clusterer", "campaign-clusterer",
})
MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology", "geoip"})
def _agent_mode_active() -> bool:
"""True when the host is configured as an agent AND master commands are
disallowed (the default for agents). Workers overriding this explicitly
set DECNET_DISALLOW_MASTER=false to opt into hybrid use."""
mode = os.environ.get("DECNET_MODE", "master").lower()
disallow = os.environ.get("DECNET_DISALLOW_MASTER", "true").lower() == "true"
return mode == "agent" and disallow
def _require_master_mode(command_name: str) -> None:
"""Defence-in-depth: called at the top of every master-only command body.
The registration-time gate in _gate_commands_by_mode() already hides
these commands from Typer's dispatch table, but this check protects
against direct function imports (e.g. from tests or third-party tools)
that would bypass Typer entirely."""
if _agent_mode_active():
console.print(
f"[red]`decnet {command_name}` is a master-only command; this host "
f"is configured as an agent (DECNET_MODE=agent).[/]"
)
raise typer.Exit(1)
def _gate_commands_by_mode(_app: typer.Typer) -> None:
if not _agent_mode_active():
return
_app.registered_commands = [
c for c in _app.registered_commands
if (c.name or c.callback.__name__) not in MASTER_ONLY_COMMANDS
]
_app.registered_groups = [
g for g in _app.registered_groups
if g.name not in MASTER_ONLY_GROUPS
]