diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 5f6900b7..9c4566af 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -93,6 +93,12 @@ SYSTEM_CONTROL = "control" WORKER_CONTROL_STOP = "stop" WORKER_CONTROL_START = "start" +# Webhook subscription-set changed — published by the CRUD router after any +# create / update / delete on WebhookSubscription so the webhook worker can +# reload its in-memory subscription list and re-subscribe to the new union +# of patterns. Payload is currently empty; consumers only need the signal. +WEBHOOK_SUBSCRIPTIONS_CHANGED = "system.webhook.subscriptions_changed" + # ─── Builders ──────────────────────────────────────────────────────────────── diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index 88e8e6c8..63d04a09 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -112,6 +112,15 @@ from .updater import ( RollbackRequest, RollbackResponse, ) +from .webhooks import ( + SimpleEvent, + WebhookCreateRequest, + WebhookCreateResponse, + WebhookResponse, + WebhookSubscription, + WebhookTestResponse, + WebhookUpdateRequest, +) from .workers import ( StartAllResponse, StartFailure, @@ -218,6 +227,14 @@ __all__ = [ "PushUpdateResult", "RollbackRequest", "RollbackResponse", + # webhooks + "SimpleEvent", + "WebhookCreateRequest", + "WebhookCreateResponse", + "WebhookResponse", + "WebhookSubscription", + "WebhookTestResponse", + "WebhookUpdateRequest", # workers "StartAllResponse", "StartFailure", diff --git a/decnet/web/db/models/webhooks.py b/decnet/web/db/models/webhooks.py new file mode 100644 index 00000000..49480ef5 --- /dev/null +++ b/decnet/web/db/models/webhooks.py @@ -0,0 +1,126 @@ +"""Webhook subscription table + CRUD DTOs. + +Webhooks push DECNET bus events out to external SIEM / SOAR stacks +(Wazuh, Shuffle, TheHive, n8n, ...). Each subscription carries a set +of NATS-style topic patterns; the `decnet webhook` worker subscribes +to the union of patterns across all enabled subscriptions and POSTs +matching events to each matching URL with HMAC-SHA256 signing. + +Simple mode (UI) exposes a friendly enum (`AttackerDetail`, +`DeckyStatus`, `SystemStatus`) that expands to patterns at save time. +Advanced mode lets an admin set raw patterns directly. Storage is +always the expanded list — the enum is sugar at the router layer. +""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any, List, Literal, Optional +from uuid import uuid4 + +from pydantic import BaseModel, Field as PydanticField, HttpUrl +from sqlmodel import Field, SQLModel + + +SimpleEvent = Literal["AttackerDetail", "DeckyStatus", "SystemStatus"] + + +class WebhookSubscription(SQLModel, table=True): + __tablename__ = "webhook_subscriptions" + + uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True) + name: str = Field(index=True, unique=True) + url: str + secret: str # HMAC-SHA256 key; plaintext pre-v1 (see DEBT-037 §7) + # JSON-encoded list[str] of NATS-style bus topic patterns. + # Storing as TEXT keeps the schema portable across SQLite and MySQL + # without pulling in dialect-specific JSON columns. + topic_patterns: str = Field(default="[]") + enabled: bool = Field(default=True, index=True) + consecutive_failures: int = Field(default=0) + last_success_at: Optional[datetime] = None + last_failure_at: Optional[datetime] = None + last_error: Optional[str] = None + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + def patterns(self) -> list[str]: + """Decode `topic_patterns` to a list. Returns [] on bad/empty JSON.""" + try: + raw = json.loads(self.topic_patterns or "[]") + except (ValueError, TypeError): + return [] + return [p for p in raw if isinstance(p, str)] + + +# --- API Request / Response Models (Pydantic) --- + + +class WebhookCreateRequest(BaseModel): + name: str = PydanticField(..., min_length=1, max_length=64) + url: HttpUrl + # If secret is omitted, the router generates a secure random one and + # returns it exactly once on the create response. After that, callers + # can only rotate via PATCH. + secret: Optional[str] = PydanticField(None, min_length=16, max_length=256) + # At least one of simple_events / topic_patterns must be non-empty + # (validated in the router, not Pydantic, so the 400 carries a clear + # detail message). + simple_events: List[SimpleEvent] = PydanticField(default_factory=list) + topic_patterns: List[str] = PydanticField(default_factory=list) + enabled: bool = True + + +class WebhookUpdateRequest(BaseModel): + # Partial update — every field optional; the router diffs against the + # current row and only writes what changed. + name: Optional[str] = PydanticField(None, min_length=1, max_length=64) + url: Optional[HttpUrl] = None + secret: Optional[str] = PydanticField(None, min_length=16, max_length=256) + simple_events: Optional[List[SimpleEvent]] = None + topic_patterns: Optional[List[str]] = None + enabled: Optional[bool] = None + + +class WebhookResponse(BaseModel): + """Public shape — deliberately omits `secret`.""" + + uuid: str + name: str + url: str + topic_patterns: List[str] + enabled: bool + consecutive_failures: int + last_success_at: Optional[datetime] = None + last_failure_at: Optional[datetime] = None + last_error: Optional[str] = None + created_at: datetime + updated_at: datetime + + +class WebhookCreateResponse(WebhookResponse): + """Create-path response — carries the secret exactly once, for copy-out.""" + + secret: str + + +class WebhookTestResponse(BaseModel): + delivered: bool + status_code: Optional[int] = None + error: Optional[str] = None + + +def _row_to_response_dict(row: dict[str, Any]) -> dict[str, Any]: + """Normalize a DB row into the WebhookResponse dict shape. + + Used by the CRUD router to decode `topic_patterns` JSON and drop the + `secret` column before returning to the client. + """ + out = dict(row) + raw = out.pop("topic_patterns", "[]") + try: + out["topic_patterns"] = json.loads(raw or "[]") + except (ValueError, TypeError): + out["topic_patterns"] = [] + out.pop("secret", None) + return out diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 771fc469..e13ea272 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -431,3 +431,40 @@ class BaseRepository(ABC): async def list_live_topology_ids(self) -> list[str]: return [] + + # --------------------------------------------------------- webhooks + # Webhook subscriptions — external SIEM / SOAR egress configuration. + # Default NotImplementedError keeps non-default backends honest; the + # SQLModel-backed SQLite and MySQL repos override everything below. + + async def create_webhook_subscription(self, data: dict[str, Any]) -> None: + raise NotImplementedError + + async def get_webhook_subscription(self, uuid: str) -> Optional[dict[str, Any]]: + raise NotImplementedError + + async def get_webhook_subscription_by_name( + self, name: str + ) -> Optional[dict[str, Any]]: + raise NotImplementedError + + async def list_webhook_subscriptions( + self, enabled_only: bool = False + ) -> list[dict[str, Any]]: + raise NotImplementedError + + async def update_webhook_subscription( + self, uuid: str, patch: dict[str, Any] + ) -> bool: + raise NotImplementedError + + async def delete_webhook_subscription(self, uuid: str) -> bool: + raise NotImplementedError + + async def record_webhook_success(self, uuid: str, ts: Any) -> None: + raise NotImplementedError + + async def record_webhook_failure( + self, uuid: str, ts: Any, error: str + ) -> None: + raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 83758fc0..0486fffd 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -44,6 +44,7 @@ from decnet.web.db.models import ( TopologyEdge, TopologyStatusEvent, TopologyMutation, + WebhookSubscription, ) @@ -1744,3 +1745,110 @@ class SQLModelRepository(BaseRepository): ) ) return [r for r in result.scalars().all()] + + # --------------------------------------------------------- webhooks + + async def create_webhook_subscription(self, data: dict[str, Any]) -> None: + async with self._session() as session: + session.add(WebhookSubscription(**data)) + await session.commit() + + async def get_webhook_subscription( + self, uuid: str + ) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(WebhookSubscription).where(WebhookSubscription.uuid == uuid) + ) + row = result.scalar_one_or_none() + return row.model_dump() if row else None + + async def get_webhook_subscription_by_name( + self, name: str + ) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(WebhookSubscription).where(WebhookSubscription.name == name) + ) + row = result.scalar_one_or_none() + return row.model_dump() if row else None + + async def list_webhook_subscriptions( + self, enabled_only: bool = False + ) -> list[dict[str, Any]]: + async with self._session() as session: + stmt = select(WebhookSubscription) + if enabled_only: + stmt = stmt.where(WebhookSubscription.enabled.is_(True)) + stmt = stmt.order_by(WebhookSubscription.created_at) + result = await session.execute(stmt) + return [r.model_dump() for r in result.scalars().all()] + + async def update_webhook_subscription( + self, uuid: str, patch: dict[str, Any] + ) -> bool: + if not patch: + return True + patch = {**patch, "updated_at": datetime.now(timezone.utc)} + async with self._session() as session: + result = await session.execute( + update(WebhookSubscription) + .where(WebhookSubscription.uuid == uuid) + .values(**patch) + ) + await session.commit() + return result.rowcount > 0 + + async def delete_webhook_subscription(self, uuid: str) -> bool: + async with self._session() as session: + result = await session.execute( + select(WebhookSubscription).where(WebhookSubscription.uuid == uuid) + ) + row = result.scalar_one_or_none() + if not row: + return False + await session.delete(row) + await session.commit() + return True + + async def record_webhook_success( + self, uuid: str, ts: datetime + ) -> None: + async with self._session() as session: + await session.execute( + update(WebhookSubscription) + .where(WebhookSubscription.uuid == uuid) + .values( + consecutive_failures=0, + last_success_at=ts, + last_error=None, + updated_at=ts, + ) + ) + await session.commit() + + async def record_webhook_failure( + self, uuid: str, ts: datetime, error: str + ) -> None: + async with self._session() as session: + # Read current failure count, bump, write. Small race window on + # concurrent deliveries to the same subscription is acceptable — + # the counter informs the circuit-breaker heuristic (DEBT-037), + # not a correctness invariant. + result = await session.execute( + select(WebhookSubscription.consecutive_failures).where( + WebhookSubscription.uuid == uuid + ) + ) + current = result.scalar_one_or_none() or 0 + await session.execute( + update(WebhookSubscription) + .where(WebhookSubscription.uuid == uuid) + .values( + consecutive_failures=current + 1, + last_failure_at=ts, + last_error=error[:512] if error else None, + updated_at=ts, + ) + ) + await session.commit() diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 91dba590..a0944254 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -33,6 +33,7 @@ from .swarm_updates import swarm_updates_router from .swarm_mgmt import swarm_mgmt_router from .system import system_router from .topology import topology_router +from .webhooks import webhooks_router api_router = APIRouter( # Every route under /api/v1 is auth-guarded (either by an explicit @@ -105,3 +106,6 @@ api_router.include_router(system_router) # MazeNET Topologies (nested topology CRUD + mutation queue) api_router.include_router(topology_router) + +# External webhook subscriptions (SIEM/SOAR egress) +api_router.include_router(webhooks_router) diff --git a/decnet/web/router/webhooks/__init__.py b/decnet/web/router/webhooks/__init__.py new file mode 100644 index 00000000..53cdc3a6 --- /dev/null +++ b/decnet/web/router/webhooks/__init__.py @@ -0,0 +1,18 @@ +"""Webhook subscription CRUD. + +Admin-gated management of external-egress webhook subscriptions. The +actual delivery happens in the `decnet webhook` worker, which watches +the DB + bus and POSTs matching events out. This module is the API +surface operators use to configure destinations. + +Mounted under `/api/v1/webhooks` by the main api router. +""" +from fastapi import APIRouter + +from .api_manage_webhooks import router as manage_webhooks_router +from .api_test_webhook import router as test_webhook_router + +webhooks_router = APIRouter(prefix="/webhooks") + +webhooks_router.include_router(manage_webhooks_router) +webhooks_router.include_router(test_webhook_router) diff --git a/decnet/web/router/webhooks/api_manage_webhooks.py b/decnet/web/router/webhooks/api_manage_webhooks.py new file mode 100644 index 00000000..9c5f38af --- /dev/null +++ b/decnet/web/router/webhooks/api_manage_webhooks.py @@ -0,0 +1,222 @@ +"""Webhook subscription CRUD — admin-gated.""" +from __future__ import annotations + +import json +import secrets +from datetime import datetime, timezone +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException + +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.db.models import ( + MessageResponse, + WebhookCreateRequest, + WebhookCreateResponse, + WebhookResponse, + WebhookUpdateRequest, +) +from decnet.web.db.models.webhooks import _row_to_response_dict +from decnet.web.dependencies import repo, require_admin +from decnet.webhook.enums import merge_patterns + +log = get_logger("api.webhooks") + +router = APIRouter() + + +async def _notify_subscriptions_changed() -> None: + """Publish `system.webhook.subscriptions_changed` on the bus. + + Fire-and-forget per the bus contract — a dropped signal is recoverable + because the webhook worker also reloads on a slow timer as a fallback. + """ + try: + bus = await get_app_bus() + if bus is None: + return + await bus.publish( + _topics.WEBHOOK_SUBSCRIPTIONS_CHANGED, + {}, + event_type="changed", + ) + except Exception as e: # noqa: BLE001 — bus failures must not break CRUD + log.warning("webhook subscriptions-changed publish failed: %s", e) + + +def _row_to_response(row: dict[str, Any]) -> WebhookResponse: + return WebhookResponse(**_row_to_response_dict(row)) + + +@router.post( + "/", + tags=["Webhooks"], + response_model=WebhookCreateResponse, + status_code=201, + responses={ + 400: {"description": "At least one of simple_events / topic_patterns required"}, + 409: {"description": "Name already in use"}, + }, +) +@_traced("api.webhook.create") +async def api_create_webhook( + req: WebhookCreateRequest, + admin: dict = Depends(require_admin), +) -> WebhookCreateResponse: + patterns = merge_patterns(req.simple_events, req.topic_patterns) + if not patterns: + raise HTTPException( + status_code=400, + detail="Provide at least one simple_events entry or topic_patterns pattern.", + ) + + existing = await repo.get_webhook_subscription_by_name(req.name) + if existing: + raise HTTPException(status_code=409, detail="Webhook name already exists") + + # Auto-generate a URL-safe secret if the caller didn't provide one. + # 32 bytes of os-entropy is the same ballpark as a CSRF token. + secret = req.secret or secrets.token_urlsafe(32) + + now = datetime.now(timezone.utc) + data = { + "name": req.name, + "url": str(req.url), + "secret": secret, + "topic_patterns": json.dumps(patterns), + "enabled": req.enabled, + "consecutive_failures": 0, + "created_at": now, + "updated_at": now, + } + await repo.create_webhook_subscription(data) + row = await repo.get_webhook_subscription_by_name(req.name) + if row is None: + # Should never happen — the create just committed. Treat as 500 + # rather than silently masking a storage bug. + raise HTTPException(status_code=500, detail="Webhook created but not retrievable") + + await _notify_subscriptions_changed() + + return WebhookCreateResponse( + **_row_to_response_dict(row), + secret=secret, + ) + + +@router.get( + "/", + tags=["Webhooks"], + response_model=list[WebhookResponse], +) +@_traced("api.webhook.list") +async def api_list_webhooks( + admin: dict = Depends(require_admin), +) -> list[WebhookResponse]: + rows = await repo.list_webhook_subscriptions() + return [_row_to_response(r) for r in rows] + + +@router.get( + "/{uuid}", + tags=["Webhooks"], + response_model=WebhookResponse, + responses={404: {"description": "Webhook not found"}}, +) +@_traced("api.webhook.get") +async def api_get_webhook( + uuid: str, + admin: dict = Depends(require_admin), +) -> WebhookResponse: + row = await repo.get_webhook_subscription(uuid) + if not row: + raise HTTPException(status_code=404, detail="Webhook not found") + return _row_to_response(row) + + +@router.patch( + "/{uuid}", + tags=["Webhooks"], + response_model=WebhookResponse, + responses={ + 400: {"description": "Empty or invalid patch"}, + 404: {"description": "Webhook not found"}, + 409: {"description": "Name already in use"}, + }, +) +@_traced("api.webhook.update") +async def api_update_webhook( + uuid: str, + req: WebhookUpdateRequest, + admin: dict = Depends(require_admin), +) -> WebhookResponse: + current = await repo.get_webhook_subscription(uuid) + if not current: + raise HTTPException(status_code=404, detail="Webhook not found") + + patch: dict[str, Any] = {} + + if req.name is not None and req.name != current["name"]: + clash = await repo.get_webhook_subscription_by_name(req.name) + if clash and clash["uuid"] != uuid: + raise HTTPException(status_code=409, detail="Webhook name already exists") + patch["name"] = req.name + + if req.url is not None: + patch["url"] = str(req.url) + + if req.secret is not None: + patch["secret"] = req.secret + + if req.enabled is not None: + patch["enabled"] = req.enabled + + if req.simple_events is not None or req.topic_patterns is not None: + # Re-merge using whatever the caller supplied; a caller that wants + # to clear all patterns must explicitly pass both as empty lists. + simple = req.simple_events if req.simple_events is not None else [] + raw = req.topic_patterns if req.topic_patterns is not None else [] + patterns = merge_patterns(simple, raw) + if not patterns: + raise HTTPException( + status_code=400, + detail="Cannot clear all patterns; disable the webhook instead.", + ) + patch["topic_patterns"] = json.dumps(patterns) + + if not patch: + # No-op patch — return the current row untouched. + return _row_to_response(current) + + updated = await repo.update_webhook_subscription(uuid, patch) + if not updated: + raise HTTPException(status_code=404, detail="Webhook not found") + + await _notify_subscriptions_changed() + + row = await repo.get_webhook_subscription(uuid) + if row is None: + raise HTTPException(status_code=404, detail="Webhook not found") + return _row_to_response(row) + + +@router.delete( + "/{uuid}", + tags=["Webhooks"], + response_model=MessageResponse, + responses={404: {"description": "Webhook not found"}}, +) +@_traced("api.webhook.delete") +async def api_delete_webhook( + uuid: str, + admin: dict = Depends(require_admin), +) -> dict[str, str]: + deleted = await repo.delete_webhook_subscription(uuid) + if not deleted: + raise HTTPException(status_code=404, detail="Webhook not found") + + await _notify_subscriptions_changed() + return {"message": "Webhook deleted"} diff --git a/decnet/web/router/webhooks/api_test_webhook.py b/decnet/web/router/webhooks/api_test_webhook.py new file mode 100644 index 00000000..2546abcb --- /dev/null +++ b/decnet/web/router/webhooks/api_test_webhook.py @@ -0,0 +1,60 @@ +"""POST /webhooks/{uuid}/test — fire a synthetic ping to verify plumbing. + +This hits the same delivery path the worker uses, so a 200 here proves +the destination URL, HMAC secret, and network egress all work without +waiting for a real bus event. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from uuid import uuid4 + +from fastapi import APIRouter, Depends, HTTPException + +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.db.models import WebhookTestResponse +from decnet.web.dependencies import repo, require_admin +from decnet.webhook.client import deliver, SyntheticEvent + +log = get_logger("api.webhooks.test") + +router = APIRouter() + + +@router.post( + "/{uuid}/test", + tags=["Webhooks"], + response_model=WebhookTestResponse, + responses={ + 404: {"description": "Webhook not found"}, + }, +) +@_traced("api.webhook.test") +async def api_test_webhook( + uuid: str, + admin: dict = Depends(require_admin), +) -> WebhookTestResponse: + sub = await repo.get_webhook_subscription(uuid) + if not sub: + raise HTTPException(status_code=404, detail="Webhook not found") + + event = SyntheticEvent( + topic="test.ping", + type="test", + ts=datetime.now(timezone.utc).isoformat(), + id=str(uuid4()), + payload={ + "message": "Synthetic test event from DECNET", + "triggered_by": admin.get("username", "unknown"), + }, + ) + # Single attempt — no retries on manual tests. The operator wants a + # fast signal about the current state of the receiver, not a + # retry-and-wait behavior. + result = await deliver(sub, event, retry_schedule=[]) + return WebhookTestResponse( + delivered=result.ok, + status_code=result.status_code, + error=result.error, + ) diff --git a/decnet/webhook/__init__.py b/decnet/webhook/__init__.py new file mode 100644 index 00000000..51d405a4 --- /dev/null +++ b/decnet/webhook/__init__.py @@ -0,0 +1 @@ +"""External webhook egress — ship bus events to SIEM/SOAR stacks.""" diff --git a/decnet/webhook/client.py b/decnet/webhook/client.py new file mode 100644 index 00000000..0840a2eb --- /dev/null +++ b/decnet/webhook/client.py @@ -0,0 +1,188 @@ +"""HMAC-signed HTTP POST delivery for webhook events. + +The delivery function is shared between the worker's normal dispatch +loop and the `/webhooks/{uuid}/test` admin route — same payload shape, +same signing, same headers. Retry policy is configurable by the caller +so manual tests can skip retries entirely while the worker retries +with backoff. +""" +from __future__ import annotations + +import asyncio +import hashlib +import hmac +import random +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Optional +from uuid import uuid4 + +import httpx +import orjson + +from decnet.logging import get_logger + +log = get_logger("webhook.client") + + +_DEFAULT_TIMEOUT_S = 10.0 +_DEFAULT_RETRY_SCHEDULE = (1.0, 2.0, 4.0) +_JITTER_LOW = 0.8 +_JITTER_HIGH = 1.2 +_PAYLOAD_VERSION = 1 + + +@dataclass(frozen=True) +class SyntheticEvent: + """Structural match for decnet.bus.base.Event — avoids importing the + bus dependency into the HTTP egress layer.""" + + topic: str + type: str + ts: str + id: str + payload: dict[str, Any] + + +@dataclass +class DeliveryResult: + ok: bool + status_code: Optional[int] = None + error: Optional[str] = None + attempts: int = 0 + + +def _canonical_ts(value: Any) -> str: + """Normalize bus-event ts (epoch float / ISO str / None) to ISO-8601 UTC.""" + if isinstance(value, str) and value: + return value + if isinstance(value, (int, float)): + return datetime.fromtimestamp(float(value), tz=timezone.utc).isoformat() + return datetime.now(timezone.utc).isoformat() + + +def _event_id(event: Any) -> str: + explicit = getattr(event, "id", None) + if isinstance(explicit, str) and explicit: + return explicit + return str(uuid4()) + + +def build_payload(event: Any) -> bytes: + """Serialize an event to the canonical JSON body sent on the wire. + + Stable key order (`orjson.OPT_SORT_KEYS`) matters because the HMAC + signs the exact byte sequence — receivers recomputing the hash must + see the same bytes we did. + """ + body = { + "v": _PAYLOAD_VERSION, + "id": _event_id(event), + "ts": _canonical_ts(getattr(event, "ts", None)), + "topic": getattr(event, "topic", ""), + "type": getattr(event, "type", "") or "", + "payload": getattr(event, "payload", None) or {}, + } + return orjson.dumps(body, option=orjson.OPT_SORT_KEYS) + + +def sign(secret: str, body: bytes) -> str: + """Return `sha256=` — the value of the `X-DECNET-Signature` header.""" + digest = hmac.new( + secret.encode("utf-8"), body, hashlib.sha256 + ).hexdigest() + return f"sha256={digest}" + + +def _build_headers(secret: str, body: bytes, topic: str, event_id: str) -> dict[str, str]: + return { + "Content-Type": "application/json", + "User-Agent": "decnet-webhook/1.0", + "X-DECNET-Signature": sign(secret, body), + "X-DECNET-Event-Id": event_id, + "X-DECNET-Event-Topic": topic, + "X-DECNET-Timestamp": str(int(datetime.now(timezone.utc).timestamp())), + } + + +def _should_retry(status_code: int) -> bool: + """Retry on network error, 5xx, and 429. 4xx (other) is terminal — + the receiver is telling us the request itself is wrong; retrying + won't help.""" + if status_code == 429: + return True + return status_code >= 500 + + +def _jittered(delay: float) -> float: + # Jitter is a load-smoothing knob, not a secret — non-crypto random is + # fine. Using secrets.SystemRandom here would burn entropy for no gain. + return delay * random.uniform(_JITTER_LOW, _JITTER_HIGH) # nosec B311 + + +async def deliver( + sub: dict[str, Any], + event: Any, + *, + retry_schedule: Optional[list[float] | tuple[float, ...]] = None, + timeout_s: float = _DEFAULT_TIMEOUT_S, + client: Optional[httpx.AsyncClient] = None, +) -> DeliveryResult: + """POST *event* to *sub['url']* with HMAC signing and bounded retries. + + *sub* is a subscription row dict (from `repo.get_webhook_subscription`). + *retry_schedule* is the between-attempt delays in seconds; `None` uses + the default `(1, 2, 4)`, `[]` disables retries entirely (one attempt). + *client* allows tests to inject a mock `httpx.AsyncClient`. + """ + schedule = ( + list(retry_schedule) if retry_schedule is not None + else list(_DEFAULT_RETRY_SCHEDULE) + ) + max_attempts = 1 + len(schedule) + + body = build_payload(event) + topic = getattr(event, "topic", "") + eid = _event_id(event) + headers = _build_headers(sub["secret"], body, topic, eid) + url = sub["url"] + + owns_client = client is None + if owns_client: + client = httpx.AsyncClient(timeout=timeout_s) + + last_status: Optional[int] = None + last_error: Optional[str] = None + try: + for attempt in range(1, max_attempts + 1): + try: + resp = await client.post(url, content=body, headers=headers) + last_status = resp.status_code + if 200 <= resp.status_code < 300: + return DeliveryResult( + ok=True, status_code=resp.status_code, attempts=attempt + ) + if not _should_retry(resp.status_code): + return DeliveryResult( + ok=False, + status_code=resp.status_code, + error=f"non-retryable {resp.status_code}", + attempts=attempt, + ) + last_error = f"http {resp.status_code}" + except (httpx.RequestError, asyncio.TimeoutError) as e: + last_error = f"{type(e).__name__}: {e}" + last_status = None + + if attempt < max_attempts: + await asyncio.sleep(_jittered(schedule[attempt - 1])) + + return DeliveryResult( + ok=False, + status_code=last_status, + error=last_error or "exhausted retries", + attempts=max_attempts, + ) + finally: + if owns_client: + await client.aclose() diff --git a/decnet/webhook/enums.py b/decnet/webhook/enums.py new file mode 100644 index 00000000..dbb9a70a --- /dev/null +++ b/decnet/webhook/enums.py @@ -0,0 +1,54 @@ +"""Simple-mode event enum → bus-topic pattern expansion. + +The UI's Simple mode hides the NATS-style wildcard syntax behind three +friendly choices. Storage is always the expanded pattern list — the +enum exists only at the API boundary. +""" +from __future__ import annotations + + +# Patterns map to the bus topic hierarchy shipped by DEBT-031's worker +# rollout (see `decnet/bus/topics.py`): +# - attacker.{observed,fingerprinted,scored,session.started,session.ended} +# - decky.{id}.{state,traffic} +# - system.{log,.health,.control,bus.health} +SIMPLE_EVENT_PATTERNS: dict[str, list[str]] = { + "AttackerDetail": ["attacker.>"], + "DeckyStatus": ["decky.*.state", "decky.*.traffic"], + "SystemStatus": ["system.>"], +} + + +def expand_simple_events(names: list[str]) -> list[str]: + """Flatten a list of simple-event names into their bus patterns. + + Unknown names are silently dropped — the router layer validates + against the `SimpleEvent` Literal before calling us, so a bad value + here means a programming error elsewhere, not user input. + """ + out: list[str] = [] + for n in names: + out.extend(SIMPLE_EVENT_PATTERNS.get(n, [])) + return out + + +def merge_patterns( + simple: list[str] | None, advanced: list[str] | None +) -> list[str]: + """Combine simple-event expansions with advanced raw patterns, deduped. + + Order-preserving (simple expansions first, then advanced patterns in + the order the user supplied them) so operators see deterministic + patterns in API responses. + """ + seen: set[str] = set() + out: list[str] = [] + for p in expand_simple_events(simple or []): + if p not in seen: + seen.add(p) + out.append(p) + for p in advanced or []: + if isinstance(p, str) and p and p not in seen: + seen.add(p) + out.append(p) + return out diff --git a/tests/api/webhooks/__init__.py b/tests/api/webhooks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/api/webhooks/test_crud.py b/tests/api/webhooks/test_crud.py new file mode 100644 index 00000000..acbdcc27 --- /dev/null +++ b/tests/api/webhooks/test_crud.py @@ -0,0 +1,187 @@ +"""CRUD tests for /api/v1/webhooks — admin-gated subscription management.""" +from __future__ import annotations + +import httpx +import pytest + + +PATH = "/api/v1/webhooks/" + + +@pytest.mark.asyncio +async def test_create_requires_patterns(client: httpx.AsyncClient, auth_token: str): + res = await client.post( + PATH, + json={"name": "wh1", "url": "https://example.com/x"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 400, res.text + + +@pytest.mark.asyncio +async def test_create_expands_simple_events( + client: httpx.AsyncClient, auth_token: str +): + res = await client.post( + PATH, + json={ + "name": "wh-simple", + "url": "https://example.com/x", + "simple_events": ["AttackerDetail"], + }, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 201, res.text + body = res.json() + assert body["topic_patterns"] == ["attacker.>"] + # Create-path carries the secret for copy-out. + assert body["secret"] + assert len(body["secret"]) >= 16 + + +@pytest.mark.asyncio +async def test_list_strips_secret(client: httpx.AsyncClient, auth_token: str): + await client.post( + PATH, + json={ + "name": "wh-list", + "url": "https://example.com/x", + "topic_patterns": ["system.>"], + }, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + res = await client.get( + PATH, headers={"Authorization": f"Bearer {auth_token}"} + ) + assert res.status_code == 200 + rows = res.json() + assert len(rows) >= 1 + for r in rows: + assert "secret" not in r + + +@pytest.mark.asyncio +async def test_get_single_strips_secret( + client: httpx.AsyncClient, auth_token: str +): + create = await client.post( + PATH, + json={ + "name": "wh-one", + "url": "https://example.com/x", + "topic_patterns": ["decky.*.state"], + }, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + uuid = create.json()["uuid"] + + res = await client.get( + PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"} + ) + assert res.status_code == 200 + assert "secret" not in res.json() + + +@pytest.mark.asyncio +async def test_duplicate_name_conflicts( + client: httpx.AsyncClient, auth_token: str +): + payload = { + "name": "wh-dup", + "url": "https://example.com/x", + "topic_patterns": ["system.>"], + } + first = await client.post( + PATH, json=payload, headers={"Authorization": f"Bearer {auth_token}"} + ) + assert first.status_code == 201 + second = await client.post( + PATH, json=payload, headers={"Authorization": f"Bearer {auth_token}"} + ) + assert second.status_code == 409 + + +@pytest.mark.asyncio +async def test_patch_merges_patterns( + client: httpx.AsyncClient, auth_token: str +): + create = await client.post( + PATH, + json={ + "name": "wh-patch", + "url": "https://example.com/x", + "simple_events": ["AttackerDetail"], + }, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + uuid = create.json()["uuid"] + res = await client.patch( + PATH + uuid, + json={"topic_patterns": ["custom.>"]}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 200 + # simple_events was NOT passed → it's None → only raw patterns survive. + assert res.json()["topic_patterns"] == ["custom.>"] + + +@pytest.mark.asyncio +async def test_patch_refuses_empty_patterns( + client: httpx.AsyncClient, auth_token: str +): + create = await client.post( + PATH, + json={ + "name": "wh-empty", + "url": "https://example.com/x", + "simple_events": ["AttackerDetail"], + }, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + uuid = create.json()["uuid"] + res = await client.patch( + PATH + uuid, + json={"simple_events": [], "topic_patterns": []}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert res.status_code == 400 + + +@pytest.mark.asyncio +async def test_delete_returns_message( + client: httpx.AsyncClient, auth_token: str +): + create = await client.post( + PATH, + json={ + "name": "wh-del", + "url": "https://example.com/x", + "topic_patterns": ["system.>"], + }, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + uuid = create.json()["uuid"] + res = await client.delete( + PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"} + ) + assert res.status_code == 200 + assert res.json() == {"message": "Webhook deleted"} + # Second delete → 404. + res2 = await client.delete( + PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"} + ) + assert res2.status_code == 404 + + +@pytest.mark.asyncio +async def test_viewer_forbidden(client: httpx.AsyncClient, viewer_token: str): + res = await client.get( + PATH, headers={"Authorization": f"Bearer {viewer_token}"} + ) + assert res.status_code == 403 + + +@pytest.mark.asyncio +async def test_unauthenticated_rejected(client: httpx.AsyncClient): + res = await client.get(PATH) + assert res.status_code == 401 diff --git a/tests/webhook/__init__.py b/tests/webhook/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/webhook/test_client.py b/tests/webhook/test_client.py new file mode 100644 index 00000000..8755adf5 --- /dev/null +++ b/tests/webhook/test_client.py @@ -0,0 +1,145 @@ +"""Unit tests for decnet.webhook.client — HMAC + retry policy.""" +from __future__ import annotations + +import hashlib +import hmac + +import httpx +import pytest + +from decnet.webhook.client import ( + DeliveryResult, + SyntheticEvent, + build_payload, + deliver, + sign, +) + + +_EVENT = SyntheticEvent( + topic="attacker.observed", + type="first_sighting", + ts="2026-04-24T00:00:00+00:00", + id="11111111-1111-1111-1111-111111111111", + payload={"ip": "1.2.3.4"}, +) + + +def _sub(url: str = "https://webhook.example/inbound", secret: str = "s" * 32) -> dict: + return {"uuid": "w1", "url": url, "secret": secret} + + +def test_sign_matches_known_vector(): + body = b'{"hello":"world"}' + secret = "0123456789abcdef" + expected = ( + "sha256=" + + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + ) + assert sign(secret, body) == expected + + +def test_build_payload_stable_key_order(): + # Same input → same bytes → same HMAC, regardless of kwarg order. + b1 = build_payload(_EVENT) + b2 = build_payload(_EVENT) + assert b1 == b2 + assert b'"topic":"attacker.observed"' in b1 + assert b'"v":1' in b1 + + +@pytest.mark.asyncio +async def test_deliver_success_on_2xx(): + async def handler(request: httpx.Request) -> httpx.Response: + assert request.headers.get("X-DECNET-Signature", "").startswith("sha256=") + assert request.headers.get("X-DECNET-Event-Id") == _EVENT.id + return httpx.Response(200, json={"ok": True}) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + result = await deliver(_sub(), _EVENT, retry_schedule=[], client=client) + assert result == DeliveryResult(ok=True, status_code=200, attempts=1) + + +@pytest.mark.asyncio +async def test_deliver_no_retry_on_4xx(): + calls = {"n": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + calls["n"] += 1 + return httpx.Response(400, text="bad body") + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + result = await deliver(_sub(), _EVENT, retry_schedule=[1, 1, 1], client=client) + assert result.ok is False + assert result.status_code == 400 + assert calls["n"] == 1 # no retry + + +@pytest.mark.asyncio +async def test_deliver_retries_on_429(): + calls = {"n": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + calls["n"] += 1 + if calls["n"] < 3: + return httpx.Response(429) + return httpx.Response(200) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + result = await deliver(_sub(), _EVENT, retry_schedule=[0, 0], client=client) + assert result.ok is True + assert result.attempts == 3 + + +@pytest.mark.asyncio +async def test_deliver_retries_on_5xx_then_gives_up(): + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(503) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + result = await deliver(_sub(), _EVENT, retry_schedule=[0, 0], client=client) + assert result.ok is False + assert result.status_code == 503 + assert result.attempts == 3 + + +@pytest.mark.asyncio +async def test_deliver_retries_on_connection_error(): + async def handler(request: httpx.Request) -> httpx.Response: + raise httpx.ConnectError("boom") + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + result = await deliver(_sub(), _EVENT, retry_schedule=[0], client=client) + assert result.ok is False + assert result.status_code is None + assert "ConnectError" in (result.error or "") + assert result.attempts == 2 + + +@pytest.mark.asyncio +async def test_deliver_receiver_can_verify_signature(): + """End-to-end: receiver recomputes HMAC over the posted body and matches ours.""" + sub = _sub(secret="deadbeefdeadbeef") + captured: dict = {} + + async def handler(request: httpx.Request) -> httpx.Response: + captured["body"] = request.content + captured["sig"] = request.headers["X-DECNET-Signature"] + return httpx.Response(200) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + result = await deliver(sub, _EVENT, retry_schedule=[], client=client) + assert result.ok + expected = ( + "sha256=" + + hmac.new( + sub["secret"].encode(), captured["body"], hashlib.sha256 + ).hexdigest() + ) + assert captured["sig"] == expected diff --git a/tests/webhook/test_enums.py b/tests/webhook/test_enums.py new file mode 100644 index 00000000..1a9dff7c --- /dev/null +++ b/tests/webhook/test_enums.py @@ -0,0 +1,49 @@ +"""Unit tests for decnet.webhook.enums — simple→patterns expansion.""" +from decnet.webhook.enums import ( + SIMPLE_EVENT_PATTERNS, + expand_simple_events, + merge_patterns, +) + + +def test_simple_event_patterns_covers_three_families(): + assert set(SIMPLE_EVENT_PATTERNS) == { + "AttackerDetail", + "DeckyStatus", + "SystemStatus", + } + + +def test_expand_single_event(): + assert expand_simple_events(["AttackerDetail"]) == ["attacker.>"] + + +def test_expand_multiple_events_concatenates(): + out = expand_simple_events(["AttackerDetail", "DeckyStatus"]) + assert out == ["attacker.>", "decky.*.state", "decky.*.traffic"] + + +def test_expand_unknown_event_dropped_silently(): + # The Literal type on the router rejects unknowns; this guards against + # programmer error, not user input. + assert expand_simple_events(["NotAThing"]) == [] + + +def test_merge_dedups_overlap(): + merged = merge_patterns(["AttackerDetail"], ["attacker.>", "custom.>"]) + assert merged == ["attacker.>", "custom.>"] + + +def test_merge_preserves_order_simple_first(): + merged = merge_patterns(["SystemStatus"], ["attacker.>", "decky.*.state"]) + assert merged == ["system.>", "attacker.>", "decky.*.state"] + + +def test_merge_empty_lists_returns_empty(): + assert merge_patterns([], []) == [] + assert merge_patterns(None, None) == [] + + +def test_merge_drops_empty_strings_and_non_strings(): + merged = merge_patterns([], ["", "attacker.>", None]) # type: ignore[list-item] + assert merged == ["attacker.>"]