diff --git a/decnet/webhook/worker.py b/decnet/webhook/worker.py index b24253a0..5c4da60a 100644 --- a/decnet/webhook/worker.py +++ b/decnet/webhook/worker.py @@ -43,6 +43,11 @@ _EGRESS_CONCURRENCY = 10 # DECNET_WEBHOOK_CIRCUIT_THRESHOLD. Operator clears the trip by # toggling `enabled` back on via PATCH. _CIRCUIT_THRESHOLD = max(1, int(os.environ.get("DECNET_WEBHOOK_CIRCUIT_THRESHOLD", "5"))) +# How long to wait between bus (re)connect attempts when the bus is +# unreachable. Keeps the worker self-healing against a bus that starts +# after the webhook worker does (systemd race) or crashes+restarts +# mid-operation. Override via DECNET_WEBHOOK_BUS_RECONNECT_SECS. +_BUS_RECONNECT_SECS = max(5.0, float(os.environ.get("DECNET_WEBHOOK_BUS_RECONNECT_SECS", "60"))) def _patterns_for(sub: dict[str, Any]) -> list[str]: @@ -70,42 +75,83 @@ async def webhook_worker( *, reload_interval: float = _RELOAD_FALLBACK_SECS, http_client: httpx.AsyncClient | None = None, + bus_reconnect_secs: float = _BUS_RECONNECT_SECS, ) -> None: """Main entry — connect bus, spawn per-subscription delivery tasks, - reload on signal.""" + reload on signal. Retries bus connection in a loop so the worker + self-heals if the bus starts after the worker or restarts mid-run. + """ logger.info("webhook worker started") - bus = None - try: - bus = get_bus(client_name="webhook") - await bus.connect() - except Exception as exc: # noqa: BLE001 — bus is optional (DEBT-031) - logger.warning("webhook: bus unavailable, running in idle mode: %s", exc) - bus = None - shutdown = asyncio.Event() - reload_flag = asyncio.Event() - - heartbeat_task = ( - asyncio.create_task(run_health_heartbeat(bus, "webhook")) - if bus is not None else None - ) - control_task = ( - asyncio.create_task(run_control_listener(bus, "webhook", shutdown)) - if bus is not None else None - ) - reload_task = ( - asyncio.create_task(_reload_listener(bus, reload_flag, shutdown)) - if bus is not None else None - ) - owns_http = http_client is None if owns_http: http_client = httpx.AsyncClient(timeout=10.0) + try: + while not shutdown.is_set(): + # Try to connect to the bus. If it's down, wait out the + # reconnect interval and try again. Shutdown interrupts the + # wait so systemd stop doesn't hang for a minute. + bus = None + try: + bus = get_bus(client_name="webhook") + await bus.connect() + except Exception as exc: # noqa: BLE001 + logger.warning( + "webhook: bus unavailable, retrying in %.0fs: %s", + bus_reconnect_secs, exc, + ) + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for( + shutdown.wait(), timeout=bus_reconnect_secs + ) + continue + + # Bus is live — run one dispatch epoch until it fails or we + # shut down. On any crash the outer loop reconnects and + # retries from scratch; no state carries across epochs so a + # half-dead bus can't leave us with stale subscriptions. + logger.info("webhook: bus connected") + try: + await _run_with_bus( + bus, repo, http_client, + shutdown, reload_interval, + ) + except asyncio.CancelledError: + shutdown.set() + raise + except Exception as exc: # noqa: BLE001 + logger.warning( + "webhook: dispatch crashed, will reconnect: %s", exc + ) + finally: + with contextlib.suppress(Exception): + await bus.close() + finally: + if owns_http and http_client is not None: + await http_client.aclose() + + +async def _run_with_bus( + bus, + repo: BaseRepository, + http_client: httpx.AsyncClient, + shutdown: asyncio.Event, + reload_interval: float, +) -> None: + """Run one bus-up epoch: start heartbeat+control+reload listeners, + dispatch events until shutdown or error, clean up.""" + reload_flag = asyncio.Event() semaphore = asyncio.Semaphore(_EGRESS_CONCURRENCY) consumer_tasks: list[asyncio.Task] = [] + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "webhook")) + control_task = asyncio.create_task( + run_control_listener(bus, "webhook", shutdown) + ) + reload_task = asyncio.create_task(_reload_listener(bus, reload_flag, shutdown)) + try: while not shutdown.is_set(): # Cancel prior epoch's consumers before starting new ones. @@ -113,41 +159,28 @@ async def webhook_worker( consumer_tasks.clear() subs = await repo.list_webhook_subscriptions(enabled_only=True) - - if bus is not None: - for sub in subs: - for pattern in _patterns_for(sub): - consumer_tasks.append(asyncio.create_task( - _consume( - bus, pattern, sub, repo, http_client, semaphore, reload_flag, - ) - )) + for sub in subs: + for pattern in _patterns_for(sub): + consumer_tasks.append(asyncio.create_task( + _consume( + bus, pattern, sub, repo, http_client, semaphore, reload_flag, + ) + )) # Wait for reload OR timer fallback. Shutdown propagates via - # CancelledError when the outer task is cancelled — no explicit - # race required because `await` points are cancellation-safe. + # CancelledError when the outer task is cancelled. with contextlib.suppress(asyncio.TimeoutError): await asyncio.wait_for( reload_flag.wait(), timeout=reload_interval ) reload_flag.clear() - except asyncio.CancelledError: - shutdown.set() - raise finally: await _cancel_all(consumer_tasks) for t in (heartbeat_task, control_task, reload_task): - if t is not None: - t.cancel() + t.cancel() for t in (heartbeat_task, control_task, reload_task): - if t is not None: - with contextlib.suppress(asyncio.CancelledError, Exception): - await t - if bus is not None: - with contextlib.suppress(Exception): - await bus.close() - if owns_http and http_client is not None: - await http_client.aclose() + with contextlib.suppress(asyncio.CancelledError, Exception): + await t async def _cancel_all(tasks: list[asyncio.Task]) -> None: diff --git a/tests/webhook/test_worker.py b/tests/webhook/test_worker.py index 85c03291..57404bd6 100644 --- a/tests/webhook/test_worker.py +++ b/tests/webhook/test_worker.py @@ -293,3 +293,59 @@ async def test_worker_trips_circuit_after_threshold(fake_bus, monkeypatch): assert sub["enabled"] is False assert sub["auto_disabled_at"] is not None + +@pytest.mark.asyncio +async def test_worker_self_heals_when_bus_starts_late(fake_bus): + """Bus down at startup → worker parks in a retry loop. Once the bus + comes up on the next attempt, the worker transitions to dispatch + mode and delivers events normally. + """ + sub = _sub("u1", "w1", ["attacker.>"]) + repo = _FakeRepo([sub]) + captured: list[httpx.Request] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(200) + + # First call: raise (bus unavailable). Second call: hand out the + # FakeBus. The worker's retry loop should bridge the gap. + calls = {"n": 0} + + def flaky_get_bus(*args, **kwargs): + calls["n"] += 1 + if calls["n"] == 1: + raise ConnectionError("bus not ready yet") + return fake_bus + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with patch("decnet.webhook.worker.get_bus", side_effect=flaky_get_bus): + task = asyncio.create_task( + webhook_worker( + repo, + reload_interval=0.5, + http_client=client, + bus_reconnect_secs=0.3, + ) + ) + # First attempt fails, worker waits bus_reconnect_secs, second + # attempt succeeds. Give it a generous window. + await asyncio.sleep(0.8) + + await fake_bus.publish( + "attacker.observed", {"ip": "1.2.3.4"}, event_type="x" + ) + for _ in range(60): + if captured: + break + await asyncio.sleep(0.05) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert calls["n"] >= 2, "expected at least one retry after initial failure" + assert len(captured) == 1, "expected delivery once bus came up" +