chore(bus): add scripts/bus/ smoke + manual test helpers

start.sh boots a local bus on /tmp (no root, no decnet group).
sub.py / pub.py are thin CLIs over UnixSocketBus for manual poking.
smoke.sh is a self-contained end-to-end check — spawns a worker,
subscribes, publishes, asserts delivery, cleans up.
This commit is contained in:
2026-04-21 14:03:30 -04:00
parent fbf289ff63
commit f0349632c3
4 changed files with 150 additions and 0 deletions

44
scripts/bus/pub.py Executable file
View File

@@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""Publish a single event to the local DECNET bus.
Usage: scripts/bus/pub.py <topic> [json-payload] [--type EVENT_TYPE]
Examples:
scripts/bus/pub.py topology.abc.status '{"state": "active"}'
scripts/bus/pub.py topology.abc.mutation.applied '{"id": 1}' --type applied
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
from decnet.bus.unix_client import UnixSocketBus
async def main(topic: str, payload: dict, event_type: str) -> None:
sock = os.environ.get("DECNET_BUS_SOCKET", "/tmp/decnet-bus.sock")
client = UnixSocketBus(sock, client_name="scripts-pub")
await client.connect()
try:
await client.publish(topic, payload, event_type=event_type)
print(f"pub: {topic} type={event_type!r} payload={payload}")
finally:
await client.close()
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("topic")
ap.add_argument("payload", nargs="?", default="{}", help="JSON object (default {})")
ap.add_argument("--type", dest="event_type", default="", help="optional event_type tag")
args = ap.parse_args()
try:
payload = json.loads(args.payload)
except json.JSONDecodeError as exc:
raise SystemExit(f"pub: payload is not valid JSON: {exc}")
if not isinstance(payload, dict):
raise SystemExit("pub: payload must be a JSON object")
asyncio.run(main(args.topic, payload, args.event_type))

57
scripts/bus/smoke.sh Executable file
View File

@@ -0,0 +1,57 @@
#!/usr/bin/env bash
# End-to-end bus smoke test: boots a worker, subscribes, publishes,
# verifies the event lands, tears everything down. Exits non-zero if
# anything misbehaves.
#
# Usage: scripts/bus/smoke.sh
set -euo pipefail
SOCK="$(mktemp -u -t decnet-bus-smoke.XXXXXX.sock)"
export DECNET_BUS_SOCKET="${SOCK}"
LOGDIR="$(mktemp -d -t decnet-bus-smoke.XXXXXX)"
trap 'rm -f "${SOCK}"; rm -rf "${LOGDIR}"' EXIT
HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "smoke: socket=${SOCK}"
decnet bus --socket "${SOCK}" --group "" --heartbeat 1 \
> "${LOGDIR}/worker.log" 2>&1 &
WORKER_PID=$!
trap 'kill ${WORKER_PID} 2>/dev/null || true; wait ${WORKER_PID} 2>/dev/null || true; rm -f "${SOCK}"; rm -rf "${LOGDIR}"' EXIT
# Wait for the socket to exist.
for _ in {1..40}; do
[[ -S "${SOCK}" ]] && break
sleep 0.05
done
if [[ ! -S "${SOCK}" ]]; then
echo "smoke: FAIL — worker never created ${SOCK}" >&2
cat "${LOGDIR}/worker.log" >&2
exit 1
fi
# Subscriber in the background, redirected to a file we can tail.
python "${HERE}/sub.py" 'topology.>' > "${LOGDIR}/sub.log" 2>&1 &
SUB_PID=$!
trap 'kill ${SUB_PID} 2>/dev/null || true; kill ${WORKER_PID} 2>/dev/null || true; wait 2>/dev/null || true; rm -f "${SOCK}"; rm -rf "${LOGDIR}"' EXIT
# Give the SUB frame a tick to register.
sleep 0.3
python "${HERE}/pub.py" topology.abc.status '{"state": "active"}' >/dev/null
# Wait up to 2s for the event to show up.
for _ in {1..40}; do
if grep -q 'topology.abc.status' "${LOGDIR}/sub.log"; then
echo "smoke: OK — subscriber received event"
grep 'topology.abc.status' "${LOGDIR}/sub.log"
exit 0
fi
sleep 0.05
done
echo "smoke: FAIL — subscriber never saw the event" >&2
echo "--- worker.log ---" >&2; cat "${LOGDIR}/worker.log" >&2
echo "--- sub.log ---" >&2; cat "${LOGDIR}/sub.log" >&2
exit 1

11
scripts/bus/start.sh Executable file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Start a local `decnet bus` worker for manual smoke-testing.
# Uses /tmp so it works without root and without the `decnet` POSIX group.
# Usage: scripts/bus/start.sh [heartbeat-seconds]
set -euo pipefail
SOCK="${DECNET_BUS_SOCKET:-/tmp/decnet-bus.sock}"
HEARTBEAT="${1:-3}"
echo "bus: socket=${SOCK} heartbeat=${HEARTBEAT}s (Ctrl-C to stop)"
exec decnet bus --socket "${SOCK}" --group "" --heartbeat "${HEARTBEAT}"

38
scripts/bus/sub.py Executable file
View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""Subscribe to a pattern on the local DECNET bus and print events.
Usage: scripts/bus/sub.py 'topology.>'
scripts/bus/sub.py 'system.bus.health'
DECNET_BUS_SOCKET=/tmp/decnet-bus.sock scripts/bus/sub.py 'topology.*.status'
"""
from __future__ import annotations
import asyncio
import os
import sys
from decnet.bus.unix_client import UnixSocketBus
async def main(pattern: str) -> None:
sock = os.environ.get("DECNET_BUS_SOCKET", "/tmp/decnet-bus.sock")
client = UnixSocketBus(sock, client_name="scripts-sub")
await client.connect()
sub = client.subscribe(pattern)
print(f"sub: pattern={pattern!r} socket={sock} (Ctrl-C to stop)", flush=True)
try:
async with sub:
async for ev in sub:
print(f"{ev.topic} type={ev.type!r} payload={ev.payload}", flush=True)
finally:
await client.close()
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: sub.py <pattern>", file=sys.stderr)
sys.exit(2)
try:
asyncio.run(main(sys.argv[1]))
except KeyboardInterrupt:
pass