feat(webhooks): circuit breaker auto-disables misbehaving subscriptions
After DECNET_WEBHOOK_CIRCUIT_THRESHOLD (default 5) consecutive failed
deliveries, the worker calls trip_webhook_circuit(uuid, ts) which
flips enabled=False and stamps auto_disabled_at. The worker sets its
reload flag so the next dispatch epoch stops consuming events for the
tripped sub entirely — one dead receiver can't poison the shared
egress pool anymore.
Operator clears the trip via PATCH: setting enabled=True on a sub that
was previously disabled clears auto_disabled_at, zeros
consecutive_failures, and clears last_error. Re-enabling an
admin-paused sub hits the same path harmlessly.
Three observable states now distinguishable in the UI:
- Active: enabled=True, auto_disabled_at=NULL
- Admin-paused: enabled=False, auto_disabled_at=NULL
- Tripped: enabled=False, auto_disabled_at=<ts>
UI surfaces a TRIPPED · <ts> chip on the row (red, alert-styled) and
a "N TRIPPED" count in the page header. Hover tooltip tells the
operator how to reset ("Re-enable via Edit").
record_webhook_failure now returns the new consecutive_failures count
so the worker can compare it against the threshold without a second
round trip. trip_webhook_circuit is idempotent: re-tripping just
re-stamps auto_disabled_at.
Closes THREAT_MODEL WH-02 and DEBT-037 §1.
This commit is contained in:
@@ -17,6 +17,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
@@ -36,6 +37,12 @@ _RELOAD_FALLBACK_SECS = 60.0
|
||||
# Max parallel HTTP egress — one global semaphore keeps the process's
|
||||
# outbound footprint bounded regardless of event volume.
|
||||
_EGRESS_CONCURRENCY = 10
|
||||
# Circuit-breaker trip point. After this many consecutive delivery
|
||||
# failures the worker auto-disables the subscription so one dead
|
||||
# receiver can't poison the shared egress pool. Override via
|
||||
# DECNET_WEBHOOK_CIRCUIT_THRESHOLD. Operator clears the trip by
|
||||
# toggling `enabled` back on via PATCH.
|
||||
# Clamped to a minimum of 1 so a zero or negative override can never yield
# a non-positive trip point. A non-integer override raises ValueError at
# import time.
_CIRCUIT_THRESHOLD = max(
    1,
    int(os.getenv("DECNET_WEBHOOK_CIRCUIT_THRESHOLD", "5")),
)
|
||||
|
||||
|
||||
def _patterns_for(sub: dict[str, Any]) -> list[str]:
|
||||
@@ -112,7 +119,7 @@ async def webhook_worker(
|
||||
for pattern in _patterns_for(sub):
|
||||
consumer_tasks.append(asyncio.create_task(
|
||||
_consume(
|
||||
bus, pattern, sub, repo, http_client, semaphore
|
||||
bus, pattern, sub, repo, http_client, semaphore, reload_flag,
|
||||
)
|
||||
))
|
||||
|
||||
@@ -159,6 +166,7 @@ async def _consume(
|
||||
repo: BaseRepository,
|
||||
http_client: httpx.AsyncClient,
|
||||
semaphore: asyncio.Semaphore,
|
||||
reload_flag: asyncio.Event,
|
||||
) -> None:
|
||||
"""Subscribe to one pattern and dispatch events to one webhook."""
|
||||
try:
|
||||
@@ -166,7 +174,7 @@ async def _consume(
|
||||
async with subscription:
|
||||
async for event in subscription:
|
||||
asyncio.create_task(
|
||||
_dispatch_one(repo, http_client, semaphore, sub, event)
|
||||
_dispatch_one(repo, http_client, semaphore, sub, event, reload_flag)
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
@@ -183,6 +191,7 @@ async def _dispatch_one(
|
||||
semaphore: asyncio.Semaphore,
|
||||
sub: dict[str, Any],
|
||||
event: Any,
|
||||
reload_flag: asyncio.Event,
|
||||
) -> None:
|
||||
async with semaphore:
|
||||
try:
|
||||
@@ -192,7 +201,9 @@ async def _dispatch_one(
|
||||
"webhook: deliver raised for sub=%s topic=%s: %s",
|
||||
sub.get("uuid"), getattr(event, "topic", ""), exc,
|
||||
)
|
||||
await _safe_record_failure(repo, sub["uuid"], f"internal: {exc}")
|
||||
await _safe_record_failure(
|
||||
repo, sub["uuid"], f"internal: {exc}", sub.get("name", ""), reload_flag,
|
||||
)
|
||||
return
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
@@ -205,7 +216,7 @@ async def _dispatch_one(
|
||||
result.status_code, result.error,
|
||||
)
|
||||
await _safe_record_failure(
|
||||
repo, sub["uuid"], result.error or "unknown"
|
||||
repo, sub["uuid"], result.error or "unknown", sub.get("name", ""), reload_flag,
|
||||
)
|
||||
|
||||
|
||||
@@ -219,14 +230,34 @@ async def _safe_record_success(
|
||||
|
||||
|
||||
async def _safe_record_failure(
    repo: BaseRepository,
    uuid: str,
    error: str,
    sub_name: str = "",
    reload_flag: asyncio.Event | None = None,
) -> None:
    """Record a failed delivery and trip the circuit breaker at the threshold.

    Best-effort: repository errors are logged and swallowed so a failing
    store never propagates out of the fire-and-forget dispatch task.

    Args:
        repo: Repository used to persist the failure and trip the circuit.
        uuid: Identifier of the webhook subscription that failed.
        error: Short error description stored with the failure.
        sub_name: Subscription name, used only in the trip log line.
        reload_flag: When given, set after a successful trip so the worker
            re-queries its subscriptions.
    """
    # One timestamp for both writes, so the recorded failure time and the
    # trip time agree.
    now = datetime.now(timezone.utc)

    try:
        new_count = await repo.record_webhook_failure(uuid, now, error)
    except Exception as exc:
        logger.warning("webhook: record_failure failed: %s", exc)
        return

    # A repository that does not report the counter returns None; comparing
    # that against the threshold would raise TypeError out of a helper that
    # must never raise. Without a count the breaker simply cannot be driven.
    if new_count is None or new_count < _CIRCUIT_THRESHOLD:
        return

    # Circuit breaker: trip at or past the threshold. Tripping an
    # already-tripped sub just re-stamps auto_disabled_at.
    try:
        await repo.trip_webhook_circuit(uuid, now)
    except Exception as exc:
        logger.warning("webhook: trip_circuit failed: %s", exc)
        return

    logger.warning(
        "webhook: circuit tripped sub=%s uuid=%s failures=%d threshold=%d",
        sub_name or "<unknown>", uuid, new_count, _CIRCUIT_THRESHOLD,
    )
    # Wake the outer loop so it re-queries the DB and stops consuming events
    # for the now-disabled subscription.
    if reload_flag is not None:
        reload_flag.set()
|
||||
|
||||
|
||||
async def _reload_listener(
|
||||
|
||||
Reference in New Issue
Block a user