[OND211-2329]: Updated the user tests to follow typings and shifted reusable variables and funtions to the conftest.py file.

This commit is contained in:
Hetavi Shah 2025-11-14 18:59:27 +05:30
parent a80b2ff529
commit 113620ea80
5 changed files with 261 additions and 131 deletions

View file

@ -14,9 +14,11 @@
# limitations under the License. # limitations under the License.
# #
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional, Union
import requests import requests
from configs import HOST_ADDRESS, VERSION from configs import HOST_ADDRESS, VERSION
from requests.auth import AuthBase
from requests_toolbelt import MultipartEncoder from requests_toolbelt import MultipartEncoder
from utils.file_utils import create_txt_file from utils.file_utils import create_txt_file
@ -250,38 +252,162 @@ def batch_add_sessions_with_chat_assistant(auth, chat_assistant_id, num):
# USER MANAGEMENT # USER MANAGEMENT
USER_API_URL = f"/{VERSION}/user" USER_API_URL: str = f"/{VERSION}/user"
def create_user(auth, payload=None, *, headers=HEADERS): def create_user(
url = f"{HOST_ADDRESS}{USER_API_URL}/create" auth: Union[AuthBase, str, None],
res = requests.post(url=url, headers=headers, auth=auth, json=payload) payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Create a new user.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: Optional JSON payload containing user data.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the created user data.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{USER_API_URL}/create"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json() return res.json()
def update_user(auth, payload=None, *, headers=HEADERS): def update_user(
url = f"{HOST_ADDRESS}{USER_API_URL}/update" auth: Union[AuthBase, str, None],
res = requests.put(url=url, headers=headers, auth=auth, json=payload) payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Update an existing user.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: Optional JSON payload containing updated user data.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the updated user data.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{USER_API_URL}/update"
res: requests.Response = requests.put(
url=url, headers=headers, auth=auth, json=payload
)
return res.json() return res.json()
def list_users(auth, params=None, *, headers=HEADERS): def list_users(
url = f"{HOST_ADDRESS}{USER_API_URL}/list" auth: Union[AuthBase, str, None],
res = requests.get(url=url, headers=headers, auth=auth, params=params) params: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""List users with optional filtering.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
params: Optional query parameters for filtering/pagination.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the list of users.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{USER_API_URL}/list"
res: requests.Response = requests.get(
url=url, headers=headers, auth=auth, params=params
)
return res.json() return res.json()
def delete_user(auth, payload=None, *, headers=HEADERS): def delete_user(
url = f"{HOST_ADDRESS}{USER_API_URL}/delete" auth: Union[AuthBase, str, None],
res = requests.delete(url=url, headers=headers, auth=auth, json=payload) payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Delete a user.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: Optional JSON payload containing user identification data.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the deletion result.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{USER_API_URL}/delete"
res: requests.Response = requests.delete(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def get_user_info(
auth: Union[AuthBase, str, None],
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Get the current authenticated user's information.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the user information.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{USER_API_URL}/info"
res: requests.Response = requests.get(
url=url, headers=headers, auth=auth
)
return res.json() return res.json()
# TEAM MANAGEMENT # TEAM MANAGEMENT
TEAM_API_URL = f"/{VERSION}/tenant" TEAM_API_URL: str = f"/{VERSION}/tenant"
def create_team(auth, payload=None, *, headers=HEADERS): def create_team(
url = f"{HOST_ADDRESS}{TEAM_API_URL}/create" auth: Union[AuthBase, str, None],
res = requests.post(url=url, headers=headers, auth=auth, json=payload) payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Create a new team (tenant).
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: Optional JSON payload containing team data (e.g., name, user_id).
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the created team data.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/create"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json() return res.json()

View file

@ -13,15 +13,79 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
"""Shared fixtures and utilities for user management tests."""
from __future__ import annotations
import base64
import os
import uuid
from typing import Any
import pytest import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA
from common import get_user_info
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Utility Functions
# ---------------------------------------------------------------------------
def encrypt_password(password: str) -> str:
"""Encrypt password for API calls without importing from api.utils.crypt.
Avoids ModuleNotFoundError caused by test helper module named `common`.
Args:
password: Plain text password to encrypt.
Returns:
Encrypted password as a base64-encoded string.
"""
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(
os.path.join(current_dir, "..", "..", "..", "..")
)
file_path: str = os.path.join(project_base, "conf", "public.pem")
with open(file_path, encoding="utf-8") as pem_file:
rsa_key: RSA.RsaKey = RSA.import_key(
pem_file.read(), passphrase="Welcome"
)
cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode()).decode()
encrypted_password: bytes = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode()
def generate_unique_email(prefix: str = "test") -> str:
"""Generate a unique email address for testing.
Args:
prefix: Optional prefix for the email local part. Defaults to "test".
Returns:
A unique email address string.
"""
return f"{prefix}_{uuid.uuid4().hex[:8]}@example.com"
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def clear_users(request, HttpApiAuth): def clear_users(request, HttpApiAuth):
"""Fixture to clean up users created during tests""" """Fixture to clean up users created during tests."""
created_user_ids = [] created_user_ids: list[str] = []
def cleanup(): def cleanup() -> None:
# Clean up users if delete endpoint exists # Clean up users if delete endpoint exists
# For now, we'll just track them # For now, we'll just track them
pass pass
@ -29,3 +93,30 @@ def clear_users(request, HttpApiAuth):
request.addfinalizer(cleanup) request.addfinalizer(cleanup)
return created_user_ids return created_user_ids
@pytest.fixture(name="test_user")
def fixture_test_user(WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Get the current authenticated user's information for update tests.
Since users can only update their own account, we use the authenticated
user's information instead of creating a new user.
Args:
WebApiAuth: Authenticated user's auth object.
Returns:
Dictionary containing user_id, email, and original_nickname.
"""
res: dict[str, Any] = get_user_info(WebApiAuth)
assert res["code"] == 0, f"Failed to get user info: {res}"
user_data: dict[str, Any] = res["data"]
# Store original nickname for comparison
original_nickname: str = user_data.get("nickname", "")
return {
"user_id": user_data["id"],
"email": user_data["email"],
"original_nickname": original_nickname,
}

View file

@ -15,46 +15,24 @@
# #
from __future__ import annotations from __future__ import annotations
import base64
import os
import uuid import uuid
from typing import Any from typing import Any
import pytest import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA
from common import create_user, delete_user from common import create_user, delete_user
from configs import INVALID_API_TOKEN from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
# Import from conftest - load it directly to avoid import issues
import importlib.util
from pathlib import Path
# --------------------------------------------------------------------------- _conftest_path = Path(__file__).parent / "conftest.py"
# Utility Functions spec = importlib.util.spec_from_file_location("conftest", _conftest_path)
# --------------------------------------------------------------------------- conftest_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(conftest_module)
encrypt_password = conftest_module.encrypt_password
def encrypt_password(password: str) -> str:
"""
Encrypt password for API calls without importing from api.utils.crypt.
Avoids ModuleNotFoundError caused by test helper module named `common`.
"""
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(
os.path.join(current_dir, "..", "..", "..", "..")
)
file_path: str = os.path.join(project_base, "conf", "public.pem")
with open(file_path, encoding="utf-8") as pem_file:
rsa_key: RSA.RsaKey = RSA.import_key(
pem_file.read(), passphrase="Welcome"
)
cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode()).decode()
encrypted_password: bytes = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View file

@ -15,46 +15,24 @@
# #
from __future__ import annotations from __future__ import annotations
import base64
import os
import uuid import uuid
from typing import Any from typing import Any
import pytest import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA
from common import create_user, list_users from common import create_user, list_users
from configs import INVALID_API_TOKEN from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
# Import from conftest - load it directly to avoid import issues
import importlib.util
from pathlib import Path
# --------------------------------------------------------------------------- _conftest_path = Path(__file__).parent / "conftest.py"
# Utility Functions spec = importlib.util.spec_from_file_location("conftest", _conftest_path)
# --------------------------------------------------------------------------- conftest_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(conftest_module)
encrypt_password = conftest_module.encrypt_password
def encrypt_password(password: str) -> str:
"""
Encrypt password for API calls without importing from api.utils.crypt.
Avoids ModuleNotFoundError caused by test helper module named `common`.
"""
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(
os.path.join(current_dir, "..", "..", "..", "..")
)
file_path: str = os.path.join(project_base, "conf", "public.pem")
with open(file_path, encoding="utf-8") as pem_file:
rsa_key: RSA.RsaKey = RSA.import_key(
pem_file.read(), passphrase="Welcome"
)
cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode()).decode()
encrypted_password: bytes = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View file

@ -15,68 +15,25 @@
# #
from __future__ import annotations from __future__ import annotations
import base64
import os
import uuid import uuid
from concurrent.futures import Future, ThreadPoolExecutor, as_completed from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Any from typing import Any
import pytest import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA
from common import create_user, update_user from common import create_user, update_user
from configs import INVALID_API_TOKEN from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
# Import from conftest - load it directly to avoid import issues
import importlib.util
from pathlib import Path
# --------------------------------------------------------------------------- _conftest_path = Path(__file__).parent / "conftest.py"
# Utility Functions spec = importlib.util.spec_from_file_location("conftest", _conftest_path)
# --------------------------------------------------------------------------- conftest_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(conftest_module)
def encrypt_password(password: str) -> str: encrypt_password = conftest_module.encrypt_password
"""
Encrypt password for API calls without importing from api.utils.crypt.
Avoids ModuleNotFoundError caused by test helper module named `common`.
"""
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(
os.path.join(current_dir, "..", "..", "..", "..")
)
file_path: str = os.path.join(project_base, "conf", "public.pem")
with open(file_path, encoding="utf-8") as pem_file:
rsa_key: RSA.RsaKey = RSA.import_key(pem_file.read(), passphrase="Welcome")
cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode()).decode()
encrypted_password: bytes = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode()
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(name="test_user")
def fixture_test_user(WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a temporary user for update tests."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict[str, str] = {
"nickname": "test_user_original",
"email": unique_email,
"password": encrypt_password("test123"),
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
assert res["code"] == 0, f"Failed to create test user: {res}"
return {
"user_id": res["data"]["id"],
"email": unique_email,
"original_nickname": "test_user_original",
}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------