fix(types): T7 — eliminate all remaining 38 mypy errors; fix DeckyRow subscript in engine tests

This commit is contained in:
2026-05-01 02:07:53 -04:00
parent bd50b0d8b2
commit 776861a1b7
21 changed files with 49 additions and 41 deletions

View File

@@ -11,7 +11,7 @@ from typing import Any, AsyncGenerator, Optional
from fastapi import FastAPI, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import ORJSONResponse
from fastapi.responses import ORJSONResponse, Response
from pydantic import ValidationError
from fastapi.middleware.cors import CORSMiddleware
@@ -218,7 +218,7 @@ app: FastAPI = FastAPI(
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # type: ignore[arg-type]
app.add_middleware(SlowAPIMiddleware)
app.add_middleware(
@@ -259,7 +259,7 @@ app.include_router(api_router, prefix="/api/v1")
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> ORJSONResponse:
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> Response:
"""
Handle validation errors with targeted status codes to satisfy contract tests.
Tiered Prioritization:

View File

@@ -19,6 +19,7 @@ def get_repository(**kwargs: Any) -> BaseRepository:
* MySQL accepts ``url`` and engine tuning knobs (``pool_size``, …).
"""
db_type = os.environ.get("DECNET_DB_TYPE", "sqlite").lower()
repo: BaseRepository
if db_type == "sqlite":
from decnet.web.db.sqlite.repository import SQLiteRepository

View File

@@ -12,11 +12,11 @@ SQLite's:
"""
from __future__ import annotations
from typing import List, Optional
from typing import Any, List, Optional
from sqlalchemy import func, select, text, literal_column
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlmodel.sql.expression import SelectOfScalar
from decnet.web.db.models import Log
from decnet.web.db.mysql.database import get_async_engine
@@ -162,11 +162,11 @@ class MySQLRepository(SQLModelRepository):
# Truncate each timestamp to the start of its bucket:
# FROM_UNIXTIME( (UNIX_TIMESTAMP(timestamp) DIV N) * N )
# DIV is MySQL's integer division operator.
bucket_expr = literal_column(
bucket_expr: Any = literal_column(
f"FROM_UNIXTIME((UNIX_TIMESTAMP(timestamp) DIV {bucket_seconds}) * {bucket_seconds})"
).label("bucket_time")
statement: SelectOfScalar = select(bucket_expr, func.count().label("count")).select_from(Log)
statement: Any = select(bucket_expr, func.count().label("count")).select_from(Log)
statement = self._apply_filters(statement, search, start_time, end_time)
statement = statement.group_by(literal_column("bucket_time")).order_by(
literal_column("bucket_time")

View File

@@ -1,8 +1,8 @@
from typing import List, Optional
from typing import Any, List, Optional
from sqlalchemy import func, select, text, literal_column
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlmodel.sql.expression import SelectOfScalar
from decnet.config import _ROOT
from decnet.web.db.models import Log
@@ -91,11 +91,11 @@ class SQLiteRepository(SQLModelRepository):
interval_minutes: int = 15,
) -> List[dict]:
bucket_seconds = max(interval_minutes, 1) * 60
bucket_expr = literal_column(
bucket_expr: Any = literal_column(
f"datetime((strftime('%s', timestamp) / {bucket_seconds}) * {bucket_seconds}, 'unixepoch')"
).label("bucket_time")
statement: SelectOfScalar = select(bucket_expr, func.count().label("count")).select_from(Log)
statement: Any = select(bucket_expr, func.count().label("count")).select_from(Log)
statement = self._apply_filters(statement, search, start_time, end_time)
statement = statement.group_by(literal_column("bucket_time")).order_by(
literal_column("bucket_time")

View File

@@ -39,7 +39,7 @@ router = APIRouter()
},
)
@limiter.limit("10/5 minutes", key_func=login_ip_key)
@limiter.limit("10/5 minutes", key_func=login_username_key)
@limiter.limit("10/5 minutes", key_func=login_username_key) # type: ignore[arg-type]
@_traced("api.login")
async def login(request: Request, payload: LoginRequest) -> dict[str, Any]:
_user: Optional[dict[str, Any]] = await get_user_by_username_cached(payload.username)

View File

@@ -108,7 +108,7 @@ async def get_health(user: dict = Depends(require_viewer)) -> Any:
if _docker_client is None:
_docker_client = await asyncio.to_thread(docker.from_env)
await asyncio.to_thread(_docker_client.ping)
await asyncio.to_thread(_docker_client.ping) # type: ignore[union-attr]
_docker_healthy = True
_docker_detail = ""
except Exception as exc:

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import json
import secrets
from datetime import datetime, timezone
from typing import Any
from typing import Any, cast
from fastapi import APIRouter, Depends, HTTPException
@@ -66,7 +66,7 @@ async def api_create_webhook(
req: WebhookCreateRequest,
admin: dict = Depends(require_admin),
) -> WebhookCreateResponse:
patterns = merge_patterns(req.simple_events, req.topic_patterns)
patterns = merge_patterns(cast(list[str], req.simple_events), req.topic_patterns)
if not patterns:
raise HTTPException(
status_code=400,
@@ -188,7 +188,7 @@ async def api_update_webhook(
# to clear all patterns must explicitly pass both as empty lists.
simple = req.simple_events if req.simple_events is not None else []
raw = req.topic_patterns if req.topic_patterns is not None else []
patterns = merge_patterns(simple, raw)
patterns = merge_patterns(cast(list[str], simple), raw)
if not patterns:
raise HTTPException(
status_code=400,