From e16f47ad2402ff6c24046208e91d8e76ecf3d9b7 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 28 Apr 2026 14:38:26 -0400 Subject: [PATCH] refactor(db): extract _safe_session/_detach_close to _helpers.py Module-level session helpers move into sqlmodel_repo/_helpers.py. __init__.py re-exports them so external import paths (decnet.web.db.sqlmodel_repo._safe_session) keep resolving. --- decnet/web/db/sqlmodel_repo/__init__.py | 78 ++--------------------- decnet/web/db/sqlmodel_repo/_helpers.py | 83 +++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 73 deletions(-) create mode 100644 decnet/web/db/sqlmodel_repo/_helpers.py diff --git a/decnet/web/db/sqlmodel_repo/__init__.py b/decnet/web/db/sqlmodel_repo/__init__.py index b2b8aa05..1a5fae2d 100644 --- a/decnet/web/db/sqlmodel_repo/__init__.py +++ b/decnet/web/db/sqlmodel_repo/__init__.py @@ -62,79 +62,11 @@ from decnet.web.db.models import ( ) -from contextlib import asynccontextmanager - -from decnet.logging import get_logger - -_log = get_logger("db.pool") - -# Hold strong refs to in-flight cleanup tasks so they aren't GC'd mid-run. -_cleanup_tasks: set[asyncio.Task] = set() - - -def _detach_close(session: AsyncSession) -> None: - """Hand session cleanup to a fresh task so the caller's cancellation - doesn't interrupt it. - - ``asyncio.shield`` doesn't help on the exception path: shield prevents - *other* tasks from cancelling the inner coroutine, but if the *current* - task is already cancelled, its next ``await`` re-raises - ``CancelledError`` as soon as the inner coroutine yields. That's what - happens when uvicorn cancels a request mid-query — the rollback inside - ``session.close()`` can't complete, and the aiomysql connection is - orphaned (pool logs "non-checked-in connection" on GC). - - A fresh task isn't subject to the caller's pending cancellation, so - ``close()`` (or the ``invalidate()`` fallback for a dead connection) - runs to completion and the pool reclaims the connection promptly. - - Fire-and-forget on purpose: the caller is already unwinding and must - not wait on cleanup. - """ - async def _cleanup() -> None: - try: - await session.close() - except BaseException: - try: - session.sync_session.invalidate() - except BaseException: - _log.debug("detach-close: invalidate failed", exc_info=True) - - try: - loop = asyncio.get_running_loop() - except RuntimeError: - # No running loop (shutdown path) — best-effort sync invalidate. - try: - session.sync_session.invalidate() - except BaseException: - _log.debug("detach-close: no-loop invalidate failed", exc_info=True) - return - task = loop.create_task(_cleanup()) - _cleanup_tasks.add(task) - # Consume any exception to silence "Task exception was never retrieved". - task.add_done_callback(lambda t: (_cleanup_tasks.discard(t), t.exception())) - - -@asynccontextmanager -async def _safe_session(factory: async_sessionmaker[AsyncSession]): - """Session context manager that keeps close() reliable under cancellation. - - Success path: await close() inline so the caller observes cleanup - (commit visibility, connection release) before proceeding. - - Exception path (includes CancelledError from client disconnects): - detach close() to a fresh task. The caller is unwinding and its - own cancellation would abort an inline close mid-rollback, leaving - the aiomysql connection orphaned. - """ - session = factory() - try: - yield session - except BaseException: - _detach_close(session) - raise - else: - await session.close() +from decnet.web.db.sqlmodel_repo._helpers import ( # noqa: F401 (re-exported for tests/external) + _safe_session, + _detach_close, + _cleanup_tasks, +) class SQLModelRepository(BaseRepository): diff --git a/decnet/web/db/sqlmodel_repo/_helpers.py b/decnet/web/db/sqlmodel_repo/_helpers.py new file mode 100644 index 00000000..eae889b0 --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/_helpers.py @@ -0,0 +1,83 @@ +"""Module-level session helpers shared by every repository mixin. + +``_safe_session`` and ``_detach_close`` make session cleanup robust under +client-cancellation. See ``_detach_close`` for the full rationale. +""" +from __future__ import annotations + +import asyncio +from contextlib import asynccontextmanager + +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from decnet.logging import get_logger + +_log = get_logger("db.pool") + +# Hold strong refs to in-flight cleanup tasks so they aren't GC'd mid-run. +_cleanup_tasks: set[asyncio.Task] = set() + + +def _detach_close(session: AsyncSession) -> None: + """Hand session cleanup to a fresh task so the caller's cancellation + doesn't interrupt it. + + ``asyncio.shield`` doesn't help on the exception path: shield prevents + *other* tasks from cancelling the inner coroutine, but if the *current* + task is already cancelled, its next ``await`` re-raises + ``CancelledError`` as soon as the inner coroutine yields. That's what + happens when uvicorn cancels a request mid-query — the rollback inside + ``session.close()`` can't complete, and the aiomysql connection is + orphaned (pool logs "non-checked-in connection" on GC). + + A fresh task isn't subject to the caller's pending cancellation, so + ``close()`` (or the ``invalidate()`` fallback for a dead connection) + runs to completion and the pool reclaims the connection promptly. + + Fire-and-forget on purpose: the caller is already unwinding and must + not wait on cleanup. + """ + async def _cleanup() -> None: + try: + await session.close() + except BaseException: + try: + session.sync_session.invalidate() + except BaseException: + _log.debug("detach-close: invalidate failed", exc_info=True) + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop (shutdown path) — best-effort sync invalidate. + try: + session.sync_session.invalidate() + except BaseException: + _log.debug("detach-close: no-loop invalidate failed", exc_info=True) + return + task = loop.create_task(_cleanup()) + _cleanup_tasks.add(task) + # Consume any exception to silence "Task exception was never retrieved". + task.add_done_callback(lambda t: (_cleanup_tasks.discard(t), t.exception())) + + +@asynccontextmanager +async def _safe_session(factory: async_sessionmaker[AsyncSession]): + """Session context manager that keeps close() reliable under cancellation. + + Success path: await close() inline so the caller observes cleanup + (commit visibility, connection release) before proceeding. + + Exception path (includes CancelledError from client disconnects): + detach close() to a fresh task. The caller is unwinding and its + own cancellation would abort an inline close mid-rollback, leaving + the aiomysql connection orphaned. + """ + session = factory() + try: + yield session + except BaseException: + _detach_close(session) + raise + else: + await session.close()