diff --git a/Service-Bus.md b/Service-Bus.md new file mode 100644 index 0000000..fa34e2e --- /dev/null +++ b/Service-Bus.md @@ -0,0 +1,372 @@ +# Service Bus — host-local pub/sub for DECNET workers + +The DECNET service bus is a **host-local UNIX-domain socket pub/sub +transport** that lets independent workers (mutator, correlator, +profiler, API, sniffer, …) push events to each other without coupling +through the database. + +It is the plumbing that turns DECNET from a pile of cooperating +pollers into something that reacts in under a second when state changes. + +See also: [Design Overview](Design-Overview), +[Module reference: workers](Module-Reference-Workers), +[Environment variables](Environment-Variables), +[Systemd setup](Systemd-Setup). + +--- + +## Why a bus + +Every DECNET worker already talks to the database. But the database is +a poor fit for "something just happened" — every consumer has to poll, +and every poll is a round-trip whether or not there's work. + +The bus gives you a second communication channel alongside the DB: + +| Concern | DB | Bus | +|---|---|---| +| Source of truth | **yes** | no | +| Persistence | durable | in-flight only | +| Delivery | exactly-once | at-most-once | +| Latency | poll-interval bounded | sub-millisecond | +| Survival across restarts | yes | no | + +**Rule of thumb:** state goes in the DB. Notifications that something +in the DB just changed go on the bus. If the bus drops a message, the +next DB poll picks up the state; nothing is lost, only latency +increases. + +This invariant is load-bearing. It's what lets publishers be +fire-and-forget, lets the transport discard events under backpressure +instead of blocking, and lets consumers treat the bus as a hint rather +than a contract.
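
The "hint, not contract" rule above can be sketched as a consumer loop that sleeps for its poll interval but wakes early when a bus event arrives. This is a minimal illustration under stated assumptions, not DECNET's actual reconciler: `wait_for_hint_or_poll`, `reconcile_loop`, and the `reconcile` callback are hypothetical names, and the bus subscription that would set the `asyncio.Event` is left out.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_hint_or_poll(wake: asyncio.Event, poll_interval: float) -> bool:
    """Block until a bus hint sets `wake` or the poll interval elapses.

    Returns True when woken by a hint, False on a plain poll timeout.
    """
    try:
        await asyncio.wait_for(wake.wait(), timeout=poll_interval)
    except asyncio.TimeoutError:
        return False  # no hint arrived; fall back to the regular poll
    wake.clear()  # re-arm for the next hint
    return True


async def reconcile_loop(
    wake: asyncio.Event,
    reconcile: Callable[[], Awaitable[None]],
    poll_interval: float = 10.0,
    rounds: int = 1,
) -> None:
    """Hypothetical consumer: re-read state from the DB, hint or no hint."""
    for _ in range(rounds):
        await wait_for_hint_or_poll(wake, poll_interval)
        await reconcile()  # the DB is the source of truth either way
```

Because the loop reconciles after every wake-up regardless of what woke it, a dropped hint costs at most one poll interval and never a missed state change.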
+ +--- + +## Architecture + +``` +┌─────────────┐ publish ┌────────────┐ subscribe ┌──────────────┐ +│ mutator │────────────────▶│ decnet bus │──────────────▶│ api (SSE) │ +└─────────────┘ │ worker │ └──────────────┘ + │ (UNIX │ +┌─────────────┐ publish │ socket) │ subscribe ┌──────────────┐ +│ web api │────────────────▶│ │──────────────▶│ mutator │ +│ (enqueue) │ └────────────┘ │ (wake-on- │ +└─────────────┘ │ enqueue) │ + └──────────────┘ +``` + +`decnet bus` is a standalone systemd-supervised worker. It owns the +UNIX socket, multiplexes clients, matches topics against subscription +patterns, and fans events out to interested subscribers. Publishers +and subscribers are just client processes — the worker doesn't know +what each topic means, only how to route it. + +### One bus per host + +The MVP is **host-local only**. Each host runs its own `decnet bus`. +Cross-host federation (swarm master ↔ agents over TCP) is deferred to +a future `--bridge-tcp` mode that will proxy over the existing swarm +mTLS infra if a use case emerges. Today, if you want something on the +master to react to an event on an agent, the agent pushes it over the +regular swarm API and the master's local bus re-publishes from there. + +### Authorization + +The socket file is created with mode `0660` and owned by the +`decnet` group. Authorization is therefore whatever the kernel says +about who can `connect()` — anyone in the `decnet` group is trusted. +There is no per-topic ACL; if you can connect, you can publish or +subscribe to anything. + +For dev boxes without the `decnet` group, pass `--group ""` when +starting the worker and the socket stays owned by the current process +group. + +--- + +## Starting the bus + +### Via systemd (production) + +```bash +sudo systemctl enable --now decnet-bus.service +``` + +The unit file ships with `DECNET_BUS_SOCKET=/run/decnet/bus.sock`, +`RuntimeDirectory=decnet`, and `Group=decnet`. 
Any worker unit that +depends on the bus should declare `After=decnet-bus.service` and +`Requires=decnet-bus.service`. + +### Manually (dev) + +```bash +decnet bus # defaults: /run/decnet/bus.sock, group=decnet +decnet bus --socket /tmp/b.sock --group "" --heartbeat 5 +``` + +`--heartbeat N` emits a `system.bus.health` event every N seconds so +subscribers can detect a hung worker. + +### Disabled mode + +If you don't want a bus at all (contract-test runs, minimal dev), set: + +```bash +export DECNET_BUS_ENABLED=false +``` + +Every `get_bus()` call returns a `NullBus` — publishes are silently +dropped, subscriptions yield nothing. Workers start cleanly; features +that depend on sub-second push fall back to their regular poll +interval. + +--- + +## Topic hierarchy + +Topics are NATS-style dot-separated strings. Two wildcards are +supported in subscription patterns: + +- `*` matches exactly one token. +- `>` matches one-or-more trailing tokens. + +So `topology.>` matches `topology.abc.status` but not the bare +`topology`; `topology.*.mutation.applied` matches every topology's +`applied` event; `topology.abc.mutation.*` matches every mutation +state for one topology. + +Current topic families: + +| Topic pattern | Emitted by | Example payload | +|---|---|---| +| `topology.{id}.mutation.enqueued` | API (`POST /mutations`) | `{mutation_id, op, payload}` | +| `topology.{id}.mutation.applying` | Mutator | `{mutation_id, op, payload}` | +| `topology.{id}.mutation.applied` | Mutator | `{mutation_id, op}` | +| `topology.{id}.mutation.failed` | Mutator | `{mutation_id, op, reason}` | +| `topology.{id}.status` | Mutator | `{state, reason}` | +| `decky.{id}.state` | _reserved_ | — | +| `decky.{id}.traffic` | _reserved_ | — | +| `attacker.observed` | _reserved_ | — | +| `system.log` | _reserved_ | — | +| `system.bus.health` | Bus worker heartbeat | `{ts, uptime_s}` | + +Topic builders live in `decnet.bus.topics`.
**Always** use the helpers +(`topology_mutation(tid, state)`, `topology_status(tid)`, …) — they +reject segments containing `.`, `*`, `>`, or whitespace so a bogus +topology id can't silently corrupt the hierarchy. + +Renaming an existing topic is a breaking change for every subscriber. +Adding a new family is safe. + +--- + +## Environment variables + +| Variable | Default | Meaning | +|---|---|---| +| `DECNET_BUS_ENABLED` | `true` | Set `false` to short-circuit `get_bus()` to `NullBus`. | +| `DECNET_BUS_TYPE` | `unix` | `unix` or `fake`. `fake` is the in-process transport used by tests. | +| `DECNET_BUS_SOCKET` | `/run/decnet/bus.sock` (falls back to `~/.decnet/bus.sock`) | Path to the UNIX socket. | +| `DECNET_BUS_GROUP` | `decnet` | Group that owns the socket file. `""` disables chown. | + +See [Environment variables](Environment-Variables) for the full list. + +--- + +## Using the bus from code + +Always use the factory. **Never** import `UnixSocketBus`, `FakeBus`, +or `NullBus` directly — mirrors the `get_repository()` convention. + +### Publishing + +```python +from decnet.bus.factory import get_bus +from decnet.bus import topics + +bus = get_bus(client_name="mutator") +await bus.connect() + +await bus.publish( + topics.topology_mutation(topology_id, topics.MUTATION_APPLIED), + {"mutation_id": mutation_id, "op": "add_lan"}, + event_type=topics.MUTATION_APPLIED, +) +``` + +Publishers are fire-and-forget. Wrap publishes in a +`try/except Exception as exc: log.warning(...)` — a bus failure must +never break the caller, because the DB write already happened. + +### Subscribing + +```python +from decnet.bus.factory import get_bus + +bus = get_bus(client_name="my-worker") +await bus.connect() + +sub = bus.subscribe("topology.*.mutation.applied") +async with sub: + async for event in sub: + print(event.topic, event.payload) +``` + +Subscriptions are async iterators. Wrap them in `async with` so the +client unregisters itself cleanly on exit. 
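
The pattern passed to `bus.subscribe` follows the wildcard rules from the topic hierarchy section. A minimal sketch of those rules is below; `topic_matches` is a hypothetical helper written for illustration, not the matcher the bus worker uses.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Match a dot-separated topic against a pattern with `*` and `>` wildcards.

    `*` matches exactly one token; `>` matches one or more trailing tokens
    and is only meaningful as the last token of the pattern.
    """
    p_tokens = pattern.split(".")
    t_tokens = topic.split(".")
    for i, tok in enumerate(p_tokens):
        if tok == ">":
            # Must be the final pattern token, with at least one topic token left.
            return i == len(p_tokens) - 1 and len(t_tokens) > i
        if i >= len(t_tokens):
            return False  # topic ran out of tokens before the pattern did
        if tok != "*" and tok != t_tokens[i]:
            return False  # literal token mismatch
    # No `>` seen: lengths must agree exactly.
    return len(p_tokens) == len(t_tokens)
```

Under these rules, `topic_matches("topology.>", "topology.abc.status")` holds while `topic_matches("topology.>", "topology")` does not, mirroring the examples given for the topic hierarchy.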
+ +### Inside the web API + +Request handlers use a shared singleton so the bus socket isn't +opened per request: + +```python +from decnet.bus.app import get_app_bus + +bus = await get_app_bus() # None if bus is disabled or down +if bus is not None: + await bus.publish(...) +``` + +`get_app_bus()` returns `None` on connection failure and remembers +the failure for the process lifetime — the API stays healthy without +a bus. + +--- + +## Delivery guarantees + +**At-most-once. Fire-and-forget.** A published event may be delivered +to every matching subscriber, some of them, or none. + +Specifically, the bus drops events when: + +- No subscribers match the topic at publish time. +- A subscriber's per-connection queue is full (default 1024 events); + the oldest event is discarded to make room. This is **drop-oldest** + backpressure — the invariant is that a slow consumer cannot stall + publishers. +- The bus worker is restarting or the client loses its socket + connection. In-flight events are lost. + +**These are features, not bugs.** The DB remains source of truth, so +the worst-case effect of a dropped event is one poll interval of +added latency (10 seconds for the mutator) until the next poll. In exchange, publishers never block, +never retry, and never have to reason about ordering across +subscribers. + +If you need durable delivery, write to the DB instead and let +consumers poll. Use the bus to shorten reaction time from +seconds to milliseconds, not to replace the poll. + +--- + +## Worked example: live topology mutations + +The mutator + web editor flow is the canonical consumer of the bus +today. See [MazeNET](MazeNET) for the feature-level description. + +1. Operator drags a LAN in the MazeNET editor. +2. Frontend posts `POST /api/v1/topologies/{id}/mutations`. The API + writes a `TopologyMutation` row in state `pending`, then publishes + `topology.{id}.mutation.enqueued` on the bus. +3. The `decnet mutate` worker is subscribed to + `topology.*.mutation.enqueued`.
The enqueue event flips an + `asyncio.Event` that cancels its 10-second `asyncio.sleep`, and + the reconciler re-enters immediately. +4. The reconciler claims the row atomically, publishes + `topology.{id}.mutation.applying`, dispatches to the op, then + publishes `topology.{id}.mutation.applied` or `.failed`. +5. The SSE route `GET /api/v1/topologies/{id}/events` forwards every + matching topic as an SSE frame to the connected browser. The + editor refetches on `applied`/`failed`/`status` and the header + flips between LIVE and CONNECTING…. + +If the bus drops any one of those events, the 10-second mutator +poll picks up the pending row (step 3 falls back gracefully) and the +editor's next REFRESH button press resyncs (step 5 falls back +gracefully). The feature works with a degraded bus — just slower. + +--- + +## Testing + +The bus ships with two implementations: + +- `UnixSocketBus` (production) — talks to a real `decnet bus` worker + over a UNIX socket. +- `FakeBus` (tests) — in-process pub/sub, no socket. Select via + `DECNET_BUS_TYPE=fake` or by instantiating directly in a pytest + fixture. + +Standard test fixtures live in `tests/bus/conftest.py`: + +```python +from decnet.bus.fake import FakeBus + +@pytest_asyncio.fixture +async def fake_bus(): + bus = FakeBus() + await bus.connect() + yield bus + await bus.close() +``` + +For end-to-end smoke, `scripts/bus/smoke.sh` boots a real bus worker, +publishes one event, and asserts a subscriber sees it. +`scripts/bus/smoke-mutator.sh` does the same for the four +mutator-family topics. + +--- + +## Troubleshooting + +### `bus: unavailable` in worker logs + +Most common causes: + +1. The `decnet-bus.service` is not running on the host. Check + `systemctl status decnet-bus`. +2. The worker process isn't in the `decnet` group. `id $(whoami)` — + if `decnet` is missing, `usermod -aG decnet $(whoami)` and log + out/in, or pass `--group ""` for dev. +3. `DECNET_BUS_SOCKET` disagrees between the worker and the bus. 
+ Make sure both read the same env file. + +A missing bus is non-fatal: the mutator drops back to its 10-second +poll, the API SSE route idles on keepalives. But you've lost the +sub-second push — fix it. + +### SSE stream connects but events never arrive + +If the editor header says `CONNECTING…` but never flips to `LIVE`, +the SSE route opened but no events have been forwarded. Check in +order: + +1. Is the topology actually `active` or `degraded`? The editor only + streams for live topologies. +2. Is the mutator worker running? Publishes come from there. +3. Is the bus worker reachable from both the API process and the + mutator? `scripts/bus/smoke-mutator.sh` narrows this down in a + few seconds. + +### Dropped events under load + +The per-subscriber queue is 1024. If you see +`bus.fake: subscriber queue full, dropped …` in logs, your +subscriber is slower than the publisher. Either speed up the +subscriber's loop body or accept the drop — remember, DB is source +of truth. + +--- + +## Future work + +- `decnet bus --bridge-tcp` for cross-host pub/sub (deferred). +- Migrating the dashboard `/stream` endpoint off its internal poll + loop onto the bus (a bus consumer fanning out to SSE, parallel to + the topology events route). +- First decky/attacker consumers: live-visualization pulsation + driven by `decky.{id}.traffic` and `attacker.observed`. diff --git a/_Sidebar.md b/_Sidebar.md index 658f701..33ff347 100644 --- a/_Sidebar.md +++ b/_Sidebar.md @@ -25,6 +25,7 @@ - [Database-Drivers](Database-Drivers) - [Systemd-Setup](Systemd-Setup) - [Logging-and-Syslog](Logging-and-Syslog) +- [Service-Bus](Service-Bus) - [Web-Dashboard](Web-Dashboard) - [REST-API-Reference](REST-API-Reference) - [Mutation-and-Randomization](Mutation-and-Randomization)