A startup race between `decnet bus` being ready and the API's lifespan hitting `get_app_bus()` at api.py:135 would set `_tried = True` permanently, poisoning the singleton for the rest of the process: the dashboard shows BUS OFFLINE, topology SSE falls into the bus-is-None snapshot-only branch, mutator publish calls no-op. Only an API restart recovered. Replaces the one-shot veto with a time-gated retry keyed on a `_last_failure_ts` monotonic timestamp plus a 2 s backoff. Publishers on the hot path still pay at most one connect attempt every 2 s when the bus is down, but the singleton auto-recovers within 5 s (one dashboard poll) once the bus comes up. The asyncio lock still serialises concurrent callers so the bus server doesn't get stampeded with parallel connect attempts on startup.
93 lines
3.5 KiB
Python
93 lines
3.5 KiB
Python
"""Process-wide bus singleton for request-serving workers (API, SSE routes).
|
|
|
|
A single connected :class:`~decnet.bus.base.BaseBus` shared across request
|
|
handlers — opening a UNIX socket per request would be wasteful and add
|
|
latency to the hot path. The API lifespan is responsible for calling
|
|
:func:`close_app_bus` on shutdown; connect is lazy so tests and
|
|
contract-test mode that never hit a publish/subscribe code path don't
|
|
pay for a bus connection they'll never use.
|
|
|
|
Failures during :meth:`BaseBus.connect` are swallowed and logged — a
|
|
dead bus must never break request serving. Publishers should treat a
|
|
``None`` return from :func:`get_app_bus` as "skip this notification",
|
|
same as ``DECNET_BUS_ENABLED=false``.
|
|
|
|
Connect is **retried with a short backoff** (not one-shot): a startup
|
|
race where the API lifespan hits :func:`get_app_bus` before ``decnet
|
|
bus`` is ready would otherwise poison the singleton for the entire
|
|
process lifetime. Instead we remember the last failure timestamp and
|
|
let callers retry once ``_RETRY_BACKOFF`` seconds have passed.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
|
|
from decnet.bus.base import BaseBus
|
|
from decnet.bus.factory import get_bus
|
|
from decnet.logging import get_logger
|
|
|
|
log = get_logger("bus.app")
|
|
|
|
# Publishers in the hot path shouldn't pay connect-retry latency on every
|
|
# call; the dashboard's own 5 s poll interval recovers within one tick
|
|
# once the bus comes up. A persistently-dead bus only gets a connect
|
|
# attempt every 2 s, not once per request.
|
|
_RETRY_BACKOFF: float = 2.0
|
|
|
|
_lock = asyncio.Lock()
|
|
_shared: BaseBus | None = None
|
|
_last_failure_ts: float = 0.0
|
|
|
|
|
|
async def get_app_bus() -> BaseBus | None:
|
|
"""Return the process-wide connected bus, or ``None`` if unavailable.
|
|
|
|
On first call, constructs a client via :func:`get_bus` and awaits
|
|
``connect()``. Subsequent calls return the cached instance. If a
|
|
connect attempt raises, the failure timestamp is recorded and
|
|
subsequent calls within ``_RETRY_BACKOFF`` seconds return ``None``
|
|
without re-attempting — after the backoff window, the next call
|
|
retries. This is what lets the API recover from a
|
|
``decnet bus``-started-after-API race without a full API restart.
|
|
"""
|
|
global _shared, _last_failure_ts
|
|
if _shared is not None:
|
|
return _shared
|
|
if (time.monotonic() - _last_failure_ts) < _RETRY_BACKOFF:
|
|
return None
|
|
async with _lock:
|
|
if _shared is not None:
|
|
return _shared
|
|
if (time.monotonic() - _last_failure_ts) < _RETRY_BACKOFF:
|
|
return None
|
|
try:
|
|
candidate = get_bus(client_name="api")
|
|
await candidate.connect()
|
|
_shared = candidate
|
|
_last_failure_ts = 0.0
|
|
return _shared
|
|
except Exception as exc: # noqa: BLE001
|
|
log.warning("app bus unavailable: %s", exc)
|
|
_last_failure_ts = time.monotonic()
|
|
return None
|
|
|
|
|
|
async def close_app_bus() -> None:
|
|
"""Close the shared bus if one is open; clear the backoff window.
|
|
|
|
Call from the API lifespan shutdown. Safe to call multiple times.
|
|
Resetting ``_last_failure_ts`` means the next ``get_app_bus()``
|
|
after shutdown-and-restart-within-the-same-process (rare, but
|
|
tests do this) retries immediately instead of honouring a stale
|
|
backoff.
|
|
"""
|
|
global _shared, _last_failure_ts
|
|
bus, _shared = _shared, None
|
|
_last_failure_ts = 0.0
|
|
if bus is not None:
|
|
try:
|
|
await bus.close()
|
|
except Exception as exc: # noqa: BLE001
|
|
log.warning("app bus close raised: %s", exc)
|