fix(types): T7 — eliminate all remaining 38 mypy errors; fix DeckyRow subscript in engine tests

This commit is contained in:
2026-05-01 02:07:53 -04:00
parent 7e4da95091
commit ee24a7551f
27 changed files with 58 additions and 50 deletions

View File

@@ -11,7 +11,7 @@ from typing import Any, AsyncGenerator, Optional
from fastapi import FastAPI, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import ORJSONResponse
from fastapi.responses import ORJSONResponse, Response
from pydantic import ValidationError
from fastapi.middleware.cors import CORSMiddleware
@@ -226,7 +226,7 @@ app: FastAPI = FastAPI(
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # type: ignore[arg-type]
app.add_middleware(SlowAPIMiddleware)
app.add_middleware(
@@ -267,7 +267,7 @@ app.include_router(api_router, prefix="/api/v1")
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> ORJSONResponse:
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> Response:
"""
Handle validation errors with targeted status codes to satisfy contract tests.
Tiered Prioritization:

View File

@@ -19,6 +19,7 @@ def get_repository(**kwargs: Any) -> BaseRepository:
* MySQL accepts ``url`` and engine tuning knobs (``pool_size``, …).
"""
db_type = os.environ.get("DECNET_DB_TYPE", "sqlite").lower()
repo: BaseRepository
if db_type == "sqlite":
from decnet.web.db.sqlite.repository import SQLiteRepository

View File

@@ -12,11 +12,11 @@ SQLite's:
"""
from __future__ import annotations
from typing import List, Optional
from typing import Any, List, Optional
from sqlalchemy import func, select, text, literal_column
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlmodel.sql.expression import SelectOfScalar
from decnet.web.db.models import Log
from decnet.web.db.mysql.database import get_async_engine
@@ -162,11 +162,11 @@ class MySQLRepository(SQLModelRepository):
# Truncate each timestamp to the start of its bucket:
# FROM_UNIXTIME( (UNIX_TIMESTAMP(timestamp) DIV N) * N )
# DIV is MySQL's integer division operator.
bucket_expr = literal_column(
bucket_expr: Any = literal_column(
f"FROM_UNIXTIME((UNIX_TIMESTAMP(timestamp) DIV {bucket_seconds}) * {bucket_seconds})"
).label("bucket_time")
statement: SelectOfScalar = select(bucket_expr, func.count().label("count")).select_from(Log)
statement: Any = select(bucket_expr, func.count().label("count")).select_from(Log)
statement = self._apply_filters(statement, search, start_time, end_time)
statement = statement.group_by(literal_column("bucket_time")).order_by(
literal_column("bucket_time")

View File

@@ -1,8 +1,8 @@
from typing import List, Optional
from typing import Any, List, Optional
from sqlalchemy import func, select, text, literal_column
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlmodel.sql.expression import SelectOfScalar
from decnet.config import _ROOT
from decnet.web.db.models import Log
@@ -91,11 +91,11 @@ class SQLiteRepository(SQLModelRepository):
interval_minutes: int = 15,
) -> List[dict]:
bucket_seconds = max(interval_minutes, 1) * 60
bucket_expr = literal_column(
bucket_expr: Any = literal_column(
f"datetime((strftime('%s', timestamp) / {bucket_seconds}) * {bucket_seconds}, 'unixepoch')"
).label("bucket_time")
statement: SelectOfScalar = select(bucket_expr, func.count().label("count")).select_from(Log)
statement: Any = select(bucket_expr, func.count().label("count")).select_from(Log)
statement = self._apply_filters(statement, search, start_time, end_time)
statement = statement.group_by(literal_column("bucket_time")).order_by(
literal_column("bucket_time")

View File

@@ -39,7 +39,7 @@ router = APIRouter()
},
)
@limiter.limit("10/5 minutes", key_func=login_ip_key)
@limiter.limit("10/5 minutes", key_func=login_username_key)
@limiter.limit("10/5 minutes", key_func=login_username_key) # type: ignore[arg-type]
@_traced("api.login")
async def login(request: Request, payload: LoginRequest) -> dict[str, Any]:
_user: Optional[dict[str, Any]] = await get_user_by_username_cached(payload.username)

View File

@@ -114,7 +114,7 @@ def _get_active_connections(pid: int, ports: list[int]) -> list[dict]:
)
async def api_enable_tarpit(
decky_name: str = Path(..., pattern=_DECKY_RE),
req: TarpitEnableRequest = ...,
req: TarpitEnableRequest = ..., # type: ignore[assignment]
admin: dict = Depends(require_admin),
) -> MessageResponse:
try:

View File

@@ -108,7 +108,7 @@ async def get_health(user: dict = Depends(require_viewer)) -> Any:
if _docker_client is None:
_docker_client = await asyncio.to_thread(docker.from_env)
await asyncio.to_thread(_docker_client.ping)
await asyncio.to_thread(_docker_client.ping) # type: ignore[union-attr]
_docker_healthy = True
_docker_detail = ""
except Exception as exc:

View File

@@ -59,7 +59,7 @@ def _db_key(topology_id: str, decky_name: str) -> str:
async def api_enable_tarpit(
topology_id: str = Path(..., pattern=_TOPO_RE),
decky_name: str = Path(..., pattern=_DECKY_RE),
req: TarpitEnableRequest = ...,
req: TarpitEnableRequest = ..., # type: ignore[assignment]
admin: dict = Depends(require_admin),
) -> MessageResponse:
try:

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import json
import secrets
from datetime import datetime, timezone
from typing import Any
from typing import Any, cast
from fastapi import APIRouter, Depends, HTTPException
@@ -66,7 +66,7 @@ async def api_create_webhook(
req: WebhookCreateRequest,
admin: dict = Depends(require_admin),
) -> WebhookCreateResponse:
patterns = merge_patterns(req.simple_events, req.topic_patterns)
patterns = merge_patterns(cast(list[str], req.simple_events), req.topic_patterns)
if not patterns:
raise HTTPException(
status_code=400,
@@ -188,7 +188,7 @@ async def api_update_webhook(
# to clear all patterns must explicitly pass both as empty lists.
simple = req.simple_events if req.simple_events is not None else []
raw = req.topic_patterns if req.topic_patterns is not None else []
patterns = merge_patterns(simple, raw)
patterns = merge_patterns(cast(list[str], simple), raw)
if not patterns:
raise HTTPException(
status_code=400,