ragflow/test/testcases/test_http_api/test_user_management/conftest.py
2025-12-01 17:37:36 +05:30

149 lines
4.9 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Shared fixtures and utilities for user management tests."""
from __future__ import annotations
import base64
import importlib.util
import os
import uuid
from pathlib import Path
from typing import Any
import pytest
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Crypto.PublicKey import RSA
from common import get_user_info
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Import helpers from root test configuration
# ---------------------------------------------------------------------------
_root_conftest_path = (
Path(__file__).parent.parent.parent / "conftest.py"
)
_root_spec = importlib.util.spec_from_file_location(
"root_test_conftest", _root_conftest_path
)
_root_conftest_module = importlib.util.module_from_spec(_root_spec)
assert _root_spec.loader is not None
_root_spec.loader.exec_module(_root_conftest_module)
delete_user_from_db = _root_conftest_module.delete_user_from_db
# ---------------------------------------------------------------------------
# Utility Functions
# ---------------------------------------------------------------------------
def encrypt_password(password: str) -> str:
"""Encrypt password for API calls without importing from api.utils.crypt.
Avoids ModuleNotFoundError caused by test helper module named `common`.
Args:
password: Plain text password to encrypt.
Returns:
Encrypted password as a base64-encoded string.
"""
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(
os.path.join(current_dir, "..", "..", "..", "..")
)
file_path: str = os.path.join(project_base, "conf", "public.pem")
with open(file_path, encoding="utf-8") as pem_file:
rsa_key: RSA.RsaKey = RSA.import_key(
pem_file.read(), passphrase="Welcome"
)
cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode()).decode()
encrypted_password: bytes = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode()
def generate_unique_email(prefix: str = "test") -> str:
"""Generate a unique email address for testing.
Args:
prefix: Optional prefix for the email local part. Defaults to "test".
Returns:
A unique email address string.
"""
return f"{prefix}_{uuid.uuid4().hex[:8]}@example.com"
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="function")
def clear_users(request: pytest.FixtureRequest) -> list[str]:
"""Fixture to clean up users created during tests.
Tests should append test user *emails* to the returned list. After each
test, we hard-delete those users directly from the database via the
shared `delete_user_from_db` helper in the root test ``conftest.py``.
"""
created_user_emails: list[str] = []
def cleanup() -> None:
for email in created_user_emails:
try:
delete_user_from_db(email)
except Exception as exc: # pragma: no cover - best-effort cleanup
print(
f"[clear_users] Failed to delete test user {email}: {exc}"
)
request.addfinalizer(cleanup)
return created_user_emails
@pytest.fixture(name="test_user")
def fixture_test_user(web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Get the current authenticated user's information for update tests.
Since users can only update their own account, we use the authenticated
user's information instead of creating a new user.
Args:
web_api_auth: Authenticated user's auth object.
Returns:
Dictionary containing user_id, email, and original_nickname.
"""
res: dict[str, Any] = get_user_info(web_api_auth)
assert res["code"] == 0, f"Failed to get user info: {res}"
user_data: dict[str, Any] = res["data"]
# Store original nickname for comparison
original_nickname: str = user_data.get("nickname", "")
return {
"user_id": user_data["id"],
"email": user_data["email"],
"original_nickname": original_nickname,
}