diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index d932ea8..c027b48 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -36,50 +36,73 @@ from decnet.logging import get_logger _log = get_logger("db.pool") +# Hold strong refs to in-flight cleanup tasks so they aren't GC'd mid-run. +_cleanup_tasks: set[asyncio.Task] = set() -async def _force_close(session: AsyncSession) -> None: - """Close a session, forcing connection invalidation if clean close fails. - Under cancellation, ``session.close()`` may try to issue a ROLLBACK on - a connection that was interrupted mid-query — aiomysql then raises - ``InterfaceError("Cancelled during execution")`` and the connection is - left checked-out, reported by the pool as ``non-checked-in connection`` - on GC. When clean close fails, invalidate the session's connections - directly (no I/O, just flips the pool record) so the pool discards - them immediately instead of waiting for garbage collection. +def _detach_close(session: AsyncSession) -> None: + """Hand session cleanup to a fresh task so the caller's cancellation + doesn't interrupt it. + + ``asyncio.shield`` doesn't help on the exception path: shield prevents + *other* tasks from cancelling the inner coroutine, but if the *current* + task is already cancelled, its next ``await`` re-raises + ``CancelledError`` as soon as the inner coroutine yields. That's what + happens when uvicorn cancels a request mid-query — the rollback inside + ``session.close()`` can't complete, and the aiomysql connection is + orphaned (pool logs "non-checked-in connection" on GC). + + A fresh task isn't subject to the caller's pending cancellation, so + ``close()`` (or the ``invalidate()`` fallback for a dead connection) + runs to completion and the pool reclaims the connection promptly. + + Fire-and-forget on purpose: the caller is already unwinding and must + not wait on cleanup. """ + async def _cleanup() -> None: + try: + await session.close() + except BaseException: + try: + session.sync_session.invalidate() + except BaseException: + _log.debug("detach-close: invalidate failed", exc_info=True) + try: - await asyncio.shield(session.close()) + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop (shutdown path) — best-effort sync invalidate. + try: + session.sync_session.invalidate() + except BaseException: + _log.debug("detach-close: no-loop invalidate failed", exc_info=True) return - except BaseException: - pass - try: - # invalidate() is sync and does no network I/O — safe inside a - # cancelled task. Tells the pool to drop the underlying DBAPI - # connection rather than return it for reuse. - session.sync_session.invalidate() - except BaseException: - _log.debug("force-close: invalidate failed", exc_info=True) + task = loop.create_task(_cleanup()) + _cleanup_tasks.add(task) + # Consume any exception to silence "Task exception was never retrieved". + task.add_done_callback(lambda t: (_cleanup_tasks.discard(t), t.exception())) @asynccontextmanager async def _safe_session(factory: async_sessionmaker[AsyncSession]): - """Session context manager that shields cleanup from cancellation. + """Session context manager that keeps close() reliable under cancellation. - Under high concurrency, uvicorn cancels request tasks when clients - disconnect. If a CancelledError hits during session.__aexit__, - the underlying DB connection is orphaned — never returned to the - pool. This wrapper ensures close() always completes, preventing - the pool-drain death spiral. + Success path: await close() inline so the caller observes cleanup + (commit visibility, connection release) before proceeding. + + Exception path (includes CancelledError from client disconnects): + detach close() to a fresh task. The caller is unwinding and its + own cancellation would abort an inline close mid-rollback, leaving + the aiomysql connection orphaned. """ session = factory() try: yield session except BaseException: - await _force_close(session) + _detach_close(session) raise else: - await _force_close(session) + await session.close() class SQLModelRepository(BaseRepository):