Service Bus — host-local pub/sub for DECNET workers
The DECNET service bus is a host-local UNIX-domain socket pub/sub transport that lets independent workers (mutator, correlator, profiler, API, sniffer, …) push events to each other without coupling through the database.
It is the plumbing that turns DECNET from a pile of cooperating pollers into something that reacts in under a second when state changes.
See also: Design Overview, Module reference: workers, Environment variables, Systemd setup.
Why a bus
Every DECNET worker already talks to the database. But the database is a poor fit for "something just happened" — every consumer has to poll, and every poll is a round-trip whether or not there's work.
The bus gives you a second communication channel alongside the DB:
| Concern | DB | Bus |
|---|---|---|
| Source of truth | yes | no |
| Persistence | durable | in-flight only |
| Delivery | exactly-once | at-most-once |
| Latency | poll-interval bounded | sub-millisecond |
| Survival across restarts | yes | no |
Rule of thumb: state goes in the DB. Notifications that something in the DB just changed go on the bus. If the bus drops a message, the next DB poll picks up the state; nothing is lost, only latency increases.
This invariant is load-bearing. It's what lets publishers be fire-and-forget, lets the transport discard events under backpressure instead of blocking, and lets consumers treat the bus as a hint rather than a contract.
Architecture
```
┌─────────────┐     publish     ┌────────────┐   subscribe   ┌──────────────┐
│   mutator   │────────────────▶│ decnet bus │──────────────▶│  api (SSE)   │
└─────────────┘                 │   worker   │               └──────────────┘
                                │   (UNIX    │
┌─────────────┐     publish     │  socket)   │   subscribe   ┌──────────────┐
│   web api   │────────────────▶│            │──────────────▶│   mutator    │
│  (enqueue)  │                 └────────────┘               │  (wake-on-   │
└─────────────┘                                              │   enqueue)   │
                                                             └──────────────┘
```
decnet bus is a standalone systemd-supervised worker. It owns the
UNIX socket, multiplexes clients, matches topics against subscription
patterns, and fans events out to interested subscribers. Publishers
and subscribers are just client processes — the worker doesn't know
what each topic means, only how to route it.
One bus per host
The MVP is host-local only. Each host runs its own decnet bus.
Cross-host federation (swarm master ↔ agents over TCP) is deferred to
a future --bridge-tcp mode that will proxy over the existing swarm
mTLS infra if a use case emerges. Today, if you want something on the
master to react to an event on an agent, the agent pushes it over the
regular swarm API and the master's local bus re-publishes from there.
Authorization
The socket file is created with mode 0660 and owned by the
decnet group. Authorization is therefore whatever the kernel says
about who can connect() — anyone in the decnet group is trusted.
There is no per-topic ACL; if you can connect, you can publish or
subscribe to anything.
For dev boxes without the decnet group, pass --group "" when
starting the worker and the socket stays owned by the current process
group.
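The permission model above can be illustrated with a minimal sketch. It assumes a plain `AF_UNIX` stream socket; `bind_restricted_socket` and `demo_mode` are hypothetical names for illustration, not the bus worker's actual code.

```python
import os
import shutil
import socket
import stat
import tempfile


def bind_restricted_socket(path: str, group: str = "") -> socket.socket:
    """Bind a UNIX socket and restrict it to owner + group (mode 0660)."""
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(path)
    # Until chmod runs, the file carries whatever the process umask allows.
    os.chmod(path, 0o660)
    if group:
        # An empty group skips the chown, mirroring --group "".
        shutil.chown(path, group=group)
    server.listen()
    return server


def demo_mode() -> int:
    """Bind a socket in a temp dir and report its permission bits."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "bus.sock")
        server = bind_restricted_socket(path)
        try:
            return stat.S_IMODE(os.stat(path).st_mode)
        finally:
            server.close()
```

With this layout the kernel rejects `connect()` from any process that is neither the owner nor a member of the owning group, which is the only access check the bus relies on.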
Starting the bus
Via systemd (production)
```bash
sudo systemctl enable --now decnet-bus.service
```
The unit file ships with DECNET_BUS_SOCKET=/run/decnet/bus.sock,
RuntimeDirectory=decnet, and Group=decnet. Any worker unit that
depends on the bus should declare After=decnet-bus.service and
Requires=decnet-bus.service.
Manually (dev)
```bash
decnet bus                       # defaults: /run/decnet/bus.sock, group=decnet
decnet bus --socket /tmp/b.sock --group "" --heartbeat 5
```
--heartbeat N emits a system.bus.health event every N seconds so
subscribers can detect a hung worker.
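A subscriber could turn those heartbeats into a liveness check along these lines. This is a sketch only: `HeartbeatWatchdog` and the "three missed beats" threshold are assumptions for illustration, not part of DECNET.

```python
import time


class HeartbeatWatchdog:
    """Flags the bus worker as stale after several missed heartbeats."""

    def __init__(self, interval_s: float, missed_allowed: int = 3,
                 clock=time.monotonic) -> None:
        # Assumption: tolerate a few missed beats before raising an alarm.
        self._deadline_s = interval_s * missed_allowed
        self._clock = clock
        self._last_seen = clock()

    def beat(self) -> None:
        """Call this for every system.bus.health event received."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True once no heartbeat has arrived within the deadline."""
        return self._clock() - self._last_seen > self._deadline_s
```

The injectable `clock` keeps the check testable without sleeping; a monotonic clock avoids false alarms when wall-clock time jumps.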
Disabled mode
If you don't want a bus at all (contract-test runs, minimal dev), set:
```bash
export DECNET_BUS_ENABLED=false
```
Every get_bus() call then returns a NullBus: publishes are silently
dropped and subscriptions yield nothing. Workers start cleanly; features
that depend on sub-second push fall back to their regular poll
interval.
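To make the null-object idea concrete, here is a small sketch. The class bodies and `bus_is_enabled` are illustrative stand-ins, not the real `decnet.bus` code. Two assumptions are made: only the literal value `false` (case-insensitive) disables the bus, and a null subscription ends immediately rather than blocking.

```python
import asyncio
import os


def bus_is_enabled(env=os.environ) -> bool:
    """Assumption: anything other than "false" leaves the bus enabled."""
    return env.get("DECNET_BUS_ENABLED", "true").strip().lower() != "false"


class NullSubscription:
    """Async iterator that never yields an event."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info) -> None:
        return None

    def __aiter__(self):
        return self

    async def __anext__(self):
        raise StopAsyncIteration


class NullBus:
    """Accepts every call and does nothing, so callers need no special case."""

    async def connect(self) -> None:
        return None

    async def publish(self, topic, payload, **kwargs) -> None:
        return None  # silently dropped

    def subscribe(self, pattern) -> NullSubscription:
        return NullSubscription()


async def demo() -> list:
    bus = NullBus()
    await bus.connect()
    await bus.publish("topology.abc.status", {"state": "active"})
    async with bus.subscribe("topology.>") as sub:
        return [event async for event in sub]
```

Because the null object exposes the same methods as a working bus, worker code stays identical in both modes and only the latency changes.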
Topic hierarchy
Topics are NATS-style dot-separated strings. Two wildcards are supported in subscription patterns:
- `*` matches exactly one token.
- `>` matches one-or-more trailing tokens.
So topology.> matches topology.abc.status but not the bare
topology; topology.*.mutation.applied matches every topology's
applied event; topology.abc.mutation.* matches every mutation
state for one topology.
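The matching rules can be sketched in a few lines. `topic_matches` is a hypothetical helper written for illustration; the bus worker's real matcher may differ, for example by precompiling patterns.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated topic matches a subscription pattern."""
    pattern_tokens = pattern.split(".")
    topic_tokens = topic.split(".")

    for index, token in enumerate(pattern_tokens):
        if token == ">":
            # ">" is only valid as the last pattern token and must consume
            # at least one remaining topic token.
            return (index == len(pattern_tokens) - 1
                    and len(topic_tokens) > index)
        if index >= len(topic_tokens):
            return False  # pattern is longer than the topic
        if token != "*" and token != topic_tokens[index]:
            return False  # literal token mismatch
    # No ">" was seen, so both sides must have the same number of tokens.
    return len(pattern_tokens) == len(topic_tokens)
```

For example, `topic_matches("topology.>", "topology.abc.status")` is true, while `topic_matches("topology.>", "topology")` is false because `>` needs at least one trailing token.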
Current topic families:
| Topic pattern | Emitted by | Example payload |
|---|---|---|
| `topology.{id}.mutation.enqueued` | API (POST /mutations) | `{mutation_id, op, payload}` |
| `topology.{id}.mutation.applying` | Mutator | `{mutation_id, op, payload}` |
| `topology.{id}.mutation.applied` | Mutator | `{mutation_id, op}` |
| `topology.{id}.mutation.failed` | Mutator | `{mutation_id, op, reason}` |
| `topology.{id}.status` | Mutator | `{state, reason}` |
| `decky.{id}.state` | reserved | - |
| `decky.{id}.traffic` | reserved | - |
| `decky.{name}.service_added` | Engine (services_live) | `{decky_name, service_name, topology_id?, services}`: live add of a single service to a deployed decky (fleet OR MazeNET). Published only after `compose up -d --no-deps --build <decky>-<service>` succeeds. Substrate-watching consumers (correlator, dashboard) reconcile shape off this without waiting for the next decnet-state.json snapshot. topology_id is null for fleet deckies. Note: leaf is service_added (underscore), not service.added, because bus segments are NATS-style tokens and can't contain dots. |
| `decky.{name}.service_removed` | Engine (services_live) | `{decky_name, service_name, topology_id?, services}`: live remove. Compose stop + rm -f of the service container, then DB persist + compose re-render so a future up -d doesn't bring it back. Same payload shape as the added event; services is the post-mutation list. |
| `decky.{name}.service_config_changed` | Engine (services_live) | `{decky_name, service_name, topology_id?, service_config, recreated}`: published when the schema-driven Inspector form persists a new per-service config. service_config is the post-validation dict (unknown keys dropped, types coerced). recreated=true when the operator hit Apply (force-recreate of `<decky>-<service>` to pick up the new env, destructive); false for Save (DB + compose only). Note: this topic is an audit-trail signal, not a propagation trigger. For swarm fleet deckies and agent-pinned topologies the master propagates the change directly via AgentClient.deploy / apply_topology (see `_redispatch_fleet_shard` / `_resync_agent_topology` in decnet/engine/services_live.py). |
| `orchestrator.traffic.{decky_id}` | Orchestrator | `{kind: "traffic", protocol: "ssh", action, src_decky_uuid, dst_decky_uuid, success, payload, ts}`: synthetic inter-decky SSH traffic generated to keep the fleet from looking suspiciously static |
| `orchestrator.file.{decky_id}` | Orchestrator | `{kind: "file", protocol: "ssh", action, dst_decky_uuid, success, payload, ts}`: synthetic file create or edit (action="file:create" / "file:edit") performed inside a decky via docker exec. Driven by the realism planner; rare ticks (~3%) carry callback-bearing canary content. |
| `orchestrator.email.{decky_id}` | Orchestrator | `{kind: "email", mail_decky_uuid, thread_id, message_id, in_reply_to, sender_email, recipient_email, subject, language, success, ts}`: one fake corporate email persona-driven by the realism content engine and dropped into a mail decky's spool. decky_id is the mail decky (the IMAP/POP3 host serving the mailbox). Producer changed from a separate emailgen worker to the unified orchestrator in the realism migration; topic shape is unchanged. |
| `system.orchestrator.health` | Orchestrator | standard worker heartbeat (covers traffic + file + email branches). Carries extra.realism = `{llm_enabled, llm_backend, llm_model, llm_breaker_state}` so the dashboard's worker panel can render a live LLM/circuit-breaker status badge without reaching into worker memory. |
| `system.orchestrator.control` | Orchestrator | admin-originated stop intents for the orchestrator loop |
| `attacker.observed` | Correlator | first sighting; consumed by decnet enrich as a wake signal |
| `attacker.scored` | Profiler | post-enrichment score update; also wakes decnet enrich |
| `attacker.intel.enriched` | decnet enrich | `{attacker_ip, aggregate_verdict, providers}` after a threat-intel pass; webhook → SIEM |
| `attacker.fingerprinted` | Prober | `{attacker_ip, port, jarm_hash\|hassh_server\|tcpfp_hash, ...}`: fires on every successful active probe result. Distinct from attacker.observed (correlator first-sight); a fingerprint is additional evidence about an already-observed attacker. |
| `attacker.fingerprint_rotated` | Prober (via decnet.correlation.fingerprint_rotation) | `{attacker_uuid, attacker_ip, port, probe_type, old_hash, new_hash, rotation_count, ts}`: fires only when a probe produces a different hash than the last persisted hash for the same (attacker_uuid, port, probe_type) triple. Carries both old and new hash so consumers don't have to join. Indicates infrastructure churn / VPS rotation / banner rewrite / cert swap. Consumers: dashboard, forensics, attribution clustering. |
| `identity.formed` | reserved (clusterer) | `{identity_uuid, observation_uuids: [...], confidence, first_seen_at}`: clusterer creates a new identity from one or more observations |
| `identity.observation.linked` | reserved (clusterer) | `{identity_uuid, observation_uuid, confidence_after}`: observation attached / re-attached to an identity |
| `identity.merged` | reserved (clusterer) | `{winner_uuid, loser_uuid, observation_uuids: [...], confidence_after}`: two identities collapsed; subscribers re-key cached references to the winner |
| `identity.unmerged` | reserved (clusterer) | `{resurrected_uuid, former_winner_uuid, observation_uuids: [...], reason}`: revocable-merge undo: contradicting evidence cleared merged_into_uuid; subscribers should re-split cached references back to the resurrected side |
| `identity.campaign.assigned` | Campaign clusterer | `{identity_uuid, campaign_uuid, prior_campaign_uuid?}`: cross-family signal so identity.> subscribers (e.g. IdentityDetail SSE) see the campaign-id change without needing to subscribe to campaign.> |
| `attribution.profile.state_changed` | decnet.correlation.attribution_worker | `{identity_uuid, primitive, old_state, new_state, current_value, confidence, observation_count, ts}`: per-(identity, primitive) state-machine transition (e.g. stable → drifting). Emitted only on transition; idempotent observations that don't change state produce no event. State vocabulary is fixed at five values: unknown / stable / drifting / conflicted / multi_actor. See development/ATTRIBUTION-ENGINE.md. v0 ships Phase 1 substrate only (worker logs, no transitions emitted yet); the topic constants ship now so SSE relays can subscribe ahead of Phase 4. |
| `attribution.profile.multi_actor_suspected` | decnet.correlation.attribution_worker | `{identity_uuid, primitives: [...], evidence_summary, confidence, ts}`: fires when ≥ 2 primitives independently flag the same identity as multi_actor. Single-primitive multi_actor is too noisy to alarm on (flapping primitive on flaky network looks like two operators); the cross-primitive correlator is the real signal. confidence is capped at 0.6 by convention; see `_thresholds.MULTI_ACTOR_MAX_CONFIDENCE`. |
| `campaign.formed` | Campaign clusterer | `{campaign_uuid, identity_uuids: [...], confidence, first_seen_at}`: clusterer creates a new campaign from one or more identities |
| `campaign.identity.assigned` | Campaign clusterer | `{campaign_uuid, identity_uuid}`: identity attached / re-attached to a campaign |
| `campaign.merged` | Campaign clusterer | `{winner_uuid, loser_uuid, identity_uuids: [...]}`: two campaigns collapsed; subscribers re-key cached references to the winner |
| `campaign.unmerged` | Campaign clusterer | `{resurrected_uuid, former_winner_uuid, identity_uuids: [...]}`: revocable-merge undo at the campaign layer |
| `canary.{token_id}.placed` | Planter (API + deploy hook) | `{token_id, decky_id, kind, instrumenter?, placement_path, placed_at}`: a canary artifact was successfully written into a decky's filesystem (or a passive token persisted) |
| `canary.{token_id}.triggered` | decnet canary worker | `{token_id, decky_id, src_ip, user_agent?, request_path?, dns_qname?, occurred_at, raw_headers?}`: attacker hit the HTTP slug or DNS subdomain; correlator + webhook fanout consume to attribute and forward |
| `canary.{token_id}.revoked` | API (DELETE /tokens/{id}) | `{token_id, decky_id, revoked_at}`: operator removed a token; subscribers may evict cached lookups by token id |
| `system.canary.health` | decnet canary worker | standard worker heartbeat |
| `smtp.probe.pending` | Ingester | `{decky, attacker_ip, stored_as, mail_from, rcpt_to}`: fired when an smtp_relay decky stores a new inbound message. The realism worker subscribes and forwards the email to the upstream relay if this source IP has not yet reached probe_limit. stored_as is the quarantine filename (relative to the per-decky smtp dir). mail_from / rcpt_to are the envelope sender/recipients captured at SMTP time. The worker writes a probe_relay bounty row on success or failure so the limit check is DB-backed and survives container restarts. |
| `email.received` | Ingester (`decnet/web/ingester.py:_publish_email_received`) | `{source_id, attacker_uuid, attacker_ip, decky_id, service, subject, from_domain, mail_from_domain, return_path_domain, rcpt_count, rcpt_domains, x_mailer, dkim_signed, spf_pass, urls, attachment_count, attachment_sha256s, attachment_extensions, body_simhash, body_base64_bytes, attachment_macros, attachment_password_protected, html_smuggling, stored_as, body_sha256, mal_hash_match?}`: fired on full-message receipt, consumed by the TTP email_lifter. PII discipline (TTP_TAGGING.md "Hard parts §6"): hashes, counts, header names, and rcpt-domain sets only; never rcpt addresses or body bytes. mal_hash_match (DEBT-046, 2026-05-03): boolean, present only when the message had attachments. True when any attachment SHA-256 hits the configured MalHashProvider (default: MalwareBazaar bulk feed, gated on DECNET_MALWAREBAZAAR_AUTH_KEY); False when every hash was checked-and-clean; omitted when there were no attachments at all (so R0046's is True predicate stays silent on hash-less mail, matching the rule's pre-paydown behavior). Per-hash observations also persist to the observed_attachments table; DECNET is a honeypot platform, not just a consumer of intel. |
| `ttp.tagged` | decnet.ttp.worker | `{tag_uuids: [...], techniques_added: [...], attacker_uuid?, identity_uuid, session_id?}`: published only when INSERT OR IGNORE wrote at least one new row. Idempotent re-evaluations that produce zero new tags publish zero events (loop-prevention invariant: a webhook subscriber re-triggering enrichment on ttp.tagged could otherwise loop forever). |
| `ttp.rule.fired.{technique_id}` | decnet.ttp.worker | `{technique_id, tag_uuids: [...], attacker_uuid?, identity_uuid, session_id?}`: per-technique fan-out for SIEM correlation rules that subscribe to one technique. Topic key is the parent technique; sub-technique IDs are not promoted to the topic (a T1110.001 tag fires ttp.rule.fired.T1110). Use ttp.rule.fired.> for fleet-wide subscribers. |
| `ttp.rule.suppressed` | reserved (TTP worker) | `{rule_id, technique_id, reason}` where reason ∈ {"below_floor", "rate_limited", "rule_disabled"}: observability for tags that would have been written but were dropped. Drives the dashboard's per-rule suppression counters. Suppression-event publish is deferred: the v0 worker drops sub-floor confidence at the repo layer and the bus event is unwired. |
| `ttp.rule.reloaded.{rule_id}` | decnet.ttp.store (FilesystemRuleStore + DatabaseRuleStore) | `{rule_id, rule_version, deleted?}`: fired by the rule store when a rule's definition changes (YAML edit on disk for the FS backend, ttp_rule row update on the DB backend). One event per rule edit, never batched. A 5-rule deploy fires 5 events; the engine recompiles each rule alone and atomically swaps it into the dispatch index. Built via topics.ttp_rule_reloaded(rule_id); fleet subscribers use ttp.rule.reloaded.>. deleted: true indicates the rule file was removed. |
| `ttp.rule.state.{rule_id}` | decnet.ttp.store (FilesystemRuleStore + DatabaseRuleStore) + POST /api/v1/ttp/rules/{rule_id}/state | `{rule_id, state, set_by, auto_revert?}`: fired when a rule's operational state changes (operator hits disable/clip via the API, or an expires_at TTL fires and auto-reverts the state). state ∈ {"enabled", "disabled", "clipped"}. auto_revert: true flags TTL-driven reverts so dashboards can distinguish them from operator actions. Built via topics.ttp_rule_state(rule_id); fleet subscribers use ttp.rule.state.>. |
| `system.log` | reserved | - |
| `system.bus.health` | Bus worker heartbeat | `{ts, uptime_s}` |
Topic builders live in decnet.bus.topics. Always use the helpers
(topology_mutation(tid, state), topology_status(tid), …) — they
reject segments containing ., *, >, or whitespace so a bogus
topology id can't silently corrupt the hierarchy.
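The validation idea looks roughly like the sketch below. `check_segment` and `build_topology_mutation` are hypothetical re-implementations for illustration, not the functions in `decnet.bus.topics`; rejecting empty segments is an extra assumption.

```python
import re

# A segment may not contain ".", "*", ">", or any whitespace.
_FORBIDDEN = re.compile(r"[.*>\s]")


def check_segment(segment: str) -> str:
    """Return the segment unchanged, or raise if it would break routing."""
    # Assumption: empty segments are rejected too.
    if not segment or _FORBIDDEN.search(segment):
        raise ValueError(f"invalid topic segment: {segment!r}")
    return segment


def build_topology_mutation(topology_id: str, state: str) -> str:
    """Build topology.{id}.mutation.{state} from validated segments."""
    return (f"topology.{check_segment(topology_id)}"
            f".mutation.{check_segment(state)}")
```

Without the check, a topology id such as `a.b` would add an extra token and stop matching `topology.*.mutation.applied`, and an id of `>` would publish to a topic that looks like a wildcard.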
Renaming an existing topic is a breaking change for every subscriber. Adding a new family is safe.
Environment variables
| Variable | Default | Meaning |
|---|---|---|
| `DECNET_BUS_ENABLED` | `true` | Set `false` to short-circuit get_bus() to NullBus. |
| `DECNET_BUS_TYPE` | `unix` | `unix` or `fake`. `fake` is the in-process transport used by tests. |
| `DECNET_BUS_SOCKET` | `/run/decnet/bus.sock` (falls back to `~/.decnet/bus.sock`) | Path to the UNIX socket. |
| `DECNET_BUS_GROUP` | `decnet` | Group that owns the socket file. `""` disables chown. |
See Environment variables for the full list.
Using the bus from code
Always use the factory. Never import UnixSocketBus, FakeBus,
or NullBus directly — mirrors the get_repository() convention.
Publishing
```python
from decnet.bus import topics
from decnet.bus.factory import get_bus

# Run inside a coroutine: connect() and publish() are awaited.
bus = get_bus(client_name="mutator")
await bus.connect()

await bus.publish(
    topics.topology_mutation(topology_id, topics.MUTATION_APPLIED),
    {"mutation_id": mutation_id, "op": "add_lan"},
    event_type=topics.MUTATION_APPLIED,
)
```
Publishers are fire-and-forget. Wrap publishes in a
try/except Exception as exc: log.warning(...) — a bus failure must
never break the caller, because the DB write already happened.
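One way to package that rule is a small wrapper. This is a sketch: `publish_best_effort` and `BrokenBus` are hypothetical names, and the wrapper only assumes the bus object has an awaitable `publish(topic, payload)`.

```python
import asyncio
import logging

log = logging.getLogger("bus.example")


async def publish_best_effort(bus, topic: str, payload: dict) -> bool:
    """Publish and swallow any bus error; return True if it went through."""
    try:
        await bus.publish(topic, payload)
        return True
    except Exception as exc:  # deliberate: the bus is a hint, not a contract
        log.warning("bus publish failed for %s: %s", topic, exc)
        return False


class BrokenBus:
    """Stand-in whose publish always fails (illustration only)."""

    async def publish(self, topic, payload):
        raise ConnectionError("socket closed")


async def demo() -> bool:
    # The DB write would already have happened before this call.
    return await publish_best_effort(BrokenBus(), "topology.abc.status", {})
```

Catching `Exception` rather than `BaseException` leaves `asyncio.CancelledError` alone, so task cancellation still propagates normally.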
Subscribing
```python
from decnet.bus.factory import get_bus

bus = get_bus(client_name="my-worker")
await bus.connect()

# "*" matches exactly one token, so this sees every topology's applied event.
sub = bus.subscribe("topology.*.mutation.applied")
async with sub:
    async for event in sub:
        print(event.topic, event.payload)
```
Subscriptions are async iterators. Wrap them in async with so the
client unregisters itself cleanly on exit.
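The shape of such an object can be sketched as follows. `Subscription` here is a toy stand-in for illustration, not DECNET's class: it registers itself in a plain `set` on entry, removes itself on exit, and iterates over an `asyncio.Queue`.

```python
import asyncio


class Subscription:
    """Async context manager + async iterator over delivered events."""

    def __init__(self, registry: set) -> None:
        self._registry = registry
        self._queue: asyncio.Queue = asyncio.Queue()

    async def __aenter__(self):
        self._registry.add(self)      # start receiving events
        return self

    async def __aexit__(self, *exc_info) -> None:
        self._registry.discard(self)  # always runs, even on error or cancel

    def deliver(self, event) -> None:
        """Called by the transport for every matching event."""
        self._queue.put_nowait(event)

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._queue.get()


async def demo():
    registry = set()
    sub = Subscription(registry)  # created inside the running loop
    async with sub:
        registered = sub in registry
        sub.deliver("e1")
        async for event in sub:
            received = event
            break
    return registered, received, sub in registry
```

Because `__aexit__` runs on every exit path, the subscriber disappears from the registry even when the loop body raises, which is what keeps the transport from queueing events for a consumer that is gone.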
Inside the web API
Request handlers use a shared singleton so the bus socket isn't opened per request:
```python
from decnet.bus.app import get_app_bus

bus = await get_app_bus()  # None if bus is disabled or down
if bus is not None:
    await bus.publish(...)
```
get_app_bus() returns None on connection failure and remembers
the failure for the process lifetime — the API stays healthy without
a bus.
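The "connect once, remember failure" behaviour can be sketched like this. `SharedBus` and its `connect` callback are hypothetical; the real `get_app_bus()` may be structured differently.

```python
import asyncio


class SharedBus:
    """Caches one connection, or a permanent failure, per process."""

    def __init__(self, connect) -> None:
        self._connect = connect   # coroutine function returning a bus
        self._bus = None
        self._failed = False
        self._lock = asyncio.Lock()

    async def get(self):
        # The lock stops concurrent first requests from connecting twice.
        async with self._lock:
            if self._bus is None and not self._failed:
                try:
                    self._bus = await self._connect()
                except Exception:
                    self._failed = True  # never retried in this process
            return self._bus


async def demo():
    attempts = 0

    async def failing_connect():
        nonlocal attempts
        attempts += 1
        raise ConnectionRefusedError("no bus worker")

    shared = SharedBus(failing_connect)  # created inside the running loop
    first = await shared.get()
    second = await shared.get()
    return first, second, attempts
```

Remembering the failure trades automatic recovery for predictable request latency: a handler never pays a connection timeout more than once, and the API process needs a restart to pick the bus back up.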
Delivery guarantees
At-most-once. Fire-and-forget. A published event may be delivered to every matching subscriber, some of them, or none.
Specifically, the bus drops events when:
- No subscribers match the topic at publish time.
- A subscriber's per-connection queue is full (default 1024 events); the oldest event is discarded to make room. This is drop-oldest backpressure — the invariant is that a slow consumer cannot stall publishers.
- The bus worker is restarting or the client loses its socket connection. In-flight events are lost.
These are features, not bugs. The DB remains the source of truth, so the worst-case effect of a dropped event is up to one poll interval of extra latency (10 seconds for the mutator) until the next poll. In exchange, publishers never block, never retry, and never have to reason about ordering across subscribers.
If you need durable delivery, write to the DB instead and let consumers poll. Use the bus to shorten the poll interval from seconds to milliseconds, not to replace it.
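Drop-oldest backpressure is easy to picture with a bounded deque. This is an illustrative sketch, not the bus worker's queue; `DropOldestQueue` and its `dropped` counter are assumptions for the example.

```python
from collections import deque


class DropOldestQueue:
    """Bounded queue that evicts the oldest event instead of blocking."""

    def __init__(self, maxsize: int = 1024) -> None:
        self._items = deque(maxlen=maxsize)
        self.dropped = 0

    def put(self, event) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # append() below evicts the oldest item
        self._items.append(event)

    def get(self):
        return self._items.popleft()

    def __len__(self) -> int:
        return len(self._items)


# A slow consumer with room for 3 events receives 5.
queue = DropOldestQueue(maxsize=3)
for n in range(5):
    queue.put(n)
```

After the loop the queue holds events 2, 3 and 4, and `queue.dropped` is 2. `put` never waits, which is the property that keeps a slow consumer from stalling publishers.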
Worked example: live topology mutations
The mutator + web editor flow is the canonical consumer of the bus today. See MazeNET for the feature-level description.
1. Operator drags a LAN in the MazeNET editor.
2. Frontend posts `POST /api/v1/topologies/{id}/mutations`. The API writes a `TopologyMutation` row in state `pending`, then publishes `topology.{id}.mutation.enqueued` on the bus.
3. The `decnet mutate` worker is subscribed to `topology.*.mutation.enqueued`. The enqueue event flips an `asyncio.Event` that cancels its 10-second `asyncio.sleep`, and the reconciler re-enters immediately.
4. The reconciler claims the row atomically, publishes `topology.{id}.mutation.applying`, dispatches to the op, then publishes `topology.{id}.mutation.applied` or `.failed`.
5. The SSE route `GET /api/v1/topologies/{id}/events` forwards every matching topic as an SSE frame to the connected browser. The editor refetches on `applied` / `failed` / `status` and the header flips between LIVE and CONNECTING….
If the bus drops any one of those events, the 10-second mutator poll picks up the pending row (step 3 falls back gracefully) and the editor's next REFRESH button press resyncs (step 5 falls back gracefully). The feature works with a degraded bus — just slower.
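The wake-or-poll behaviour of the mutator can be approximated as below. Note the substitution: this sketch uses `asyncio.wait_for` on the event rather than cancelling a running `asyncio.sleep`. `wait_for_work` is a hypothetical helper.

```python
import asyncio


async def wait_for_work(wake: asyncio.Event, poll_interval: float) -> str:
    """Block until the bus sets `wake` or the poll interval elapses."""
    try:
        await asyncio.wait_for(wake.wait(), timeout=poll_interval)
        reason = "bus"    # woken early by an enqueue event
    except asyncio.TimeoutError:
        reason = "poll"   # no event arrived; regular DB poll
    wake.clear()          # re-arm for the next round
    return reason


async def demo():
    wake = asyncio.Event()
    first = await wait_for_work(wake, 0.01)  # nothing published yet
    wake.set()                               # bus subscriber saw an enqueue
    second = await wait_for_work(wake, 5.0)  # returns without waiting 5 s
    return first, second
```

Either way the caller goes on to query the DB for pending rows, so a dropped event only changes which branch fires, not what work gets done.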
Testing
The bus ships with two implementations:
- `UnixSocketBus` (production): talks to a real `decnet bus` worker over a UNIX socket.
- `FakeBus` (tests): in-process pub/sub, no socket. Select via `DECNET_BUS_TYPE=fake` or by instantiating directly in a pytest fixture.
Standard test fixtures live in tests/bus/conftest.py:
```python
import pytest_asyncio

from decnet.bus.fake import FakeBus


@pytest_asyncio.fixture
async def fake_bus():
    bus = FakeBus()
    await bus.connect()
    yield bus          # the test body runs here
    await bus.close()
```
For end-to-end smoke, scripts/bus/smoke.sh boots a real bus worker,
publishes one event, and asserts a subscriber sees it.
scripts/bus/smoke-mutator.sh does the same for the four
mutator-family topics.
Troubleshooting
bus: unavailable in worker logs
Most common causes:
- The `decnet-bus.service` is not running on the host. Check `systemctl status decnet-bus`.
- The worker process isn't in the `decnet` group. Run `id $(whoami)`; if `decnet` is missing, run `usermod -aG decnet $(whoami)` and log out/in, or pass `--group ""` for dev.
- `DECNET_BUS_SOCKET` disagrees between the worker and the bus. Make sure both read the same env file.
A missing bus is non-fatal: the mutator drops back to its 10-second poll, the API SSE route idles on keepalives. But you've lost the sub-second push — fix it.
SSE stream connects but events never arrive
If the editor header says CONNECTING… but never flips to LIVE,
the SSE route opened but no events have been forwarded. Check in
order:
1. Is the topology actually `active` or `degraded`? The editor only streams for live topologies.
2. Is the mutator worker running? Publishes come from there.
3. Is the bus worker reachable from both the API process and the mutator? `scripts/bus/smoke-mutator.sh` narrows this down in a few seconds.
Dropped events under load
The per-subscriber queue holds 1024 events by default. If you see
bus.fake: subscriber queue full, dropped … in logs, your
subscriber is slower than the publisher. Either speed up the
subscriber's loop body or accept the drop; remember, the DB is the
source of truth.
Future work
- `decnet bus --bridge-tcp` for cross-host pub/sub (deferred).
- Migrating the dashboard `/stream` endpoint off its internal poll loop onto the bus (a bus consumer fanning out to SSE, parallel to the topology events route).
- First decky/attacker consumers: live-visualization pulsation driven by `decky.{id}.traffic` and `attacker.observed`.