- Rename project to stealergram throughout - Add pyproject.toml (replaces requirements.txt split, folds pytest.ini) - Replace all em-dashes with hyphens across all source files Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
77 lines
2.4 KiB
Python
77 lines
2.4 KiB
Python
"""
|
|
web/auth.py - JWT signing/verification and bcrypt password helpers.
|
|
|
|
Tokens:
|
|
access - HS256, 15 min TTL, payload: {sub, role, type:"access"}
|
|
refresh - HS256, 7 day TTL, payload: {sub, jti, type:"refresh"}
|
|
|
|
Both tokens live in httpOnly SameSite=Strict cookies.
|
|
The `type` claim prevents an access token being used as a refresh token.
|
|
|
|
Secret: WEB_SECRET_KEY env var (required; no hardcoded default).
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import bcrypt as _bcrypt_lib
|
|
from jose import JWTError, jwt
|
|
|
|
_SECRET_KEY = os.environ.get("WEB_SECRET_KEY", "")
|
|
_ALGORITHM = "HS256"
|
|
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 15
|
|
REFRESH_TOKEN_EXPIRE_DAYS = 7
|
|
|
|
|
|
def _secret() -> str:
|
|
if not _SECRET_KEY:
|
|
raise RuntimeError("WEB_SECRET_KEY env var is required.")
|
|
return _SECRET_KEY
|
|
|
|
|
|
def hash_password(plain: str) -> str:
|
|
return _bcrypt_lib.hashpw(plain.encode("utf-8"), _bcrypt_lib.gensalt()).decode("utf-8")
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
|
|
return _bcrypt_lib.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
|
|
|
|
|
|
def create_access_token(user_id: str, role: str) -> str:
|
|
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
payload = {"sub": user_id, "role": role, "type": "access", "exp": expire}
|
|
return jwt.encode(payload, _secret(), algorithm=_ALGORITHM)
|
|
|
|
|
|
def create_refresh_token(user_id: str) -> tuple[str, str, datetime]:
|
|
"""Returns (encoded_token, jti, expires_at)."""
|
|
jti = str(uuid.uuid4())
|
|
expires_at = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
|
payload = {"sub": user_id, "jti": jti, "type": "refresh", "exp": expires_at}
|
|
token = jwt.encode(payload, _secret(), algorithm=_ALGORITHM)
|
|
return token, jti, expires_at
|
|
|
|
|
|
def decode_access_token(token: str) -> dict | None:
|
|
"""Returns payload dict or None if invalid/expired."""
|
|
try:
|
|
payload = jwt.decode(token, _secret(), algorithms=[_ALGORITHM])
|
|
if payload.get("type") != "access":
|
|
return None
|
|
return payload
|
|
except JWTError:
|
|
return None
|
|
|
|
|
|
def decode_refresh_token(token: str) -> dict | None:
|
|
"""Returns payload dict or None if invalid/expired."""
|
|
try:
|
|
payload = jwt.decode(token, _secret(), algorithms=[_ALGORITHM])
|
|
if payload.get("type") != "refresh":
|
|
return None
|
|
return payload
|
|
except JWTError:
|
|
return None
|