diff --git a/scripts/bus/pub.py b/scripts/bus/pub.py new file mode 100755 index 00000000..a4b11edb --- /dev/null +++ b/scripts/bus/pub.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +"""Publish a single event to the local DECNET bus. + +Usage: scripts/bus/pub.py [json-payload] [--type EVENT_TYPE] +Examples: + scripts/bus/pub.py topology.abc.status '{"state": "active"}' + scripts/bus/pub.py topology.abc.mutation.applied '{"id": 1}' --type applied +""" +from __future__ import annotations + +import argparse +import asyncio +import json +import os + +from decnet.bus.unix_client import UnixSocketBus + + +async def main(topic: str, payload: dict, event_type: str) -> None: + sock = os.environ.get("DECNET_BUS_SOCKET", "/tmp/decnet-bus.sock") + client = UnixSocketBus(sock, client_name="scripts-pub") + await client.connect() + try: + await client.publish(topic, payload, event_type=event_type) + print(f"pub: {topic} type={event_type!r} payload={payload}") + finally: + await client.close() + + +if __name__ == "__main__": + ap = argparse.ArgumentParser() + ap.add_argument("topic") + ap.add_argument("payload", nargs="?", default="{}", help="JSON object (default {})") + ap.add_argument("--type", dest="event_type", default="", help="optional event_type tag") + args = ap.parse_args() + + try: + payload = json.loads(args.payload) + except json.JSONDecodeError as exc: + raise SystemExit(f"pub: payload is not valid JSON: {exc}") + if not isinstance(payload, dict): + raise SystemExit("pub: payload must be a JSON object") + + asyncio.run(main(args.topic, payload, args.event_type)) diff --git a/scripts/bus/smoke.sh b/scripts/bus/smoke.sh new file mode 100755 index 00000000..b8fe48a8 --- /dev/null +++ b/scripts/bus/smoke.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash +# End-to-end bus smoke test: boots a worker, subscribes, publishes, +# verifies the event lands, tears everything down. Exits non-zero if +# anything misbehaves. +# +# Usage: scripts/bus/smoke.sh +set -euo pipefail + +SOCK="$(mktemp -u -t decnet-bus-smoke.XXXXXX.sock)" +export DECNET_BUS_SOCKET="${SOCK}" +LOGDIR="$(mktemp -d -t decnet-bus-smoke.XXXXXX)" +trap 'rm -f "${SOCK}"; rm -rf "${LOGDIR}"' EXIT + +HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +echo "smoke: socket=${SOCK}" + +decnet bus --socket "${SOCK}" --group "" --heartbeat 1 \ + > "${LOGDIR}/worker.log" 2>&1 & +WORKER_PID=$! +trap 'kill ${WORKER_PID} 2>/dev/null || true; wait ${WORKER_PID} 2>/dev/null || true; rm -f "${SOCK}"; rm -rf "${LOGDIR}"' EXIT + +# Wait for the socket to exist. +for _ in {1..40}; do + [[ -S "${SOCK}" ]] && break + sleep 0.05 +done +if [[ ! -S "${SOCK}" ]]; then + echo "smoke: FAIL — worker never created ${SOCK}" >&2 + cat "${LOGDIR}/worker.log" >&2 + exit 1 +fi + +# Subscriber in the background, redirected to a file we can tail. +python "${HERE}/sub.py" 'topology.>' > "${LOGDIR}/sub.log" 2>&1 & +SUB_PID=$! +trap 'kill ${SUB_PID} 2>/dev/null || true; kill ${WORKER_PID} 2>/dev/null || true; wait 2>/dev/null || true; rm -f "${SOCK}"; rm -rf "${LOGDIR}"' EXIT + +# Give the SUB frame a tick to register. +sleep 0.3 + +python "${HERE}/pub.py" topology.abc.status '{"state": "active"}' >/dev/null + +# Wait up to 2s for the event to show up. +for _ in {1..40}; do + if grep -q 'topology.abc.status' "${LOGDIR}/sub.log"; then + echo "smoke: OK — subscriber received event" + grep 'topology.abc.status' "${LOGDIR}/sub.log" + exit 0 + fi + sleep 0.05 +done + +echo "smoke: FAIL — subscriber never saw the event" >&2 +echo "--- worker.log ---" >&2; cat "${LOGDIR}/worker.log" >&2 +echo "--- sub.log ---" >&2; cat "${LOGDIR}/sub.log" >&2 +exit 1 diff --git a/scripts/bus/start.sh b/scripts/bus/start.sh new file mode 100755 index 00000000..d4b8bd7b --- /dev/null +++ b/scripts/bus/start.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +# Start a local `decnet bus` worker for manual smoke-testing. +# Uses /tmp so it works without root and without the `decnet` POSIX group. +# Usage: scripts/bus/start.sh [heartbeat-seconds] +set -euo pipefail + +SOCK="${DECNET_BUS_SOCKET:-/tmp/decnet-bus.sock}" +HEARTBEAT="${1:-3}" + +echo "bus: socket=${SOCK} heartbeat=${HEARTBEAT}s (Ctrl-C to stop)" +exec decnet bus --socket "${SOCK}" --group "" --heartbeat "${HEARTBEAT}" diff --git a/scripts/bus/sub.py b/scripts/bus/sub.py new file mode 100755 index 00000000..95ceec33 --- /dev/null +++ b/scripts/bus/sub.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +"""Subscribe to a pattern on the local DECNET bus and print events. + +Usage: scripts/bus/sub.py 'topology.>' + scripts/bus/sub.py 'system.bus.health' + DECNET_BUS_SOCKET=/tmp/decnet-bus.sock scripts/bus/sub.py 'topology.*.status' +""" +from __future__ import annotations + +import asyncio +import os +import sys + +from decnet.bus.unix_client import UnixSocketBus + + +async def main(pattern: str) -> None: + sock = os.environ.get("DECNET_BUS_SOCKET", "/tmp/decnet-bus.sock") + client = UnixSocketBus(sock, client_name="scripts-sub") + await client.connect() + sub = client.subscribe(pattern) + print(f"sub: pattern={pattern!r} socket={sock} (Ctrl-C to stop)", flush=True) + try: + async with sub: + async for ev in sub: + print(f"{ev.topic} type={ev.type!r} payload={ev.payload}", flush=True) + finally: + await client.close() + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: sub.py ", file=sys.stderr) + sys.exit(2) + try: + asyncio.run(main(sys.argv[1])) + except KeyboardInterrupt: + pass