fix(types): P2 — wire _MixinBase + col() across sqlmodel_repo; suppress pydantic/SQLModel column typing false positives

- Add _MixinBase abstract class to _helpers.py: declares _session(),
  _deserialize_attacker(), _assert_pending(), _check_and_bump_version(),
  and list_running_topology_deckies() so mypy can see cross-mixin contracts
- Add _require(val, msg) helper for narrowing T | None → T
- Inherit _MixinBase in all 26 leaf mixin classes
- Wrap SQLAlchemy column method calls (.is_(), .like(), .notin_(), .in_(),
  .contains()) with col() from sqlmodel — fixes attr-defined false positives
  caused by pydantic plugin typing class-level fields as Python value types
- Wrap select(Model.field) with select(col(Model.field)) for column projections
- Add pyproject.toml [[tool.mypy.overrides]] to disable arg-type in
  sqlmodel_repo.*: pydantic plugin resolves .where(Model.field == v) as
  where(bool), a false positive; call-arg still catches real argument errors
- Remove 9 stale # type: ignore comments (logging, helpers, credentials)
- Fix telemetry.py traced() overload no-redef + misc
- Fix logs.py datetime/str operator and nullable PK comparison with col()
- sqlmodel_repo/ now has 0 mypy errors
This commit is contained in:
2026-05-01 00:49:18 -04:00
parent d777a1c4e0
commit 614780f144
30 changed files with 221 additions and 100 deletions

View File

@@ -6,12 +6,15 @@ from datetime import datetime, timezone
from typing import Any, List, Optional
from sqlalchemy import desc, func, or_, select, update
from sqlmodel import col
from sqlmodel.sql.expression import SelectOfScalar
from decnet.web.db.models import Credential
class CredentialsCoreMixin:
from decnet.web.db.sqlmodel_repo._helpers import _MixinBase
class CredentialsCoreMixin(_MixinBase):
async def upsert_credential(self, data: dict[str, Any]) -> int:
"""Upsert a credential attempt; returns the row id.
@@ -37,7 +40,7 @@ class CredentialsCoreMixin:
Credential.secret_sha256 == payload["secret_sha256"],
# NULL == NULL is False under SQL — branch the predicate.
(Credential.principal == principal) if principal is not None
else Credential.principal.is_(None),
else col(Credential.principal).is_(None),
)
existing = (await session.execute(stmt)).scalar_one_or_none()
now = datetime.now(timezone.utc)
@@ -48,7 +51,7 @@ class CredentialsCoreMixin:
existing.outcome = payload["outcome"]
session.add(existing)
await session.commit()
return existing.id # type: ignore[return-value]
return existing.id
row = Credential(
attacker_ip=payload["attacker_ip"],
decky_name=payload["decky_name"],
@@ -84,10 +87,10 @@ class CredentialsCoreMixin:
lk = f"%{search}%"
statement = statement.where(
or_(
Credential.decky_name.like(lk),
Credential.service.like(lk),
Credential.principal.like(lk),
Credential.secret_printable.like(lk),
col(Credential.decky_name).like(lk),
col(Credential.service).like(lk),
col(Credential.principal).like(lk),
col(Credential.secret_printable).like(lk),
)
)
return statement
@@ -188,7 +191,7 @@ class CredentialsCoreMixin:
update(Credential)
.where(
Credential.attacker_ip == attacker_ip,
Credential.attacker_uuid.is_(None),
col(Credential.attacker_uuid).is_(None),
)
.values(attacker_uuid=attacker_uuid)
)

View File

@@ -9,11 +9,14 @@ from datetime import datetime, timezone
from typing import Any, List, Optional
from sqlalchemy import desc, func, select
from sqlmodel import col
from decnet.web.db.models import Credential, CredentialReuse
class CredentialReuseMixin:
from decnet.web.db.sqlmodel_repo._helpers import _MixinBase
class CredentialReuseMixin(_MixinBase):
@staticmethod
def _merge_unique(existing_json: str, value: Optional[str]) -> tuple[str, bool]:
"""Append ``value`` to a JSON list[str] column if not present.
@@ -117,7 +120,7 @@ class CredentialReuseMixin:
Credential.secret_sha256 == secret_sha256,
Credential.secret_kind == secret_kind,
(Credential.principal == principal) if principal is not None
else Credential.principal.is_(None),
else col(Credential.principal).is_(None),
)
)
target_count = (await session.execute(stmt)).scalar() or 0
@@ -150,7 +153,7 @@ class CredentialReuseMixin:
).label("target_count")
async with self._session() as session:
group_stmt = (
select(
select( # type: ignore[call-overload]
Credential.secret_sha256,
Credential.secret_kind,
Credential.principal,
@@ -171,7 +174,7 @@ class CredentialReuseMixin:
Credential.secret_kind == kind,
(Credential.principal == principal)
if principal is not None
else Credential.principal.is_(None),
else col(Credential.principal).is_(None),
)
rows = (await session.execute(cred_stmt)).scalars().all()
out.append({
@@ -253,13 +256,13 @@ class CredentialReuseMixin:
sha_set = {r["secret_sha256"] for r in rows}
if not sha_set:
return
stmt = select(
stmt = select( # type: ignore[call-overload]
Credential.secret_sha256,
Credential.secret_kind,
Credential.principal,
Credential.secret_printable,
Credential.secret_b64,
).where(Credential.secret_sha256.in_(sha_set))
).where(col(Credential.secret_sha256).in_(sha_set))
secret_map: dict[
tuple[str, str, Optional[str]],
tuple[Optional[str], Optional[str]],