From fbf289ff63e3ecff9107a3d44f8d8c6f44eaa299 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 13:49:02 -0400 Subject: [PATCH] feat(bus): host-local UNIX-socket pub/sub worker (DEBT-029) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Land the `decnet bus` worker and `get_bus()` factory. Transport is a host-local UNIX-domain socket (0660, group=decnet); authz is the file mode. Wire framing is a tiny verb-line + 4-byte-BE length + orjson body. NATS-style wildcard topics (`*`, `>`). At-most-once, fire-and-forget — DB stays the source of truth. `FakeBus` / `NullBus` for tests and the disabled path. Cross-host federation is deferred to a future `--bridge-tcp` mode; DEBT-030 is master-only and unblocked. --- decnet/bus/__init__.py | 18 ++ decnet/bus/base.py | 205 ++++++++++++++++++++ decnet/bus/factory.py | 85 ++++++++ decnet/bus/fake.py | 183 ++++++++++++++++++ decnet/bus/protocol.py | 144 ++++++++++++++ decnet/bus/topics.py | 106 ++++++++++ decnet/bus/unix_client.py | 237 +++++++++++++++++++++++ decnet/bus/unix_server.py | 309 ++++++++++++++++++++++++++++++ decnet/bus/worker.py | 121 ++++++++++++ decnet/cli/__init__.py | 3 +- decnet/cli/bus.py | 45 +++++ decnet/env.py | 9 + deploy/decnet-bus.service | 43 +++++ development/DEBT.md | 50 ++++- tests/bus/__init__.py | 0 tests/bus/conftest.py | 59 ++++++ tests/bus/test_base.py | 66 +++++++ tests/bus/test_factory.py | 52 +++++ tests/bus/test_fake_bus.py | 108 +++++++++++ tests/bus/test_protocol.py | 87 +++++++++ tests/bus/test_topics.py | 42 ++++ tests/bus/test_unix_socket_bus.py | 131 +++++++++++++ tests/bus/test_worker.py | 68 +++++++ 23 files changed, 2167 insertions(+), 4 deletions(-) create mode 100644 decnet/bus/__init__.py create mode 100644 decnet/bus/base.py create mode 100644 decnet/bus/factory.py create mode 100644 decnet/bus/fake.py create mode 100644 decnet/bus/protocol.py create mode 100644 decnet/bus/topics.py create mode 100644 decnet/bus/unix_client.py 
create mode 100644 decnet/bus/unix_server.py create mode 100644 decnet/bus/worker.py create mode 100644 decnet/cli/bus.py create mode 100644 deploy/decnet-bus.service create mode 100644 tests/bus/__init__.py create mode 100644 tests/bus/conftest.py create mode 100644 tests/bus/test_base.py create mode 100644 tests/bus/test_factory.py create mode 100644 tests/bus/test_fake_bus.py create mode 100644 tests/bus/test_protocol.py create mode 100644 tests/bus/test_topics.py create mode 100644 tests/bus/test_unix_socket_bus.py create mode 100644 tests/bus/test_worker.py diff --git a/decnet/bus/__init__.py b/decnet/bus/__init__.py new file mode 100644 index 00000000..1fc4d87f --- /dev/null +++ b/decnet/bus/__init__.py @@ -0,0 +1,18 @@ +"""DECNET ServiceBus — pub/sub notification substrate. + +The bus is the notification layer for DECNET's worker constellation. The DB +remains the source of truth for anything durable; the bus carries "something +happened, go look" events. Delivery is at-most-once, fire-and-forget. + +Consumers call :func:`get_bus` from :mod:`decnet.bus.factory`; never import +transport implementations directly. The factory selects the backend via +``DECNET_BUS_TYPE`` (``unix`` or ``fake``) and honors ``DECNET_BUS_ENABLED``. + +Topic hierarchy is defined in :mod:`decnet.bus.topics` and locked early so +consumers can subscribe with stable wildcard patterns. +""" +from __future__ import annotations + +from decnet.bus.base import BaseBus, Event, Subscription + +__all__ = ["BaseBus", "Event", "Subscription"] diff --git a/decnet/bus/base.py b/decnet/bus/base.py new file mode 100644 index 00000000..8edd1724 --- /dev/null +++ b/decnet/bus/base.py @@ -0,0 +1,205 @@ +"""Bus abstractions: the :class:`Event` envelope and the :class:`BaseBus` ABC. + +Every transport (UNIX socket, in-process fake, null) speaks this contract. The +envelope is versioned (``v``) so future evolution never breaks deployed +consumers that happen to see a newer event shape. 
+ +Subscription model: :meth:`BaseBus.subscribe` returns a :class:`Subscription` +that is an async context manager AND an async iterator. The expected usage is: + + async with bus.subscribe("topology.*.mutation.*") as sub: + async for event in sub: + handle(event) + +Leaving the ``async with`` releases the underlying subscription handle; the +transport is free to drop any buffered events after that point. +""" +from __future__ import annotations + +import abc +import asyncio +import time +import uuid +from dataclasses import dataclass, field +from typing import Any, AsyncIterator + +EVENT_SCHEMA_VERSION = 1 + + +@dataclass(frozen=True) +class Event: + """The bus envelope. + + ``v`` is the envelope schema version, bumped on incompatible shape + changes. ``type`` is a short discriminator (``"mutation.applied"``, + ``"decky.state"``) useful for consumers that subscribe to a broad + wildcard and dispatch in Python; it is redundant with the trailing + segments of ``topic`` but cheaper to inspect. ``ts`` is epoch seconds + (float). ``id`` is a random UUID so consumers can de-dupe if they + ever see the same event twice (not expected at-most-once, but cheap + insurance). + """ + + topic: str + payload: dict[str, Any] + type: str = "" + v: int = EVENT_SCHEMA_VERSION + ts: float = field(default_factory=time.time) + id: str = field(default_factory=lambda: uuid.uuid4().hex) + + def to_dict(self) -> dict[str, Any]: + return { + "v": self.v, + "id": self.id, + "topic": self.topic, + "type": self.type, + "ts": self.ts, + "payload": self.payload, + } + + @classmethod + def from_dict(cls, topic: str, data: dict[str, Any]) -> "Event": + """Reconstruct an Event from a wire-format dict. + + ``topic`` is passed explicitly because the transport knows which + subject the message arrived on; trusting a ``topic`` field from the + wire would let a misbehaving publisher spoof events on topics they + don't actually publish to. 
+ """ + return cls( + topic=topic, + payload=data.get("payload", {}) or {}, + type=data.get("type", "") or "", + v=int(data.get("v", EVENT_SCHEMA_VERSION)), + ts=float(data.get("ts", time.time())), + id=data.get("id") or uuid.uuid4().hex, + ) + + +class Subscription(abc.ABC): + """An open subscription — async context manager + async iterator. + + Concrete transports subclass this and implement :meth:`_aclose` plus the + async iterator protocol. Callers should not instantiate directly; use + :meth:`BaseBus.subscribe`. + """ + + def __init__(self, pattern: str) -> None: + self.pattern = pattern + self._closed = False + + async def __aenter__(self) -> "Subscription": + return self + + async def __aexit__(self, *exc: Any) -> None: + await self.aclose() + + def __aiter__(self) -> AsyncIterator[Event]: + return self + + async def aclose(self) -> None: + if self._closed: + return + self._closed = True + await self._aclose() + + @abc.abstractmethod + async def __anext__(self) -> Event: # pragma: no cover - abstract + raise NotImplementedError + + @abc.abstractmethod + async def _aclose(self) -> None: # pragma: no cover - abstract + raise NotImplementedError + + +class BaseBus(abc.ABC): + """Pub/sub transport contract. + + Implementations MUST be safe to ``await connect()`` multiple times and + ``await close()`` multiple times. Publishing to a closed bus raises + :class:`RuntimeError`; subscribing to a closed bus does too. + """ + + @abc.abstractmethod + async def connect(self) -> None: + """Establish any network/transport resources. Idempotent.""" + + @abc.abstractmethod + async def publish( + self, + topic: str, + payload: dict[str, Any], + *, + event_type: str = "", + ) -> None: + """Publish *payload* on *topic*. Fire-and-forget. + + Delivery is at-most-once. On transport error the implementation + logs and returns; it does not raise, because bus losses must not + cascade into worker failure (DB is source of truth). 
+ """ + + @abc.abstractmethod + def subscribe(self, pattern: str) -> Subscription: + """Return a :class:`Subscription` that yields events matching *pattern*. + + Patterns follow NATS wildcard semantics: ``*`` matches one topic + token, ``>`` matches one-or-more trailing tokens. Examples: + + * ``topology.*.mutation.applied`` — all ``applied`` events for any + topology. + * ``topology.abc123.mutation.*`` — all mutation states for one + topology. + * ``topology.>`` — every event under the ``topology`` root. + """ + + @abc.abstractmethod + async def close(self) -> None: + """Tear down transport resources. Idempotent.""" + + async def __aenter__(self) -> "BaseBus": + await self.connect() + return self + + async def __aexit__(self, *exc: Any) -> None: + await self.close() + + +# ─── Wildcard matching shared across in-process transports ─────────────────── + +def matches(pattern: str, topic: str) -> bool: + """Return True iff *topic* matches *pattern* under NATS wildcard rules. + + ``*`` matches exactly one non-empty token; ``>`` matches one-or-more + trailing tokens (so ``topology.>`` matches ``topology.abc.x`` but not + ``topology`` alone). + """ + p_tokens = pattern.split(".") + t_tokens = topic.split(".") + for i, p in enumerate(p_tokens): + if p == ">": + # Must have at least one token remaining to match. + return i < len(t_tokens) + if i >= len(t_tokens): + return False + if p == "*": + if not t_tokens[i]: + return False + continue + if p != t_tokens[i]: + return False + return len(p_tokens) == len(t_tokens) + + +# Sentinel used by the in-process transports to signal "no more events" +# through the asyncio.Queue fan-out without inventing a separate control +# channel. Not part of the wire protocol. 
+_CLOSE_SENTINEL: Any = object() + + +async def _next_or_stop(queue: "asyncio.Queue[Any]") -> Event: + """Pop the next item from *queue*, raising ``StopAsyncIteration`` on close.""" + item = await queue.get() + if item is _CLOSE_SENTINEL: + raise StopAsyncIteration + return item diff --git a/decnet/bus/factory.py b/decnet/bus/factory.py new file mode 100644 index 00000000..f7a935ac --- /dev/null +++ b/decnet/bus/factory.py @@ -0,0 +1,85 @@ +"""Bus factory — selects a :class:`~decnet.bus.base.BaseBus` implementation. + +Dispatch key: the ``DECNET_BUS_TYPE`` environment variable. + +* ``unix`` (default) → :class:`~decnet.bus.unix_client.UnixSocketBus` +* ``fake`` → :class:`~decnet.bus.fake.FakeBus` (in-process) + +If ``DECNET_BUS_ENABLED`` is ``"false"`` the factory short-circuits to +:class:`~decnet.bus.fake.NullBus` regardless of ``DECNET_BUS_TYPE`` — a +cheap way for dev environments to run workers without a bus daemon. + +Mirrors :mod:`decnet.web.db.factory` (lazy imports inside each branch, +env-driven dispatch, optional telemetry wrapping). Callers MUST use +:func:`get_bus` rather than instantiating transports directly. +""" +from __future__ import annotations + +import os +from typing import Any + +from decnet.bus.base import BaseBus + + +def get_bus(**kwargs: Any) -> BaseBus: + """Instantiate the bus implementation selected by environment. + + Keyword arguments are forwarded to the concrete transport: + + * ``UnixSocketBus`` accepts ``socket_path`` (overrides + ``DECNET_BUS_SOCKET``) and ``client_name``. + * ``FakeBus`` accepts ``queue_size``. 
+ """ + if os.environ.get("DECNET_BUS_ENABLED", "true").lower() == "false": + from decnet.bus.fake import NullBus + return NullBus() + + bus_type = os.environ.get("DECNET_BUS_TYPE", "unix").lower() + + if bus_type == "unix": + from decnet.bus.unix_client import UnixSocketBus + socket_path = kwargs.pop("socket_path", None) or _default_socket_path() + bus: BaseBus = UnixSocketBus(socket_path=socket_path, **kwargs) + elif bus_type == "fake": + from decnet.bus.fake import FakeBus + bus = FakeBus(**kwargs) + else: + raise ValueError(f"Unsupported bus type: {bus_type}") + + return _maybe_wrap_telemetry(bus) + + +def _default_socket_path() -> str: + """Return the bus socket path honoring ``DECNET_BUS_SOCKET`` and falling + back to ``/run/decnet/bus.sock`` → ``~/.decnet/bus.sock``. + + The runtime path (``/run/decnet``) is preferred because systemd + ``RuntimeDirectory=decnet`` sets it up with the right perms; the home + fallback keeps dev boxes usable without systemd. + """ + explicit = os.environ.get("DECNET_BUS_SOCKET") + if explicit: + return explicit + + runtime_dir = "/run/decnet" + if os.path.isdir(runtime_dir) and os.access(runtime_dir, os.W_OK): + return f"{runtime_dir}/bus.sock" + return os.path.expanduser("~/.decnet/bus.sock") + + +def _maybe_wrap_telemetry(bus: BaseBus) -> BaseBus: + """Wrap *bus* in a tracing proxy if OTEL is enabled, else return as-is. + + Uses :func:`decnet.telemetry.wrap_repository` as the underlying proxy — + its implementation is generic (wraps any async method in a span), so we + reuse it with a bus-appropriate tracer name. If telemetry isn't wired + up at all we no-op. 
+ """ + try: + from decnet.telemetry import wrap_repository # type: ignore[attr-defined] + except ImportError: + return bus + try: + return wrap_repository(bus) + except Exception: # pragma: no cover - defensive + return bus diff --git a/decnet/bus/fake.py b/decnet/bus/fake.py new file mode 100644 index 00000000..9f6a26a9 --- /dev/null +++ b/decnet/bus/fake.py @@ -0,0 +1,183 @@ +"""In-process bus transports. + +* :class:`FakeBus` — real pub/sub semantics without touching a socket. Used + by unit tests and anywhere ``DECNET_BUS_TYPE=fake`` is set. Lets code + that depends on the bus be exercised entirely inside a single event loop, + matching the DECNET testing convention of not opening real network + sockets from unit tests. +* :class:`NullBus` — no-op. Returned by :func:`~decnet.bus.factory.get_bus` + when ``DECNET_BUS_ENABLED=false`` so workers can start cleanly in dev + environments where no bus daemon is running. Publishes are dropped; + subscriptions yield nothing and close cleanly. +""" +from __future__ import annotations + +import asyncio +from typing import Any + +from decnet.bus.base import ( + BaseBus, + Event, + Subscription, + _CLOSE_SENTINEL, + matches, +) +from decnet.logging import get_logger + +log = get_logger("bus.fake") + +# Per-subscriber bounded queue: backpressure policy is drop-oldest so a slow +# consumer cannot stall publishers (the invariant — DB is the source of +# truth — makes dropped events acceptable). +_DEFAULT_QUEUE_SIZE = 1024 + + +# ─── FakeBus ───────────────────────────────────────────────────────────────── + + +class _FakeSubscription(Subscription): + """Subscription backed by an :class:`asyncio.Queue` fed from + :meth:`FakeBus.publish`. 
Unregisters itself on close.""" + + def __init__(self, bus: "FakeBus", pattern: str, queue: "asyncio.Queue[Any]") -> None: + super().__init__(pattern) + self._bus = bus + self._queue = queue + + async def __anext__(self) -> Event: + if self._closed: + raise StopAsyncIteration + item = await self._queue.get() + if item is _CLOSE_SENTINEL: + raise StopAsyncIteration + return item + + async def _aclose(self) -> None: + self._bus._unregister(self) + # Unblock any pending __anext__ waiter. + try: + self._queue.put_nowait(_CLOSE_SENTINEL) + except asyncio.QueueFull: + pass + + +class FakeBus(BaseBus): + """In-process pub/sub. + + Publishes iterate every active subscription and enqueue the event on + the ones whose pattern matches the topic. If a subscriber's queue is + full, the oldest event is discarded to make room — same at-most-once + semantics as the real UNIX-socket transport. + """ + + def __init__(self, queue_size: int = _DEFAULT_QUEUE_SIZE) -> None: + self._queue_size = queue_size + self._subs: list[_FakeSubscription] = [] + self._connected = False + self._closed = False + self._lock = asyncio.Lock() + + async def connect(self) -> None: + self._connected = True + + async def publish( + self, + topic: str, + payload: dict[str, Any], + *, + event_type: str = "", + ) -> None: + if self._closed: + raise RuntimeError("publish on closed bus") + event = Event(topic=topic, payload=payload, type=event_type) + async with self._lock: + targets = [s for s in self._subs if matches(s.pattern, topic)] + for sub in targets: + _enqueue_drop_oldest(sub._queue, event) + + def subscribe(self, pattern: str) -> Subscription: + if self._closed: + raise RuntimeError("subscribe on closed bus") + queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=self._queue_size) + sub = _FakeSubscription(self, pattern, queue) + self._subs.append(sub) + return sub + + def _unregister(self, sub: _FakeSubscription) -> None: + try: + self._subs.remove(sub) + except ValueError: + pass + + async def 
close(self) -> None: + if self._closed: + return + self._closed = True + # Wake every still-open subscription so iterators unblock cleanly. + for sub in list(self._subs): + try: + sub._queue.put_nowait(_CLOSE_SENTINEL) + except asyncio.QueueFull: + pass + self._subs.clear() + + +def _enqueue_drop_oldest(queue: "asyncio.Queue[Any]", event: Event) -> None: + """Put *event* on *queue*, dropping the oldest item if the queue is full. + + Factored out so both FakeBus and the real UNIX server share the exact + same backpressure policy. + """ + while True: + try: + queue.put_nowait(event) + return + except asyncio.QueueFull: + try: + dropped = queue.get_nowait() + log.warning( + "bus.fake: subscriber queue full, dropped %s", getattr(dropped, "topic", "?") + ) + except asyncio.QueueEmpty: + return + + +# ─── NullBus ───────────────────────────────────────────────────────────────── + + +class _NullSubscription(Subscription): + """A subscription that never yields and closes immediately on iteration.""" + + async def __anext__(self) -> Event: + raise StopAsyncIteration + + async def _aclose(self) -> None: + return + + +class NullBus(BaseBus): + """No-op bus used when ``DECNET_BUS_ENABLED=false``. + + Publishes are silently dropped; subscriptions are empty. Intended for + dev environments where no bus daemon is running — the process starts + cleanly, code that publishes doesn't need feature flags, and nothing + ever blocks on a subscriber. + """ + + async def connect(self) -> None: + return + + async def publish( + self, + topic: str, + payload: dict[str, Any], + *, + event_type: str = "", + ) -> None: + return + + def subscribe(self, pattern: str) -> Subscription: + return _NullSubscription(pattern) + + async def close(self) -> None: + return diff --git a/decnet/bus/protocol.py b/decnet/bus/protocol.py new file mode 100644 index 00000000..a0f2f2eb --- /dev/null +++ b/decnet/bus/protocol.py @@ -0,0 +1,144 @@ +"""Wire protocol for the DECNET bus UNIX-socket transport. 
+ +Frame layout: + + <VERB> [<args>]\\n # ASCII header, single line, no trailing space + <4-byte big-endian body length> + <body> # orjson-serialized dict, or empty (length 0) + +Verbs: + +* ``HELLO <client-name>`` — optional greeting, logged by server. Body empty. +* ``PUB <topic>`` — publisher → server. Body = payload dict. +* ``SUB <pattern>`` — subscriber → server. Body empty. +* ``UNSUB <pattern>`` — subscriber → server. Body empty. +* ``EVT <topic>`` — server → subscriber. Body = payload dict (wrapped + in an :class:`~decnet.bus.base.Event` envelope). +* ``BYE`` — either direction. Body empty. Graceful shutdown. + +Parsing rules: + +* The header is a single line terminated by ``\\n`` (LF). ``\\r`` is tolerated + but not required. +* Header tokens are whitespace-separated. The first token is the verb; + everything after is verb-specific. We split on the first space only so + topics / patterns with quoted content are not supported (they are not + needed — topic segments forbid whitespace per :mod:`decnet.bus.topics`). +* Maximum header length is 4096 bytes; maximum body length is 1 MiB. Beyond + those, the connection is dropped with a logged error. This is a honeypot + framework, not a general-purpose message broker; a malformed frame is + treated as hostile. +""" +from __future__ import annotations + +import asyncio +import struct +from dataclasses import dataclass +from typing import Any + +import orjson + +MAX_HEADER_BYTES = 4096 +MAX_BODY_BYTES = 1 * 1024 * 1024 # 1 MiB + +# Verb constants (callers should reference these, not bare strings). +HELLO = "HELLO" +PUB = "PUB" +SUB = "SUB" +UNSUB = "UNSUB" +EVT = "EVT" +BYE = "BYE" + +_VALID_VERBS = frozenset({HELLO, PUB, SUB, UNSUB, EVT, BYE}) + + +class ProtocolError(Exception): + """Malformed or oversized frame. Callers should close the connection.""" + + +@dataclass(frozen=True) +class Frame: + """A parsed frame. 
``body`` is the raw (unparsed) body bytes — callers + decide whether to orjson-decode it (the protocol does not know whether + a given verb expects a dict body or an empty one). + """ + + verb: str + args: str # everything after the verb on the header line, trimmed + body: bytes + + +def encode(verb: str, args: str = "", body: dict[str, Any] | None = None) -> bytes: + """Serialize a frame. + + *body* is a dict that will be orjson-encoded, or ``None`` for an empty + body. The header line is written verbatim — callers must supply args + that are free of ``\\n``. + """ + if verb not in _VALID_VERBS: + raise ProtocolError(f"unknown verb {verb!r}") + if "\n" in args or "\r" in args: + raise ProtocolError("args must not contain newline characters") + + body_bytes = b"" if body is None else orjson.dumps(body) + if len(body_bytes) > MAX_BODY_BYTES: + raise ProtocolError( + f"body {len(body_bytes)} bytes exceeds max {MAX_BODY_BYTES}" + ) + + header = f"{verb} {args}".rstrip() + "\n" + header_bytes = header.encode("ascii") + if len(header_bytes) > MAX_HEADER_BYTES: + raise ProtocolError( + f"header {len(header_bytes)} bytes exceeds max {MAX_HEADER_BYTES}" + ) + return header_bytes + struct.pack(">I", len(body_bytes)) + body_bytes + + +async def read_frame(reader: asyncio.StreamReader) -> Frame | None: + """Read one frame from *reader*. + + Returns ``None`` on clean EOF before a new frame starts. Raises + :class:`ProtocolError` on malformed input (caller should close the + connection). 
+ """ + try: + header = await reader.readuntil(b"\n") + except asyncio.IncompleteReadError as exc: + if not exc.partial: + return None + raise ProtocolError("connection closed mid-header") from exc + except asyncio.LimitOverrunError as exc: + raise ProtocolError("header exceeded buffer limit") from exc + + if len(header) > MAX_HEADER_BYTES: + raise ProtocolError(f"header {len(header)} bytes exceeds max") + + line = header.rstrip(b"\r\n").decode("ascii", errors="strict") + if not line: + raise ProtocolError("empty header line") + + verb, _, args = line.partition(" ") + if verb not in _VALID_VERBS: + raise ProtocolError(f"unknown verb {verb!r}") + + length_bytes = await reader.readexactly(4) + (body_len,) = struct.unpack(">I", length_bytes) + if body_len > MAX_BODY_BYTES: + raise ProtocolError(f"body length {body_len} exceeds max") + + body = await reader.readexactly(body_len) if body_len else b"" + return Frame(verb=verb, args=args.strip(), body=body) + + +def decode_body(body: bytes) -> dict[str, Any]: + """Decode a frame body as a JSON dict. Empty body → empty dict.""" + if not body: + return {} + try: + obj = orjson.loads(body) + except orjson.JSONDecodeError as exc: + raise ProtocolError(f"body is not valid JSON: {exc}") from exc + if not isinstance(obj, dict): + raise ProtocolError(f"body must be a JSON object, got {type(obj).__name__}") + return obj diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py new file mode 100644 index 00000000..27e03184 --- /dev/null +++ b/decnet/bus/topics.py @@ -0,0 +1,106 @@ +"""Canonical topic hierarchy for the DECNET ServiceBus. + +Locked early so consumers can subscribe with stable wildcard patterns. +Adding new topic families is fine; **renaming** existing ones is a breaking +change for every subscriber and requires a coordinated rollout. 
+ +Token structure (NATS-style, dot-separated): + + topology.{topology_id}.mutation.{state} + topology.{topology_id}.status + decky.{decky_id}.state + decky.{decky_id}.traffic + attacker.observed + system.log + system.bus.health + +Wildcards (per :func:`decnet.bus.base.matches`): + +* ``*`` matches exactly one token. +* ``>`` matches one-or-more trailing tokens (so ``topology.>`` matches + ``topology.abc.status`` but not the bare root ``topology``). +""" +from __future__ import annotations + +# ─── Root prefixes ─────────────────────────────────────────────────────────── + +TOPOLOGY = "topology" +DECKY = "decky" +ATTACKER = "attacker" +SYSTEM = "system" + + +# ─── Leaf event-type constants (the last segment of each topic) ────────────── + +# Topology mutation lifecycle states — keep in sync with TopologyMutation.state +# in decnet/web/db/models.py; the bus topic mirrors the DB state machine. +MUTATION_ENQUEUED = "enqueued" +MUTATION_APPLYING = "applying" +MUTATION_APPLIED = "applied" +MUTATION_FAILED = "failed" + +# Topology-level status transitions (topology.{id}.status): fires when the +# topology row's status column changes (pending/deploying/active/degraded/failed). +TOPOLOGY_STATUS = "status" + +# Decky-level event types (third token). +DECKY_STATE = "state" +DECKY_TRAFFIC = "traffic" + +# System event types. +SYSTEM_LOG = "log" +SYSTEM_BUS_HEALTH = "bus.health" + + +# ─── Builders ──────────────────────────────────────────────────────────────── + +def topology_mutation(topology_id: str, state: str) -> str: + """Build ``topology.{topology_id}.mutation.{state}``. + + *state* should be one of the ``MUTATION_*`` constants. + """ + _reject_tokens(topology_id, state) + return f"{TOPOLOGY}.{topology_id}.mutation.{state}" + + +def topology_status(topology_id: str) -> str: + """Build ``topology.{topology_id}.status``.""" + _reject_tokens(topology_id) + return f"{TOPOLOGY}.{topology_id}.{TOPOLOGY_STATUS}" + + +def decky(decky_id: str, event_type: str) -> str: + """Build ``decky.{decky_id}.{event_type}``. 
+ + *event_type* is typically one of ``DECKY_STATE`` or ``DECKY_TRAFFIC``. + """ + _reject_tokens(decky_id, event_type) + return f"{DECKY}.{decky_id}.{event_type}" + + +def system(event_type: str) -> str: + """Build ``system.``. + + *event_type* may itself contain dots (e.g. ``bus.health``) — we don't + re-validate the already-constant leaves; this just prefixes. + """ + if not event_type: + raise ValueError("system topic requires a non-empty event_type") + return f"{SYSTEM}.{event_type}" + + +def _reject_tokens(*parts: str) -> None: + """Reject topic segments that would break NATS-style tokenization. + + Dots, wildcards, whitespace, and empty strings in a *segment* would + silently corrupt the hierarchy (e.g. ``topology.a.b.status`` for a + ``topology_id`` of ``"a.b"``). Raise early at the builder instead of + shipping a malformed topic to the wire. + """ + for p in parts: + if not p: + raise ValueError("topic segment must not be empty") + if "." in p or "*" in p or ">" in p or any(c.isspace() for c in p): + raise ValueError( + f"topic segment {p!r} may not contain '.', '*', '>', or whitespace" + ) diff --git a/decnet/bus/unix_client.py b/decnet/bus/unix_client.py new file mode 100644 index 00000000..1c5d1659 --- /dev/null +++ b/decnet/bus/unix_client.py @@ -0,0 +1,237 @@ +"""UNIX-socket client — :class:`UnixSocketBus` implementation of :class:`BaseBus`. + +Holds one open socket to the local :class:`~decnet.bus.unix_server.BusServer`. +Operations: + +* :meth:`publish` writes a single ``PUB`` frame and returns; no ack. +* :meth:`subscribe` writes a ``SUB`` frame and returns a + :class:`~decnet.bus.base.Subscription` backed by an :class:`asyncio.Queue` + that the background reader task feeds. + +One background reader task per bus instance dispatches incoming ``EVT`` +frames to every registered subscription whose pattern matches the topic. 
+On connection drop or close, every subscription is woken via a sentinel so +iterators unblock cleanly; callers see :class:`StopAsyncIteration` from the +``async for`` loop. + +No auto-reconnect in MVP. If the server restarts, callers must +:meth:`close` the bus and construct a new one. This mirrors how other +DECNET workers handle their dependencies — the systemd ``Restart=on-failure`` +supervision above us is the retry loop. +""" +from __future__ import annotations + +import asyncio +import contextlib +import os +import pathlib +from typing import Any + +from decnet.bus import protocol +from decnet.bus.base import ( + BaseBus, + Event, + Subscription, + _CLOSE_SENTINEL, + matches, +) +from decnet.bus.fake import _enqueue_drop_oldest as _enqueue_event_drop_oldest +from decnet.logging import get_logger + +log = get_logger("bus.client") + +_INBOUND_QUEUE_SIZE = 1024 + + +class _UnixSubscription(Subscription): + def __init__( + self, + bus: "UnixSocketBus", + pattern: str, + queue: "asyncio.Queue[Any]", + ) -> None: + super().__init__(pattern) + self._bus = bus + self._queue = queue + + async def __anext__(self) -> Event: + if self._closed: + raise StopAsyncIteration + item = await self._queue.get() + if item is _CLOSE_SENTINEL: + raise StopAsyncIteration + return item + + async def _aclose(self) -> None: + await self._bus._unregister(self) + try: + self._queue.put_nowait(_CLOSE_SENTINEL) + except asyncio.QueueFull: + pass + + +class UnixSocketBus(BaseBus): + """Client handle for a local :class:`BusServer`. + + One instance per process typically; multiple instances simply open + multiple sockets to the same server. Connection is lazy — the first + :meth:`connect` (or any publish/subscribe call via ``async with``) + opens the socket. 
+ """ + + def __init__( + self, + socket_path: pathlib.Path | str, + *, + client_name: str | None = None, + ) -> None: + self._path = pathlib.Path(socket_path) + self._client_name = client_name or f"decnet-bus-client[{os.getpid()}]" + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._reader_task: asyncio.Task[None] | None = None + self._subs: list[_UnixSubscription] = [] + self._lock = asyncio.Lock() + self._write_lock = asyncio.Lock() + self._closed = False + + # ─── Lifecycle ────────────────────────────────────────────────────────── + + async def connect(self) -> None: + if self._writer is not None: + return + if self._closed: + raise RuntimeError("connect on closed bus") + self._reader, self._writer = await asyncio.open_unix_connection(str(self._path)) + await self._send(protocol.encode(protocol.HELLO, args=self._client_name)) + self._reader_task = asyncio.create_task(self._reader_loop()) + log.debug("bus.client: connected to %s as %s", self._path, self._client_name) + + async def close(self) -> None: + if self._closed: + return + self._closed = True + + # Best-effort BYE — we don't care if it fails. + if self._writer is not None and not self._writer.is_closing(): + with contextlib.suppress(Exception): + await self._send(protocol.encode(protocol.BYE)) + + if self._reader_task is not None: + self._reader_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._reader_task + self._reader_task = None + + if self._writer is not None: + with contextlib.suppress(Exception): + self._writer.close() + await self._writer.wait_closed() + self._writer = None + self._reader = None + + # Wake every subscription so `async for` exits. 
+ for sub in list(self._subs): + with contextlib.suppress(asyncio.QueueFull): + sub._queue.put_nowait(_CLOSE_SENTINEL) + self._subs.clear() + + # ─── Pub/Sub ──────────────────────────────────────────────────────────── + + async def publish( + self, + topic: str, + payload: dict[str, Any], + *, + event_type: str = "", + ) -> None: + if self._closed: + raise RuntimeError("publish on closed bus") + if self._writer is None: + await self.connect() + body = Event(topic=topic, payload=payload, type=event_type).to_dict() + try: + await self._send(protocol.encode(protocol.PUB, args=topic, body=body)) + except (ConnectionError, BrokenPipeError) as exc: + # Bus loss is a logged warning, never a publisher crash. The + # DB-as-source-of-truth invariant means the work is already + # persisted; the missing event is just a missed notification. + log.warning("bus.client: publish failed: %s", exc) + + def subscribe(self, pattern: str) -> Subscription: + if self._closed: + raise RuntimeError("subscribe on closed bus") + queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=_INBOUND_QUEUE_SIZE) + sub = _UnixSubscription(self, pattern, queue) + self._subs.append(sub) + # Schedule the SUB frame asynchronously so subscribe() stays sync, + # matching the BaseBus signature. The caller will shortly `async + # with` / `async for` the subscription, which will run the event + # loop and pick this task up. 
async def _send(self, frame_bytes: bytes) -> None:
    """Write one already-encoded frame to the server and wait for the drain.

    Writes are serialized on ``self._write_lock`` so frames from
    concurrent callers never interleave on the socket.

    Raises:
        ConnectionError: if there is no writer, either on entry or because
            ``close()`` detached it while this call was waiting for the
            write lock.
    """
    if self._writer is None:
        raise ConnectionError("bus.client: not connected")
    async with self._write_lock:
        # Bind the writer after acquiring the lock: close() may have set
        # self._writer to None while we waited.  Reading it again later
        # would raise AttributeError, which no caller handles.
        writer = self._writer
        if writer is None:
            raise ConnectionError("bus.client: not connected")
        writer.write(frame_bytes)
        await writer.drain()
def _dispatch(self, event: Event) -> None:
    """Queue *event* on every local subscription whose pattern matches its topic."""
    for subscription in self._subs:
        if not matches(subscription.pattern, event.topic):
            continue
        # Bounded queue: the helper evicts the oldest entry when full.
        _enqueue_event_drop_oldest(subscription._queue, event)
@dataclass(eq=False)
class _Connection:
    """Per-connection server state.

    ``eq=False`` leaves identity-based equality and hashing in place, which
    is what lets ``BusServer`` keep these mutable objects in a ``set``.
    """

    # Stream the per-connection writer task uses to send queued frames.
    writer: asyncio.StreamWriter
    # Name announced by the client's HELLO frame; empty until one arrives.
    peer_name: str = ""
    # Topic patterns this connection subscribed to (SUB adds, UNSUB discards).
    patterns: set[str] = field(default_factory=set)
    # Encoded frames awaiting delivery; bounded, drop-oldest on overflow.
    outbound: asyncio.Queue[bytes] = field(
        default_factory=lambda: asyncio.Queue(maxsize=_OUTBOUND_QUEUE_SIZE)
    )
    # Set once teardown starts; fan-out skips connections with this flag.
    closed: bool = False
async def close(self) -> None:
    """Stop accepting, drop every live connection and remove the socket file.

    Safe to call more than once; later calls are no-ops.

    Order matters.  The listener is closed first so no new peers arrive.
    Existing connections are closed next, and only then do we wait on the
    listener.  On Python 3.12+ ``Server.wait_closed()`` waits for all
    active connections to finish, so waiting before closing them (as this
    method used to) blocks for as long as any client stays attached.
    """
    if self._closed:
        return
    self._closed = True

    server = self._server
    self._server = None
    if server is not None:
        # Stops the accept loop; established connections stay open.
        server.close()

    # Drain every live connection.
    for conn in list(self._connections):
        await self._close_connection(conn)
    self._connections.clear()

    if server is not None:
        with contextlib.suppress(Exception):
            await server.wait_closed()

    with contextlib.suppress(FileNotFoundError):
        self._path.unlink()
    log.info("bus.server: closed")
def _fanout(self, event: Event, *, origin: _Connection | None = None) -> None:
    """Queue *event*, encoded as one EVT frame, on each interested connection.

    The frame is encoded a single time and the same bytes are shared by
    every recipient.  The connection passed as *origin* is skipped, so a
    publisher never receives its own event.  Closed connections are
    skipped too.
    """
    try:
        encoded = protocol.encode(
            protocol.EVT, args=event.topic, body=event.to_dict(),
        )
    except protocol.ProtocolError:
        log.exception("bus.server: failed to encode EVT for topic=%s", event.topic)
        return

    for peer in self._connections:
        interested = (
            peer is not origin
            and not peer.closed
            and any(matches(pattern, event.topic) for pattern in peer.patterns)
        )
        if interested:
            _enqueue_drop_oldest(peer.outbound, encoded, event.topic)
async def _close_connection(self, conn: _Connection) -> None:
    """Flag *conn* as closed and shut its stream down, at most once.

    Errors raised while closing the stream are deliberately ignored.
    """
    if not conn.closed:
        conn.closed = True
        with contextlib.suppress(Exception):
            stream = conn.writer
            stream.close()
            await stream.wait_closed()
async def bus_worker(
    socket_path: str | pathlib.Path,
    *,
    group: str | None = "decnet",
    heartbeat_interval: int = HEARTBEAT_INTERVAL_SEC,
) -> None:
    """Run the bus server until SIGTERM/SIGINT is received.

    The parent directory of *socket_path* normally has to exist already
    (systemd's ``RuntimeDirectory=decnet`` handles this in prod).  The one
    exception is a missing directory below the current user's home, which
    ``_ensure_parent`` creates as a dev-box convenience.  Anywhere else a
    missing parent raises ``FileNotFoundError``.

    Shutdown closes the server before draining the accept task.  On
    Python 3.12+ a cancelled ``serve_forever()`` waits for every active
    connection to finish, so draining it first would hang for as long as
    any client stays connected.  Each teardown step is also time-bounded,
    so a stuck peer cannot keep the process alive.
    """
    path = pathlib.Path(socket_path)
    _ensure_parent(path)

    server = BusServer(path, group=group)
    await server.start()
    log.info("bus.worker: pid=%d socket=%s", os.getpid(), path)

    stop_event = asyncio.Event()
    _install_signal_handlers(stop_event)

    heartbeat_task = asyncio.create_task(_heartbeat_loop(server, heartbeat_interval))
    serve_task = asyncio.create_task(server.serve_forever())

    # Upper bound, in seconds, for each teardown step below.
    shutdown_timeout = 5.0

    try:
        await stop_event.wait()
        log.info("bus.worker: shutdown signal received")
    finally:
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except (asyncio.CancelledError, Exception):  # noqa: BLE001 - draining shutdown
            pass

        # Close the server first so client connections are dropped before
        # we wait on the accept task.
        try:
            await asyncio.wait_for(server.close(), timeout=shutdown_timeout)
        except asyncio.TimeoutError:
            log.warning(
                "bus.worker: server close timed out after %.1fs", shutdown_timeout,
            )
        except Exception:  # noqa: BLE001 - shutdown must keep going
            log.exception("bus.worker: server close failed")

        serve_task.cancel()
        try:
            await asyncio.wait_for(serve_task, timeout=shutdown_timeout)
        except (asyncio.CancelledError, asyncio.TimeoutError, Exception):  # noqa: BLE001
            pass
        log.info("bus.worker: stopped")
def _ensure_parent(path: pathlib.Path) -> None:
    """Make sure the directory that will contain the socket exists.

    Nothing happens when the parent directory is already there.  A missing
    parent is created, including intermediate directories, only when it
    lies below the current user's home directory.  That covers the
    ``~/.decnet`` dev default.  Anything else, notably ``/run/decnet``, is
    left for systemd or the operator to create, because silently creating
    it as the wrong user would cause permission confusion later.

    Raises:
        FileNotFoundError: if the parent is missing and is not below the
            user's home directory.
    """
    parent = path.parent
    if parent.exists():
        return
    # A plain containment test avoids raising from inside an ``except``
    # block, which would attach an irrelevant ValueError to the traceback.
    if not parent.is_relative_to(pathlib.Path.home()):
        raise FileNotFoundError(
            f"bus socket parent {parent} does not exist; create it first"
        )
    parent.mkdir(parents=True, exist_ok=True)
def register(app: typer.Typer) -> None:
    """Attach the ``bus`` sub-command to *app*."""

    @app.command(name="bus")
    def bus_cmd(
        socket_path: str = typer.Option(
            None, "--socket", "-s",
            help="UNIX socket path (defaults to DECNET_BUS_SOCKET env var, "
                 "then /run/decnet/bus.sock, then ~/.decnet/bus.sock).",
        ),
        group: str = typer.Option(
            # An explicit --group wins; otherwise honour the same
            # DECNET_BUS_GROUP variable that decnet.env exposes.
            "decnet", "--group", "-g",
            envvar="DECNET_BUS_GROUP",
            help="POSIX group to chown the socket to (falls back to process "
                 "group if the named group does not exist).",
        ),
        heartbeat: int = typer.Option(
            # min=1: a zero or negative interval would make the heartbeat
            # loop spin without ever really sleeping.
            10, "--heartbeat", "-H",
            min=1,
            help="Seconds between system.bus.health heartbeat events.",
        ),
        daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process."),
    ) -> None:
        """Run the DECNET ServiceBus worker (host-local UNIX-socket pub/sub)."""
        # Imported lazily so that loading the CLI does not pull in asyncio
        # and the bus package for unrelated commands.
        import asyncio
        from decnet.bus.factory import _default_socket_path
        from decnet.bus.worker import bus_worker

        resolved = socket_path or _default_socket_path()

        if daemon:
            log.info("bus daemonizing socket=%s", resolved)
            _utils._daemonize()

        log.info("bus starting socket=%s group=%s heartbeat=%ds", resolved, group, heartbeat)
        console.print(f"[bold cyan]Bus starting[/] (socket: {resolved}, heartbeat: {heartbeat}s)")

        try:
            asyncio.run(bus_worker(resolved, group=group, heartbeat_interval=heartbeat))
        except KeyboardInterrupt:
            console.print("\n[yellow]Bus stopped.[/]")
+DECNET_BUS_ENABLED: bool = os.environ.get("DECNET_BUS_ENABLED", "true").lower() != "false" +DECNET_BUS_TYPE: str = os.environ.get("DECNET_BUS_TYPE", "unix").lower() +DECNET_BUS_SOCKET: Optional[str] = os.environ.get("DECNET_BUS_SOCKET") +DECNET_BUS_GROUP: str = os.environ.get("DECNET_BUS_GROUP", "decnet") + # Tracing — set to "true" to enable OpenTelemetry distributed tracing. # Separate from DECNET_DEVELOPER so tracing can be toggled independently. DECNET_DEVELOPER_TRACING: bool = os.environ.get("DECNET_DEVELOPER_TRACING", "").lower() == "true" diff --git a/deploy/decnet-bus.service b/deploy/decnet-bus.service new file mode 100644 index 00000000..80ae38d9 --- /dev/null +++ b/deploy/decnet-bus.service @@ -0,0 +1,43 @@ +[Unit] +Description=DECNET Service Bus (host-local UNIX-socket pub/sub) +Documentation=https://github.com/4nt11/DECNET/wiki/Service-Bus +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=decnet +Group=decnet +WorkingDirectory=/opt/decnet +EnvironmentFile=-/opt/decnet/.env.local +# /run/decnet is created automatically with the RuntimeDirectory= directive +# below (mode 0755, owned by User/Group) and cleaned up on stop. The bus +# socket is placed inside it with 0660 perms so only the decnet group can +# connect. +RuntimeDirectory=decnet +RuntimeDirectoryMode=0755 +ExecStart=/opt/decnet/venv/bin/decnet bus \ + --socket /run/decnet/bus.sock \ + --group decnet + +# No privileged network operations — UNIX-domain socket only. 
+CapabilityBoundingSet= +AmbientCapabilities= + +# Security Hardening +NoNewPrivileges=yes +ProtectSystem=full +ProtectHome=read-only +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes +RestrictSUIDSGID=yes +LockPersonality=yes +ReadWritePaths=/run/decnet /var/log/decnet + +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=multi-user.target diff --git a/development/DEBT.md b/development/DEBT.md index 7422e02f..7b62d9af 100644 --- a/development/DEBT.md +++ b/development/DEBT.md @@ -1,6 +1,6 @@ # DECNET — Technical Debt Register -> Last updated: 2026-04-09 — All addressable debt cleared. +> Last updated: 2026-04-21 — All addressable debt cleared. > Severity: 🔴 Critical · 🟠 High · 🟡 Medium · 🟢 Low --- @@ -122,6 +122,48 @@ The bait store and honeypot files are hardcoded. A dynamic injection framework s The deploy endpoint exercises Docker Compose orchestration via `decnet.engine.deploy`, which creates MACVLAN/IPvlan networks and runs `docker compose up`. Meaningful tests require mocking the entire Docker SDK + subprocess layer, coupling tightly to implementation details. **Status:** Deferred — test after Docker-in-Docker CI is available. +### DEBT-029 — Service-wide pub/sub bus worker (`decnet bus`) ✅ RESOLVED +**Files:** `decnet/bus/` (`worker.py`, `factory.py`, `unix_client.py`, `unix_server.py`, `protocol.py`, `fake.py`, `base.py`, `topics.py`), `decnet/cli/bus.py`, `deploy/decnet-bus.service`, `tests/bus/` (62 tests green). + +`CLAUDE.md` promises a `ServiceBus` worker and a `get_bus()` factory, but neither exists. Today there is no event plumbing between workers: mutator, correlator, profiler, sniffer, and prober cannot publish state transitions to interested consumers. The web SSE endpoint (`/stream`) polls the DB every ~1s inside its generator loop as a result. 
Downstream features that need this infrastructure: live topology mutations (DEBT-030), pulsating/live topology visualization, automatic mutations, network traffic simulation, attacker-pool push updates. + +MVP scope (**host-local**): +1. `decnet bus` long-running worker, systemd-supervised like every other worker. Runs on every host — master and each swarm agent — independently. +2. Transport: **UNIX-domain socket** (default `/run/decnet/bus.sock`, fallback `~/.decnet/bus.sock` in dev). Kernel-authenticated peer delivery; authorization is socket file permissions (0660, group=`decnet`). No TCP, no mTLS, no external broker. +3. Wire protocol: tiny hand-rolled framing — 1 ASCII verb line (`PUB `, `SUB `, `EVT `, `HELLO`, `BYE`) + 4-byte big-endian body length + orjson body. Shared `matches(pattern, topic)` helper implements NATS-style wildcards (`*` = one token, `>` = one-or-more trailing tokens). +4. Factory `get_bus()` returns a client with `publish(topic, payload)` / `subscribe(pattern) -> Subscription` (async ctx + async iterator). In-process `FakeBus` for unit tests; `NullBus` when `DECNET_BUS_ENABLED=false`. +5. Topic hierarchy locked early: `topology.{id}.mutation.{state}`, `topology.{id}.status`, `decky.{id}.state`, `decky.{id}.traffic`, `attacker.observed`, `system.log`, `system.bus.health`. +6. Delivery semantics: **at-most-once, fire-and-forget**. Per-subscriber bounded queue with drop-oldest on overflow. No replay, no persistence, no queue groups, no ordering guarantees. DB remains the source of truth; the bus is the notification layer only. +7. First consumer proving end-to-end: SSE route for topology events (DEBT-030). +8. Later: migrate `/stream` off its internal poll loop onto the bus for global events. + +**Cross-host federation is out of MVP scope.** Each host runs its own bus — swarm agents and the master do not share a bus substrate. 
If a use case emerges that requires cross-host pub/sub, it will land as a `decnet bus --bridge-tcp` mode that proxies the UNIX socket over the existing swarm mTLS infra. DEBT-030 is master-only and therefore unblocked by this deferral. + +**Status:** ✅ Resolved — MVP shipped. Host-local UNIX-socket bus, `get_bus()` factory, `decnet bus` worker with heartbeats, systemd unit, 62 unit/integration tests green. DEBT-030 is now unblocked. + +### DEBT-030 — Live (hot) topology mutations via web UI +**Files:** `decnet/web/router/topology/api_mutations.py` (enqueue endpoint already exists), `decnet/mutator/engine.py` + `ops.py` (reconciler already applies all 7 ops), `web/src/hooks/useMazeApi.ts` (missing enqueue methods), `web/src/components/MazeNET.tsx` (editor treats every topology as pending). + +**Backend is already there:** +- `TopologyMutation` table (`decnet/web/db/models.py:322-358`) supports `add_lan`, `remove_lan`, `attach_decky`, `detach_decky`, `remove_decky`, `update_decky`, `update_lan`. +- `POST /topologies/{id}/mutations` enqueues, gated to `active|degraded`. +- Mutator watch loop (`decnet/mutator/engine.py:136-190`) claims atomically, dispatches to `ops.py`, does Docker best-effort, flips topology to `degraded` on failure. + +**Gap is entirely in the frontend + event delivery:** +1. `useMazeApi.ts` has no `enqueueMutation()` peer to `deployTopology()`; editor edits on `active` topologies currently no-op / 4xx. +2. No mutation-status UI (pending / applying / applied / failed badges, audit log). +3. No server→client push channel for mutation state transitions — depends on DEBT-029. + +**Design (agreed):** +- **Staged buffer** (client-side, Zustand, not persisted): every editor action pushes a `TopologyMutation` onto `pendingOps[]`. Undo = pop. Reset = clear. +- **Apply (N changes)** button opens a diff modal rendering ops in plain English, then POSTs the batch. Batch carries the `topology.version` observed when staging began; server returns 409 on drift. 
+- **Batch atomicity = honest partial.** Server enqueues N rows in order; mutator applies one-by-one. If op 3 fails, 1-2 stay applied, topology flips to `degraded`, user decides to fix-forward or enqueue a manual revert. (Docker ops aren't transactional; pretending otherwise causes worse bugs than honesty.) +- **Visual states compose** per existing rule: `pending-mutation`, `applying`, `failed` layer on top of `running / inactive / selected`, never replace them. +- **Push via SSE over the bus** (not polling): new route `GET /api/v1/topologies/{id}/events` subscribes to `topology.{id}.>` on the service bus (`>` rather than `*`, because `*` matches exactly one token and would miss `topology.{id}.mutation.{state}`) and forwards as SSE. Envelope: `{v, type, ts, payload}`. Day-one event types: `mutation.enqueued|applying|applied|failed`, `topology.status_changed`, `topology.version_bumped`. Room to grow: `decky.state_changed`, `decky.traffic`, `attacker.observed`. +- **Separate from `/stream`** deliberately: different auth scopes, different fan-out shape (per-topology vs global), different failure isolation. Two routes, one bus. + +**Status:** Deferred — unblocked now that DEBT-029 (bus worker) has shipped; ~1-2 days for route + frontend + tests.
+ --- ## 🟢 Low @@ -176,6 +218,8 @@ The deploy endpoint exercises Docker Compose orchestration via `decnet.engine.de | DEBT-026 | 🟡 Medium | Features | deferred (out of scope) | | DEBT-027 | 🟡 Medium | Features | deferred (out of scope) | | DEBT-028 | 🟡 Medium | Testing | deferred (needs DinD CI) | +| DEBT-029 | 🟡 Medium | Architecture / Bus | ✅ resolved | +| DEBT-030 | 🟡 Medium | Web / Live mutations | deferred (unblocked) | -**Remaining open:** DEBT-011 (Alembic), DEBT-023 (image pinning), DEBT-026 (modular mailboxes), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests) -**Estimated remaining effort:** ~12 hours +**Remaining open:** DEBT-011 (Alembic), DEBT-023 (image pinning), DEBT-026 (modular mailboxes), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-030 (live topology mutations) +**Estimated remaining effort:** ~12 hours + ~1-2 days for DEBT-030 diff --git a/tests/bus/__init__.py b/tests/bus/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/bus/conftest.py b/tests/bus/conftest.py new file mode 100644 index 00000000..c41e0485 --- /dev/null +++ b/tests/bus/conftest.py @@ -0,0 +1,59 @@ +"""Shared fixtures for decnet.bus tests.""" +from __future__ import annotations + +import asyncio +import pathlib +from typing import AsyncIterator + +import pytest +import pytest_asyncio + +from decnet.bus.fake import FakeBus +from decnet.bus.unix_client import UnixSocketBus +from decnet.bus.unix_server import BusServer + + +@pytest_asyncio.fixture +async def fake_bus() -> AsyncIterator[FakeBus]: + bus = FakeBus() + await bus.connect() + try: + yield bus + finally: + await bus.close() + + +@pytest_asyncio.fixture +async def unix_bus(tmp_path: pathlib.Path) -> AsyncIterator[tuple[BusServer, UnixSocketBus]]: + """Spin a BusServer on a tmp socket, yield (server, connected client). + + Teardown closes both in the right order.
No privileged group chown — + the fixture passes ``group=None`` so the socket stays owned by the + test-runner's process group. + """ + sock = tmp_path / "bus.sock" + server = BusServer(sock, group=None) + await server.start() + serve_task = asyncio.create_task(server.serve_forever()) + + client = UnixSocketBus(sock, client_name="test-client") + await client.connect() + + try: + yield server, client + finally: + await client.close() + serve_task.cancel() + try: + await serve_task + except asyncio.CancelledError: + pass + await server.close() + + +@pytest.fixture +def bus_env_fake(monkeypatch: pytest.MonkeyPatch) -> None: + """Point :func:`decnet.bus.factory.get_bus` at the in-process FakeBus.""" + monkeypatch.setenv("DECNET_BUS_TYPE", "fake") + monkeypatch.setenv("DECNET_BUS_ENABLED", "true") + monkeypatch.delenv("DECNET_BUS_SOCKET", raising=False) diff --git a/tests/bus/test_base.py b/tests/bus/test_base.py new file mode 100644 index 00000000..503da157 --- /dev/null +++ b/tests/bus/test_base.py @@ -0,0 +1,66 @@ +"""Unit tests for :mod:`decnet.bus.base` — wildcard matching and the Event envelope.""" +from __future__ import annotations + +import pytest + +from decnet.bus.base import EVENT_SCHEMA_VERSION, Event, matches + + +class TestMatches: + @pytest.mark.parametrize("pattern,topic", [ + ("topology.abc.mutation.applied", "topology.abc.mutation.applied"), + ("topology.*.mutation.applied", "topology.abc.mutation.applied"), + ("topology.*.mutation.*", "topology.abc.mutation.applied"), + ("topology.>", "topology.abc.mutation.applied"), + ("topology.>", "topology.abc.status"), + ("decky.*.state", "decky.xyz.state"), + ("system.bus.health", "system.bus.health"), + ]) + def test_matches_positive(self, pattern: str, topic: str) -> None: + assert matches(pattern, topic) is True + + @pytest.mark.parametrize("pattern,topic", [ + ("topology.abc.mutation.applied", "topology.abc.mutation.failed"), + ("topology.*", "topology.abc.mutation.applied"), # * is one token + 
("topology.>", "topology"), # > needs ≥1 trailing + ("decky.*.state", "decky.state"), # missing middle token + ("decky.*.state", "decky.xyz.status"), + ("a.b.c", "a.b"), + ("a.b", "a.b.c"), + ]) + def test_matches_negative(self, pattern: str, topic: str) -> None: + assert matches(pattern, topic) is False + + +class TestEvent: + def test_to_dict_round_trip(self) -> None: + event = Event(topic="topology.abc.status", payload={"status": "active"}, type="status") + data = event.to_dict() + assert data["v"] == EVENT_SCHEMA_VERSION + assert data["topic"] == "topology.abc.status" + assert data["payload"] == {"status": "active"} + assert data["type"] == "status" + assert isinstance(data["id"], str) + assert isinstance(data["ts"], float) + + def test_from_dict_prefers_wire_fields_but_ignores_topic(self) -> None: + # The wire topic is the authoritative one (passed from the transport); + # a malicious "topic" field in the body must be ignored. + data = { + "v": 1, "id": "abc", "type": "status", + "topic": "attacker.spoofed", # ignored + "ts": 123.0, + "payload": {"x": 1}, + } + event = Event.from_dict("topology.abc.status", data) + assert event.topic == "topology.abc.status" + assert event.payload == {"x": 1} + assert event.id == "abc" + assert event.ts == 123.0 + + def test_from_dict_tolerates_missing_fields(self) -> None: + event = Event.from_dict("system.log", {}) + assert event.topic == "system.log" + assert event.payload == {} + assert event.v == EVENT_SCHEMA_VERSION + assert event.id # auto-generated diff --git a/tests/bus/test_factory.py b/tests/bus/test_factory.py new file mode 100644 index 00000000..aef5a791 --- /dev/null +++ b/tests/bus/test_factory.py @@ -0,0 +1,52 @@ +"""Tests for :func:`decnet.bus.factory.get_bus` dispatch.""" +from __future__ import annotations + +import pathlib + +import pytest + +from decnet.bus.factory import _default_socket_path, get_bus +from decnet.bus.fake import FakeBus, NullBus +from decnet.bus.unix_client import UnixSocketBus + + +def 
test_disabled_returns_null_bus(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_BUS_ENABLED", "false") + monkeypatch.setenv("DECNET_BUS_TYPE", "unix") # ignored when disabled + bus = get_bus() + assert isinstance(bus, NullBus) + + +def test_fake_dispatch(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_BUS_ENABLED", "true") + monkeypatch.setenv("DECNET_BUS_TYPE", "fake") + bus = get_bus() + assert isinstance(bus, FakeBus) + + +def test_unix_dispatch(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None: + monkeypatch.setenv("DECNET_BUS_ENABLED", "true") + monkeypatch.setenv("DECNET_BUS_TYPE", "unix") + monkeypatch.setenv("DECNET_BUS_SOCKET", str(tmp_path / "b.sock")) + bus = get_bus() + assert isinstance(bus, UnixSocketBus) + + +def test_unknown_type_raises(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_BUS_ENABLED", "true") + monkeypatch.setenv("DECNET_BUS_TYPE", "mqtt") + with pytest.raises(ValueError, match="Unsupported bus type"): + get_bus() + + +def test_default_socket_path_honors_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DECNET_BUS_SOCKET", "/tmp/explicit.sock") + assert _default_socket_path() == "/tmp/explicit.sock" + + +def test_default_socket_path_falls_back_to_home(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("DECNET_BUS_SOCKET", raising=False) + # Force /run/decnet to look unusable. 
+ monkeypatch.setattr("os.path.isdir", lambda p: False) + path = _default_socket_path() + assert path.endswith(".decnet/bus.sock") diff --git a/tests/bus/test_fake_bus.py b/tests/bus/test_fake_bus.py new file mode 100644 index 00000000..41441168 --- /dev/null +++ b/tests/bus/test_fake_bus.py @@ -0,0 +1,108 @@ +"""Tests for :class:`decnet.bus.fake.FakeBus` and :class:`NullBus`.""" +from __future__ import annotations + +import asyncio + +import pytest + +from decnet.bus.fake import FakeBus, NullBus + + +async def _collect(sub, n: int, timeout: float = 1.0) -> list: + out = [] + try: + async with asyncio.timeout(timeout): + async for event in sub: + out.append(event) + if len(out) >= n: + break + except TimeoutError: + pass + return out + + +class TestFakeBus: + async def test_publish_delivers_to_exact_match(self, fake_bus: FakeBus) -> None: + sub = fake_bus.subscribe("topology.abc.status") + async with sub: + await fake_bus.publish("topology.abc.status", {"status": "active"}) + events = await _collect(sub, 1) + assert len(events) == 1 + assert events[0].payload == {"status": "active"} + + async def test_publish_delivers_to_wildcard(self, fake_bus: FakeBus) -> None: + sub = fake_bus.subscribe("topology.*.mutation.*") + async with sub: + await fake_bus.publish("topology.t1.mutation.applied", {"id": 1}) + await fake_bus.publish("topology.t2.mutation.failed", {"id": 2}) + await fake_bus.publish("decky.x.state", {"state": "running"}) # should not match + events = await _collect(sub, 2) + assert len(events) == 2 + assert {e.payload["id"] for e in events} == {1, 2} + + async def test_multiple_subscribers_each_get_copy(self, fake_bus: FakeBus) -> None: + sub_a = fake_bus.subscribe("topology.>") + sub_b = fake_bus.subscribe("topology.>") + async with sub_a, sub_b: + await fake_bus.publish("topology.abc.status", {"status": "active"}) + a = await _collect(sub_a, 1) + b = await _collect(sub_b, 1) + assert len(a) == 1 + assert len(b) == 1 + + async def 
test_subscription_close_unblocks_iter(self, fake_bus: FakeBus) -> None: + sub = fake_bus.subscribe("topology.>") + + async def consume() -> list: + out = [] + async for event in sub: + out.append(event) + return out + + task = asyncio.create_task(consume()) + await asyncio.sleep(0.01) # let task block on queue.get() + await sub.aclose() + events = await asyncio.wait_for(task, timeout=0.5) + assert events == [] + + async def test_close_is_idempotent(self, fake_bus: FakeBus) -> None: + await fake_bus.close() + await fake_bus.close() # second call must not raise + + async def test_publish_on_closed_raises(self, fake_bus: FakeBus) -> None: + await fake_bus.close() + with pytest.raises(RuntimeError): + await fake_bus.publish("x", {}) + with pytest.raises(RuntimeError): + fake_bus.subscribe("x") + + async def test_backpressure_drops_oldest(self) -> None: + bus = FakeBus(queue_size=2) + await bus.connect() + try: + sub = bus.subscribe("t") + # Don't consume; publish 5 — queue holds at most 2, oldest dropped. + for i in range(5): + await bus.publish("t", {"i": i}) + events = await _collect(sub, 2, timeout=0.2) + assert len(events) == 2 + # We kept the 2 most recent. + assert events[-1].payload["i"] == 4 + finally: + await bus.close() + + +class TestNullBus: + async def test_publish_is_noop(self) -> None: + bus = NullBus() + await bus.connect() + await bus.publish("anything", {"x": 1}) + await bus.close() + + async def test_subscribe_yields_nothing(self) -> None: + bus = NullBus() + sub = bus.subscribe("topology.>") + async with sub: + # Iteration must stop immediately. 
+ events = [e async for e in sub] + assert events == [] diff --git a/tests/bus/test_protocol.py b/tests/bus/test_protocol.py new file mode 100644 index 00000000..f013847e --- /dev/null +++ b/tests/bus/test_protocol.py @@ -0,0 +1,87 @@ +"""Tests for the wire protocol framing.""" +from __future__ import annotations + +import asyncio +import struct + +import pytest + +from decnet.bus import protocol + + +def _reader_from(data: bytes) -> asyncio.StreamReader: + reader = asyncio.StreamReader() + reader.feed_data(data) + reader.feed_eof() + return reader + + +async def _read_one(data: bytes) -> protocol.Frame | None: + return await protocol.read_frame(_reader_from(data)) + + +class TestEncodeDecode: + async def test_pub_round_trip(self) -> None: + data = protocol.encode(protocol.PUB, args="topology.abc.status", body={"payload": {"x": 1}}) + frame = await _read_one(data) + assert frame is not None + assert frame.verb == protocol.PUB + assert frame.args == "topology.abc.status" + assert protocol.decode_body(frame.body) == {"payload": {"x": 1}} + + async def test_sub_empty_body(self) -> None: + data = protocol.encode(protocol.SUB, args="topology.*.mutation.*") + frame = await _read_one(data) + assert frame is not None + assert frame.verb == protocol.SUB + assert frame.args == "topology.*.mutation.*" + assert frame.body == b"" + + async def test_bye_no_args(self) -> None: + data = protocol.encode(protocol.BYE) + frame = await _read_one(data) + assert frame is not None + assert frame.verb == protocol.BYE + assert frame.args == "" + assert frame.body == b"" + + async def test_clean_eof_returns_none(self) -> None: + assert await _read_one(b"") is None + + +class TestProtocolErrors: + def test_encode_rejects_unknown_verb(self) -> None: + with pytest.raises(protocol.ProtocolError): + protocol.encode("NOPE", args="x") + + def test_encode_rejects_newline_in_args(self) -> None: + with pytest.raises(protocol.ProtocolError): + protocol.encode(protocol.PUB, args="bad\ntopic") + + def 
test_encode_rejects_oversized_body(self) -> None: + big = {"payload": {"x": "a" * (protocol.MAX_BODY_BYTES + 1)}} + with pytest.raises(protocol.ProtocolError): + protocol.encode(protocol.PUB, args="t", body=big) + + async def test_decode_rejects_unknown_verb(self) -> None: + bad = b"NOPE x\n" + struct.pack(">I", 0) + with pytest.raises(protocol.ProtocolError): + await _read_one(bad) + + async def test_decode_rejects_oversized_body_length(self) -> None: + bad = b"PUB x\n" + struct.pack(">I", protocol.MAX_BODY_BYTES + 1) + with pytest.raises(protocol.ProtocolError): + await _read_one(bad) + + async def test_decode_rejects_truncated_body(self) -> None: + bad = b"PUB x\n" + struct.pack(">I", 10) + b"short" + with pytest.raises(Exception): # IncompleteReadError bubbles up + await _read_one(bad) + + def test_decode_body_rejects_non_object(self) -> None: + import orjson + with pytest.raises(protocol.ProtocolError): + protocol.decode_body(orjson.dumps([1, 2, 3])) + + def test_decode_body_empty_returns_empty_dict(self) -> None: + assert protocol.decode_body(b"") == {} diff --git a/tests/bus/test_topics.py b/tests/bus/test_topics.py new file mode 100644 index 00000000..02c4cdbe --- /dev/null +++ b/tests/bus/test_topics.py @@ -0,0 +1,42 @@ +"""Tests for the topic hierarchy builders.""" +from __future__ import annotations + +import pytest + +from decnet.bus import topics + + +def test_topology_mutation_builder() -> None: + topic = topics.topology_mutation("abc123", topics.MUTATION_APPLIED) + assert topic == "topology.abc123.mutation.applied" + + +def test_topology_status_builder() -> None: + assert topics.topology_status("t-1") == "topology.t-1.status" + + +def test_decky_builder() -> None: + assert topics.decky("d-42", topics.DECKY_STATE) == "decky.d-42.state" + assert topics.decky("d-42", topics.DECKY_TRAFFIC) == "decky.d-42.traffic" + + +def test_system_builder_allows_dotted_leaf() -> None: + # system.bus.health has a dot in the leaf — that's intentional and a + # 
legitimate hierarchy refinement, not a segment violation. + assert topics.system(topics.SYSTEM_BUS_HEALTH) == "system.bus.health" + assert topics.system(topics.SYSTEM_LOG) == "system.log" + + +def test_system_builder_rejects_empty() -> None: + with pytest.raises(ValueError): + topics.system("") + + +@pytest.mark.parametrize("bad", ["", "has.dot", "has*wildcard", "has>wild", "with space", "with\ttab"]) +def test_segment_validation(bad: str) -> None: + with pytest.raises(ValueError): + topics.topology_mutation(bad, topics.MUTATION_APPLIED) + with pytest.raises(ValueError): + topics.topology_status(bad) + with pytest.raises(ValueError): + topics.decky(bad, topics.DECKY_STATE) diff --git a/tests/bus/test_unix_socket_bus.py b/tests/bus/test_unix_socket_bus.py new file mode 100644 index 00000000..ddc48ad5 --- /dev/null +++ b/tests/bus/test_unix_socket_bus.py @@ -0,0 +1,131 @@ +"""End-to-end tests for :class:`UnixSocketBus` against a real :class:`BusServer`. + +These tests run in the dev loop (no pytest marker) because they only need +the tmp filesystem — no Docker, no external broker. +""" +from __future__ import annotations + +import asyncio +import pathlib +import stat + +import pytest + +from decnet.bus.unix_client import UnixSocketBus +from decnet.bus.unix_server import BusServer + + +async def _drain(sub, n: int, timeout: float = 1.5) -> list: + out = [] + try: + async with asyncio.timeout(timeout): + async for event in sub: + out.append(event) + if len(out) >= n: + break + except TimeoutError: + pass + return out + + +class TestEndToEnd: + async def test_pub_sub_exact(self, unix_bus) -> None: + server, client = unix_bus + sub = client.subscribe("topology.abc.status") + # Give the SUB frame a tick to register on the server. + await asyncio.sleep(0.05) + async with sub: + await client.publish("topology.abc.status", {"status": "active"}) + events = await _drain(sub, 1) + # A publisher doesn't see its own events — use a second client. 
+ assert events == [] + + async def test_pub_sub_across_two_clients( + self, tmp_path: pathlib.Path, + ) -> None: + sock = tmp_path / "bus.sock" + server = BusServer(sock, group=None) + await server.start() + serve_task = asyncio.create_task(server.serve_forever()) + + publisher = UnixSocketBus(sock, client_name="publisher") + subscriber = UnixSocketBus(sock, client_name="subscriber") + await publisher.connect() + await subscriber.connect() + + try: + sub = subscriber.subscribe("topology.*.mutation.*") + await asyncio.sleep(0.05) # let SUB register + + async with sub: + await publisher.publish( + "topology.t1.mutation.applied", {"id": 1}, event_type="applied", + ) + await publisher.publish( + "decky.xyz.state", {"state": "running"}, # should not match + ) + await publisher.publish( + "topology.t2.mutation.failed", {"id": 2}, event_type="failed", + ) + events = await _drain(sub, 2) + ids = {e.payload["id"] for e in events} + assert ids == {1, 2} + finally: + await publisher.close() + await subscriber.close() + serve_task.cancel() + try: + await serve_task + except asyncio.CancelledError: + pass + await server.close() + + async def test_socket_file_mode(self, tmp_path: pathlib.Path) -> None: + sock = tmp_path / "bus.sock" + server = BusServer(sock, group=None) + await server.start() + try: + mode = stat.S_IMODE(sock.stat().st_mode) + assert mode == 0o660 + finally: + await server.close() + + async def test_server_close_wakes_subscribers( + self, tmp_path: pathlib.Path, + ) -> None: + sock = tmp_path / "bus.sock" + server = BusServer(sock, group=None) + await server.start() + serve_task = asyncio.create_task(server.serve_forever()) + + client = UnixSocketBus(sock, client_name="watcher") + await client.connect() + sub = client.subscribe("system.>") + await asyncio.sleep(0.05) + + async def consume() -> list: + out = [] + async for event in sub: + out.append(event) + return out + + consumer = asyncio.create_task(consume()) + await asyncio.sleep(0.05) + + 
serve_task.cancel() + try: + await serve_task + except asyncio.CancelledError: + pass + await server.close() + + # The consumer must unblock within a reasonable time. + events = await asyncio.wait_for(consumer, timeout=1.0) + assert events == [] + await client.close() + + async def test_start_rejects_missing_parent(self, tmp_path: pathlib.Path) -> None: + sock = tmp_path / "nonexistent-dir" / "bus.sock" + server = BusServer(sock, group=None) + with pytest.raises(FileNotFoundError): + await server.start() diff --git a/tests/bus/test_worker.py b/tests/bus/test_worker.py new file mode 100644 index 00000000..8c5b3317 --- /dev/null +++ b/tests/bus/test_worker.py @@ -0,0 +1,68 @@ +"""Tests for :func:`decnet.bus.worker.bus_worker` lifecycle + heartbeat.""" +from __future__ import annotations + +import asyncio +import pathlib + +import pytest + +from decnet.bus import topics +from decnet.bus.unix_client import UnixSocketBus +from decnet.bus.worker import bus_worker + + +class TestBusWorker: + async def test_worker_serves_and_heartbeats( + self, tmp_path: pathlib.Path, + ) -> None: + sock = tmp_path / "bus.sock" + task = asyncio.create_task( + bus_worker(sock, group=None, heartbeat_interval=1), + ) + # Wait for the socket to exist. 
+ for _ in range(40): + if sock.exists(): + break + await asyncio.sleep(0.05) + assert sock.exists(), "bus worker did not create socket" + + client = UnixSocketBus(sock, client_name="hb-watcher") + await client.connect() + sub = client.subscribe(topics.system(topics.SYSTEM_BUS_HEALTH)) + try: + async with sub: + async with asyncio.timeout(3.0): + async for event in sub: + assert event.topic == "system.bus.health" + assert "pid" in event.payload + break + finally: + await client.close() + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + async def test_worker_creates_home_fallback_parent( + self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, + ) -> None: + # Point Path.home() at tmp_path so the "auto-mkdir ~/.decnet" branch + # activates without touching the real home directory. + monkeypatch.setattr(pathlib.Path, "home", classmethod(lambda cls: tmp_path)) + sock = tmp_path / ".decnet" / "bus.sock" + task = asyncio.create_task( + bus_worker(sock, group=None, heartbeat_interval=60), + ) + try: + for _ in range(40): + if sock.exists(): + break + await asyncio.sleep(0.05) + assert sock.exists() + finally: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass