Service Bus
anti edited this page 2026-04-21 14:41:18 -04:00

Service Bus — host-local pub/sub for DECNET workers

The DECNET service bus is a host-local UNIX-domain socket pub/sub transport that lets independent workers (mutator, correlator, profiler, API, sniffer, …) push events to each other without coupling through the database.

It is the plumbing that turns DECNET from a pile of cooperating pollers into a system that reacts to state changes in under a second.

See also: Design Overview, Module reference: workers, Environment variables, Systemd setup.


Why a bus

Every DECNET worker already talks to the database. But the database is a poor fit for "something just happened" — every consumer has to poll, and every poll is a round-trip whether or not there's work.

The bus gives you a second communication channel alongside the DB:

Concern                     DB                      Bus
Source of truth             yes                     no
Persistence                 durable                 in-flight only
Delivery                    exactly-once            at-most-once
Latency                     poll-interval bounded   sub-millisecond
Survival across restarts    yes                     no

Rule of thumb: state goes in the DB. Notifications that something in the DB just changed go on the bus. If the bus drops a message, the next DB poll picks up the state; nothing is lost, only latency increases.

This invariant is load-bearing. It's what lets publishers be fire-and-forget, lets the transport discard events under backpressure instead of blocking, and lets consumers treat the bus as a hint rather than a contract.


Architecture

┌─────────────┐     publish     ┌────────────┐  subscribe    ┌──────────────┐
│  mutator    │────────────────▶│ decnet bus │──────────────▶│   api (SSE)  │
└─────────────┘                 │  worker    │               └──────────────┘
                                │  (UNIX     │
┌─────────────┐     publish     │  socket)   │  subscribe    ┌──────────────┐
│ web api     │────────────────▶│            │──────────────▶│   mutator    │
│ (enqueue)   │                 └────────────┘               │ (wake-on-    │
└─────────────┘                                              │  enqueue)    │
                                                             └──────────────┘

decnet bus is a standalone systemd-supervised worker. It owns the UNIX socket, multiplexes clients, matches topics against subscription patterns, and fans events out to interested subscribers. Publishers and subscribers are just client processes — the worker doesn't know what each topic means, only how to route it.

One bus per host

The MVP is host-local only. Each host runs its own decnet bus. Cross-host federation (swarm master ↔ agents over TCP) is deferred to a future --bridge-tcp mode that will proxy over the existing swarm mTLS infra if a use case emerges. Today, if you want something on the master to react to an event on an agent, the agent pushes it over the regular swarm API and the master's local bus re-publishes from there.

Authorization

The socket file is created with mode 0660 and owned by the decnet group. Authorization is therefore whatever the kernel says about who can connect() — anyone in the decnet group is trusted. There is no per-topic ACL; if you can connect, you can publish or subscribe to anything.

For dev boxes without the decnet group, pass --group "" when starting the worker and the socket stays owned by the current process group.
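For illustration, the permission model above can be sketched in Python. The helper name bind_bus_socket is hypothetical and this is not the decnet bus worker's code; it only shows the 0660 mode plus an optional group chown, where an empty group skips the chown as --group "" does. On Linux, connect() on a path-bound UNIX socket needs write permission on the socket file, so group read/write is what grants access.

```python
import grp
import os
import socket


def bind_bus_socket(path: str, group: str = "decnet") -> socket.socket:
    """Bind a listening UNIX socket at `path` with mode 0660.

    Hypothetical helper illustrating the permission model; not DECNET's code.
    """
    # Remove a stale socket file left behind by a previous run.
    if os.path.exists(path):
        os.unlink(path)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.bind(path)
    # Owner and group may read/write (i.e. connect); everyone else is denied.
    os.chmod(path, 0o660)
    if group:
        # Hand the socket file to the shared group; uid -1 leaves the owner unchanged.
        os.chown(path, -1, grp.getgrnam(group).gr_gid)
    sock.listen()
    return sock
```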


Starting the bus

Via systemd (production)

sudo systemctl enable --now decnet-bus.service

The unit file ships with DECNET_BUS_SOCKET=/run/decnet/bus.sock, RuntimeDirectory=decnet, and Group=decnet. Any worker unit that depends on the bus should declare After=decnet-bus.service and Requires=decnet-bus.service.

Manually (dev)

decnet bus                    # defaults: /run/decnet/bus.sock, group=decnet
decnet bus --socket /tmp/b.sock --group "" --heartbeat 5

--heartbeat N emits a system.bus.health event every N seconds so subscribers can detect a hung worker.
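A subscriber could turn those heartbeats into a liveness check along these lines. HeartbeatWatchdog and its grace parameter (how many missed beats to tolerate) are assumptions for illustration, not part of DECNET; interval_s should match the worker's --heartbeat value.

```python
import time
from typing import Optional


class HeartbeatWatchdog:
    """Hypothetical helper: flags the bus worker as hung when
    system.bus.health events stop arriving."""

    def __init__(self, interval_s: float, grace: int = 3) -> None:
        self.interval_s = interval_s
        self.grace = grace  # assumed tolerance: missed beats before "stale"
        self.last_seen: Optional[float] = None

    def on_health(self, payload: dict, now: Optional[float] = None) -> None:
        # Record arrival on our own monotonic clock instead of payload["ts"],
        # so we make no assumption about which clock the worker stamps with.
        self.last_seen = time.monotonic() if now is None else now

    def is_stale(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        if self.last_seen is None:
            return True  # no heartbeat seen yet
        return now - self.last_seen > self.interval_s * self.grace
```

In a subscriber loop, call wd.on_health(event.payload) whenever event.topic == "system.bus.health", and check wd.is_stale() from a periodic task.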

Disabled mode

If you don't want a bus at all (contract-test runs, minimal dev), set:

export DECNET_BUS_ENABLED=false

Every get_bus() call returns a NullBus: publishes are silently dropped and subscriptions yield nothing. Workers start cleanly; features that depend on sub-second push fall back to their regular poll interval.
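The null-object behaviour can be pictured with a small stand-in. NullBusSketch is hypothetical: its method names follow the usage examples on this page, but DECNET's real NullBus signatures may differ. One further assumption: here a subscription ends immediately, whereas the real NullBus might instead block forever; the page only says subscriptions yield nothing.

```python
from typing import Any


class _EmptySubscription:
    """Async-iterable subscription that never yields an event."""

    async def __aenter__(self) -> "_EmptySubscription":
        return self

    async def __aexit__(self, *exc: object) -> None:
        return None

    def __aiter__(self) -> "_EmptySubscription":
        return self

    async def __anext__(self) -> Any:
        raise StopAsyncIteration  # assumption: ends at once rather than blocking


class NullBusSketch:
    """Hypothetical stand-in with NullBus semantics: every call is a no-op."""

    async def connect(self) -> None:
        pass  # nothing to connect to

    async def publish(self, topic: str, payload: dict, **kwargs: Any) -> None:
        pass  # silently dropped

    def subscribe(self, pattern: str) -> _EmptySubscription:
        return _EmptySubscription()

    async def close(self) -> None:
        pass
```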


Topic hierarchy

Topics are NATS-style dot-separated strings. Two wildcards are supported in subscription patterns:

  • * matches exactly one token.
  • > matches one-or-more trailing tokens.

So topology.> matches topology.abc.status but not the bare topology; topology.*.mutation.applied matches every topology's applied event; topology.abc.mutation.* matches every mutation state for one topology.
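The wildcard rules can be checked with a short matcher. topic_matches is a hypothetical sketch of the semantics described above, not the bus worker's implementation; it assumes > is only meaningful as the last pattern token, as in NATS.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """NATS-style match: '*' is exactly one token, '>' is one or more trailing tokens."""
    p_tokens = pattern.split(".")
    t_tokens = topic.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the final pattern token and needs at least one topic token left.
            return i == len(p_tokens) - 1 and len(t_tokens) > i
        if i >= len(t_tokens):
            return False  # topic ran out of tokens before the pattern did
        if p != "*" and p != t_tokens[i]:
            return False  # literal token mismatch
    # All pattern tokens consumed: match only if the topic is exhausted too.
    return len(p_tokens) == len(t_tokens)
```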

Current topic families:

Topic pattern                     Emitted by              Example payload
topology.{id}.mutation.enqueued   API (POST /mutations)   {mutation_id, op, payload}
topology.{id}.mutation.applying   Mutator                 {mutation_id, op, payload}
topology.{id}.mutation.applied    Mutator                 {mutation_id, op}
topology.{id}.mutation.failed     Mutator                 {mutation_id, op, reason}
topology.{id}.status              Mutator                 {state, reason}
decky.{id}.state                  reserved
decky.{id}.traffic                reserved
attacker.observed                 reserved
system.log                        reserved
system.bus.health                 Bus worker heartbeat    {ts, uptime_s}

Topic builders live in decnet.bus.topics. Always use the helpers (topology_mutation(tid, state), topology_status(tid), …) — they reject segments containing ., *, >, or whitespace so a bogus topology id can't silently corrupt the hierarchy.

Renaming an existing topic is a breaking change for every subscriber. Adding a new family is safe.
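To make the validation rule concrete, here is a hypothetical re-implementation of two builders. Application code should keep using the real helpers in decnet.bus.topics; the _segment name and the rejection of empty segments are assumptions of this sketch.

```python
import re

# Characters that would corrupt the dot-separated hierarchy or act as wildcards.
_BAD_SEGMENT = re.compile(r"[.*>\s]")


def _segment(value: str) -> str:
    # Assumption: an empty segment ("topology..status") is rejected as well.
    if not value or _BAD_SEGMENT.search(value):
        raise ValueError(f"invalid topic segment: {value!r}")
    return value


def topology_mutation(tid: str, state: str) -> str:
    return f"topology.{_segment(tid)}.mutation.{_segment(state)}"


def topology_status(tid: str) -> str:
    return f"topology.{_segment(tid)}.status"
```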


Environment variables

Variable              Default                  Meaning
DECNET_BUS_ENABLED    true                     Set false to short-circuit get_bus() to NullBus.
DECNET_BUS_TYPE       unix                     unix or fake. fake is the in-process transport used by tests.
DECNET_BUS_SOCKET     /run/decnet/bus.sock     Path to the UNIX socket; the default falls back to ~/.decnet/bus.sock.
DECNET_BUS_GROUP      decnet                   Group that owns the socket file. "" disables chown.

See Environment variables for the full list.
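The variables combine roughly as follows. This is a sketch of the selection logic with resolve_bus_config and BusConfig as hypothetical names; in particular, exactly when the ~/.decnet/bus.sock fallback applies is not documented here, so the run_dir_exists flag (is /run/decnet present?) is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class BusConfig:
    impl: str    # "null", "unix" or "fake"
    socket: str
    group: str


def resolve_bus_config(env: Mapping[str, str], run_dir_exists: bool) -> BusConfig:
    """Hypothetical sketch of how the DECNET_BUS_* variables combine."""
    enabled = env.get("DECNET_BUS_ENABLED", "true").strip().lower() != "false"
    if not enabled:
        impl = "null"  # short-circuit: every get_bus() would return a NullBus
    else:
        impl = env.get("DECNET_BUS_TYPE", "unix")
        if impl not in ("unix", "fake"):
            raise ValueError(f"unknown DECNET_BUS_TYPE: {impl!r}")
    # Assumed fallback rule: use ~/.decnet when /run/decnet is absent.
    default_socket = (
        "/run/decnet/bus.sock" if run_dir_exists
        else os.path.expanduser("~/.decnet/bus.sock")
    )
    socket_path = env.get("DECNET_BUS_SOCKET") or default_socket
    # An explicit empty string means "don't chown"; only an unset variable gets the default.
    group = env.get("DECNET_BUS_GROUP", "decnet")
    return BusConfig(impl=impl, socket=socket_path, group=group)
```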


Using the bus from code

Always use the factory, get_bus(). Never import UnixSocketBus, FakeBus, or NullBus directly; this mirrors the get_repository() convention.

Publishing

from decnet.bus.factory import get_bus
from decnet.bus import topics

bus = get_bus(client_name="mutator")
await bus.connect()

await bus.publish(
    topics.topology_mutation(topology_id, topics.MUTATION_APPLIED),
    {"mutation_id": mutation_id, "op": "add_lan"},
    event_type=topics.MUTATION_APPLIED,
)

Publishers are fire-and-forget. Wrap each publish in try/except Exception as exc and log a warning (log.warning(...)) instead of re-raising: a bus failure must never break the caller, because the DB write has already happened.
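A small wrapper keeps that rule in one place. publish_best_effort is a hypothetical helper, not part of decnet.bus; it accepts anything with an async publish(topic, payload, **kwargs) method, matching the call shape in the publishing example above.

```python
import logging
from typing import Any, Protocol

log = logging.getLogger(__name__)


class _Publisher(Protocol):
    async def publish(self, topic: str, payload: dict, **kwargs: Any) -> None: ...


async def publish_best_effort(bus: _Publisher, topic: str, payload: dict, **kwargs: Any) -> bool:
    """Publish a notification without ever raising to the caller.

    Call it only after the DB write has committed, so a failed publish
    costs latency (the next poll) rather than data.
    """
    try:
        await bus.publish(topic, payload, **kwargs)
    except Exception as exc:  # deliberately broad: the bus is a hint, not a contract
        log.warning("bus publish to %s failed: %s", topic, exc)
        return False
    return True
```

Because except Exception does not catch asyncio.CancelledError (a BaseException since Python 3.8), task cancellation still propagates normally.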

Subscribing

from decnet.bus.factory import get_bus

bus = get_bus(client_name="my-worker")
await bus.connect()

sub = bus.subscribe("topology.*.mutation.applied")
async with sub:
    async for event in sub:
        print(event.topic, event.payload)

Subscriptions are async iterators. Wrap them in async with so the client unregisters itself cleanly on exit.

Inside the web API

Request handlers use a shared singleton so the bus socket isn't opened per request:

from decnet.bus.app import get_app_bus

bus = await get_app_bus()          # None if bus is disabled or down
if bus is not None:
    await bus.publish(...)

get_app_bus() returns None on connection failure and remembers the failure for the process lifetime — the API stays healthy without a bus.
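The "cache the failure too" behaviour can be sketched as a lazy singleton. AppBusCache and connect_fn are hypothetical: connect_fn stands in for building a client with get_bus() and awaiting connect(). The lock makes concurrent first requests share a single connection attempt.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

_UNSET = object()  # sentinel: "not attempted yet", distinct from a cached None


class AppBusCache:
    """Hypothetical process-wide lazy singleton that also remembers failure."""

    def __init__(self, connect_fn: Callable[[], Awaitable[Any]]) -> None:
        self._connect_fn = connect_fn
        self._result: Any = _UNSET
        self._lock: Optional[asyncio.Lock] = None

    async def get(self) -> Optional[Any]:
        if self._result is not _UNSET:
            return self._result  # fast path: a live client or a remembered None
        if self._lock is None:
            # Created lazily inside the running loop; no await between check and set.
            self._lock = asyncio.Lock()
        async with self._lock:
            # Re-check: another request may have finished connecting while we waited.
            if self._result is _UNSET:
                try:
                    self._result = await self._connect_fn()
                except Exception:
                    self._result = None  # remembered for the process lifetime
        return self._result
```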


Delivery guarantees

At-most-once. Fire-and-forget. A published event may be delivered to every matching subscriber, some of them, or none.

Specifically, the bus drops events when:

  • No subscribers match the topic at publish time.
  • A subscriber's per-connection queue is full (default 1024 events); the oldest event is discarded to make room. This is drop-oldest backpressure — the invariant is that a slow consumer cannot stall publishers.
  • The bus worker is restarting or the client loses its socket connection. In-flight events are lost.

These are features, not bugs. The DB remains the source of truth, so the worst-case effect of a dropped event is up to one poll interval of extra latency (10 seconds for the mutator) until the next poll picks up the state. In exchange, publishers never block, never retry, and never have to reason about ordering across subscribers.

If you need durable delivery, write to the DB instead and let consumers poll. Use the bus to shorten the poll interval from seconds to milliseconds, not to replace it.
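Drop-oldest backpressure is easy to express with a bounded deque. DropOldestQueue is a hypothetical sketch; the bus worker's actual queue type is not described on this page. A deque with maxlen evicts from the left when full, so put() never blocks the publisher.

```python
from collections import deque
from typing import Any, Deque, Optional


class DropOldestQueue:
    """Hypothetical bounded per-subscriber buffer with drop-oldest backpressure."""

    def __init__(self, maxsize: int = 1024) -> None:
        self._items: Deque[Any] = deque(maxlen=maxsize)
        self.dropped = 0  # how many events were evicted unread

    def put(self, event: Any) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # the append below evicts the oldest event
        self._items.append(event)

    def get(self) -> Optional[Any]:
        # Oldest surviving event first; None when empty.
        return self._items.popleft() if self._items else None

    def __len__(self) -> int:
        return len(self._items)
```

An asyncio consumer would typically pair this with an asyncio.Event that put() sets, so it can await new events instead of spinning.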


Worked example: live topology mutations

The mutator + web editor flow is the canonical consumer of the bus today. See MazeNET for the feature-level description.

  1. Operator drags a LAN in the MazeNET editor.
  2. Frontend posts POST /api/v1/topologies/{id}/mutations. The API writes a TopologyMutation row in state pending, then publishes topology.{id}.mutation.enqueued on the bus.
  3. The decnet mutate worker is subscribed to topology.*.mutation.enqueued. The enqueue event flips an asyncio.Event that cancels its 10-second asyncio.sleep, and the reconciler re-enters immediately.
  4. The reconciler claims the row atomically, publishes topology.{id}.mutation.applying, dispatches to the op, then publishes topology.{id}.mutation.applied or .failed.
  5. The SSE route GET /api/v1/topologies/{id}/events forwards every matching topic as an SSE frame to the connected browser. The editor refetches on applied, failed, and status events, and the header flips between the LIVE and CONNECTING… states.

If the bus drops any one of those events, the 10-second mutator poll picks up the pending row (step 3 falls back gracefully) and the editor's next REFRESH button press resyncs (step 5 falls back gracefully). The feature works with a degraded bus — just slower.
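Step 3's wake-on-enqueue can also be written without cancelling a sleep: wait on the asyncio.Event with the poll interval as a timeout. wait_for_work is a hypothetical sketch, not the mutator's code (which, per step 3, cancels an asyncio.sleep); both approaches give the same property, where a dropped event costs at most one poll interval.

```python
import asyncio


async def wait_for_work(wake: asyncio.Event, poll_interval_s: float) -> bool:
    """Sleep until a bus hint arrives or the poll interval expires.

    Returns True if woken by the event, False on timeout. Either way the
    caller should re-read pending rows from the DB afterwards.
    """
    try:
        await asyncio.wait_for(wake.wait(), timeout=poll_interval_s)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        # Re-arm for the next enqueue. Events set while the caller is busy
        # reconciling stay set, so the next wait returns immediately.
        wake.clear()
```

The worker loop would call await wait_for_work(wake, 10.0) and then reconcile pending rows from the DB regardless of the return value, while a separate task subscribed to topology.*.mutation.enqueued calls wake.set() on each event.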


Testing

The bus ships with two implementations:

  • UnixSocketBus (production) — talks to a real decnet bus worker over a UNIX socket.
  • FakeBus (tests) — in-process pub/sub, no socket. Select via DECNET_BUS_TYPE=fake or by instantiating directly in a pytest fixture.

Standard test fixtures live in tests/bus/conftest.py:

import pytest_asyncio

from decnet.bus.fake import FakeBus


@pytest_asyncio.fixture
async def fake_bus():
    # Fresh in-process bus per test: no socket, no bus worker needed.
    bus = FakeBus()
    await bus.connect()
    yield bus
    await bus.close()

For end-to-end smoke, scripts/bus/smoke.sh boots a real bus worker, publishes one event, and asserts a subscriber sees it. scripts/bus/smoke-mutator.sh does the same for the four mutator-family topics.


Troubleshooting

bus: unavailable in worker logs

Most common causes:

  1. The decnet-bus.service is not running on the host. Check systemctl status decnet-bus.
  2. The worker process isn't in the decnet group. Run id $(whoami); if decnet is missing, run sudo usermod -aG decnet $(whoami) and log out and back in, or pass --group "" for dev.
  3. DECNET_BUS_SOCKET disagrees between the worker and the bus. Make sure both read the same env file.

A missing bus is non-fatal: the mutator drops back to its 10-second poll, the API SSE route idles on keepalives. But you've lost the sub-second push — fix it.

SSE stream connects but events never arrive

If the editor header says CONNECTING… but never flips to LIVE, the SSE route opened but no events have been forwarded. Check in order:

  1. Is the topology actually active or degraded? The editor only streams for live topologies.
  2. Is the mutator worker running? Publishes come from there.
  3. Is the bus worker reachable from both the API process and the mutator? scripts/bus/smoke-mutator.sh narrows this down in a few seconds.

Dropped events under load

The per-subscriber queue holds 1024 events. If you see bus.fake: subscriber queue full, dropped … in logs, your subscriber is consuming events more slowly than the publisher produces them. Either speed up the subscriber's loop body or accept the drop: the DB is the source of truth.


Future work

  • decnet bus --bridge-tcp for cross-host pub/sub (deferred).
  • Migrating the dashboard /stream endpoint off its internal poll loop onto the bus (a bus consumer fanning out to SSE, parallel to the topology events route).
  • First decky/attacker consumers: live-visualization pulsation driven by decky.{id}.traffic and attacker.observed.