Files
DECNET/decnet/web/router/auth/api_login.py
anti 698ecaa322 feat(auth): jti claim and token-revocation store
Stateless JWTs had no revocation path: a stolen token stayed valid for
its full 24h even after the victim changed their password, and there was
no logout. This lays the foundation for revoking them.

- User.tokens_valid_from: per-user bulk-revocation cutoff (compared against
  the token's iat). RevokedToken(jti PK, exp): single-token denylist, pruned
  opportunistically on insert so it never outgrows live-but-revoked tokens.
- login() now mints a jti; create_access_token already stamps iat/exp.
- repo.revoke_token / is_token_revoked / set_tokens_valid_from (abstract +
  shared sqlmodel impl + DummyRepo coverage stubs).
- Centralized validate path in dependencies.py: every auth dependency now
  resolves the user and fails closed on (1) missing jti (legacy/pre-deploy
  token -> one forced re-login), (2) iat before the cutoff, (3) a denylisted
  jti. Denylist lookups ride a 10s membership cache mirroring the user cache.
- Contract/fuzz harness seeds its fixed-uuid principal under
  DECNET_CONTRACT_TEST so its minted token resolves to a live admin user.
2026-05-30 18:18:41 -04:00

67 lines
2.7 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
from datetime import timedelta
from typing import Any, Optional
from uuid import uuid4
from fastapi import APIRouter, HTTPException, Request, status
from decnet.telemetry import traced as _traced
from decnet.web.auth import (
ACCESS_TOKEN_EXPIRE_MINUTES,
averify_password,
create_access_token,
)
from decnet.web.dependencies import get_user_by_username_cached
from decnet.web.db.models import LoginRequest, Token
from decnet.web.limiter import limiter, login_ip_key, login_username_key
router = APIRouter()
# Two independent buckets, tripping either → 429:
#
# - per-IP (login_ip_key): catches a single source hammering the endpoint, whichever accounts it targets.
# - per-user (login_username_key): catches distributed credential
# stuffing against one account.
#
# Limits: 10 attempts per 5 minutes per bucket. Buckets are process-local
# (memory://); see decnet/web/limiter.py for the rationale. Buckets do
# NOT reset on successful login — a legitimate user tripping the limit
# via fat-fingering will need to wait the window out. 10 tries is
# generous; a rolling window naturally drains.
@router.post(
    "/auth/login",
    response_model=Token,
    tags=["Authentication"],
    responses={
        400: {"description": "Bad Request (e.g. malformed JSON)"},
        401: {"description": "Incorrect username or password"},
        422: {"description": "Validation error"},
        429: {"description": "Too many login attempts — retry after the window resets"},
    },
)
@limiter.limit("10/5 minutes", key_func=login_ip_key)
@limiter.limit("10/5 minutes", key_func=login_username_key)  # type: ignore[arg-type]
@_traced("api.login")
async def login(request: Request, payload: LoginRequest) -> dict[str, Any]:
    # Check the submitted username/password and, on success, hand back a
    # freshly minted bearer token.
    #
    # Returns a mapping with "access_token", "token_type" and
    # "must_change_password". Raises HTTPException(401) when the username
    # is unknown or the password does not verify; both cases produce the
    # same response so the caller cannot tell which one failed.
    #
    # Documented with comments rather than a docstring on purpose: FastAPI
    # would publish a docstring as this operation's OpenAPI description.
    #
    # `request` is not read in this body; presumably the rate-limit
    # decorators need it in the signature — confirm against limiter.py.
    account: Optional[dict[str, Any]] = await get_user_by_username_cached(payload.username)

    # One shared rejection for both failure paths keeps them indistinguishable
    # in status, body and headers.
    rejection = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # NOTE(review): the password check is skipped entirely for an unknown
    # username, so response time may differ between known and unknown
    # accounts — worth confirming whether that matters here.
    if not account:
        raise rejection
    if not await averify_password(payload.password, account["password_hash"]):
        raise rejection

    # The token identifies the user by uuid (not a `sub` claim). The jti is
    # a unique per-token id that the revocation denylist keys on (logout);
    # create_access_token adds the exp and iat claims itself.
    claims: dict[str, Any] = {"uuid": account["uuid"], "jti": uuid4().hex}
    lifetime: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token: str = create_access_token(data=claims, expires_delta=lifetime)

    # A missing must_change_password flag is treated as False.
    password_change_required = bool(account.get("must_change_password", False))
    return {
        "access_token": token,
        "token_type": "bearer",  # nosec B105 — OAuth2 token type, not a password
        "must_change_password": password_change_required,
    }