diff --git a/test/testcases/test_http_api/test_user_management/test_create_user.py b/test/testcases/test_http_api/test_user_management/test_create_user.py index 466cc224b..7f3664522 100644 --- a/test/testcases/test_http_api/test_user_management/test_create_user.py +++ b/test/testcases/test_http_api/test_user_management/test_create_user.py @@ -132,7 +132,7 @@ class TestUserCreate: ) def test_required_fields( self, - WebApiAuth, + web_api_auth, payload: dict[str, Any], expected_code: int, expected_message: str, @@ -142,7 +142,7 @@ class TestUserCreate: # Use unique email to avoid conflicts unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" payload["email"] = unique_email - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: assert res["data"]["nickname"] == payload["nickname"] @@ -167,7 +167,7 @@ class TestUserCreate: ) def test_email_validation( self, - WebApiAuth, + web_api_auth, email: str, expected_code: int, expected_message: str, @@ -181,7 +181,7 @@ class TestUserCreate: "email": email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: assert res["data"]["email"] == email @@ -201,7 +201,7 @@ class TestUserCreate: ) def test_nickname( self, - WebApiAuth, + web_api_auth, nickname: str, expected_code: int, expected_message: str, @@ -213,7 +213,7 @@ class TestUserCreate: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: assert res["data"]["nickname"] == nickname @@ -222,7 +222,7 @@ class TestUserCreate: @pytest.mark.p1 def test_duplicate_email( - self, WebApiAuth + self, web_api_auth ) -> None: """Test that creating a user with duplicate email fails.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -231,7 +231,7 @@ class TestUserCreate: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 # Try to create another user with the same email @@ -240,7 +240,7 @@ class TestUserCreate: "email": unique_email, "password": "test123", } - res2: dict[str, Any] = create_user(WebApiAuth, payload2) + res2: dict[str, Any] = create_user(web_api_auth, payload2) assert res2["code"] == 103 assert "has already registered" in res2["message"] @@ -255,7 +255,7 @@ class TestUserCreate: ) def test_is_superuser( self, - WebApiAuth, + web_api_auth, is_superuser: bool | None, expected_value: bool, ) -> None: @@ -269,13 +269,13 @@ class TestUserCreate: if is_superuser is not None: payload["is_superuser"] = is_superuser - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 assert res["data"]["is_superuser"] == expected_value @pytest.mark.p2 def test_password_hashing( - self, WebApiAuth + self, web_api_auth ) -> None: """Test that password is properly hashed when stored.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -285,7 +285,7 @@ class TestUserCreate: "email": unique_email, "password": password, # Plain text password } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 # Password should be hashed in the response (not plain text) assert "password" in res["data"], ( @@ -299,7 +299,7 @@ class TestUserCreate: @pytest.mark.p2 def test_plain_text_password_accepted( - self, WebApiAuth + self, web_api_auth ) -> None: """Test that plain text password is accepted.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -308,14 +308,14 @@ class TestUserCreate: "email": unique_email, "password": "plain_text_password", # Plain text, no encryption } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should succeed with plain text password assert res["code"] == 0 assert res["data"]["email"] == unique_email @pytest.mark.p3 def test_concurrent_create( - self, WebApiAuth + self, web_api_auth ) -> None: """Test concurrent user creation with multiple threads.""" count: int = 10 @@ -329,7 +329,7 @@ class TestUserCreate: "password": "test123", } futures.append( - executor.submit(create_user, WebApiAuth, payload) + executor.submit(create_user, web_api_auth, payload) ) responses: list[Future[dict[str, Any]]] = list( as_completed(futures) @@ -341,7 +341,7 @@ class TestUserCreate: @pytest.mark.p2 def test_user_creation_response_structure( - self, WebApiAuth + self, web_api_auth ) -> None: """Test that user creation returns the expected response structure.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -350,7 +350,7 @@ class TestUserCreate: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0 assert "data" in res assert "id" in res["data"] diff --git a/test/testcases/test_http_api/test_user_management/test_delete_user.py b/test/testcases/test_http_api/test_user_management/test_delete_user.py index f6b1e54c1..f39c99436 100644 --- a/test/testcases/test_http_api/test_user_management/test_delete_user.py +++ b/test/testcases/test_http_api/test_user_management/test_delete_user.py @@ -57,7 +57,7 @@ class TestAuthorization: invalid_auth: RAGFlowWebApiAuth | None, expected_code: int, expected_message: str, - WebApiAuth: RAGFlowWebApiAuth, + web_api_auth: RAGFlowWebApiAuth, ) -> None: """Test user deletion with invalid or missing authentication.""" # Create a test user first @@ -67,7 +67,7 @@ class TestAuthorization: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + create_res: dict[str, Any] = create_user(web_api_auth, create_payload) if create_res["code"] != 0: pytest.skip("User creation failed, skipping auth test") @@ -83,7 +83,7 @@ class TestAuthorization: @pytest.mark.p1 def test_user_can_only_delete_themselves( self, - WebApiAuth: RAGFlowWebApiAuth, + web_api_auth: RAGFlowWebApiAuth, ) -> None: """Test that users can only delete their own account.""" # Create another user @@ -93,7 +93,7 @@ class TestAuthorization: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + create_res: dict[str, Any] = create_user(web_api_auth, create_payload) assert create_res["code"] == 0, "Failed to create second user" other_user_id: str = create_res["data"]["id"] @@ -101,7 +101,7 @@ class TestAuthorization: delete_payload: dict[str, Any] = { "user_id": other_user_id, } - res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) + res: dict[str, Any] = delete_user(web_api_auth, delete_payload) assert res["code"] == 403, f"Expected 403 FORBIDDEN, got {res}" assert "only delete your own account" in res["message"].lower() @@ -112,50 +112,50 @@ class TestUserDelete: @pytest.mark.p1 def test_delete_user_missing_identifier( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test deletion without user_id or email.""" delete_payload: dict[str, str] = {} - res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) + res: dict[str, Any] = delete_user(web_api_auth, delete_payload) assert res["code"] == 101 assert "Either user_id or email must be provided" in res["message"] @pytest.mark.p1 def test_delete_user_not_found_by_id( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test deletion of non-existent user by ID.""" delete_payload: dict[str, str] = { "user_id": "non_existent_user_id_12345", } - res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) + res: dict[str, Any] = delete_user(web_api_auth, delete_payload) assert res["code"] == 102 assert "User not found" in res["message"] @pytest.mark.p1 def test_delete_user_invalid_email_format( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test deletion with invalid email format.""" delete_payload: dict[str, str] = {"email": "invalid_email_format"} - res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) + res: dict[str, Any] = delete_user(web_api_auth, delete_payload) assert res["code"] == 103 assert "Invalid email address" in res["message"] @pytest.mark.p3 def test_delete_user_idempotency( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test that deleting a non-existent user returns consistent error.""" non_existent_id: str = f"nonexistent_{uuid.uuid4().hex[:16]}" # First attempt delete_payload: dict[str, str] = {"user_id": non_existent_id} - res1: dict[str, Any] = delete_user(WebApiAuth, delete_payload) + res1: dict[str, Any] = delete_user(web_api_auth, delete_payload) assert res1["code"] == 102 # Second attempt (should return same error) - res2: dict[str, Any] = delete_user(WebApiAuth, delete_payload) + res2: dict[str, Any] = delete_user(web_api_auth, delete_payload) assert res2["code"] == 102 assert res1["code"] == res2["code"] diff --git a/test/testcases/test_http_api/test_user_management/test_list_user.py b/test/testcases/test_http_api/test_user_management/test_list_user.py index d555e6926..e60676e6e 100644 --- a/test/testcases/test_http_api/test_user_management/test_list_user.py +++ b/test/testcases/test_http_api/test_user_management/test_list_user.py @@ -71,17 +71,17 @@ class TestUserList: # @pytest.mark.p1 # def test_list_empty_users( - # self, HttpApiAuth: RAGFlowHttpApiAuth + # self, http_api_auth: RAGFlowHttpApiAuth # ) -> None: # """Test listing users when no users exist.""" - # res: dict[str, Any] = list_users(HttpApiAuth) + # res: dict[str, Any] = list_users(http_api_auth) # assert res["code"] == 0, res # assert isinstance(res["data"], list) # assert len(res["data"]) == 0 @pytest.mark.p1 def test_list_single_user( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing a single user.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -90,12 +90,12 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + create_res: dict[str, Any] = create_user(web_api_auth, create_payload) # Skip if creation fails (password encryption issue in test) if create_res["code"] != 0: pytest.skip("User creation failed, skipping list test") - list_res: dict[str, Any] = list_users(WebApiAuth) + list_res: dict[str, Any] = list_users(web_api_auth) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) >= 1 @@ -105,7 +105,7 @@ class TestUserList: @pytest.mark.p1 def test_list_multiple_users( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing multiple users.""" created_emails: list[str] = [] @@ -117,7 +117,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - WebApiAuth, create_payload + web_api_auth, create_payload ) if create_res["code"] == 0: created_emails.append(unique_email) @@ -125,7 +125,7 @@ class TestUserList: if not created_emails: pytest.skip("No users created, skipping list test") - list_res: dict[str, Any] = list_users(WebApiAuth) + list_res: dict[str, Any] = list_users(web_api_auth) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) >= len(created_emails) @@ -136,7 +136,7 @@ class TestUserList: @pytest.mark.p1 def test_list_users_with_email_filter( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing users filtered by email.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -145,13 +145,13 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + create_res: dict[str, Any] = create_user(web_api_auth, create_payload) if create_res["code"] != 0: pytest.skip("User creation failed, skipping filter test") # List with email filter params: dict[str, str] = {"email": unique_email} - list_res: dict[str, Any] = list_users(WebApiAuth, params=params) + list_res: dict[str, Any] = list_users(web_api_auth, params=params) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) >= 1 @@ -161,22 +161,22 @@ class TestUserList: @pytest.mark.p1 def test_list_users_with_invalid_email_filter( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing users with invalid email filter.""" params: dict[str, str] = {"email": "invalid_email_format"} - list_res: dict[str, Any] = list_users(WebApiAuth, params=params) + list_res: dict[str, Any] = list_users(web_api_auth, params=params) assert list_res["code"] != 0 assert "Invalid email address" in list_res["message"] @pytest.mark.p1 def test_list_users_with_nonexistent_email_filter( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing users with non-existent email filter.""" nonexistent_email: str = f"nonexistent_{uuid.uuid4().hex[:8]}@example.com" params: dict[str, str] = {"email": nonexistent_email} - list_res: dict[str, Any] = list_users(WebApiAuth, params=params) + list_res: dict[str, Any] = list_users(web_api_auth, params=params) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) == 0 @@ -196,7 +196,7 @@ class TestUserList: ) def test_list_users_with_pagination( self, - WebApiAuth: RAGFlowWebApiAuth, + web_api_auth: RAGFlowWebApiAuth, page: int, page_size: int, expected_valid: bool, @@ -212,7 +212,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - WebApiAuth, create_payload + web_api_auth, create_payload ) if create_res["code"] == 0: created_count += 1 @@ -221,7 +221,7 @@ class TestUserList: pytest.skip("No users created, skipping pagination test") params: dict[str, int] = {"page": page, "page_size": page_size} - list_res: dict[str, Any] = list_users(WebApiAuth, params=params) + list_res: dict[str, Any] = list_users(web_api_auth, params=params) if expected_valid: assert list_res["code"] == 0, list_res @@ -234,7 +234,7 @@ class TestUserList: @pytest.mark.p1 def test_list_users_pagination_boundaries( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test pagination boundary conditions.""" # Create 5 users with a unique email pattern for filtering @@ -248,7 +248,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - WebApiAuth, create_payload + web_api_auth, create_payload ) if create_res["code"] == 0: created_emails.append(unique_email) @@ -257,18 +257,18 @@ class TestUserList: pytest.skip("Not enough users created, skipping boundary test") # Get total count of all users to calculate pagination boundaries - list_res_all: dict[str, Any] = list_users(WebApiAuth) + list_res_all: dict[str, Any] = list_users(web_api_auth) total_users: int = len(list_res_all["data"]) # Test first page params: dict[str, int] = {"page": 1, "page_size": 2} - list_res: dict[str, Any] = list_users(WebApiAuth, params=params) + list_res: dict[str, Any] = list_users(web_api_auth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) == 2 # Test that pagination returns consistent page sizes params = {"page": 2, "page_size": 2} - list_res = list_users(WebApiAuth, params=params) + list_res = list_users(web_api_auth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) == 2 @@ -278,23 +278,23 @@ class TestUserList: last_page: int = (total_users + page_size - 1) // page_size if last_page > 0: params = {"page": last_page, "page_size": page_size} - list_res = list_users(WebApiAuth, params=params) + list_res = list_users(web_api_auth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) <= page_size # Test page beyond available data # Use a page number that's definitely beyond available data params = {"page": total_users + 10, "page_size": 2} - list_res = list_users(WebApiAuth, params=params) + list_res = list_users(web_api_auth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) == 0 @pytest.mark.p1 def test_list_users_response_structure( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test that user listing returns the expected response structure.""" - res: dict[str, Any] = list_users(WebApiAuth) + res: dict[str, Any] = list_users(web_api_auth) assert res["code"] == 0 assert "data" in res assert isinstance(res["data"], list) @@ -311,24 +311,24 @@ class TestUserList: @pytest.mark.p1 def test_list_users_with_invalid_page_params( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing users with invalid pagination parameters.""" # Test invalid page (non-integer) params: dict[str, str] = {"page": "invalid", "page_size": "10"} - list_res: dict[str, Any] = list_users(WebApiAuth, params=params) + list_res: dict[str, Any] = list_users(web_api_auth, params=params) # Should handle gracefully or return error # The exact behavior depends on implementation assert "code" in list_res # Test invalid page_size (non-integer) params = {"page": "1", "page_size": "invalid"} - list_res = list_users(WebApiAuth, params=params) + list_res = list_users(web_api_auth, params=params) assert "code" in list_res @pytest.mark.p2 def test_list_users_combined_filters( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing users with combined filters.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -337,7 +337,7 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + create_res: dict[str, Any] = create_user(web_api_auth, create_payload) if create_res["code"] != 0: pytest.skip("User creation failed, skipping combined filter test") @@ -347,7 +347,7 @@ class TestUserList: "page": 1, "page_size": 10, } - list_res: dict[str, Any] = list_users(WebApiAuth, params=params) + list_res: dict[str, Any] = list_users(web_api_auth, params=params) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) # Should return at least the created user @@ -355,7 +355,7 @@ class TestUserList: @pytest.mark.p2 def test_list_users_performance_with_many_users( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test listing performance with multiple users.""" # Create several users @@ -368,7 +368,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - WebApiAuth, create_payload + web_api_auth, create_payload ) if create_res["code"] == 0: created_count += 1 @@ -377,7 +377,7 @@ class TestUserList: pytest.skip("No users created, skipping performance test") # List all users - list_res: dict[str, Any] = list_users(WebApiAuth) + list_res: dict[str, Any] = list_users(web_api_auth) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) # Should return at least the created users diff --git a/test/testcases/test_http_api/test_user_management/test_update_user.py b/test/testcases/test_http_api/test_user_management/test_update_user.py index 6f792f6ae..08e1684c6 100644 --- a/test/testcases/test_http_api/test_user_management/test_update_user.py +++ b/test/testcases/test_http_api/test_user_management/test_update_user.py @@ -71,7 +71,7 @@ class TestAuthorization: @pytest.mark.p1 def test_user_can_only_update_themselves( self, - WebApiAuth: RAGFlowWebApiAuth, + web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any], ) -> None: """Test that users can only update their own account.""" @@ -82,7 +82,7 @@ class TestAuthorization: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + create_res: dict[str, Any] = create_user(web_api_auth, create_payload) assert create_res["code"] == 0, "Failed to create second user" other_user_id: str = create_res["data"]["id"] @@ -91,7 +91,7 @@ class TestAuthorization: "user_id": other_user_id, "nickname": "hacked_nickname", } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 403, f"Expected 403 FORBIDDEN, got {res}" assert "only update your own account" in res["message"].lower() @@ -105,13 +105,13 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_with_user_id( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: payload: dict[str, Any] = { "user_id": test_user["user_id"], "nickname": "updated_nickname", } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0, res assert res["data"]["nickname"] == "updated_nickname" assert res["data"]["email"] == test_user["email"] @@ -119,48 +119,48 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_with_email( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: payload: dict[str, Any] = { "email": test_user["email"], "nickname": "updated_nickname_email", } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0, res assert res["data"]["nickname"] == "updated_nickname_email" assert res["data"]["email"] == test_user["email"] @pytest.mark.p1 def test_update_missing_identifier( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test update without user_id or email.""" payload: dict[str, str] = {"nickname": "updated_nickname"} - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 101 assert "Either user_id or email must be provided" in res["message"] @pytest.mark.p1 def test_update_user_not_found_by_id( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: payload: dict[str, str] = { "user_id": "non_existent_user_id_12345", "nickname": "updated_nickname", } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 102 assert "User not found" in res["message"] @pytest.mark.p1 def test_update_user_not_found_by_email( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: payload: dict[str, str] = { "email": "nonexistent@example.com", "nickname": "updated_nickname", } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 102 assert "not found" in res["message"] @@ -177,7 +177,7 @@ class TestUserUpdate: ) def test_update_nickname( self, - WebApiAuth: RAGFlowWebApiAuth, + web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any], nickname: str, expected_code: int, @@ -187,7 +187,7 @@ class TestUserUpdate: "user_id": test_user["user_id"], "nickname": nickname, } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: assert res["data"]["nickname"] == nickname @@ -196,26 +196,26 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_password( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: new_password: str = "new_password_456" payload: dict[str, str] = { "user_id": test_user["user_id"], "password": encrypt_password(new_password), } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0, res assert "updated successfully" in res["message"].lower() @pytest.mark.p1 def test_update_password_invalid_encryption( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: payload: dict[str, str] = { "user_id": test_user["user_id"], "password": "plain_text_password", } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 500 assert "Fail to decrypt password" in res["message"] @@ -235,7 +235,7 @@ class TestUserUpdate: ) def test_update_email( self, - WebApiAuth: RAGFlowWebApiAuth, + web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any], new_email: str, expected_code: int, @@ -247,7 +247,7 @@ class TestUserUpdate: "user_id": test_user["user_id"], "new_email": new_email, } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == expected_code, res if expected_code == 0: assert res["data"]["email"] == new_email @@ -256,7 +256,7 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_email_duplicate( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" create_payload: dict[str, str] = { @@ -264,14 +264,14 @@ class TestUserUpdate: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + create_res: dict[str, Any] = create_user(web_api_auth, create_payload) assert create_res["code"] == 0 update_payload: dict[str, str] = { "user_id": test_user["user_id"], "new_email": unique_email, } - res: dict[str, Any] = update_user(WebApiAuth, update_payload) + res: dict[str, Any] = update_user(web_api_auth, update_payload) assert res["code"] == 103 assert "already in use" in res["message"] @@ -282,7 +282,7 @@ class TestUserUpdate: ) def test_update_is_superuser( self, - WebApiAuth: RAGFlowWebApiAuth, + web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any], is_superuser: bool, expected_value: bool, @@ -291,13 +291,13 @@ class TestUserUpdate: "user_id": test_user["user_id"], "is_superuser": is_superuser, } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0, res assert res["data"]["is_superuser"] is expected_value @pytest.mark.p1 def test_update_multiple_fields( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" payload: dict[str, Any] = { @@ -306,7 +306,7 @@ class TestUserUpdate: "new_email": new_email, "is_superuser": True, } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0, res assert res["data"]["nickname"] == "updated_multiple" assert res["data"]["email"] == new_email @@ -314,35 +314,35 @@ class TestUserUpdate: @pytest.mark.p1 def test_update_no_fields( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: payload: dict[str, str] = {"user_id": test_user["user_id"]} - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 101 assert "No valid fields to update" in res["message"] @pytest.mark.p1 def test_update_email_using_email_field_when_user_id_provided( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" payload: dict[str, str] = { "user_id": test_user["user_id"], "email": new_email, } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0, res assert res["data"]["email"] == new_email @pytest.mark.p2 def test_update_response_structure( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: payload: dict[str, Any] = { "user_id": test_user["user_id"], "nickname": "response_test", } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0 assert set(("id", "email", "nickname")) <= res["data"].keys() assert res["data"]["nickname"] == "response_test" @@ -350,7 +350,7 @@ class TestUserUpdate: @pytest.mark.p2 def test_concurrent_updates( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: """Test concurrent updates to the same user (users can only update themselves).""" # Test concurrent updates to the authenticated user's own account @@ -358,7 +358,7 @@ class TestUserUpdate: futures: list[Future[dict[str, Any]]] = [ executor.submit( update_user, - WebApiAuth, + web_api_auth, { "user_id": test_user["user_id"], "nickname": f"updated_user_{i}", @@ -373,7 +373,7 @@ class TestUserUpdate: @pytest.mark.p3 def test_update_same_user_multiple_times( - self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any] + self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any] ) -> None: """Test repeated updates on the same user.""" for nickname in ( @@ -385,6 +385,6 @@ class TestUserUpdate: "user_id": test_user["user_id"], "nickname": nickname, } - res: dict[str, Any] = update_user(WebApiAuth, payload) + res: dict[str, Any] = update_user(web_api_auth, payload) assert res["code"] == 0 assert res["data"]["nickname"] == nickname diff --git a/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py index 4b8486ab5..50f9517ac 100644 --- a/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py +++ b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py @@ -33,7 +33,7 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_extremely_long_nickname( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test nickname with extreme length.""" long_nickname: str = "A" * 1000 @@ -43,7 +43,7 @@ class TestUserEdgeCases: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should either accept (and possibly truncate) or reject with clear error if res["code"] == 0: @@ -61,7 +61,7 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_unicode_in_all_fields( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test unicode characters in various fields.""" # Use ASCII-safe email local part (unicode in display name) @@ -72,7 +72,7 @@ class TestUserEdgeCases: "email": unique_email, "password": "密码123!", # Chinese + ASCII } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should handle unicode properly if res["code"] == 0: @@ -83,7 +83,7 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_emoji_in_nickname( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test emoji characters in nickname.""" nickname_with_emoji: str = "Test User 😀🎉🔥" @@ -93,7 +93,7 @@ class TestUserEdgeCases: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should handle emoji properly (either accept or reject gracefully) if res["code"] == 0: @@ -114,7 +114,7 @@ class TestUserEdgeCases: ], ) def test_special_characters_in_email( - self, WebApiAuth: RAGFlowWebApiAuth, special_email: str + self, web_api_auth: RAGFlowWebApiAuth, special_email: str ) -> None: """Test various special characters in email.""" payload: dict[str, str] = { @@ -122,13 +122,13 @@ class TestUserEdgeCases: "email": special_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0, f"Failed for email: {special_email}, {res}" assert res["data"]["email"] == special_email @pytest.mark.p2 def test_whitespace_handling_in_fields( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test whitespace handling in various fields.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -137,7 +137,7 @@ class TestUserEdgeCases: "email": f" {unique_email} ", # Spaces around email "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should handle whitespace (trim or accept) if res["code"] == 0: @@ -151,7 +151,7 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_null_byte_in_input( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test null byte injection in input fields.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -160,7 +160,7 @@ class TestUserEdgeCases: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should handle or reject null bytes if res["code"] == 0: @@ -171,7 +171,7 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_very_long_email( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test email exceeding typical length limits.""" # Create email with very long local part (250 chars) @@ -182,7 +182,7 @@ class TestUserEdgeCases: "email": email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should reject overly long emails (RFC 5321 limits local part to 64 chars) assert res["code"] != 0, "Very long email should be rejected" @@ -193,7 +193,7 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_email_with_multiple_at_signs( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test email with multiple @ signs.""" payload: dict[str, str] = { @@ -201,7 +201,7 @@ class TestUserEdgeCases: "email": "user@@example.com", "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should reject invalid email format assert res["code"] != 0, "Email with multiple @ should be rejected" @@ -209,7 +209,7 @@ class TestUserEdgeCases: @pytest.mark.p2 def test_email_with_spaces( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test email containing spaces.""" payload: dict[str, str] = { @@ -217,7 +217,7 @@ class TestUserEdgeCases: "email": "user name@example.com", "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should reject email with spaces assert res["code"] != 0, "Email with spaces should be rejected" @@ -225,7 +225,7 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_leading_trailing_dots_in_email( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test email with leading/trailing dots.""" invalid_emails: list[str] = [ @@ -240,14 +240,14 @@ class TestUserEdgeCases: "email": email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # These should be rejected as invalid assert res["code"] != 0, f"Invalid email should be rejected: {email}" @pytest.mark.p3 def test_empty_string_vs_none_in_optional_fields( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test difference between empty string and None for optional fields.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -258,7 +258,7 @@ class TestUserEdgeCases: "email": unique_email, "password": "test123", } - res_empty: dict[str, Any] = create_user(WebApiAuth, payload_empty) + res_empty: dict[str, Any] = create_user(web_api_auth, payload_empty) # Empty nickname should be accepted per current API behavior assert res_empty["code"] == 0, "Empty nickname should be accepted" @@ -266,12 +266,12 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_pagination_with_no_results( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test list_users pagination when no users exist.""" # Assuming clear_users fixture has cleared all test users res: dict[str, Any] = list_users( - WebApiAuth, params={"page": 1, "page_size": 10} + web_api_auth, params={"page": 1, "page_size": 10} ) # Should return empty list, not error @@ -281,13 +281,13 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_pagination_beyond_available_pages( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test requesting page beyond available data.""" # Create one user unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" create_user( - WebApiAuth, + web_api_auth, { "nickname": "test", "email": unique_email, @@ -297,7 +297,7 @@ class TestUserEdgeCases: # Request page 100 res: dict[str, Any] = list_users( - WebApiAuth, params={"page": 100, "page_size": 10} + web_api_auth, params={"page": 100, "page_size": 10} ) # Should return empty results, not error @@ -306,11 +306,11 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_zero_page_size( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test pagination with page_size of 0.""" res: dict[str, Any] = list_users( - WebApiAuth, params={"page": 1, "page_size": 0} + web_api_auth, params={"page": 1, "page_size": 0} ) # Should reject invalid page size @@ -319,11 +319,11 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_negative_page_number( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test pagination with negative page number.""" res: dict[str, Any] = list_users( - WebApiAuth, params={"page": -1, "page_size": 10} + web_api_auth, params={"page": -1, "page_size": 10} ) # Should reject negative page number @@ -332,11 +332,11 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_excessive_page_size( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test pagination with very large page_size.""" res: dict[str, Any] = list_users( - WebApiAuth, params={"page": 1, "page_size": 10000} + web_api_auth, params={"page": 1, "page_size": 10000} ) # Should either cap page size or reject @@ -349,7 +349,7 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_special_characters_in_password( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test password with various special characters.""" special_passwords: list[str] = [ @@ -367,7 +367,7 @@ class TestUserEdgeCases: "email": unique_email, "password": password, } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should accept special characters in password assert res["code"] == 0, ( @@ -376,7 +376,7 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_json_injection_in_fields( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test JSON injection attempts.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -385,7 +385,7 @@ class TestUserEdgeCases: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should treat as literal string, not parse as JSON if res["code"] == 0: @@ -395,7 +395,7 @@ class TestUserEdgeCases: @pytest.mark.p3 def test_path_traversal_in_nickname( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test path traversal attempts in nickname.""" traversal_attempts: list[str] = [ @@ -411,7 +411,7 @@ class TestUserEdgeCases: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should either reject or sanitize path traversal attempts # At minimum, should not allow actual file system access diff --git a/test/testcases/test_http_api/test_user_management/test_user_performance.py b/test/testcases/test_http_api/test_user_management/test_user_performance.py index 6b8f3d26d..15e16d025 100644 --- a/test/testcases/test_http_api/test_user_management/test_user_performance.py +++ b/test/testcases/test_http_api/test_user_management/test_user_performance.py @@ -35,7 +35,7 @@ class TestUserPerformance: @pytest.mark.p2 def test_list_users_performance_small_dataset( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test list_users performance with small dataset.""" # Create 20 users @@ -43,7 +43,7 @@ class TestUserPerformance: for i in range(20): unique_email: str = f"perf_small_{i}_{uuid.uuid4().hex[:4]}@example.com" res: dict[str, Any] = create_user( - WebApiAuth, + web_api_auth, { "nickname": f"user_{i}", "email": unique_email, @@ -55,7 +55,7 @@ class TestUserPerformance: # Test list performance without pagination start: float = time.time() - res: dict[str, Any] = list_users(WebApiAuth) + res: dict[str, Any] = list_users(web_api_auth) duration: float = time.time() - start assert res["code"] == 0, res @@ -65,14 +65,14 @@ class TestUserPerformance: @pytest.mark.p2 def test_list_users_pagination_performance( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test pagination performance with moderate dataset.""" # Create 50 users for i in range(50): unique_email: str = f"perf_test_{i}_{uuid.uuid4().hex[:4]}@example.com" create_user( - WebApiAuth, + web_api_auth, { "nickname": f"user_{i}", "email": unique_email, @@ -83,7 +83,7 @@ class TestUserPerformance: # Test pagination performance start: float = time.time() res: dict[str, Any] = list_users( - WebApiAuth, params={"page": 1, "page_size": 10} + web_api_auth, params={"page": 1, "page_size": 10} ) duration: float = time.time() - start @@ -95,7 +95,7 @@ class TestUserPerformance: @pytest.mark.p3 def test_concurrent_user_creation( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test concurrent user creation without conflicts.""" count: int = 20 @@ -103,7 +103,7 @@ class TestUserPerformance: def create_test_user(index: int) -> dict[str, Any]: unique_email: str = f"concurrent_{index}_{uuid.uuid4().hex[:8]}@example.com" return create_user( - WebApiAuth, + web_api_auth, { "nickname": f"user_{index}", "email": unique_email, @@ -136,7 +136,7 @@ class TestUserPerformance: @pytest.mark.p3 def test_user_creation_response_time( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test individual user creation response time.""" response_times: list[float] = [] @@ -145,7 +145,7 @@ class TestUserPerformance: unique_email: str = f"timing_{i}_{uuid.uuid4().hex[:8]}@example.com" start: float = time.time() res: dict[str, Any] = create_user( - WebApiAuth, + web_api_auth, { "nickname": f"user_{i}", "email": unique_email, @@ -172,7 +172,7 @@ class TestUserPerformance: @pytest.mark.p3 def test_sequential_vs_concurrent_creation_comparison( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Compare sequential vs concurrent user creation performance.""" count: int = 10 @@ -182,7 +182,7 @@ class TestUserPerformance: for i in range(count): unique_email: str = f"seq_{i}_{uuid.uuid4().hex[:8]}@example.com" create_user( - WebApiAuth, + web_api_auth, { "nickname": f"seq_user_{i}", "email": unique_email, @@ -195,7 +195,7 @@ class TestUserPerformance: def create_concurrent_user(index: int) -> dict[str, Any]: unique_email: str = f"conc_{index}_{uuid.uuid4().hex[:8]}@example.com" return create_user( - WebApiAuth, + web_api_auth, { "nickname": f"conc_user_{index}", "email": unique_email, @@ -231,7 +231,7 @@ class TestUserPerformance: @pytest.mark.p3 def test_pagination_consistency_under_load( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test pagination consistency during concurrent modifications.""" # Create initial set of users @@ -239,7 +239,7 @@ class TestUserPerformance: for i in range(initial_count): unique_email: str = f"pag_{i}_{uuid.uuid4().hex[:8]}@example.com" create_user( - WebApiAuth, + web_api_auth, { "nickname": f"user_{i}", "email": unique_email, @@ -249,13 +249,13 @@ class TestUserPerformance: # Test pagination while users are being created def paginate_users() -> dict[str, Any]: - return list_users(WebApiAuth, params={"page": 1, "page_size": 10}) + return list_users(web_api_auth, params={"page": 1, "page_size": 10}) def create_more_users() -> None: for i in range(5): unique_email: str = f"new_{i}_{uuid.uuid4().hex[:8]}@example.com" create_user( - WebApiAuth, + web_api_auth, { "nickname": f"new_user_{i}", "email": unique_email, @@ -284,14 +284,14 @@ class TestUserPerformance: @pytest.mark.p3 def test_memory_efficiency_large_list( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test memory efficiency when listing many users.""" # Create 100 users for i in range(100): unique_email: str = f"mem_{i}_{uuid.uuid4().hex[:8]}@example.com" create_user( - WebApiAuth, + web_api_auth, { "nickname": f"user_{i}", "email": unique_email, @@ -300,7 +300,7 @@ class TestUserPerformance: ) # List all users (without pagination) - res: dict[str, Any] = list_users(WebApiAuth) + res: dict[str, Any] = list_users(web_api_auth) assert res["code"] == 0, res # Should return results without memory issues @@ -311,7 +311,7 @@ class TestUserPerformance: @pytest.mark.p3 @pytest.mark.skip(reason="Stress test - run manually") def test_sustained_load( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test system stability under sustained load (manual run).""" duration_seconds: int = 60 # Run for 1 minute @@ -328,7 +328,7 @@ class TestUserPerformance: for i in range(requests_per_second): unique_email: str = f"load_{request_count}_{uuid.uuid4().hex[:8]}@example.com" res: dict[str, Any] = create_user( - WebApiAuth, + web_api_auth, { "nickname": f"user_{request_count}", "email": unique_email, @@ -362,7 +362,7 @@ class TestUserPerformance: @pytest.mark.p3 def test_large_payload_handling( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test handling of large request payloads.""" # Create user with large nickname (but within limits) @@ -371,7 +371,7 @@ class TestUserPerformance: start: float = time.time() res: dict[str, Any] = create_user( - WebApiAuth, + web_api_auth, { "nickname": large_nickname, "email": unique_email, diff --git a/test/testcases/test_http_api/test_user_management/test_user_security.py b/test/testcases/test_http_api/test_user_management/test_user_security.py index ab171cb5b..45e97186b 100644 --- a/test/testcases/test_http_api/test_user_management/test_user_security.py +++ b/test/testcases/test_http_api/test_user_management/test_user_security.py @@ -43,7 +43,7 @@ class TestUserSecurity: ], ) def test_sql_injection_in_email( - self, WebApiAuth: RAGFlowWebApiAuth, malicious_email: str + self, web_api_auth: RAGFlowWebApiAuth, malicious_email: str ) -> None: """Test SQL injection attempts in email field are properly handled.""" payload: dict[str, str] = { @@ -51,7 +51,7 @@ class TestUserSecurity: "email": malicious_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should fail validation, not execute SQL assert res["code"] != 0, ( f"SQL injection attempt should be rejected: {malicious_email}" @@ -70,7 +70,7 @@ class TestUserSecurity: ], ) def test_xss_in_nickname( - self, WebApiAuth: RAGFlowWebApiAuth, xss_payload: str + self, web_api_auth: RAGFlowWebApiAuth, xss_payload: str ) -> None: """Test XSS attempts in nickname field are sanitized.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -79,7 +79,7 @@ class TestUserSecurity: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) if res["code"] == 0: # Nickname should be sanitized nickname: str = res["data"]["nickname"] @@ -95,7 +95,7 @@ class TestUserSecurity: @pytest.mark.p1 def test_password_not_in_response( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Ensure plain password never appears in response.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -105,7 +105,7 @@ class TestUserSecurity: "email": unique_email, "password": password, } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0, res # Response should contain hashed password, not plain text response_str: str = str(res["data"]) @@ -134,7 +134,7 @@ class TestUserSecurity: ], ) def test_weak_password_handling( - self, WebApiAuth: RAGFlowWebApiAuth, weak_password: str + self, web_api_auth: RAGFlowWebApiAuth, weak_password: str ) -> None: """Test handling of weak passwords.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -143,7 +143,7 @@ class TestUserSecurity: "email": unique_email, "password": weak_password, } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should reject empty/whitespace passwords at minimum if not weak_password or not weak_password.strip(): assert res["code"] != 0, ( @@ -152,10 +152,10 @@ class TestUserSecurity: @pytest.mark.p2 def test_unauthorized_superuser_creation( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test that regular users cannot escalate privileges.""" - # Note: This test assumes WebApiAuth represents a regular user + # Note: This test assumes web_api_auth represents a regular user # In production, only admins should be able to create superusers unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" payload: dict[str, Any] = { @@ -164,7 +164,7 @@ class TestUserSecurity: "password": "test123", "is_superuser": True, } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # The API currently allows this, but in production this should be restricted # For now, we document the expected behavior @@ -175,7 +175,7 @@ class TestUserSecurity: @pytest.mark.p2 def test_password_hashing_is_secure( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Verify passwords are hashed using secure algorithm.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -185,7 +185,7 @@ class TestUserSecurity: "email": unique_email, "password": password, } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0, res # Check password is hashed @@ -213,7 +213,7 @@ class TestUserSecurity: ], ) def test_control_character_injection( - self, WebApiAuth: RAGFlowWebApiAuth, injection_attempt: dict[str, str] + self, web_api_auth: RAGFlowWebApiAuth, injection_attempt: dict[str, str] ) -> None: """Test protection against control character injection.""" # Complete the payload with required fields @@ -224,7 +224,7 @@ class TestUserSecurity: } payload.update(injection_attempt) - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) # Should either reject or sanitize control characters if res["code"] == 0: @@ -242,7 +242,7 @@ class TestUserSecurity: @pytest.mark.p3 def test_session_token_security( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test that access tokens are properly secured.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -251,7 +251,7 @@ class TestUserSecurity: "email": unique_email, "password": "test123", } - res: dict[str, Any] = create_user(WebApiAuth, payload) + res: dict[str, Any] = create_user(web_api_auth, payload) assert res["code"] == 0, res # Check that access_token exists and is properly formatted @@ -267,7 +267,7 @@ class TestUserSecurity: @pytest.mark.p3 def test_email_case_sensitivity( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test email uniqueness is case-insensitive.""" base_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -278,7 +278,7 @@ class TestUserSecurity: "email": base_email.lower(), "password": "test123", } - res1: dict[str, Any] = create_user(WebApiAuth, payload1) + res1: dict[str, Any] = create_user(web_api_auth, payload1) assert res1["code"] == 0, res1 # Try to create another user with uppercase version of same email @@ -287,7 +287,7 @@ class TestUserSecurity: "email": base_email.upper(), "password": "test123", } - res2: dict[str, Any] = create_user(WebApiAuth, payload2) + res2: dict[str, Any] = create_user(web_api_auth, payload2) # Should reject duplicate email regardless of case # Note: Current implementation may allow this, but it should be fixed @@ -296,7 +296,7 @@ class TestUserSecurity: @pytest.mark.p3 def test_concurrent_user_creation_same_email( - self, WebApiAuth: RAGFlowWebApiAuth + self, web_api_auth: RAGFlowWebApiAuth ) -> None: """Test race condition protection for duplicate emails.""" import concurrent.futures @@ -305,7 +305,7 @@ class TestUserSecurity: def create_with_email(index: int) -> dict[str, Any]: return create_user( - WebApiAuth, + web_api_auth, { "nickname": f"test{index}", "email": email,