Files
DECNET/decnet/web/db/factory.py
anti 721122a7ef chore(types): enable warn_return_any and cast all no-any-return sites
Turn on mypy warn_return_any (pyproject) and resolve the 84 resulting
[no-any-return] errors across 43 files with typing.cast() at the return
sites — runtime no-ops that make the declared return type explicit where a
dependency (SQLAlchemy scalar/first/one, httpx .json(), subprocess, docker
SDK) hands back Any. No behavior change: no DTO/table field types altered, no
validation/coercion calls added, every cast reflects the true runtime type.

Locks in return-type strictness so the class of bug where a function silently
widens to Any can't regress. mypy decnet/ clean; adversarially verified
behavior-preserving (84 casts 1:1 with prior returns).

Bump tornado 6.5.5 -> 6.5.7 (CVE-2026-49854, transitive via snakeviz).
2026-06-12 18:21:22 -04:00

36 lines
1.2 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Repository factory — selects a :class:`BaseRepository` implementation based on
``DECNET_DB_TYPE`` (``sqlite`` or ``mysql``).
"""
from __future__ import annotations
import os
from typing import Any, cast
from decnet.web.db.repository import BaseRepository
def get_repository(**kwargs: Any) -> BaseRepository:
"""Instantiate the repository implementation selected by ``DECNET_DB_TYPE``.
Keyword arguments are forwarded to the concrete implementation:
* SQLite accepts ``db_path``.
* MySQL accepts ``url`` and engine tuning knobs (``pool_size``, …).
"""
db_type = os.environ.get("DECNET_DB_TYPE", "sqlite").lower()
repo: BaseRepository
if db_type == "sqlite":
from decnet.web.db.sqlite.repository import SQLiteRepository
repo = SQLiteRepository(**kwargs)
elif db_type == "mysql":
from decnet.web.db.mysql.repository import MySQLRepository
repo = MySQLRepository(**kwargs)
else:
raise ValueError(f"Unsupported database type: {db_type}")
from decnet.telemetry import wrap_repository
return cast(BaseRepository, wrap_repository(repo))