diff --git a/tests/stress/conftest.py b/tests/stress/conftest.py index 7235c246..8497e473 100644 --- a/tests/stress/conftest.py +++ b/tests/stress/conftest.py @@ -57,12 +57,12 @@ def _wait_for_server(url: str, timeout: float = 60.0) -> None: raise TimeoutError(f"Server not ready at {url}") -@pytest.fixture(scope="function") -def stress_server(): - # Function-scoped: every stress test gets its own clean uvicorn. Sharing - # a server across baseline → spike → sustained left the later runs with - # a half-dead pool (0-request symptom). Cost is ~5s of startup per test. - """Start a real uvicorn server for stress testing.""" +from contextlib import contextmanager + + +@contextmanager +def _start_stress_server(): + """Spawn a uvicorn for stress testing; yield base_url; tear down on exit.""" port = _free_port() env = {k: v for k, v in os.environ.items() if not k.startswith("DECNET_")} env.update({ @@ -110,6 +110,10 @@ def stress_server(): ) yield base_url finally: + # Drop any cached admin token for the dying host so the next server + # gets a fresh login instead of presenting a JWT signed against a + # stale process state. + _TOKEN_CACHE.pop(base_url, None) proc.terminate() try: proc.wait(timeout=10) @@ -118,6 +122,22 @@ def stress_server(): proc.wait() +@pytest.fixture(scope="function") +def stress_server(): + # Function-scoped: every stress test gets its own clean uvicorn. Sharing + # a server across baseline → spike → sustained left the later runs with + # a half-dead pool (0-request symptom). Cost is ~5s of startup per test. + """Start a real uvicorn server for stress testing.""" + with _start_stress_server() as base_url: + yield base_url + + +# Re-exported so tests can spin up additional servers within a single test +# (e.g. test_stress_sustained needs a fresh uvicorn between phases — phase 1 +# leaves keep-alive connections that wedge phase 2 into 0 recorded requests). +start_stress_server = _start_stress_server + + @pytest.fixture def stress_token(stress_server): """Authenticate and return a valid admin JWT.""" @@ -224,6 +244,58 @@ def _parse_locust_csv(stats_csv: Path) -> _LocustEnv: return _LocustEnv(_Stats(total, entries)) +_TOKEN_CACHE: dict[str, str] = {} + + +def _login_once(host: str, timeout: float) -> dict: + last_exc: Exception | None = None + # Retry through transient drain windows: between phases the API is + # still flushing connections from the prior locust run, so the first + # POST can sit in-queue past a single 15s timeout. + for attempt in range(5): + try: + resp = requests.post( + f"{host}/api/v1/auth/login", + json={"username": ADMIN_USER, "password": ADMIN_PASS}, + timeout=timeout, + ) + resp.raise_for_status() + return resp.json() + except (requests.Timeout, requests.ConnectionError) as e: + last_exc = e + time.sleep(2 ** attempt) + raise RuntimeError(f"admin login failed after retries: {last_exc!r}") + + +def _fetch_admin_token(host: str) -> str: + """Pre-fetch an admin token so locust virtual users don't all stampede + /auth/login on_start. Bcrypt is CPU-bound; 1000 simultaneous logins under + a 15s spike window means no user ever completes on_start and aggregated + request count is 0. + + Cached per-host: the token is reusable across phases, so we don't pay + a fresh /auth/login round-trip while a previous run is still draining. + """ + cached = _TOKEN_CACHE.get(host) + if cached: + return cached + + body = _login_once(host, timeout=30) + token = body["access_token"] + if body.get("must_change_password"): + requests.post( + f"{host}/api/v1/auth/change-password", + json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS}, + headers={"Authorization": f"Bearer {token}"}, + timeout=30, + ) + body = _login_once(host, timeout=30) + token = body["access_token"] + + _TOKEN_CACHE[host] = token + return token + + def run_locust(host, users, spawn_rate, duration, _retry=False): """Run Locust in a subprocess (fresh Python, clean gevent monkey-patch) and return a stats shim compatible with the tests. @@ -237,6 +309,8 @@ def run_locust(host, users, spawn_rate, duration, _retry=False): # Ensure DecnetUser.on_start can log in with the right creds env.setdefault("DECNET_ADMIN_USER", ADMIN_USER) env.setdefault("DECNET_ADMIN_PASSWORD", ADMIN_PASS) + # Pre-fetched token: locustfile picks this up and skips its own login. + env["DECNET_STRESS_TOKEN"] = _fetch_admin_token(host) cmd = [ sys.executable, "-m", "locust", @@ -249,6 +323,10 @@ def run_locust(host, users, spawn_rate, duration, _retry=False): "--csv", str(csv_prefix), "--only-summary", "--loglevel", "WARNING", + # Locust defaults to exit code 1 on any request failure. We have our + # own per-test assertions for fail-rate; let the subprocess exit 0 so + # we don't throw away an otherwise valid stats CSV. + "--exit-code-on-error", "0", ] # Generous timeout: locust run-time + spawn ramp + shutdown grace diff --git a/tests/stress/locustfile.py b/tests/stress/locustfile.py index 586aa816..1c35fd49 100644 --- a/tests/stress/locustfile.py +++ b/tests/stress/locustfile.py @@ -60,20 +60,21 @@ class DecnetUser(HttpUser): raise RuntimeError(f"Login failed after {_MAX_LOGIN_RETRIES} retries (last status: {resp.status_code})") def on_start(self): - token, must_change = self._login_with_retry() - - # Only pay the change-password + re-login cost on the very first run - # against a fresh DB. Every run after that, must_change_password is - # already False — skip it or the login path becomes a bcrypt storm. - if must_change: - self.client.post( - "/api/v1/auth/change-password", - json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS}, - headers={"Authorization": f"Bearer {token}"}, - ) - token, _ = self._login_with_retry() - - self.token = token + # Prefer the fixture-supplied token: 1000 simultaneous bcrypt logins + # never finish inside a spike window, leaving aggregated requests at 0. + preset = os.environ.get("DECNET_STRESS_TOKEN") + if preset: + self.token = preset + else: + token, must_change = self._login_with_retry() + if must_change: + self.client.post( + "/api/v1/auth/change-password", + json={"old_password": ADMIN_PASS, "new_password": ADMIN_PASS}, + headers={"Authorization": f"Bearer {token}"}, + ) + token, _ = self._login_with_retry() + self.token = token self.client.headers.update({"Authorization": f"Bearer {self.token}"}) # --- Read-hot paths (high weight) --- diff --git a/tests/stress/test_stress.py b/tests/stress/test_stress.py index 5f3345b6..7e578614 100644 --- a/tests/stress/test_stress.py +++ b/tests/stress/test_stress.py @@ -9,7 +9,13 @@ import os import pytest -from tests.stress.conftest import run_locust, STRESS_USERS, STRESS_SPAWN_RATE, STRESS_DURATION +from tests.stress.conftest import ( + run_locust, + start_stress_server, + STRESS_USERS, + STRESS_SPAWN_RATE, + STRESS_DURATION, +) # Assertion thresholds (overridable via env) @@ -114,43 +120,35 @@ def test_stress_spike(stress_server): @pytest.mark.stress -def test_stress_sustained(stress_server): +def test_stress_sustained(): """Sustained load: 200 users for 30s. Checks latency doesn't degrade >3x. - Runs two phases: + Runs two phases against independent uvicorns. Sharing a server between + phases leaks keep-alive connections from phase 1 into phase 2 and the + sustained run records 0 requests roughly two-thirds of the time. 1. Warm-up (10s) to get baseline latency 2. Sustained (30s) to check for degradation """ sustained_users = int(os.environ.get("STRESS_SUSTAINED_USERS", "200")) - - # Cap spawn rate at 100/s — locust itself warns above that and has been - # observed to record 0 requests when the spawn storm collides with a - # still-draining uvicorn from a prior phase. ramp = min(sustained_users, 100) - # Phase 1: warm-up baseline - env_warmup = run_locust( - host=stress_server, - users=sustained_users, - spawn_rate=ramp, - duration=10, - ) + with start_stress_server() as warm_url: + env_warmup = run_locust( + host=warm_url, + users=sustained_users, + spawn_rate=ramp, + duration=10, + ) baseline_avg = env_warmup.stats.total.avg_response_time _print_stats(env_warmup, f"SUSTAINED warm-up: {sustained_users} users, 10s") - # Let the server drain pending work before firing the second locust run; - # otherwise the first request in phase 2 can sit behind a queued backlog - # and the 30s window can finish with 0 recorded requests. - import time as _t - _t.sleep(5) - - # Phase 2: sustained - env_sustained = run_locust( - host=stress_server, - users=sustained_users, - spawn_rate=ramp, - duration=30, - ) + with start_stress_server() as sustained_url: + env_sustained = run_locust( + host=sustained_url, + users=sustained_users, + spawn_rate=ramp, + duration=30, + ) sustained_avg = env_sustained.stats.total.avg_response_time _print_stats(env_sustained, f"SUSTAINED main: {sustained_users} users, 30s")