diff --git a/decnet/web/db/models/webhooks.py b/decnet/web/db/models/webhooks.py index a3a7a1cc..e0e19177 100644 --- a/decnet/web/db/models/webhooks.py +++ b/decnet/web/db/models/webhooks.py @@ -41,6 +41,13 @@ class WebhookSubscription(SQLModel, table=True): last_success_at: Optional[datetime] = None last_failure_at: Optional[datetime] = None last_error: Optional[str] = None + # Set when the circuit breaker auto-disables the subscription after + # too many consecutive failures. NULL means "not tripped" — the + # subscription is either active (enabled=True) or admin-paused + # (enabled=False, auto_disabled_at=NULL). A non-NULL stamp with + # enabled=False means the worker tripped it; the operator clears + # the flag by re-enabling via PATCH. + auto_disabled_at: Optional[datetime] = None created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) @@ -100,6 +107,7 @@ class WebhookResponse(BaseModel): last_success_at: Optional[datetime] = None last_failure_at: Optional[datetime] = None last_error: Optional[str] = None + auto_disabled_at: Optional[datetime] = None created_at: datetime updated_at: datetime warnings: List[str] = PydanticField(default_factory=list) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index e13ea272..6b37876c 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -466,5 +466,12 @@ class BaseRepository(ABC): async def record_webhook_failure( self, uuid: str, ts: Any, error: str - ) -> None: + ) -> int: + """Record a failed delivery; return the new ``consecutive_failures`` + count so the caller can decide whether to trip the circuit.""" + raise NotImplementedError + + async def trip_webhook_circuit(self, uuid: str, ts: Any) -> None: + """Auto-disable a subscription after repeated failures. 
Sets + ``enabled=False`` and stamps ``auto_disabled_at``.""" raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 0486fffd..c2047938 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -1829,26 +1829,41 @@ class SQLModelRepository(BaseRepository): async def record_webhook_failure( self, uuid: str, ts: datetime, error: str - ) -> None: + ) -> int: async with self._session() as session: # Read current failure count, bump, write. Small race window on # concurrent deliveries to the same subscription is acceptable — - # the counter informs the circuit-breaker heuristic (DEBT-037), - # not a correctness invariant. + # the counter informs the circuit-breaker heuristic, not a + # correctness invariant. result = await session.execute( select(WebhookSubscription.consecutive_failures).where( WebhookSubscription.uuid == uuid ) ) current = result.scalar_one_or_none() or 0 + new_count = current + 1 await session.execute( update(WebhookSubscription) .where(WebhookSubscription.uuid == uuid) .values( - consecutive_failures=current + 1, + consecutive_failures=new_count, last_failure_at=ts, last_error=error[:512] if error else None, updated_at=ts, ) ) await session.commit() + return new_count + + async def trip_webhook_circuit(self, uuid: str, ts: datetime) -> None: + async with self._session() as session: + await session.execute( + update(WebhookSubscription) + .where(WebhookSubscription.uuid == uuid) + .values( + enabled=False, + auto_disabled_at=ts, + updated_at=ts, + ) + ) + await session.commit() diff --git a/decnet/web/router/webhooks/api_manage_webhooks.py b/decnet/web/router/webhooks/api_manage_webhooks.py index 9c5f38af..0c263cca 100644 --- a/decnet/web/router/webhooks/api_manage_webhooks.py +++ b/decnet/web/router/webhooks/api_manage_webhooks.py @@ -173,6 +173,15 @@ async def api_update_webhook( if req.enabled is not None: patch["enabled"] = req.enabled + # Re-enabling after a circuit 
trip clears the trip stamp and + # zeros the failure count — the operator has acknowledged and + # is ready to resume delivery. Admin-paused → re-enabled also + # hits this path harmlessly (auto_disabled_at is already NULL; + # any sub-threshold failure count and last_error are reset). + if req.enabled is True and not current.get("enabled"): + patch["auto_disabled_at"] = None + patch["consecutive_failures"] = 0 + patch["last_error"] = None if req.simple_events is not None or req.topic_patterns is not None: # Re-merge using whatever the caller supplied; a caller that wants diff --git a/decnet/webhook/worker.py b/decnet/webhook/worker.py index fcdf94eb..b24253a0 100644 --- a/decnet/webhook/worker.py +++ b/decnet/webhook/worker.py @@ -17,6 +17,7 @@ from __future__ import annotations import asyncio import contextlib import json +import os from datetime import datetime, timezone from typing import Any @@ -36,6 +37,12 @@ _RELOAD_FALLBACK_SECS = 60.0 # Max parallel HTTP egress — one global semaphore keeps the process's # outbound footprint bounded regardless of event volume. _EGRESS_CONCURRENCY = 10 +# Circuit-breaker trip point. After this many consecutive delivery +# failures the worker auto-disables the subscription so one dead +# receiver can't poison the shared egress pool. Override via +# DECNET_WEBHOOK_CIRCUIT_THRESHOLD. Operator clears the trip by +# toggling `enabled` back on via PATCH. 
+_CIRCUIT_THRESHOLD = max(1, int(os.environ.get("DECNET_WEBHOOK_CIRCUIT_THRESHOLD", "5"))) def _patterns_for(sub: dict[str, Any]) -> list[str]: @@ -112,7 +119,7 @@ async def webhook_worker( for pattern in _patterns_for(sub): consumer_tasks.append(asyncio.create_task( _consume( - bus, pattern, sub, repo, http_client, semaphore + bus, pattern, sub, repo, http_client, semaphore, reload_flag, ) )) @@ -159,6 +166,7 @@ async def _consume( repo: BaseRepository, http_client: httpx.AsyncClient, semaphore: asyncio.Semaphore, + reload_flag: asyncio.Event, ) -> None: """Subscribe to one pattern and dispatch events to one webhook.""" try: @@ -166,7 +174,7 @@ async def _consume( async with subscription: async for event in subscription: asyncio.create_task( - _dispatch_one(repo, http_client, semaphore, sub, event) + _dispatch_one(repo, http_client, semaphore, sub, event, reload_flag) ) except asyncio.CancelledError: raise @@ -183,6 +191,7 @@ async def _dispatch_one( semaphore: asyncio.Semaphore, sub: dict[str, Any], event: Any, + reload_flag: asyncio.Event, ) -> None: async with semaphore: try: @@ -192,7 +201,9 @@ async def _dispatch_one( "webhook: deliver raised for sub=%s topic=%s: %s", sub.get("uuid"), getattr(event, "topic", ""), exc, ) - await _safe_record_failure(repo, sub["uuid"], f"internal: {exc}") + await _safe_record_failure( + repo, sub["uuid"], f"internal: {exc}", sub.get("name", ""), reload_flag, + ) return now = datetime.now(timezone.utc) @@ -205,7 +216,7 @@ async def _dispatch_one( result.status_code, result.error, ) await _safe_record_failure( - repo, sub["uuid"], result.error or "unknown" + repo, sub["uuid"], result.error or "unknown", sub.get("name", ""), reload_flag, ) @@ -219,14 +230,34 @@ async def _safe_record_success( async def _safe_record_failure( - repo: BaseRepository, uuid: str, error: str + repo: BaseRepository, + uuid: str, + error: str, + sub_name: str = "", + reload_flag: asyncio.Event | None = None, ) -> None: try: - await 
repo.record_webhook_failure( - uuid, datetime.now(timezone.utc), error - ) + now = datetime.now(timezone.utc) + new_count = await repo.record_webhook_failure(uuid, now, error) except Exception as exc: logger.warning("webhook: record_failure failed: %s", exc) + return + + # Circuit breaker — trip after threshold. Set the reload flag so the + # outer loop re-queries the DB and stops consuming events for the + # now-disabled sub. Idempotent: tripping an already-tripped sub just + # re-stamps auto_disabled_at. + if new_count >= _CIRCUIT_THRESHOLD: + try: + await repo.trip_webhook_circuit(uuid, now) + logger.warning( + "webhook: circuit tripped sub=%s uuid=%s failures=%d threshold=%d", + sub_name or "", uuid, new_count, _CIRCUIT_THRESHOLD, + ) + if reload_flag is not None: + reload_flag.set() + except Exception as exc: + logger.warning("webhook: trip_circuit failed: %s", exc) async def _reload_listener( diff --git a/decnet_web/src/components/Webhooks.tsx b/decnet_web/src/components/Webhooks.tsx index 552aafd9..29fd8e92 100644 --- a/decnet_web/src/components/Webhooks.tsx +++ b/decnet_web/src/components/Webhooks.tsx @@ -29,6 +29,7 @@ interface WebhookRow { last_success_at: string | null; last_failure_at: string | null; last_error: string | null; + auto_disabled_at: string | null; created_at: string; updated_at: string; warnings: string[]; @@ -116,6 +117,10 @@ const Webhooks: React.FC = () => { () => webhooks.filter((w) => w.consecutive_failures > 0).length, [webhooks], ); + const trippedCount = useMemo( + () => webhooks.filter((w) => w.auto_disabled_at).length, + [webhooks], + ); const fetchWebhooks = async () => { try { @@ -290,6 +295,7 @@ const Webhooks: React.FC = () => {

WEBHOOKS

{webhooks.length} CONFIGURED · {enabledCount} ENABLED + {trippedCount > 0 && ` · ${trippedCount} TRIPPED`} {failCount > 0 && ` · ${failCount} FAILING`} {insecureCount > 0 && ` · ${insecureCount} INSECURE`} @@ -424,6 +430,14 @@ const Webhooks: React.FC = () => { {w.enabled ? 'ENABLED' : 'DISABLED'} + {w.auto_disabled_at && ( + + TRIPPED · {formatDate(w.auto_disabled_at)} + + )} {w.consecutive_failures > 0 && ( FAIL · {w.consecutive_failures} diff --git a/development/DEBT.md b/development/DEBT.md index 89c6d32e..6795da64 100644 --- a/development/DEBT.md +++ b/development/DEBT.md @@ -352,7 +352,7 @@ The webhook worker (Wazuh / Shuffle / TheHive / n8n integration path) ships MVP- What MVP deliberately defers: -1. **Circuit breaker.** After N consecutive 5xx / timeout / connection refused errors, auto-disable the subscription and require admin re-enable. Without this, a half-working SOAR endpoint can pin the webhook worker's connection pool and starve healthy destinations. Fast follow-up — the state (consecutive_failures, last_failure_at) is small and fits on the subscription row. +1. ~~**Circuit breaker.**~~ ✅ **Shipped 2026-04-24.** After `DECNET_WEBHOOK_CIRCUIT_THRESHOLD` (default 5) consecutive failures the worker calls `trip_webhook_circuit(uuid, ts)` — flips `enabled=False`, stamps `auto_disabled_at`, fires a reload. Operator clears the trip by re-enabling via PATCH, which zeros the counter and clears the stamp. UI surfaces `TRIPPED · ` chip on the row; page header shows a `N TRIPPED` count. 2. **Dead-letter table.** Events that exhaust retries are dropped with a log line, not persisted. Operators can't replay a missed event after they fix their Shuffle flow. Minimum viable: `webhook_dead_letters(subscription_id, topic, payload_json, final_error, dropped_at)` with a TTL sweep, and `POST /webhooks/{id}/replay?since=...` to re-queue. 3. **Delivery audit log.** No persisted record of "what went where and when." 
Useful for compliance and for debugging "why didn't TheHive see that alert." Same table shape as dead-letter but success-path entries with retention knob. 4. **Batch delivery / coalescing.** Every event fires one HTTP POST. High-volume topics (`system.log` on a busy master) will happily saturate the egress. Post-MVP, add a bounded batch window (e.g. up to 50 events or 500 ms) and POST an envelope `{events: [...]}`. diff --git a/development/THREAT_MODEL.md b/development/THREAT_MODEL.md index 0783e1f5..60cb7814 100644 --- a/development/THREAT_MODEL.md +++ b/development/THREAT_MODEL.md @@ -389,7 +389,7 @@ the receiver (Shuffle→Slack, TheHive→Cortex, …). | I | Secret leaks via API GET/LIST response | M | `WebhookResponse` deliberately omits the `secret` field. `WebhookCreateResponse` carries the secret exactly once on create for copy-out. PATCH-to-rotate, no read-back. | | I | Webhook URL + secret leak via DB dump | A | Plaintext at-rest on SQLite/MySQL. Same trust assumption as the JWT secret (which is env-sourced, not DB-stored). See WH-01 and DEBT-037 §7. | | I | Attacker-controlled event content reaches receiver | T | Event payloads pass through DECNET untransformed — the receiver must sanitize before rendering (e.g. XSS if Shuffle pipes to a browser-facing Slack block without escaping). Out of scope for the DECNET side. Document in operator docs. | -| D | Slow / unreachable receiver ties up egress | M / A | Bounded concurrency (`Semaphore(10)`), per-delivery timeout (10s), and bounded retry (3 attempts, `[1,2,4]` × jitter) keep one slow destination from starving others. Half-dead receivers still waste retry budget — see WH-02. Circuit breaker deferred to DEBT-037 §1. 
| +| D | Slow / unreachable receiver ties up egress | M | Bounded concurrency (`Semaphore(10)`), per-delivery timeout (10s), and bounded retry (3 attempts, `[1,2,4]` × jitter) plus a circuit breaker: after `DECNET_WEBHOOK_CIRCUIT_THRESHOLD` (default 5) consecutive failures the worker auto-disables the subscription (`enabled=False`, `auto_disabled_at=ts`), sets the worker's reload flag, and stops consuming events for that sub. Operator re-enables via PATCH which clears the trip stamp and zeros the counter. See `decnet/webhook/worker.py::_safe_record_failure` + `sqlmodel_repo.py::trip_webhook_circuit`. | | D | Huge payload floods receiver | A | Payload shape is whatever the bus event carries; no per-destination batching / coalescing. On high-volume topics this is a known concern — see DEBT-037 §4 for post-MVP batch delivery. | | E | Viewer role manipulates webhook config | M | All CRUD routes under `/api/v1/webhooks` are `Depends(require_admin)`. Verified by `tests/api/test_rbac_contract.py` (every admin-classified route asserts viewer → 403). | | E | Admin adds a URL pointing at an internal-only DECNET service (SSRF-style) | A | Admin role is trusted; protecting admin from self-inflicted SSRF is out of scope under the current trust model. Revisit if we ever delegate subscription CRUD to a less-trusted role. | @@ -399,7 +399,7 @@ the receiver (Shuffle→Slack, TheHive→Cortex, …). | ID | Threat | Why accepted | Revisit when | |----|--------|--------------|--------------| | WH-01 | Webhook secret + URL stored plaintext in the DB | Matches the existing pre-v1 posture (JWT secret is env-sourced; there's no operator expectation that DB-at-rest is encrypted). Encrypting one column in isolation invents a KEK lifecycle we don't have. | Comprehensive DB-at-rest encryption lands, OR regulated-industry customer engagement. Tracked in DEBT-037 §7. 
| -| WH-02 | Half-dead receiver wastes the full retry budget (1+2+4 ≈ 7s with jitter) per delivery before the worker gives up | Admin role is trusted; this is operator-observable via `consecutive_failures` on the subscription row. A sticky-failure receiver disabled itself via operator action is fine pre-v1. | Circuit breaker lands (DEBT-037 §1) — auto-disable after N consecutive failures, require admin re-enable. | +| ~~WH-02~~ | ~~Half-dead receiver wastes the full retry budget (1+2+4 ≈ 7s with jitter) per delivery before the worker gives up~~ | ~~Admin role is trusted…~~ | **Closed 2026-04-24 — circuit breaker shipped. Tripped sub is isolated after N failures; operator clears via PATCH. See D row above.** | | WH-03 | Admin configures an `http://` webhook URL; event body (incl. payload fields) travels plaintext on the wire | Operator-trust posture (same rationale as DA-06: protecting admin from self is out of scope). HMAC signature still detects tampering regardless of transport — only *read* confidentiality is lost. The API surfaces a non-blocking warning in `WebhookResponse.warnings` so the operator is informed on every GET/CREATE, and test/dev environments without TLS remain usable. | Multi-admin delegation lands, OR a regulated-industry customer engagement, OR an operator ticket asks for a `DECNET_WEBHOOK_REQUIRE_HTTPS=true` enforcement knob. | ### Needs-verification checklist (DECNET↔Webhook) @@ -447,3 +447,4 @@ In priority order: | 2026-04-24 | F6/I and F6/D both moved from **?** to **M**. F6/I: documented the viewer-safe-by-construction invariant for both SSE streams (every emitted event type wraps data already viewer-readable via REST). F6/D: added `decnet/web/sse_limits.py::sse_connection_slot` — per-user counter + async lock + 429 on overflow, wired into both SSE generators. `DECNET_SSE_MAX_PER_USER` env knob, default 5. | ANTI | | 2026-04-24 | Component 2 added — DECNET↔External webhook destination. 
Covers the new `decnet webhook` worker + `/api/v1/webhooks` admin CRUD. HMAC-SHA256 signing, 4xx no-retry + 5xx/429 retry with jittered backoff, admin-only CRUD, secret never leaks post-create. Two accepted risks registered (WH-01 secret at rest, WH-02 half-dead-receiver retry waste) paired with DEBT-037 pointers. | ANTI | | 2026-04-24 | WH-03 accepted risk added — `http://` webhook URLs are allowed (operator-trust posture) but surface an `insecure_url` advisory in `WebhookResponse.warnings`. Checklist item "reject http://" resolved as "warn, not reject" per explicit operator decision. | ANTI | +| 2026-04-24 | WH-02 closed — circuit breaker shipped. After `DECNET_WEBHOOK_CIRCUIT_THRESHOLD` (default 5) consecutive failures, the worker auto-disables the subscription via `trip_webhook_circuit`, stamps `auto_disabled_at`, and fires a reload so no further events are consumed for it. Operator re-enables via PATCH which clears the trip. Closes DEBT-037 §1. | ANTI | diff --git a/tests/api/webhooks/test_crud.py b/tests/api/webhooks/test_crud.py index e643c8c7..d0ad89eb 100644 --- a/tests/api/webhooks/test_crud.py +++ b/tests/api/webhooks/test_crud.py @@ -210,6 +210,58 @@ async def test_https_url_has_no_warning( assert res.json()["warnings"] == [] +@pytest.mark.asyncio +async def test_reenabling_clears_circuit_trip( + client: httpx.AsyncClient, auth_token: str +): + """Re-enabling via PATCH clears auto_disabled_at + consecutive_failures. + + Simulates the full circuit-breaker lifecycle: create → tripped (via + direct DB write, since we can't easily force N worker failures in an + API-only test) → re-enable via PATCH → verify state cleared. 
+ """ + from datetime import datetime, timezone + from decnet.web.dependencies import repo + + create = await client.post( + PATH, + json={ + "name": "wh-trip", + "url": "https://example.com/x", + "topic_patterns": ["system.>"], + }, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert create.status_code == 201 + uuid = create.json()["uuid"] + + # Simulate the circuit tripping — direct repo call. + now = datetime.now(timezone.utc) + await repo.record_webhook_failure(uuid, now, "503 service unavailable") + await repo.record_webhook_failure(uuid, now, "503 service unavailable") + await repo.trip_webhook_circuit(uuid, now) + + pre = await client.get( + f"{PATH}{uuid}", headers={"Authorization": f"Bearer {auth_token}"} + ) + assert pre.json()["enabled"] is False + assert pre.json()["auto_disabled_at"] is not None + assert pre.json()["consecutive_failures"] >= 1 + + # Re-enable via PATCH — should clear trip + counter + last_error. + res = await client.patch( + f"{PATH}{uuid}", + json={"enabled": True}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 200 + body = res.json() + assert body["enabled"] is True + assert body["auto_disabled_at"] is None + assert body["consecutive_failures"] == 0 + assert body["last_error"] is None + + @pytest.mark.asyncio async def test_viewer_forbidden(client: httpx.AsyncClient, viewer_token: str): res = await client.get( diff --git a/tests/webhook/test_worker.py b/tests/webhook/test_worker.py index 35c1e924..85c03291 100644 --- a/tests/webhook/test_worker.py +++ b/tests/webhook/test_worker.py @@ -48,15 +48,29 @@ class _FakeRepo: self.subs = subs self.success_calls: list[str] = [] self.failure_calls: list[tuple[str, str]] = [] + self.trip_calls: list[str] = [] + self._failure_counts: dict[str, int] = {} async def list_webhook_subscriptions(self, enabled_only: bool = False) -> list[dict[str, Any]]: return [s for s in self.subs if s["enabled"]] if enabled_only else list(self.subs) async def 
record_webhook_success(self, uuid: str, ts: datetime) -> None: self.success_calls.append(uuid) + self._failure_counts[uuid] = 0 - async def record_webhook_failure(self, uuid: str, ts: datetime, error: str) -> None: + async def record_webhook_failure(self, uuid: str, ts: datetime, error: str) -> int: self.failure_calls.append((uuid, error)) + self._failure_counts[uuid] = self._failure_counts.get(uuid, 0) + 1 + return self._failure_counts[uuid] + + async def trip_webhook_circuit(self, uuid: str, ts: datetime) -> None: + self.trip_calls.append(uuid) + # Mirror the real DB effect: flip enabled=False so next reload + # skips this sub. + for s in self.subs: + if s["uuid"] == uuid: + s["enabled"] = False + s["auto_disabled_at"] = ts def test_patterns_for_decodes_json(): @@ -231,6 +245,51 @@ async def test_worker_reloads_on_subscriptions_changed_signal(fake_bus): except asyncio.CancelledError: pass + # The new sub (u2) should have received the system.log event. assert len(captured) == 1 assert "system.log" in captured[0].headers.get("X-DECNET-Event-Topic", "") + + +@pytest.mark.asyncio +async def test_worker_trips_circuit_after_threshold(fake_bus, monkeypatch): + """After N consecutive failures the worker auto-disables the sub.""" + sub = _sub("u1", "w1", ["attacker.>"]) + repo = _FakeRepo([sub]) + + # Tight threshold + zero-delay retry so the test finishes fast. 
+ monkeypatch.setattr("decnet.webhook.worker._CIRCUIT_THRESHOLD", 2) + monkeypatch.setattr( + "decnet.webhook.client._DEFAULT_RETRY_SCHEDULE", (0.0, 0.0, 0.0) + ) + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(503) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with patch("decnet.webhook.worker.get_bus", return_value=fake_bus): + task = asyncio.create_task( + webhook_worker(repo, reload_interval=0.5, http_client=client) + ) + await asyncio.sleep(0.2) + # Publish two events — each exhausts its retries and increments + # consecutive_failures by 1. The second failure should trip the circuit. + await fake_bus.publish("attacker.observed", {}, event_type="x") + await fake_bus.publish("attacker.observed", {}, event_type="x") + for _ in range(120): + if repo.trip_calls: + break + await asyncio.sleep(0.05) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert repo.trip_calls, "expected circuit to trip after threshold" + assert repo.trip_calls[0] == "u1" + # The sub was flipped to enabled=False by trip_webhook_circuit. + assert sub["enabled"] is False + assert sub["auto_disabled_at"] is not None +