feat(cli): add decnet listener + --agent-dir on agent

New `decnet listener` command runs the master-side RFC 5425 syslog-TLS
receiver as a standalone process (mirrors `decnet api` / `decnet swarmctl`
pattern, SIGTERM/SIGINT handlers, --daemon support).

`decnet agent` now accepts --agent-dir so operators running the worker
agent under sudo/root can point at a bundle outside /root/.decnet/agent
(the HOME under sudo propagation).

Both flags were needed to stand up the full SWARM pipeline end-to-end on
a throwaway VM: mTLS control plane reachable, syslog-over-TLS wire
confirmed via tcpdump, master-crash/resume proved with zero loss and
zero duplication across 10 forwarded lines.

pyproject: bump asyncmy floor to 0.2.11 (resolver already pulled this in).
This commit is contained in:
2026-04-18 20:15:25 -04:00
parent bfc7af000a
commit 3da5a2c4ee
2 changed files with 55 additions and 3 deletions

View File

@@ -166,22 +166,74 @@ def swarmctl(
def agent( def agent(
port: int = typer.Option(8765, "--port", help="Port for the worker agent"), port: int = typer.Option(8765, "--port", help="Port for the worker agent"),
host: str = typer.Option("0.0.0.0", "--host", help="Bind address for the worker agent"), # nosec B104 host: str = typer.Option("0.0.0.0", "--host", help="Bind address for the worker agent"), # nosec B104
agent_dir: Optional[str] = typer.Option(None, "--agent-dir", help="Worker cert bundle dir (default: ~/.decnet/agent, expanded under the running user's HOME — set this when running as sudo/root)"),
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"), daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
) -> None: ) -> None:
"""Run the DECNET SWARM worker agent (requires a cert bundle in ~/.decnet/agent/).""" """Run the DECNET SWARM worker agent (requires a cert bundle in ~/.decnet/agent/)."""
import pathlib as _pathlib
from decnet.agent import server as _agent_server from decnet.agent import server as _agent_server
from decnet.swarm import pki as _pki
resolved_dir = _pathlib.Path(agent_dir) if agent_dir else _pki.DEFAULT_AGENT_DIR
if daemon: if daemon:
log.info("agent daemonizing host=%s port=%d", host, port) log.info("agent daemonizing host=%s port=%d", host, port)
_daemonize() _daemonize()
log.info("agent command invoked host=%s port=%d", host, port) log.info("agent command invoked host=%s port=%d dir=%s", host, port, resolved_dir)
console.print(f"[green]Starting DECNET worker agent on {host}:{port} (mTLS)...[/]") console.print(f"[green]Starting DECNET worker agent on {host}:{port} (mTLS)...[/]")
rc = _agent_server.run(host, port) rc = _agent_server.run(host, port, agent_dir=resolved_dir)
if rc != 0: if rc != 0:
raise typer.Exit(rc) raise typer.Exit(rc)
@app.command()
def listener(
bind_host: str = typer.Option("0.0.0.0", "--host", help="Bind address for the master syslog-TLS listener"), # nosec B104
bind_port: int = typer.Option(6514, "--port", help="Listener TCP port (RFC 5425 default 6514)"),
log_path: Optional[str] = typer.Option(None, "--log-path", help="RFC 5424 forensic sink (default: ./master.log)"),
json_path: Optional[str] = typer.Option(None, "--json-path", help="Parsed-JSON ingest sink (default: ./master.json)"),
ca_dir: Optional[str] = typer.Option(None, "--ca-dir", help="DECNET CA dir (default: ~/.decnet/ca)"),
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
) -> None:
"""Run the master-side syslog-over-TLS listener (RFC 5425, mTLS)."""
import asyncio
import pathlib
from decnet.swarm import pki
from decnet.swarm.log_listener import ListenerConfig, run_listener
resolved_ca_dir = pathlib.Path(ca_dir) if ca_dir else pki.DEFAULT_CA_DIR
resolved_log = pathlib.Path(log_path) if log_path else pathlib.Path("master.log")
resolved_json = pathlib.Path(json_path) if json_path else pathlib.Path("master.json")
cfg = ListenerConfig(
log_path=resolved_log, json_path=resolved_json,
bind_host=bind_host, bind_port=bind_port, ca_dir=resolved_ca_dir,
)
if daemon:
log.info("listener daemonizing host=%s port=%d", bind_host, bind_port)
_daemonize()
log.info("listener command invoked host=%s port=%d", bind_host, bind_port)
console.print(f"[green]Starting DECNET log listener on {bind_host}:{bind_port} (mTLS)...[/]")
async def _main() -> None:
stop = asyncio.Event()
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
try:
loop.add_signal_handler(sig, stop.set)
except (NotImplementedError, RuntimeError): # pragma: no cover
pass
await run_listener(cfg, stop_event=stop)
try:
asyncio.run(_main())
except KeyboardInterrupt:
pass
@app.command() @app.command()
def forwarder( def forwarder(
master_host: Optional[str] = typer.Option(None, "--master-host", help="Master listener hostname/IP (default: $DECNET_SWARM_MASTER_HOST)"), master_host: Optional[str] = typer.Option(None, "--master-host", help="Master listener hostname/IP (default: $DECNET_SWARM_MASTER_HOST)"),

View File

@@ -16,7 +16,7 @@ dependencies = [
"fastapi>=0.110.0", "fastapi>=0.110.0",
"uvicorn>=0.29.0", "uvicorn>=0.29.0",
"aiosqlite>=0.20.0", "aiosqlite>=0.20.0",
"asyncmy>=0.2.9", "asyncmy>=0.2.11",
"PyJWT>=2.8.0", "PyJWT>=2.8.0",
"bcrypt>=4.1.0", "bcrypt>=4.1.0",
"psutil>=5.9.0", "psutil>=5.9.0",