refactor(auth): hoist _CREDENTIALS_EXCEPTION constant, tighten JWT dependency chain

This commit is contained in:
2026-04-30 22:16:06 -04:00
parent 0b5228eb94
commit 78d3e3a6b9

View File

@@ -107,60 +107,48 @@ async def _get_user_cached(user_uuid: str) -> Optional[dict[str, Any]]:
return user
_CREDENTIALS_EXCEPTION = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def _jwt_to_uuid(token: str) -> str:
"""Decode a raw JWT string and return the user UUID, or raise 401."""
try:
payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_uuid: Optional[str] = payload.get("uuid")
if user_uuid is None:
raise _CREDENTIALS_EXCEPTION
return user_uuid
except jwt.PyJWTError:
raise _CREDENTIALS_EXCEPTION
def _bearer_from_header(request: Request) -> Optional[str]:
auth = request.headers.get("Authorization")
if auth and auth.startswith("Bearer "):
return auth.split(" ", 1)[1]
return None
async def get_stream_user(request: Request, token: Optional[str] = None) -> str:
"""Auth dependency for SSE endpoints — accepts Bearer header OR ?token= query param.
EventSource does not support custom headers, so the query-string fallback is intentional here only.
"""
_credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
auth_header = request.headers.get("Authorization")
resolved: str | None = (
auth_header.split(" ", 1)[1]
if auth_header and auth_header.startswith("Bearer ")
else token
)
resolved = _bearer_from_header(request) or token
if not resolved:
raise _credentials_exception
try:
_payload: dict[str, Any] = jwt.decode(resolved, SECRET_KEY, algorithms=[ALGORITHM])
_user_uuid: Optional[str] = _payload.get("uuid")
if _user_uuid is None:
raise _credentials_exception
return _user_uuid
except jwt.PyJWTError:
raise _credentials_exception
raise _CREDENTIALS_EXCEPTION
return _jwt_to_uuid(resolved)
async def _decode_token(request: Request) -> str:
"""Decode and validate a Bearer JWT, returning the user UUID."""
_credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
auth_header = request.headers.get("Authorization")
token: str | None = (
auth_header.split(" ", 1)[1]
if auth_header and auth_header.startswith("Bearer ")
else None
)
token = _bearer_from_header(request)
if not token:
raise _credentials_exception
try:
_payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
_user_uuid: Optional[str] = _payload.get("uuid")
if _user_uuid is None:
raise _credentials_exception
return _user_uuid
except jwt.PyJWTError:
raise _credentials_exception
raise _CREDENTIALS_EXCEPTION
return _jwt_to_uuid(token)
async def get_current_user(request: Request) -> str: