fix(bus): retry app-bus connect with backoff instead of one-shot veto

A startup race between `decnet bus` being ready and the API's lifespan
hitting `get_app_bus()` at api.py:135 would set `_tried = True`
permanently, poisoning the singleton for the rest of the process: the
dashboard shows BUS OFFLINE, topology SSE falls into the bus-is-None
snapshot-only branch, mutator publish calls no-op. Only an API
restart recovered.

Replaces the one-shot veto with a time-gated retry keyed on a
`_last_failure_ts` monotonic timestamp plus a 2 s backoff. Publishers
on the hot path still pay at most one connect attempt every 2 s when
the bus is down, but the singleton auto-recovers within 5 s (one
dashboard poll) once the bus comes up.

The asyncio lock still serialises concurrent callers so the bus server
doesn't get stampeded with parallel connect attempts on startup.
This commit is contained in:
2026-04-23 17:59:17 -04:00
parent ef4179ea1f
commit eb2308d9e1
2 changed files with 168 additions and 12 deletions

View File

@@ -11,10 +11,17 @@ Failures during :meth:`BaseBus.connect` are swallowed and logged — a
dead bus must never break request serving. Publishers should treat a
``None`` return from :func:`get_app_bus` as "skip this notification",
same as ``DECNET_BUS_ENABLED=false``.
Connect is **retried with a short backoff** (not one-shot): a startup
race where the API lifespan hits :func:`get_app_bus` before ``decnet
bus`` is ready would otherwise poison the singleton for the entire
process lifetime. Instead we remember the last failure timestamp and
let callers retry once ``_RETRY_BACKOFF`` seconds have passed.
"""
from __future__ import annotations
import asyncio
import time
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
@@ -22,48 +29,62 @@ from decnet.logging import get_logger
log = get_logger("bus.app")
# Publishers in the hot path shouldn't pay connect-retry latency on every
# call; the dashboard's own 5 s poll interval recovers within one tick
# once the bus comes up. A persistently-dead bus only gets a connect
# attempt every 2 s, not once per request.
_RETRY_BACKOFF: float = 2.0
# Serialises connect attempts: get_app_bus() performs its connect inside
# ``async with _lock``.
_lock = asyncio.Lock()
# The cached, connected bus. ``None`` until a connect succeeds, and again
# after close_app_bus() clears it.
_shared: BaseBus | None = None
# NOTE(review): this looks like the pre-change one-shot guard that the
# timestamp below replaces — confirm whether it is still needed.
_tried = False
# ``time.monotonic()`` reading taken at the most recent failed connect.
# ``0.0`` is written on success and on close_app_bus().
_last_failure_ts: float = 0.0
async def get_app_bus() -> BaseBus | None:
    """Return the process-wide connected bus, or ``None`` if unavailable.

    On first call, constructs a client via :func:`get_bus` and awaits
    ``connect()``. Subsequent calls return the cached instance. If a
    connect attempt raises, the failure timestamp is recorded and
    subsequent calls within ``_RETRY_BACKOFF`` seconds return ``None``
    without re-attempting — after the backoff window, the next call
    retries. This is what lets the API recover from a
    ``decnet bus``-started-after-API race without a full API restart.

    A ``_last_failure_ts`` of ``0.0`` means "no failure recorded" and
    never triggers the backoff, regardless of the current monotonic
    clock reading.

    Returns:
        The shared connected bus, or ``None`` when the last connect
        attempt failed less than ``_RETRY_BACKOFF`` seconds ago or the
        attempt made by this call raised.
    """
    global _shared, _last_failure_ts

    # Fast path: no lock needed once a bus has been cached.
    if _shared is not None:
        return _shared
    # ``time.monotonic()`` has an undefined reference point, so the 0.0
    # sentinel must be tested explicitly rather than subtracted: on a
    # clock that currently reads less than _RETRY_BACKOFF the subtraction
    # alone would veto the very first connect attempt.
    if _last_failure_ts and (time.monotonic() - _last_failure_ts) < _RETRY_BACKOFF:
        return None

    async with _lock:
        # Re-check both conditions: another caller may have connected
        # (or failed) while we were waiting for the lock.
        if _shared is not None:
            return _shared
        if _last_failure_ts and (time.monotonic() - _last_failure_ts) < _RETRY_BACKOFF:
            return None
        try:
            candidate = get_bus(client_name="api")
            await candidate.connect()
        except Exception as exc:  # noqa: BLE001
            # A dead bus must never break the caller; remember when we
            # failed so the next attempt waits out the backoff window.
            log.warning("app bus unavailable: %s", exc)
            _last_failure_ts = time.monotonic()
            return None
        _shared = candidate
        _last_failure_ts = 0.0
        return _shared
async def close_app_bus() -> None:
"""Close the shared bus if one is open; reset the tried-once guard.
"""Close the shared bus if one is open; clear the backoff window.
Call from the API lifespan shutdown. Safe to call multiple times.
Resetting ``_last_failure_ts`` means the next ``get_app_bus()``
after shutdown-and-restart-within-the-same-process (rare, but
tests do this) retries immediately instead of honouring a stale
backoff.
"""
global _shared, _tried
global _shared, _last_failure_ts
bus, _shared = _shared, None
_tried = False
_last_failure_ts = 0.0
if bus is not None:
try:
await bus.close()

View File

@@ -0,0 +1,135 @@
"""Tests for the process-wide app-bus singleton.
Covers the retry-with-backoff behaviour of ``get_app_bus()`` — the
regression guard against the "one-shot veto" bug where a startup race
between ``decnet bus`` and the API's lifespan poisoned the singleton
for the entire process lifetime.
"""
from __future__ import annotations
import asyncio
import time
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
import decnet.bus.app as app_module
@pytest.fixture(autouse=True)
def _reset_singleton() -> Any:
    """Give every test a pristine singleton, and clean up after it.

    Clears the cached bus and the recorded failure timestamp on
    ``app_module`` both before and after the test body runs.
    """

    def _wipe() -> None:
        app_module._shared = None
        app_module._last_failure_ts = 0.0

    _wipe()
    yield
    _wipe()
@pytest.mark.asyncio
async def test_first_call_succeeds_when_bus_connectable(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Happy path: a successful connect is cached and reused."""
    stub = MagicMock(connect=AsyncMock())
    monkeypatch.setattr(app_module, "get_bus", lambda **_kw: stub)

    first = await app_module.get_app_bus()
    assert first is stub
    stub.connect.assert_awaited_once()

    # The second lookup must be served from the cache: same object,
    # and connect() is not awaited again.
    second = await app_module.get_app_bus()
    assert second is stub
    assert stub.connect.await_count == 1
@pytest.mark.asyncio
async def test_connect_failure_backoff_prevents_hot_retry(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failed connect is not re-attempted inside the backoff window.

    Back-to-back calls after a failure must all return ``None`` while
    connect() stays at a single await — the cost of failure is bounded.
    """
    stub = MagicMock(
        connect=AsyncMock(side_effect=ConnectionError("socket gone")),
    )
    monkeypatch.setattr(app_module, "get_bus", lambda **_kw: stub)

    # Call 1 performs the (failing) connect; calls 2 and 3 land inside
    # the backoff window and must not touch connect() again.
    for _attempt in range(3):
        assert await app_module.get_app_bus() is None
        assert stub.connect.await_count == 1
@pytest.mark.asyncio
async def test_connect_retried_after_backoff_expires(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """connect() is attempted again once the backoff window has passed.

    Regression guard for the original 'one-shot veto' bug: a single
    failed connect must not disable the singleton for good.
    """
    # Scripted outcomes: the first await raises, the second succeeds.
    outcomes = [ConnectionError("socket gone"), None]
    stub = MagicMock(connect=AsyncMock(side_effect=outcomes))
    monkeypatch.setattr(app_module, "get_bus", lambda **_kw: stub)

    assert await app_module.get_app_bus() is None
    assert stub.connect.await_count == 1

    # Rather than sleeping, push the recorded failure far enough into
    # the past that the backoff window has already elapsed.
    window = app_module._RETRY_BACKOFF + 0.1
    app_module._last_failure_ts = time.monotonic() - window

    recovered = await app_module.get_app_bus()
    assert recovered is stub
    assert stub.connect.await_count == 2
@pytest.mark.asyncio
async def test_close_app_bus_clears_backoff_window(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """close_app_bus() zeroes the failure timestamp.

    Without that reset, code that brings the app bus up, down and up
    again inside one process (tests do this) would run into a stale
    backoff on the next get_app_bus().
    """
    stub = MagicMock(
        connect=AsyncMock(side_effect=ConnectionError("x")),
        close=AsyncMock(),
    )
    monkeypatch.setattr(app_module, "get_bus", lambda **_kw: stub)

    # A failed connect records a non-zero failure timestamp.
    assert await app_module.get_app_bus() is None
    assert app_module._last_failure_ts > 0.0

    await app_module.close_app_bus()
    assert app_module._last_failure_ts == 0.0

    # With the window cleared, the very next call attempts connect()
    # again — and this time it is allowed to succeed.
    stub.connect.side_effect = None
    assert await app_module.get_app_bus() is stub
@pytest.mark.asyncio
async def test_concurrent_callers_do_not_stampede_connect(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The lock must serialise concurrent callers so a just-started bus
    doesn't get hammered with N parallel connect attempts.

    connect() is made to yield to the event loop. A bare ``AsyncMock()``
    completes without suspending, so the first gathered task would
    finish before any other task started and the lock would never be
    contended — the test would pass even with the lock removed.
    """

    async def _yielding_connect() -> None:
        # Suspend once while the lock is held so the other nine callers
        # get scheduled and pile up behind it.
        await asyncio.sleep(0)

    fake_bus = MagicMock()
    fake_bus.connect = AsyncMock(side_effect=_yielding_connect)
    monkeypatch.setattr(app_module, "get_bus", lambda **_kw: fake_bus)
    # Use a lock created inside this test's running loop: a contended
    # asyncio.Lock becomes tied to the loop it was contended on, and the
    # module-level lock outlives any single test's loop.
    monkeypatch.setattr(app_module, "_lock", asyncio.Lock())

    results = await asyncio.gather(
        *[app_module.get_app_bus() for _ in range(10)]
    )

    assert all(r is fake_bus for r in results)
    # Exactly one caller connected; the rest found the cached instance
    # on their re-check after acquiring the lock.
    assert fake_bus.connect.await_count == 1