fix(webhook/worker): self-heal when bus starts late or restarts
Before: if the bus was unreachable at worker start, we logged
"running in idle mode" once and parked on shutdown forever. systemd
doesn't guarantee bus is fully up before the webhook worker starts,
so a race on boot left the worker permanently dead until restart.
Now: wrap the whole bus-use in an outer reconnect loop.
while not shutdown:
try: connect()
except: sleep(RECONNECT_SECS) ; continue
try: run_with_bus(...) # heartbeat + dispatch
except: log+close ; reconnect on next iter
Clean consequence: if the bus dies mid-operation the dispatch loop's
subscriptions raise inside the consumer tasks, `_run_with_bus` exits,
the outer loop closes the stale connection and reconnects. No partial
state leaks across epochs — fresh bus, fresh subs, fresh heartbeat.
Interval is 60s by default, overridable via
DECNET_WEBHOOK_BUS_RECONNECT_SECS. Shutdown wakes the wait so
systemctl stop doesn't hang for a minute.
Test added: flaky get_bus that fails once, then returns a live
FakeBus — asserts retry + successful delivery.
get_app_bus() in decnet/bus/app.py already has a 2s backoff retry so
the FastAPI hot path self-heals; this commit brings the standalone
webhook worker in line with the same posture.
This commit is contained in:
@@ -293,3 +293,59 @@ async def test_worker_trips_circuit_after_threshold(fake_bus, monkeypatch):
|
||||
assert sub["enabled"] is False
|
||||
assert sub["auto_disabled_at"] is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_worker_self_heals_when_bus_starts_late(fake_bus):
|
||||
"""Bus down at startup → worker parks in a retry loop. Once the bus
|
||||
comes up on the next attempt, the worker transitions to dispatch
|
||||
mode and delivers events normally.
|
||||
"""
|
||||
sub = _sub("u1", "w1", ["attacker.>"])
|
||||
repo = _FakeRepo([sub])
|
||||
captured: list[httpx.Request] = []
|
||||
|
||||
async def handler(request: httpx.Request) -> httpx.Response:
|
||||
captured.append(request)
|
||||
return httpx.Response(200)
|
||||
|
||||
# First call: raise (bus unavailable). Second call: hand out the
|
||||
# FakeBus. The worker's retry loop should bridge the gap.
|
||||
calls = {"n": 0}
|
||||
|
||||
def flaky_get_bus(*args, **kwargs):
|
||||
calls["n"] += 1
|
||||
if calls["n"] == 1:
|
||||
raise ConnectionError("bus not ready yet")
|
||||
return fake_bus
|
||||
|
||||
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
|
||||
with patch("decnet.webhook.worker.get_bus", side_effect=flaky_get_bus):
|
||||
task = asyncio.create_task(
|
||||
webhook_worker(
|
||||
repo,
|
||||
reload_interval=0.5,
|
||||
http_client=client,
|
||||
bus_reconnect_secs=0.3,
|
||||
)
|
||||
)
|
||||
# First attempt fails, worker waits bus_reconnect_secs, second
|
||||
# attempt succeeds. Give it a generous window.
|
||||
await asyncio.sleep(0.8)
|
||||
|
||||
await fake_bus.publish(
|
||||
"attacker.observed", {"ip": "1.2.3.4"}, event_type="x"
|
||||
)
|
||||
for _ in range(60):
|
||||
if captured:
|
||||
break
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
assert calls["n"] >= 2, "expected at least one retry after initial failure"
|
||||
assert len(captured) == 1, "expected delivery once bus came up"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user