From efc98285aa0f7aa52820df5ba163d260686c87d8 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 24 Apr 2026 16:39:38 -0400 Subject: [PATCH] fix(webhook/worker): self-heal when bus starts late or restarts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before: if the bus was unreachable at worker start, we logged "running in idle mode" once and parked on shutdown forever. systemd doesn't guarantee bus is fully up before the webhook worker starts, so a race on boot left the worker permanently dead until restart. Now: wrap the whole bus-use in an outer reconnect loop. while not shutdown: try: connect() except: sleep(RECONNECT_SECS) ; continue try: run_with_bus(...) # heartbeat + dispatch except: log+close ; reconnect on next iter Clean consequence: if the bus dies mid-operation the dispatch loop's subscriptions raise inside the consumer tasks, `_run_with_bus` exits, the outer loop closes the stale connection and reconnects. No partial state leaks across epochs — fresh bus, fresh subs, fresh heartbeat. Interval is 60s by default, overridable via DECNET_WEBHOOK_BUS_RECONNECT_SECS. Shutdown wakes the wait so systemctl stop doesn't hang for a minute. Test added: flaky get_bus that fails once, then returns a live FakeBus — asserts retry + successful delivery. get_app_bus() in decnet/bus/app.py already has a 2s backoff retry so the FastAPI hot path self-heals; this commit brings the standalone webhook worker in line with the same posture. --- decnet/webhook/worker.py | 129 ++++++++++++++++++++++------------- tests/webhook/test_worker.py | 56 +++++++++++++++ 2 files changed, 137 insertions(+), 48 deletions(-) diff --git a/decnet/webhook/worker.py b/decnet/webhook/worker.py index b24253a0..5c4da60a 100644 --- a/decnet/webhook/worker.py +++ b/decnet/webhook/worker.py @@ -43,6 +43,11 @@ _EGRESS_CONCURRENCY = 10 # DECNET_WEBHOOK_CIRCUIT_THRESHOLD. Operator clears the trip by # toggling `enabled` back on via PATCH. _CIRCUIT_THRESHOLD = max(1, int(os.environ.get("DECNET_WEBHOOK_CIRCUIT_THRESHOLD", "5"))) +# How long to wait between bus (re)connect attempts when the bus is +# unreachable. Keeps the worker self-healing against a bus that starts +# after the webhook worker does (systemd race) or crashes+restarts +# mid-operation. Override via DECNET_WEBHOOK_BUS_RECONNECT_SECS. +_BUS_RECONNECT_SECS = max(5.0, float(os.environ.get("DECNET_WEBHOOK_BUS_RECONNECT_SECS", "60"))) def _patterns_for(sub: dict[str, Any]) -> list[str]: @@ -70,42 +75,83 @@ async def webhook_worker( *, reload_interval: float = _RELOAD_FALLBACK_SECS, http_client: httpx.AsyncClient | None = None, + bus_reconnect_secs: float = _BUS_RECONNECT_SECS, ) -> None: """Main entry — connect bus, spawn per-subscription delivery tasks, - reload on signal.""" + reload on signal. Retries bus connection in a loop so the worker + self-heals if the bus starts after the worker or restarts mid-run. + """ logger.info("webhook worker started") - bus = None - try: - bus = get_bus(client_name="webhook") - await bus.connect() - except Exception as exc: # noqa: BLE001 — bus is optional (DEBT-031) - logger.warning("webhook: bus unavailable, running in idle mode: %s", exc) - bus = None - shutdown = asyncio.Event() - reload_flag = asyncio.Event() - - heartbeat_task = ( - asyncio.create_task(run_health_heartbeat(bus, "webhook")) - if bus is not None else None - ) - control_task = ( - asyncio.create_task(run_control_listener(bus, "webhook", shutdown)) - if bus is not None else None - ) - reload_task = ( - asyncio.create_task(_reload_listener(bus, reload_flag, shutdown)) - if bus is not None else None - ) - owns_http = http_client is None if owns_http: http_client = httpx.AsyncClient(timeout=10.0) + try: + while not shutdown.is_set(): + # Try to connect to the bus. If it's down, wait out the + # reconnect interval and try again. Shutdown interrupts the + # wait so systemd stop doesn't hang for a minute. + bus = None + try: + bus = get_bus(client_name="webhook") + await bus.connect() + except Exception as exc: # noqa: BLE001 + logger.warning( + "webhook: bus unavailable, retrying in %.0fs: %s", + bus_reconnect_secs, exc, + ) + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for( + shutdown.wait(), timeout=bus_reconnect_secs + ) + continue + + # Bus is live — run one dispatch epoch until it fails or we + # shut down. On any crash the outer loop reconnects and + # retries from scratch; no state carries across epochs so a + # half-dead bus can't leave us with stale subscriptions. + logger.info("webhook: bus connected") + try: + await _run_with_bus( + bus, repo, http_client, + shutdown, reload_interval, + ) + except asyncio.CancelledError: + shutdown.set() + raise + except Exception as exc: # noqa: BLE001 + logger.warning( + "webhook: dispatch crashed, will reconnect: %s", exc + ) + finally: + with contextlib.suppress(Exception): + await bus.close() + finally: + if owns_http and http_client is not None: + await http_client.aclose() + + +async def _run_with_bus( + bus, + repo: BaseRepository, + http_client: httpx.AsyncClient, + shutdown: asyncio.Event, + reload_interval: float, +) -> None: + """Run one bus-up epoch: start heartbeat+control+reload listeners, + dispatch events until shutdown or error, clean up.""" + reload_flag = asyncio.Event() semaphore = asyncio.Semaphore(_EGRESS_CONCURRENCY) consumer_tasks: list[asyncio.Task] = [] + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "webhook")) + control_task = asyncio.create_task( + run_control_listener(bus, "webhook", shutdown) + ) + reload_task = asyncio.create_task(_reload_listener(bus, reload_flag, shutdown)) + try: while not shutdown.is_set(): # Cancel prior epoch's consumers before starting new ones. @@ -113,41 +159,28 @@ async def webhook_worker( consumer_tasks.clear() subs = await repo.list_webhook_subscriptions(enabled_only=True) - - if bus is not None: - for sub in subs: - for pattern in _patterns_for(sub): - consumer_tasks.append(asyncio.create_task( - _consume( - bus, pattern, sub, repo, http_client, semaphore, reload_flag, - ) - )) + for sub in subs: + for pattern in _patterns_for(sub): + consumer_tasks.append(asyncio.create_task( + _consume( + bus, pattern, sub, repo, http_client, semaphore, reload_flag, + ) + )) # Wait for reload OR timer fallback. Shutdown propagates via - # CancelledError when the outer task is cancelled — no explicit - # race required because `await` points are cancellation-safe. + # CancelledError when the outer task is cancelled. with contextlib.suppress(asyncio.TimeoutError): await asyncio.wait_for( reload_flag.wait(), timeout=reload_interval ) reload_flag.clear() - except asyncio.CancelledError: - shutdown.set() - raise finally: await _cancel_all(consumer_tasks) for t in (heartbeat_task, control_task, reload_task): - if t is not None: - t.cancel() + t.cancel() for t in (heartbeat_task, control_task, reload_task): - if t is not None: - with contextlib.suppress(asyncio.CancelledError, Exception): - await t - if bus is not None: - with contextlib.suppress(Exception): - await bus.close() - if owns_http and http_client is not None: - await http_client.aclose() + with contextlib.suppress(asyncio.CancelledError, Exception): + await t async def _cancel_all(tasks: list[asyncio.Task]) -> None: diff --git a/tests/webhook/test_worker.py b/tests/webhook/test_worker.py index 85c03291..57404bd6 100644 --- a/tests/webhook/test_worker.py +++ b/tests/webhook/test_worker.py @@ -293,3 +293,59 @@ async def test_worker_trips_circuit_after_threshold(fake_bus, monkeypatch): assert sub["enabled"] is False assert sub["auto_disabled_at"] is not None + +@pytest.mark.asyncio +async def test_worker_self_heals_when_bus_starts_late(fake_bus): + """Bus down at startup → worker parks in a retry loop. Once the bus + comes up on the next attempt, the worker transitions to dispatch + mode and delivers events normally. + """ + sub = _sub("u1", "w1", ["attacker.>"]) + repo = _FakeRepo([sub]) + captured: list[httpx.Request] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(200) + + # First call: raise (bus unavailable). Second call: hand out the + # FakeBus. The worker's retry loop should bridge the gap. + calls = {"n": 0} + + def flaky_get_bus(*args, **kwargs): + calls["n"] += 1 + if calls["n"] == 1: + raise ConnectionError("bus not ready yet") + return fake_bus + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with patch("decnet.webhook.worker.get_bus", side_effect=flaky_get_bus): + task = asyncio.create_task( + webhook_worker( + repo, + reload_interval=0.5, + http_client=client, + bus_reconnect_secs=0.3, + ) + ) + # First attempt fails, worker waits bus_reconnect_secs, second + # attempt succeeds. Give it a generous window. + await asyncio.sleep(0.8) + + await fake_bus.publish( + "attacker.observed", {"ip": "1.2.3.4"}, event_type="x" + ) + for _ in range(60): + if captured: + break + await asyncio.sleep(0.05) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert calls["n"] >= 2, "expected at least one retry after initial failure" + assert len(captured) == 1, "expected delivery once bus came up" +