fix(tests): eliminate tarpit OOM from global asyncio.sleep mock
Two interacting bugs caused asyncio.sleep to be mocked globally, letting tarpit_watcher_worker spin the event loop on a non-async mock and accumulate _increment_mock_call records without bound: 1. test_ingester.py patched `decnet.web.ingester.asyncio.sleep` via the asyncio singleton — any code in the process using asyncio.sleep (including the tarpit worker) hit the fake_sleep side_effect. Fix: add `_sleep = asyncio.sleep` alias in ingester.py and patch `decnet.web.ingester._sleep` instead — scopes the mock to ingester. 2. test_api_startup_guards.py called `_run_lifespan_startup` without DECNET_CONTRACT_TEST=true, which started the real tarpit task in a manually-constructed event loop that the tests never cancelled. Fix: set DECNET_CONTRACT_TEST=true inside _run_lifespan_startup so the lifespan skips all background workers.
This commit is contained in:
@@ -26,6 +26,7 @@ from decnet.web.db.repository import BaseRepository
|
|||||||
logger = get_logger("api")
|
logger = get_logger("api")
|
||||||
|
|
||||||
_INGEST_STATE_KEY = "ingest_worker_position"
|
_INGEST_STATE_KEY = "ingest_worker_position"
|
||||||
|
_sleep = asyncio.sleep # module-level alias so tests patch here, not the global asyncio singleton
|
||||||
|
|
||||||
|
|
||||||
async def log_ingestion_worker(repo: BaseRepository) -> None:
|
async def log_ingestion_worker(repo: BaseRepository) -> None:
|
||||||
@@ -73,7 +74,7 @@ async def _run_loop(
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
if not _json_log_path.exists():
|
if not _json_log_path.exists():
|
||||||
await asyncio.sleep(2)
|
await _sleep(2)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
_stat: os.stat_result = _json_log_path.stat()
|
_stat: os.stat_result = _json_log_path.stat()
|
||||||
@@ -84,7 +85,7 @@ async def _run_loop(
|
|||||||
|
|
||||||
if _stat.st_size == _position:
|
if _stat.st_size == _position:
|
||||||
# No new data
|
# No new data
|
||||||
await asyncio.sleep(1)
|
await _sleep(1)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Accumulate parsed rows and the file offset they end at. We
|
# Accumulate parsed rows and the file offset they end at. We
|
||||||
@@ -149,9 +150,9 @@ async def _run_loop(
|
|||||||
break # Exit worker — DB is gone or uninitialized
|
break # Exit worker — DB is gone or uninitialized
|
||||||
|
|
||||||
logger.error("ingest: error in worker: %s", _e)
|
logger.error("ingest: error in worker: %s", _e)
|
||||||
await asyncio.sleep(5)
|
await _sleep(5)
|
||||||
|
|
||||||
await asyncio.sleep(1)
|
await _sleep(1)
|
||||||
|
|
||||||
|
|
||||||
async def _publish_batch(bus: Any, flushed: int, position: int) -> None:
|
async def _publish_batch(bus: Any, flushed: int, position: int) -> None:
|
||||||
|
|||||||
@@ -30,17 +30,26 @@ def _strip_pytest_vars(monkeypatch: pytest.MonkeyPatch) -> None:
|
|||||||
|
|
||||||
|
|
||||||
async def _run_lifespan_startup(api_mod) -> None:
|
async def _run_lifespan_startup(api_mod) -> None:
|
||||||
"""Run the lifespan up to (but not past) yield, then unwind cleanly."""
|
"""Run the lifespan up to (but not past) yield, then unwind cleanly.
|
||||||
app = FastAPI()
|
|
||||||
cm = api_mod.lifespan(app)
|
DECNET_CONTRACT_TEST suppresses all background workers (ingestion,
|
||||||
await cm.__aenter__()
|
collector, TTP, tarpit) so no tasks escape test teardown.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
os.environ["DECNET_CONTRACT_TEST"] = "true"
|
||||||
try:
|
try:
|
||||||
return
|
app = FastAPI()
|
||||||
finally:
|
cm = api_mod.lifespan(app)
|
||||||
|
await cm.__aenter__()
|
||||||
try:
|
try:
|
||||||
await cm.__aexit__(None, None, None)
|
return
|
||||||
except Exception:
|
finally:
|
||||||
pass
|
try:
|
||||||
|
await cm.__aexit__(None, None, None)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
os.environ.pop("DECNET_CONTRACT_TEST", None)
|
||||||
|
|
||||||
|
|
||||||
def test_master_api_refuses_to_start_in_agent_mode(
|
def test_master_api_refuses_to_start_in_agent_mode(
|
||||||
|
|||||||
@@ -476,7 +476,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
mock_repo.add_logs.assert_not_awaited()
|
mock_repo.add_logs.assert_not_awaited()
|
||||||
@@ -507,7 +507,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
@@ -539,7 +539,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
@@ -575,7 +575,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
@@ -607,7 +607,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
@@ -646,7 +646,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
@@ -680,7 +680,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
@@ -721,7 +721,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
@@ -761,7 +761,7 @@ class TestLogIngestionWorker:
|
|||||||
raise asyncio.CancelledError()
|
raise asyncio.CancelledError()
|
||||||
|
|
||||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
|
||||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep):
|
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
|
||||||
with pytest.raises(asyncio.CancelledError):
|
with pytest.raises(asyncio.CancelledError):
|
||||||
await log_ingestion_worker(mock_repo)
|
await log_ingestion_worker(mock_repo)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user