refactor(db): run Alembic at boot, retire ad-hoc _migrate_* helpers

initialize() now delegates to _apply_schema(): real boots run
'alembic upgrade head' (schema owned by the migration history); tests
(DECNET_TESTING=1) keep create_all, which is faster and needs no upgrade
path. MySQL wraps the upgrade in the existing GET_LOCK advisory lock so
concurrent uvicorn workers don't race on DDL.

Deletes the three _migrate_* crimes (attackers-table legacy drop +
GeoIP backfill, TEXT->MEDIUMTEXT widening) — all now handled by the
baseline migration and the _BIG_TEXT model variants. Drops the test
file that only exercised the deleted helpers; adds tests pinning the
alembic-vs-create_all gate and guarding that every model table is in
the migration head.
This commit is contained in:
2026-06-16 16:31:10 -04:00
parent ef4d67cbef
commit 372375194c
6 changed files with 157 additions and 358 deletions

38
decnet/web/db/migrate.py Normal file
View File

@@ -0,0 +1,38 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Programmatic Alembic upgrade, run at app boot for managed databases.
Real boots run ``alembic upgrade head`` so the schema is owned by the
versioned migration history. Test/ephemeral DBs skip this and use
``SQLModel.metadata.create_all`` instead (see
:meth:`SQLModelRepository._apply_schema`) — faster, and a throwaway DB never
needs an upgrade path.
The migration scripts live inside the package (``db/migrations``), so this
works from an installed wheel without depending on the repo-root
``alembic.ini`` (that file exists only for the ``alembic`` CLI).
"""
from __future__ import annotations
from pathlib import Path
from alembic import command
from alembic.config import Config
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import AsyncEngine
_MIGRATIONS_DIR = Path(__file__).resolve().parent / "migrations"
def _upgrade(connection: Connection) -> None:
# No ini file: env.py skips fileConfig and reuses this connection
# (passed via attributes) instead of building its own engine.
cfg = Config()
cfg.set_main_option("script_location", str(_MIGRATIONS_DIR))
cfg.attributes["connection"] = connection
command.upgrade(cfg, "head")
async def run_migrations(engine: AsyncEngine) -> None:
"""Upgrade ``engine``'s database to the latest revision (alembic head)."""
async with engine.begin() as conn:
await conn.run_sync(_upgrade)

View File

@@ -3,13 +3,12 @@
MySQL implementation of :class:`BaseRepository`.
Inherits the portable SQLModel query code from :class:`SQLModelRepository`
and only overrides the two places where MySQL's SQL dialect differs from
SQLite's:
and only overrides where MySQL's SQL dialect differs from SQLite's:
* :meth:`_migrate_attackers_table` — uses ``information_schema`` (MySQL
has no ``PRAGMA``).
* :meth:`get_log_histogram` — uses ``FROM_UNIXTIME`` /
``UNIX_TIMESTAMP`` + integer division for bucketing.
* :meth:`_apply_schema` — wraps the Alembic upgrade in a MySQL advisory
lock to serialize DDL across concurrent workers.
* :meth:`get_log_histogram` — uses ``FROM_UNIXTIME`` / ``UNIX_TIMESTAMP`` +
integer division for bucketing.
"""
from __future__ import annotations
@@ -34,86 +33,24 @@ class MySQLRepository(SQLModelRepository):
self.engine, class_=AsyncSession, expire_on_commit=False
)
async def _migrate_attackers_table(self) -> None:
"""Drop the legacy (pre-UUID) ``attackers`` table if it exists without a ``uuid`` column.
async def _apply_schema(self) -> None:
"""Run the Alembic upgrade under a MySQL advisory lock.
Also adds the GeoIP columns (``country_code``, ``country_source``)
to existing tables that predate them. MySQL exposes column
metadata via ``information_schema.COLUMNS``; ``DATABASE()`` scopes
the lookup to the currently connected schema.
The lock serializes DDL across concurrent uvicorn workers — Alembic
does not lock MySQL DDL itself, so without it parallel workers race
('Table was skipped since its definition is being modified by
concurrent DDL'). Tests (``DECNET_TESTING=1``) take the base
``create_all`` path, which is single-process and needs no lock.
"""
async with self.engine.begin() as conn:
rows = (await conn.execute(text(
"SELECT COLUMN_NAME FROM information_schema.COLUMNS "
"WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'attackers'"
))).fetchall()
if not rows:
return # table absent; create_all() handles it.
if not any(r[0] == "uuid" for r in rows):
await conn.execute(text("DROP TABLE attackers"))
return
existing_cols = {r[0] for r in rows}
if "country_code" not in existing_cols:
await conn.execute(text(
"ALTER TABLE attackers "
"ADD COLUMN country_code VARCHAR(2) NULL, "
"ADD INDEX ix_attackers_country_code (country_code)"
))
if "country_source" not in existing_cols:
await conn.execute(text(
"ALTER TABLE attackers ADD COLUMN country_source VARCHAR(16) NULL"
))
async def _migrate_column_types(self) -> None:
"""Upgrade TEXT → MEDIUMTEXT for columns that accumulate large JSON blobs.
``create_all()`` never alters existing columns, so tables created before
``_BIG_TEXT`` was introduced keep their 64 KiB ``TEXT`` cap. This method
inspects ``information_schema`` and issues ``ALTER TABLE … MODIFY COLUMN``
for each offending column found.
"""
targets: dict[str, dict[str, str]] = {
"attackers": {
"commands": "MEDIUMTEXT NOT NULL DEFAULT '[]'",
"fingerprints": "MEDIUMTEXT NOT NULL DEFAULT '[]'",
"services": "MEDIUMTEXT NOT NULL DEFAULT '[]'",
"deckies": "MEDIUMTEXT NOT NULL DEFAULT '[]'",
},
"state": {
"value": "MEDIUMTEXT NOT NULL",
},
}
async with self.engine.begin() as conn:
rows = (await conn.execute(text(
"SELECT TABLE_NAME, COLUMN_NAME FROM information_schema.COLUMNS "
"WHERE TABLE_SCHEMA = DATABASE() "
" AND TABLE_NAME IN ('attackers', 'state') "
" AND COLUMN_NAME IN ('commands','fingerprints','services','deckies','value') "
" AND DATA_TYPE = 'text'"
))).fetchall()
for table_name, col_name in rows:
spec = targets.get(table_name, {}).get(col_name)
if spec:
await conn.execute(text(
f"ALTER TABLE `{table_name}` MODIFY COLUMN `{col_name}` {spec}"
))
async def initialize(self) -> None:
"""Create tables and run all MySQL-specific migrations.
Uses a MySQL advisory lock to serialize DDL across concurrent
uvicorn workers — prevents the 'Table was skipped since its
definition is being modified by concurrent DDL' race.
"""
from sqlmodel import SQLModel
import os
if os.environ.get("DECNET_TESTING") == "1":
await super()._apply_schema()
return
from decnet.web.db.migrate import run_migrations
async with self.engine.connect() as lock_conn:
await lock_conn.execute(text("SELECT GET_LOCK('decnet_schema_init', 30)"))
try:
await self._migrate_attackers_table()
await self._migrate_column_types()
async with self.engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
await self._ensure_admin_user()
await run_migrations(self.engine)
finally:
await lock_conn.execute(text("SELECT RELEASE_LOCK('decnet_schema_init')"))
await lock_conn.close()

View File

@@ -15,9 +15,9 @@ from decnet.web.db.sqlmodel_repo import SQLModelRepository
class SQLiteRepository(SQLModelRepository):
"""SQLite backend — uses ``aiosqlite``.
Overrides the two places where SQLite's SQL dialect differs from
MySQL/PostgreSQL: legacy-schema migration (via ``PRAGMA table_info``)
and the log-histogram bucket expression (via ``strftime`` + ``unixepoch``).
Overrides the one place where SQLite's SQL dialect differs from
MySQL/PostgreSQL: the log-histogram bucket expression (via ``strftime``
+ ``unixepoch``). Schema is managed by Alembic (see db/migrate.py).
"""
def __init__(self, db_path: str = str(_ROOT / "decnet.db")) -> None:
@@ -27,35 +27,6 @@ class SQLiteRepository(SQLModelRepository):
self.engine, class_=AsyncSession, expire_on_commit=False
)
async def _migrate_attackers_table(self) -> None:
"""Drop the old attackers table if it lacks the uuid column (pre-UUID schema).
Also adds the GeoIP columns (``country_code``, ``country_source``)
to existing tables that predate them. SQLite's
``ALTER TABLE ADD COLUMN`` is idempotent only if we gate on
``PRAGMA table_info`` first — re-adding raises.
"""
async with self.engine.begin() as conn:
rows = (await conn.execute(text("PRAGMA table_info(attackers)"))).fetchall()
if rows and not any(r[1] == "uuid" for r in rows):
await conn.execute(text("DROP TABLE attackers"))
return # create_all() rebuilds fresh — no need to patch columns.
if not rows:
return # table absent; create_all() handles it.
existing_cols = {r[1] for r in rows}
if "country_code" not in existing_cols:
await conn.execute(text(
"ALTER TABLE attackers ADD COLUMN country_code VARCHAR(2)"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_attackers_country_code "
"ON attackers (country_code)"
))
if "country_source" not in existing_cols:
await conn.execute(text(
"ALTER TABLE attackers ADD COLUMN country_source VARCHAR(16)"
))
def _json_field_equals(self, key: str, param_name: str = "val"):
# SQLite stores JSON as text; json_extract is the canonical accessor.
return text(f"json_extract(fields, '$.{key}') = :{param_name}")

View File

@@ -6,8 +6,7 @@ Contains all dialect-portable query code used by the SQLite and MySQL
backends. Dialect-specific behavior lives in subclasses:
* engine/session construction (``__init__``)
* ``_migrate_attackers_table`` (legacy schema check; DDL introspection
is not portable)
* ``_apply_schema`` (MySQL wraps the Alembic upgrade in an advisory lock)
* ``get_log_histogram`` (date-bucket expression differs per dialect)
"""
from __future__ import annotations
@@ -103,14 +102,27 @@ class SQLModelRepository(
# ------------------------------------------------------------ lifecycle
async def initialize(self) -> None:
"""Create tables if absent and seed the admin user."""
from sqlmodel import SQLModel
await self._migrate_attackers_table()
async with self.engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
"""Bring the schema up to date and seed the admin user."""
await self._apply_schema()
await self._ensure_admin_user()
await self._ensure_contract_user()
async def _apply_schema(self) -> None:
"""Create/upgrade tables.
Real boots run Alembic migrations — the schema is owned by the
versioned migration history. Test/ephemeral DBs (``DECNET_TESTING=1``)
skip Alembic and use ``create_all``: faster, and an in-memory/throwaway
DB never needs an upgrade path.
"""
from sqlmodel import SQLModel
if os.environ.get("DECNET_TESTING") == "1":
async with self.engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
return
from decnet.web.db.migrate import run_migrations
await run_migrations(self.engine)
async def reinitialize(self) -> None:
"""Re-create schema (for tests / reset flows). Does NOT drop existing tables."""
from sqlmodel import SQLModel
@@ -165,10 +177,6 @@ class SQLModelRepository(
))
await session.commit()
async def _migrate_attackers_table(self) -> None:
"""Legacy-schema cleanup. Override per dialect (DDL introspection is non-portable)."""
return None
async def get_deckies(self) -> List[dict]:
# The fleet inventory the UI/API sees is fleet_deckies — the
# engine-mirrored table written on EVERY deploy/teardown (CLI or web),