feat(webhooks): subscription CRUD + HMAC-signed delivery client

Introduces the webhook egress foundation — a new WebhookSubscription
table, admin-gated CRUD under /api/v1/webhooks, and the shared
delivery client that both the test-ping route and the upcoming worker
will use. No worker yet; this commit is API + model + client only.

Simple-mode enum (AttackerDetail / DeckyStatus / SystemStatus) expands
to bus-topic patterns at the router layer; storage is always the raw
pattern list. Advanced mode lets admins supply raw NATS-style patterns
directly. Filter-at-subscribe: the worker (next commit) will subscribe
to the union of patterns across enabled subscriptions.

Delivery client handles HMAC-SHA256 signing (X-DECNET-Signature),
retry on 429/5xx/network errors with jittered backoff, no-retry on
4xx. Secrets never leave the server on GET/LIST — only the create
response carries the secret for copy-out.

CRUD routes publish WEBHOOK_SUBSCRIPTIONS_CHANGED on the bus after
every mutation so the (future) worker can hot-reload.

Opens DEBT-037 for the deferred items (circuit breaker, dead-letter,
batch delivery, payload templates, secret-at-rest).
This commit is contained in:
2026-04-24 15:30:05 -04:00
parent 162f7c1194
commit b70845a85d
17 changed files with 1222 additions and 0 deletions

View File

@@ -93,6 +93,12 @@ SYSTEM_CONTROL = "control"
WORKER_CONTROL_STOP = "stop"
WORKER_CONTROL_START = "start"
# Webhook subscription-set changed — published by the CRUD router after any
# create / update / delete on WebhookSubscription so the webhook worker can
# reload its in-memory subscription list and re-subscribe to the new union
# of patterns. Payload is currently empty; consumers only need the signal.
WEBHOOK_SUBSCRIPTIONS_CHANGED = "system.webhook.subscriptions_changed"
# ─── Builders ────────────────────────────────────────────────────────────────

View File

@@ -112,6 +112,15 @@ from .updater import (
RollbackRequest,
RollbackResponse,
)
from .webhooks import (
SimpleEvent,
WebhookCreateRequest,
WebhookCreateResponse,
WebhookResponse,
WebhookSubscription,
WebhookTestResponse,
WebhookUpdateRequest,
)
from .workers import (
StartAllResponse,
StartFailure,
@@ -218,6 +227,14 @@ __all__ = [
"PushUpdateResult",
"RollbackRequest",
"RollbackResponse",
# webhooks
"SimpleEvent",
"WebhookCreateRequest",
"WebhookCreateResponse",
"WebhookResponse",
"WebhookSubscription",
"WebhookTestResponse",
"WebhookUpdateRequest",
# workers
"StartAllResponse",
"StartFailure",

View File

@@ -0,0 +1,126 @@
"""Webhook subscription table + CRUD DTOs.
Webhooks push DECNET bus events out to external SIEM / SOAR stacks
(Wazuh, Shuffle, TheHive, n8n, ...). Each subscription carries a set
of NATS-style topic patterns; the `decnet webhook` worker subscribes
to the union of patterns across all enabled subscriptions and POSTs
matching events to each matching URL with HMAC-SHA256 signing.
Simple mode (UI) exposes a friendly enum (`AttackerDetail`,
`DeckyStatus`, `SystemStatus`) that expands to patterns at save time.
Advanced mode lets an admin set raw patterns directly. Storage is
always the expanded list — the enum is sugar at the router layer.
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, List, Literal, Optional
from uuid import uuid4
from pydantic import BaseModel, Field as PydanticField, HttpUrl
from sqlmodel import Field, SQLModel
SimpleEvent = Literal["AttackerDetail", "DeckyStatus", "SystemStatus"]
class WebhookSubscription(SQLModel, table=True):
__tablename__ = "webhook_subscriptions"
uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
name: str = Field(index=True, unique=True)
url: str
secret: str # HMAC-SHA256 key; plaintext pre-v1 (see DEBT-037 §7)
# JSON-encoded list[str] of NATS-style bus topic patterns.
# Storing as TEXT keeps the schema portable across SQLite and MySQL
# without pulling in dialect-specific JSON columns.
topic_patterns: str = Field(default="[]")
enabled: bool = Field(default=True, index=True)
consecutive_failures: int = Field(default=0)
last_success_at: Optional[datetime] = None
last_failure_at: Optional[datetime] = None
last_error: Optional[str] = None
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
def patterns(self) -> list[str]:
"""Decode `topic_patterns` to a list. Returns [] on bad/empty JSON."""
try:
raw = json.loads(self.topic_patterns or "[]")
except (ValueError, TypeError):
return []
return [p for p in raw if isinstance(p, str)]
# --- API Request / Response Models (Pydantic) ---
class WebhookCreateRequest(BaseModel):
name: str = PydanticField(..., min_length=1, max_length=64)
url: HttpUrl
# If secret is omitted, the router generates a secure random one and
# returns it exactly once on the create response. After that, callers
# can only rotate via PATCH.
secret: Optional[str] = PydanticField(None, min_length=16, max_length=256)
# At least one of simple_events / topic_patterns must be non-empty
# (validated in the router, not Pydantic, so the 400 carries a clear
# detail message).
simple_events: List[SimpleEvent] = PydanticField(default_factory=list)
topic_patterns: List[str] = PydanticField(default_factory=list)
enabled: bool = True
class WebhookUpdateRequest(BaseModel):
# Partial update — every field optional; the router diffs against the
# current row and only writes what changed.
name: Optional[str] = PydanticField(None, min_length=1, max_length=64)
url: Optional[HttpUrl] = None
secret: Optional[str] = PydanticField(None, min_length=16, max_length=256)
simple_events: Optional[List[SimpleEvent]] = None
topic_patterns: Optional[List[str]] = None
enabled: Optional[bool] = None
class WebhookResponse(BaseModel):
"""Public shape — deliberately omits `secret`."""
uuid: str
name: str
url: str
topic_patterns: List[str]
enabled: bool
consecutive_failures: int
last_success_at: Optional[datetime] = None
last_failure_at: Optional[datetime] = None
last_error: Optional[str] = None
created_at: datetime
updated_at: datetime
class WebhookCreateResponse(WebhookResponse):
"""Create-path response — carries the secret exactly once, for copy-out."""
secret: str
class WebhookTestResponse(BaseModel):
delivered: bool
status_code: Optional[int] = None
error: Optional[str] = None
def _row_to_response_dict(row: dict[str, Any]) -> dict[str, Any]:
"""Normalize a DB row into the WebhookResponse dict shape.
Used by the CRUD router to decode `topic_patterns` JSON and drop the
`secret` column before returning to the client.
"""
out = dict(row)
raw = out.pop("topic_patterns", "[]")
try:
out["topic_patterns"] = json.loads(raw or "[]")
except (ValueError, TypeError):
out["topic_patterns"] = []
out.pop("secret", None)
return out

View File

@@ -431,3 +431,40 @@ class BaseRepository(ABC):
async def list_live_topology_ids(self) -> list[str]:
return []
# --------------------------------------------------------- webhooks
# Webhook subscriptions — external SIEM / SOAR egress configuration.
# Default NotImplementedError keeps non-default backends honest; the
# SQLModel-backed SQLite and MySQL repos override everything below.
async def create_webhook_subscription(self, data: dict[str, Any]) -> None:
raise NotImplementedError
async def get_webhook_subscription(self, uuid: str) -> Optional[dict[str, Any]]:
raise NotImplementedError
async def get_webhook_subscription_by_name(
self, name: str
) -> Optional[dict[str, Any]]:
raise NotImplementedError
async def list_webhook_subscriptions(
self, enabled_only: bool = False
) -> list[dict[str, Any]]:
raise NotImplementedError
async def update_webhook_subscription(
self, uuid: str, patch: dict[str, Any]
) -> bool:
raise NotImplementedError
async def delete_webhook_subscription(self, uuid: str) -> bool:
raise NotImplementedError
async def record_webhook_success(self, uuid: str, ts: Any) -> None:
raise NotImplementedError
async def record_webhook_failure(
self, uuid: str, ts: Any, error: str
) -> None:
raise NotImplementedError

View File

@@ -44,6 +44,7 @@ from decnet.web.db.models import (
TopologyEdge,
TopologyStatusEvent,
TopologyMutation,
WebhookSubscription,
)
@@ -1744,3 +1745,110 @@ class SQLModelRepository(BaseRepository):
)
)
return [r for r in result.scalars().all()]
# --------------------------------------------------------- webhooks
async def create_webhook_subscription(self, data: dict[str, Any]) -> None:
async with self._session() as session:
session.add(WebhookSubscription(**data))
await session.commit()
async def get_webhook_subscription(
self, uuid: str
) -> Optional[dict[str, Any]]:
async with self._session() as session:
result = await session.execute(
select(WebhookSubscription).where(WebhookSubscription.uuid == uuid)
)
row = result.scalar_one_or_none()
return row.model_dump() if row else None
async def get_webhook_subscription_by_name(
self, name: str
) -> Optional[dict[str, Any]]:
async with self._session() as session:
result = await session.execute(
select(WebhookSubscription).where(WebhookSubscription.name == name)
)
row = result.scalar_one_or_none()
return row.model_dump() if row else None
async def list_webhook_subscriptions(
self, enabled_only: bool = False
) -> list[dict[str, Any]]:
async with self._session() as session:
stmt = select(WebhookSubscription)
if enabled_only:
stmt = stmt.where(WebhookSubscription.enabled.is_(True))
stmt = stmt.order_by(WebhookSubscription.created_at)
result = await session.execute(stmt)
return [r.model_dump() for r in result.scalars().all()]
async def update_webhook_subscription(
self, uuid: str, patch: dict[str, Any]
) -> bool:
if not patch:
return True
patch = {**patch, "updated_at": datetime.now(timezone.utc)}
async with self._session() as session:
result = await session.execute(
update(WebhookSubscription)
.where(WebhookSubscription.uuid == uuid)
.values(**patch)
)
await session.commit()
return result.rowcount > 0
async def delete_webhook_subscription(self, uuid: str) -> bool:
async with self._session() as session:
result = await session.execute(
select(WebhookSubscription).where(WebhookSubscription.uuid == uuid)
)
row = result.scalar_one_or_none()
if not row:
return False
await session.delete(row)
await session.commit()
return True
async def record_webhook_success(
self, uuid: str, ts: datetime
) -> None:
async with self._session() as session:
await session.execute(
update(WebhookSubscription)
.where(WebhookSubscription.uuid == uuid)
.values(
consecutive_failures=0,
last_success_at=ts,
last_error=None,
updated_at=ts,
)
)
await session.commit()
async def record_webhook_failure(
self, uuid: str, ts: datetime, error: str
) -> None:
async with self._session() as session:
# Read current failure count, bump, write. Small race window on
# concurrent deliveries to the same subscription is acceptable —
# the counter informs the circuit-breaker heuristic (DEBT-037),
# not a correctness invariant.
result = await session.execute(
select(WebhookSubscription.consecutive_failures).where(
WebhookSubscription.uuid == uuid
)
)
current = result.scalar_one_or_none() or 0
await session.execute(
update(WebhookSubscription)
.where(WebhookSubscription.uuid == uuid)
.values(
consecutive_failures=current + 1,
last_failure_at=ts,
last_error=error[:512] if error else None,
updated_at=ts,
)
)
await session.commit()

View File

@@ -33,6 +33,7 @@ from .swarm_updates import swarm_updates_router
from .swarm_mgmt import swarm_mgmt_router
from .system import system_router
from .topology import topology_router
from .webhooks import webhooks_router
api_router = APIRouter(
# Every route under /api/v1 is auth-guarded (either by an explicit
@@ -105,3 +106,6 @@ api_router.include_router(system_router)
# MazeNET Topologies (nested topology CRUD + mutation queue)
api_router.include_router(topology_router)
# External webhook subscriptions (SIEM/SOAR egress)
api_router.include_router(webhooks_router)

View File

@@ -0,0 +1,18 @@
"""Webhook subscription CRUD.
Admin-gated management of external-egress webhook subscriptions. The
actual delivery happens in the `decnet webhook` worker, which watches
the DB + bus and POSTs matching events out. This module is the API
surface operators use to configure destinations.
Mounted under `/api/v1/webhooks` by the main api router.
"""
from fastapi import APIRouter
from .api_manage_webhooks import router as manage_webhooks_router
from .api_test_webhook import router as test_webhook_router
webhooks_router = APIRouter(prefix="/webhooks")
webhooks_router.include_router(manage_webhooks_router)
webhooks_router.include_router(test_webhook_router)

View File

@@ -0,0 +1,222 @@
"""Webhook subscription CRUD — admin-gated."""
from __future__ import annotations
import json
import secrets
from datetime import datetime, timezone
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from decnet.bus import topics as _topics
from decnet.bus.app import get_app_bus
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.web.db.models import (
MessageResponse,
WebhookCreateRequest,
WebhookCreateResponse,
WebhookResponse,
WebhookUpdateRequest,
)
from decnet.web.db.models.webhooks import _row_to_response_dict
from decnet.web.dependencies import repo, require_admin
from decnet.webhook.enums import merge_patterns
log = get_logger("api.webhooks")
router = APIRouter()
async def _notify_subscriptions_changed() -> None:
"""Publish `system.webhook.subscriptions_changed` on the bus.
Fire-and-forget per the bus contract — a dropped signal is recoverable
because the webhook worker also reloads on a slow timer as a fallback.
"""
try:
bus = await get_app_bus()
if bus is None:
return
await bus.publish(
_topics.WEBHOOK_SUBSCRIPTIONS_CHANGED,
{},
event_type="changed",
)
except Exception as e: # noqa: BLE001 — bus failures must not break CRUD
log.warning("webhook subscriptions-changed publish failed: %s", e)
def _row_to_response(row: dict[str, Any]) -> WebhookResponse:
return WebhookResponse(**_row_to_response_dict(row))
@router.post(
"/",
tags=["Webhooks"],
response_model=WebhookCreateResponse,
status_code=201,
responses={
400: {"description": "At least one of simple_events / topic_patterns required"},
409: {"description": "Name already in use"},
},
)
@_traced("api.webhook.create")
async def api_create_webhook(
req: WebhookCreateRequest,
admin: dict = Depends(require_admin),
) -> WebhookCreateResponse:
patterns = merge_patterns(req.simple_events, req.topic_patterns)
if not patterns:
raise HTTPException(
status_code=400,
detail="Provide at least one simple_events entry or topic_patterns pattern.",
)
existing = await repo.get_webhook_subscription_by_name(req.name)
if existing:
raise HTTPException(status_code=409, detail="Webhook name already exists")
# Auto-generate a URL-safe secret if the caller didn't provide one.
# 32 bytes of os-entropy is the same ballpark as a CSRF token.
secret = req.secret or secrets.token_urlsafe(32)
now = datetime.now(timezone.utc)
data = {
"name": req.name,
"url": str(req.url),
"secret": secret,
"topic_patterns": json.dumps(patterns),
"enabled": req.enabled,
"consecutive_failures": 0,
"created_at": now,
"updated_at": now,
}
await repo.create_webhook_subscription(data)
row = await repo.get_webhook_subscription_by_name(req.name)
if row is None:
# Should never happen — the create just committed. Treat as 500
# rather than silently masking a storage bug.
raise HTTPException(status_code=500, detail="Webhook created but not retrievable")
await _notify_subscriptions_changed()
return WebhookCreateResponse(
**_row_to_response_dict(row),
secret=secret,
)
@router.get(
"/",
tags=["Webhooks"],
response_model=list[WebhookResponse],
)
@_traced("api.webhook.list")
async def api_list_webhooks(
admin: dict = Depends(require_admin),
) -> list[WebhookResponse]:
rows = await repo.list_webhook_subscriptions()
return [_row_to_response(r) for r in rows]
@router.get(
"/{uuid}",
tags=["Webhooks"],
response_model=WebhookResponse,
responses={404: {"description": "Webhook not found"}},
)
@_traced("api.webhook.get")
async def api_get_webhook(
uuid: str,
admin: dict = Depends(require_admin),
) -> WebhookResponse:
row = await repo.get_webhook_subscription(uuid)
if not row:
raise HTTPException(status_code=404, detail="Webhook not found")
return _row_to_response(row)
@router.patch(
"/{uuid}",
tags=["Webhooks"],
response_model=WebhookResponse,
responses={
400: {"description": "Empty or invalid patch"},
404: {"description": "Webhook not found"},
409: {"description": "Name already in use"},
},
)
@_traced("api.webhook.update")
async def api_update_webhook(
uuid: str,
req: WebhookUpdateRequest,
admin: dict = Depends(require_admin),
) -> WebhookResponse:
current = await repo.get_webhook_subscription(uuid)
if not current:
raise HTTPException(status_code=404, detail="Webhook not found")
patch: dict[str, Any] = {}
if req.name is not None and req.name != current["name"]:
clash = await repo.get_webhook_subscription_by_name(req.name)
if clash and clash["uuid"] != uuid:
raise HTTPException(status_code=409, detail="Webhook name already exists")
patch["name"] = req.name
if req.url is not None:
patch["url"] = str(req.url)
if req.secret is not None:
patch["secret"] = req.secret
if req.enabled is not None:
patch["enabled"] = req.enabled
if req.simple_events is not None or req.topic_patterns is not None:
# Re-merge using whatever the caller supplied; a caller that wants
# to clear all patterns must explicitly pass both as empty lists.
simple = req.simple_events if req.simple_events is not None else []
raw = req.topic_patterns if req.topic_patterns is not None else []
patterns = merge_patterns(simple, raw)
if not patterns:
raise HTTPException(
status_code=400,
detail="Cannot clear all patterns; disable the webhook instead.",
)
patch["topic_patterns"] = json.dumps(patterns)
if not patch:
# No-op patch — return the current row untouched.
return _row_to_response(current)
updated = await repo.update_webhook_subscription(uuid, patch)
if not updated:
raise HTTPException(status_code=404, detail="Webhook not found")
await _notify_subscriptions_changed()
row = await repo.get_webhook_subscription(uuid)
if row is None:
raise HTTPException(status_code=404, detail="Webhook not found")
return _row_to_response(row)
@router.delete(
"/{uuid}",
tags=["Webhooks"],
response_model=MessageResponse,
responses={404: {"description": "Webhook not found"}},
)
@_traced("api.webhook.delete")
async def api_delete_webhook(
uuid: str,
admin: dict = Depends(require_admin),
) -> dict[str, str]:
deleted = await repo.delete_webhook_subscription(uuid)
if not deleted:
raise HTTPException(status_code=404, detail="Webhook not found")
await _notify_subscriptions_changed()
return {"message": "Webhook deleted"}

View File

@@ -0,0 +1,60 @@
"""POST /webhooks/{uuid}/test — fire a synthetic ping to verify plumbing.
This hits the same delivery path the worker uses, so a 200 here proves
the destination URL, HMAC secret, and network egress all work without
waiting for a real bus event.
"""
from __future__ import annotations
from datetime import datetime, timezone
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.web.db.models import WebhookTestResponse
from decnet.web.dependencies import repo, require_admin
from decnet.webhook.client import deliver, SyntheticEvent
log = get_logger("api.webhooks.test")
router = APIRouter()
@router.post(
"/{uuid}/test",
tags=["Webhooks"],
response_model=WebhookTestResponse,
responses={
404: {"description": "Webhook not found"},
},
)
@_traced("api.webhook.test")
async def api_test_webhook(
uuid: str,
admin: dict = Depends(require_admin),
) -> WebhookTestResponse:
sub = await repo.get_webhook_subscription(uuid)
if not sub:
raise HTTPException(status_code=404, detail="Webhook not found")
event = SyntheticEvent(
topic="test.ping",
type="test",
ts=datetime.now(timezone.utc).isoformat(),
id=str(uuid4()),
payload={
"message": "Synthetic test event from DECNET",
"triggered_by": admin.get("username", "unknown"),
},
)
# Single attempt — no retries on manual tests. The operator wants a
# fast signal about the current state of the receiver, not a
# retry-and-wait behavior.
result = await deliver(sub, event, retry_schedule=[])
return WebhookTestResponse(
delivered=result.ok,
status_code=result.status_code,
error=result.error,
)

View File

@@ -0,0 +1 @@
"""External webhook egress — ship bus events to SIEM/SOAR stacks."""

188
decnet/webhook/client.py Normal file
View File

@@ -0,0 +1,188 @@
"""HMAC-signed HTTP POST delivery for webhook events.
The delivery function is shared between the worker's normal dispatch
loop and the `/webhooks/{uuid}/test` admin route — same payload shape,
same signing, same headers. Retry policy is configurable by the caller
so manual tests can skip retries entirely while the worker retries
with backoff.
"""
from __future__ import annotations
import asyncio
import hashlib
import hmac
import random
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import uuid4
import httpx
import orjson
from decnet.logging import get_logger
log = get_logger("webhook.client")
_DEFAULT_TIMEOUT_S = 10.0
_DEFAULT_RETRY_SCHEDULE = (1.0, 2.0, 4.0)
_JITTER_LOW = 0.8
_JITTER_HIGH = 1.2
_PAYLOAD_VERSION = 1
@dataclass(frozen=True)
class SyntheticEvent:
"""Structural match for decnet.bus.base.Event — avoids importing the
bus dependency into the HTTP egress layer."""
topic: str
type: str
ts: str
id: str
payload: dict[str, Any]
@dataclass
class DeliveryResult:
ok: bool
status_code: Optional[int] = None
error: Optional[str] = None
attempts: int = 0
def _canonical_ts(value: Any) -> str:
"""Normalize bus-event ts (epoch float / ISO str / None) to ISO-8601 UTC."""
if isinstance(value, str) and value:
return value
if isinstance(value, (int, float)):
return datetime.fromtimestamp(float(value), tz=timezone.utc).isoformat()
return datetime.now(timezone.utc).isoformat()
def _event_id(event: Any) -> str:
explicit = getattr(event, "id", None)
if isinstance(explicit, str) and explicit:
return explicit
return str(uuid4())
def build_payload(event: Any) -> bytes:
"""Serialize an event to the canonical JSON body sent on the wire.
Stable key order (`orjson.OPT_SORT_KEYS`) matters because the HMAC
signs the exact byte sequence — receivers recomputing the hash must
see the same bytes we did.
"""
body = {
"v": _PAYLOAD_VERSION,
"id": _event_id(event),
"ts": _canonical_ts(getattr(event, "ts", None)),
"topic": getattr(event, "topic", ""),
"type": getattr(event, "type", "") or "",
"payload": getattr(event, "payload", None) or {},
}
return orjson.dumps(body, option=orjson.OPT_SORT_KEYS)
def sign(secret: str, body: bytes) -> str:
"""Return `sha256=<hex>` — the value of the `X-DECNET-Signature` header."""
digest = hmac.new(
secret.encode("utf-8"), body, hashlib.sha256
).hexdigest()
return f"sha256={digest}"
def _build_headers(secret: str, body: bytes, topic: str, event_id: str) -> dict[str, str]:
return {
"Content-Type": "application/json",
"User-Agent": "decnet-webhook/1.0",
"X-DECNET-Signature": sign(secret, body),
"X-DECNET-Event-Id": event_id,
"X-DECNET-Event-Topic": topic,
"X-DECNET-Timestamp": str(int(datetime.now(timezone.utc).timestamp())),
}
def _should_retry(status_code: int) -> bool:
"""Retry on network error, 5xx, and 429. 4xx (other) is terminal —
the receiver is telling us the request itself is wrong; retrying
won't help."""
if status_code == 429:
return True
return status_code >= 500
def _jittered(delay: float) -> float:
# Jitter is a load-smoothing knob, not a secret — non-crypto random is
# fine. Using secrets.SystemRandom here would burn entropy for no gain.
return delay * random.uniform(_JITTER_LOW, _JITTER_HIGH) # nosec B311
async def deliver(
sub: dict[str, Any],
event: Any,
*,
retry_schedule: Optional[list[float] | tuple[float, ...]] = None,
timeout_s: float = _DEFAULT_TIMEOUT_S,
client: Optional[httpx.AsyncClient] = None,
) -> DeliveryResult:
"""POST *event* to *sub['url']* with HMAC signing and bounded retries.
*sub* is a subscription row dict (from `repo.get_webhook_subscription`).
*retry_schedule* is the between-attempt delays in seconds; `None` uses
the default `(1, 2, 4)`, `[]` disables retries entirely (one attempt).
*client* allows tests to inject a mock `httpx.AsyncClient`.
"""
schedule = (
list(retry_schedule) if retry_schedule is not None
else list(_DEFAULT_RETRY_SCHEDULE)
)
max_attempts = 1 + len(schedule)
body = build_payload(event)
topic = getattr(event, "topic", "")
eid = _event_id(event)
headers = _build_headers(sub["secret"], body, topic, eid)
url = sub["url"]
owns_client = client is None
if owns_client:
client = httpx.AsyncClient(timeout=timeout_s)
last_status: Optional[int] = None
last_error: Optional[str] = None
try:
for attempt in range(1, max_attempts + 1):
try:
resp = await client.post(url, content=body, headers=headers)
last_status = resp.status_code
if 200 <= resp.status_code < 300:
return DeliveryResult(
ok=True, status_code=resp.status_code, attempts=attempt
)
if not _should_retry(resp.status_code):
return DeliveryResult(
ok=False,
status_code=resp.status_code,
error=f"non-retryable {resp.status_code}",
attempts=attempt,
)
last_error = f"http {resp.status_code}"
except (httpx.RequestError, asyncio.TimeoutError) as e:
last_error = f"{type(e).__name__}: {e}"
last_status = None
if attempt < max_attempts:
await asyncio.sleep(_jittered(schedule[attempt - 1]))
return DeliveryResult(
ok=False,
status_code=last_status,
error=last_error or "exhausted retries",
attempts=max_attempts,
)
finally:
if owns_client:
await client.aclose()

54
decnet/webhook/enums.py Normal file
View File

@@ -0,0 +1,54 @@
"""Simple-mode event enum → bus-topic pattern expansion.
The UI's Simple mode hides the NATS-style wildcard syntax behind three
friendly choices. Storage is always the expanded pattern list — the
enum exists only at the API boundary.
"""
from __future__ import annotations
# Patterns map to the bus topic hierarchy shipped by DEBT-031's worker
# rollout (see `decnet/bus/topics.py`):
# - attacker.{observed,fingerprinted,scored,session.started,session.ended}
# - decky.{id}.{state,traffic}
# - system.{log,<worker>.health,<worker>.control,bus.health}
SIMPLE_EVENT_PATTERNS: dict[str, list[str]] = {
"AttackerDetail": ["attacker.>"],
"DeckyStatus": ["decky.*.state", "decky.*.traffic"],
"SystemStatus": ["system.>"],
}
def expand_simple_events(names: list[str]) -> list[str]:
"""Flatten a list of simple-event names into their bus patterns.
Unknown names are silently dropped — the router layer validates
against the `SimpleEvent` Literal before calling us, so a bad value
here means a programming error elsewhere, not user input.
"""
out: list[str] = []
for n in names:
out.extend(SIMPLE_EVENT_PATTERNS.get(n, []))
return out
def merge_patterns(
simple: list[str] | None, advanced: list[str] | None
) -> list[str]:
"""Combine simple-event expansions with advanced raw patterns, deduped.
Order-preserving (simple expansions first, then advanced patterns in
the order the user supplied them) so operators see deterministic
patterns in API responses.
"""
seen: set[str] = set()
out: list[str] = []
for p in expand_simple_events(simple or []):
if p not in seen:
seen.add(p)
out.append(p)
for p in advanced or []:
if isinstance(p, str) and p and p not in seen:
seen.add(p)
out.append(p)
return out

View File

View File

@@ -0,0 +1,187 @@
"""CRUD tests for /api/v1/webhooks — admin-gated subscription management."""
from __future__ import annotations
import httpx
import pytest
PATH = "/api/v1/webhooks/"
@pytest.mark.asyncio
async def test_create_requires_patterns(client: httpx.AsyncClient, auth_token: str):
res = await client.post(
PATH,
json={"name": "wh1", "url": "https://example.com/x"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 400, res.text
@pytest.mark.asyncio
async def test_create_expands_simple_events(
client: httpx.AsyncClient, auth_token: str
):
res = await client.post(
PATH,
json={
"name": "wh-simple",
"url": "https://example.com/x",
"simple_events": ["AttackerDetail"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 201, res.text
body = res.json()
assert body["topic_patterns"] == ["attacker.>"]
# Create-path carries the secret for copy-out.
assert body["secret"]
assert len(body["secret"]) >= 16
@pytest.mark.asyncio
async def test_list_strips_secret(client: httpx.AsyncClient, auth_token: str):
await client.post(
PATH,
json={
"name": "wh-list",
"url": "https://example.com/x",
"topic_patterns": ["system.>"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
res = await client.get(
PATH, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res.status_code == 200
rows = res.json()
assert len(rows) >= 1
for r in rows:
assert "secret" not in r
@pytest.mark.asyncio
async def test_get_single_strips_secret(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-one",
"url": "https://example.com/x",
"topic_patterns": ["decky.*.state"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.get(
PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res.status_code == 200
assert "secret" not in res.json()
@pytest.mark.asyncio
async def test_duplicate_name_conflicts(
client: httpx.AsyncClient, auth_token: str
):
payload = {
"name": "wh-dup",
"url": "https://example.com/x",
"topic_patterns": ["system.>"],
}
first = await client.post(
PATH, json=payload, headers={"Authorization": f"Bearer {auth_token}"}
)
assert first.status_code == 201
second = await client.post(
PATH, json=payload, headers={"Authorization": f"Bearer {auth_token}"}
)
assert second.status_code == 409
@pytest.mark.asyncio
async def test_patch_merges_patterns(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-patch",
"url": "https://example.com/x",
"simple_events": ["AttackerDetail"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.patch(
PATH + uuid,
json={"topic_patterns": ["custom.>"]},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 200
# simple_events was NOT passed → it's None → only raw patterns survive.
assert res.json()["topic_patterns"] == ["custom.>"]
@pytest.mark.asyncio
async def test_patch_refuses_empty_patterns(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-empty",
"url": "https://example.com/x",
"simple_events": ["AttackerDetail"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.patch(
PATH + uuid,
json={"simple_events": [], "topic_patterns": []},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 400
@pytest.mark.asyncio
async def test_delete_returns_message(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-del",
"url": "https://example.com/x",
"topic_patterns": ["system.>"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.delete(
PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res.status_code == 200
assert res.json() == {"message": "Webhook deleted"}
# Second delete → 404.
res2 = await client.delete(
PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res2.status_code == 404
@pytest.mark.asyncio
async def test_viewer_forbidden(client: httpx.AsyncClient, viewer_token: str):
res = await client.get(
PATH, headers={"Authorization": f"Bearer {viewer_token}"}
)
assert res.status_code == 403
@pytest.mark.asyncio
async def test_unauthenticated_rejected(client: httpx.AsyncClient):
res = await client.get(PATH)
assert res.status_code == 401

View File

View File

@@ -0,0 +1,145 @@
"""Unit tests for decnet.webhook.client — HMAC + retry policy."""
from __future__ import annotations
import hashlib
import hmac
import httpx
import pytest
from decnet.webhook.client import (
DeliveryResult,
SyntheticEvent,
build_payload,
deliver,
sign,
)
_EVENT = SyntheticEvent(
topic="attacker.observed",
type="first_sighting",
ts="2026-04-24T00:00:00+00:00",
id="11111111-1111-1111-1111-111111111111",
payload={"ip": "1.2.3.4"},
)
def _sub(url: str = "https://webhook.example/inbound", secret: str = "s" * 32) -> dict:
return {"uuid": "w1", "url": url, "secret": secret}
def test_sign_matches_known_vector():
body = b'{"hello":"world"}'
secret = "0123456789abcdef"
expected = (
"sha256="
+ hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
)
assert sign(secret, body) == expected
def test_build_payload_stable_key_order():
# Same input → same bytes → same HMAC, regardless of kwarg order.
b1 = build_payload(_EVENT)
b2 = build_payload(_EVENT)
assert b1 == b2
assert b'"topic":"attacker.observed"' in b1
assert b'"v":1' in b1
@pytest.mark.asyncio
async def test_deliver_success_on_2xx():
async def handler(request: httpx.Request) -> httpx.Response:
assert request.headers.get("X-DECNET-Signature", "").startswith("sha256=")
assert request.headers.get("X-DECNET-Event-Id") == _EVENT.id
return httpx.Response(200, json={"ok": True})
transport = httpx.MockTransport(handler)
async with httpx.AsyncClient(transport=transport) as client:
result = await deliver(_sub(), _EVENT, retry_schedule=[], client=client)
assert result == DeliveryResult(ok=True, status_code=200, attempts=1)
@pytest.mark.asyncio
async def test_deliver_no_retry_on_4xx():
calls = {"n": 0}
async def handler(request: httpx.Request) -> httpx.Response:
calls["n"] += 1
return httpx.Response(400, text="bad body")
transport = httpx.MockTransport(handler)
async with httpx.AsyncClient(transport=transport) as client:
result = await deliver(_sub(), _EVENT, retry_schedule=[1, 1, 1], client=client)
assert result.ok is False
assert result.status_code == 400
assert calls["n"] == 1 # no retry
@pytest.mark.asyncio
async def test_deliver_retries_on_429():
calls = {"n": 0}
async def handler(request: httpx.Request) -> httpx.Response:
calls["n"] += 1
if calls["n"] < 3:
return httpx.Response(429)
return httpx.Response(200)
transport = httpx.MockTransport(handler)
async with httpx.AsyncClient(transport=transport) as client:
result = await deliver(_sub(), _EVENT, retry_schedule=[0, 0], client=client)
assert result.ok is True
assert result.attempts == 3
@pytest.mark.asyncio
async def test_deliver_retries_on_5xx_then_gives_up():
async def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(503)
transport = httpx.MockTransport(handler)
async with httpx.AsyncClient(transport=transport) as client:
result = await deliver(_sub(), _EVENT, retry_schedule=[0, 0], client=client)
assert result.ok is False
assert result.status_code == 503
assert result.attempts == 3
@pytest.mark.asyncio
async def test_deliver_retries_on_connection_error():
async def handler(request: httpx.Request) -> httpx.Response:
raise httpx.ConnectError("boom")
transport = httpx.MockTransport(handler)
async with httpx.AsyncClient(transport=transport) as client:
result = await deliver(_sub(), _EVENT, retry_schedule=[0], client=client)
assert result.ok is False
assert result.status_code is None
assert "ConnectError" in (result.error or "")
assert result.attempts == 2
@pytest.mark.asyncio
async def test_deliver_receiver_can_verify_signature():
"""End-to-end: receiver recomputes HMAC over the posted body and matches ours."""
sub = _sub(secret="deadbeefdeadbeef")
captured: dict = {}
async def handler(request: httpx.Request) -> httpx.Response:
captured["body"] = request.content
captured["sig"] = request.headers["X-DECNET-Signature"]
return httpx.Response(200)
transport = httpx.MockTransport(handler)
async with httpx.AsyncClient(transport=transport) as client:
result = await deliver(sub, _EVENT, retry_schedule=[], client=client)
assert result.ok
expected = (
"sha256="
+ hmac.new(
sub["secret"].encode(), captured["body"], hashlib.sha256
).hexdigest()
)
assert captured["sig"] == expected

View File

@@ -0,0 +1,49 @@
"""Unit tests for decnet.webhook.enums — simple→patterns expansion."""
from decnet.webhook.enums import (
SIMPLE_EVENT_PATTERNS,
expand_simple_events,
merge_patterns,
)
def test_simple_event_patterns_covers_three_families():
assert set(SIMPLE_EVENT_PATTERNS) == {
"AttackerDetail",
"DeckyStatus",
"SystemStatus",
}
def test_expand_single_event():
assert expand_simple_events(["AttackerDetail"]) == ["attacker.>"]
def test_expand_multiple_events_concatenates():
out = expand_simple_events(["AttackerDetail", "DeckyStatus"])
assert out == ["attacker.>", "decky.*.state", "decky.*.traffic"]
def test_expand_unknown_event_dropped_silently():
# The Literal type on the router rejects unknowns; this guards against
# programmer error, not user input.
assert expand_simple_events(["NotAThing"]) == []
def test_merge_dedups_overlap():
merged = merge_patterns(["AttackerDetail"], ["attacker.>", "custom.>"])
assert merged == ["attacker.>", "custom.>"]
def test_merge_preserves_order_simple_first():
merged = merge_patterns(["SystemStatus"], ["attacker.>", "decky.*.state"])
assert merged == ["system.>", "attacker.>", "decky.*.state"]
def test_merge_empty_lists_returns_empty():
assert merge_patterns([], []) == []
assert merge_patterns(None, None) == []
def test_merge_drops_empty_strings_and_non_strings():
merged = merge_patterns([], ["", "attacker.>", None]) # type: ignore[list-item]
assert merged == ["attacker.>"]