From 78d3e3a6b989e87391147b4d3c14ed9dc797e0ec Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 30 Apr 2026 22:16:06 -0400 Subject: [PATCH] refactor(auth): hoist _CREDENTIALS_EXCEPTION constant, tighten JWT dependency chain --- decnet/web/dependencies.py | 76 ++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 44 deletions(-) diff --git a/decnet/web/dependencies.py b/decnet/web/dependencies.py index d3f83d29..abb25167 100644 --- a/decnet/web/dependencies.py +++ b/decnet/web/dependencies.py @@ -107,60 +107,48 @@ async def _get_user_cached(user_uuid: str) -> Optional[dict[str, Any]]: return user +_CREDENTIALS_EXCEPTION = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, +) + + +def _jwt_to_uuid(token: str) -> str: + """Decode a raw JWT string and return the user UUID, or raise 401.""" + try: + payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + user_uuid: Optional[str] = payload.get("uuid") + if user_uuid is None: + raise _CREDENTIALS_EXCEPTION + return user_uuid + except jwt.PyJWTError: + raise _CREDENTIALS_EXCEPTION + + +def _bearer_from_header(request: Request) -> Optional[str]: + auth = request.headers.get("Authorization") + if auth and auth.startswith("Bearer "): + return auth.split(" ", 1)[1] + return None + + async def get_stream_user(request: Request, token: Optional[str] = None) -> str: """Auth dependency for SSE endpoints — accepts Bearer header OR ?token= query param. EventSource does not support custom headers, so the query-string fallback is intentional here only. """ - _credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - - auth_header = request.headers.get("Authorization") - resolved: str | None = ( - auth_header.split(" ", 1)[1] - if auth_header and auth_header.startswith("Bearer ") - else token - ) + resolved = _bearer_from_header(request) or token if not resolved: - raise _credentials_exception - - try: - _payload: dict[str, Any] = jwt.decode(resolved, SECRET_KEY, algorithms=[ALGORITHM]) - _user_uuid: Optional[str] = _payload.get("uuid") - if _user_uuid is None: - raise _credentials_exception - return _user_uuid - except jwt.PyJWTError: - raise _credentials_exception + raise _CREDENTIALS_EXCEPTION + return _jwt_to_uuid(resolved) async def _decode_token(request: Request) -> str: """Decode and validate a Bearer JWT, returning the user UUID.""" - _credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - - auth_header = request.headers.get("Authorization") - token: str | None = ( - auth_header.split(" ", 1)[1] - if auth_header and auth_header.startswith("Bearer ") - else None - ) + token = _bearer_from_header(request) if not token: - raise _credentials_exception - - try: - _payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - _user_uuid: Optional[str] = _payload.get("uuid") - if _user_uuid is None: - raise _credentials_exception - return _user_uuid - except jwt.PyJWTError: - raise _credentials_exception + raise _CREDENTIALS_EXCEPTION + return _jwt_to_uuid(token) async def get_current_user(request: Request) -> str: