Add Service Bus page
Documents the host-local UNIX-socket pub/sub transport: architecture, topic hierarchy, env vars, code examples (publish/subscribe + get_app_bus for the API), at-most-once delivery semantics, the live-topology-mutation worked example, testing helpers, and troubleshooting. Linked from the user-docs section of the sidebar between Systemd setup and Logging.
372
Service-Bus.md
Normal file
@@ -0,0 +1,372 @@
# Service Bus: host-local pub/sub for DECNET workers

The DECNET service bus is a **host-local UNIX-domain socket pub/sub
transport** that lets independent workers (mutator, correlator,
profiler, API, sniffer, …) push events to each other without coupling
through the database.

It is the plumbing that turns DECNET from a pile of cooperating
pollers into something that reacts in under a second when state changes.

See also: [Design Overview](Design-Overview),
[Module reference: workers](Module-Reference-Workers),
[Environment variables](Environment-Variables),
[Systemd setup](Systemd-Setup).

---

## Why a bus

Every DECNET worker already talks to the database. But the database is
a poor fit for "something just happened": every consumer has to poll,
and every poll is a round-trip whether or not there's work.

The bus gives you a second communication channel alongside the DB:

| Concern | DB | Bus |
|---|---|---|
| Source of truth | **yes** | no |
| Persistence | durable | in-flight only |
| Delivery | exactly-once | at-most-once |
| Latency | poll-interval bounded | sub-millisecond |
| Survival across restarts | yes | no |

**Rule of thumb:** state goes in the DB. Notifications that something
in the DB just changed go on the bus. If the bus drops a message, the
next DB poll picks up the state; nothing is lost, only latency
increases.

This invariant is load-bearing. It's what lets publishers be
fire-and-forget, lets the transport discard events under backpressure
instead of blocking, and lets consumers treat the bus as a hint rather
than a contract.

---

## Architecture

```
┌─────────────┐     publish     ┌────────────┐   subscribe   ┌──────────────┐
│   mutator   │────────────────▶│ decnet bus │──────────────▶│  api (SSE)   │
└─────────────┘                 │   worker   │               └──────────────┘
                                │   (UNIX    │
┌─────────────┐     publish     │  socket)   │   subscribe   ┌──────────────┐
│   web api   │────────────────▶│            │──────────────▶│   mutator    │
│  (enqueue)  │                 └────────────┘               │  (wake-on-   │
└─────────────┘                                              │   enqueue)   │
                                                             └──────────────┘
```

`decnet bus` is a standalone systemd-supervised worker. It owns the
UNIX socket, multiplexes clients, matches topics against subscription
patterns, and fans events out to interested subscribers. Publishers
and subscribers are just client processes; the worker doesn't know
what each topic means, only how to route it.

### One bus per host

The MVP is **host-local only**. Each host runs its own `decnet bus`.
Cross-host federation (swarm master ↔ agents over TCP) is deferred to
a future `--bridge-tcp` mode that will proxy over the existing swarm
mTLS infra if a use case emerges. Today, if you want something on the
master to react to an event on an agent, the agent pushes it over the
regular swarm API and the master's local bus re-publishes from there.

### Authorization

The socket file is created with mode `0660` and owned by the
`decnet` group. Authorization is therefore whatever the kernel says
about who can `connect()`: anyone in the `decnet` group is trusted.
There is no per-topic ACL; if you can connect, you can publish or
subscribe to anything.

For dev boxes without the `decnet` group, pass `--group ""` when
starting the worker and the socket stays owned by the current process
group.

---

## Starting the bus

### Via systemd (production)

```bash
sudo systemctl enable --now decnet-bus.service
```

The unit file ships with `DECNET_BUS_SOCKET=/run/decnet/bus.sock`,
`RuntimeDirectory=decnet`, and `Group=decnet`. Any worker unit that
depends on the bus should declare `After=decnet-bus.service` and
`Requires=decnet-bus.service`.

### Manually (dev)

```bash
decnet bus  # defaults: /run/decnet/bus.sock, group=decnet
decnet bus --socket /tmp/b.sock --group "" --heartbeat 5
```

`--heartbeat N` emits a `system.bus.health` event every N seconds so
subscribers can detect a hung worker.

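One way a subscriber might act on the heartbeat is a small staleness check. The sketch below is only an illustration: `HeartbeatWatchdog` and its `grace` factor are hypothetical names, not part of DECNET, and it assumes nothing beyond `system.bus.health` events arriving roughly every `interval` seconds.

```python
import time


class HeartbeatWatchdog:
    """Tracks when the last heartbeat was seen (hypothetical helper, not a DECNET API)."""

    def __init__(self, interval: float, grace: float = 3.0, clock=time.monotonic):
        self.interval = interval      # expected seconds between heartbeats (--heartbeat N)
        self.grace = grace            # how many intervals of silence to tolerate
        self._clock = clock           # injectable so tests can control time
        self._last_seen = clock()     # treat construction time as the first beat

    def beat(self) -> None:
        """Call this for every system.bus.health event received."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True once no heartbeat has arrived for interval * grace seconds."""
        return self._clock() - self._last_seen > self.interval * self.grace
```

A subscriber loop would call `beat()` for each heartbeat event and check `is_stale()` from a periodic task; what to do when the bus looks hung (log, alert, reconnect) is left to the caller.
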
### Disabled mode

If you don't want a bus at all (contract-test runs, minimal dev), set:

```bash
export DECNET_BUS_ENABLED=false
```

With this set, every `get_bus()` call returns a `NullBus`: publishes
are silently dropped and subscriptions yield nothing. Workers start
cleanly; features that depend on sub-second push fall back to polling
at their usual interval.

---

## Topic hierarchy

Topics are NATS-style dot-separated strings. Two wildcards are
supported in subscription patterns:

- `*` matches exactly one token.
- `>` matches one-or-more trailing tokens.

So `topology.>` matches `topology.abc.status` but not the bare
`topology`; `topology.*.mutation.applied` matches every topology's
`applied` event; `topology.abc.mutation.*` matches every mutation
state for one topology.

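The matching rules above can be sketched in a few lines. This is a minimal illustration of the two wildcards as described, not the bus worker's actual matcher; `topic_matches` is a hypothetical name.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated topic matches a subscription pattern."""
    p_tokens = pattern.split(".")
    t_tokens = topic.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one topic token left.
            return i == len(p_tokens) - 1 and len(t_tokens) > i
        if i >= len(t_tokens):
            return False          # topic ran out of tokens before the pattern did
        if p != "*" and p != t_tokens[i]:
            return False          # literal token mismatch
    # No '>' seen: the token counts must agree exactly.
    return len(p_tokens) == len(t_tokens)
```

For instance, `topic_matches("topology.>", "topology")` is false because `>` needs at least one trailing token, while `topic_matches("topology.*.mutation.applied", "topology.abc.mutation.applied")` is true.
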
Current topic families:

| Topic pattern | Emitted by | Example payload |
|---|---|---|
| `topology.{id}.mutation.enqueued` | API (`POST /mutations`) | `{mutation_id, op, payload}` |
| `topology.{id}.mutation.applying` | Mutator | `{mutation_id, op, payload}` |
| `topology.{id}.mutation.applied` | Mutator | `{mutation_id, op}` |
| `topology.{id}.mutation.failed` | Mutator | `{mutation_id, op, reason}` |
| `topology.{id}.status` | Mutator | `{state, reason}` |
| `decky.{id}.state` | _reserved_ | n/a |
| `decky.{id}.traffic` | _reserved_ | n/a |
| `attacker.observed` | _reserved_ | n/a |
| `system.log` | _reserved_ | n/a |
| `system.bus.health` | Bus worker heartbeat | `{ts, uptime_s}` |

Topic builders live in `decnet.bus.topics`. **Always** use the helpers
(`topology_mutation(tid, state)`, `topology_status(tid)`, …); they
reject segments containing `.`, `*`, `>`, or whitespace so a bogus
topology id can't silently corrupt the hierarchy.

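To make the segment check concrete, here is a rough sketch of what such validation could look like. `check_segment` and `topology_status_topic` are hypothetical stand-ins rather than the real `decnet.bus.topics` helpers, and rejecting empty segments is an extra assumption on top of what the text states.

```python
import re

# Characters the helpers are described as rejecting: '.', '*', '>', and whitespace.
_FORBIDDEN = re.compile(r"[.*>\s]")


def check_segment(segment: str) -> str:
    """Return the segment unchanged, or raise ValueError if it is unsafe."""
    # Assumption: an empty segment is also rejected, since it would yield 'a..b'.
    if not segment or _FORBIDDEN.search(segment):
        raise ValueError(f"invalid topic segment: {segment!r}")
    return segment


def topology_status_topic(topology_id: str) -> str:
    """Hypothetical stand-in for a builder such as topics.topology_status()."""
    return f"topology.{check_segment(topology_id)}.status"
```

With a check like this, a topology id such as `"abc.mutation"` fails loudly at publish time instead of producing a topic with an extra level.
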
Renaming an existing topic is a breaking change for every subscriber.
Adding a new family is safe.

---

## Environment variables

| Variable | Default | Meaning |
|---|---|---|
| `DECNET_BUS_ENABLED` | `true` | Set `false` to short-circuit `get_bus()` to `NullBus`. |
| `DECNET_BUS_TYPE` | `unix` | `unix` or `fake`. `fake` is the in-process transport used by tests. |
| `DECNET_BUS_SOCKET` | `/run/decnet/bus.sock` (falls back to `~/.decnet/bus.sock`) | Path to the UNIX socket. |
| `DECNET_BUS_GROUP` | `decnet` | Group that owns the socket file. `""` disables chown. |

See [Environment variables](Environment-Variables) for the full list.

---

## Using the bus from code

Always use the factory. **Never** import `UnixSocketBus`, `FakeBus`,
or `NullBus` directly. This mirrors the `get_repository()` convention.

### Publishing

```python
from decnet.bus.factory import get_bus
from decnet.bus import topics

# Run inside an async function; topology_id and mutation_id come from the caller.
bus = get_bus(client_name="mutator")
await bus.connect()

await bus.publish(
    topics.topology_mutation(topology_id, topics.MUTATION_APPLIED),
    {"mutation_id": mutation_id, "op": "add_lan"},
    event_type=topics.MUTATION_APPLIED,
)
```

Publishers are fire-and-forget. Wrap each publish in
`try/except Exception as exc: log.warning(...)`: a bus failure must
never break the caller, because the DB write has already happened.

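The wrapping advice above could be captured in one small helper. This is a sketch under assumptions: `safe_publish` is a hypothetical name, and the only thing it relies on is an object with an awaitable `publish(topic, payload, **kwargs)` method, as in the publishing example.

```python
import asyncio
import logging

log = logging.getLogger("decnet.example")  # hypothetical logger name


async def safe_publish(bus, topic: str, payload: dict, **kwargs) -> bool:
    """Publish without ever raising; returns False if the bus call failed."""
    try:
        await bus.publish(topic, payload, **kwargs)
        return True
    except Exception as exc:  # deliberately broad: the DB write already succeeded
        log.warning("bus publish to %s failed: %s", topic, exc)
        return False
```

Because `asyncio.CancelledError` derives from `BaseException`, task cancellation still propagates through this wrapper; only ordinary failures are swallowed and logged.
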
### Subscribing

```python
from decnet.bus.factory import get_bus

bus = get_bus(client_name="my-worker")
await bus.connect()

sub = bus.subscribe("topology.*.mutation.applied")
async with sub:
    async for event in sub:
        print(event.topic, event.payload)
```

Subscriptions are async iterators. Wrap them in `async with` so the
client unregisters itself cleanly on exit.

### Inside the web API

Request handlers use a shared singleton so the bus socket isn't
opened per request:

```python
from decnet.bus.app import get_app_bus

bus = await get_app_bus()  # None if bus is disabled or down
if bus is not None:
    await bus.publish(...)
```

`get_app_bus()` returns `None` on connection failure and remembers
the failure for the process lifetime, so the API stays healthy without
a bus.

---

## Delivery guarantees

**At-most-once. Fire-and-forget.** A published event may be delivered
to every matching subscriber, some of them, or none.

Specifically, the bus drops events when:

- No subscribers match the topic at publish time.
- A subscriber's per-connection queue is full (default 1024 events);
  the oldest event is discarded to make room. This is **drop-oldest**
  backpressure: the invariant is that a slow consumer cannot stall
  publishers.
- The bus worker is restarting or the client loses its socket
  connection. In-flight events are lost.

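Drop-oldest backpressure is easy to picture with a bounded deque. The class below is an illustrative sketch, not the bus's real queue; `DropOldestQueue` and its `dropped` counter are hypothetical.

```python
from collections import deque


class DropOldestQueue:
    """Bounded FIFO that evicts the oldest item instead of blocking the producer."""

    def __init__(self, maxsize: int = 1024):
        # A deque with maxlen discards from the opposite end when appended to while full.
        self._items = deque(maxlen=maxsize)
        self.dropped = 0

    def put(self, item) -> None:
        """Never blocks: a full queue loses its oldest entry."""
        if len(self._items) == self._items.maxlen:
            self.dropped += 1     # the append below will evict the oldest item
        self._items.append(item)

    def get(self):
        """Pop the oldest remaining item; raises IndexError when empty."""
        return self._items.popleft()
```

The producer's `put` always returns immediately, which is the property the text calls out: a slow consumer only loses its own oldest events.
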
**These are features, not bugs.** The DB remains source of truth, so
the worst-case effect of a dropped event is one extra poll interval of
latency (up to 10 seconds for the mutator) until the next poll picks
up the state. In exchange, publishers never block, never retry, and
never have to reason about ordering across subscribers.

If you need durable delivery, write to the DB instead and let
consumers poll. Use the bus to shorten the poll interval from
seconds to milliseconds, not to replace it.

---

## Worked example: live topology mutations

The mutator + web editor flow is the canonical consumer of the bus
today. See [MazeNET](MazeNET) for the feature-level description.

1. Operator drags a LAN in the MazeNET editor.
2. Frontend posts `POST /api/v1/topologies/{id}/mutations`. The API
   writes a `TopologyMutation` row in state `pending`, then publishes
   `topology.{id}.mutation.enqueued` on the bus.
3. The `decnet mutate` worker is subscribed to
   `topology.*.mutation.enqueued`. The enqueue event flips an
   `asyncio.Event` that cancels its 10-second `asyncio.sleep`, and
   the reconciler re-enters immediately.
4. The reconciler claims the row atomically, publishes
   `topology.{id}.mutation.applying`, dispatches to the op, then
   publishes `topology.{id}.mutation.applied` or `.failed`.
5. The SSE route `GET /api/v1/topologies/{id}/events` forwards every
   matching topic as an SSE frame to the connected browser. The
   editor refetches on `applied`/`failed`/`status` and the header
   flips between LIVE and CONNECTING….

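Step 3 is the classic "wake on event, otherwise poll" pattern. The sketch below shows one way to express it; it uses `asyncio.wait_for` on the event rather than cancelling a sleep task, which may differ from what the mutator actually does, and `wait_for_work` is a hypothetical name.

```python
import asyncio


async def wait_for_work(wake: asyncio.Event, poll_interval: float = 10.0) -> str:
    """Block until the bus wakes us or the poll interval elapses."""
    try:
        await asyncio.wait_for(wake.wait(), timeout=poll_interval)
        reason = "bus"            # an enqueue event arrived
    except asyncio.TimeoutError:
        reason = "poll"           # no event: fall back to the periodic DB poll
    wake.clear()                  # re-arm for the next iteration
    return reason
```

A reconciler loop built on this would scan the DB for pending rows after every return, whatever the reason, so a dropped bus event costs at most one poll interval.
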
If the bus drops any one of those events, the 10-second mutator
poll picks up the pending row (step 3 falls back gracefully) and the
editor's next REFRESH button press resyncs (step 5 falls back
gracefully). The feature works with a degraded bus, just slower.

---

## Testing

The bus ships with two transport implementations:

- `UnixSocketBus` (production): talks to a real `decnet bus` worker
  over a UNIX socket.
- `FakeBus` (tests): in-process pub/sub, no socket. Select via
  `DECNET_BUS_TYPE=fake` or by instantiating directly in a pytest
  fixture.

Standard test fixtures live in `tests/bus/conftest.py`:

```python
import pytest_asyncio

from decnet.bus.fake import FakeBus


@pytest_asyncio.fixture
async def fake_bus():
    bus = FakeBus()
    await bus.connect()
    yield bus
    await bus.close()
```

For end-to-end smoke, `scripts/bus/smoke.sh` boots a real bus worker,
publishes one event, and asserts a subscriber sees it.
`scripts/bus/smoke-mutator.sh` does the same for the four
mutator-family topics.

---

## Troubleshooting

### `bus: unavailable` in worker logs

Most common causes:

1. The `decnet-bus.service` is not running on the host. Check
   `systemctl status decnet-bus`.
2. The worker process isn't in the `decnet` group. Run `id $(whoami)`;
   if `decnet` is missing, run `sudo usermod -aG decnet $(whoami)` and
   log out and back in, or pass `--group ""` for dev.
3. `DECNET_BUS_SOCKET` disagrees between the worker and the bus.
   Make sure both read the same env file.

A missing bus is non-fatal: the mutator drops back to its 10-second
poll and the API SSE route idles on keepalives. But you've lost the
sub-second push, so fix it.

### SSE stream connects but events never arrive

If the editor header says `CONNECTING…` but never flips to `LIVE`,
the SSE route opened but no events have been forwarded. Check in
order:

1. Is the topology actually `active` or `degraded`? The editor only
   streams for live topologies.
2. Is the mutator worker running? Publishes come from there.
3. Is the bus worker reachable from both the API process and the
   mutator? `scripts/bus/smoke-mutator.sh` narrows this down in a
   few seconds.

### Dropped events under load

The per-subscriber queue holds 1024 events. If you see
`bus.fake: subscriber queue full, dropped …` in logs, your
subscriber is slower than the publisher. Either speed up the
subscriber's loop body or accept the drop; remember, the DB is the
source of truth.

---

## Future work

- `decnet bus --bridge-tcp` for cross-host pub/sub (deferred).
- Migrating the dashboard `/stream` endpoint off its internal poll
  loop onto the bus (a bus consumer fanning out to SSE, parallel to
  the topology events route).
- First decky/attacker consumers: live-visualization pulsation
  driven by `decky.{id}.traffic` and `attacker.observed`.

@@ -25,6 +25,7 @@
- [Database-Drivers](Database-Drivers)
- [Systemd-Setup](Systemd-Setup)
- [Logging-and-Syslog](Logging-and-Syslog)
- [Service-Bus](Service-Bus)
- [Web-Dashboard](Web-Dashboard)
- [REST-API-Reference](REST-API-Reference)
- [Mutation-and-Randomization](Mutation-and-Randomization)