feat: add MySQL backend support for DECNET database
- Implement MySQLRepository extending BaseRepository
- Add SQLAlchemy/SQLModel ORM abstraction layer (sqlmodel_repo.py)
- Support connection pooling and tuning via DECNET_DB_URL env var
- Cross-compatible with SQLite backend via factory pattern
- Prepared for production deployment with MySQL SIEM/ELK integration
This commit is contained in:
0
decnet/web/db/mysql/__init__.py
Normal file
0
decnet/web/db/mysql/__init__.py
Normal file
BIN
decnet/web/db/mysql/__pycache__/__init__.cpython-314.pyc
Normal file
BIN
decnet/web/db/mysql/__pycache__/__init__.cpython-314.pyc
Normal file
Binary file not shown.
BIN
decnet/web/db/mysql/__pycache__/database.cpython-314.pyc
Normal file
BIN
decnet/web/db/mysql/__pycache__/database.cpython-314.pyc
Normal file
Binary file not shown.
BIN
decnet/web/db/mysql/__pycache__/repository.cpython-314.pyc
Normal file
BIN
decnet/web/db/mysql/__pycache__/repository.cpython-314.pyc
Normal file
Binary file not shown.
98
decnet/web/db/mysql/database.py
Normal file
98
decnet/web/db/mysql/database.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
MySQL async engine factory.
|
||||
|
||||
Builds a SQLAlchemy AsyncEngine against MySQL using the ``aiomysql`` driver.
|
||||
|
||||
Connection info is resolved (in order of precedence):
|
||||
|
||||
1. An explicit ``url`` argument passed to :func:`get_async_engine`
|
||||
2. ``DECNET_DB_URL`` — full SQLAlchemy URL
|
||||
3. Component env vars:
|
||||
``DECNET_DB_HOST`` (default ``localhost``)
|
||||
``DECNET_DB_PORT`` (default ``3306``)
|
||||
``DECNET_DB_NAME`` (default ``decnet``)
|
||||
``DECNET_DB_USER`` (default ``decnet``)
|
||||
``DECNET_DB_PASSWORD`` (default empty — raises unless pytest is running)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Optional
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
|
||||
|
||||
|
||||
DEFAULT_POOL_SIZE = 10
|
||||
DEFAULT_MAX_OVERFLOW = 20
|
||||
DEFAULT_POOL_RECYCLE = 3600 # seconds — avoid MySQL ``wait_timeout`` disconnects
|
||||
DEFAULT_POOL_PRE_PING = True
|
||||
|
||||
|
||||
def build_mysql_url(
    host: Optional[str] = None,
    port: Optional[int] = None,
    database: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> str:
    """Compose an async SQLAlchemy URL for MySQL using the aiomysql driver.

    Each component argument takes precedence over its ``DECNET_DB_*`` env
    var; the env var's default is used when both are absent. The user and
    password are percent-encoded so characters such as ``@``, ``:`` and
    ``/`` cannot break URL parsing.
    """
    env = os.environ
    host = host or env.get("DECNET_DB_HOST", "localhost")
    port = int(port or env.get("DECNET_DB_PORT", "3306"))
    database = database or env.get("DECNET_DB_NAME", "decnet")
    user = user or env.get("DECNET_DB_USER", "decnet")

    # Only fall back to the env var when the caller did not pass a password
    # at all — an explicit empty string is kept as-is.
    if password is None:
        password = env.get("DECNET_DB_PASSWORD", "")

    # Allow empty passwords during tests (pytest sets PYTEST_* env vars).
    # Outside tests, an empty MySQL password is almost never intentional.
    under_pytest = any(name.startswith("PYTEST") for name in env)
    if not (password or under_pytest):
        raise ValueError(
            "DECNET_DB_PASSWORD is not set. Either export it, set DECNET_DB_URL, "
            "or run under pytest for an empty-password default."
        )

    return "mysql+aiomysql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database
    )
||||
|
||||
def resolve_url(url: Optional[str] = None) -> str:
    """Pick a connection URL: explicit arg → ``DECNET_DB_URL`` env → built from components."""
    # First truthy candidate wins; empty strings fall through like None.
    for candidate in (url, os.environ.get("DECNET_DB_URL")):
        if candidate:
            return candidate
    return build_mysql_url()
|
||||
|
||||
|
||||
def get_async_engine(
    url: Optional[str] = None,
    *,
    pool_size: int = DEFAULT_POOL_SIZE,
    max_overflow: int = DEFAULT_MAX_OVERFLOW,
    pool_recycle: int = DEFAULT_POOL_RECYCLE,
    pool_pre_ping: bool = DEFAULT_POOL_PRE_PING,
    echo: bool = False,
) -> AsyncEngine:
    """Create an AsyncEngine for MySQL.

    Defaults are tuned for a dashboard workload: a modest pool, hourly
    recycle to sidestep MySQL's idle-connection reaper, and pre-ping to
    fail fast if a pooled connection has been killed server-side.
    """
    pool_options = {
        "echo": echo,
        "pool_size": pool_size,
        "max_overflow": max_overflow,
        "pool_recycle": pool_recycle,
        "pool_pre_ping": pool_pre_ping,
    }
    return create_async_engine(resolve_url(url), **pool_options)
|
||||
87
decnet/web/db/mysql/repository.py
Normal file
87
decnet/web/db/mysql/repository.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""
|
||||
MySQL implementation of :class:`BaseRepository`.
|
||||
|
||||
Inherits the portable SQLModel query code from :class:`SQLModelRepository`
|
||||
and only overrides the two places where MySQL's SQL dialect differs from
|
||||
SQLite's:
|
||||
|
||||
* :meth:`_migrate_attackers_table` — uses ``information_schema`` (MySQL
|
||||
has no ``PRAGMA``).
|
||||
* :meth:`get_log_histogram` — uses ``FROM_UNIXTIME`` /
|
||||
``UNIX_TIMESTAMP`` + integer division for bucketing.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
from sqlalchemy import func, select, text, literal_column
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
from sqlmodel.sql.expression import SelectOfScalar
|
||||
|
||||
from decnet.web.db.models import Log
|
||||
from decnet.web.db.mysql.database import get_async_engine
|
||||
from decnet.web.db.sqlmodel_repo import SQLModelRepository
|
||||
|
||||
|
||||
class MySQLRepository(SQLModelRepository):
    """MySQL backend — uses ``aiomysql``.

    Inherits the portable SQLModel query code from ``SQLModelRepository``
    and overrides only the places where MySQL's dialect differs from
    SQLite's (``information_schema`` instead of ``PRAGMA``, and
    ``FROM_UNIXTIME``/``UNIX_TIMESTAMP`` for histogram bucketing).
    """

    def __init__(self, url: Optional[str] = None, **engine_kwargs) -> None:
        """Build the async engine and session factory.

        Args:
            url: Optional explicit SQLAlchemy URL; falls back to env-based
                resolution inside :func:`get_async_engine`.
            **engine_kwargs: Passed through to :func:`get_async_engine`
                (pool sizing, echo, …).
        """
        self.engine = get_async_engine(url=url, **engine_kwargs)
        # expire_on_commit=False keeps returned ORM objects usable after the
        # session commits (results cross the await boundary to callers).
        self.session_factory = async_sessionmaker(
            self.engine, class_=AsyncSession, expire_on_commit=False
        )

    async def _migrate_attackers_table(self) -> None:
        """Drop the legacy (pre-UUID) ``attackers`` table if it exists without a ``uuid`` column.

        MySQL exposes column metadata via ``information_schema.COLUMNS``.
        ``DATABASE()`` scopes the lookup to the currently connected schema.
        """
        async with self.engine.begin() as conn:
            rows = (await conn.execute(text(
                "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
                "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'attackers'"
            ))).fetchall()
            # No rows means the table doesn't exist — nothing to migrate.
            if rows and not any(r[0] == "uuid" for r in rows):
                await conn.execute(text("DROP TABLE attackers"))

    def _json_field_equals(self, key: str):
        """Return a SQL predicate for ``fields`` JSON key equality.

        The JSON path is passed as a bound parameter (``:jpath``) rather
        than interpolated into the SQL text, so a hostile ``key`` (e.g.
        from a request filter) cannot inject SQL. The ``:val`` parameter is
        left unbound for the caller to supply, as before.
        """
        # MySQL 5.7+ exposes JSON_EXTRACT; JSON_UNQUOTE strips the quoting
        # so comparisons match the TEXT-stored-JSON behavior on SQLite.
        return text(
            "JSON_UNQUOTE(JSON_EXTRACT(fields, :jpath)) = :val"
        ).bindparams(jpath=f"$.{key}")

    async def get_log_histogram(
        self,
        search: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        interval_minutes: int = 15,
    ) -> List[dict]:
        """Count log rows per time bucket.

        Args:
            search: Optional free-text filter (applied by ``_apply_filters``).
            start_time: Optional inclusive lower bound (applied by ``_apply_filters``).
            end_time: Optional upper bound (applied by ``_apply_filters``).
            interval_minutes: Bucket width; clamped to at least 1 minute.

        Returns:
            ``[{"time": "<ISO timestamp>", "count": <int>}, ...]`` ordered
            by bucket start.
        """
        # Coerce to int so the value interpolated into the SQL below is
        # always a plain integer literal (interval_minutes may originate
        # from request parameters).
        bucket_seconds = max(int(interval_minutes), 1) * 60
        # Truncate each timestamp to the start of its bucket:
        #   FROM_UNIXTIME( (UNIX_TIMESTAMP(timestamp) DIV N) * N )
        # DIV is MySQL's integer division operator.
        bucket_expr = literal_column(
            f"FROM_UNIXTIME((UNIX_TIMESTAMP(timestamp) DIV {bucket_seconds}) * {bucket_seconds})"
        ).label("bucket_time")

        statement: SelectOfScalar = select(bucket_expr, func.count().label("count")).select_from(Log)
        statement = self._apply_filters(statement, search, start_time, end_time)
        statement = statement.group_by(literal_column("bucket_time")).order_by(
            literal_column("bucket_time")
        )

        async with self.session_factory() as session:
            results = await session.execute(statement)
            # Normalize to ISO string for API parity with the SQLite backend
            # (SQLite's datetime() returns a string already; FROM_UNIXTIME
            # returns a datetime).
            out: List[dict] = []
            for row in results.all():
                bucket = row[0]
                out.append({
                    "time": bucket.isoformat(sep=" ") if hasattr(bucket, "isoformat") else bucket,
                    "count": row[1],
                })
            return out
|
||||
Reference in New Issue
Block a user