From f307a18f55307bf2dee862c35a4ec72e5e4dd045 Mon Sep 17 00:00:00 2001 From: Hetavi Shah Date: Tue, 25 Nov 2025 13:19:17 +0530 Subject: [PATCH] [OND211-2329]: Updated user tests to remove test data from db. --- .../test_user_management/conftest.py | 41 ++++++++++-- .../test_user_management/test_create_user.py | 55 ++++++++++++---- .../test_user_management/test_delete_user.py | 16 ++++- .../test_user_management/test_list_user.py | 46 ++++++++++--- .../test_user_management/test_update_user.py | 59 +++++++++++++---- .../test_user_edge_cases.py | 65 ++++++++++++++----- .../test_user_performance.py | 49 +++++++++++--- .../test_user_security.py | 45 +++++++++---- 8 files changed, 294 insertions(+), 82 deletions(-) diff --git a/test/testcases/test_http_api/test_user_management/conftest.py b/test/testcases/test_http_api/test_user_management/conftest.py index 7bdbf3167..e6154bc77 100644 --- a/test/testcases/test_http_api/test_user_management/conftest.py +++ b/test/testcases/test_http_api/test_user_management/conftest.py @@ -20,8 +20,10 @@ from __future__ import annotations import base64 import os import uuid +from pathlib import Path from typing import Any +import importlib.util import pytest from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from Cryptodome.PublicKey import RSA @@ -30,6 +32,22 @@ from common import get_user_info from libs.auth import RAGFlowWebApiAuth +# --------------------------------------------------------------------------- +# Import helpers from root test configuration +# --------------------------------------------------------------------------- + +_root_conftest_path = ( + Path(__file__).parent.parent.parent / "conftest.py" +) +_root_spec = importlib.util.spec_from_file_location( + "root_test_conftest", _root_conftest_path +) +_root_conftest_module = importlib.util.module_from_spec(_root_spec) +assert _root_spec.loader is not None +_root_spec.loader.exec_module(_root_conftest_module) +delete_user_from_db = _root_conftest_module.delete_user_from_db + + # --------------------------------------------------------------------------- # Utility Functions # --------------------------------------------------------------------------- @@ -81,17 +99,26 @@ def generate_unique_email(prefix: str = "test") -> str: @pytest.fixture(scope="function") -def clear_users(request, http_api_auth): - """Fixture to clean up users created during tests.""" - created_user_ids: list[str] = [] +def clear_users(request: pytest.FixtureRequest) -> list[str]: + """Fixture to clean up users created during tests. + + Tests should append test user *emails* to the returned list. After each + test, we hard-delete those users directly from the database via the + shared `delete_user_from_db` helper in the root test ``conftest.py``. + """ + created_user_emails: list[str] = [] def cleanup() -> None: - # Clean up users if delete endpoint exists - # For now, we'll just track them - pass + for email in created_user_emails: + try: + delete_user_from_db(email) + except Exception as exc: # pragma: no cover - best-effort cleanup + print( + f"[clear_users] Failed to delete test user {email}: {exc}" + ) request.addfinalizer(cleanup) - return created_user_ids + return created_user_emails @pytest.fixture(name="test_user") diff --git a/test/testcases/test_http_api/test_user_management/test_create_user.py b/test/testcases/test_http_api/test_user_management/test_create_user.py index c9c891bbf..68cbfe0f9 100644 --- a/test/testcases/test_http_api/test_user_management/test_create_user.py +++ b/test/testcases/test_http_api/test_user_management/test_create_user.py @@ -49,7 +49,12 @@ class TestAuthorization: ), ], ) - def test_invalid_auth(self, invalid_auth, expected_code, expected_message): + def test_invalid_auth( + self, + invalid_auth: RAGFlowWebApiAuth | None, + expected_code: int, + expected_message: str, + ) -> None: """Test user creation with invalid or missing authentication.""" # Use unique email to avoid conflicts unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -132,7 +137,8 @@ class TestUserCreate: ) def test_required_fields( self, - web_api_auth, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], payload: dict[str, Any], expected_code: int, expected_message: str, @@ -145,6 +151,7 @@ class TestUserCreate: res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: + clear_users.append(payload["email"]) assert res["data"]["nickname"] == payload["nickname"] assert res["data"]["email"] == payload["email"] else: @@ -167,7 +174,8 @@ class TestUserCreate: ) def test_email_validation( self, - web_api_auth, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], email: str, expected_code: int, expected_message: str, @@ -184,6 +192,7 @@ class TestUserCreate: res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: + clear_users.append(email) assert res["data"]["email"] == email else: assert expected_message in res["message"] @@ -201,7 +210,8 @@ class TestUserCreate: ) def test_nickname( self, - web_api_auth, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], nickname: str, expected_code: int, expected_message: str, @@ -216,13 +226,16 @@ class TestUserCreate: res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: + clear_users.append(unique_email) assert res["data"]["nickname"] == nickname else: assert expected_message in res["message"] @pytest.mark.p1 def test_duplicate_email( - self, web_api_auth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test that creating a user with duplicate email fails.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -233,8 +246,10 @@ class TestUserCreate: } res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 + # Track the created user for cleanup + clear_users.append(unique_email) - # Try to create another user with the same email + # Try to create another user with the same email (should fail) payload2: dict[str, str] = { "nickname": "test_user_2", "email": unique_email, @@ -255,7 +270,8 @@ class TestUserCreate: ) def test_is_superuser( self, - web_api_auth, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], is_superuser: bool | None, expected_value: bool, ) -> None: @@ -271,11 +287,14 @@ class TestUserCreate: res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 + clear_users.append(unique_email) assert res["data"]["is_superuser"] == expected_value @pytest.mark.p2 def test_password_hashing( - self, web_api_auth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test that password is properly hashed when stored.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -287,6 +306,7 @@ class TestUserCreate: } res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 + clear_users.append(unique_email) # Password should be hashed in the response (not plain text) assert "password" in res["data"], ( f"Password field not found in response: {res['data'].keys()}" @@ -299,7 +319,9 @@ class TestUserCreate: @pytest.mark.p2 def test_plain_text_password_accepted( - self, web_api_auth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test that plain text password is accepted.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -311,23 +333,29 @@ class TestUserCreate: res: dict[str, Any] = create_user(web_api_auth, payload) # Should succeed with plain text password assert res["code"] == 0 + clear_users.append(unique_email) assert res["data"]["email"] == unique_email @pytest.mark.p3 def test_concurrent_create( - self, web_api_auth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test concurrent user creation with multiple threads.""" count: int = 10 with ThreadPoolExecutor(max_workers=5) as executor: futures: list[Future[dict[str, Any]]] = [] for i in range(count): - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + unique_email: str = ( + f"test_{uuid.uuid4().hex[:8]}@example.com" + ) payload: dict[str, str] = { "nickname": f"test_user_{i}", "email": unique_email, "password": "test123", } + clear_users.append(unique_email) futures.append( executor.submit(create_user, web_api_auth, payload) ) @@ -341,7 +369,9 @@ class TestUserCreate: @pytest.mark.p2 def test_user_creation_response_structure( - self, web_api_auth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test that user creation returns the expected response structure.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -352,6 +382,7 @@ class TestUserCreate: } res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 + clear_users.append(unique_email) assert "data" in res assert "id" in res["data"] assert "email" in res["data"] diff --git a/test/testcases/test_http_api/test_user_management/test_delete_user.py b/test/testcases/test_http_api/test_user_management/test_delete_user.py index ce9563f12..7bfcdb0e4 100644 --- a/test/testcases/test_http_api/test_user_management/test_delete_user.py +++ b/test/testcases/test_http_api/test_user_management/test_delete_user.py @@ -20,7 +20,7 @@ from typing import Any import pytest -from ..common import create_user, delete_user +from common import create_user, delete_user from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth @@ -41,6 +41,7 @@ encrypt_password = conftest_module.encrypt_password @pytest.mark.p1 +@pytest.mark.usefixtures("clear_users") class TestAuthorization: """Tests for authentication behavior during user deletion.""" @@ -58,6 +59,7 @@ class TestAuthorization: expected_code: int, expected_message: str, web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test user deletion with invalid or missing authentication.""" # Create a test user first @@ -67,10 +69,14 @@ class TestAuthorization: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(web_api_auth, create_payload) + create_res: dict[str, Any] = create_user( + web_api_auth, create_payload + ) if create_res["code"] != 0: pytest.skip("User creation failed, skipping auth test") + clear_users.append(unique_email) + user_id: str = create_res["data"]["id"] # Try to delete with invalid auth @@ -84,6 +90,7 @@ class TestAuthorization: def test_user_can_only_delete_themselves( self, web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test that users can only delete their own account.""" # Create another user @@ -93,9 +100,12 @@ class TestAuthorization: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(web_api_auth, create_payload) + create_res: dict[str, Any] = create_user( + web_api_auth, create_payload + ) assert create_res["code"] == 0, "Failed to create second user" other_user_id: str = create_res["data"]["id"] + clear_users.append(unique_email) # Try to delete another user's account (should fail) delete_payload: dict[str, Any] = { diff --git a/test/testcases/test_http_api/test_user_management/test_list_user.py b/test/testcases/test_http_api/test_user_management/test_list_user.py index ec1065a80..019e7a470 100644 --- a/test/testcases/test_http_api/test_user_management/test_list_user.py +++ b/test/testcases/test_http_api/test_user_management/test_list_user.py @@ -20,7 +20,7 @@ from typing import Any import pytest -from ..common import create_user, list_users +from common import create_user, list_users from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth @@ -81,7 +81,9 @@ class TestUserList: @pytest.mark.p1 def test_list_single_user( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str] ) -> None: """Test listing a single user.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -90,10 +92,13 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(web_api_auth, create_payload) + create_res: dict[str, Any] = create_user( + web_api_auth, create_payload + ) # Skip if creation fails (password encryption issue in test) if create_res["code"] != 0: pytest.skip("User creation failed, skipping list test") + clear_users.append(unique_email) list_res: dict[str, Any] = list_users(web_api_auth) assert list_res["code"] == 0, list_res @@ -105,7 +110,9 @@ class TestUserList: @pytest.mark.p1 def test_list_multiple_users( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test listing multiple users.""" created_emails: list[str] = [] @@ -121,6 +128,7 @@ class TestUserList: ) if create_res["code"] == 0: created_emails.append(unique_email) + clear_users.append(unique_email) if not created_emails: pytest.skip("No users created, skipping list test") @@ -136,7 +144,9 @@ class TestUserList: @pytest.mark.p1 def test_list_users_with_email_filter( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test listing users filtered by email.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -145,9 +155,12 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(web_api_auth, create_payload) + create_res: dict[str, Any] = create_user( + web_api_auth, create_payload + ) if create_res["code"] != 0: pytest.skip("User creation failed, skipping filter test") + clear_users.append(unique_email) # List with email filter params: dict[str, str] = {"email": unique_email} @@ -200,6 +213,7 @@ class TestUserList: page: int, page_size: int, expected_valid: bool, + clear_users: list[str], ) -> None: """Test listing users with pagination.""" # Create some users first @@ -216,6 +230,7 @@ class TestUserList: ) if create_res["code"] == 0: created_count += 1 + clear_users.append(unique_email) if created_count == 0: pytest.skip("No users created, skipping pagination test") @@ -234,7 +249,9 @@ class TestUserList: @pytest.mark.p1 def test_list_users_pagination_boundaries( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test pagination boundary conditions.""" # Create 5 users with a unique email pattern for filtering @@ -252,6 +269,7 @@ class TestUserList: ) if create_res["code"] == 0: created_emails.append(unique_email) + clear_users.append(unique_email) if len(created_emails) < 3: pytest.skip("Not enough users created, skipping boundary test") @@ -328,7 +346,9 @@ class TestUserList: @pytest.mark.p2 def test_list_users_combined_filters( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test listing users with combined filters.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -337,9 +357,12 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(web_api_auth, create_payload) + create_res: dict[str, Any] = create_user( + web_api_auth, create_payload + ) if create_res["code"] != 0: pytest.skip("User creation failed, skipping combined filter test") + clear_users.append(unique_email) # Test with email filter and pagination params: dict[str, Any] = { @@ -355,7 +378,9 @@ class TestUserList: @pytest.mark.p2 def test_list_users_performance_with_many_users( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test listing performance with multiple users.""" # Create several users @@ -372,6 +397,7 @@ class TestUserList: ) if create_res["code"] == 0: created_count += 1 + clear_users.append(unique_email) if created_count == 0: pytest.skip("No users created, skipping performance test") diff --git a/test/testcases/test_http_api/test_user_management/test_update_user.py b/test/testcases/test_http_api/test_user_management/test_update_user.py index ce76d16d5..31efced6f 100644 --- a/test/testcases/test_http_api/test_user_management/test_update_user.py +++ b/test/testcases/test_http_api/test_user_management/test_update_user.py @@ -21,7 +21,7 @@ from typing import Any import pytest -from ..common import create_user, update_user +from common import create_user, update_user from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth @@ -41,6 +41,7 @@ encrypt_password = conftest_module.encrypt_password # --------------------------------------------------------------------------- @pytest.mark.p1 +@pytest.mark.usefixtures("clear_users") class TestAuthorization: """Tests for authentication behavior during user updates.""" @@ -73,6 +74,7 @@ class TestAuthorization: self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any], + clear_users: list[str], ) -> None: """Test that users can only update their own account.""" # Create another user @@ -82,9 +84,12 @@ class TestAuthorization: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(web_api_auth, create_payload) + create_res: dict[str, Any] = create_user( + web_api_auth, create_payload + ) assert create_res["code"] == 0, "Failed to create second user" other_user_id: str = create_res["data"]["id"] + clear_users.append(unique_email) # Try to update another user's account (should fail) payload: dict[str, Any] = { @@ -105,7 +110,9 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_with_user_id( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: payload: dict[str, Any] = { "user_id": test_user["user_id"], @@ -119,7 +126,9 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_with_email( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: payload: dict[str, Any] = { "email": test_user["email"], @@ -196,7 +205,9 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_password( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: new_password: str = "new_password_456" payload: dict[str, str] = { @@ -209,7 +220,9 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_password_invalid_encryption( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: payload: dict[str, str] = { "user_id": test_user["user_id"], @@ -256,7 +269,10 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_email_duplicate( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], + clear_users: list[str], ) -> None: unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" create_payload: dict[str, str] = { @@ -264,8 +280,11 @@ class TestUserUpdate: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(web_api_auth, create_payload) + create_res: dict[str, Any] = create_user( + web_api_auth, create_payload + ) assert create_res["code"] == 0 + clear_users.append(unique_email) update_payload: dict[str, str] = { "user_id": test_user["user_id"], @@ -297,7 +316,9 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_multiple_fields( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" payload: dict[str, Any] = { @@ -314,7 +335,9 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_no_fields( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: payload: dict[str, str] = {"user_id": test_user["user_id"]} res: dict[str, Any] = update_user(web_api_auth, payload) @@ -323,7 +346,9 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_email_using_email_field_when_user_id_provided( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" payload: dict[str, str] = { @@ -336,7 +361,9 @@ class TestUserUpdate: @pytest.mark.p2 def test_update_response_structure( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: payload: dict[str, Any] = { "user_id": test_user["user_id"], @@ -350,7 +377,9 @@ class TestUserUpdate: @pytest.mark.p2 def test_concurrent_updates( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: """Test concurrent updates to the same user (users can only update themselves).""" # Test concurrent updates to the authenticated user's own account @@ -373,7 +402,9 @@ class TestUserUpdate: @pytest.mark.p3 def test_update_same_user_multiple_times( - self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, + web_api_auth: RAGFlowWebApiAuth, + test_user: dict[str, Any], ) -> None: """Test repeated updates on the same user.""" for nickname in ( diff --git a/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py index 2325c8e2b..dc217f278 100644 --- a/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py +++ b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py @@ -22,7 +22,7 @@ from typing import Any import pytest -from ..common import create_user, list_users +from common import create_user, list_users from libs.auth import RAGFlowWebApiAuth @@ -33,7 +33,9 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_extremely_long_nickname( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test nickname with extreme length.""" long_nickname: str = "A" * 1000 @@ -51,6 +53,7 @@ class TestUserEdgeCases: assert len(res["data"]["nickname"]) <= 255, ( "Nickname should be truncated to max length" ) + clear_users.append(unique_email) else: # If rejected, should have clear error message assert ( @@ -61,7 +64,9 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_unicode_in_all_fields( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test unicode characters in various fields.""" # Use ASCII-safe email local part (unicode in display name) @@ -80,10 +85,13 @@ class TestUserEdgeCases: "Unicode nickname should be preserved" ) assert res["data"]["email"] == unique_email + clear_users.append(unique_email) @pytest.mark.p2 def test_emoji_in_nickname( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test emoji characters in nickname.""" nickname_with_emoji: str = "Test User 😀🎉🔥" @@ -101,6 +109,7 @@ class TestUserEdgeCases: assert "😀" in res["data"]["nickname"] or "?" in res["data"]["nickname"], ( "Emoji should be preserved or replaced with placeholder" ) + clear_users.append(unique_email) @pytest.mark.p2 @pytest.mark.parametrize( @@ -114,7 +123,10 @@ class TestUserEdgeCases: ], ) def test_special_characters_in_email( - self, web_api_auth: RAGFlowWebApiAuth, special_email: str + self, + web_api_auth: RAGFlowWebApiAuth, + special_email: str, + clear_users: list[str], ) -> None: """Test various special characters in email.""" payload: dict[str, str] = { @@ -125,10 +137,13 @@ class TestUserEdgeCases: res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0, f"Failed for email: {special_email}, {res}" assert res["data"]["email"] == special_email + clear_users.append(special_email) @pytest.mark.p2 def test_whitespace_handling_in_fields( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test whitespace handling in various fields.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -148,10 +163,13 @@ class TestUserEdgeCases: # Nickname whitespace handling is flexible nickname: str = res["data"]["nickname"] assert nickname.strip() != "", "Nickname should not be only whitespace" + clear_users.append(unique_email) @pytest.mark.p2 def test_null_byte_in_input( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test null byte injection in input fields.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -168,6 +186,7 @@ class TestUserEdgeCases: assert "\x00" not in res["data"]["nickname"], ( "Null byte should be sanitized" ) + clear_users.append(unique_email) @pytest.mark.p2 def test_very_long_email( @@ -247,7 +266,9 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_empty_string_vs_none_in_optional_fields( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test difference between empty string and None for optional fields.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -263,6 +284,7 @@ class TestUserEdgeCases: # Empty nickname should be accepted per current API behavior assert res_empty["code"] == 0, "Empty nickname should be accepted" assert res_empty["data"]["nickname"] == "" + clear_users.append(unique_email) @pytest.mark.p3 def test_pagination_with_no_results( @@ -281,12 +303,14 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_pagination_beyond_available_pages( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test requesting page beyond available data.""" # Create one user unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_user( + create_res: dict[str, Any] = create_user( web_api_auth, { "nickname": "test", @@ -294,6 +318,8 @@ class TestUserEdgeCases: "password": "test123", }, ) + if create_res["code"] == 0: + clear_users.append(unique_email) # Request page 100 res: dict[str, Any] = list_users( @@ -349,7 +375,9 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_special_characters_in_password( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test password with various special characters.""" special_passwords: list[str] = [ @@ -368,15 +396,18 @@ class TestUserEdgeCases: "password": password, } res: dict[str, Any] = create_user(web_api_auth, payload) - + # Should accept special characters in password assert res["code"] == 0, ( f"Password with special chars should be accepted: {password}" ) + clear_users.append(unique_email) @pytest.mark.p3 def test_json_injection_in_fields( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test JSON injection attempts.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -392,10 +423,13 @@ class TestUserEdgeCases: assert res["data"]["nickname"] == '{"admin": true}', ( "Should store as literal string, not parse JSON" ) + clear_users.append(unique_email) @pytest.mark.p3 def test_path_traversal_in_nickname( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test path traversal attempts in nickname.""" traversal_attempts: list[str] = [ @@ -412,7 +446,7 @@ class TestUserEdgeCases: "password": "test123", } res: dict[str, Any] = create_user(web_api_auth, payload) - + # Should either reject or sanitize path traversal attempts # At minimum, should not allow actual file system access if res["code"] == 0: @@ -420,3 +454,4 @@ class TestUserEdgeCases: stored_nickname: str = res["data"]["nickname"] # Verify it's treated as literal string assert isinstance(stored_nickname, str) + clear_users.append(unique_email) diff --git a/test/testcases/test_http_api/test_user_management/test_user_performance.py b/test/testcases/test_http_api/test_user_management/test_user_performance.py index 84cf996f3..c8ad05f07 100644 --- a/test/testcases/test_http_api/test_user_management/test_user_performance.py +++ b/test/testcases/test_http_api/test_user_management/test_user_performance.py @@ -24,7 +24,7 @@ from typing import Any import pytest -from ..common import create_user, list_users +from common import create_user, list_users from libs.auth import RAGFlowWebApiAuth @@ -35,7 +35,9 @@ class TestUserPerformance: @pytest.mark.p2 def test_list_users_performance_small_dataset( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test list_users performance with small dataset.""" # Create 20 users @@ -52,6 +54,7 @@ class TestUserPerformance: ) if res["code"] == 0: created_users.append(res["data"]["id"]) + clear_users.append(unique_email) # Test list performance without pagination start: float = time.time() @@ -65,7 +68,9 @@ class TestUserPerformance: @pytest.mark.p2 def test_list_users_pagination_performance( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test pagination performance with moderate dataset.""" # Create 50 users @@ -79,6 +84,7 @@ class TestUserPerformance: "password": "test123", }, ) + clear_users.append(unique_email) # Test pagination performance start: float = time.time() @@ -95,13 +101,16 @@ class TestUserPerformance: @pytest.mark.p3 def test_concurrent_user_creation( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test concurrent user creation without conflicts.""" count: int = 20 def create_test_user(index: int) -> dict[str, Any]: unique_email: str = f"concurrent_{index}_{uuid.uuid4().hex[:8]}@example.com" + clear_users.append(unique_email) return create_user( web_api_auth, { @@ -136,7 +145,9 @@ class TestUserPerformance: @pytest.mark.p3 def test_user_creation_response_time( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test individual user creation response time.""" response_times: list[float] = [] @@ -156,6 +167,7 @@ class TestUserPerformance: assert res["code"] == 0, f"User creation failed: {res}" response_times.append(duration) + clear_users.append(unique_email) # Calculate statistics avg_time: float = sum(response_times) / len(response_times) @@ -172,7 +184,9 @@ class TestUserPerformance: @pytest.mark.p3 def test_sequential_vs_concurrent_creation_comparison( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Compare sequential vs concurrent user creation performance.""" count: int = 10 @@ -189,11 +203,13 @@ class TestUserPerformance: "password": "test123", }, ) + clear_users.append(unique_email) sequential_duration: float = time.time() - sequential_start # Concurrent creation def create_concurrent_user(index: int) -> dict[str, Any]: unique_email: str = f"conc_{index}_{uuid.uuid4().hex[:8]}@example.com" + clear_users.append(unique_email) return create_user( web_api_auth, { @@ -231,7 +247,9 @@ class TestUserPerformance: @pytest.mark.p3 def test_pagination_consistency_under_load( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test pagination consistency during concurrent modifications.""" # Create initial set of users @@ -246,6 +264,7 @@ class TestUserPerformance: "password": "test123", }, ) + clear_users.append(unique_email) # Test pagination while users are being created def paginate_users() -> dict[str, Any]: @@ -262,6 +281,7 @@ class TestUserPerformance: "password": "test123", }, ) + clear_users.append(unique_email) with ThreadPoolExecutor(max_workers=3) as executor: # Start pagination requests @@ -284,7 +304,9 @@ class TestUserPerformance: @pytest.mark.p3 def test_memory_efficiency_large_list( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test memory efficiency when listing many users.""" # Create 100 users @@ -298,6 +320,7 @@ class TestUserPerformance: "password": "test123", }, ) + clear_users.append(unique_email) # List all users (without pagination) res: dict[str, Any] = list_users(web_api_auth) @@ -311,7 +334,9 @@ class TestUserPerformance: @pytest.mark.p3 @pytest.mark.skip(reason="Stress test - run manually") def test_sustained_load( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test system stability under sustained load (manual run).""" duration_seconds: int = 60 # Run for 1 minute @@ -336,6 +361,7 @@ class TestUserPerformance: }, ) request_count += 1 + clear_users.append(unique_email) if res["code"] != 0: error_count += 1 @@ -362,7 +388,9 @@ class TestUserPerformance: @pytest.mark.p3 def test_large_payload_handling( - self, web_api_auth: RAGFlowWebApiAuth + self, + web_api_auth: RAGFlowWebApiAuth, + clear_users: list[str], ) -> None: """Test handling of large request payloads.""" # Create user with large nickname (but within limits) @@ -390,3 +418,4 @@ class TestUserPerformance: assert len(res["data"]["nickname"]) <= 255, ( "Nickname should be capped at reasonable length" ) + clear_users.append(unique_email) diff --git a/test/testcases/test_http_api/test_user_management/test_user_security.py b/test/testcases/test_http_api/test_user_management/test_user_security.py index a14eabceb..0da30cc95 100644 --- a/test/testcases/test_http_api/test_user_management/test_user_security.py +++ b/test/testcases/test_http_api/test_user_management/test_user_security.py @@ -22,7 +22,7 @@ from typing import Any import pytest -from ..common import create_user +from common import create_user from libs.auth import RAGFlowWebApiAuth @@ -43,7 +43,9 @@ class TestUserSecurity: ], ) def test_sql_injection_in_email( - self, web_api_auth: RAGFlowWebApiAuth, malicious_email: str + self, + web_api_auth: RAGFlowWebApiAuth, + malicious_email: str, ) -> None: """Test SQL injection attempts in email field are properly handled.""" payload: dict[str, str] = { @@ -70,7 +72,10 @@ class TestUserSecurity: ], ) def test_xss_in_nickname( - self, web_api_auth: RAGFlowWebApiAuth, xss_payload: str + self, + web_api_auth: RAGFlowWebApiAuth, + xss_payload: str, + clear_users: list[str], ) -> None: """Test XSS attempts in nickname field are sanitized.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -92,10 +97,13 @@ class TestUserSecurity: assert " None: """Ensure plain password never appears in response.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -121,6 +129,7 @@ class TestUserSecurity: assert stored_password != password, ( "Stored password should not match plain password" ) + clear_users.append(unique_email) @pytest.mark.p2 @pytest.mark.parametrize( @@ -134,7 +143,10 @@ class TestUserSecurity: ], ) def test_weak_password_handling( - self, web_api_auth: RAGFlowWebApiAuth, weak_password: str + self, + web_api_auth: RAGFlowWebApiAuth, + weak_password: str, + clear_users: list[str], ) -> None: """Test handling of weak passwords.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -149,10 +161,11 @@ class TestUserSecurity: assert res["code"] != 0, ( f"Empty password should be rejected: '{weak_password}'" ) + clear_users.append(unique_email) @pytest.mark.p2 def test_unauthorized_superuser_creation( - self, web_api_auth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str] ) -> None: """Test that regular users cannot escalate privileges.""" # Note: This test assumes web_api_auth represents a regular user @@ -172,10 +185,11 @@ class TestUserSecurity: # Log that this privilege escalation is currently possible # In a production system, this should be blocked pass + clear_users.append(unique_email) @pytest.mark.p2 def test_password_hashing_is_secure( - self, web_api_auth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str] ) -> None: """Verify passwords are hashed using secure algorithm.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -202,6 +216,7 @@ class TestUserSecurity: ) # Should contain salt (indicated by multiple $ separators in scrypt format) assert hashed.count("$") >= 2, "Should include salt in hash" + clear_users.append(unique_email) @pytest.mark.p2 @pytest.mark.parametrize( @@ -213,7 +228,10 @@ class TestUserSecurity: ], ) def test_control_character_injection( - self, web_api_auth: RAGFlowWebApiAuth, injection_attempt: dict[str, str] + self, + web_api_auth: RAGFlowWebApiAuth, + injection_attempt: dict[str, str], + clear_users: list[str], ) -> None: """Test protection against control character injection.""" # Complete the payload with required fields @@ -239,10 +257,11 @@ class TestUserSecurity: assert "\n" not in res["data"]["nickname"], ( "Line feeds should be removed" ) + clear_users.append(payload["email"]) @pytest.mark.p3 def test_session_token_security( - self, web_api_auth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str] ) -> None: """Test that access tokens are properly secured.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -264,10 +283,11 @@ class TestUserSecurity: assert not token.startswith("user_"), ( "Token should not use predictable pattern" ) + clear_users.append(unique_email) @pytest.mark.p3 def test_email_case_sensitivity( - self, web_api_auth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str] ) -> None: """Test email uniqueness is case-insensitive.""" base_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -293,10 +313,12 @@ class TestUserSecurity: # Note: Current implementation may allow this, but it should be fixed # assert res2["code"] != 0, "Uppercase email should be treated as duplicate" # assert "already registered" in res2["message"].lower() + clear_users.append(base_email.lower()) + clear_users.append(base_email.upper()) @pytest.mark.p3 def test_concurrent_user_creation_same_email( - self, web_api_auth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str] ) -> None: """Test race condition protection for duplicate emails.""" import concurrent.futures @@ -340,3 +362,4 @@ class TestUserSecurity: "already registered" in r.get("message", "").lower() for r in failed_responses ), "Failure should be due to duplicate email" + clear_users.append(email)