feat(cli): allow decnet ttp on agents (DEBT-047)

The TTP-tagging worker is now safe to run on agent hosts: EmailLifter
disk-reaches body-aware predicates from the local artifacts tree
(DEBT-035 unblocked filesystem access; DEBT-047 added the helper).

Drop `ttp` from MASTER_ONLY_COMMANDS in cli/gating.py and remove the
defence-in-depth `_require_master_mode("ttp")` call in cli/ttp.py.
`ttp-backfill` walks the master DB and stays master-only.
This commit is contained in:
2026-05-02 20:07:03 -04:00
parent e972d870de
commit 79674026dd
3 changed files with 69 additions and 4 deletions

View File

@@ -30,7 +30,10 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({
"mutate", "listener", "profiler",
"services", "distros", "correlate", "archetypes", "web",
"db-reset", "init", "webhook", "clusterer", "campaign-clusterer",
"ttp", "ttp-backfill",
# `ttp` runs on agents — local SMTP decoys persist .eml files into the
# agent's artifacts tree and the EmailLifter disk-reaches them in-process
# (DEBT-047). `ttp-backfill` stays master-only: it walks the master DB.
"ttp-backfill",
})
MASTER_ONLY_GROUPS: frozenset[str] = frozenset(
{"swarm", "topology", "geoip", "realism"}

View File

@@ -55,12 +55,9 @@ def register(app: typer.Typer) -> None:
),
) -> None:
"""TTP-tagging worker — MITRE ATT&CK technique tagging."""
from decnet.cli.gating import _require_master_mode
from decnet.ttp.worker import run_ttp_worker_loop
from decnet.web.dependencies import repo
_require_master_mode("ttp")
if daemon:
log.info("ttp daemonizing poll=%s", poll_interval_secs)
_utils._daemonize()

View File

@@ -0,0 +1,65 @@
"""Agent-mode gating for the `ttp` worker (DEBT-047).
`decnet ttp` runs the live TTP-tagging worker against local bus events
and the local artifacts tree. After DEBT-047 it MUST be available on
agent hosts so EmailLifter R0047 (BEC) can disk-reach .eml files
without round-tripping raw body text through the master.
`decnet ttp-backfill` walks the master DB and stays master-only.
"""
from __future__ import annotations
import os
import pathlib
import subprocess
import sys
from pathlib import Path
REPO = pathlib.Path(__file__).resolve().parent.parent.parent
DECNET_BIN = Path(sys.executable).parent / "decnet"
def _clean_env(**overrides: str) -> dict[str, str]:
base = {"PATH": os.environ["PATH"], "HOME": "/nonexistent-for-test"}
base.update(overrides)
base["DECNET_CONFIG"] = "/nonexistent/decnet.ini"
base.setdefault("DECNET_JWT_SECRET", "x" * 32)
return base
def _help_text(env: dict[str, str]) -> str:
result = subprocess.run(
[str(DECNET_BIN), "--help"],
env=env, cwd=str(REPO),
capture_output=True, text=True, timeout=20,
)
assert result.returncode == 0, result.stderr
return result.stdout
def test_ttp_visible_on_agent_mode():
out = _help_text(_clean_env(DECNET_MODE="agent", DECNET_DISALLOW_MASTER="true"))
assert "ttp " in out or "ttp\n" in out, (
"`ttp` worker must be agent-runnable after DEBT-047 (disk-reach unblock)"
)
def test_ttp_backfill_hidden_on_agent_mode():
out = _help_text(_clean_env(DECNET_MODE="agent", DECNET_DISALLOW_MASTER="true"))
assert "ttp-backfill" not in out, (
"`ttp-backfill` walks the master DB and must stay master-only"
)
def test_ttp_visible_on_master_mode():
out = _help_text(_clean_env(DECNET_MODE="master"))
assert "ttp " in out or "ttp\n" in out
assert "ttp-backfill" in out
def test_master_only_set_excludes_ttp():
"""Source-level guard against re-adding `ttp` to the master-only set."""
from decnet.cli.gating import MASTER_ONLY_COMMANDS
assert "ttp" not in MASTER_ONLY_COMMANDS
assert "ttp-backfill" in MASTER_ONLY_COMMANDS