fix(bus): silently drop publishes on closed bus instead of raising
Worker bus instances (collector, ingester) close their private buses in finally blocks on shutdown, but stream threads holding closure references kept calling publish after close — one `RuntimeError: publish on closed bus` per stream line, caught by publish_safely and logged per call, flooding server logs. Changes: - `UnixSocketBus.publish()` now drops post-close calls. First drop WARNs loudly (bus is critical infra — silent drops would hide real problems); subsequent drops on the same instance log at DEBUG to prevent the flood. Sticky `_closed_publish_warned` flag, reset naturally per new bus instance. - `make_thread_safe_publisher` short-circuits on a closed bus before marshalling a coroutine onto the loop. Avoids the wasted scheduling work in the hot shutdown path. Degradation is safe: callers go through `publish_safely`, which already treats exceptions as 'dropped notification, DB is source of truth.' We just stop manufacturing the exception in the first place for a known-benign condition.
This commit is contained in:
@@ -61,6 +61,14 @@ def make_thread_safe_publisher(
|
||||
return lambda _topic, _payload, _event_type="": None
|
||||
|
||||
def _publish(topic: str, payload: dict[str, Any], event_type: str = "") -> None:
|
||||
# Stream threads may keep draining after the bus owner closed it
|
||||
# (shutdown race). Short-circuit here so we don't marshal a
|
||||
# coroutine onto a dead loop just to have publish_safely swallow
|
||||
# it. bus.publish's own WARN-once guard handles the rare case
|
||||
# where _closed flips between this check and the coroutine
|
||||
# actually running.
|
||||
if getattr(bus, "_closed", False):
|
||||
return
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
publish_safely(bus, topic, payload, event_type=event_type),
|
||||
|
||||
@@ -94,6 +94,12 @@ class UnixSocketBus(BaseBus):
|
||||
self._lock = asyncio.Lock()
|
||||
self._write_lock = asyncio.Lock()
|
||||
self._closed = False
|
||||
# Sticky flag: the first publish-on-closed-bus call logs at
|
||||
# WARNING so operators see that a publish was dropped; subsequent
|
||||
# calls on the same instance log at DEBUG only to prevent a
|
||||
# log flood when stream threads drain after close. The bus is
|
||||
# critical infra, so the first warning is non-negotiable.
|
||||
self._closed_publish_warned = False
|
||||
|
||||
# ─── Lifecycle ──────────────────────────────────────────────────────────
|
||||
|
||||
@@ -146,7 +152,21 @@ class UnixSocketBus(BaseBus):
|
||||
event_type: str = "",
|
||||
) -> None:
|
||||
if self._closed:
|
||||
raise RuntimeError("publish on closed bus")
|
||||
# Degrade gracefully: the DB is the source of truth, the bus
|
||||
# is only the notification layer. Raising here made every
|
||||
# caller via publish_safely flood the logs once per stream
|
||||
# line during shutdown races. First drop warns loudly;
|
||||
# subsequent drops on the same instance are DEBUG-only.
|
||||
if not self._closed_publish_warned:
|
||||
self._closed_publish_warned = True
|
||||
log.warning(
|
||||
"bus.client: publish on closed bus dropped topic=%s "
|
||||
"(further drops on this instance logged at DEBUG)",
|
||||
topic,
|
||||
)
|
||||
else:
|
||||
log.debug("bus.client: publish on closed bus dropped topic=%s", topic)
|
||||
return
|
||||
if self._writer is None:
|
||||
await self.connect()
|
||||
body = Event(topic=topic, payload=payload, type=event_type).to_dict()
|
||||
|
||||
103
tests/bus/test_closed_publish.py
Normal file
103
tests/bus/test_closed_publish.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Tests for graceful publish-on-closed-bus behaviour.
|
||||
|
||||
Regression guard for the 'publish on closed bus' log flood: when a
|
||||
worker's private bus closes (shutdown) but stream threads keep calling
|
||||
the publish closure, the bus must not raise a RuntimeError per call.
|
||||
First drop warns loudly (bus is critical infra); subsequent drops on
|
||||
the same instance are DEBUG to prevent the flood.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import pathlib
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.bus.publish import make_thread_safe_publisher
|
||||
from decnet.bus.unix_client import UnixSocketBus
|
||||
|
||||
|
||||
def _make_closed_bus() -> UnixSocketBus:
|
||||
"""Build a UnixSocketBus and flip _closed without touching sockets.
|
||||
|
||||
We don't need a live connection to test the closed-publish path —
|
||||
the guard clause short-circuits before any I/O.
|
||||
"""
|
||||
bus = UnixSocketBus(pathlib.Path("/tmp/does-not-matter.sock"))
|
||||
bus._closed = True
|
||||
return bus
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_publish_on_closed_bus_returns_silently(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""First post-close publish warns loudly; does not raise."""
|
||||
bus = _make_closed_bus()
|
||||
with caplog.at_level(logging.WARNING, logger="decnet.bus.client"):
|
||||
await bus.publish("system.log", {"x": 1})
|
||||
|
||||
assert any(
|
||||
rec.levelno == logging.WARNING
|
||||
and "publish on closed bus dropped" in rec.getMessage()
|
||||
for rec in caplog.records
|
||||
), f"expected one WARNING, got: {[(r.levelname, r.getMessage()) for r in caplog.records]}"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subsequent_closed_publishes_downgrade_to_debug(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Only the first drop warns; the next N drops are DEBUG. This is
|
||||
the regression guard against the log flood."""
|
||||
bus = _make_closed_bus()
|
||||
|
||||
with caplog.at_level(logging.DEBUG, logger="decnet.bus.client"):
|
||||
for _ in range(50):
|
||||
await bus.publish("system.log", {"x": 1})
|
||||
|
||||
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
|
||||
debugs = [r for r in caplog.records if r.levelno == logging.DEBUG]
|
||||
assert len(warnings) == 1, (
|
||||
f"expected exactly 1 WARNING across 50 publishes, got {len(warnings)}"
|
||||
)
|
||||
assert len(debugs) >= 49, (
|
||||
f"expected >=49 DEBUG drops, got {len(debugs)}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_thread_safe_publisher_short_circuits_on_closed_bus() -> None:
|
||||
"""The sync shim returned by make_thread_safe_publisher must NOT
|
||||
marshal a coroutine onto the loop when the bus is already closed."""
|
||||
bus = _make_closed_bus()
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
publisher = make_thread_safe_publisher(bus, loop)
|
||||
|
||||
# Patch run_coroutine_threadsafe so we can detect if the shim tries
|
||||
# to marshal anything.
|
||||
import decnet.bus.publish as pub_mod
|
||||
called = MagicMock()
|
||||
orig = asyncio.run_coroutine_threadsafe
|
||||
pub_mod.asyncio.run_coroutine_threadsafe = lambda coro, _loop: (called(), orig(coro, _loop))[1]
|
||||
|
||||
try:
|
||||
publisher("system.log", {"x": 1})
|
||||
publisher("system.log", {"x": 2})
|
||||
publisher("system.log", {"x": 3})
|
||||
finally:
|
||||
pub_mod.asyncio.run_coroutine_threadsafe = orig
|
||||
|
||||
called.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_thread_safe_publisher_noop_when_bus_is_none() -> None:
|
||||
"""A None bus still yields a no-op callable (pre-existing contract)."""
|
||||
loop = asyncio.get_running_loop()
|
||||
publisher = make_thread_safe_publisher(None, loop)
|
||||
# Should not raise, return None.
|
||||
assert publisher("topic", {"x": 1}) is None
|
||||
Reference in New Issue
Block a user