- Implement MySQLRepository extending BaseRepository - Add SQLAlchemy/SQLModel ORM abstraction layer (sqlmodel_repo.py) - Support connection pooling and tuning via DECNET_DB_URL env var - Cross-compatible with SQLite backend via factory pattern - Prepared for production deployment with MySQL SIEM/ELK integration
99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
"""
|
|
MySQL async engine factory.
|
|
|
|
Builds a SQLAlchemy AsyncEngine against MySQL using the ``aiomysql`` driver.
|
|
|
|
Connection info is resolved (in order of precedence):
|
|
|
|
1. An explicit ``url`` argument passed to :func:`get_async_engine`
|
|
2. ``DECNET_DB_URL`` — full SQLAlchemy URL
|
|
3. Component env vars:
|
|
``DECNET_DB_HOST`` (default ``localhost``)
|
|
``DECNET_DB_PORT`` (default ``3306``)
|
|
``DECNET_DB_NAME`` (default ``decnet``)
|
|
``DECNET_DB_USER`` (default ``decnet``)
|
|
``DECNET_DB_PASSWORD`` (default empty — raises unless pytest is running)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import Optional
|
|
from urllib.parse import quote_plus
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
|
|
|
|
|
|
# Pool tuning defaults; each one is the default value of the same-named
# keyword argument of get_async_engine().
DEFAULT_POOL_SIZE: int = 10
DEFAULT_MAX_OVERFLOW: int = 20
# Seconds. Recycling connections hourly avoids MySQL ``wait_timeout``
# disconnects on connections that sat idle in the pool.
DEFAULT_POOL_RECYCLE: int = 3600
DEFAULT_POOL_PRE_PING: bool = True
|
|
|
|
|
|
def build_mysql_url(
    host: Optional[str] = None,
    port: Optional[int] = None,
    database: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> str:
    """Compose an async SQLAlchemy URL for MySQL using the aiomysql driver.

    Component args override env vars. A falsy ``host``, ``port``,
    ``database`` or ``user`` falls back to ``DECNET_DB_HOST`` /
    ``DECNET_DB_PORT`` / ``DECNET_DB_NAME`` / ``DECNET_DB_USER`` (defaults
    ``localhost`` / ``3306`` / ``decnet`` / ``decnet``). Only
    ``password=None`` triggers the ``DECNET_DB_PASSWORD`` lookup.

    User and password are percent-encoded so special characters (``@``,
    ``:``, ``/``, spaces...) don't break URL parsing. An IPv6 literal host
    is wrapped in square brackets so its colons are not confused with the
    port separator.

    Returns:
        A URL of the form ``mysql+aiomysql://user:password@host:port/database``.

    Raises:
        ValueError: If ``DECNET_DB_PORT`` is not an integer, or if the
            password is empty and no ``PYTEST*`` env var is present.
    """
    # Function-scope import: the module top level only imports ``quote_plus``.
    from urllib.parse import quote

    host = host or os.environ.get("DECNET_DB_HOST", "localhost")
    database = database or os.environ.get("DECNET_DB_NAME", "decnet")
    user = user or os.environ.get("DECNET_DB_USER", "decnet")

    if not port:
        raw_port = os.environ.get("DECNET_DB_PORT", "3306")
        try:
            port = int(raw_port)
        except ValueError:
            # Name the offending variable instead of surfacing a bare
            # "invalid literal for int()" message.
            raise ValueError(
                f"DECNET_DB_PORT must be an integer, got {raw_port!r}"
            ) from None

    if password is None:
        password = os.environ.get("DECNET_DB_PASSWORD", "")

    # Allow empty passwords during tests (pytest sets PYTEST_* env vars).
    # Outside tests, an empty MySQL password is almost never intentional.
    if not password and not any(k.startswith("PYTEST") for k in os.environ):
        raise ValueError(
            "DECNET_DB_PASSWORD is not set. Either export it, set DECNET_DB_URL, "
            "or run under pytest for an empty-password default."
        )

    # An IPv6 literal (e.g. ``::1``) must be bracketed in a URL authority,
    # otherwise its colons are indistinguishable from the port separator.
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"

    # ``quote(..., safe="")`` encodes a space as ``%20`` (not ``+``), which
    # decodes correctly under both plain and form-style URL unquoting.
    user_enc = quote(user, safe="")
    pw_enc = quote(password, safe="")
    return f"mysql+aiomysql://{user_enc}:{pw_enc}@{host}:{port}/{database}"
|
|
|
|
|
|
def resolve_url(url: Optional[str] = None) -> str:
    """Return the connection URL to use.

    Precedence: a non-empty ``url`` argument, then a non-empty
    ``DECNET_DB_URL`` environment variable, then the URL assembled by
    :func:`build_mysql_url` from its own defaults and env vars.
    """
    # ``or`` short-circuits, so the environment is only consulted when no
    # explicit URL was supplied.
    chosen = url or os.environ.get("DECNET_DB_URL")
    return chosen if chosen else build_mysql_url()
|
|
|
|
|
|
def get_async_engine(
    url: Optional[str] = None,
    *,
    pool_size: int = DEFAULT_POOL_SIZE,
    max_overflow: int = DEFAULT_MAX_OVERFLOW,
    pool_recycle: int = DEFAULT_POOL_RECYCLE,
    pool_pre_ping: bool = DEFAULT_POOL_PRE_PING,
    echo: bool = False,
) -> AsyncEngine:
    """Build and return an AsyncEngine for MySQL.

    The connection URL is chosen by :func:`resolve_url` (explicit ``url``
    first, then environment). All remaining keyword-only arguments are
    forwarded unchanged to ``create_async_engine``.

    The defaults are tuned for a dashboard workload: a modest pool, an
    hourly recycle to sidestep MySQL's idle-connection reaper, and pre-ping
    so a pooled connection killed server-side fails fast.
    """
    engine_options = {
        "echo": echo,
        "pool_size": pool_size,
        "max_overflow": max_overflow,
        "pool_recycle": pool_recycle,
        "pool_pre_ping": pool_pre_ping,
    }
    return create_async_engine(resolve_url(url), **engine_options)
|