diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index b7d51f1a..d8c791bc 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -26,6 +26,7 @@ from decnet.web.db.repository import BaseRepository logger = get_logger("api") _INGEST_STATE_KEY = "ingest_worker_position" +_sleep = asyncio.sleep # module-level alias so tests patch here, not the global asyncio singleton async def log_ingestion_worker(repo: BaseRepository) -> None: @@ -73,7 +74,7 @@ async def _run_loop( while True: try: if not _json_log_path.exists(): - await asyncio.sleep(2) + await _sleep(2) continue _stat: os.stat_result = _json_log_path.stat() @@ -84,7 +85,7 @@ async def _run_loop( if _stat.st_size == _position: # No new data - await asyncio.sleep(1) + await _sleep(1) continue # Accumulate parsed rows and the file offset they end at. We @@ -149,9 +150,9 @@ async def _run_loop( break # Exit worker — DB is gone or uninitialized logger.error("ingest: error in worker: %s", _e) - await asyncio.sleep(5) + await _sleep(5) - await asyncio.sleep(1) + await _sleep(1) async def _publish_batch(bus: Any, flushed: int, position: int) -> None: diff --git a/tests/web/test_api_startup_guards.py b/tests/web/test_api_startup_guards.py index f6f9d17f..c2f335ce 100644 --- a/tests/web/test_api_startup_guards.py +++ b/tests/web/test_api_startup_guards.py @@ -30,17 +30,26 @@ def _strip_pytest_vars(monkeypatch: pytest.MonkeyPatch) -> None: async def _run_lifespan_startup(api_mod) -> None: - """Run the lifespan up to (but not past) yield, then unwind cleanly.""" - app = FastAPI() - cm = api_mod.lifespan(app) - await cm.__aenter__() + """Run the lifespan up to (but not past) yield, then unwind cleanly. + + DECNET_CONTRACT_TEST suppresses all background workers (ingestion, + collector, TTP, tarpit) so no tasks escape test teardown. + """ + import os + os.environ["DECNET_CONTRACT_TEST"] = "true" try: - return - finally: + app = FastAPI() + cm = api_mod.lifespan(app) + await cm.__aenter__() try: - await cm.__aexit__(None, None, None) - except Exception: - pass + return + finally: + try: + await cm.__aexit__(None, None, None) + except Exception: + pass + finally: + os.environ.pop("DECNET_CONTRACT_TEST", None) def test_master_api_refuses_to_start_in_agent_mode( diff --git a/tests/web/test_ingester.py b/tests/web/test_ingester.py index d7efce6d..89b8669e 100644 --- a/tests/web/test_ingester.py +++ b/tests/web/test_ingester.py @@ -476,7 +476,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) mock_repo.add_logs.assert_not_awaited() @@ -507,7 +507,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) @@ -539,7 +539,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) @@ -575,7 +575,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) @@ -607,7 +607,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) @@ -646,7 +646,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) @@ -680,7 +680,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) @@ -721,7 +721,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo) @@ -761,7 +761,7 @@ class TestLogIngestionWorker: raise asyncio.CancelledError() with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): - with patch("decnet.web.ingester.asyncio.sleep", side_effect=fake_sleep): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): with pytest.raises(asyncio.CancelledError): await log_ingestion_worker(mock_repo)