Files
DECNET/tests/api/webhooks/test_crud.py
anti 638236113d feat(webhooks): non-blocking http:// warning + WH-03 accepted risk
WebhookResponse now carries a `warnings: list[str]` field. When the
subscription's URL starts with http://, an `insecure_url` advisory is
surfaced on every GET/CREATE without blocking the request. HMAC still
detects tampering regardless of transport — only read-confidentiality
is lost over plaintext — and test/dev environments without TLS stay
usable.

Matches the operator-trust posture already established by DA-06
(admin-on-admin protection is out of scope). The alternative — hard
rejection at admin time — was considered and declined; warning-plus-
visibility is the right shape.

THREAT_MODEL WH-03 accepted risk registered; revisit triggers are
multi-admin delegation, a regulated customer, or an operator ticket
asking for a DECNET_WEBHOOK_REQUIRE_HTTPS enforcement knob.
2026-04-24 15:53:30 -04:00

225 lines
6.3 KiB
Python

"""CRUD tests for /api/v1/webhooks — admin-gated subscription management."""
from __future__ import annotations
import httpx
import pytest
PATH = "/api/v1/webhooks/"
@pytest.mark.asyncio
async def test_create_requires_patterns(client: httpx.AsyncClient, auth_token: str):
res = await client.post(
PATH,
json={"name": "wh1", "url": "https://example.com/x"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 400, res.text
@pytest.mark.asyncio
async def test_create_expands_simple_events(
client: httpx.AsyncClient, auth_token: str
):
res = await client.post(
PATH,
json={
"name": "wh-simple",
"url": "https://example.com/x",
"simple_events": ["AttackerDetail"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 201, res.text
body = res.json()
assert body["topic_patterns"] == ["attacker.>"]
# Create-path carries the secret for copy-out.
assert body["secret"]
assert len(body["secret"]) >= 16
@pytest.mark.asyncio
async def test_list_strips_secret(client: httpx.AsyncClient, auth_token: str):
await client.post(
PATH,
json={
"name": "wh-list",
"url": "https://example.com/x",
"topic_patterns": ["system.>"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
res = await client.get(
PATH, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res.status_code == 200
rows = res.json()
assert len(rows) >= 1
for r in rows:
assert "secret" not in r
@pytest.mark.asyncio
async def test_get_single_strips_secret(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-one",
"url": "https://example.com/x",
"topic_patterns": ["decky.*.state"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.get(
PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res.status_code == 200
assert "secret" not in res.json()
@pytest.mark.asyncio
async def test_duplicate_name_conflicts(
client: httpx.AsyncClient, auth_token: str
):
payload = {
"name": "wh-dup",
"url": "https://example.com/x",
"topic_patterns": ["system.>"],
}
first = await client.post(
PATH, json=payload, headers={"Authorization": f"Bearer {auth_token}"}
)
assert first.status_code == 201
second = await client.post(
PATH, json=payload, headers={"Authorization": f"Bearer {auth_token}"}
)
assert second.status_code == 409
@pytest.mark.asyncio
async def test_patch_merges_patterns(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-patch",
"url": "https://example.com/x",
"simple_events": ["AttackerDetail"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.patch(
PATH + uuid,
json={"topic_patterns": ["custom.>"]},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 200
# simple_events was NOT passed → it's None → only raw patterns survive.
assert res.json()["topic_patterns"] == ["custom.>"]
@pytest.mark.asyncio
async def test_patch_refuses_empty_patterns(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-empty",
"url": "https://example.com/x",
"simple_events": ["AttackerDetail"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.patch(
PATH + uuid,
json={"simple_events": [], "topic_patterns": []},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 400
@pytest.mark.asyncio
async def test_delete_returns_message(
client: httpx.AsyncClient, auth_token: str
):
create = await client.post(
PATH,
json={
"name": "wh-del",
"url": "https://example.com/x",
"topic_patterns": ["system.>"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
uuid = create.json()["uuid"]
res = await client.delete(
PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res.status_code == 200
assert res.json() == {"message": "Webhook deleted"}
# Second delete → 404.
res2 = await client.delete(
PATH + uuid, headers={"Authorization": f"Bearer {auth_token}"}
)
assert res2.status_code == 404
@pytest.mark.asyncio
async def test_http_url_warns_but_accepts(
client: httpx.AsyncClient, auth_token: str
):
"""Plain http:// is allowed (operator-trust posture per WH-03) but
surfaces a non-blocking advisory in the response's warnings list."""
res = await client.post(
PATH,
json={
"name": "wh-http",
"url": "http://insecure.local/inbound",
"topic_patterns": ["system.>"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 201, res.text
body = res.json()
assert any("insecure_url" in w for w in body["warnings"])
@pytest.mark.asyncio
async def test_https_url_has_no_warning(
client: httpx.AsyncClient, auth_token: str
):
res = await client.post(
PATH,
json={
"name": "wh-https",
"url": "https://secure.example/inbound",
"topic_patterns": ["system.>"],
},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert res.status_code == 201
assert res.json()["warnings"] == []
@pytest.mark.asyncio
async def test_viewer_forbidden(client: httpx.AsyncClient, viewer_token: str):
res = await client.get(
PATH, headers={"Authorization": f"Bearer {viewer_token}"}
)
assert res.status_code == 403
@pytest.mark.asyncio
async def test_unauthenticated_rejected(client: httpx.AsyncClient):
res = await client.get(PATH)
assert res.status_code == 401