From 4418608a5440cd4391122e595735f4775bd049f8 Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 23 Apr 2026 18:00:47 -0400 Subject: [PATCH] fix(bus): silently drop publishes on closed bus instead of raising MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Worker bus instances (collector, ingester) close their private buses in finally blocks on shutdown, but stream threads holding closure references kept calling publish after close — one `RuntimeError: publish on closed bus` per stream line, caught by publish_safely and logged per call, flooding server logs. Changes: - `UnixSocketBus.publish()` now drops post-close calls. First drop WARNs loudly (bus is critical infra — silent drops would hide real problems); subsequent drops on the same instance log at DEBUG to prevent the flood. Sticky `_closed_publish_warned` flag, reset naturally per new bus instance. - `make_thread_safe_publisher` short-circuits on a closed bus before marshalling a coroutine onto the loop. Avoids the wasted scheduling work in the hot shutdown path. Degradation is safe: callers go through `publish_safely`, which already treats exceptions as 'dropped notification, DB is source of truth.' We just stop manufacturing the exception in the first place for a known-benign condition. --- decnet/bus/publish.py | 8 +++ decnet/bus/unix_client.py | 22 ++++++- tests/bus/test_closed_publish.py | 103 +++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 tests/bus/test_closed_publish.py diff --git a/decnet/bus/publish.py b/decnet/bus/publish.py index dfe3276d..15319cfe 100644 --- a/decnet/bus/publish.py +++ b/decnet/bus/publish.py @@ -61,6 +61,14 @@ def make_thread_safe_publisher( return lambda _topic, _payload, _event_type="": None def _publish(topic: str, payload: dict[str, Any], event_type: str = "") -> None: + # Stream threads may keep draining after the bus owner closed it + # (shutdown race). Short-circuit here so we don't marshal a + # coroutine onto a dead loop just to have publish_safely swallow + # it. bus.publish's own WARN-once guard handles the rare case + # where _closed flips between this check and the coroutine + # actually running. + if getattr(bus, "_closed", False): + return try: asyncio.run_coroutine_threadsafe( publish_safely(bus, topic, payload, event_type=event_type), diff --git a/decnet/bus/unix_client.py b/decnet/bus/unix_client.py index 1c5d1659..226b296a 100644 --- a/decnet/bus/unix_client.py +++ b/decnet/bus/unix_client.py @@ -94,6 +94,12 @@ class UnixSocketBus(BaseBus): self._lock = asyncio.Lock() self._write_lock = asyncio.Lock() self._closed = False + # Sticky flag: the first publish-on-closed-bus call logs at + # WARNING so operators see that a publish was dropped; subsequent + # calls on the same instance log at DEBUG only to prevent a + # log flood when stream threads drain after close. The bus is + # critical infra, so the first warning is non-negotiable. + self._closed_publish_warned = False # ─── Lifecycle ────────────────────────────────────────────────────────── @@ -146,7 +152,21 @@ class UnixSocketBus(BaseBus): event_type: str = "", ) -> None: if self._closed: - raise RuntimeError("publish on closed bus") + # Degrade gracefully: the DB is the source of truth, the bus + # is only the notification layer. Raising here made every + # caller via publish_safely flood the logs once per stream + # line during shutdown races. First drop warns loudly; + # subsequent drops on the same instance are DEBUG-only. + if not self._closed_publish_warned: + self._closed_publish_warned = True + log.warning( + "bus.client: publish on closed bus dropped topic=%s " + "(further drops on this instance logged at DEBUG)", + topic, + ) + else: + log.debug("bus.client: publish on closed bus dropped topic=%s", topic) + return if self._writer is None: await self.connect() body = Event(topic=topic, payload=payload, type=event_type).to_dict() diff --git a/tests/bus/test_closed_publish.py b/tests/bus/test_closed_publish.py new file mode 100644 index 00000000..a16f399c --- /dev/null +++ b/tests/bus/test_closed_publish.py @@ -0,0 +1,103 @@ +"""Tests for graceful publish-on-closed-bus behaviour. + +Regression guard for the 'publish on closed bus' log flood: when a +worker's private bus closes (shutdown) but stream threads keep calling +the publish closure, the bus must not raise a RuntimeError per call. +First drop warns loudly (bus is critical infra); subsequent drops on +the same instance are DEBUG to prevent the flood. +""" +from __future__ import annotations + +import asyncio +import logging +import pathlib +from unittest.mock import MagicMock + +import pytest + +from decnet.bus.publish import make_thread_safe_publisher +from decnet.bus.unix_client import UnixSocketBus + + +def _make_closed_bus() -> UnixSocketBus: + """Build a UnixSocketBus and flip _closed without touching sockets. + + We don't need a live connection to test the closed-publish path — + the guard clause short-circuits before any I/O. + """ + bus = UnixSocketBus(pathlib.Path("/tmp/does-not-matter.sock")) + bus._closed = True + return bus + + +@pytest.mark.asyncio +async def test_publish_on_closed_bus_returns_silently( + caplog: pytest.LogCaptureFixture, +) -> None: + """First post-close publish warns loudly; does not raise.""" + bus = _make_closed_bus() + with caplog.at_level(logging.WARNING, logger="decnet.bus.client"): + await bus.publish("system.log", {"x": 1}) + + assert any( + rec.levelno == logging.WARNING + and "publish on closed bus dropped" in rec.getMessage() + for rec in caplog.records + ), f"expected one WARNING, got: {[(r.levelname, r.getMessage()) for r in caplog.records]}" + + +@pytest.mark.asyncio +async def test_subsequent_closed_publishes_downgrade_to_debug( + caplog: pytest.LogCaptureFixture, +) -> None: + """Only the first drop warns; the next N drops are DEBUG. This is + the regression guard against the log flood.""" + bus = _make_closed_bus() + + with caplog.at_level(logging.DEBUG, logger="decnet.bus.client"): + for _ in range(50): + await bus.publish("system.log", {"x": 1}) + + warnings = [r for r in caplog.records if r.levelno == logging.WARNING] + debugs = [r for r in caplog.records if r.levelno == logging.DEBUG] + assert len(warnings) == 1, ( + f"expected exactly 1 WARNING across 50 publishes, got {len(warnings)}" + ) + assert len(debugs) >= 49, ( + f"expected >=49 DEBUG drops, got {len(debugs)}" + ) + + +@pytest.mark.asyncio +async def test_thread_safe_publisher_short_circuits_on_closed_bus() -> None: + """The sync shim returned by make_thread_safe_publisher must NOT + marshal a coroutine onto the loop when the bus is already closed.""" + bus = _make_closed_bus() + loop = asyncio.get_running_loop() + + publisher = make_thread_safe_publisher(bus, loop) + + # Patch run_coroutine_threadsafe so we can detect if the shim tries + # to marshal anything. + import decnet.bus.publish as pub_mod + called = MagicMock() + orig = asyncio.run_coroutine_threadsafe + pub_mod.asyncio.run_coroutine_threadsafe = lambda coro, _loop: (called(), orig(coro, _loop))[1] + + try: + publisher("system.log", {"x": 1}) + publisher("system.log", {"x": 2}) + publisher("system.log", {"x": 3}) + finally: + pub_mod.asyncio.run_coroutine_threadsafe = orig + + called.assert_not_called() + + +@pytest.mark.asyncio +async def test_thread_safe_publisher_noop_when_bus_is_none() -> None: + """A None bus still yields a no-op callable (pre-existing contract).""" + loop = asyncio.get_running_loop() + publisher = make_thread_safe_publisher(None, loop) + # Should not raise, return None. + assert publisher("topic", {"x": 1}) is None