Service Bus — host-local pub/sub for DECNET workers
The DECNET service bus is a host-local UNIX-domain socket pub/sub transport that lets independent workers (mutator, correlator, profiler, API, sniffer, …) push events to each other without coupling through the database.
It is the plumbing that turns DECNET from a pile of cooperating pollers into something that reacts in under a second when state changes.
See also: Design Overview, Module reference: workers, Environment variables, Systemd setup.
Why a bus
Every DECNET worker already talks to the database. But the database is a poor fit for "something just happened" — every consumer has to poll, and every poll is a round-trip whether or not there's work.
The bus gives you a second communication channel alongside the DB:
| Concern | DB | Bus |
|---|---|---|
| Source of truth | yes | no |
| Persistence | durable | in-flight only |
| Delivery | exactly-once | at-most-once |
| Latency | poll-interval bounded | sub-millisecond |
| Survival across restarts | yes | no |
Rule of thumb: state goes in the DB. Notifications that something in the DB just changed go on the bus. If the bus drops a message, the next DB poll picks up the state; nothing is lost, only latency increases.
This invariant is load-bearing. It's what lets publishers be fire-and-forget, lets the transport discard events under backpressure instead of blocking, and lets consumers treat the bus as a hint rather than a contract.
Architecture
┌─────────────┐     publish     ┌────────────┐   subscribe   ┌──────────────┐
│   mutator   │────────────────▶│ decnet bus │──────────────▶│  api (SSE)   │
└─────────────┘                 │   worker   │               └──────────────┘
                                │   (UNIX    │
┌─────────────┐     publish     │   socket)  │   subscribe   ┌──────────────┐
│   web api   │────────────────▶│            │──────────────▶│   mutator    │
│  (enqueue)  │                 └────────────┘               │  (wake-on-   │
└─────────────┘                                              │   enqueue)   │
                                                             └──────────────┘
decnet bus is a standalone systemd-supervised worker. It owns the
UNIX socket, multiplexes clients, matches topics against subscription
patterns, and fans events out to interested subscribers. Publishers
and subscribers are just client processes — the worker doesn't know
what each topic means, only how to route it.
One bus per host
The MVP is host-local only. Each host runs its own decnet bus.
Cross-host federation (swarm master ↔ agents over TCP) is deferred to
a future --bridge-tcp mode that will proxy over the existing swarm
mTLS infra if a use case emerges. Today, if you want something on the
master to react to an event on an agent, the agent pushes it over the
regular swarm API and the master's local bus re-publishes from there.
Authorization
The socket file is created with mode 0660 and owned by the
decnet group. Authorization is therefore whatever the kernel says
about who can connect() — anyone in the decnet group is trusted.
There is no per-topic ACL; if you can connect, you can publish or
subscribe to anything.
For dev boxes without the decnet group, pass --group "" when
starting the worker and the socket stays owned by the current process
group.
Starting the bus
Via systemd (production)
sudo systemctl enable --now decnet-bus.service
The unit file ships with DECNET_BUS_SOCKET=/run/decnet/bus.sock,
RuntimeDirectory=decnet, and Group=decnet. Any worker unit that
depends on the bus should declare After=decnet-bus.service and
Requires=decnet-bus.service.
Manually (dev)
decnet bus # defaults: /run/decnet/bus.sock, group=decnet
decnet bus --socket /tmp/b.sock --group "" --heartbeat 5
--heartbeat N emits a system.bus.health event every N seconds so
subscribers can detect a hung worker.
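One way a subscriber might turn those heartbeats into a liveness check is sketched below. The class name `HeartbeatWatch` and the "three missed beats" threshold are assumptions for illustration, not part of DECNET; the caller is expected to call `beat()` whenever a system.bus.health event arrives.

```python
import time
from typing import Optional


class HeartbeatWatch:
    """Hypothetical helper: tracks when the last system.bus.health event arrived."""

    def __init__(self, interval_s: float, missed_allowed: int = 3) -> None:
        # Tolerate a few missed beats before declaring the bus worker hung.
        self.max_silence_s = interval_s * missed_allowed
        self.last_seen = time.monotonic()

    def beat(self, now: Optional[float] = None) -> None:
        # Call this from the subscriber loop for every heartbeat event.
        self.last_seen = time.monotonic() if now is None else now

    def is_stale(self, now: Optional[float] = None) -> bool:
        # True once the silence exceeds the allowed window.
        current = time.monotonic() if now is None else now
        return current - self.last_seen > self.max_silence_s
```

With `--heartbeat 5` this sketch would flag the bus as stale after more than 15 seconds of silence.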
Disabled mode
If you don't want a bus at all (contract-test runs, minimal dev), set:
export DECNET_BUS_ENABLED=false
Every get_bus() call then returns a NullBus: publishes are silently
dropped and subscriptions yield nothing. Workers start cleanly; features
that depend on sub-second push fall back to polling at their normal
interval.
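To make the disabled mode concrete, here is a minimal null-object sketch. It is not the real NullBus: the class names are hypothetical, and it assumes that "yield nothing" means a subscription's iteration ends immediately, which the actual implementation may handle differently (for example by blocking).

```python
from typing import Any


class NullSubscriptionSketch:
    """Hypothetical subscription that never produces an event."""

    async def __aenter__(self) -> "NullSubscriptionSketch":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        return None

    def __aiter__(self) -> "NullSubscriptionSketch":
        return self

    async def __anext__(self) -> Any:
        # Assumption: end iteration at once instead of waiting forever.
        raise StopAsyncIteration


class NullBusSketch:
    """Hypothetical stand-in with the same call shape as a real bus client."""

    async def connect(self) -> None:
        return None

    async def publish(self, topic: str, payload: dict, **kwargs: Any) -> None:
        # Silently drop the event.
        return None

    def subscribe(self, pattern: str) -> NullSubscriptionSketch:
        return NullSubscriptionSketch()

    async def close(self) -> None:
        return None
```

Because the null object keeps the same methods, calling code needs no "is the bus enabled?" branches.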
Topic hierarchy
Topics are NATS-style dot-separated strings. Two wildcards are supported in subscription patterns:
- * matches exactly one token.
- > matches one or more trailing tokens.
So topology.> matches topology.abc.status but not the bare
topology; topology.*.mutation.applied matches every topology's
applied event; topology.abc.mutation.* matches every mutation
state for one topology.
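The matching rules above can be sketched in a few lines. This is an illustrative matcher, not the bus worker's actual code; the function name `topic_matches` is an assumption.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated topic matches a subscription pattern."""
    p_tokens = pattern.split(".")
    t_tokens = topic.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least
            # one topic token left to consume.
            return i == len(p_tokens) - 1 and len(t_tokens) > i
        if i >= len(t_tokens):
            # Pattern is longer than the topic.
            return False
        if p != "*" and p != t_tokens[i]:
            # Literal token mismatch ('*' accepts any single token).
            return False
    # Without a trailing '>', the token counts must agree.
    return len(p_tokens) == len(t_tokens)
```

For example, `topic_matches("topology.>", "topology.abc.status")` is true, while `topic_matches("topology.>", "topology")` is false because `>` needs at least one trailing token.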
Current topic families:
| Topic pattern | Emitted by | Example payload |
|---|---|---|
| topology.{id}.mutation.enqueued | API (POST /mutations) | {mutation_id, op, payload} |
| topology.{id}.mutation.applying | Mutator | {mutation_id, op, payload} |
| topology.{id}.mutation.applied | Mutator | {mutation_id, op} |
| topology.{id}.mutation.failed | Mutator | {mutation_id, op, reason} |
| topology.{id}.status | Mutator | {state, reason} |
| decky.{id}.state | reserved | — |
| decky.{id}.traffic | reserved | — |
| attacker.observed | reserved | — |
| system.log | reserved | — |
| system.bus.health | Bus worker heartbeat | {ts, uptime_s} |
Topic builders live in decnet.bus.topics. Always use the helpers
(topology_mutation(tid, state), topology_status(tid), …) — they
reject segments containing ., *, >, or whitespace so a bogus
topology id can't silently corrupt the hierarchy.
Renaming an existing topic is a breaking change for every subscriber. Adding a new family is safe.
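The segment check that the helpers perform could look roughly like the sketch below. The names `check_segment` and `topology_mutation_topic` are hypothetical stand-ins, not the real decnet.bus.topics functions, and rejecting empty segments is an added assumption.

```python
import re

# Characters that would change the meaning of a topic: the separator,
# both wildcards, and any whitespace.
_FORBIDDEN = re.compile(r"[.*>\s]")


def check_segment(segment: str) -> str:
    """Return the segment unchanged, or raise if it could corrupt the hierarchy."""
    # Assumption: empty segments are rejected too.
    if not segment or _FORBIDDEN.search(segment):
        raise ValueError(f"invalid topic segment: {segment!r}")
    return segment


def topology_mutation_topic(topology_id: str, state: str) -> str:
    """Build topology.{id}.mutation.{state} from validated segments."""
    return f"topology.{check_segment(topology_id)}.mutation.{check_segment(state)}"
```

Validating at build time means a topology id such as `a.b` fails loudly at the publisher instead of quietly producing a topic with an extra level.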
Environment variables
| Variable | Default | Meaning |
|---|---|---|
| DECNET_BUS_ENABLED | true | Set false to short-circuit get_bus() to NullBus. |
| DECNET_BUS_TYPE | unix | unix or fake. fake is the in-process transport used by tests. |
| DECNET_BUS_SOCKET | /run/decnet/bus.sock (falls back to ~/.decnet/bus.sock) | Path to the UNIX socket. |
| DECNET_BUS_GROUP | decnet | Group that owns the socket file. "" disables chown. |
See Environment variables for the full list.
Using the bus from code
Always use the factory. Never import UnixSocketBus, FakeBus,
or NullBus directly — mirrors the get_repository() convention.
Publishing
from decnet.bus.factory import get_bus
from decnet.bus import topics

# Run inside an async function; topology_id and mutation_id come from the caller.
bus = get_bus(client_name="mutator")
await bus.connect()
await bus.publish(
    topics.topology_mutation(topology_id, topics.MUTATION_APPLIED),
    {"mutation_id": mutation_id, "op": "add_lan"},
    event_type=topics.MUTATION_APPLIED,
)
Publishers are fire-and-forget. Wrap publishes in a
try/except Exception as exc: log.warning(...) — a bus failure must
never break the caller, because the DB write already happened.
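A small wrapper keeps that try/except in one place. This is a sketch: `publish_quietly` is a hypothetical helper name, and the only thing it assumes about the bus object is the `publish(topic, payload, **kwargs)` coroutine shown above.

```python
import logging
from typing import Any

log = logging.getLogger(__name__)


async def publish_quietly(bus: Any, topic: str, payload: dict, **kwargs: Any) -> bool:
    """Publish an event; log and swallow any failure. Returns True on success."""
    try:
        await bus.publish(topic, payload, **kwargs)
        return True
    except Exception as exc:
        # The DB write already happened, so a bus failure only costs latency.
        log.warning("bus publish failed for %s: %s", topic, exc)
        return False
```

The boolean return is only for metrics or tests; callers should not branch on it to retry.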
Subscribing
from decnet.bus.factory import get_bus

bus = get_bus(client_name="my-worker")
await bus.connect()
sub = bus.subscribe("topology.*.mutation.applied")
async with sub:
    async for event in sub:
        print(event.topic, event.payload)
Subscriptions are async iterators. Wrap them in async with so the
client unregisters itself cleanly on exit.
Inside the web API
Request handlers use a shared singleton so the bus socket isn't opened per request:
from decnet.bus.app import get_app_bus

bus = await get_app_bus()  # None if bus is disabled or down
if bus is not None:
    await bus.publish(...)
get_app_bus() returns None on connection failure and remembers
the failure for the process lifetime — the API stays healthy without
a bus.
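The "connect once, remember failure" behavior can be sketched as follows. This is not the real get_app_bus() implementation; `LazyBusHandle` and its `connect` callable are hypothetical names used only to show the pattern.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class LazyBusHandle:
    """Hypothetical singleton holder: one connect attempt per process."""

    def __init__(self, connect: Callable[[], Awaitable[Any]]) -> None:
        self._connect = connect
        self._bus: Optional[Any] = None
        self._failed = False
        self._lock = asyncio.Lock()

    async def get(self) -> Optional[Any]:
        # Fast path: already connected, or already known to be unavailable.
        if self._bus is not None or self._failed:
            return self._bus
        async with self._lock:
            # Re-check under the lock so concurrent requests connect only once.
            if self._bus is None and not self._failed:
                try:
                    self._bus = await self._connect()
                except Exception:
                    # Remember the failure; later calls return None immediately.
                    self._failed = True
        return self._bus
```

Remembering the failure trades automatic recovery for predictable request latency: no handler ever waits on a reconnect attempt.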
Delivery guarantees
At-most-once. Fire-and-forget. A published event may be delivered to every matching subscriber, some of them, or none.
Specifically, the bus drops events when:
- No subscribers match the topic at publish time.
- A subscriber's per-connection queue is full (default 1024 events); the oldest event is discarded to make room. This is drop-oldest backpressure — the invariant is that a slow consumer cannot stall publishers.
- The bus worker is restarting or the client loses its socket connection. In-flight events are lost.
These are features, not bugs. The DB remains the source of truth, so the worst-case effect of a dropped event is extra latency of up to one poll interval (10 seconds for the mutator) until the next poll picks up the state. In exchange, publishers never block, never retry, and never have to reason about ordering across subscribers.
If you need durable delivery, write to the DB instead and let consumers poll. Use the bus to shorten the poll interval from seconds to milliseconds, not to replace it.
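The drop-oldest backpressure described above can be illustrated with a bounded asyncio.Queue. This is a sketch of the policy, not the bus worker's code; `offer_drop_oldest` is a hypothetical helper name.

```python
import asyncio


def offer_drop_oldest(queue: asyncio.Queue, event: object) -> bool:
    """Enqueue without blocking. Returns True if an older event was evicted."""
    dropped = False
    if queue.full():
        try:
            # Evict the oldest queued event to make room for the new one.
            queue.get_nowait()
            dropped = True
        except asyncio.QueueEmpty:
            pass
    queue.put_nowait(event)
    return dropped
```

Because the function never awaits, a slow consumer can only lose its own oldest events; it cannot slow the publisher down.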
Worked example: live topology mutations
The mutator + web editor flow is the canonical consumer of the bus today. See MazeNET for the feature-level description.
1. Operator drags a LAN in the MazeNET editor.
2. Frontend posts POST /api/v1/topologies/{id}/mutations. The API writes a TopologyMutation row in state pending, then publishes topology.{id}.mutation.enqueued on the bus.
3. The decnet mutate worker is subscribed to topology.*.mutation.enqueued. The enqueue event flips an asyncio.Event that cancels its 10-second asyncio.sleep, and the reconciler re-enters immediately.
4. The reconciler claims the row atomically, publishes topology.{id}.mutation.applying, dispatches to the op, then publishes topology.{id}.mutation.applied or .failed.
5. The SSE route GET /api/v1/topologies/{id}/events forwards every matching topic as an SSE frame to the connected browser. The editor refetches on applied / failed / status and the header flips between LIVE and CONNECTING….
If the bus drops any one of those events, the 10-second mutator poll picks up the pending row (step 3 falls back gracefully) and the editor's next REFRESH button press resyncs (step 5 falls back gracefully). The feature works with a degraded bus — just slower.
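The wake-on-enqueue behavior in the mutator step can be approximated as below. Note the swap: instead of cancelling an asyncio.sleep, this sketch waits on the asyncio.Event with a timeout, which gives the same "poll interval or sooner" effect. The function name `wait_for_wake` is hypothetical.

```python
import asyncio


async def wait_for_wake(wake: asyncio.Event, poll_interval: float = 10.0) -> bool:
    """Sleep up to poll_interval seconds; return early (True) if wake is set."""
    try:
        await asyncio.wait_for(wake.wait(), timeout=poll_interval)
    except asyncio.TimeoutError:
        # No bus event arrived: this is the plain poll-interval fallback.
        return False
    # Reset so the next enqueue event can wake the loop again.
    wake.clear()
    return True
```

A reconciler loop would call this between passes, with a subscriber task calling `wake.set()` for every enqueued event; if the bus is down, the loop simply runs every `poll_interval` seconds.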
Testing
The bus ships with two implementations:
- UnixSocketBus (production): talks to a real decnet bus worker over a UNIX socket.
- FakeBus (tests): in-process pub/sub, no socket. Select via DECNET_BUS_TYPE=fake or by instantiating directly in a pytest fixture.
Standard test fixtures live in tests/bus/conftest.py:
import pytest_asyncio

from decnet.bus.fake import FakeBus


@pytest_asyncio.fixture
async def fake_bus():
    # Connected in-process bus, closed again after the test finishes.
    bus = FakeBus()
    await bus.connect()
    yield bus
    await bus.close()
For end-to-end smoke, scripts/bus/smoke.sh boots a real bus worker,
publishes one event, and asserts a subscriber sees it.
scripts/bus/smoke-mutator.sh does the same for the four
mutator-family topics.
Troubleshooting
bus: unavailable in worker logs
Most common causes:
- The decnet-bus.service is not running on the host. Check systemctl status decnet-bus.
- The worker process isn't in the decnet group. Check with id $(whoami); if decnet is missing, run usermod -aG decnet $(whoami) and log out/in, or pass --group "" for dev.
- DECNET_BUS_SOCKET disagrees between the worker and the bus. Make sure both read the same env file.
A missing bus is non-fatal: the mutator drops back to its 10-second poll, the API SSE route idles on keepalives. But you've lost the sub-second push — fix it.
SSE stream connects but events never arrive
If the editor header says CONNECTING… but never flips to LIVE,
the SSE route opened but no events have been forwarded. Check in
order:
1. Is the topology actually active or degraded? The editor only streams for live topologies.
2. Is the mutator worker running? Publishes come from there.
3. Is the bus worker reachable from both the API process and the mutator? scripts/bus/smoke-mutator.sh narrows this down in a few seconds.
Dropped events under load
The per-subscriber queue is 1024. If you see
bus.fake: subscriber queue full, dropped … in logs, your
subscriber is slower than the publisher. Either speed up the
subscriber's loop body or accept the drop — remember, DB is source
of truth.
Future work
- decnet bus --bridge-tcp for cross-host pub/sub (deferred).
- Migrating the dashboard /stream endpoint off its internal poll loop onto the bus (a bus consumer fanning out to SSE, parallel to the topology events route).
- First decky/attacker consumers: live-visualization pulsation driven by decky.{id}.traffic and attacker.observed.
DECNET — honeypot deception-network framework. Pre-1.0, active development — use with caution. See Sponsors to support the project. Contact: samuel@securejump.cl