Service Bus
anti edited this page 2026-05-10 04:08:49 -04:00

Service Bus — host-local pub/sub for DECNET workers

The DECNET service bus is a host-local UNIX-domain socket pub/sub transport that lets independent workers (mutator, correlator, profiler, API, sniffer, …) push events to each other without coupling through the database.

It is the plumbing that turns DECNET from a pile of cooperating pollers into something that reacts in under a second when state changes.

See also: Design Overview, Module reference: workers, Environment variables, Systemd setup.


Why a bus

Every DECNET worker already talks to the database. But the database is a poor fit for "something just happened" — every consumer has to poll, and every poll is a round-trip whether or not there's work.

The bus gives you a second communication channel alongside the DB:

Concern                   DB                     Bus
Source of truth           yes                    no
Persistence               durable                in-flight only
Delivery                  exactly-once           at-most-once
Latency                   poll-interval bounded  sub-millisecond
Survival across restarts  yes                    no

Rule of thumb: state goes in the DB. Notifications that something in the DB just changed go on the bus. If the bus drops a message, the next DB poll picks up the state; nothing is lost, only latency increases.

This invariant is load-bearing. It's what lets publishers be fire-and-forget, lets the transport discard events under backpressure instead of blocking, and lets consumers treat the bus as a hint rather than a contract.


Architecture

┌─────────────┐     publish     ┌────────────┐   subscribe   ┌──────────────┐
│  mutator    │────────────────▶│ decnet bus │──────────────▶│   api (SSE)  │
└─────────────┘                 │  worker    │               └──────────────┘
                                │  (UNIX     │
┌─────────────┐     publish     │  socket)   │   subscribe   ┌──────────────┐
│ web api     │────────────────▶│            │──────────────▶│   mutator    │
│ (enqueue)   │                 └────────────┘               │ (wake-on-    │
└─────────────┘                                              │  enqueue)    │
                                                             └──────────────┘

decnet bus is a standalone systemd-supervised worker. It owns the UNIX socket, multiplexes clients, matches topics against subscription patterns, and fans events out to interested subscribers. Publishers and subscribers are just client processes — the worker doesn't know what each topic means, only how to route it.

One bus per host

The MVP is host-local only. Each host runs its own decnet bus. Cross-host federation (swarm master ↔ agents over TCP) is deferred to a future --bridge-tcp mode that will proxy over the existing swarm mTLS infra if a use case emerges. Today, if you want something on the master to react to an event on an agent, the agent pushes it over the regular swarm API and the master's local bus re-publishes from there.

Authorization

The socket file is created with mode 0660 and owned by the decnet group. Authorization is therefore whatever the kernel says about who can connect() — anyone in the decnet group is trusted. There is no per-topic ACL; if you can connect, you can publish or subscribe to anything.

For dev boxes without the decnet group, pass --group "" when starting the worker and the socket stays owned by the current process group.


Starting the bus

Via systemd (production)

sudo systemctl enable --now decnet-bus.service

The unit file ships with DECNET_BUS_SOCKET=/run/decnet/bus.sock, RuntimeDirectory=decnet, and Group=decnet. Any worker unit that depends on the bus should declare After=decnet-bus.service and Requires=decnet-bus.service.

Manually (dev)

decnet bus                    # defaults: /run/decnet/bus.sock, group=decnet
decnet bus --socket /tmp/b.sock --group "" --heartbeat 5

--heartbeat N emits a system.bus.health event every N seconds so subscribers can detect a hung worker.
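
A subscriber can only notice a hung bus worker by the absence of heartbeats, so the check has to run on a timer rather than inside the subscription loop. The following is a minimal sketch of such a watchdog. The class name HeartbeatWatch and the missed_allowed tolerance are illustrative assumptions, not part of DECNET.

```python
import time
from typing import Optional


class HeartbeatWatch:
    """Tracks system.bus.health arrivals and reports when they stop.

    Hypothetical helper: the name and the missed_allowed default are
    assumptions made for this sketch.
    """

    def __init__(self, heartbeat_s: float, missed_allowed: int = 2) -> None:
        # Tolerate a few missed beats before declaring the worker stale.
        self.deadline_s = heartbeat_s * (missed_allowed + 1)
        self.last_seen: Optional[float] = None

    def beat(self, now: Optional[float] = None) -> None:
        """Call once per received heartbeat event."""
        self.last_seen = time.monotonic() if now is None else now

    def is_stale(self, now: Optional[float] = None) -> bool:
        """True when no heartbeat has arrived within the allowed window."""
        if self.last_seen is None:
            return False  # nothing observed yet; startup grace is the caller's call
        now = time.monotonic() if now is None else now
        return (now - self.last_seen) > self.deadline_s
```

In use, the subscription loop would call beat() for each system.bus.health event, while a separate periodic task calls is_stale() and logs or falls back to polling when it returns True.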

Disabled mode

If you don't want a bus at all (contract-test runs, minimal dev), set:

export DECNET_BUS_ENABLED=false

Every get_bus() call then returns a NullBus: publishes are silently dropped and subscriptions yield nothing. Workers start cleanly, and features that depend on sub-second push fall back to their poll interval.


Topic hierarchy

Topics are NATS-style dot-separated strings. Two wildcards are supported in subscription patterns:

  • * matches exactly one token.
  • > matches one-or-more trailing tokens.

So topology.> matches topology.abc.status but not the bare topology; topology.*.mutation.applied matches every topology's applied event; topology.abc.mutation.* matches every mutation state for one topology.
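
The matching rules above can be sketched in a few lines. This is an illustration of the semantics only, not the bus worker's actual matcher. The function name topic_matches is an assumption, and the sketch treats > as valid only in the last position of a pattern.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated topic matches a subscription pattern.

    '*' matches exactly one token; '>' matches one or more trailing tokens
    and is only honored as the last token of the pattern.
    """
    p_tokens = pattern.split(".")
    t_tokens = topic.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be last and must consume at least one topic token.
            return i == len(p_tokens) - 1 and len(t_tokens) > i
        if i >= len(t_tokens):
            return False  # pattern is longer than the topic
        if p != "*" and p != t_tokens[i]:
            return False  # literal token mismatch
    # All pattern tokens consumed; the topic must have no extra tokens.
    return len(p_tokens) == len(t_tokens)
```

With this sketch, topic_matches("topology.>", "topology.abc.status") is True and topic_matches("topology.>", "topology") is False, mirroring the examples in the text.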

Current topic families:

Topic pattern Emitted by Example payload
topology.{id}.mutation.enqueued API (POST /mutations) {mutation_id, op, payload}
topology.{id}.mutation.applying Mutator {mutation_id, op, payload}
topology.{id}.mutation.applied Mutator {mutation_id, op}
topology.{id}.mutation.failed Mutator {mutation_id, op, reason}
topology.{id}.status Mutator {state, reason}
decky.{id}.state reserved
decky.{id}.traffic reserved
decky.{name}.service_added Engine (services_live) {decky_name, service_name, topology_id?, services} — live add of a single service to a deployed decky (fleet OR MazeNET). Published only after compose up -d --no-deps --build <decky>-<service> succeeds. Substrate-watching consumers (correlator, dashboard) reconcile shape off this without waiting for the next decnet-state.json snapshot. topology_id is null for fleet deckies. Note: leaf is service_added (underscore), not service.added — bus segments are NATS-style tokens and can't contain dots.
decky.{name}.service_removed Engine (services_live) {decky_name, service_name, topology_id?, services} — live remove. Compose stop + rm -f of the service container, then DB persist + compose re-render so a future up -d doesn't bring it back. Same payload shape as the added event; services is the post-mutation list.
decky.{name}.service_config_changed Engine (services_live) {decky_name, service_name, topology_id?, service_config, recreated} — published when the schema-driven Inspector form persists a new per-service config. service_config is the post-validation dict (unknown keys dropped, types coerced). recreated=true when the operator hit Apply (force-recreate of <decky>-<service> to pick up the new env, destructive); false for Save (DB + compose only). Note: this topic is an audit-trail signal, not a propagation trigger — for swarm fleet deckies and agent-pinned topologies the master propagates the change directly via AgentClient.deploy / apply_topology (see _redispatch_fleet_shard / _resync_agent_topology in decnet/engine/services_live.py).
orchestrator.traffic.{decky_id} Orchestrator {kind: "traffic", protocol: "ssh", action, src_decky_uuid, dst_decky_uuid, success, payload, ts} — synthetic inter-decky SSH traffic generated to keep the fleet from looking suspiciously static
orchestrator.file.{decky_id} Orchestrator {kind: "file", protocol: "ssh", action, dst_decky_uuid, success, payload, ts} — synthetic file create or edit (action="file:create" / "file:edit") performed inside a decky via docker exec. Driven by the realism planner; rare ticks (~3%) carry callback-bearing canary content.
orchestrator.email.{decky_id} Orchestrator {kind: "email", mail_decky_uuid, thread_id, message_id, in_reply_to, sender_email, recipient_email, subject, language, success, ts} — one fake corporate email persona-driven by the realism content engine and dropped into a mail decky's spool. decky_id is the mail decky (the IMAP/POP3 host serving the mailbox). Producer changed from a separate emailgen worker to the unified orchestrator in the realism migration; topic shape is unchanged.
system.orchestrator.health Orchestrator standard worker heartbeat (covers traffic + file + email branches). Carries extra.realism = {llm_enabled, llm_backend, llm_model, llm_breaker_state} so the dashboard's worker panel can render a live LLM/circuit-breaker status badge without reaching into worker memory.
system.orchestrator.control Orchestrator admin-originated stop intents for the orchestrator loop
attacker.observed Correlator first sighting; consumed by decnet enrich as a wake signal
attacker.scored Profiler post-enrichment score update; also wakes decnet enrich
attacker.intel.enriched decnet enrich {attacker_ip, aggregate_verdict, providers} after a threat-intel pass; webhook → SIEM
attacker.fingerprinted Prober {attacker_ip, port, jarm_hash|hassh_server|tcpfp_hash, ...} — fires on every successful active probe result. Distinct from attacker.observed (correlator first-sight); a fingerprint is additional evidence about an already-observed attacker.
attacker.fingerprint_rotated Prober (via decnet.correlation.fingerprint_rotation) {attacker_uuid, attacker_ip, port, probe_type, old_hash, new_hash, rotation_count, ts} — fires only when a probe produces a different hash than the last persisted hash for the same (attacker_uuid, port, probe_type) triple. Carries both old and new hash so consumers don't have to join. Indicates infrastructure churn / VPS rotation / banner rewrite / cert swap. Consumers: dashboard, forensics, attribution clustering.
identity.formed reserved (clusterer) {identity_uuid, observation_uuids: [...], confidence, first_seen_at} — clusterer creates a new identity from one or more observations
identity.observation.linked reserved (clusterer) {identity_uuid, observation_uuid, confidence_after} — observation attached / re-attached to an identity
identity.merged reserved (clusterer) {winner_uuid, loser_uuid, observation_uuids: [...], confidence_after} — two identities collapsed; subscribers re-key cached references to the winner
identity.unmerged reserved (clusterer) {resurrected_uuid, former_winner_uuid, observation_uuids: [...], reason} — revocable-merge undo: contradicting evidence cleared merged_into_uuid; subscribers should re-split cached references back to the resurrected side
identity.campaign.assigned Campaign clusterer {identity_uuid, campaign_uuid, prior_campaign_uuid?} — cross-family signal so identity.> subscribers (e.g. IdentityDetail SSE) see the campaign-id change without needing to subscribe to campaign.>
attribution.profile.state_changed decnet.correlation.attribution_worker {identity_uuid, primitive, old_state, new_state, current_value, confidence, observation_count, ts} — per-(identity, primitive) state-machine transition (e.g. stable → drifting). Emitted only on transition; idempotent observations that don't change state produce no event. State vocabulary is fixed at five values: unknown / stable / drifting / conflicted / multi_actor. See development/ATTRIBUTION-ENGINE.md. v0 ships Phase 1 substrate only (worker logs, no transitions emitted yet); the topic constants ship now so SSE relays can subscribe ahead of Phase 4.
attribution.profile.multi_actor_suspected decnet.correlation.attribution_worker {identity_uuid, primitives: [...], evidence_summary, confidence, ts} — fires when ≥ 2 primitives independently flag the same identity as multi_actor. Single-primitive multi_actor is too noisy to alarm on (flapping primitive on flaky network looks like two operators); the cross-primitive correlator is the real signal. confidence is capped at 0.6 by convention — see _thresholds.MULTI_ACTOR_MAX_CONFIDENCE.
campaign.formed Campaign clusterer {campaign_uuid, identity_uuids: [...], confidence, first_seen_at} — clusterer creates a new campaign from one or more identities
campaign.identity.assigned Campaign clusterer {campaign_uuid, identity_uuid} — identity attached / re-attached to a campaign
campaign.merged Campaign clusterer {winner_uuid, loser_uuid, identity_uuids: [...]} — two campaigns collapsed; subscribers re-key cached references to the winner
campaign.unmerged Campaign clusterer {resurrected_uuid, former_winner_uuid, identity_uuids: [...]} — revocable-merge undo at the campaign layer
canary.{token_id}.placed Planter (API + deploy hook) {token_id, decky_id, kind, instrumenter?, placement_path, placed_at} — a canary artifact was successfully written into a decky's filesystem (or a passive token persisted)
canary.{token_id}.triggered decnet canary worker {token_id, decky_id, src_ip, user_agent?, request_path?, dns_qname?, occurred_at, raw_headers?} — attacker hit the HTTP slug or DNS subdomain; correlator + webhook fanout consume to attribute and forward
canary.{token_id}.revoked API (DELETE /tokens/{id}) {token_id, decky_id, revoked_at} — operator removed a token; subscribers may evict cached lookups by token id
system.canary.health decnet canary worker standard worker heartbeat
smtp.probe.pending Ingester {decky, attacker_ip, stored_as, mail_from, rcpt_to} — fired when an smtp_relay decky stores a new inbound message. The realism worker subscribes and forwards the email to the upstream relay if this source IP has not yet reached probe_limit. stored_as is the quarantine filename (relative to the per-decky smtp dir). mail_from / rcpt_to are the envelope sender/recipients captured at SMTP time. The worker writes a probe_relay bounty row on success or failure so the limit check is DB-backed and survives container restarts.
email.received Ingester (decnet/web/ingester.py:_publish_email_received) {source_id, attacker_uuid, attacker_ip, decky_id, service, subject, from_domain, mail_from_domain, return_path_domain, rcpt_count, rcpt_domains, x_mailer, dkim_signed, spf_pass, urls, attachment_count, attachment_sha256s, attachment_extensions, body_simhash, body_base64_bytes, attachment_macros, attachment_password_protected, html_smuggling, stored_as, body_sha256, mal_hash_match?} — fired on full-message receipt, consumed by the TTP email_lifter. PII discipline (TTP_TAGGING.md "Hard parts §6"): hashes, counts, header names, and rcpt-domain sets only — never rcpt addresses or body bytes. mal_hash_match (DEBT-046, 2026-05-03): boolean, present only when the message had attachments. True when any attachment SHA-256 hits the configured MalHashProvider (default: MalwareBazaar bulk feed, gated on DECNET_MALWAREBAZAAR_AUTH_KEY); False when every hash was checked-and-clean; omitted when there were no attachments at all (so R0046's is True predicate stays silent on hash-less mail, matching the rule's pre-paydown behavior). Per-hash observations also persist to the observed_attachments table — DECNET is a honeypot platform, not just a consumer of intel.
ttp.tagged decnet.ttp.worker {tag_uuids: [...], techniques_added: [...], attacker_uuid?, identity_uuid, session_id?} — published only when INSERT OR IGNORE wrote at least one new row. Idempotent re-evaluations that produce zero new tags publish zero events (loop-prevention invariant — a webhook subscriber re-triggering enrichment on ttp.tagged could otherwise loop forever).
ttp.rule.fired.{technique_id} decnet.ttp.worker {technique_id, tag_uuids: [...], attacker_uuid?, identity_uuid, session_id?} — per-technique fan-out for SIEM correlation rules that subscribe to one technique. Topic key is the parent technique; sub-technique IDs are not promoted to the topic (a T1110.001 tag fires ttp.rule.fired.T1110). Use ttp.rule.fired.> for fleet-wide subscribers.
ttp.rule.suppressed reserved (TTP worker) {rule_id, technique_id, reason} where reason ∈ {"below_floor", "rate_limited", "rule_disabled"} — observability for tags that would have been written but were dropped. Drives the dashboard's per-rule suppression counters. Suppression-event publish is deferred — the v0 worker drops sub-floor confidence at the repo layer and the bus event is unwired.
ttp.rule.reloaded.{rule_id} decnet.ttp.store (FilesystemRuleStore + DatabaseRuleStore) {rule_id, rule_version, deleted?} — fired by the rule store when a rule's definition changes (YAML edit on disk for the FS backend, ttp_rule row update on the DB backend). One event per per-rule edit — never batched. A 5-rule deploy fires 5 events; the engine recompiles each rule alone and atomically swaps it into the dispatch index. Built via topics.ttp_rule_reloaded(rule_id); fleet subscribers use ttp.rule.reloaded.>. deleted: true indicates the rule file was removed.
ttp.rule.state.{rule_id} decnet.ttp.store (FilesystemRuleStore + DatabaseRuleStore) + POST /api/v1/ttp/rules/{rule_id}/state {rule_id, state, set_by, auto_revert?} — fired when a rule's operational state changes (operator hits disable/clip via the API, or an expires_at TTL fires and auto-reverts the state). state ∈ {"enabled", "disabled", "clipped"}. auto_revert: true flags TTL-driven reverts so dashboards can distinguish them from operator actions. Built via topics.ttp_rule_state(rule_id); fleet subscribers use ttp.rule.state.>.
system.log reserved
system.bus.health Bus worker heartbeat {ts, uptime_s}

Topic builders live in decnet.bus.topics. Always use the helpers (topology_mutation(tid, state), topology_status(tid), …) — they reject segments containing ., *, >, or whitespace so a bogus topology id can't silently corrupt the hierarchy.
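
To make the validation concrete, here is a rough sketch of what a segment check of this kind might look like. The names check_segment and topology_status_topic are hypothetical stand-ins, not the real helpers in decnet.bus.topics, and rejecting the empty string is an extra assumption beyond what the text states.

```python
import re

# Characters that would change how a topic is parsed or matched:
# the token separator, both wildcards, and any whitespace.
_FORBIDDEN = re.compile(r"[.*>\s]")


def check_segment(segment: str) -> str:
    """Return the segment unchanged, or raise ValueError if it is unsafe.

    Hypothetical helper. Rejecting "" is an assumption of this sketch.
    """
    if not segment or _FORBIDDEN.search(segment):
        raise ValueError(f"invalid topic segment: {segment!r}")
    return segment


def topology_status_topic(topology_id: str) -> str:
    """Build 'topology.{id}.status' from a validated id (illustrative only)."""
    return f"topology.{check_segment(topology_id)}.status"
```

Validating at build time means a malformed id fails loudly in the publisher instead of producing a topic that silently matches the wrong subscriptions.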

Renaming an existing topic is a breaking change for every subscriber. Adding a new family is safe.


Environment variables

Variable            Default               Meaning
DECNET_BUS_ENABLED  true                  Set false to short-circuit get_bus() to NullBus.
DECNET_BUS_TYPE     unix                  unix or fake. fake is the in-process transport used by tests.
DECNET_BUS_SOCKET   /run/decnet/bus.sock  Path to the UNIX socket; the default falls back to ~/.decnet/bus.sock.
DECNET_BUS_GROUP    decnet                Group that owns the socket file. "" disables chown.

See Environment variables for the full list.


Using the bus from code

Always use the factory. Never import UnixSocketBus, FakeBus, or NullBus directly — mirrors the get_repository() convention.

Publishing

from decnet.bus.factory import get_bus
from decnet.bus import topics

bus = get_bus(client_name="mutator")
await bus.connect()

await bus.publish(
    topics.topology_mutation(topology_id, topics.MUTATION_APPLIED),
    {"mutation_id": mutation_id, "op": "add_lan"},
    event_type=topics.MUTATION_APPLIED,
)

Publishers are fire-and-forget. Wrap publishes in a try/except Exception as exc: log.warning(...) — a bus failure must never break the caller, because the DB write already happened.

Subscribing

from decnet.bus.factory import get_bus

bus = get_bus(client_name="my-worker")
await bus.connect()

sub = bus.subscribe("topology.*.mutation.applied")
async with sub:
    async for event in sub:
        print(event.topic, event.payload)

Subscriptions are async iterators. Wrap them in async with so the client unregisters itself cleanly on exit.

Inside the web API

Request handlers use a shared singleton so the bus socket isn't opened per request:

from decnet.bus.app import get_app_bus

bus = await get_app_bus()          # None if bus is disabled or down
if bus is not None:
    await bus.publish(...)

get_app_bus() returns None on connection failure and remembers the failure for the process lifetime — the API stays healthy without a bus.
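
The "remember the failure" behavior can be pictured as a memoized connect. The sketch below is not the real get_app_bus() implementation, whose internals are not shown here. AppBusHolder and its connect callable are hypothetical names used to illustrate the pattern.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class AppBusHolder:
    """Connects at most once and remembers a failure for the process lifetime.

    Hypothetical class for illustration; create it inside a running event loop.
    """

    def __init__(self, connect: Callable[[], Awaitable[Any]]) -> None:
        self._connect = connect
        self._bus: Optional[Any] = None
        self._failed = False
        self._lock = asyncio.Lock()

    async def get(self) -> Optional[Any]:
        # Fast path: already connected, or already known to be unavailable.
        if self._bus is not None or self._failed:
            return self._bus
        async with self._lock:
            # Re-check under the lock so concurrent first calls connect once.
            if self._bus is None and not self._failed:
                try:
                    self._bus = await self._connect()
                except Exception:
                    self._failed = True  # never retried in this sketch
        return self._bus
```

The trade-off of never retrying is that a bus that comes up after the API started is ignored until the API restarts; in exchange, request handlers never pay for repeated connection attempts.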


Delivery guarantees

At-most-once. Fire-and-forget. A published event may be delivered to every matching subscriber, some of them, or none.

Specifically, the bus drops events when:

  • No subscribers match the topic at publish time.
  • A subscriber's per-connection queue is full (default 1024 events); the oldest event is discarded to make room. This is drop-oldest backpressure — the invariant is that a slow consumer cannot stall publishers.
  • The bus worker is restarting or the client loses its socket connection. In-flight events are lost.

These are features, not bugs. The DB remains the source of truth, so the worst-case effect of a dropped event is added latency of up to one poll interval (10 seconds for the mutator) until the next poll picks up the state. In exchange, publishers never block, never retry, and never have to reason about ordering across subscribers.

If you need durable delivery, write to the DB instead and let consumers poll. Use the bus to cut reaction latency from seconds to milliseconds, not to replace the poll.
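
The drop-oldest backpressure described in this section can be illustrated with a bounded deque. This is a simplified, synchronous sketch of the policy, not the bus worker's actual queue. The class name DropOldestQueue and its dropped counter are assumptions.

```python
from collections import deque


class DropOldestQueue:
    """Bounded FIFO that discards the oldest item instead of blocking."""

    def __init__(self, maxsize: int = 1024) -> None:
        self._items: deque = deque(maxlen=maxsize)
        self.dropped = 0  # running count of discarded events

    def put(self, item) -> None:
        # A full deque with maxlen evicts from the left on append,
        # so count the drop before it happens.
        if len(self._items) == self._items.maxlen:
            self.dropped += 1
        self._items.append(item)

    def get(self):
        """Pop the oldest surviving item; raises IndexError when empty."""
        return self._items.popleft()

    def __len__(self) -> int:
        return len(self._items)
```

Because put() never waits, a slow consumer only ever loses its own oldest events; the publisher's cost stays constant regardless of how far behind the consumer is.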


Worked example: live topology mutations

The mutator + web editor flow is the canonical consumer of the bus today. See MazeNET for the feature-level description.

  1. Operator drags a LAN in the MazeNET editor.
  2. Frontend posts POST /api/v1/topologies/{id}/mutations. The API writes a TopologyMutation row in state pending, then publishes topology.{id}.mutation.enqueued on the bus.
  3. The decnet mutate worker is subscribed to topology.*.mutation.enqueued. The enqueue event flips an asyncio.Event that cancels its 10-second asyncio.sleep, and the reconciler re-enters immediately.
  4. The reconciler claims the row atomically, publishes topology.{id}.mutation.applying, dispatches to the op, then publishes topology.{id}.mutation.applied or .failed.
  5. The SSE route GET /api/v1/topologies/{id}/events forwards every matching topic as an SSE frame to the connected browser. The editor refetches on applied/failed/status and the header flips between LIVE and CONNECTING….

If the bus drops any one of those events, the 10-second mutator poll picks up the pending row (step 3 falls back gracefully) and the editor's next REFRESH button press resyncs (step 5 falls back gracefully). The feature works with a degraded bus — just slower.
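
The wake-on-enqueue behavior in step 3, together with its poll fallback, can be sketched as a single wait primitive. Note that this sketch waits on the asyncio.Event with a timeout rather than cancelling an asyncio.sleep as the mutator is described as doing; the observable effect is the same. The names wait_for_work and demo are illustrative.

```python
import asyncio


async def wait_for_work(wake: asyncio.Event, poll_interval: float = 10.0) -> str:
    """Block until the bus wakes us or the poll interval elapses.

    Returns "bus" when woken by an event and "poll" on timeout, so the
    caller can log which path triggered the reconcile pass.
    """
    try:
        await asyncio.wait_for(wake.wait(), timeout=poll_interval)
    except asyncio.TimeoutError:
        return "poll"
    wake.clear()  # re-arm for the next enqueue notification
    return "bus"


async def demo() -> list:
    wake = asyncio.Event()
    reasons = []
    # No event set: falls through on the (shortened) poll timeout.
    reasons.append(await wait_for_work(wake, poll_interval=0.05))
    # Simulate the subscriber task seeing topology.*.mutation.enqueued.
    asyncio.get_running_loop().call_later(0.01, wake.set)
    reasons.append(await wait_for_work(wake, poll_interval=5.0))
    return reasons
```

Either return value leads to the same reconcile pass against the DB, which is why a dropped enqueue event costs latency and nothing else.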


Testing

The bus ships with two transport implementations, alongside the NullBus used in disabled mode:

  • UnixSocketBus (production) — talks to a real decnet bus worker over a UNIX socket.
  • FakeBus (tests) — in-process pub/sub, no socket. Select via DECNET_BUS_TYPE=fake or by instantiating directly in a pytest fixture.

Standard test fixtures live in tests/bus/conftest.py:

from decnet.bus.fake import FakeBus

@pytest_asyncio.fixture
async def fake_bus():
    bus = FakeBus()
    await bus.connect()
    yield bus
    await bus.close()

For end-to-end smoke, scripts/bus/smoke.sh boots a real bus worker, publishes one event, and asserts a subscriber sees it. scripts/bus/smoke-mutator.sh does the same for the four mutator-family topics.


Troubleshooting

bus: unavailable in worker logs

Most common causes:

  1. The decnet-bus.service is not running on the host. Check systemctl status decnet-bus.
  2. The worker process isn't in the decnet group. Run id $(whoami); if decnet is missing, run usermod -aG decnet $(whoami) as root and log out and back in, or pass --group "" for dev.
  3. DECNET_BUS_SOCKET disagrees between the worker and the bus. Make sure both read the same env file.

A missing bus is non-fatal: the mutator drops back to its 10-second poll, the API SSE route idles on keepalives. But you've lost the sub-second push — fix it.

SSE stream connects but events never arrive

If the editor header says CONNECTING… but never flips to LIVE, the SSE route opened but no events have been forwarded. Check in order:

  1. Is the topology actually active or degraded? The editor only streams for live topologies.
  2. Is the mutator worker running? Publishes come from there.
  3. Is the bus worker reachable from both the API process and the mutator? scripts/bus/smoke-mutator.sh narrows this down in a few seconds.

Dropped events under load

The per-subscriber queue is 1024. If you see bus.fake: subscriber queue full, dropped … in logs, your subscriber is slower than the publisher. Either speed up the subscriber's loop body or accept the drop — remember, DB is source of truth.


Future work

  • decnet bus --bridge-tcp for cross-host pub/sub (deferred).
  • Migrating the dashboard /stream endpoint off its internal poll loop onto the bus (a bus consumer fanning out to SSE, parallel to the topology events route).
  • First decky/attacker consumers: live-visualization pulsation driven by decky.{id}.traffic and attacker.observed.