From 467511e9978fbdecd207c671de2092965c2833fa Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 17 Apr 2026 15:01:49 -0400 Subject: [PATCH] db: switch MySQL driver to asyncmy, env-tune pool, serialize DDL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - aiomysql → asyncmy on both sides of the URL/import (faster, maintained). - Pool sizing now reads DECNET_DB_POOL_SIZE / MAX_OVERFLOW / RECYCLE / PRE_PING for both SQLite and MySQL engines so stress runs can bump without code edits. - MySQL initialize() now wraps schema DDL in a GET_LOCK advisory lock so concurrent uvicorn workers racing create_all() don't hit 'Table was skipped since its definition is being modified by concurrent DDL'. - sqlite & mysql repo get_log_histogram use the shared _session() helper instead of session_factory() for consistency with the rest of the repo. - SSE stream_events docstring updated to asyncmy. --- decnet/web/db/mysql/database.py | 14 +++++----- decnet/web/db/mysql/repository.py | 27 +++++++++++++------ decnet/web/db/sqlite/database.py | 13 +++++++++ decnet/web/db/sqlite/repository.py | 2 +- decnet/web/router/stream/api_stream_events.py | 2 +- 5 files changed, 41 insertions(+), 17 deletions(-) diff --git a/decnet/web/db/mysql/database.py b/decnet/web/db/mysql/database.py index 73a4185..2e7b329 100644 --- a/decnet/web/db/mysql/database.py +++ b/decnet/web/db/mysql/database.py @@ -1,7 +1,7 @@ """ MySQL async engine factory. -Builds a SQLAlchemy AsyncEngine against MySQL using the ``aiomysql`` driver. +Builds a SQLAlchemy AsyncEngine against MySQL using the ``asyncmy`` driver. Connection info is resolved (in order of precedence): @@ -23,10 +23,10 @@ from urllib.parse import quote_plus from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine -DEFAULT_POOL_SIZE = 10 -DEFAULT_MAX_OVERFLOW = 20 -DEFAULT_POOL_RECYCLE = 3600 # seconds — avoid MySQL ``wait_timeout`` disconnects -DEFAULT_POOL_PRE_PING = True +DEFAULT_POOL_SIZE = int(os.environ.get("DECNET_DB_POOL_SIZE", "20")) +DEFAULT_MAX_OVERFLOW = int(os.environ.get("DECNET_DB_MAX_OVERFLOW", "40")) +DEFAULT_POOL_RECYCLE = int(os.environ.get("DECNET_DB_POOL_RECYCLE", "3600")) +DEFAULT_POOL_PRE_PING = os.environ.get("DECNET_DB_POOL_PRE_PING", "true").lower() == "true" def build_mysql_url( @@ -36,7 +36,7 @@ def build_mysql_url( user: Optional[str] = None, password: Optional[str] = None, ) -> str: - """Compose an async SQLAlchemy URL for MySQL using the aiomysql driver. + """Compose an async SQLAlchemy URL for MySQL using the asyncmy driver. Component args override env vars. Password is percent-encoded so special characters (``@``, ``:``, ``/``…) don't break URL parsing. @@ -59,7 +59,7 @@ def build_mysql_url( pw_enc = quote_plus(password) user_enc = quote_plus(user) - return f"mysql+aiomysql://{user_enc}:{pw_enc}@{host}:{port}/{database}" + return f"mysql+asyncmy://{user_enc}:{pw_enc}@{host}:{port}/{database}" def resolve_url(url: Optional[str] = None) -> str: diff --git a/decnet/web/db/mysql/repository.py b/decnet/web/db/mysql/repository.py index 63fa8d9..f83b4bf 100644 --- a/decnet/web/db/mysql/repository.py +++ b/decnet/web/db/mysql/repository.py @@ -24,7 +24,7 @@ from decnet.web.db.sqlmodel_repo import SQLModelRepository class MySQLRepository(SQLModelRepository): - """MySQL backend — uses ``aiomysql``.""" + """MySQL backend — uses ``asyncmy``.""" def __init__(self, url: Optional[str] = None, **engine_kwargs) -> None: self.engine = get_async_engine(url=url, **engine_kwargs) @@ -81,13 +81,24 @@ class MySQLRepository(SQLModelRepository): )) async def initialize(self) -> None: - """Create tables and run all MySQL-specific migrations.""" + """Create tables and run all MySQL-specific migrations. + + Uses a MySQL advisory lock to serialize DDL across concurrent + uvicorn workers — prevents the 'Table was skipped since its + definition is being modified by concurrent DDL' race. + """ from sqlmodel import SQLModel - await self._migrate_attackers_table() - await self._migrate_column_types() - async with self.engine.begin() as conn: - await conn.run_sync(SQLModel.metadata.create_all) - await self._ensure_admin_user() + async with self.engine.connect() as lock_conn: + await lock_conn.execute(text("SELECT GET_LOCK('decnet_schema_init', 30)")) + try: + await self._migrate_attackers_table() + await self._migrate_column_types() + async with self.engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + await self._ensure_admin_user() + finally: + await lock_conn.execute(text("SELECT RELEASE_LOCK('decnet_schema_init')")) + await lock_conn.close() def _json_field_equals(self, key: str): # MySQL 5.7+ exposes JSON_EXTRACT; quoted string result returned for @@ -115,7 +126,7 @@ class MySQLRepository(SQLModelRepository): literal_column("bucket_time") ) - async with self.session_factory() as session: + async with self._session() as session: results = await session.execute(statement) # Normalize to ISO string for API parity with the SQLite backend # (SQLite's datetime() returns a string already; FROM_UNIXTIME diff --git a/decnet/web/db/sqlite/database.py b/decnet/web/db/sqlite/database.py index 9cddf9d..b1b99af 100644 --- a/decnet/web/db/sqlite/database.py +++ b/decnet/web/db/sqlite/database.py @@ -1,3 +1,5 @@ +import os + from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy import create_engine, Engine, event from sqlmodel import SQLModel @@ -11,9 +13,20 @@ def get_async_engine(db_path: str) -> AsyncEngine: prefix = "sqlite+aiosqlite:///" if db_path.startswith(":memory:"): prefix = "sqlite+aiosqlite://" + + pool_size = int(os.environ.get("DECNET_DB_POOL_SIZE", "20")) + max_overflow = int(os.environ.get("DECNET_DB_MAX_OVERFLOW", "40")) + + pool_recycle = int(os.environ.get("DECNET_DB_POOL_RECYCLE", "3600")) + pool_pre_ping = os.environ.get("DECNET_DB_POOL_PRE_PING", "true").lower() == "true" + engine = create_async_engine( f"{prefix}{db_path}", echo=False, + pool_size=pool_size, + max_overflow=max_overflow, + pool_recycle=pool_recycle, + pool_pre_ping=pool_pre_ping, connect_args={"uri": True, "timeout": 30}, ) diff --git a/decnet/web/db/sqlite/repository.py b/decnet/web/db/sqlite/repository.py index dc021db..5965d0b 100644 --- a/decnet/web/db/sqlite/repository.py +++ b/decnet/web/db/sqlite/repository.py @@ -54,6 +54,6 @@ class SQLiteRepository(SQLModelRepository): literal_column("bucket_time") ) - async with self.session_factory() as session: + async with self._session() as session: results = await session.execute(statement) return [{"time": r[0], "count": r[1]} for r in results.all()] diff --git a/decnet/web/router/stream/api_stream_events.py b/decnet/web/router/stream/api_stream_events.py index 6e028ac..643e401 100644 --- a/decnet/web/router/stream/api_stream_events.py +++ b/decnet/web/router/stream/api_stream_events.py @@ -66,7 +66,7 @@ async def stream_events( ) -> StreamingResponse: # Prefetch the initial snapshot before entering the streaming generator. - # With aiomysql (pure async TCP I/O), the first DB await inside the generator + # With asyncmy (pure async TCP I/O), the first DB await inside the generator # fires immediately after the ASGI layer sends the keepalive chunk — the HTTP # write and the MySQL read compete for asyncio I/O callbacks and the MySQL # callback can stall. Running these here (normal async context, no streaming)