diff --git a/test/testcases/conftest.py b/test/testcases/conftest.py index 1d9ed32f3..2c206c3b9 100644 --- a/test/testcases/conftest.py +++ b/test/testcases/conftest.py @@ -14,9 +14,18 @@ # limitations under the License. # +import base64 +import os +import time +from typing import Any, Dict + import pytest import requests +from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 +from Cryptodome.PublicKey import RSA + from configs import EMAIL, HOST_ADDRESS, PASSWORD, VERSION, ZHIPU_AI_API_KEY +from libs.auth import RAGFlowWebApiAuth MARKER_EXPRESSIONS = { "p1": "p1", @@ -50,53 +59,120 @@ def pytest_configure(config: pytest.Config) -> None: print(f"\n[CONFIG] Active test level: {level}") +def encrypt_password(password: str) -> str: + """Encrypt password for API calls. + + Args: + password: Plain text password to encrypt. + + Returns: + Encrypted password as a base64-encoded string. + """ + current_dir: str = os.path.dirname(os.path.abspath(__file__)) + project_base: str = os.path.abspath(os.path.join(current_dir, "..", "..")) + file_path: str = os.path.join(project_base, "conf", "public.pem") + + with open(file_path, encoding="utf-8") as pem_file: + rsa_key: RSA.RsaKey = RSA.import_key(pem_file.read(), passphrase="Welcome") + + cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key) + password_base64: str = base64.b64encode(password.encode()).decode() + encrypted_password: bytes = cipher.encrypt(password_base64.encode()) + return base64.b64encode(encrypted_password).decode() + + def register(): - url = HOST_ADDRESS + f"/{VERSION}/user/register" - name = "qa" - register_data = {"email": EMAIL, "nickname": name, "password": PASSWORD} - res = requests.post(url=url, json=register_data) - res = res.json() + url: str = HOST_ADDRESS + f"/{VERSION}/user/register" + name: str = "qa" + # Encrypt the plain password "123" before sending + plain_password: str = "123" + encrypted_password: str = encrypt_password(plain_password) + register_data = {"email": EMAIL, "nickname": name, "password": encrypted_password} + res: requests.Response = requests.post(url=url, json=register_data) + res: Dict[str, Any] = res.json() if res.get("code") != 0 and "has already registered" not in res.get("message"): raise Exception(res.get("message")) def login(): - url = HOST_ADDRESS + f"/{VERSION}/user/login" - login_data = {"email": EMAIL, "password": PASSWORD} - response = requests.post(url=url, json=login_data) - res = response.json() + url: str = HOST_ADDRESS + f"/{VERSION}/user/login" + # Encrypt the plain password "123" before sending + plain_password: str = "123" + encrypted_password: str = encrypt_password(plain_password) + login_data = {"email": EMAIL, "password": encrypted_password} + response: requests.Response = requests.post(url=url, json=login_data) + res: Dict[str, Any] = response.json() if res.get("code") != 0: raise Exception(res.get("message")) - auth = response.headers["Authorization"] + auth: str = response.headers["Authorization"] return auth +def login_as_user(email: str, password: str) -> RAGFlowWebApiAuth: + """Login as a user and return authentication object. + + Args: + email: User email address. + password: Plain text password. + + Returns: + RAGFlowWebApiAuth object for authenticated requests. + + Raises: + Exception: If login fails. + """ + # Small delay to ensure user creation is committed to database + time.sleep(0.1) + + url: str = HOST_ADDRESS + f"/{VERSION}/user/login" + encrypted_password: str = encrypt_password(password) + login_data: Dict[str, str] = {"email": email, "password": encrypted_password} + response: requests.Response = requests.post(url=url, json=login_data) + res: Dict[str, Any] = response.json() + if res.get("code") != 0: + message: str = str(res.get("message", "Login failed")) + raise Exception(f"Login failed: {message}") + auth_token: str = response.headers.get("Authorization", "") + if not auth_token: + raise Exception("No authorization token received from login response") + return RAGFlowWebApiAuth(auth_token) + + @pytest.fixture(scope="session") def auth(): try: register() except Exception as e: print(e) - auth = login() - return auth + try: + auth: str = login() + return auth + except Exception as e: + error_msg = str(e) + if "Email and password do not match" in error_msg: + raise Exception( + f"Login failed: User {EMAIL} exists but password doesn't match. " + f"Please ensure the user has the correct password or delete the user first." + ) from e + raise @pytest.fixture(scope="session") def token(auth): - url = HOST_ADDRESS + f"/{VERSION}/system/new_token" - auth = {"Authorization": auth} - response = requests.post(url=url, headers=auth) - res = response.json() + url: str = HOST_ADDRESS + f"/{VERSION}/system/new_token" + auth: Dict[str, str] = {"Authorization": auth} + response: requests.Response = requests.post(url=url, headers=auth) + res: Dict[str, Any] = response.json() if res.get("code") != 0: raise Exception(res.get("message")) return res["data"].get("token") -def get_my_llms(auth, name): - url = HOST_ADDRESS + f"/{VERSION}/llm/my_llms" - authorization = {"Authorization": auth} - response = requests.get(url=url, headers=authorization) - res = response.json() +def get_my_llms(auth: str, name: str) -> bool: + url: str = HOST_ADDRESS + f"/{VERSION}/llm/my_llms" + authorization: Dict[str, str] = {"Authorization": auth} + response: requests.Response = requests.get(url=url, headers=authorization) + res: Dict[str, Any] = response.json() if res.get("code") != 0: raise Exception(res.get("message")) if name in res.get("data"): @@ -104,41 +180,41 @@ def get_my_llms(auth, name): return False -def add_models(auth): - url = HOST_ADDRESS + f"/{VERSION}/llm/set_api_key" - authorization = {"Authorization": auth} - models_info = { +def add_models(auth: str): + url: str = HOST_ADDRESS + f"/{VERSION}/llm/set_api_key" + authorization: Dict[str, str] = {"Authorization": auth} + models_info: Dict[str, Dict[str, str]] = { "ZHIPU-AI": {"llm_factory": "ZHIPU-AI", "api_key": ZHIPU_AI_API_KEY}, } for name, model_info in models_info.items(): if not get_my_llms(auth, name): - response = requests.post(url=url, headers=authorization, json=model_info) - res = response.json() + response: requests.Response = requests.post(url=url, headers=authorization, json=model_info) + res: Dict[str, Any] = response.json() if res.get("code") != 0: pytest.exit(f"Critical error in add_models: {res.get('message')}") -def get_tenant_info(auth): - url = HOST_ADDRESS + f"/{VERSION}/user/tenant_info" - authorization = {"Authorization": auth} - response = requests.get(url=url, headers=authorization) - res = response.json() +def get_tenant_info(auth: str) -> str: + url: str = HOST_ADDRESS + f"/{VERSION}/user/tenant_info" + authorization: Dict[str, str] = {"Authorization": auth} + response: requests.Response = requests.get(url=url, headers=authorization) + res: Dict[str, Any] = response.json() if res.get("code") != 0: raise Exception(res.get("message")) return res["data"].get("tenant_id") @pytest.fixture(scope="session", autouse=True) -def set_tenant_info(auth): +def set_tenant_info(auth: str): try: add_models(auth) - tenant_id = get_tenant_info(auth) + tenant_id: str = get_tenant_info(auth) except Exception as e: pytest.exit(f"Error in set_tenant_info: {str(e)}") - url = HOST_ADDRESS + f"/{VERSION}/user/set_tenant_info" - authorization = {"Authorization": auth} - tenant_info = { + url: str = HOST_ADDRESS + f"/{VERSION}/user/set_tenant_info" + authorization: Dict[str, str] = {"Authorization": auth} + tenant_info: Dict[str, Any] = { "tenant_id": tenant_id, "llm_id": "glm-4-flash@ZHIPU-AI", "embd_id": "BAAI/bge-small-en-v1.5@Builtin", @@ -146,7 +222,7 @@ def set_tenant_info(auth): "asr_id": "", "tts_id": None, } - response = requests.post(url=url, headers=authorization, json=tenant_info) - res = response.json() + response: requests.Response = requests.post(url=url, headers=authorization, json=tenant_info) + res: Dict[str, Any] = response.json() if res.get("code") != 0: raise Exception(res.get("message")) diff --git a/test/testcases/test_http_api/common.py b/test/testcases/test_http_api/common.py index d67a47d0a..089e13425 100644 --- a/test/testcases/test_http_api/common.py +++ b/test/testcases/test_http_api/common.py @@ -22,6 +22,14 @@ from requests.auth import AuthBase from requests_toolbelt import MultipartEncoder from utils.file_utils import create_txt_file +# Import login_as_user from root conftest +import importlib.util +_root_conftest_path = Path(__file__).parent.parent / "conftest.py" +_root_spec = importlib.util.spec_from_file_location("root_conftest", _root_conftest_path) +_root_conftest_module = importlib.util.module_from_spec(_root_spec) +_root_spec.loader.exec_module(_root_conftest_module) +login_as_user = _root_conftest_module.login_as_user + HEADERS = {"Content-Type": "application/json"} DATASETS_API_URL = f"/api/{VERSION}/datasets" FILE_API_URL = f"/api/{VERSION}/datasets/{{dataset_id}}/documents" @@ -553,3 +561,83 @@ def remove_department_member( url=url, headers=headers, auth=auth ) return res.json() + + +def update_department( + auth: Union[AuthBase, str, None], + department_id: str, + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Update a department's details. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + department_id: The department ID to update. + payload: Optional JSON payload containing update data (e.g., name, description). + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the updated department data. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{DEPARTMENT_API_URL}/{department_id}" + res: requests.Response = requests.put( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def delete_department( + auth: Union[AuthBase, str, None], + department_id: str, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Delete a department. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + department_id: The department ID to delete. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the deletion result. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{DEPARTMENT_API_URL}/{department_id}" + res: requests.Response = requests.delete( + url=url, headers=headers, auth=auth + ) + return res.json() + + +def list_department_members( + auth: Union[AuthBase, str, None], + department_id: str, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """List all members in a department. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + department_id: The department ID to list members from. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the list of department members. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{DEPARTMENT_API_URL}/{department_id}/members" + res: requests.Response = requests.get( + url=url, headers=headers, auth=auth + ) + return res.json()