refactor(db): extract _safe_session/_detach_close to _helpers.py
Module-level session helpers move into sqlmodel_repo/_helpers.py. __init__.py re-exports them so external import paths (decnet.web.db.sqlmodel_repo._safe_session) keep resolving.
This commit is contained in:
@@ -62,79 +62,11 @@ from decnet.web.db.models import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
from contextlib import asynccontextmanager
|
from decnet.web.db.sqlmodel_repo._helpers import ( # noqa: F401 (re-exported for tests/external)
|
||||||
|
_safe_session,
|
||||||
from decnet.logging import get_logger
|
_detach_close,
|
||||||
|
_cleanup_tasks,
|
||||||
_log = get_logger("db.pool")
|
)
|
||||||
|
|
||||||
# Hold strong refs to in-flight cleanup tasks so they aren't GC'd mid-run.
|
|
||||||
_cleanup_tasks: set[asyncio.Task] = set()
|
|
||||||
|
|
||||||
|
|
||||||
def _detach_close(session: AsyncSession) -> None:
|
|
||||||
"""Hand session cleanup to a fresh task so the caller's cancellation
|
|
||||||
doesn't interrupt it.
|
|
||||||
|
|
||||||
``asyncio.shield`` doesn't help on the exception path: shield prevents
|
|
||||||
*other* tasks from cancelling the inner coroutine, but if the *current*
|
|
||||||
task is already cancelled, its next ``await`` re-raises
|
|
||||||
``CancelledError`` as soon as the inner coroutine yields. That's what
|
|
||||||
happens when uvicorn cancels a request mid-query — the rollback inside
|
|
||||||
``session.close()`` can't complete, and the aiomysql connection is
|
|
||||||
orphaned (pool logs "non-checked-in connection" on GC).
|
|
||||||
|
|
||||||
A fresh task isn't subject to the caller's pending cancellation, so
|
|
||||||
``close()`` (or the ``invalidate()`` fallback for a dead connection)
|
|
||||||
runs to completion and the pool reclaims the connection promptly.
|
|
||||||
|
|
||||||
Fire-and-forget on purpose: the caller is already unwinding and must
|
|
||||||
not wait on cleanup.
|
|
||||||
"""
|
|
||||||
async def _cleanup() -> None:
|
|
||||||
try:
|
|
||||||
await session.close()
|
|
||||||
except BaseException:
|
|
||||||
try:
|
|
||||||
session.sync_session.invalidate()
|
|
||||||
except BaseException:
|
|
||||||
_log.debug("detach-close: invalidate failed", exc_info=True)
|
|
||||||
|
|
||||||
try:
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
except RuntimeError:
|
|
||||||
# No running loop (shutdown path) — best-effort sync invalidate.
|
|
||||||
try:
|
|
||||||
session.sync_session.invalidate()
|
|
||||||
except BaseException:
|
|
||||||
_log.debug("detach-close: no-loop invalidate failed", exc_info=True)
|
|
||||||
return
|
|
||||||
task = loop.create_task(_cleanup())
|
|
||||||
_cleanup_tasks.add(task)
|
|
||||||
# Consume any exception to silence "Task exception was never retrieved".
|
|
||||||
task.add_done_callback(lambda t: (_cleanup_tasks.discard(t), t.exception()))
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def _safe_session(factory: async_sessionmaker[AsyncSession]):
|
|
||||||
"""Session context manager that keeps close() reliable under cancellation.
|
|
||||||
|
|
||||||
Success path: await close() inline so the caller observes cleanup
|
|
||||||
(commit visibility, connection release) before proceeding.
|
|
||||||
|
|
||||||
Exception path (includes CancelledError from client disconnects):
|
|
||||||
detach close() to a fresh task. The caller is unwinding and its
|
|
||||||
own cancellation would abort an inline close mid-rollback, leaving
|
|
||||||
the aiomysql connection orphaned.
|
|
||||||
"""
|
|
||||||
session = factory()
|
|
||||||
try:
|
|
||||||
yield session
|
|
||||||
except BaseException:
|
|
||||||
_detach_close(session)
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
await session.close()
|
|
||||||
|
|
||||||
|
|
||||||
class SQLModelRepository(BaseRepository):
|
class SQLModelRepository(BaseRepository):
|
||||||
|
|||||||
83
decnet/web/db/sqlmodel_repo/_helpers.py
Normal file
83
decnet/web/db/sqlmodel_repo/_helpers.py
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
"""Module-level session helpers shared by every repository mixin.
|
||||||
|
|
||||||
|
``_safe_session`` and ``_detach_close`` make session cleanup robust under
|
||||||
|
client-cancellation. See ``_detach_close`` for the full rationale.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
|
||||||
|
_log = get_logger("db.pool")
|
||||||
|
|
||||||
|
# Hold strong refs to in-flight cleanup tasks so they aren't GC'd mid-run.
|
||||||
|
_cleanup_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def _detach_close(session: AsyncSession) -> None:
|
||||||
|
"""Hand session cleanup to a fresh task so the caller's cancellation
|
||||||
|
doesn't interrupt it.
|
||||||
|
|
||||||
|
``asyncio.shield`` doesn't help on the exception path: shield prevents
|
||||||
|
*other* tasks from cancelling the inner coroutine, but if the *current*
|
||||||
|
task is already cancelled, its next ``await`` re-raises
|
||||||
|
``CancelledError`` as soon as the inner coroutine yields. That's what
|
||||||
|
happens when uvicorn cancels a request mid-query — the rollback inside
|
||||||
|
``session.close()`` can't complete, and the aiomysql connection is
|
||||||
|
orphaned (pool logs "non-checked-in connection" on GC).
|
||||||
|
|
||||||
|
A fresh task isn't subject to the caller's pending cancellation, so
|
||||||
|
``close()`` (or the ``invalidate()`` fallback for a dead connection)
|
||||||
|
runs to completion and the pool reclaims the connection promptly.
|
||||||
|
|
||||||
|
Fire-and-forget on purpose: the caller is already unwinding and must
|
||||||
|
not wait on cleanup.
|
||||||
|
"""
|
||||||
|
async def _cleanup() -> None:
|
||||||
|
try:
|
||||||
|
await session.close()
|
||||||
|
except BaseException:
|
||||||
|
try:
|
||||||
|
session.sync_session.invalidate()
|
||||||
|
except BaseException:
|
||||||
|
_log.debug("detach-close: invalidate failed", exc_info=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
# No running loop (shutdown path) — best-effort sync invalidate.
|
||||||
|
try:
|
||||||
|
session.sync_session.invalidate()
|
||||||
|
except BaseException:
|
||||||
|
_log.debug("detach-close: no-loop invalidate failed", exc_info=True)
|
||||||
|
return
|
||||||
|
task = loop.create_task(_cleanup())
|
||||||
|
_cleanup_tasks.add(task)
|
||||||
|
# Consume any exception to silence "Task exception was never retrieved".
|
||||||
|
task.add_done_callback(lambda t: (_cleanup_tasks.discard(t), t.exception()))
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _safe_session(factory: async_sessionmaker[AsyncSession]):
|
||||||
|
"""Session context manager that keeps close() reliable under cancellation.
|
||||||
|
|
||||||
|
Success path: await close() inline so the caller observes cleanup
|
||||||
|
(commit visibility, connection release) before proceeding.
|
||||||
|
|
||||||
|
Exception path (includes CancelledError from client disconnects):
|
||||||
|
detach close() to a fresh task. The caller is unwinding and its
|
||||||
|
own cancellation would abort an inline close mid-rollback, leaving
|
||||||
|
the aiomysql connection orphaned.
|
||||||
|
"""
|
||||||
|
session = factory()
|
||||||
|
try:
|
||||||
|
yield session
|
||||||
|
except BaseException:
|
||||||
|
_detach_close(session)
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
await session.close()
|
||||||
Reference in New Issue
Block a user