Files
DECNET/tests/bus/test_closed_publish.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

105 lines
3.6 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Tests for graceful publish-on-closed-bus behaviour.
Regression guard for the 'publish on closed bus' log flood: when a
worker's private bus closes (shutdown) but stream threads keep calling
the publish closure, the bus must not raise a RuntimeError per call.
First drop warns loudly (bus is critical infra); subsequent drops on
the same instance are DEBUG to prevent the flood.
"""
from __future__ import annotations
import asyncio
import logging
import pathlib
from unittest.mock import MagicMock
import pytest
from decnet.bus.publish import make_thread_safe_publisher
from decnet.bus.unix_client import UnixSocketBus
def _make_closed_bus() -> UnixSocketBus:
"""Build a UnixSocketBus and flip _closed without touching sockets.
We don't need a live connection to test the closed-publish path —
the guard clause short-circuits before any I/O.
"""
bus = UnixSocketBus(pathlib.Path("/tmp/does-not-matter.sock"))
bus._closed = True
return bus
@pytest.mark.asyncio
async def test_publish_on_closed_bus_returns_silently(
caplog: pytest.LogCaptureFixture,
) -> None:
"""First post-close publish warns loudly; does not raise."""
bus = _make_closed_bus()
with caplog.at_level(logging.WARNING, logger="decnet.bus.client"):
await bus.publish("system.log", {"x": 1})
assert any(
rec.levelno == logging.WARNING
and "publish on closed bus dropped" in rec.getMessage()
for rec in caplog.records
), f"expected one WARNING, got: {[(r.levelname, r.getMessage()) for r in caplog.records]}"
@pytest.mark.asyncio
async def test_subsequent_closed_publishes_downgrade_to_debug(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Only the first drop warns; the next N drops are DEBUG. This is
the regression guard against the log flood."""
bus = _make_closed_bus()
with caplog.at_level(logging.DEBUG, logger="decnet.bus.client"):
for _ in range(50):
await bus.publish("system.log", {"x": 1})
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
debugs = [r for r in caplog.records if r.levelno == logging.DEBUG]
assert len(warnings) == 1, (
f"expected exactly 1 WARNING across 50 publishes, got {len(warnings)}"
)
assert len(debugs) >= 49, (
f"expected >=49 DEBUG drops, got {len(debugs)}"
)
@pytest.mark.asyncio
async def test_thread_safe_publisher_short_circuits_on_closed_bus() -> None:
"""The sync shim returned by make_thread_safe_publisher must NOT
marshal a coroutine onto the loop when the bus is already closed."""
bus = _make_closed_bus()
loop = asyncio.get_running_loop()
publisher = make_thread_safe_publisher(bus, loop)
# Patch run_coroutine_threadsafe so we can detect if the shim tries
# to marshal anything.
import decnet.bus.publish as pub_mod
called = MagicMock()
orig = asyncio.run_coroutine_threadsafe
pub_mod.asyncio.run_coroutine_threadsafe = lambda coro, _loop: (called(), orig(coro, _loop))[1]
try:
publisher("system.log", {"x": 1})
publisher("system.log", {"x": 2})
publisher("system.log", {"x": 3})
finally:
pub_mod.asyncio.run_coroutine_threadsafe = orig
called.assert_not_called()
@pytest.mark.asyncio
async def test_thread_safe_publisher_noop_when_bus_is_none() -> None:
"""A None bus still yields a no-op callable (pre-existing contract)."""
loop = asyncio.get_running_loop()
publisher = make_thread_safe_publisher(None, loop)
# Should not raise, return None.
assert publisher("topic", {"x": 1}) is None