From 3945e72e110d0699479d3d348bf326d95c651cbb Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 17 Apr 2026 14:52:22 -0400 Subject: [PATCH] perf: run bcrypt on a thread so it doesn't block the event loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit verify_password / get_password_hash are CPU-bound and take ~250ms each at rounds=12. Called directly from async endpoints, they stall every other coroutine for that window — the single biggest single-worker bottleneck on the login path. Adds averify_password / ahash_password that wrap the sync versions in asyncio.to_thread. Sync versions stay put because _ensure_admin_user and tests still use them. 5 call sites updated: login, change-password, create-user, reset-password. tests/test_auth_async.py asserts parallel averify runs concurrently (~1x of a single verify, not 2x). --- .gitignore | 1 + README.md | 55 +++++++ decnet/web/auth.py | 10 ++ decnet/web/router/auth/api_change_pass.py | 6 +- decnet/web/router/auth/api_login.py | 4 +- decnet/web/router/config/api_manage_users.py | 6 +- development/DEVELOPMENT.md | 8 +- schemathesis.toml | 87 ++++++++++- tests/api/health/test_get_health.py | 9 ++ tests/api/test_schemathesis.py | 115 ++++++++++---- tests/stress/__init__.py | 0 tests/stress/conftest.py | 130 ++++++++++++++++ tests/stress/locustfile.py | 130 ++++++++++++++++ tests/stress/test_stress.py | 154 +++++++++++++++++++ tests/test_auth_async.py | 51 ++++++ 15 files changed, 724 insertions(+), 42 deletions(-) create mode 100644 tests/stress/__init__.py create mode 100644 tests/stress/conftest.py create mode 100644 tests/stress/locustfile.py create mode 100644 tests/stress/test_stress.py create mode 100644 tests/test_auth_async.py diff --git a/.gitignore b/.gitignore index b775e9c..fad29ae 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ decnet.json .env.local .coverage .hypothesis/ +profiles/* diff --git a/README.md b/README.md index 5e52a67..652df92 100644 --- a/README.md +++ b/README.md @@ -508,6 +508,10 @@ DECNET_WEB_HOST=0.0.0.0 DECNET_WEB_PORT=8080 DECNET_ADMIN_USER=admin DECNET_ADMIN_PASSWORD=admin + +# Database pool tuning (applies to both SQLite and MySQL) +DECNET_DB_POOL_SIZE=20 # base pool connections (default: 20) +DECNET_DB_MAX_OVERFLOW=40 # extra connections under burst (default: 40) ``` Copy `.env.example` to `.env.local` and modify it to suit your environment. @@ -676,6 +680,57 @@ The test suite covers: Every new feature requires passing tests before merging. +### Stress Testing + +A [Locust](https://locust.io)-based stress test suite lives in `tests/stress/`. It hammers every API endpoint with realistic traffic patterns to find throughput ceilings and latency degradation. + +```bash +# Run via pytest (starts its own server) +pytest -m stress tests/stress/ -v -x -n0 -s + +# Crank it up +STRESS_USERS=2000 STRESS_SPAWN_RATE=200 STRESS_DURATION=120 pytest -m stress tests/stress/ -v -x -n0 -s + +# Standalone Locust web UI against a running server +locust -f tests/stress/locustfile.py --host http://localhost:8000 +``` + +| Env var | Default | Description | +|---|---|---| +| `STRESS_USERS` | `500` | Total simulated users | +| `STRESS_SPAWN_RATE` | `50` | Users spawned per second | +| `STRESS_DURATION` | `60` | Test duration in seconds | +| `STRESS_WORKERS` | CPU count (max 4) | Uvicorn workers for the test server | +| `STRESS_MIN_RPS` | `500` | Minimum RPS to pass baseline test | +| `STRESS_MAX_P99_MS` | `200` | Maximum p99 latency (ms) to pass | +| `STRESS_SPIKE_USERS` | `1000` | Users for thundering herd test | +| `STRESS_SUSTAINED_USERS` | `200` | Users for sustained load test | + +#### System tuning: open file limit + +Under heavy load (500+ concurrent users), the server will exhaust the default Linux open file limit (`ulimit -n`), causing `OSError: [Errno 24] Too many open files`. Most distros default to **1024**, which is far too low for stress testing or production use. + +**Before running stress tests:** + +```bash +# Check current limit +ulimit -n + +# Bump for this shell session +ulimit -n 65536 +``` + +**Permanent fix** — add to `/etc/security/limits.conf`: + +``` +* soft nofile 65536 +* hard nofile 65536 +``` + +Or for systemd-managed services, add `LimitNOFILE=65536` to the unit file. + +> This applies to production deployments too — any server handling hundreds of concurrent connections needs a raised file descriptor limit. + # AI Disclosure This project has been made with lots, and I mean lots of help from AIs. While most of the design was made by me, most of the coding was done by AI models. diff --git a/decnet/web/auth.py b/decnet/web/auth.py index 6ece1e3..81879c5 100644 --- a/decnet/web/auth.py +++ b/decnet/web/auth.py @@ -1,3 +1,4 @@ +import asyncio from datetime import datetime, timedelta, timezone from typing import Optional, Any import jwt @@ -24,6 +25,15 @@ def get_password_hash(password: str) -> str: return _hashed.decode("utf-8") +async def averify_password(plain_password: str, hashed_password: str) -> bool: + # bcrypt is CPU-bound and ~250ms/call; keep it off the event loop. + return await asyncio.to_thread(verify_password, plain_password, hashed_password) + + +async def ahash_password(password: str) -> str: + return await asyncio.to_thread(get_password_hash, password) + + def create_access_token(data: dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: _to_encode: dict[str, Any] = data.copy() _expire: datetime diff --git a/decnet/web/router/auth/api_change_pass.py b/decnet/web/router/auth/api_change_pass.py index fec8bac..efca5bf 100644 --- a/decnet/web/router/auth/api_change_pass.py +++ b/decnet/web/router/auth/api_change_pass.py @@ -3,7 +3,7 @@ from typing import Any, Optional from fastapi import APIRouter, Depends, HTTPException, status from decnet.telemetry import traced as _traced -from decnet.web.auth import get_password_hash, verify_password +from decnet.web.auth import ahash_password, averify_password from decnet.web.dependencies import get_current_user_unchecked, repo from decnet.web.db.models import ChangePasswordRequest @@ -22,12 +22,12 @@ router = APIRouter() @_traced("api.change_password") async def change_password(request: ChangePasswordRequest, current_user: str = Depends(get_current_user_unchecked)) -> dict[str, str]: _user: Optional[dict[str, Any]] = await repo.get_user_by_uuid(current_user) - if not _user or not verify_password(request.old_password, _user["password_hash"]): + if not _user or not await averify_password(request.old_password, _user["password_hash"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect old password", ) - _new_hash: str = get_password_hash(request.new_password) + _new_hash: str = await ahash_password(request.new_password) await repo.update_user_password(current_user, _new_hash, must_change_password=False) return {"message": "Password updated successfully"} diff --git a/decnet/web/router/auth/api_login.py b/decnet/web/router/auth/api_login.py index 3c0030e..d3c1af7 100644 --- a/decnet/web/router/auth/api_login.py +++ b/decnet/web/router/auth/api_login.py @@ -6,8 +6,8 @@ from fastapi import APIRouter, HTTPException, status from decnet.telemetry import traced as _traced from decnet.web.auth import ( ACCESS_TOKEN_EXPIRE_MINUTES, + averify_password, create_access_token, - verify_password, ) from decnet.web.dependencies import repo from decnet.web.db.models import LoginRequest, Token @@ -28,7 +28,7 @@ router = APIRouter() @_traced("api.login") async def login(request: LoginRequest) -> dict[str, Any]: _user: Optional[dict[str, Any]] = await repo.get_user_by_username(request.username) - if not _user or not verify_password(request.password, _user["password_hash"]): + if not _user or not await averify_password(request.password, _user["password_hash"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", diff --git a/decnet/web/router/config/api_manage_users.py b/decnet/web/router/config/api_manage_users.py index 12263ab..976c810 100644 --- a/decnet/web/router/config/api_manage_users.py +++ b/decnet/web/router/config/api_manage_users.py @@ -3,7 +3,7 @@ import uuid as _uuid from fastapi import APIRouter, Depends, HTTPException from decnet.telemetry import traced as _traced -from decnet.web.auth import get_password_hash +from decnet.web.auth import ahash_password from decnet.web.dependencies import require_admin, repo from decnet.web.db.models import ( CreateUserRequest, @@ -39,7 +39,7 @@ async def api_create_user( await repo.create_user({ "uuid": user_uuid, "username": req.username, - "password_hash": get_password_hash(req.password), + "password_hash": await ahash_password(req.password), "role": req.role, "must_change_password": True, # nosec B105 — not a password }) @@ -125,7 +125,7 @@ async def api_reset_user_password( await repo.update_user_password( user_uuid, - get_password_hash(req.new_password), + await ahash_password(req.new_password), must_change_password=True, ) return {"message": "Password reset successfully"} diff --git a/development/DEVELOPMENT.md b/development/DEVELOPMENT.md index 8aea107..882fe1d 100644 --- a/development/DEVELOPMENT.md +++ b/development/DEVELOPMENT.md @@ -79,7 +79,7 @@ ## Services & Realism -- [ ] **HTTPS/TLS support** — Honeypots with SSL certificates. +- [x] **HTTPS/TLS support** — Honeypots with SSL certificates. - [ ] **Fake Active Directory** — Convincing AD/LDAP emulation. - [ ] **Realistic web apps** — Fake WordPress, Grafana, and phpMyAdmin templates. - [ ] **OT/ICS profiles** — Expanded Modbus, DNP3, and BACnet support. @@ -140,3 +140,9 @@ - [x] **Strict Typing** — Project-wide enforcement of PEP 484 type hints. - [ ] **Plugin SDK docs** — Documentation for adding custom services. - [ ] **Config generator wizard** — `decnet wizard` for interactive setup. + +## API Improvements + +- [ ] Enable up to 250 concurrent users with close to zero performance degradation. +- [ ] Enable up to 100 requests per second with close to zero performance degradation. + diff --git a/schemathesis.toml b/schemathesis.toml index 1091856..e1f5a9a 100644 --- a/schemathesis.toml +++ b/schemathesis.toml @@ -1,9 +1,84 @@ +# Run: schemathesis run http://127.0.0.1:${DECNET_API_PORT}/openapi.json +# Or: schemathesis run --config schemathesis.toml http://127.0.0.1:8000/openapi.json + [[project]] -title = "DECNET API" -continue-on-failure = true -request-timeout = 5.0 +title = "DECNET API" +continue-on-failure = true +request-timeout = 10.0 +#suppress-health-check = ["too_slow", "data_too_large", "filter_too_much", "large_base_example"] +workers = "auto" + +# ── Generation: throw everything at it ─────────────────────────────────────── +[generation] +mode = "all" # valid AND invalid inputs +max-examples = 500 # 5× the default +no-shrink = false # keep shrinking — you want minimal repros +allow-x00 = true # null bytes in strings +unique-inputs = true # no duplicate test cases +codec = "utf-8" # full unicode range +maximize = "response_time" # targeted: hunt for slow paths too + +# ── All phases on ───────────────────────────────────────────────────────────── +[phases.examples] +enabled = true +fill-missing = true # generate random cases even where no examples exist + +[phases.coverage] +enabled = true +generate-duplicate-query-parameters = true # e.g. ?x=1&x=2 edge cases +unexpected-methods = ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "TRACE"] + +[phases.fuzzing] +enabled = true + +[phases.stateful] +enabled = true +max-steps = 20 + +# ── Every check enabled ─────────────────────────────────────────────────────── +[checks] +not_a_server_error.enabled = true +status_code_conformance.enabled = true +content_type_conformance.enabled = true +response_headers_conformance.enabled = true +response_schema_conformance.enabled = true +positive_data_acceptance.enabled = true +negative_data_rejection.enabled = true +missing_required_header.enabled = true +unsupported_method.enabled = true +use_after_free.enabled = true +ensure_resource_availability.enabled = true +ignored_auth.enabled = true +max_response_time = 2.0 # anything slower than 2s is a failure + +# ── Per-operation timeouts ──────────────────────────────────────────────────── +# Auth — must be instant +[[operations]] +include-operation-id = "login_api_v1_auth_login_post" +request-timeout = 3.0 [[operations]] -# Target your SSE endpoint specifically -include-path = "/stream" -request-timeout = 2.0 +include-operation-id = "change_password_api_v1_auth_change_password_post" +request-timeout = 3.0 + +# Deploy — expensive by design, give it room but not infinite +[[operations]] +include-operation-id = "api_deploy_deckies_api_v1_deckies_deploy_post" +request-timeout = 30.0 +checks.max_response_time = 30.0 # override the global 2s threshold for this op + +# Mutate — engine work, allow some slack +[[operations]] +include-operation-id = "api_mutate_decky_api_v1_deckies__decky_name__mutate_post" +request-timeout = 15.0 +checks.max_response_time = 15.0 + +# SSE stream — must not block the suite +[[operations]] +include-operation-id = "stream_events_api_v1_stream_get" +request-timeout = 2.0 + +# Reinit — destructive, assert it never 500s regardless of state +[[operations]] +include-operation-id = "api_reinit_api_v1_config_reinit_delete" +request-timeout = 10.0 diff --git a/tests/api/health/test_get_health.py b/tests/api/health/test_get_health.py index e5e521e..75f8a65 100644 --- a/tests/api/health/test_get_health.py +++ b/tests/api/health/test_get_health.py @@ -4,6 +4,15 @@ from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest +from decnet.web.router.health.api_get_health import _reset_docker_cache + + +@pytest.fixture(autouse=True) +def _clear_docker_cache(): + _reset_docker_cache() + yield + _reset_docker_cache() + @pytest.mark.anyio async def test_health_requires_auth(client: httpx.AsyncClient) -> None: diff --git a/tests/api/test_schemathesis.py b/tests/api/test_schemathesis.py index 9c000bd..938d92a 100644 --- a/tests/api/test_schemathesis.py +++ b/tests/api/test_schemathesis.py @@ -1,17 +1,24 @@ """ -Schemathesis contract tests. - -Generates requests from the OpenAPI spec and verifies that no input causes a 5xx. - -Currently scoped to `not_a_server_error` only — full response-schema conformance -(including undocumented 401 responses) is blocked by DEBT-020 (missing error -response declarations across all protected endpoints). Once DEBT-020 is resolved, -replace the checks list with the default (remove the argument) for full compliance. +Schemathesis contract tests — full compliance, all checks enabled. Requires DECNET_DEVELOPER=true (set in tests/conftest.py) to expose /openapi.json. """ import pytest import schemathesis as st +from schemathesis.checks import not_a_server_error +from schemathesis.specs.openapi.checks import ( + status_code_conformance, + content_type_conformance, + response_headers_conformance, + response_schema_conformance, + positive_data_acceptance, + negative_data_rejection, + missing_required_header, + unsupported_method, + use_after_free, + ensure_resource_availability, + ignored_auth, +) from hypothesis import settings, Verbosity, HealthCheck from decnet.web.auth import create_access_token @@ -24,49 +31,65 @@ import time from datetime import datetime, timezone from pathlib import Path + def _free_port() -> int: - """Bind to port 0, let the OS pick a free port, return it.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] -# Configuration for the automated live server + LIVE_PORT = _free_port() LIVE_SERVER_URL = f"http://127.0.0.1:{LIVE_PORT}" TEST_SECRET = "test-secret-for-automated-fuzzing" -# Standardize the secret for the test process too so tokens can be verified import decnet.web.auth decnet.web.auth.SECRET_KEY = TEST_SECRET -# Create a valid token for an admin-like user TEST_TOKEN = create_access_token({"uuid": "00000000-0000-0000-0000-000000000001"}) +ALL_CHECKS = ( + not_a_server_error, + status_code_conformance, + content_type_conformance, + response_headers_conformance, + response_schema_conformance, + positive_data_acceptance, + negative_data_rejection, + missing_required_header, + unsupported_method, + use_after_free, + ensure_resource_availability, +) + +AUTH_CHECKS = ( + not_a_server_error, + ignored_auth, +) + + @st.hook def before_call(context, case, *args): - # Logged-in admin for all requests case.headers = case.headers or {} case.headers["Authorization"] = f"Bearer {TEST_TOKEN}" - # Force SSE stream to close after the initial snapshot so the test doesn't hang if case.path and case.path.endswith("/stream"): case.query = case.query or {} case.query["maxOutput"] = 0 -def wait_for_port(port, timeout=10): - start_time = time.time() - while time.time() - start_time < timeout: + +def wait_for_port(port: int, timeout: float = 10.0) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - if sock.connect_ex(('127.0.0.1', port)) == 0: + if sock.connect_ex(("127.0.0.1", port)) == 0: return True time.sleep(0.2) return False -def start_automated_server(): - # Use the current venv's uvicorn + +def start_automated_server() -> subprocess.Popen: uvicorn_bin = "uvicorn" if os.name != "nt" else "uvicorn.exe" uvicorn_path = str(Path(sys.executable).parent / uvicorn_bin) - # Force developer and contract test modes for the sub-process env = os.environ.copy() env["DECNET_DEVELOPER"] = "true" env["DECNET_CONTRACT_TEST"] = "true" @@ -78,13 +101,18 @@ def start_automated_server(): log_file = open(log_dir / f"fuzz_server_{LIVE_PORT}_{ts}.log", "w") proc = subprocess.Popen( - [uvicorn_path, "decnet.web.api:app", "--host", "127.0.0.1", "--port", str(LIVE_PORT), "--log-level", "info"], + [ + uvicorn_path, + "decnet.web.api:app", + "--host", "127.0.0.1", + "--port", str(LIVE_PORT), + "--log-level", "info", + ], env=env, stdout=log_file, stderr=log_file, ) - # Register cleanup atexit.register(proc.terminate) atexit.register(log_file.close) @@ -94,14 +122,47 @@ def start_automated_server(): return proc -# Stir up the server! + _server_proc = start_automated_server() -# Now Schemathesis can pull the schema from the real network port schema = st.openapi.from_url(f"{LIVE_SERVER_URL}/openapi.json") + @pytest.mark.fuzz @st.pytest.parametrize(api=schema) -@settings(max_examples=3000, deadline=None, verbosity=Verbosity.debug, suppress_health_check=[HealthCheck.filter_too_much]) +@settings( + max_examples=3000, + deadline=None, + verbosity=Verbosity.debug, + suppress_health_check=[ + HealthCheck.filter_too_much, + HealthCheck.too_slow, + HealthCheck.data_too_large, + ], +) def test_schema_compliance(case): - case.call_and_validate() + """Full contract test: valid + invalid inputs, all response checks.""" + case.call_and_validate(checks=ALL_CHECKS) + + +@pytest.mark.fuzz +@st.pytest.parametrize(api=schema) +@settings( + max_examples=500, + deadline=None, + verbosity=Verbosity.normal, + suppress_health_check=[ + HealthCheck.filter_too_much, + HealthCheck.too_slow, + ], +) +def test_auth_enforcement(case): + """Verify every protected endpoint rejects requests with no token.""" + case.headers = { + k: v for k, v in (case.headers or {}).items() + if k.lower() != "authorization" + } + if case.path and case.path.endswith("/stream"): + case.query = case.query or {} + case.query["maxOutput"] = 0 + case.call_and_validate(checks=AUTH_CHECKS) diff --git a/tests/stress/__init__.py b/tests/stress/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/stress/conftest.py b/tests/stress/conftest.py new file mode 100644 index 0000000..95a5bd7 --- /dev/null +++ b/tests/stress/conftest.py @@ -0,0 +1,130 @@ +""" +Stress-test fixtures: real uvicorn server + programmatic Locust runner. +""" + +import multiprocessing +import os +import sys +import time +import socket +import signal +import subprocess + +import pytest +import requests + + +# --------------------------------------------------------------------------- +# Configuration (env-var driven for CI flexibility) +# --------------------------------------------------------------------------- +STRESS_USERS = int(os.environ.get("STRESS_USERS", "500")) +STRESS_SPAWN_RATE = int(os.environ.get("STRESS_SPAWN_RATE", "50")) +STRESS_DURATION = int(os.environ.get("STRESS_DURATION", "60")) +STRESS_WORKERS = int(os.environ.get("STRESS_WORKERS", str(min(multiprocessing.cpu_count(), 4)))) + +ADMIN_USER = "admin" +ADMIN_PASS = "test-password-123" +JWT_SECRET = "stable-test-secret-key-at-least-32-chars-long" + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +def _wait_for_server(url: str, timeout: float = 15.0) -> None: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + r = requests.get(url, timeout=2) + if r.status_code in (200, 503): + return + except requests.ConnectionError: + pass + time.sleep(0.1) + raise TimeoutError(f"Server not ready at {url}") + + +@pytest.fixture(scope="session") +def stress_server(): + """Start a real uvicorn server for stress testing.""" + port = _free_port() + env = { + **os.environ, + "DECNET_JWT_SECRET": JWT_SECRET, + "DECNET_ADMIN_PASSWORD": ADMIN_PASS, + "DECNET_DEVELOPER": "true", + "DECNET_DEVELOPER_TRACING": "false", + "DECNET_DB_TYPE": "sqlite", + } + proc = subprocess.Popen( + [ + sys.executable, "-m", "uvicorn", + "decnet.web.api:app", + "--host", "127.0.0.1", + "--port", str(port), + "--workers", str(STRESS_WORKERS), + "--log-level", "warning", + ], + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + base_url = f"http://127.0.0.1:{port}" + try: + _wait_for_server(f"{base_url}/api/v1/health") + yield base_url + finally: + proc.terminate() + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + + +@pytest.fixture(scope="session") +def stress_token(stress_server): + """Authenticate and return a valid admin JWT.""" + url = stress_server + resp = requests.post( + f"{url}/api/v1/auth/login", + json={"username": ADMIN_USER, "password": ADMIN_PASS}, + ) + assert resp.status_code == 200, f"Login failed: {resp.text}" + token = resp.json()["access_token"] + + # Clear must_change_password + requests.post( + f"{url}/api/v1/auth/change-password", + json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS}, + headers={"Authorization": f"Bearer {token}"}, + ) + # Re-login for clean token + resp2 = requests.post( + f"{url}/api/v1/auth/login", + json={"username": ADMIN_USER, "password": ADMIN_PASS}, + ) + return resp2.json()["access_token"] + + +def run_locust(host, users, spawn_rate, duration): + """Run Locust programmatically and return the Environment with stats.""" + import gevent + from locust.env import Environment + from locust.stats import stats_printer, stats_history, StatsCSVFileWriter + from tests.stress.locustfile import DecnetUser + + env = Environment(user_classes=[DecnetUser], host=host) + env.create_local_runner() + + env.runner.start(users, spawn_rate=spawn_rate) + + # Let it run for the specified duration + gevent.sleep(duration) + + env.runner.quit() + env.runner.greenlet.join(timeout=10) + + return env diff --git a/tests/stress/locustfile.py b/tests/stress/locustfile.py new file mode 100644 index 0000000..bf5089e --- /dev/null +++ b/tests/stress/locustfile.py @@ -0,0 +1,130 @@ +""" +Locust user class for DECNET API stress testing. + +Hammers every endpoint from the OpenAPI spec with realistic traffic weights. +Can be used standalone (`locust -f tests/stress/locustfile.py`) or +programmatically via the pytest fixtures in conftest.py. +""" + +import os +import random +import time + +from locust import HttpUser, task, between + + +ADMIN_USER = os.environ.get("DECNET_ADMIN_USER", "admin") +ADMIN_PASS = os.environ.get("DECNET_ADMIN_PASSWORD", "admin") + +_MAX_LOGIN_RETRIES = 5 +_LOGIN_BACKOFF_BASE = 0.5 # seconds, doubles each retry + + +class DecnetUser(HttpUser): + wait_time = between(0.01, 0.05) # near-zero think time — max pressure + + def _login_with_retry(self): + """Login with exponential backoff — handles connection storms.""" + for attempt in range(_MAX_LOGIN_RETRIES): + resp = self.client.post( + "/api/v1/auth/login", + json={"username": ADMIN_USER, "password": ADMIN_PASS}, + name="/api/v1/auth/login [on_start]", + ) + if resp.status_code == 200: + return resp.json()["access_token"] + # Status 0 = connection refused, retry with backoff + if resp.status_code == 0 or resp.status_code >= 500: + time.sleep(_LOGIN_BACKOFF_BASE * (2 ** attempt)) + continue + raise RuntimeError(f"Login failed (non-retryable): {resp.status_code} {resp.text}") + raise RuntimeError(f"Login failed after {_MAX_LOGIN_RETRIES} retries (last status: {resp.status_code})") + + def on_start(self): + token = self._login_with_retry() + + # Clear must_change_password + self.client.post( + "/api/v1/auth/change-password", + json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS}, + headers={"Authorization": f"Bearer {token}"}, + ) + # Re-login for a clean token + self.token = self._login_with_retry() + self.client.headers.update({"Authorization": f"Bearer {self.token}"}) + + # --- Read-hot paths (high weight) --- + + @task(10) + def get_stats(self): + self.client.get("/api/v1/stats") + + @task(8) + def get_logs(self): + self.client.get("/api/v1/logs", params={"limit": 50}) + + @task(8) + def get_attackers(self): + self.client.get("/api/v1/attackers") + + @task(7) + def get_deckies(self): + self.client.get("/api/v1/deckies") + + @task(6) + def get_bounties(self): + self.client.get("/api/v1/bounty") + + @task(5) + def get_logs_histogram(self): + self.client.get("/api/v1/logs/histogram") + + @task(5) + def search_logs(self): + self.client.get("/api/v1/logs", params={"search": "ssh", "limit": 100}) + + @task(4) + def search_attackers(self): + self.client.get( + "/api/v1/attackers", params={"search": "brute", "sort_by": "recent"} + ) + + @task(4) + def paginate_logs(self): + offset = random.randint(0, 1000) + self.client.get("/api/v1/logs", params={"limit": 100, "offset": offset}) + + @task(3) + def get_health(self): + self.client.get("/api/v1/health") + + @task(3) + def get_config(self): + self.client.get("/api/v1/config") + + # --- Write / auth paths (low weight) --- + + @task(2) + def login(self): + self.client.post( + "/api/v1/auth/login", + json={"username": ADMIN_USER, "password": ADMIN_PASS}, + ) + + @task(1) + def stream_sse(self): + """Short-lived SSE connection — read a few bytes then close.""" + with self.client.get( + "/api/v1/stream", + params={"maxOutput": 3}, + stream=True, + catch_response=True, + name="/api/v1/stream", + ) as resp: + if resp.status_code == 200: + # Read up to 4KB then bail — we're stress-testing connection setup + for chunk in resp.iter_content(chunk_size=1024): + break + resp.success() + else: + resp.failure(f"SSE returned {resp.status_code}") diff --git a/tests/stress/test_stress.py b/tests/stress/test_stress.py new file mode 100644 index 0000000..3668dce --- /dev/null +++ b/tests/stress/test_stress.py @@ -0,0 +1,154 @@ +""" +Locust-based stress tests for the DECNET API. + +Run: pytest -m stress tests/stress/ -v -x -n0 +Tune: STRESS_USERS=2000 STRESS_SPAWN_RATE=200 STRESS_DURATION=120 pytest -m stress ... +""" + +import os + +import pytest + +from tests.stress.conftest import run_locust, STRESS_USERS, STRESS_SPAWN_RATE, STRESS_DURATION + + +# Assertion thresholds (overridable via env) +MIN_RPS = int(os.environ.get("STRESS_MIN_RPS", "500")) +MAX_P99_MS = int(os.environ.get("STRESS_MAX_P99_MS", "200")) +MAX_FAIL_RATE = float(os.environ.get("STRESS_MAX_FAIL_RATE", "0.01")) # 1% + + +def _print_stats(env, label=""): + """Print a compact stats summary table.""" + total = env.stats.total + num_reqs = total.num_requests + num_fails = total.num_failures + fail_pct = (num_fails / num_reqs * 100) if num_reqs else 0 + rps = total.total_rps + + print(f"\n{'=' * 70}") + if label: + print(f" {label}") + print(f"{'=' * 70}") + print(f" {'Metric':<30} {'Value':>15}") + print(f" {'-' * 45}") + print(f" {'Total requests':<30} {num_reqs:>15,}") + print(f" {'Failures':<30} {num_fails:>15,} ({fail_pct:.2f}%)") + print(f" {'RPS (total)':<30} {rps:>15.1f}") + print(f" {'Avg latency (ms)':<30} {total.avg_response_time:>15.1f}") + print(f" {'p50 (ms)':<30} {total.get_response_time_percentile(0.50) or 0:>15.0f}") + print(f" {'p95 (ms)':<30} {total.get_response_time_percentile(0.95) or 0:>15.0f}") + print(f" {'p99 (ms)':<30} {total.get_response_time_percentile(0.99) or 0:>15.0f}") + print(f" {'Min (ms)':<30} {total.min_response_time:>15.0f}") + print(f" {'Max (ms)':<30} {total.max_response_time:>15.0f}") + print(f"{'=' * 70}") + + # Per-endpoint breakdown + print(f"\n {'Endpoint':<45} {'Reqs':>8} {'Fails':>8} {'Avg(ms)':>10} {'p99(ms)':>10}") + print(f" {'-' * 81}") + for entry in sorted(env.stats.entries.values(), key=lambda e: e.num_requests, reverse=True): + p99 = entry.get_response_time_percentile(0.99) or 0 + print( + f" {entry.method + ' ' + entry.name:<45} " + f"{entry.num_requests:>8,} " + f"{entry.num_failures:>8,} " + f"{entry.avg_response_time:>10.1f} " + f"{p99:>10.0f}" + ) + print() + + +@pytest.mark.stress +def test_stress_rps_baseline(stress_server): + """Baseline throughput: ramp to STRESS_USERS users, sustain for STRESS_DURATION seconds. + + Asserts: + - RPS exceeds MIN_RPS + - p99 latency < MAX_P99_MS + - Failure rate < MAX_FAIL_RATE + """ + env = run_locust( + host=stress_server, + users=STRESS_USERS, + spawn_rate=STRESS_SPAWN_RATE, + duration=STRESS_DURATION, + ) + _print_stats(env, f"BASELINE: {STRESS_USERS} users, {STRESS_DURATION}s") + + total = env.stats.total + num_reqs = total.num_requests + assert num_reqs > 0, "No requests were made" + + rps = total.total_rps + fail_rate = total.num_failures / num_reqs if num_reqs else 1.0 + p99 = total.get_response_time_percentile(0.99) or 0 + + assert rps >= MIN_RPS, f"RPS {rps:.1f} below minimum {MIN_RPS}" + assert p99 <= MAX_P99_MS, f"p99 {p99:.0f}ms exceeds max {MAX_P99_MS}ms" + assert fail_rate <= MAX_FAIL_RATE, f"Failure rate {fail_rate:.2%} exceeds max {MAX_FAIL_RATE:.2%}" + + +@pytest.mark.stress +def test_stress_spike(stress_server): + """Thundering herd: ramp from 0 to 1000 users in 5 seconds. + + Asserts: no 5xx errors (failure rate < 2%). + """ + spike_users = int(os.environ.get("STRESS_SPIKE_USERS", "1000")) + spike_spawn = spike_users // 5 # all users in ~5 seconds + + env = run_locust( + host=stress_server, + users=spike_users, + spawn_rate=spike_spawn, + duration=15, # 5s ramp + 10s sustained + ) + _print_stats(env, f"SPIKE: 0 -> {spike_users} users in 5s") + + total = env.stats.total + num_reqs = total.num_requests + assert num_reqs > 0, "No requests were made" + + fail_rate = total.num_failures / num_reqs + assert fail_rate < 0.02, f"Spike failure rate {fail_rate:.2%} — server buckled under thundering herd" + + +@pytest.mark.stress +def test_stress_sustained(stress_server): + """Sustained load: 200 users for 30s. Checks latency doesn't degrade >3x. + + Runs two phases: + 1. Warm-up (10s) to get baseline latency + 2. Sustained (30s) to check for degradation + """ + sustained_users = int(os.environ.get("STRESS_SUSTAINED_USERS", "200")) + + # Phase 1: warm-up baseline + env_warmup = run_locust( + host=stress_server, + users=sustained_users, + spawn_rate=sustained_users, # instant ramp + duration=10, + ) + baseline_avg = env_warmup.stats.total.avg_response_time + _print_stats(env_warmup, f"SUSTAINED warm-up: {sustained_users} users, 10s") + + # Phase 2: sustained + env_sustained = run_locust( + host=stress_server, + users=sustained_users, + spawn_rate=sustained_users, + duration=30, + ) + sustained_avg = env_sustained.stats.total.avg_response_time + _print_stats(env_sustained, f"SUSTAINED main: {sustained_users} users, 30s") + + assert env_sustained.stats.total.num_requests > 0, "No requests during sustained phase" + + if baseline_avg > 0: + degradation = sustained_avg / baseline_avg + print(f"\n Latency degradation factor: {degradation:.2f}x (baseline {baseline_avg:.1f}ms -> sustained {sustained_avg:.1f}ms)") + assert degradation < 3.0, ( + f"Latency degraded {degradation:.1f}x under sustained load " + f"(baseline {baseline_avg:.1f}ms -> {sustained_avg:.1f}ms)" + ) diff --git a/tests/test_auth_async.py b/tests/test_auth_async.py new file mode 100644 index 0000000..eb80777 --- /dev/null +++ b/tests/test_auth_async.py @@ -0,0 +1,51 @@ +""" +averify_password / ahash_password run bcrypt on a thread so the event +loop can serve other requests while hashing. Contract: they must produce +identical results to the sync versions. +""" +import pytest + +from decnet.web.auth import ( + ahash_password, + averify_password, + get_password_hash, + verify_password, +) + + +@pytest.mark.asyncio +async def test_ahash_matches_sync_hash_verify(): + hashed = await ahash_password("hunter2") + assert verify_password("hunter2", hashed) + assert not verify_password("wrong", hashed) + + +@pytest.mark.asyncio +async def test_averify_matches_sync_verify(): + hashed = get_password_hash("s3cret") + assert await averify_password("s3cret", hashed) is True + assert await averify_password("s3cre", hashed) is False + + +@pytest.mark.asyncio +async def test_averify_does_not_block_loop(): + """Two concurrent averify calls should run in parallel (on threads). + + With `asyncio.to_thread`, total wall time is ~max(a, b), not a+b. + """ + import asyncio, time + + hashed = get_password_hash("x") + t0 = time.perf_counter() + a, b = await asyncio.gather( + averify_password("x", hashed), + averify_password("x", hashed), + ) + elapsed = time.perf_counter() - t0 + assert a and b + # Sequential would be ~2× a single verify. Parallel on threads is ~1×. + # Single verify is ~250ms at rounds=12. Allow slack for CI noise. + single = time.perf_counter() + verify_password("x", hashed) + single_time = time.perf_counter() - single + assert elapsed < 1.7 * single_time, f"concurrent {elapsed:.3f}s vs single {single_time:.3f}s"