Unify the web UI login and api key authentication

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai 2025-11-07 15:19:39 +08:00
parent fa98cc2bb9
commit 60b5da6c06

View file

@ -32,10 +32,13 @@ from api.utils import commands
from flask_mail import Mail
from flask_session import Session
from flask_login import LoginManager
from flask_login import LoginManager, UserMixin
from common import settings
from api.utils.api_utils import server_error_response
from api.utils.api_utils import server_error_response, get_json_result
from api.constants import API_VERSION
from flask import request as flask_request
from api.db.db_models import APIToken
from common.constants import RetCode
__all__ = ["app"]
@ -142,39 +145,97 @@ client_urls_prefix = [
]
class DefaultUser(UserMixin):
def __init__(self, tenant_id: str):
self.tenant_id = tenant_id
def load_user_from_jwt(authorization):
"""
Load user from JWT token for web UI authentication.
Args:
authorization: The authorization info
Returns:
User object if JWT is valid, None otherwise
"""
try:
jwt = Serializer(secret_key=settings.SECRET_KEY)
access_token = str(jwt.loads(authorization))
if not access_token or not access_token.strip():
logging.warning("Authentication attempt with empty access token")
return None
# Access tokens should be UUIDs (32 hex characters)
if len(access_token.strip()) < 32:
logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars")
return None
user = UserService.query(
access_token=access_token, status=StatusEnum.VALID.value
)
if user:
if not user[0].access_token or not user[0].access_token.strip():
logging.warning(f"User {user[0].email} has empty access_token in database")
return None
return user[0]
else:
return None
except Exception as e:
logging.warning(f"JWT authentication failed: {e}")
return None
def load_user_from_api_key(authorization):
"""
Load user from API Key for external API authentication.
Args:
authorization: The authorization info
Returns:
User object if API Key is valid, None otherwise
"""
try:
if os.environ.get("DISABLE_SDK"):
return None
authorization_str = flask_request.headers.get("Authorization")
if not authorization_str:
return None
authorization_list = authorization_str.split()
if len(authorization_list) < 2:
return None
token = authorization_list[1]
objs = APIToken.query(token=token)
if not objs:
return None
default_user = DefaultUser(objs[0].tenant_id)
return default_user
except Exception as e:
logging.warning(f"API Key authentication failed: {e}")
return None
@login_manager.request_loader
def load_user(web_request):
jwt = Serializer(secret_key=settings.SECRET_KEY)
authorization = web_request.headers.get("Authorization")
if authorization:
try:
access_token = str(jwt.loads(authorization))
if not access_token or not access_token.strip():
logging.warning("Authentication attempt with empty access token")
return None
# Access tokens should be UUIDs (32 hex characters)
if len(access_token.strip()) < 32:
logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars")
return None
user = UserService.query(
access_token=access_token, status=StatusEnum.VALID.value
)
if user:
if not user[0].access_token or not user[0].access_token.strip():
logging.warning(f"User {user[0].email} has empty access_token in database")
return None
return user[0]
else:
return None
except Exception as e:
logging.warning(f"load_user got exception {e}")
return None
else:
if authorization is None:
return None
user = load_user_from_jwt(authorization)
if user:
return user
user = load_user_from_api_key(authorization)
if user:
return user
return None
@app.teardown_request
def _db_close(exc):