From 3da5a2c4eebe97383b9c08ae3cc5edc53b88ebaa Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 18 Apr 2026 20:15:25 -0400 Subject: [PATCH] feat(cli): add decnet listener + --agent-dir on agent New `decnet listener` command runs the master-side RFC 5425 syslog-TLS receiver as a standalone process (mirrors `decnet api` / `decnet swarmctl` pattern, SIGTERM/SIGINT handlers, --daemon support). `decnet agent` now accepts --agent-dir so operators running the worker agent under sudo/root can point at a bundle outside /root/.decnet/agent (the HOME under sudo propagation). Both flags were needed to stand up the full SWARM pipeline end-to-end on a throwaway VM: mTLS control plane reachable, syslog-over-TLS wire confirmed via tcpdump, master-crash/resume proved with zero loss and zero duplication across 10 forwarded lines. pyproject: bump asyncmy floor to 0.2.11 (resolver already pulled this in). --- decnet/cli.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++++-- pyproject.toml | 2 +- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/decnet/cli.py b/decnet/cli.py index cc71e5c..18f306c 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -166,22 +166,74 @@ def swarmctl( def agent( port: int = typer.Option(8765, "--port", help="Port for the worker agent"), host: str = typer.Option("0.0.0.0", "--host", help="Bind address for the worker agent"), # nosec B104 + agent_dir: Optional[str] = typer.Option(None, "--agent-dir", help="Worker cert bundle dir (default: ~/.decnet/agent, expanded under the running user's HOME — set this when running as sudo/root)"), daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"), ) -> None: """Run the DECNET SWARM worker agent (requires a cert bundle in ~/.decnet/agent/).""" + import pathlib as _pathlib from decnet.agent import server as _agent_server + from decnet.swarm import pki as _pki + + resolved_dir = _pathlib.Path(agent_dir) if agent_dir else _pki.DEFAULT_AGENT_DIR if daemon: log.info("agent daemonizing host=%s port=%d", host, port) _daemonize() - log.info("agent command invoked host=%s port=%d", host, port) + log.info("agent command invoked host=%s port=%d dir=%s", host, port, resolved_dir) console.print(f"[green]Starting DECNET worker agent on {host}:{port} (mTLS)...[/]") - rc = _agent_server.run(host, port) + rc = _agent_server.run(host, port, agent_dir=resolved_dir) if rc != 0: raise typer.Exit(rc) +@app.command() +def listener( + bind_host: str = typer.Option("0.0.0.0", "--host", help="Bind address for the master syslog-TLS listener"), # nosec B104 + bind_port: int = typer.Option(6514, "--port", help="Listener TCP port (RFC 5425 default 6514)"), + log_path: Optional[str] = typer.Option(None, "--log-path", help="RFC 5424 forensic sink (default: ./master.log)"), + json_path: Optional[str] = typer.Option(None, "--json-path", help="Parsed-JSON ingest sink (default: ./master.json)"), + ca_dir: Optional[str] = typer.Option(None, "--ca-dir", help="DECNET CA dir (default: ~/.decnet/ca)"), + daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"), +) -> None: + """Run the master-side syslog-over-TLS listener (RFC 5425, mTLS).""" + import asyncio + import pathlib + from decnet.swarm import pki + from decnet.swarm.log_listener import ListenerConfig, run_listener + + resolved_ca_dir = pathlib.Path(ca_dir) if ca_dir else pki.DEFAULT_CA_DIR + resolved_log = pathlib.Path(log_path) if log_path else pathlib.Path("master.log") + resolved_json = pathlib.Path(json_path) if json_path else pathlib.Path("master.json") + + cfg = ListenerConfig( + log_path=resolved_log, json_path=resolved_json, + bind_host=bind_host, bind_port=bind_port, ca_dir=resolved_ca_dir, + ) + + if daemon: + log.info("listener daemonizing host=%s port=%d", bind_host, bind_port) + _daemonize() + + log.info("listener command invoked host=%s port=%d", bind_host, bind_port) + console.print(f"[green]Starting DECNET log listener on {bind_host}:{bind_port} (mTLS)...[/]") + + async def _main() -> None: + stop = asyncio.Event() + loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + try: + loop.add_signal_handler(sig, stop.set) + except (NotImplementedError, RuntimeError): # pragma: no cover + pass + await run_listener(cfg, stop_event=stop) + + try: + asyncio.run(_main()) + except KeyboardInterrupt: + pass + + @app.command() def forwarder( master_host: Optional[str] = typer.Option(None, "--master-host", help="Master listener hostname/IP (default: $DECNET_SWARM_MASTER_HOST)"), diff --git a/pyproject.toml b/pyproject.toml index b75b370..804b43e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "fastapi>=0.110.0", "uvicorn>=0.29.0", "aiosqlite>=0.20.0", - "asyncmy>=0.2.9", + "asyncmy>=0.2.11", "PyJWT>=2.8.0", "bcrypt>=4.1.0", "psutil>=5.9.0",