diff --git a/api/apps/__init__.py b/api/apps/__init__.py index f2009db2c..e56f6ff61 100644 --- a/api/apps/__init__.py +++ b/api/apps/__init__.py @@ -32,10 +32,13 @@ from api.utils import commands from flask_mail import Mail from flask_session import Session -from flask_login import LoginManager +from flask_login import LoginManager, UserMixin from common import settings -from api.utils.api_utils import server_error_response +from api.utils.api_utils import server_error_response, get_json_result from api.constants import API_VERSION +from flask import request as flask_request +from api.db.db_models import APIToken +from common.constants import RetCode __all__ = ["app"] @@ -142,39 +145,97 @@ client_urls_prefix = [ ] +class DefaultUser(UserMixin): + def __init__(self, tenant_id: str): + self.tenant_id = tenant_id + + +def load_user_from_jwt(authorization): + """ + Load user from JWT token for web UI authentication. + + Args: + authorization: The authorization info + + Returns: + User object if JWT is valid, None otherwise + """ + + try: + jwt = Serializer(secret_key=settings.SECRET_KEY) + access_token = str(jwt.loads(authorization)) + + if not access_token or not access_token.strip(): + logging.warning("Authentication attempt with empty access token") + return None + + # Access tokens should be UUIDs (32 hex characters) + if len(access_token.strip()) < 32: + logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars") + return None + + user = UserService.query( + access_token=access_token, status=StatusEnum.VALID.value + ) + if user: + if not user[0].access_token or not user[0].access_token.strip(): + logging.warning(f"User {user[0].email} has empty access_token in database") + return None + return user[0] + else: + return None + except Exception as e: + logging.warning(f"JWT authentication failed: {e}") + return None + + +def load_user_from_api_key(authorization): + """ + Load user from API Key for external API authentication. + + Args: + authorization: The authorization info + + Returns: + User object if API Key is valid, None otherwise + """ + try: + if os.environ.get("DISABLE_SDK"): + return None + authorization_str = flask_request.headers.get("Authorization") + if not authorization_str: + return None + authorization_list = authorization_str.split() + if len(authorization_list) < 2: + return None + token = authorization_list[1] + objs = APIToken.query(token=token) + if not objs: + return None + + default_user = DefaultUser(objs[0].tenant_id) + return default_user + except Exception as e: + logging.warning(f"API Key authentication failed: {e}") + return None + + @login_manager.request_loader def load_user(web_request): - jwt = Serializer(secret_key=settings.SECRET_KEY) authorization = web_request.headers.get("Authorization") - if authorization: - try: - access_token = str(jwt.loads(authorization)) - - if not access_token or not access_token.strip(): - logging.warning("Authentication attempt with empty access token") - return None - - # Access tokens should be UUIDs (32 hex characters) - if len(access_token.strip()) < 32: - logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars") - return None - - user = UserService.query( - access_token=access_token, status=StatusEnum.VALID.value - ) - if user: - if not user[0].access_token or not user[0].access_token.strip(): - logging.warning(f"User {user[0].email} has empty access_token in database") - return None - return user[0] - else: - return None - except Exception as e: - logging.warning(f"load_user got exception {e}") - return None - else: + if authorization is None: return None + user = load_user_from_jwt(authorization) + if user: + return user + + user = load_user_from_api_key(authorization) + if user: + return user + + return None + @app.teardown_request def _db_close(exc):