diff --git a/test/testcases/test_http_api/common.py b/test/testcases/test_http_api/common.py index 126ee110f..4d6b4a496 100644 --- a/test/testcases/test_http_api/common.py +++ b/test/testcases/test_http_api/common.py @@ -14,9 +14,11 @@ # limitations under the License. # from pathlib import Path +from typing import Any, Dict, Optional, Union import requests from configs import HOST_ADDRESS, VERSION +from requests.auth import AuthBase from requests_toolbelt import MultipartEncoder from utils.file_utils import create_txt_file @@ -250,38 +252,162 @@ def batch_add_sessions_with_chat_assistant(auth, chat_assistant_id, num): # USER MANAGEMENT -USER_API_URL = f"/{VERSION}/user" +USER_API_URL: str = f"/{VERSION}/user" -def create_user(auth, payload=None, *, headers=HEADERS): - url = f"{HOST_ADDRESS}{USER_API_URL}/create" - res = requests.post(url=url, headers=headers, auth=auth, json=payload) +def create_user( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Create a new user. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: Optional JSON payload containing user data. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the created user data. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{USER_API_URL}/create" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) return res.json() -def update_user(auth, payload=None, *, headers=HEADERS): - url = f"{HOST_ADDRESS}{USER_API_URL}/update" - res = requests.put(url=url, headers=headers, auth=auth, json=payload) +def update_user( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Update an existing user. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: Optional JSON payload containing updated user data. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the updated user data. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{USER_API_URL}/update" + res: requests.Response = requests.put( + url=url, headers=headers, auth=auth, json=payload + ) return res.json() -def list_users(auth, params=None, *, headers=HEADERS): - url = f"{HOST_ADDRESS}{USER_API_URL}/list" - res = requests.get(url=url, headers=headers, auth=auth, params=params) +def list_users( + auth: Union[AuthBase, str, None], + params: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """List users with optional filtering. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + params: Optional query parameters for filtering/pagination. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the list of users. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{USER_API_URL}/list" + res: requests.Response = requests.get( + url=url, headers=headers, auth=auth, params=params + ) return res.json() -def delete_user(auth, payload=None, *, headers=HEADERS): - url = f"{HOST_ADDRESS}{USER_API_URL}/delete" - res = requests.delete(url=url, headers=headers, auth=auth, json=payload) +def delete_user( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Delete a user. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: Optional JSON payload containing user identification data. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the deletion result. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{USER_API_URL}/delete" + res: requests.Response = requests.delete( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def get_user_info( + auth: Union[AuthBase, str, None], + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Get the current authenticated user's information. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the user information. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{USER_API_URL}/info" + res: requests.Response = requests.get( + url=url, headers=headers, auth=auth + ) return res.json() # TEAM MANAGEMENT -TEAM_API_URL = f"/{VERSION}/tenant" +TEAM_API_URL: str = f"/{VERSION}/tenant" -def create_team(auth, payload=None, *, headers=HEADERS): - url = f"{HOST_ADDRESS}{TEAM_API_URL}/create" - res = requests.post(url=url, headers=headers, auth=auth, json=payload) +def create_team( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Create a new team (tenant). + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: Optional JSON payload containing team data (e.g., name, user_id). + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the created team data. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/create" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) return res.json() diff --git a/test/testcases/test_http_api/test_user_management/conftest.py b/test/testcases/test_http_api/test_user_management/conftest.py index d2885a351..5e26eedd2 100644 --- a/test/testcases/test_http_api/test_user_management/conftest.py +++ b/test/testcases/test_http_api/test_user_management/conftest.py @@ -13,15 +13,79 @@ # See the License for the specific language governing permissions and # limitations under the License. # +"""Shared fixtures and utilities for user management tests.""" + +from __future__ import annotations + +import base64 +import os +import uuid +from typing import Any + import pytest +from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 +from Cryptodome.PublicKey import RSA + +from common import get_user_info +from libs.auth import RAGFlowWebApiAuth + + +# --------------------------------------------------------------------------- +# Utility Functions +# --------------------------------------------------------------------------- + + +def encrypt_password(password: str) -> str: + """Encrypt password for API calls without importing from api.utils.crypt. + + Avoids ModuleNotFoundError caused by test helper module named `common`. + + Args: + password: Plain text password to encrypt. + + Returns: + Encrypted password as a base64-encoded string. + """ + current_dir: str = os.path.dirname(os.path.abspath(__file__)) + project_base: str = os.path.abspath( + os.path.join(current_dir, "..", "..", "..", "..") + ) + file_path: str = os.path.join(project_base, "conf", "public.pem") + + with open(file_path, encoding="utf-8") as pem_file: + rsa_key: RSA.RsaKey = RSA.import_key( + pem_file.read(), passphrase="Welcome" + ) + + cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key) + password_base64: str = base64.b64encode(password.encode()).decode() + encrypted_password: bytes = cipher.encrypt(password_base64.encode()) + return base64.b64encode(encrypted_password).decode() + + +def generate_unique_email(prefix: str = "test") -> str: + """Generate a unique email address for testing. + + Args: + prefix: Optional prefix for the email local part. Defaults to "test". + + Returns: + A unique email address string. + """ + return f"{prefix}_{uuid.uuid4().hex[:8]}@example.com" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- @pytest.fixture(scope="function") def clear_users(request, HttpApiAuth): - """Fixture to clean up users created during tests""" - created_user_ids = [] + """Fixture to clean up users created during tests.""" + created_user_ids: list[str] = [] - def cleanup(): + def cleanup() -> None: # Clean up users if delete endpoint exists # For now, we'll just track them pass @@ -29,3 +93,30 @@ def clear_users(request, HttpApiAuth): request.addfinalizer(cleanup) return created_user_ids + +@pytest.fixture(name="test_user") +def fixture_test_user(WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Get the current authenticated user's information for update tests. + + Since users can only update their own account, we use the authenticated + user's information instead of creating a new user. + + Args: + WebApiAuth: Authenticated user's auth object. + + Returns: + Dictionary containing user_id, email, and original_nickname. + """ + res: dict[str, Any] = get_user_info(WebApiAuth) + assert res["code"] == 0, f"Failed to get user info: {res}" + + user_data: dict[str, Any] = res["data"] + + # Store original nickname for comparison + original_nickname: str = user_data.get("nickname", "") + + return { + "user_id": user_data["id"], + "email": user_data["email"], + "original_nickname": original_nickname, + } diff --git a/test/testcases/test_http_api/test_user_management/test_delete_user.py b/test/testcases/test_http_api/test_user_management/test_delete_user.py index cb15eddfa..f6b1e54c1 100644 --- a/test/testcases/test_http_api/test_user_management/test_delete_user.py +++ b/test/testcases/test_http_api/test_user_management/test_delete_user.py @@ -15,46 +15,24 @@ # from __future__ import annotations -import base64 -import os import uuid from typing import Any import pytest -from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 -from Cryptodome.PublicKey import RSA from common import create_user, delete_user from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth +# Import from conftest - load it directly to avoid import issues +import importlib.util +from pathlib import Path -# --------------------------------------------------------------------------- -# Utility Functions -# --------------------------------------------------------------------------- - - -def encrypt_password(password: str) -> str: - """ - Encrypt password for API calls without importing from api.utils.crypt. - - Avoids ModuleNotFoundError caused by test helper module named `common`. - """ - current_dir: str = os.path.dirname(os.path.abspath(__file__)) - project_base: str = os.path.abspath( - os.path.join(current_dir, "..", "..", "..", "..") - ) - file_path: str = os.path.join(project_base, "conf", "public.pem") - - with open(file_path, encoding="utf-8") as pem_file: - rsa_key: RSA.RsaKey = RSA.import_key( - pem_file.read(), passphrase="Welcome" - ) - - cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key) - password_base64: str = base64.b64encode(password.encode()).decode() - encrypted_password: bytes = cipher.encrypt(password_base64.encode()) - return base64.b64encode(encrypted_password).decode() +_conftest_path = Path(__file__).parent / "conftest.py" +spec = importlib.util.spec_from_file_location("conftest", _conftest_path) +conftest_module = importlib.util.module_from_spec(spec) +spec.loader.exec_module(conftest_module) +encrypt_password = conftest_module.encrypt_password # --------------------------------------------------------------------------- diff --git a/test/testcases/test_http_api/test_user_management/test_list_user.py b/test/testcases/test_http_api/test_user_management/test_list_user.py index 1d3129ccb..d555e6926 100644 --- a/test/testcases/test_http_api/test_user_management/test_list_user.py +++ b/test/testcases/test_http_api/test_user_management/test_list_user.py @@ -15,46 +15,24 @@ # from __future__ import annotations -import base64 -import os import uuid from typing import Any import pytest -from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 -from Cryptodome.PublicKey import RSA from common import create_user, list_users from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth +# Import from conftest - load it directly to avoid import issues +import importlib.util +from pathlib import Path -# --------------------------------------------------------------------------- -# Utility Functions -# --------------------------------------------------------------------------- - - -def encrypt_password(password: str) -> str: - """ - Encrypt password for API calls without importing from api.utils.crypt. - - Avoids ModuleNotFoundError caused by test helper module named `common`. - """ - current_dir: str = os.path.dirname(os.path.abspath(__file__)) - project_base: str = os.path.abspath( - os.path.join(current_dir, "..", "..", "..", "..") - ) - file_path: str = os.path.join(project_base, "conf", "public.pem") - - with open(file_path, encoding="utf-8") as pem_file: - rsa_key: RSA.RsaKey = RSA.import_key( - pem_file.read(), passphrase="Welcome" - ) - - cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key) - password_base64: str = base64.b64encode(password.encode()).decode() - encrypted_password: bytes = cipher.encrypt(password_base64.encode()) - return base64.b64encode(encrypted_password).decode() +_conftest_path = Path(__file__).parent / "conftest.py" +spec = importlib.util.spec_from_file_location("conftest", _conftest_path) +conftest_module = importlib.util.module_from_spec(spec) +spec.loader.exec_module(conftest_module) +encrypt_password = conftest_module.encrypt_password # --------------------------------------------------------------------------- diff --git a/test/testcases/test_http_api/test_user_management/test_update_user.py b/test/testcases/test_http_api/test_user_management/test_update_user.py index 127707dbf..6f792f6ae 100644 --- a/test/testcases/test_http_api/test_user_management/test_update_user.py +++ b/test/testcases/test_http_api/test_user_management/test_update_user.py @@ -15,68 +15,25 @@ # from __future__ import annotations -import base64 -import os import uuid from concurrent.futures import Future, ThreadPoolExecutor, as_completed from typing import Any import pytest -from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 -from Cryptodome.PublicKey import RSA from common import create_user, update_user from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth +# Import from conftest - load it directly to avoid import issues +import importlib.util +from pathlib import Path -# --------------------------------------------------------------------------- -# Utility Functions -# --------------------------------------------------------------------------- - -def encrypt_password(password: str) -> str: - """ - Encrypt password for API calls without importing from api.utils.crypt. - - Avoids ModuleNotFoundError caused by test helper module named `common`. - """ - current_dir: str = os.path.dirname(os.path.abspath(__file__)) - project_base: str = os.path.abspath( - os.path.join(current_dir, "..", "..", "..", "..") - ) - file_path: str = os.path.join(project_base, "conf", "public.pem") - - with open(file_path, encoding="utf-8") as pem_file: - rsa_key: RSA.RsaKey = RSA.import_key(pem_file.read(), passphrase="Welcome") - - cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key) - password_base64: str = base64.b64encode(password.encode()).decode() - encrypted_password: bytes = cipher.encrypt(password_base64.encode()) - return base64.b64encode(encrypted_password).decode() - - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - -@pytest.fixture(name="test_user") -def fixture_test_user(WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]: - """Create a temporary user for update tests.""" - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - payload: dict[str, str] = { - "nickname": "test_user_original", - "email": unique_email, - "password": encrypt_password("test123"), - } - - res: dict[str, Any] = create_user(WebApiAuth, payload) - assert res["code"] == 0, f"Failed to create test user: {res}" - - return { - "user_id": res["data"]["id"], - "email": unique_email, - "original_nickname": "test_user_original", - } +_conftest_path = Path(__file__).parent / "conftest.py" +spec = importlib.util.spec_from_file_location("conftest", _conftest_path) +conftest_module = importlib.util.module_from_spec(spec) +spec.loader.exec_module(conftest_module) +encrypt_password = conftest_module.encrypt_password # ---------------------------------------------------------------------------