fix(tests): profiler worker tests patched asyncio.sleep, but main loop uses wait_for
Since the event-driven shutdown refactor (0fbb07c), the profiler main
loop is asyncio.wait_for(shutdown.wait(), timeout=interval) — no sleep
on the hot path. The four worker tests that patched asyncio.sleep to
raise CancelledError on the Nth call were silently no-op'ing and
hanging on the real 30 s wait_for timeout.
Replace the sleep patches with a shared _cancel_after helper that
patches wait_for itself. Pass interval=0 so the loop ticks without
delay between iterations.
This commit is contained in:
@@ -36,6 +36,38 @@ from decnet.profiler.worker import (
|
||||
|
||||
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _cancel_after(ticks: int):
|
||||
"""Cancel the worker loop after N ``asyncio.wait_for`` calls.
|
||||
|
||||
The worker's main tick is ``asyncio.wait_for(shutdown.wait(), timeout=
|
||||
interval)``. These tests want to let the loop body run a few times
|
||||
then unwind; patching wait_for is the natural knob. On call N the
|
||||
patched wait_for raises ``CancelledError``, which bubbles up through
|
||||
the worker and satisfies the test's ``pytest.raises`` assertion.
|
||||
|
||||
Earlier revisions patched ``asyncio.sleep`` — that hasn't been on the
|
||||
worker's hot path since the event-driven shutdown refactor, so the
|
||||
sleep patch silently no-op'd and the tests hung on the real 30 s
|
||||
``wait_for`` timeout.
|
||||
"""
|
||||
call_count = 0
|
||||
real_wait_for = asyncio.wait_for
|
||||
|
||||
async def _patched(awaitable, timeout):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count >= ticks:
|
||||
# Close the coroutine so asyncio doesn't warn about it never
|
||||
# being awaited.
|
||||
if asyncio.iscoroutine(awaitable):
|
||||
awaitable.close()
|
||||
raise asyncio.CancelledError()
|
||||
return await real_wait_for(awaitable, timeout)
|
||||
|
||||
return patch("decnet.profiler.worker.asyncio.wait_for", side_effect=_patched)
|
||||
|
||||
|
||||
_TS1 = "2026-04-04T10:00:00+00:00"
|
||||
_TS2 = "2026-04-04T10:05:00+00:00"
|
||||
_TS3 = "2026-04-04T10:10:00+00:00"
|
||||
@@ -575,42 +607,27 @@ class TestAttackerProfileWorker:
|
||||
@pytest.mark.asyncio
|
||||
async def test_worker_handles_update_error_without_crashing(self):
|
||||
repo = _make_repo()
|
||||
_call_count = 0
|
||||
|
||||
async def fake_sleep(secs):
|
||||
nonlocal _call_count
|
||||
_call_count += 1
|
||||
if _call_count >= 2:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
async def bad_update(_repo, _state):
|
||||
raise RuntimeError("DB exploded")
|
||||
|
||||
with patch("decnet.profiler.worker.asyncio.sleep", side_effect=fake_sleep):
|
||||
with _cancel_after(2):
|
||||
with patch("decnet.profiler.worker._incremental_update", side_effect=bad_update):
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await attacker_profile_worker(repo)
|
||||
await attacker_profile_worker(repo, interval=0)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_worker_calls_update_after_sleep(self):
|
||||
repo = _make_repo()
|
||||
_call_count = 0
|
||||
|
||||
async def fake_sleep(secs):
|
||||
nonlocal _call_count
|
||||
_call_count += 1
|
||||
if _call_count >= 2:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
update_calls = []
|
||||
|
||||
async def mock_update(_repo, _state):
|
||||
update_calls.append(True)
|
||||
|
||||
with patch("decnet.profiler.worker.asyncio.sleep", side_effect=fake_sleep):
|
||||
with _cancel_after(2):
|
||||
with patch("decnet.profiler.worker._incremental_update", side_effect=mock_update):
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await attacker_profile_worker(repo)
|
||||
await attacker_profile_worker(repo, interval=0)
|
||||
|
||||
assert len(update_calls) >= 1
|
||||
|
||||
@@ -618,23 +635,15 @@ class TestAttackerProfileWorker:
|
||||
async def test_cursor_restored_from_db_on_startup(self):
|
||||
"""Worker loads saved last_log_id from DB and passes it to _incremental_update."""
|
||||
repo = _make_repo(saved_state={"last_log_id": 99})
|
||||
_call_count = 0
|
||||
|
||||
async def fake_sleep(secs):
|
||||
nonlocal _call_count
|
||||
_call_count += 1
|
||||
if _call_count >= 2:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
captured_states = []
|
||||
|
||||
async def mock_update(_repo, state):
|
||||
captured_states.append((state.last_log_id, state.initialized))
|
||||
|
||||
with patch("decnet.profiler.worker.asyncio.sleep", side_effect=fake_sleep):
|
||||
with _cancel_after(2):
|
||||
with patch("decnet.profiler.worker._incremental_update", side_effect=mock_update):
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await attacker_profile_worker(repo)
|
||||
await attacker_profile_worker(repo, interval=0)
|
||||
|
||||
assert captured_states, "_incremental_update never called"
|
||||
restored_id, initialized = captured_states[0]
|
||||
@@ -645,23 +654,15 @@ class TestAttackerProfileWorker:
|
||||
async def test_no_saved_cursor_starts_from_zero(self):
|
||||
"""When get_state returns None, worker starts fresh from log ID 0."""
|
||||
repo = _make_repo(saved_state=None)
|
||||
_call_count = 0
|
||||
|
||||
async def fake_sleep(secs):
|
||||
nonlocal _call_count
|
||||
_call_count += 1
|
||||
if _call_count >= 2:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
captured_states = []
|
||||
|
||||
async def mock_update(_repo, state):
|
||||
captured_states.append((state.last_log_id, state.initialized))
|
||||
|
||||
with patch("decnet.profiler.worker.asyncio.sleep", side_effect=fake_sleep):
|
||||
with _cancel_after(2):
|
||||
with patch("decnet.profiler.worker._incremental_update", side_effect=mock_update):
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await attacker_profile_worker(repo)
|
||||
await attacker_profile_worker(repo, interval=0)
|
||||
|
||||
assert captured_states, "_incremental_update never called"
|
||||
restored_id, initialized = captured_states[0]
|
||||
|
||||
Reference in New Issue
Block a user