diff --git a/decnet/bus/app.py b/decnet/bus/app.py index ed8e8e87..129bf3c6 100644 --- a/decnet/bus/app.py +++ b/decnet/bus/app.py @@ -11,10 +11,17 @@ Failures during :meth:`BaseBus.connect` are swallowed and logged — a dead bus must never break request serving. Publishers should treat a ``None`` return from :func:`get_app_bus` as "skip this notification", same as ``DECNET_BUS_ENABLED=false``. + +Connect is **retried with a short backoff** (not one-shot): a startup +race where the API lifespan hits :func:`get_app_bus` before ``decnet +bus`` is ready would otherwise poison the singleton for the entire +process lifetime. Instead we remember the last failure timestamp and +let callers retry once ``_RETRY_BACKOFF`` seconds have passed. """ from __future__ import annotations import asyncio +import time from decnet.bus.base import BaseBus from decnet.bus.factory import get_bus @@ -22,48 +29,62 @@ from decnet.logging import get_logger log = get_logger("bus.app") +# Publishers in the hot path shouldn't pay connect-retry latency on every +# call; the dashboard's own 5 s poll interval recovers within one tick +# once the bus comes up. A persistently-dead bus only gets a connect +# attempt every 2 s, not once per request. +_RETRY_BACKOFF: float = 2.0 + _lock = asyncio.Lock() _shared: BaseBus | None = None -_tried = False +_last_failure_ts: float = 0.0 async def get_app_bus() -> BaseBus | None: """Return the process-wide connected bus, or ``None`` if unavailable. On first call, constructs a client via :func:`get_bus` and awaits - ``connect()``. Subsequent calls return the cached instance. If the - initial connect raises, we remember the failure and return ``None`` - from here on — callers are expected to fall back cleanly. + ``connect()``. Subsequent calls return the cached instance. If a + connect attempt raises, the failure timestamp is recorded and + subsequent calls within ``_RETRY_BACKOFF`` seconds return ``None`` + without re-attempting — after the backoff window, the next call + retries. This is what lets the API recover from a + ``decnet bus``-started-after-API race without a full API restart. """ - global _shared, _tried + global _shared, _last_failure_ts if _shared is not None: return _shared - if _tried: + if (time.monotonic() - _last_failure_ts) < _RETRY_BACKOFF: return None async with _lock: if _shared is not None: return _shared - if _tried: + if (time.monotonic() - _last_failure_ts) < _RETRY_BACKOFF: return None - _tried = True try: candidate = get_bus(client_name="api") await candidate.connect() _shared = candidate + _last_failure_ts = 0.0 + return _shared except Exception as exc: # noqa: BLE001 log.warning("app bus unavailable: %s", exc) + _last_failure_ts = time.monotonic() return None - return _shared async def close_app_bus() -> None: - """Close the shared bus if one is open; reset the tried-once guard. + """Close the shared bus if one is open; clear the backoff window. Call from the API lifespan shutdown. Safe to call multiple times. + Resetting ``_last_failure_ts`` means the next ``get_app_bus()`` + after shutdown-and-restart-within-the-same-process (rare, but + tests do this) retries immediately instead of honouring a stale + backoff. """ - global _shared, _tried + global _shared, _last_failure_ts bus, _shared = _shared, None - _tried = False + _last_failure_ts = 0.0 if bus is not None: try: await bus.close() diff --git a/tests/bus/test_app_singleton.py b/tests/bus/test_app_singleton.py new file mode 100644 index 00000000..0df9a424 --- /dev/null +++ b/tests/bus/test_app_singleton.py @@ -0,0 +1,135 @@ +"""Tests for the process-wide app-bus singleton. + +Covers the retry-with-backoff behaviour of ``get_app_bus()`` — the +regression guard against the "one-shot veto" bug where a startup race +between ``decnet bus`` and the API's lifespan poisoned the singleton +for the entire process lifetime. +""" +from __future__ import annotations + +import asyncio +import time +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +import decnet.bus.app as app_module + + +@pytest.fixture(autouse=True) +def _reset_singleton() -> Any: + """Reset the module-level singleton state between tests.""" + app_module._shared = None + app_module._last_failure_ts = 0.0 + yield + app_module._shared = None + app_module._last_failure_ts = 0.0 + + +@pytest.mark.asyncio +async def test_first_call_succeeds_when_bus_connectable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Happy path: connect succeeds, shared instance returned thereafter.""" + fake_bus = MagicMock() + fake_bus.connect = AsyncMock() + monkeypatch.setattr(app_module, "get_bus", lambda **_kw: fake_bus) + + result = await app_module.get_app_bus() + assert result is fake_bus + fake_bus.connect.assert_awaited_once() + + # Subsequent call returns cached instance, no second connect. + result2 = await app_module.get_app_bus() + assert result2 is fake_bus + assert fake_bus.connect.await_count == 1 + + +@pytest.mark.asyncio +async def test_connect_failure_backoff_prevents_hot_retry( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """After a failed connect, subsequent calls within the backoff + window return None WITHOUT re-attempting connect — the cost of + failure stays bounded.""" + fake_bus = MagicMock() + fake_bus.connect = AsyncMock(side_effect=ConnectionError("socket gone")) + monkeypatch.setattr(app_module, "get_bus", lambda **_kw: fake_bus) + + assert await app_module.get_app_bus() is None + assert fake_bus.connect.await_count == 1 + + # Second immediate call: still within backoff, no retry. + assert await app_module.get_app_bus() is None + assert fake_bus.connect.await_count == 1 + + # Third immediate call: same thing. + assert await app_module.get_app_bus() is None + assert fake_bus.connect.await_count == 1 + + +@pytest.mark.asyncio +async def test_connect_retried_after_backoff_expires( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Once the backoff window expires, the next call tries connect() + again. This is the regression guard for the original 'one-shot veto' + bug — the whole point of the fix.""" + fake_bus = MagicMock() + # First attempt fails, second succeeds. + fake_bus.connect = AsyncMock( + side_effect=[ConnectionError("socket gone"), None] + ) + monkeypatch.setattr(app_module, "get_bus", lambda **_kw: fake_bus) + + assert await app_module.get_app_bus() is None + assert fake_bus.connect.await_count == 1 + + # Simulate the backoff window elapsing by rewinding the recorded + # failure timestamp into the past. + app_module._last_failure_ts = time.monotonic() - (app_module._RETRY_BACKOFF + 0.1) + + result = await app_module.get_app_bus() + assert result is fake_bus + assert fake_bus.connect.await_count == 2 + + +@pytest.mark.asyncio +async def test_close_app_bus_clears_backoff_window( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """close_app_bus() after a failure (or after a successful bus) must + reset _last_failure_ts so the next get_app_bus() retries immediately + — otherwise tests that bring the app-bus up/down/up in one process + would see stale backoff.""" + fake_bus = MagicMock() + fake_bus.connect = AsyncMock(side_effect=ConnectionError("x")) + fake_bus.close = AsyncMock() + monkeypatch.setattr(app_module, "get_bus", lambda **_kw: fake_bus) + + assert await app_module.get_app_bus() is None + assert app_module._last_failure_ts > 0.0 + + await app_module.close_app_bus() + assert app_module._last_failure_ts == 0.0 + # Next call retries immediately (no backoff wait). + fake_bus.connect.side_effect = None # make it succeed this time + assert await app_module.get_app_bus() is fake_bus + + +@pytest.mark.asyncio +async def test_concurrent_callers_do_not_stampede_connect( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The lock must serialise concurrent callers so a just-started bus + doesn't get hammered with N parallel connect attempts.""" + fake_bus = MagicMock() + fake_bus.connect = AsyncMock() + monkeypatch.setattr(app_module, "get_bus", lambda **_kw: fake_bus) + + results = await asyncio.gather( + *[app_module.get_app_bus() for _ in range(10)] + ) + assert all(r is fake_bus for r in results) + assert fake_bus.connect.await_count == 1