fix(tests/stress): eliminate 0-request flakes in locust runs

Three independent issues conspired to make stress tests record 0 requests:

1. Every virtual user did /auth/login in on_start. With 1000 users in a
   spike window, bcrypt-bound logins never finished and on_start failed
   for all users — aggregated requests stayed at 0. Pre-fetch a single
   admin token in the fixture (cached per-host) and pass it via
   DECNET_STRESS_TOKEN so locust users skip the login storm.

2. Locust exits non-zero on any request failure by default, causing
   run_locust to throw away an otherwise valid stats CSV. Pass
   --exit-code-on-error 0 so per-test assertions are the only fail gate.

3. test_stress_sustained ran two locust subprocesses against the same
   uvicorn. Phase 1's keep-alive connections wedged phase 2 into 0
   recorded requests ~2/3 of the time. Refactored stress_server into a
   start_stress_server() context manager and gave each phase its own
   uvicorn.

Stable 3/3 on full suite, 3/3 on test_stress_sustained alone.
This commit is contained in:
2026-04-28 13:01:11 -04:00
parent 681931d9bb
commit cc6abf7256
3 changed files with 124 additions and 47 deletions

View File

@@ -57,12 +57,12 @@ def _wait_for_server(url: str, timeout: float = 60.0) -> None:
raise TimeoutError(f"Server not ready at {url}")
@pytest.fixture(scope="function")
def stress_server():
# Function-scoped: every stress test gets its own clean uvicorn. Sharing
# a server across baseline → spike → sustained left the later runs with
# a half-dead pool (0-request symptom). Cost is ~5s of startup per test.
"""Start a real uvicorn server for stress testing."""
from contextlib import contextmanager
@contextmanager
def _start_stress_server():
"""Spawn a uvicorn for stress testing; yield base_url; tear down on exit."""
port = _free_port()
env = {k: v for k, v in os.environ.items() if not k.startswith("DECNET_")}
env.update({
@@ -110,6 +110,10 @@ def stress_server():
)
yield base_url
finally:
# Drop any cached admin token for the dying host so the next server
# gets a fresh login instead of presenting a JWT signed against a
# stale process state.
_TOKEN_CACHE.pop(base_url, None)
proc.terminate()
try:
proc.wait(timeout=10)
@@ -118,6 +122,22 @@ def stress_server():
proc.wait()
@pytest.fixture(scope="function")
def stress_server():
# Function-scoped: every stress test gets its own clean uvicorn. Sharing
# a server across baseline → spike → sustained left the later runs with
# a half-dead pool (0-request symptom). Cost is ~5s of startup per test.
"""Start a real uvicorn server for stress testing."""
with _start_stress_server() as base_url:
yield base_url
# Re-exported so tests can spin up additional servers within a single test
# (e.g. test_stress_sustained needs a fresh uvicorn between phases — phase 1
# leaves keep-alive connections that wedge phase 2 into 0 recorded requests).
start_stress_server = _start_stress_server
@pytest.fixture
def stress_token(stress_server):
"""Authenticate and return a valid admin JWT."""
@@ -224,6 +244,58 @@ def _parse_locust_csv(stats_csv: Path) -> _LocustEnv:
return _LocustEnv(_Stats(total, entries))
_TOKEN_CACHE: dict[str, str] = {}
def _login_once(host: str, timeout: float) -> dict:
last_exc: Exception | None = None
# Retry through transient drain windows: between phases the API is
# still flushing connections from the prior locust run, so the first
# POST can sit in-queue past a single 15s timeout.
for attempt in range(5):
try:
resp = requests.post(
f"{host}/api/v1/auth/login",
json={"username": ADMIN_USER, "password": ADMIN_PASS},
timeout=timeout,
)
resp.raise_for_status()
return resp.json()
except (requests.Timeout, requests.ConnectionError) as e:
last_exc = e
time.sleep(2 ** attempt)
raise RuntimeError(f"admin login failed after retries: {last_exc!r}")
def _fetch_admin_token(host: str) -> str:
"""Pre-fetch an admin token so locust virtual users don't all stampede
/auth/login on_start. Bcrypt is CPU-bound; 1000 simultaneous logins under
a 15s spike window means no user ever completes on_start and aggregated
request count is 0.
Cached per-host: the token is reusable across phases, so we don't pay
a fresh /auth/login round-trip while a previous run is still draining.
"""
cached = _TOKEN_CACHE.get(host)
if cached:
return cached
body = _login_once(host, timeout=30)
token = body["access_token"]
if body.get("must_change_password"):
requests.post(
f"{host}/api/v1/auth/change-password",
json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS},
headers={"Authorization": f"Bearer {token}"},
timeout=30,
)
body = _login_once(host, timeout=30)
token = body["access_token"]
_TOKEN_CACHE[host] = token
return token
def run_locust(host, users, spawn_rate, duration, _retry=False):
"""Run Locust in a subprocess (fresh Python, clean gevent monkey-patch)
and return a stats shim compatible with the tests.
@@ -237,6 +309,8 @@ def run_locust(host, users, spawn_rate, duration, _retry=False):
# Ensure DecnetUser.on_start can log in with the right creds
env.setdefault("DECNET_ADMIN_USER", ADMIN_USER)
env.setdefault("DECNET_ADMIN_PASSWORD", ADMIN_PASS)
# Pre-fetched token: locustfile picks this up and skips its own login.
env["DECNET_STRESS_TOKEN"] = _fetch_admin_token(host)
cmd = [
sys.executable, "-m", "locust",
@@ -249,6 +323,10 @@ def run_locust(host, users, spawn_rate, duration, _retry=False):
"--csv", str(csv_prefix),
"--only-summary",
"--loglevel", "WARNING",
# Locust defaults to exit code 1 on any request failure. We have our
# own per-test assertions for fail-rate; let the subprocess exit 0 so
# we don't throw away an otherwise valid stats CSV.
"--exit-code-on-error", "0",
]
# Generous timeout: locust run-time + spawn ramp + shutdown grace

View File

@@ -60,20 +60,21 @@ class DecnetUser(HttpUser):
raise RuntimeError(f"Login failed after {_MAX_LOGIN_RETRIES} retries (last status: {resp.status_code})")
def on_start(self):
token, must_change = self._login_with_retry()
# Only pay the change-password + re-login cost on the very first run
# against a fresh DB. Every run after that, must_change_password is
# already False — skip it or the login path becomes a bcrypt storm.
if must_change:
self.client.post(
"/api/v1/auth/change-password",
json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS},
headers={"Authorization": f"Bearer {token}"},
)
token, _ = self._login_with_retry()
self.token = token
# Prefer the fixture-supplied token: 1000 simultaneous bcrypt logins
# never finish inside a spike window, leaving aggregated requests at 0.
preset = os.environ.get("DECNET_STRESS_TOKEN")
if preset:
self.token = preset
else:
token, must_change = self._login_with_retry()
if must_change:
self.client.post(
"/api/v1/auth/change-password",
json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS},
headers={"Authorization": f"Bearer {token}"},
)
token, _ = self._login_with_retry()
self.token = token
self.client.headers.update({"Authorization": f"Bearer {self.token}"})
# --- Read-hot paths (high weight) ---

View File

@@ -9,7 +9,13 @@ import os
import pytest
from tests.stress.conftest import run_locust, STRESS_USERS, STRESS_SPAWN_RATE, STRESS_DURATION
from tests.stress.conftest import (
run_locust,
start_stress_server,
STRESS_USERS,
STRESS_SPAWN_RATE,
STRESS_DURATION,
)
# Assertion thresholds (overridable via env)
@@ -114,43 +120,35 @@ def test_stress_spike(stress_server):
@pytest.mark.stress
def test_stress_sustained(stress_server):
def test_stress_sustained():
"""Sustained load: 200 users for 30s. Checks latency doesn't degrade >3x.
Runs two phases:
Runs two phases against independent uvicorns. Sharing a server between
phases leaks keep-alive connections from phase 1 into phase 2 and the
sustained run records 0 requests roughly two-thirds of the time.
1. Warm-up (10s) to get baseline latency
2. Sustained (30s) to check for degradation
"""
sustained_users = int(os.environ.get("STRESS_SUSTAINED_USERS", "200"))
# Cap spawn rate at 100/s — locust itself warns above that and has been
# observed to record 0 requests when the spawn storm collides with a
# still-draining uvicorn from a prior phase.
ramp = min(sustained_users, 100)
# Phase 1: warm-up baseline
env_warmup = run_locust(
host=stress_server,
users=sustained_users,
spawn_rate=ramp,
duration=10,
)
with start_stress_server() as warm_url:
env_warmup = run_locust(
host=warm_url,
users=sustained_users,
spawn_rate=ramp,
duration=10,
)
baseline_avg = env_warmup.stats.total.avg_response_time
_print_stats(env_warmup, f"SUSTAINED warm-up: {sustained_users} users, 10s")
# Let the server drain pending work before firing the second locust run;
# otherwise the first request in phase 2 can sit behind a queued backlog
# and the 30s window can finish with 0 recorded requests.
import time as _t
_t.sleep(5)
# Phase 2: sustained
env_sustained = run_locust(
host=stress_server,
users=sustained_users,
spawn_rate=ramp,
duration=30,
)
with start_stress_server() as sustained_url:
env_sustained = run_locust(
host=sustained_url,
users=sustained_users,
spawn_rate=ramp,
duration=30,
)
sustained_avg = env_sustained.stats.total.avg_response_time
_print_stats(env_sustained, f"SUSTAINED main: {sustained_users} users, 30s")