fix(db): detach session cleanup onto fresh task on cancellation
Previous attempt (shield + sync invalidate fallback) didn't work because shield only protects against cancellation from *other* tasks. When the caller task itself is cancelled mid-query, its next await re-raises CancelledError as soon as the shielded coroutine yields — rollback inside session.close() never completes, the aiomysql connection is orphaned, and the pool logs 'non-checked-in connection' when GC finally reaches it. Hand exception-path cleanup to loop.create_task() so the new task isn't subject to the caller's pending cancellation. close() (and the invalidate() fallback for a dead connection) runs to completion. Success path is unchanged — still awaits close() inline so callers see commit visibility and pool release before proceeding.
This commit is contained in:
@@ -36,50 +36,73 @@ from decnet.logging import get_logger
|
||||
|
||||
_log = get_logger("db.pool")
|
||||
|
||||
# Hold strong refs to in-flight cleanup tasks so they aren't GC'd mid-run.
|
||||
_cleanup_tasks: set[asyncio.Task] = set()
|
||||
|
||||
async def _force_close(session: AsyncSession) -> None:
|
||||
"""Close a session, forcing connection invalidation if clean close fails.
|
||||
|
||||
Under cancellation, ``session.close()`` may try to issue a ROLLBACK on
|
||||
a connection that was interrupted mid-query — aiomysql then raises
|
||||
``InterfaceError("Cancelled during execution")`` and the connection is
|
||||
left checked-out, reported by the pool as ``non-checked-in connection``
|
||||
on GC. When clean close fails, invalidate the session's connections
|
||||
directly (no I/O, just flips the pool record) so the pool discards
|
||||
them immediately instead of waiting for garbage collection.
|
||||
def _detach_close(session: AsyncSession) -> None:
|
||||
"""Hand session cleanup to a fresh task so the caller's cancellation
|
||||
doesn't interrupt it.
|
||||
|
||||
``asyncio.shield`` doesn't help on the exception path: shield prevents
|
||||
*other* tasks from cancelling the inner coroutine, but if the *current*
|
||||
task is already cancelled, its next ``await`` re-raises
|
||||
``CancelledError`` as soon as the inner coroutine yields. That's what
|
||||
happens when uvicorn cancels a request mid-query — the rollback inside
|
||||
``session.close()`` can't complete, and the aiomysql connection is
|
||||
orphaned (pool logs "non-checked-in connection" on GC).
|
||||
|
||||
A fresh task isn't subject to the caller's pending cancellation, so
|
||||
``close()`` (or the ``invalidate()`` fallback for a dead connection)
|
||||
runs to completion and the pool reclaims the connection promptly.
|
||||
|
||||
Fire-and-forget on purpose: the caller is already unwinding and must
|
||||
not wait on cleanup.
|
||||
"""
|
||||
async def _cleanup() -> None:
|
||||
try:
|
||||
await session.close()
|
||||
except BaseException:
|
||||
try:
|
||||
session.sync_session.invalidate()
|
||||
except BaseException:
|
||||
_log.debug("detach-close: invalidate failed", exc_info=True)
|
||||
|
||||
try:
|
||||
await asyncio.shield(session.close())
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
# No running loop (shutdown path) — best-effort sync invalidate.
|
||||
try:
|
||||
session.sync_session.invalidate()
|
||||
except BaseException:
|
||||
_log.debug("detach-close: no-loop invalidate failed", exc_info=True)
|
||||
return
|
||||
except BaseException:
|
||||
pass
|
||||
try:
|
||||
# invalidate() is sync and does no network I/O — safe inside a
|
||||
# cancelled task. Tells the pool to drop the underlying DBAPI
|
||||
# connection rather than return it for reuse.
|
||||
session.sync_session.invalidate()
|
||||
except BaseException:
|
||||
_log.debug("force-close: invalidate failed", exc_info=True)
|
||||
task = loop.create_task(_cleanup())
|
||||
_cleanup_tasks.add(task)
|
||||
# Consume any exception to silence "Task exception was never retrieved".
|
||||
task.add_done_callback(lambda t: (_cleanup_tasks.discard(t), t.exception()))
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def _safe_session(factory: async_sessionmaker[AsyncSession]):
|
||||
"""Session context manager that shields cleanup from cancellation.
|
||||
"""Session context manager that keeps close() reliable under cancellation.
|
||||
|
||||
Under high concurrency, uvicorn cancels request tasks when clients
|
||||
disconnect. If a CancelledError hits during session.__aexit__,
|
||||
the underlying DB connection is orphaned — never returned to the
|
||||
pool. This wrapper ensures close() always completes, preventing
|
||||
the pool-drain death spiral.
|
||||
Success path: await close() inline so the caller observes cleanup
|
||||
(commit visibility, connection release) before proceeding.
|
||||
|
||||
Exception path (includes CancelledError from client disconnects):
|
||||
detach close() to a fresh task. The caller is unwinding and its
|
||||
own cancellation would abort an inline close mid-rollback, leaving
|
||||
the aiomysql connection orphaned.
|
||||
"""
|
||||
session = factory()
|
||||
try:
|
||||
yield session
|
||||
except BaseException:
|
||||
await _force_close(session)
|
||||
_detach_close(session)
|
||||
raise
|
||||
else:
|
||||
await _force_close(session)
|
||||
await session.close()
|
||||
|
||||
|
||||
class SQLModelRepository(BaseRepository):
|
||||
|
||||
Reference in New Issue
Block a user