fix(tests): prevent xdist worker OOM from leaked tarpit asyncio task

asyncio_default_fixture_loop_scope was 'module', so all async tests in
a module shared one event loop. test_lifespan_startup_and_shutdown patched
log_ingestion_worker, log_collector_worker and attacker_profile_worker but
not tarpit_watcher_worker, so the real while-True coroutine was created as
an asyncio task on the shared loop and never cancelled. The xdist worker
ran for 4+ hours (confirmed via py-spy and etime=04:48), consuming 15+ GB
before being OOM-killed.

Fixes:
- Patch tarpit_watcher_worker in both TestLifespan tests
- Change asyncio_default_fixture_loop_scope to 'function' so each test
  gets its own loop; tasks cannot outlive their test
- Add loop_scope='module' to the precision_engine fixture, which
  legitimately needs a module-scoped event loop
This commit is contained in:
2026-05-10 09:53:25 -04:00
parent 9a7b03700c
commit a2c34cac02
3 changed files with 8 additions and 6 deletions

View File

@@ -116,7 +116,7 @@ decnet = "decnet.cli:app"
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_mode = "auto" asyncio_mode = "auto"
asyncio_debug = "true" asyncio_debug = "true"
asyncio_default_fixture_loop_scope = "module" asyncio_default_fixture_loop_scope = "function"
addopts = "-v -q -x -n 4 --dist load" addopts = "-v -q -x -n 4 --dist load"
norecursedirs = [ norecursedirs = [
"tests/live", "tests/live",

View File

@@ -102,7 +102,7 @@ def compiled_rules() -> list[CompiledRule]:
return _load_compiled_rules() return _load_compiled_rules()
@pytest_asyncio.fixture(scope="module") @pytest_asyncio.fixture(scope="module", loop_scope="module")
async def precision_engine( async def precision_engine(
compiled_rules: list[CompiledRule], compiled_rules: list[CompiledRule],
) -> RuleEngine: ) -> RuleEngine:

View File

@@ -129,8 +129,9 @@ class TestLifespan:
with patch("decnet.web.api.log_ingestion_worker", return_value=asyncio.sleep(0)): with patch("decnet.web.api.log_ingestion_worker", return_value=asyncio.sleep(0)):
with patch("decnet.web.api.log_collector_worker", return_value=asyncio.sleep(0)): with patch("decnet.web.api.log_collector_worker", return_value=asyncio.sleep(0)):
with patch("decnet.web.api.attacker_profile_worker", return_value=asyncio.sleep(0)): with patch("decnet.web.api.attacker_profile_worker", return_value=asyncio.sleep(0)):
async with lifespan(mock_app): with patch("decnet.web.api.tarpit_watcher_worker", return_value=asyncio.sleep(0)):
mock_repo.initialize.assert_awaited_once() async with lifespan(mock_app):
mock_repo.initialize.assert_awaited_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_lifespan_db_retry(self): async def test_lifespan_db_retry(self):
@@ -155,5 +156,6 @@ class TestLifespan:
with patch("decnet.web.api.log_ingestion_worker", return_value=asyncio.sleep(0)): with patch("decnet.web.api.log_ingestion_worker", return_value=asyncio.sleep(0)):
with patch("decnet.web.api.log_collector_worker", return_value=asyncio.sleep(0)): with patch("decnet.web.api.log_collector_worker", return_value=asyncio.sleep(0)):
with patch("decnet.web.api.attacker_profile_worker", return_value=asyncio.sleep(0)): with patch("decnet.web.api.attacker_profile_worker", return_value=asyncio.sleep(0)):
async with lifespan(mock_app): with patch("decnet.web.api.tarpit_watcher_worker", return_value=asyncio.sleep(0)):
assert _call_count == 3 async with lifespan(mock_app):
assert _call_count == 3