Before: if the bus was unreachable at worker start, we logged
"running in idle mode" once and parked on shutdown forever. systemd
doesn't guarantee bus is fully up before the webhook worker starts,
so a race on boot left the worker permanently dead until restart.
Now: wrap the whole bus-use in an outer reconnect loop.
while not shutdown:
try: connect()
except: sleep(RECONNECT_SECS) ; continue
try: run_with_bus(...) # heartbeat + dispatch
except: log+close ; reconnect on next iter
Clean consequence: if the bus dies mid-operation the dispatch loop's
subscriptions raise inside the consumer tasks, `_run_with_bus` exits,
the outer loop closes the stale connection and reconnects. No partial
state leaks across epochs — fresh bus, fresh subs, fresh heartbeat.
Interval is 60s by default, overridable via
DECNET_WEBHOOK_BUS_RECONNECT_SECS. Shutdown wakes the wait so
systemctl stop doesn't hang for a minute.
Test added: flaky get_bus that fails once, then returns a live
FakeBus — asserts retry + successful delivery.
get_app_bus() in decnet/bus/app.py already has a 2s backoff retry so
the FastAPI hot path self-heals; this commit brings the standalone
webhook worker in line with the same posture.
313 lines
11 KiB
Python
313 lines
11 KiB
Python
"""Webhook dispatcher — bus consumer → HTTP egress.
|
|
|
|
Spawns one asyncio task per (subscription, pattern) pair. Each task
|
|
subscribes to the bus, iterates matching events, and POSTs them via
|
|
`decnet.webhook.client.deliver`. Reloads on `WEBHOOK_SUBSCRIPTIONS_CHANGED`
|
|
bus signals and as a slow fallback so a dropped signal costs latency,
|
|
not correctness.
|
|
|
|
One-task-per-pair is deliberately dumb: cancellation propagates cleanly,
|
|
and the bus's own trie does the actual pattern matching — no in-memory
|
|
filter logic to maintain. Scales fine up to thousands of subs; if that
|
|
ever breaks down we collapse to one task per distinct pattern and add
|
|
in-memory dispatch.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from decnet.bus.factory import get_bus
|
|
from decnet.bus.publish import run_control_listener, run_health_heartbeat
|
|
from decnet.bus import topics as _topics
|
|
from decnet.logging import get_logger
|
|
from decnet.web.db.repository import BaseRepository
|
|
from decnet.webhook.client import deliver
|
|
|
|
logger = get_logger("webhook_worker")
|
|
|
|
|
|
_RELOAD_FALLBACK_SECS = 60.0
|
|
# Max parallel HTTP egress — one global semaphore keeps the process's
|
|
# outbound footprint bounded regardless of event volume.
|
|
_EGRESS_CONCURRENCY = 10
|
|
# Circuit-breaker trip point. After this many consecutive delivery
|
|
# failures the worker auto-disables the subscription so one dead
|
|
# receiver can't poison the shared egress pool. Override via
|
|
# DECNET_WEBHOOK_CIRCUIT_THRESHOLD. Operator clears the trip by
|
|
# toggling `enabled` back on via PATCH.
|
|
_CIRCUIT_THRESHOLD = max(1, int(os.environ.get("DECNET_WEBHOOK_CIRCUIT_THRESHOLD", "5")))
|
|
# How long to wait between bus (re)connect attempts when the bus is
|
|
# unreachable. Keeps the worker self-healing against a bus that starts
|
|
# after the webhook worker does (systemd race) or crashes+restarts
|
|
# mid-operation. Override via DECNET_WEBHOOK_BUS_RECONNECT_SECS.
|
|
_BUS_RECONNECT_SECS = max(5.0, float(os.environ.get("DECNET_WEBHOOK_BUS_RECONNECT_SECS", "60")))
|
|
|
|
|
|
def _patterns_for(sub: dict[str, Any]) -> list[str]:
|
|
raw = sub.get("topic_patterns") or "[]"
|
|
try:
|
|
return [p for p in json.loads(raw) if isinstance(p, str)]
|
|
except (ValueError, TypeError):
|
|
return []
|
|
|
|
|
|
def _union_patterns(subs: list[dict[str, Any]]) -> list[str]:
|
|
"""Dedup patterns across all subs, preserving first-occurrence order."""
|
|
seen: set[str] = set()
|
|
out: list[str] = []
|
|
for sub in subs:
|
|
for p in _patterns_for(sub):
|
|
if p not in seen:
|
|
seen.add(p)
|
|
out.append(p)
|
|
return out
|
|
|
|
|
|
async def webhook_worker(
|
|
repo: BaseRepository,
|
|
*,
|
|
reload_interval: float = _RELOAD_FALLBACK_SECS,
|
|
http_client: httpx.AsyncClient | None = None,
|
|
bus_reconnect_secs: float = _BUS_RECONNECT_SECS,
|
|
) -> None:
|
|
"""Main entry — connect bus, spawn per-subscription delivery tasks,
|
|
reload on signal. Retries bus connection in a loop so the worker
|
|
self-heals if the bus starts after the worker or restarts mid-run.
|
|
"""
|
|
logger.info("webhook worker started")
|
|
|
|
shutdown = asyncio.Event()
|
|
owns_http = http_client is None
|
|
if owns_http:
|
|
http_client = httpx.AsyncClient(timeout=10.0)
|
|
|
|
try:
|
|
while not shutdown.is_set():
|
|
# Try to connect to the bus. If it's down, wait out the
|
|
# reconnect interval and try again. Shutdown interrupts the
|
|
# wait so systemd stop doesn't hang for a minute.
|
|
bus = None
|
|
try:
|
|
bus = get_bus(client_name="webhook")
|
|
await bus.connect()
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.warning(
|
|
"webhook: bus unavailable, retrying in %.0fs: %s",
|
|
bus_reconnect_secs, exc,
|
|
)
|
|
with contextlib.suppress(asyncio.TimeoutError):
|
|
await asyncio.wait_for(
|
|
shutdown.wait(), timeout=bus_reconnect_secs
|
|
)
|
|
continue
|
|
|
|
# Bus is live — run one dispatch epoch until it fails or we
|
|
# shut down. On any crash the outer loop reconnects and
|
|
# retries from scratch; no state carries across epochs so a
|
|
# half-dead bus can't leave us with stale subscriptions.
|
|
logger.info("webhook: bus connected")
|
|
try:
|
|
await _run_with_bus(
|
|
bus, repo, http_client,
|
|
shutdown, reload_interval,
|
|
)
|
|
except asyncio.CancelledError:
|
|
shutdown.set()
|
|
raise
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.warning(
|
|
"webhook: dispatch crashed, will reconnect: %s", exc
|
|
)
|
|
finally:
|
|
with contextlib.suppress(Exception):
|
|
await bus.close()
|
|
finally:
|
|
if owns_http and http_client is not None:
|
|
await http_client.aclose()
|
|
|
|
|
|
async def _run_with_bus(
|
|
bus,
|
|
repo: BaseRepository,
|
|
http_client: httpx.AsyncClient,
|
|
shutdown: asyncio.Event,
|
|
reload_interval: float,
|
|
) -> None:
|
|
"""Run one bus-up epoch: start heartbeat+control+reload listeners,
|
|
dispatch events until shutdown or error, clean up."""
|
|
reload_flag = asyncio.Event()
|
|
semaphore = asyncio.Semaphore(_EGRESS_CONCURRENCY)
|
|
consumer_tasks: list[asyncio.Task] = []
|
|
|
|
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "webhook"))
|
|
control_task = asyncio.create_task(
|
|
run_control_listener(bus, "webhook", shutdown)
|
|
)
|
|
reload_task = asyncio.create_task(_reload_listener(bus, reload_flag, shutdown))
|
|
|
|
try:
|
|
while not shutdown.is_set():
|
|
# Cancel prior epoch's consumers before starting new ones.
|
|
await _cancel_all(consumer_tasks)
|
|
consumer_tasks.clear()
|
|
|
|
subs = await repo.list_webhook_subscriptions(enabled_only=True)
|
|
for sub in subs:
|
|
for pattern in _patterns_for(sub):
|
|
consumer_tasks.append(asyncio.create_task(
|
|
_consume(
|
|
bus, pattern, sub, repo, http_client, semaphore, reload_flag,
|
|
)
|
|
))
|
|
|
|
# Wait for reload OR timer fallback. Shutdown propagates via
|
|
# CancelledError when the outer task is cancelled.
|
|
with contextlib.suppress(asyncio.TimeoutError):
|
|
await asyncio.wait_for(
|
|
reload_flag.wait(), timeout=reload_interval
|
|
)
|
|
reload_flag.clear()
|
|
finally:
|
|
await _cancel_all(consumer_tasks)
|
|
for t in (heartbeat_task, control_task, reload_task):
|
|
t.cancel()
|
|
for t in (heartbeat_task, control_task, reload_task):
|
|
with contextlib.suppress(asyncio.CancelledError, Exception):
|
|
await t
|
|
|
|
|
|
async def _cancel_all(tasks: list[asyncio.Task]) -> None:
|
|
for t in tasks:
|
|
if not t.done():
|
|
t.cancel()
|
|
for t in tasks:
|
|
with contextlib.suppress(asyncio.CancelledError, Exception):
|
|
await t
|
|
|
|
|
|
async def _consume(
|
|
bus,
|
|
pattern: str,
|
|
sub: dict[str, Any],
|
|
repo: BaseRepository,
|
|
http_client: httpx.AsyncClient,
|
|
semaphore: asyncio.Semaphore,
|
|
reload_flag: asyncio.Event,
|
|
) -> None:
|
|
"""Subscribe to one pattern and dispatch events to one webhook."""
|
|
try:
|
|
subscription = bus.subscribe(pattern)
|
|
async with subscription:
|
|
async for event in subscription:
|
|
asyncio.create_task(
|
|
_dispatch_one(repo, http_client, semaphore, sub, event, reload_flag)
|
|
)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"webhook: consumer crashed sub=%s pattern=%s err=%s",
|
|
sub.get("name"), pattern, exc,
|
|
)
|
|
|
|
|
|
async def _dispatch_one(
|
|
repo: BaseRepository,
|
|
http_client: httpx.AsyncClient,
|
|
semaphore: asyncio.Semaphore,
|
|
sub: dict[str, Any],
|
|
event: Any,
|
|
reload_flag: asyncio.Event,
|
|
) -> None:
|
|
async with semaphore:
|
|
try:
|
|
result = await deliver(sub, event, client=http_client)
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.exception(
|
|
"webhook: deliver raised for sub=%s topic=%s: %s",
|
|
sub.get("uuid"), getattr(event, "topic", ""), exc,
|
|
)
|
|
await _safe_record_failure(
|
|
repo, sub["uuid"], f"internal: {exc}", sub.get("name", ""), reload_flag,
|
|
)
|
|
return
|
|
|
|
now = datetime.now(timezone.utc)
|
|
if result.ok:
|
|
await _safe_record_success(repo, sub["uuid"], now)
|
|
else:
|
|
logger.warning(
|
|
"webhook: delivery failed sub=%s topic=%s status=%s err=%s",
|
|
sub.get("name"), getattr(event, "topic", ""),
|
|
result.status_code, result.error,
|
|
)
|
|
await _safe_record_failure(
|
|
repo, sub["uuid"], result.error or "unknown", sub.get("name", ""), reload_flag,
|
|
)
|
|
|
|
|
|
async def _safe_record_success(
|
|
repo: BaseRepository, uuid: str, ts: datetime
|
|
) -> None:
|
|
try:
|
|
await repo.record_webhook_success(uuid, ts)
|
|
except Exception as exc:
|
|
logger.warning("webhook: record_success failed: %s", exc)
|
|
|
|
|
|
async def _safe_record_failure(
|
|
repo: BaseRepository,
|
|
uuid: str,
|
|
error: str,
|
|
sub_name: str = "",
|
|
reload_flag: asyncio.Event | None = None,
|
|
) -> None:
|
|
try:
|
|
now = datetime.now(timezone.utc)
|
|
new_count = await repo.record_webhook_failure(uuid, now, error)
|
|
except Exception as exc:
|
|
logger.warning("webhook: record_failure failed: %s", exc)
|
|
return
|
|
|
|
# Circuit breaker — trip after threshold. Set the reload flag so the
|
|
# outer loop re-queries the DB and stops consuming events for the
|
|
# now-disabled sub. Idempotent: tripping an already-tripped sub just
|
|
# re-stamps auto_disabled_at.
|
|
if new_count >= _CIRCUIT_THRESHOLD:
|
|
try:
|
|
await repo.trip_webhook_circuit(uuid, now)
|
|
logger.warning(
|
|
"webhook: circuit tripped sub=%s uuid=%s failures=%d threshold=%d",
|
|
sub_name or "<unknown>", uuid, new_count, _CIRCUIT_THRESHOLD,
|
|
)
|
|
if reload_flag is not None:
|
|
reload_flag.set()
|
|
except Exception as exc:
|
|
logger.warning("webhook: trip_circuit failed: %s", exc)
|
|
|
|
|
|
async def _reload_listener(
|
|
bus, reload_flag: asyncio.Event, shutdown: asyncio.Event
|
|
) -> None:
|
|
"""Set `reload_flag` on every WEBHOOK_SUBSCRIPTIONS_CHANGED signal."""
|
|
try:
|
|
sub = bus.subscribe(_topics.WEBHOOK_SUBSCRIPTIONS_CHANGED)
|
|
async with sub:
|
|
async for _event in sub:
|
|
if shutdown.is_set():
|
|
return
|
|
reload_flag.set()
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"webhook: reload listener crashed, fallback timer only: %s", exc
|
|
)
|