refactor: enhance CLI with improved service registration and deployment
- Refactor deploy command to support service randomization and selective service deployment - Add --services flag to filter deployed services by name - Improve status and teardown command output formatting - Update help text for clarity
This commit is contained in:
412
decnet/cli.py
412
decnet/cli.py
@@ -35,6 +35,22 @@ from decnet.services.registry import all_services
|
||||
|
||||
log = get_logger("cli")
|
||||
|
||||
|
||||
def _daemonize() -> None:
|
||||
"""Fork the current process into a background daemon (Unix double-fork)."""
|
||||
import os
|
||||
import sys
|
||||
|
||||
if os.fork() > 0:
|
||||
raise SystemExit(0)
|
||||
os.setsid()
|
||||
if os.fork() > 0:
|
||||
raise SystemExit(0)
|
||||
sys.stdout = open(os.devnull, "w") # noqa: SIM115
|
||||
sys.stderr = open(os.devnull, "w") # noqa: SIM115
|
||||
sys.stdin = open(os.devnull, "r") # noqa: SIM115
|
||||
|
||||
|
||||
app = typer.Typer(
|
||||
name="decnet",
|
||||
help="Deploy a deception network of honeypot deckies on your LAN.",
|
||||
@@ -43,34 +59,23 @@ app = typer.Typer(
|
||||
console = Console()
|
||||
|
||||
|
||||
def _kill_api() -> None:
|
||||
"""Find and kill any running DECNET API (uvicorn) or mutator processes."""
|
||||
import psutil
|
||||
def _kill_all_services() -> None:
|
||||
"""Find and kill all running DECNET microservice processes."""
|
||||
import os
|
||||
|
||||
_killed: bool = False
|
||||
for _proc in psutil.process_iter(['pid', 'name', 'cmdline']):
|
||||
try:
|
||||
_cmd = _proc.info['cmdline']
|
||||
if not _cmd:
|
||||
continue
|
||||
if "uvicorn" in _cmd and "decnet.web.api:app" in _cmd:
|
||||
console.print(f"[yellow]Stopping DECNET API (PID {_proc.info['pid']})...[/]")
|
||||
os.kill(_proc.info['pid'], signal.SIGTERM)
|
||||
_killed = True
|
||||
elif "decnet.cli" in _cmd and "mutate" in _cmd and "--watch" in _cmd:
|
||||
console.print(f"[yellow]Stopping DECNET Mutator Watcher (PID {_proc.info['pid']})...[/]")
|
||||
os.kill(_proc.info['pid'], signal.SIGTERM)
|
||||
_killed = True
|
||||
elif "decnet.cli" in _cmd and "collect" in _cmd:
|
||||
console.print(f"[yellow]Stopping DECNET Collector (PID {_proc.info['pid']})...[/]")
|
||||
os.kill(_proc.info['pid'], signal.SIGTERM)
|
||||
_killed = True
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
continue
|
||||
registry = _service_registry(str(DECNET_INGEST_LOG_FILE))
|
||||
killed = 0
|
||||
for name, match_fn, _launch_args in registry:
|
||||
pid = _is_running(match_fn)
|
||||
if pid is not None:
|
||||
console.print(f"[yellow]Stopping {name} (PID {pid})...[/]")
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
killed += 1
|
||||
|
||||
if _killed:
|
||||
console.print("[green]Background processes stopped.[/]")
|
||||
if killed:
|
||||
console.print(f"[green]{killed} background process(es) stopped.[/]")
|
||||
else:
|
||||
console.print("[dim]No DECNET services were running.[/]")
|
||||
|
||||
|
||||
@app.command()
|
||||
@@ -78,12 +83,17 @@ def api(
|
||||
port: int = typer.Option(DECNET_API_PORT, "--port", help="Port for the backend API"),
|
||||
host: str = typer.Option(DECNET_API_HOST, "--host", help="Host IP for the backend API"),
|
||||
log_file: str = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", help="Path to the DECNET log file to monitor"),
|
||||
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Run the DECNET API and Web Dashboard in standalone mode."""
|
||||
import subprocess # nosec B404
|
||||
import sys
|
||||
import os
|
||||
|
||||
if daemon:
|
||||
log.info("API daemonizing host=%s port=%d", host, port)
|
||||
_daemonize()
|
||||
|
||||
log.info("API command invoked host=%s port=%d", host, port)
|
||||
console.print(f"[green]Starting DECNET API on {host}:{port}...[/]")
|
||||
_env: dict[str, str] = os.environ.copy()
|
||||
@@ -120,9 +130,15 @@ def deploy(
|
||||
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to INI config file"),
|
||||
api: bool = typer.Option(False, "--api", help="Start the FastAPI backend to ingest and serve logs"),
|
||||
api_port: int = typer.Option(8000, "--api-port", help="Port for the backend API"),
|
||||
daemon: bool = typer.Option(False, "--daemon", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Deploy deckies to the LAN."""
|
||||
import os
|
||||
|
||||
if daemon:
|
||||
log.info("deploy daemonizing mode=%s deckies=%s", mode, deckies)
|
||||
_daemonize()
|
||||
|
||||
log.info("deploy command invoked mode=%s deckies=%s dry_run=%s", mode, deckies, dry_run)
|
||||
if mode not in ("unihost", "swarm"):
|
||||
console.print("[red]--mode must be 'unihost' or 'swarm'[/]")
|
||||
@@ -316,6 +332,136 @@ def deploy(
|
||||
except (FileNotFoundError, subprocess.SubprocessError):
|
||||
console.print("[red]Failed to start DECNET-PROBER.[/]")
|
||||
|
||||
if effective_log_file and not dry_run:
|
||||
import subprocess # nosec B404
|
||||
import sys
|
||||
console.print("[bold cyan]Starting DECNET-PROFILER[/] (builds attacker profiles from log stream)")
|
||||
try:
|
||||
subprocess.Popen( # nosec B603
|
||||
[sys.executable, "-m", "decnet.cli", "profiler", "--daemon"],
|
||||
stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.STDOUT,
|
||||
start_new_session=True,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.SubprocessError):
|
||||
console.print("[red]Failed to start DECNET-PROFILER.[/]")
|
||||
|
||||
if effective_log_file and not dry_run:
|
||||
import subprocess # nosec B404
|
||||
import sys
|
||||
console.print("[bold cyan]Starting DECNET-SNIFFER[/] (passive network capture)")
|
||||
try:
|
||||
subprocess.Popen( # nosec B603
|
||||
[sys.executable, "-m", "decnet.cli", "sniffer",
|
||||
"--daemon",
|
||||
"--log-file", str(effective_log_file)],
|
||||
stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.STDOUT,
|
||||
start_new_session=True,
|
||||
)
|
||||
except (FileNotFoundError, subprocess.SubprocessError):
|
||||
console.print("[red]Failed to start DECNET-SNIFFER.[/]")
|
||||
|
||||
|
||||
def _is_running(match_fn) -> int | None:
|
||||
"""Return PID of a running DECNET process matching ``match_fn(cmdline)``, or None."""
|
||||
import psutil
|
||||
|
||||
for proc in psutil.process_iter(["pid", "cmdline"]):
|
||||
try:
|
||||
cmd = proc.info["cmdline"]
|
||||
if cmd and match_fn(cmd):
|
||||
return proc.info["pid"]
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
# Each entry: (display_name, detection_fn, launch_args_fn)
|
||||
# launch_args_fn receives log_file and returns the Popen argv list.
|
||||
def _service_registry(log_file: str) -> list[tuple[str, callable, list[str]]]:
|
||||
"""Return the microservice registry for health-check and relaunch."""
|
||||
import sys
|
||||
|
||||
_py = sys.executable
|
||||
return [
|
||||
(
|
||||
"Collector",
|
||||
lambda cmd: "decnet.cli" in cmd and "collect" in cmd,
|
||||
[_py, "-m", "decnet.cli", "collect", "--daemon", "--log-file", log_file],
|
||||
),
|
||||
(
|
||||
"Mutator",
|
||||
lambda cmd: "decnet.cli" in cmd and "mutate" in cmd and "--watch" in cmd,
|
||||
[_py, "-m", "decnet.cli", "mutate", "--daemon", "--watch"],
|
||||
),
|
||||
(
|
||||
"Prober",
|
||||
lambda cmd: "decnet.cli" in cmd and "probe" in cmd,
|
||||
[_py, "-m", "decnet.cli", "probe", "--daemon", "--log-file", log_file],
|
||||
),
|
||||
(
|
||||
"Profiler",
|
||||
lambda cmd: "decnet.cli" in cmd and "profiler" in cmd,
|
||||
[_py, "-m", "decnet.cli", "profiler", "--daemon"],
|
||||
),
|
||||
(
|
||||
"Sniffer",
|
||||
lambda cmd: "decnet.cli" in cmd and "sniffer" in cmd,
|
||||
[_py, "-m", "decnet.cli", "sniffer", "--daemon", "--log-file", log_file],
|
||||
),
|
||||
(
|
||||
"API",
|
||||
lambda cmd: "uvicorn" in cmd and "decnet.web.api:app" in cmd,
|
||||
[_py, "-m", "uvicorn", "decnet.web.api:app",
|
||||
"--host", DECNET_API_HOST, "--port", str(DECNET_API_PORT)],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@app.command()
|
||||
def redeploy(
|
||||
log_file: str = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", "-f", help="Path to the DECNET log file"),
|
||||
) -> None:
|
||||
"""Check running DECNET services and relaunch any that are down."""
|
||||
import subprocess # nosec B404
|
||||
|
||||
log.info("redeploy: checking services")
|
||||
registry = _service_registry(str(log_file))
|
||||
|
||||
table = Table(title="DECNET Services", show_lines=True)
|
||||
table.add_column("Service", style="bold cyan")
|
||||
table.add_column("Status")
|
||||
table.add_column("PID", style="dim")
|
||||
table.add_column("Action")
|
||||
|
||||
relaunched = 0
|
||||
for name, match_fn, launch_args in registry:
|
||||
pid = _is_running(match_fn)
|
||||
if pid is not None:
|
||||
table.add_row(name, "[green]UP[/]", str(pid), "—")
|
||||
else:
|
||||
try:
|
||||
subprocess.Popen( # nosec B603
|
||||
launch_args,
|
||||
stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.STDOUT,
|
||||
start_new_session=True,
|
||||
)
|
||||
table.add_row(name, "[red]DOWN[/]", "—", "[green]relaunched[/]")
|
||||
relaunched += 1
|
||||
except (FileNotFoundError, subprocess.SubprocessError) as exc:
|
||||
table.add_row(name, "[red]DOWN[/]", "—", f"[red]failed: {exc}[/]")
|
||||
|
||||
console.print(table)
|
||||
if relaunched:
|
||||
console.print(f"[green]{relaunched} service(s) relaunched.[/]")
|
||||
else:
|
||||
console.print("[green]All services running.[/]")
|
||||
|
||||
|
||||
@app.command()
|
||||
def probe(
|
||||
@@ -329,10 +475,11 @@ def probe(
|
||||
from decnet.prober import prober_worker
|
||||
|
||||
if daemon:
|
||||
# Suppress console output when running as background daemon
|
||||
import os
|
||||
log.info("probe daemon starting log_file=%s interval=%d", log_file, interval)
|
||||
log.info("probe daemonizing log_file=%s interval=%d", log_file, interval)
|
||||
_daemonize()
|
||||
asyncio.run(prober_worker(log_file, interval=interval, timeout=timeout))
|
||||
return
|
||||
|
||||
else:
|
||||
log.info("probe command invoked log_file=%s interval=%d", log_file, interval)
|
||||
console.print(f"[bold cyan]DECNET-PROBER[/] watching {log_file} for attackers (interval: {interval}s)")
|
||||
@@ -346,10 +493,16 @@ def probe(
|
||||
@app.command()
|
||||
def collect(
|
||||
log_file: str = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", "-f", help="Path to write RFC 5424 syslog lines and .json records"),
|
||||
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Stream Docker logs from all running decky service containers to a log file."""
|
||||
import asyncio
|
||||
from decnet.collector import log_collector_worker
|
||||
|
||||
if daemon:
|
||||
log.info("collect daemonizing log_file=%s", log_file)
|
||||
_daemonize()
|
||||
|
||||
log.info("collect command invoked log_file=%s", log_file)
|
||||
console.print(f"[bold cyan]Collector starting[/] → {log_file}")
|
||||
asyncio.run(log_collector_worker(log_file))
|
||||
@@ -358,14 +511,19 @@ def collect(
|
||||
@app.command()
|
||||
def mutate(
|
||||
watch: bool = typer.Option(False, "--watch", "-w", help="Run continuously and mutate deckies according to their interval"),
|
||||
decky_name: Optional[str] = typer.Option(None, "--decky", "-d", help="Force mutate a specific decky immediately"),
|
||||
decky_name: Optional[str] = typer.Option(None, "--decky", help="Force mutate a specific decky immediately"),
|
||||
force_all: bool = typer.Option(False, "--all", help="Force mutate all deckies immediately"),
|
||||
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Manually trigger or continuously watch for decky mutation."""
|
||||
import asyncio
|
||||
from decnet.mutator import mutate_decky, mutate_all, run_watch_loop
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
if daemon:
|
||||
log.info("mutate daemonizing watch=%s", watch)
|
||||
_daemonize()
|
||||
|
||||
async def _run() -> None:
|
||||
await repo.initialize()
|
||||
if watch:
|
||||
@@ -387,6 +545,21 @@ def status() -> None:
|
||||
from decnet.engine import status as _status
|
||||
_status()
|
||||
|
||||
registry = _service_registry(str(DECNET_INGEST_LOG_FILE))
|
||||
svc_table = Table(title="DECNET Services", show_lines=True)
|
||||
svc_table.add_column("Service", style="bold cyan")
|
||||
svc_table.add_column("Status")
|
||||
svc_table.add_column("PID", style="dim")
|
||||
|
||||
for name, match_fn, _launch_args in registry:
|
||||
pid = _is_running(match_fn)
|
||||
if pid is not None:
|
||||
svc_table.add_row(name, "[green]UP[/]", str(pid))
|
||||
else:
|
||||
svc_table.add_row(name, "[red]DOWN[/]", "—")
|
||||
|
||||
console.print(svc_table)
|
||||
|
||||
|
||||
@app.command()
|
||||
def teardown(
|
||||
@@ -404,7 +577,7 @@ def teardown(
|
||||
log.info("teardown complete all=%s id=%s", all_, id_)
|
||||
|
||||
if all_:
|
||||
_kill_api()
|
||||
_kill_all_services()
|
||||
|
||||
|
||||
@app.command(name="services")
|
||||
@@ -438,6 +611,7 @@ def correlate(
|
||||
min_deckies: int = typer.Option(2, "--min-deckies", "-m", help="Minimum number of distinct deckies an IP must touch to be reported"),
|
||||
output: str = typer.Option("table", "--output", "-o", help="Output format: table | json | syslog"),
|
||||
emit_syslog: bool = typer.Option(False, "--emit-syslog", help="Also print traversal events as RFC 5424 lines (for SIEM piping)"),
|
||||
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Analyse logs for cross-decky traversals and print the attacker movement graph."""
|
||||
import sys
|
||||
@@ -445,6 +619,10 @@ def correlate(
|
||||
from pathlib import Path
|
||||
from decnet.correlation.engine import CorrelationEngine
|
||||
|
||||
if daemon:
|
||||
log.info("correlate daemonizing log_file=%s", log_file)
|
||||
_daemonize()
|
||||
|
||||
engine = CorrelationEngine()
|
||||
|
||||
if log_file:
|
||||
@@ -509,6 +687,7 @@ def list_archetypes() -> None:
|
||||
def serve_web(
|
||||
web_port: int = typer.Option(DECNET_WEB_PORT, "--web-port", help="Port to serve the DECNET Web Dashboard"),
|
||||
host: str = typer.Option(DECNET_WEB_HOST, "--host", help="Host IP to serve the Web Dashboard"),
|
||||
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Serve the DECNET Web Dashboard frontend."""
|
||||
import http.server
|
||||
@@ -521,6 +700,10 @@ def serve_web(
|
||||
console.print(f"[red]Frontend build not found at {dist_dir}. Make sure you run 'npm run build' inside 'decnet_web'.[/]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
if daemon:
|
||||
log.info("web daemonizing host=%s port=%d", host, web_port)
|
||||
_daemonize()
|
||||
|
||||
class SPAHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
path = self.translate_path(self.path)
|
||||
@@ -538,5 +721,174 @@ def serve_web(
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n[dim]Shutting down dashboard server.[/]")
|
||||
|
||||
@app.command(name="profiler")
|
||||
def profiler_cmd(
|
||||
interval: int = typer.Option(30, "--interval", "-i", help="Seconds between profile rebuild cycles"),
|
||||
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Run the attacker profiler as a standalone microservice."""
|
||||
import asyncio
|
||||
from decnet.profiler import attacker_profile_worker
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
if daemon:
|
||||
log.info("profiler daemonizing interval=%d", interval)
|
||||
_daemonize()
|
||||
|
||||
log.info("profiler starting interval=%d", interval)
|
||||
console.print(f"[bold cyan]Profiler starting[/] (interval: {interval}s)")
|
||||
|
||||
async def _run() -> None:
|
||||
await repo.initialize()
|
||||
await attacker_profile_worker(repo, interval=interval)
|
||||
|
||||
try:
|
||||
asyncio.run(_run())
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n[yellow]Profiler stopped.[/]")
|
||||
|
||||
|
||||
@app.command(name="sniffer")
|
||||
def sniffer_cmd(
|
||||
log_file: str = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", "-f", help="Path to write captured syslog + JSON records"),
|
||||
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
|
||||
) -> None:
|
||||
"""Run the network sniffer as a standalone microservice."""
|
||||
import asyncio
|
||||
from decnet.sniffer import sniffer_worker
|
||||
|
||||
if daemon:
|
||||
log.info("sniffer daemonizing log_file=%s", log_file)
|
||||
_daemonize()
|
||||
|
||||
log.info("sniffer starting log_file=%s", log_file)
|
||||
console.print(f"[bold cyan]Sniffer starting[/] → {log_file}")
|
||||
|
||||
try:
|
||||
asyncio.run(sniffer_worker(log_file))
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n[yellow]Sniffer stopped.[/]")
|
||||
|
||||
|
||||
_DB_RESET_TABLES: tuple[str, ...] = (
|
||||
# Order matters for DROP TABLE: attacker_behavior FK-references attackers.
|
||||
"attacker_behavior",
|
||||
"attackers",
|
||||
"logs",
|
||||
"bounty",
|
||||
"state",
|
||||
"users",
|
||||
)
|
||||
|
||||
|
||||
async def _db_reset_mysql_async(dsn: str, mode: str, confirm: bool) -> None:
|
||||
"""Inspect + (optionally) wipe a MySQL database. Pulled out of the CLI
|
||||
wrapper so tests can drive it without spawning a Typer runner."""
|
||||
from urllib.parse import urlparse
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
|
||||
db_name = urlparse(dsn).path.lstrip("/") or "(default)"
|
||||
engine = create_async_engine(dsn)
|
||||
try:
|
||||
# Collect current row counts per table. Missing tables yield -1.
|
||||
rows: dict[str, int] = {}
|
||||
async with engine.connect() as conn:
|
||||
for tbl in _DB_RESET_TABLES:
|
||||
try:
|
||||
result = await conn.execute(text(f"SELECT COUNT(*) FROM `{tbl}`"))
|
||||
rows[tbl] = result.scalar() or 0
|
||||
except Exception: # noqa: BLE001 — ProgrammingError for missing table varies by driver
|
||||
rows[tbl] = -1
|
||||
|
||||
summary = Table(title=f"DECNET MySQL reset — database `{db_name}` (mode={mode})")
|
||||
summary.add_column("Table", style="cyan")
|
||||
summary.add_column("Rows", justify="right")
|
||||
for tbl, count in rows.items():
|
||||
summary.add_row(tbl, "[dim]missing[/]" if count < 0 else f"{count:,}")
|
||||
console.print(summary)
|
||||
|
||||
if not confirm:
|
||||
console.print(
|
||||
"[yellow]Dry-run only. Re-run with [bold]--i-know-what-im-doing[/] "
|
||||
"to actually execute.[/]"
|
||||
)
|
||||
return
|
||||
|
||||
# Destructive phase. FK checks off so TRUNCATE/DROP works in any order.
|
||||
async with engine.begin() as conn:
|
||||
await conn.execute(text("SET FOREIGN_KEY_CHECKS = 0"))
|
||||
for tbl in _DB_RESET_TABLES:
|
||||
if rows.get(tbl, -1) < 0:
|
||||
continue # skip absent tables silently
|
||||
if mode == "truncate":
|
||||
await conn.execute(text(f"TRUNCATE TABLE `{tbl}`"))
|
||||
console.print(f"[green]✓ TRUNCATE {tbl}[/]")
|
||||
else: # drop-tables
|
||||
await conn.execute(text(f"DROP TABLE `{tbl}`"))
|
||||
console.print(f"[green]✓ DROP TABLE {tbl}[/]")
|
||||
await conn.execute(text("SET FOREIGN_KEY_CHECKS = 1"))
|
||||
|
||||
console.print(f"[bold green]Done. Database `{db_name}` reset ({mode}).[/]")
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@app.command(name="db-reset")
|
||||
def db_reset(
|
||||
i_know: bool = typer.Option(
|
||||
False,
|
||||
"--i-know-what-im-doing",
|
||||
help="Required to actually execute. Without it, the command runs in dry-run mode.",
|
||||
),
|
||||
mode: str = typer.Option(
|
||||
"truncate",
|
||||
"--mode",
|
||||
help="truncate (wipe rows, keep schema) | drop-tables (DROP TABLE for each DECNET table)",
|
||||
),
|
||||
url: Optional[str] = typer.Option(
|
||||
None,
|
||||
"--url",
|
||||
help="Override DECNET_DB_URL for this invocation (e.g. when cleanup needs admin creds).",
|
||||
),
|
||||
) -> None:
|
||||
"""Wipe the MySQL database used by the DECNET dashboard.
|
||||
|
||||
Destructive. Runs dry by default — pass --i-know-what-im-doing to commit.
|
||||
Only supported against MySQL; refuses to operate on SQLite.
|
||||
"""
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
if mode not in ("truncate", "drop-tables"):
|
||||
console.print(f"[red]Invalid --mode '{mode}'. Expected: truncate | drop-tables.[/]")
|
||||
raise typer.Exit(2)
|
||||
|
||||
db_type = os.environ.get("DECNET_DB_TYPE", "sqlite").lower()
|
||||
if db_type != "mysql":
|
||||
console.print(
|
||||
f"[red]db-reset is MySQL-only (DECNET_DB_TYPE='{db_type}'). "
|
||||
f"For SQLite, just delete the decnet.db file.[/]"
|
||||
)
|
||||
raise typer.Exit(2)
|
||||
|
||||
dsn = url or os.environ.get("DECNET_DB_URL")
|
||||
if not dsn:
|
||||
# Fall back to component env vars (DECNET_DB_HOST/PORT/NAME/USER/PASSWORD).
|
||||
from decnet.web.db.mysql.database import build_mysql_url
|
||||
try:
|
||||
dsn = build_mysql_url()
|
||||
except ValueError as e:
|
||||
console.print(f"[red]{e}[/]")
|
||||
raise typer.Exit(2) from e
|
||||
|
||||
log.info("db-reset invoked mode=%s confirm=%s", mode, i_know)
|
||||
try:
|
||||
asyncio.run(_db_reset_mysql_async(dsn, mode=mode, confirm=i_know))
|
||||
except Exception as e: # noqa: BLE001
|
||||
console.print(f"[red]db-reset failed: {e}[/]")
|
||||
raise typer.Exit(1) from e
|
||||
|
||||
|
||||
if __name__ == '__main__': # pragma: no cover
|
||||
app()
|
||||
|
||||
Reference in New Issue
Block a user