9 Commits

Author SHA1 Message Date
fb522af107 feat(bus): reserve identity.unmerged topic
Revocable merges (a contradiction-driven undo of identity.merged) ship
in the clusterer work; this reserves the topic up-front so identity.>
subscribers receive it day one without a re-subscribe.

The clusterer worker's ClusterResult fan-out now publishes on
identity.unmerged when populated. The skeleton clusterer never
populates it; the revocable-merge commit will.

Wiki update lives in wiki-checkout/Service-Bus.md (separate repo).
2026-04-26 08:10:56 -04:00
4f1077be72 feat(bus): identity.* topic family (formed / observation.linked / merged)
Fourth of the five-step identity-resolution substrate. Constants and
builder ship now; no publishers exist yet — they land with the
clusterer worker. Subscribers (webhook worker, dashboard SSE relay)
can register against identity.> from day one.

* decnet/bus/topics.py — IDENTITY root + IDENTITY_FORMED /
  IDENTITY_OBSERVATION_LINKED / IDENTITY_MERGED leaves; identity()
  builder mirroring the attacker() / system() helpers. Module
  docstring topic-tree updated.
* tests/bus/test_topics.py — assert builder produces the expected
  three topic strings + rejects empty event_type.

Wiki Service-Bus.md and a new Identity-Resolution.md page land in the
companion wiki-checkout commit.
2026-04-26 07:15:44 -04:00
4418608a54 fix(bus): silently drop publishes on closed bus instead of raising
Worker bus instances (collector, ingester) close their private buses
in finally blocks on shutdown, but stream threads holding closure
references kept calling publish after close — one `RuntimeError:
publish on closed bus` per stream line, caught by publish_safely
and logged per call, flooding server logs.

Changes:
- `UnixSocketBus.publish()` now drops post-close calls. First drop
  WARNs loudly (bus is critical infra — silent drops would hide real
  problems); subsequent drops on the same instance log at DEBUG to
  prevent the flood. Sticky `_closed_publish_warned` flag, reset
  naturally per new bus instance.
- `make_thread_safe_publisher` short-circuits on a closed bus before
  marshalling a coroutine onto the loop. Avoids the wasted scheduling
  work in the hot shutdown path.

Degradation is safe: callers go through `publish_safely`, which
already treats exceptions as 'dropped notification, DB is source of
truth.' We just stop manufacturing the exception in the first place
for a known-benign condition.
2026-04-23 18:00:47 -04:00
eb2308d9e1 fix(bus): retry app-bus connect with backoff instead of one-shot veto
A startup race between `decnet bus` being ready and the API's lifespan
hitting `get_app_bus()` at api.py:135 would set `_tried = True`
permanently, poisoning the singleton for the rest of the process: the
dashboard shows BUS OFFLINE, topology SSE falls into the bus-is-None
snapshot-only branch, mutator publish calls no-op. Only an API
restart recovered.

Replaces the one-shot veto with a time-gated retry keyed on a
`_last_failure_ts` monotonic timestamp plus a 2 s backoff. Publishers
on the hot path still pay at most one connect attempt every 2 s when
the bus is down, but the singleton auto-recovers within 5 s (one
dashboard poll) once the bus comes up.

The asyncio lock still serialises concurrent callers so the bus server
doesn't get stampeded with parallel connect attempts on startup.
2026-04-23 17:59:17 -04:00
0fbb07c2ec feat(workers): bus-backed Workers panel (registry, control, installed flag)
Ships the backend half of Config → Workers:

* Worker registry aggregates `system.*.health` + `system.bus.health`
  heartbeats into a last-seen dict; OK / STALE / UNKNOWN tiers drop
  out of a 90s window (3× the 30s heartbeat interval).
* `GET /api/v1/workers` returns the snapshot plus `bus_connected`
  (so the UI can explain "all UNKNOWN" when the bus socket is down)
  and a per-row `installed` flag populated from
  `systemctl list-unit-files decnet-*.service` (cached 30s).
* `POST /api/v1/workers/{name}/stop` publishes a stop intent on
  `system.<name>.control`; workers listen via the shared control
  listener in `bus/publish.py`.
* Heartbeat + control listener wired into collector / profiler /
  sniffer / prober / mutator worker loops. API self-heartbeats too
  so the panel always has one ground-truth row.
* Topic helper `system_control(name)` + tests covering builder
  validation, control listener shutdown path, and the API surface
  (auth gating, bus-connected field, unknown-name 404).

Adds `StartFailure` / `StartAllResponse` models in anticipation of
the upcoming start endpoints (DEBT-034).
2026-04-22 14:10:39 -04:00
5c0631e12c feat(agent,forwarder,updater): publish system.<worker>.health heartbeats (DEBT-031 workers 7-9)
All three workers now share a run_health_heartbeat helper in
decnet.bus.publish.  Each publishes system.<worker>.health on a 30s tick
with {worker, ts} plus optional per-worker extras.  Subscribers can
watch system.*.health to see every DECNET worker on a host at once.

- agent: heartbeat runs inside the FastAPI lifespan alongside the
  existing master-facing heartbeat; bus-disabled path is a no-op.
- forwarder: heartbeat task spawned at run_forwarder entry, cancelled
  in the finally block so a crashed master loop never leaks the task.
- updater: new FastAPI lifespan hosts the heartbeat.

Heartbeat helper swallows extra() failures and is cancellation-safe so
lifespan teardown never hangs on it.
2026-04-21 17:02:10 -04:00
34d9e37ab0 feat(prober): publish attacker.fingerprinted on the bus (DEBT-031)
Each successful JARM / HASSH / TCPfp probe fans out an
attacker.fingerprinted event; the probe family goes in event.type so a
single subscription covers all three.  Payload carries the attacker IP,
port, and probe-specific hash — enough for the MazeNET live map to
render fingerprint info on observed attackers.

Lifts the thread-safe publisher helper out of the sniffer worker into
decnet/bus/publish.py so the prober (and every future worker with a
to_thread hot path) can reuse it without copy-pasting the
run_coroutine_threadsafe dance.  Sniffer rewires onto the shared helper
in passing.

Adds ATTACKER_FINGERPRINTED as a new leaf — distinct from
ATTACKER_OBSERVED (correlator's first-sight signal) because an active
probe result is additional evidence about an already-observed attacker.

Note: the plan's decky.{id}.state realism-probe publish path is
deferred — the current prober fingerprints attackers, not decky
realism.  Will revisit when realism probes exist.
2026-04-21 16:47:55 -04:00
f3eaab5d37 refactor(bus): extract publish_safely + extend topics for DEBT-031
Shared publish_safely helper at decnet/bus/publish.py so the nine
workers about to be wired into the bus don't each copy-paste the
"never raise back at the caller" contract. Mutator drops its private
copy and imports the canonical one.

topics.py gains the attacker.* hierarchy (observed, scored,
session.started, session.ended) and a system_health(worker) builder
for per-worker health heartbeats — both prerequisites for the worker
rollout under DEBT-031.
2026-04-21 16:32:30 -04:00
fbf289ff63 feat(bus): host-local UNIX-socket pub/sub worker (DEBT-029)
Land the `decnet bus` worker and `get_bus()` factory. Transport is a
host-local UNIX-domain socket (0660, group=decnet); authz is the file
mode. Wire framing is a tiny verb-line + 4-byte-BE length + orjson body.
NATS-style wildcard topics (`*`, `>`). At-most-once, fire-and-forget —
DB stays the source of truth. `FakeBus` / `NullBus` for tests and the
disabled path. Cross-host federation is deferred to a future
`--bridge-tcp` mode; DEBT-030 is master-only and unblocked.
2026-04-21 13:49:02 -04:00