104 lines
3.5 KiB
Python
104 lines
3.5 KiB
Python
"""Tests for graceful publish-on-closed-bus behaviour.
|
|
|
|
Regression guard for the 'publish on closed bus' log flood: when a
|
|
worker's private bus closes (shutdown) but stream threads keep calling
|
|
the publish closure, the bus must not raise a RuntimeError per call.
|
|
First drop warns loudly (bus is critical infra); subsequent drops on
|
|
the same instance are DEBUG to prevent the flood.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import pathlib
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from decnet.bus.publish import make_thread_safe_publisher
|
|
from decnet.bus.unix_client import UnixSocketBus
|
|
|
|
|
|
def _make_closed_bus() -> UnixSocketBus:
|
|
"""Build a UnixSocketBus and flip _closed without touching sockets.
|
|
|
|
We don't need a live connection to test the closed-publish path —
|
|
the guard clause short-circuits before any I/O.
|
|
"""
|
|
bus = UnixSocketBus(pathlib.Path("/tmp/does-not-matter.sock"))
|
|
bus._closed = True
|
|
return bus
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_publish_on_closed_bus_returns_silently(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
"""First post-close publish warns loudly; does not raise."""
|
|
bus = _make_closed_bus()
|
|
with caplog.at_level(logging.WARNING, logger="decnet.bus.client"):
|
|
await bus.publish("system.log", {"x": 1})
|
|
|
|
assert any(
|
|
rec.levelno == logging.WARNING
|
|
and "publish on closed bus dropped" in rec.getMessage()
|
|
for rec in caplog.records
|
|
), f"expected one WARNING, got: {[(r.levelname, r.getMessage()) for r in caplog.records]}"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subsequent_closed_publishes_downgrade_to_debug(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
"""Only the first drop warns; the next N drops are DEBUG. This is
|
|
the regression guard against the log flood."""
|
|
bus = _make_closed_bus()
|
|
|
|
with caplog.at_level(logging.DEBUG, logger="decnet.bus.client"):
|
|
for _ in range(50):
|
|
await bus.publish("system.log", {"x": 1})
|
|
|
|
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
|
|
debugs = [r for r in caplog.records if r.levelno == logging.DEBUG]
|
|
assert len(warnings) == 1, (
|
|
f"expected exactly 1 WARNING across 50 publishes, got {len(warnings)}"
|
|
)
|
|
assert len(debugs) >= 49, (
|
|
f"expected >=49 DEBUG drops, got {len(debugs)}"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_thread_safe_publisher_short_circuits_on_closed_bus() -> None:
|
|
"""The sync shim returned by make_thread_safe_publisher must NOT
|
|
marshal a coroutine onto the loop when the bus is already closed."""
|
|
bus = _make_closed_bus()
|
|
loop = asyncio.get_running_loop()
|
|
|
|
publisher = make_thread_safe_publisher(bus, loop)
|
|
|
|
# Patch run_coroutine_threadsafe so we can detect if the shim tries
|
|
# to marshal anything.
|
|
import decnet.bus.publish as pub_mod
|
|
called = MagicMock()
|
|
orig = asyncio.run_coroutine_threadsafe
|
|
pub_mod.asyncio.run_coroutine_threadsafe = lambda coro, _loop: (called(), orig(coro, _loop))[1]
|
|
|
|
try:
|
|
publisher("system.log", {"x": 1})
|
|
publisher("system.log", {"x": 2})
|
|
publisher("system.log", {"x": 3})
|
|
finally:
|
|
pub_mod.asyncio.run_coroutine_threadsafe = orig
|
|
|
|
called.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_thread_safe_publisher_noop_when_bus_is_none() -> None:
|
|
"""A None bus still yields a no-op callable (pre-existing contract)."""
|
|
loop = asyncio.get_running_loop()
|
|
publisher = make_thread_safe_publisher(None, loop)
|
|
# Should not raise, return None.
|
|
assert publisher("topic", {"x": 1}) is None
|