Files
DECNET/tests/test_web_api_fuzz.py
anti b6b046c90b fix: harden startup security — require strong secrets, restrict CORS
- decnet/env.py: DECNET_JWT_SECRET and DECNET_ADMIN_PASSWORD are now
  required env vars; startup raises ValueError if unset or set to a
  known-bad default ("admin", "password", etc.)
- decnet/env.py: add DECNET_CORS_ORIGINS (comma-separated, defaults to
  http://localhost:8080) replacing the previous allow_origins=["*"]
- decnet/web/api.py: use DECNET_CORS_ORIGINS and tighten allow_methods
  and allow_headers to explicit lists
- tests/conftest.py: set required env vars at module level so test
  collection works without real credentials
- tests/test_web_api.py, test_web_api_fuzz.py: use DECNET_ADMIN_PASSWORD
  from env instead of hardcoded "admin"

Closes DEBT-001, DEBT-002, DEBT-004
2026-04-09 12:13:22 -04:00

111 lines
4.1 KiB
Python

import os
import pytest
import json
from typing import Generator, Any, Optional
from fastapi.testclient import TestClient
from hypothesis import given, strategies as st, settings, HealthCheck
import httpx
from decnet.web.api import app
from decnet.web.dependencies import repo
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
# Re-use setup from test_web_api
@pytest.fixture(scope="function", autouse=True)
def setup_db() -> Generator[None, None, None]:
repo.db_path = "test_fuzz_decnet.db"
if os.path.exists(repo.db_path):
os.remove(repo.db_path)
repo.reinitialize()
yield
if os.path.exists(repo.db_path):
os.remove(repo.db_path)
# bcrypt is intentionally slow, so we disable/extend the deadline
_FUZZ_SETTINGS: dict[str, Any] = {
"max_examples": 50,
"deadline": None, # bcrypt hashing takes >200ms
"suppress_health_check": [HealthCheck.function_scoped_fixture]
}
@settings(**_FUZZ_SETTINGS)
@given(
username=st.text(min_size=0, max_size=2048),
password=st.text(min_size=0, max_size=2048)
)
def test_fuzz_login(username: str, password: str) -> None:
"""Fuzz the login endpoint with random strings (including non-ASCII)."""
with TestClient(app) as _client:
_payload: dict[str, str] = {"username": username, "password": password}
try:
_response: httpx.Response = _client.post("/api/v1/auth/login", json=_payload)
# 200, 401, or 422 are acceptable. 500 is a failure.
assert _response.status_code in (200, 401, 422)
except (UnicodeEncodeError, json.JSONDecodeError):
pass
@settings(**_FUZZ_SETTINGS)
@given(
old_password=st.text(min_size=0, max_size=2048),
new_password=st.text(min_size=0, max_size=2048)
)
def test_fuzz_change_password(old_password: str, new_password: str) -> None:
"""Fuzz the change-password endpoint with random strings."""
with TestClient(app) as _client:
# Get valid token first
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
_token: str = _login_resp.json()["access_token"]
_payload: dict[str, str] = {"old_password": old_password, "new_password": new_password}
try:
_response: httpx.Response = _client.post(
"/api/v1/auth/change-password",
json=_payload,
headers={"Authorization": f"Bearer {_token}"}
)
assert _response.status_code in (200, 401, 422)
except (UnicodeEncodeError, json.JSONDecodeError):
pass
@settings(**_FUZZ_SETTINGS)
@given(
limit=st.integers(min_value=-2000, max_value=5000),
offset=st.integers(min_value=-2000, max_value=5000),
search=st.one_of(st.none(), st.text(max_size=2048))
)
def test_fuzz_get_logs(limit: int, offset: int, search: Optional[str]) -> None:
"""Fuzz the logs pagination and search."""
with TestClient(app) as _client:
_login_resp: httpx.Response = _client.post("/api/v1/auth/login", json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD})
_token: str = _login_resp.json()["access_token"]
_params: dict[str, Any] = {"limit": limit, "offset": offset}
if search is not None:
_params["search"] = search
_response: httpx.Response = _client.get(
"/api/v1/logs",
params=_params,
headers={"Authorization": f"Bearer {_token}"}
)
assert _response.status_code in (200, 422)
@settings(**_FUZZ_SETTINGS)
@given(
token=st.text(min_size=0, max_size=4096)
)
def test_fuzz_auth_header(token: str) -> None:
"""Fuzz the Authorization header with full unicode noise."""
with TestClient(app) as _client:
try:
_response: httpx.Response = _client.get(
"/api/v1/stats",
headers={"Authorization": f"Bearer {token}"}
)
assert _response.status_code in (401, 422)
except (UnicodeEncodeError, httpx.InvalidURL, httpx.CookieConflict):
# Expected client-side rejection of invalid header characters
pass