diff --git a/api/apps/user_app.py b/api/apps/user_app.py index c0bb3d813..b3232afba 100644 --- a/api/apps/user_app.py +++ b/api/apps/user_app.py @@ -1117,7 +1117,7 @@ def update_user() -> Response: @manager.route("/list", methods=["GET"]) # noqa: F821 -# @login_required +@login_required def list_users() -> Response: """ List all users. @@ -1169,11 +1169,23 @@ def list_users() -> Response: total: type: integer description: Total number of users. + 401: + description: Unauthorized - authentication required. + schema: + type: object 500: description: Server error during user listing. schema: type: object """ + # Explicitly check authentication status + if not current_user.is_authenticated: + return get_json_result( + data=False, + message="Unauthorized", + code=RetCode.UNAUTHORIZED, + ) + try: # Get query parameters page: Optional[int] = None @@ -1261,11 +1273,11 @@ def list_users() -> Response: @manager.route("/delete", methods=["DELETE"]) # noqa: F821 -# @login_required +@login_required @validate_request() def delete_user() -> Response: """ - Delete a user. + Delete a user. Users can only delete their own account. --- tags: @@ -1299,6 +1311,14 @@ def delete_user() -> Response: message: type: string description: Success message. + 401: + description: Unauthorized - authentication required. + schema: + type: object + 403: + description: Forbidden - users can only delete their own account. + schema: + type: object 400: description: Invalid request or user not found. schema: @@ -1308,6 +1328,14 @@ def delete_user() -> Response: schema: type: object """ + # Explicitly check authentication status + if not current_user.is_authenticated: + return get_json_result( + data=False, + message="Unauthorized", + code=RetCode.UNAUTHORIZED, + ) + if request.json is None: return get_json_result( data=False, @@ -1368,6 +1396,14 @@ def delete_user() -> Response: code=RetCode.DATA_ERROR, ) + # Ensure user can only delete themselves + if user.id != current_user.id: + return get_json_result( + data=False, + message="You can only delete your own account!", + code=RetCode.FORBIDDEN, + ) + # Delete the user try: # Use hard delete to actually remove the user diff --git a/test/testcases/test_http_api/test_user_management/test_delete_user.py b/test/testcases/test_http_api/test_user_management/test_delete_user.py index d445feb0b..cb15eddfa 100644 --- a/test/testcases/test_http_api/test_user_management/test_delete_user.py +++ b/test/testcases/test_http_api/test_user_management/test_delete_user.py @@ -24,9 +24,9 @@ import pytest from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from Cryptodome.PublicKey import RSA -from common import create_user, delete_user, list_users +from common import create_user, delete_user from configs import INVALID_API_TOKEN -from libs.auth import RAGFlowHttpApiAuth +from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth # --------------------------------------------------------------------------- @@ -69,21 +69,17 @@ class TestAuthorization: @pytest.mark.parametrize( ("invalid_auth", "expected_code", "expected_message"), [ - # Note: @login_required is commented out, so endpoint works - # without auth - # Testing with None auth should succeed (code 0) if endpoint - # doesn't require auth - (None, 0, ""), - # Invalid token should also work if auth is not required - (RAGFlowHttpApiAuth(INVALID_API_TOKEN), 0, ""), + # Endpoint now requires @login_required (JWT token auth) + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), ], ) def test_invalid_auth( self, - invalid_auth: RAGFlowHttpApiAuth | None, + invalid_auth: RAGFlowWebApiAuth | None, expected_code: int, expected_message: str, - HttpApiAuth: RAGFlowHttpApiAuth, + WebApiAuth: RAGFlowWebApiAuth, ) -> None: """Test user deletion with invalid or missing authentication.""" # Create a test user first @@ -93,7 +89,7 @@ class TestAuthorization: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) + create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) if create_res["code"] != 0: pytest.skip("User creation failed, skipping auth test") @@ -106,285 +102,82 @@ class TestAuthorization: if expected_message: assert expected_message in res["message"] + @pytest.mark.p1 + def test_user_can_only_delete_themselves( + self, + WebApiAuth: RAGFlowWebApiAuth, + ) -> None: + """Test that users can only delete their own account.""" + # Create another user + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + create_payload: dict[str, str] = { + "nickname": "another_user", + "email": unique_email, + "password": encrypt_password("test123"), + } + create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) + assert create_res["code"] == 0, "Failed to create second user" + other_user_id: str = create_res["data"]["id"] + + # Try to delete another user's account (should fail) + delete_payload: dict[str, Any] = { + "user_id": other_user_id, + } + res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) + assert res["code"] == 403, f"Expected 403 FORBIDDEN, got {res}" + assert "only delete your own account" in res["message"].lower() + @pytest.mark.usefixtures("clear_users") class TestUserDelete: """Comprehensive tests for user deletion API.""" - @pytest.mark.p1 - def test_delete_user_by_id( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test deleting a user by user_id.""" - # Create a test user - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_payload: dict[str, str] = { - "nickname": "test_user_delete_id", - "email": unique_email, - "password": encrypt_password("test123"), - } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) - assert create_res["code"] == 0, create_res - - user_id: str = create_res["data"]["id"] - - # Delete the user - delete_payload: dict[str, str] = {"user_id": user_id} - delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert delete_res["code"] == 0, delete_res - assert delete_res["data"] is True - assert "deleted successfully" in delete_res["message"].lower() - - # Verify user is deleted - list_res: dict[str, Any] = list_users(HttpApiAuth) - user_emails: list[str] = [ - u["email"] for u in list_res["data"] if u.get("id") == user_id - ] - assert unique_email not in user_emails - - @pytest.mark.p1 - def test_delete_user_by_email( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test deleting a user by email.""" - # Create a test user - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_payload: dict[str, str] = { - "nickname": "test_user_delete_email", - "email": unique_email, - "password": encrypt_password("test123"), - } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) - assert create_res["code"] == 0, create_res - - # Delete the user by email - delete_payload: dict[str, str] = {"email": unique_email} - delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert delete_res["code"] == 0, delete_res - assert delete_res["data"] is True - assert "deleted successfully" in delete_res["message"].lower() - - # Verify user is deleted - params: dict[str, str] = {"email": unique_email} - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) - assert len(list_res["data"]) == 0 - @pytest.mark.p1 def test_delete_user_missing_identifier( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test deletion without user_id or email.""" delete_payload: dict[str, str] = {} - res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) + res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) assert res["code"] == 101 assert "Either user_id or email must be provided" in res["message"] @pytest.mark.p1 def test_delete_user_not_found_by_id( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test deletion of non-existent user by ID.""" delete_payload: dict[str, str] = { "user_id": "non_existent_user_id_12345", } - res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) + res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) assert res["code"] == 102 assert "User not found" in res["message"] - @pytest.mark.p1 - def test_delete_user_not_found_by_email( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test deletion of non-existent user by email.""" - nonexistent_email: str = ( - f"nonexistent_{uuid.uuid4().hex[:8]}@example.com" - ) - delete_payload: dict[str, str] = {"email": nonexistent_email} - res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert res["code"] == 102 - assert "not found" in res["message"] - @pytest.mark.p1 def test_delete_user_invalid_email_format( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test deletion with invalid email format.""" delete_payload: dict[str, str] = {"email": "invalid_email_format"} - res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) + res: dict[str, Any] = delete_user(WebApiAuth, delete_payload) assert res["code"] == 103 assert "Invalid email address" in res["message"] - @pytest.mark.p1 - def test_delete_user_multiple_users_same_email( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test deletion when multiple users have the same email.""" - # This scenario shouldn't happen in normal operation, but test it - # Create a user - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_payload: dict[str, str] = { - "nickname": "test_user_1", - "email": unique_email, - "password": encrypt_password("test123"), - } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) - assert create_res["code"] == 0 - - # Try to delete by email (should work if only one user exists) - delete_payload: dict[str, str] = {"email": unique_email} - res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - # Should succeed if only one user, or fail if multiple - assert res["code"] in (0, 102) - - @pytest.mark.p1 - def test_delete_user_twice( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test deleting the same user twice.""" - # Create a test user - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_payload: dict[str, str] = { - "nickname": "test_user_delete_twice", - "email": unique_email, - "password": encrypt_password("test123"), - } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) - assert create_res["code"] == 0, create_res - - user_id: str = create_res["data"]["id"] - - # Delete the user first time - delete_payload: dict[str, str] = {"user_id": user_id} - delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert delete_res["code"] == 0, delete_res - - # Try to delete again - delete_res2: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert delete_res2["code"] == 102 - assert "not found" in delete_res2["message"] - - @pytest.mark.p1 - def test_delete_user_response_structure( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test that user deletion returns the expected response structure.""" - # Create a test user - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_payload: dict[str, str] = { - "nickname": "test_user_delete_structure", - "email": unique_email, - "password": encrypt_password("test123"), - } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) - assert create_res["code"] == 0, create_res - - user_id: str = create_res["data"]["id"] - - # Delete the user - delete_payload: dict[str, str] = {"user_id": user_id} - res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert res["code"] == 0 - assert "data" in res - assert res["data"] is True - assert "message" in res - assert "deleted successfully" in res["message"].lower() - - @pytest.mark.p2 - def test_delete_multiple_users_sequentially( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test deleting multiple users sequentially.""" - created_user_ids: list[str] = [] - for i in range(3): - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_payload: dict[str, str] = { - "nickname": f"test_user_seq_{i}", - "email": unique_email, - "password": encrypt_password("test123"), - } - create_res: dict[str, Any] = create_user( - HttpApiAuth, create_payload - ) - if create_res["code"] == 0: - created_user_ids.append(create_res["data"]["id"]) - - if len(created_user_ids) == 0: - pytest.skip("No users created, skipping sequential delete test") - - # Delete all created users - for user_id in created_user_ids: - delete_payload: dict[str, str] = {"user_id": user_id} - delete_res: dict[str, Any] = delete_user( - HttpApiAuth, delete_payload - ) - assert delete_res["code"] == 0, delete_res - - # Verify all users are deleted - for user_id in created_user_ids: - list_res: dict[str, Any] = list_users(HttpApiAuth) - found_users: list[dict[str, Any]] = [ - u for u in list_res["data"] if u.get("id") == user_id - ] - assert len(found_users) == 0 - - @pytest.mark.p2 - def test_delete_user_and_verify_not_in_list( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test that deleted user is not in the user list.""" - # Create a test user - unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" - create_payload: dict[str, str] = { - "nickname": "test_user_verify_delete", - "email": unique_email, - "password": encrypt_password("test123"), - } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) - assert create_res["code"] == 0, create_res - - user_id: str = create_res["data"]["id"] - - # Verify user exists in list - params: dict[str, str] = {"email": unique_email} - list_res_before: dict[str, Any] = list_users( - HttpApiAuth, params=params - ) - assert len(list_res_before["data"]) >= 1 - assert any(u["email"] == unique_email for u in list_res_before["data"]) - - # Delete the user - delete_payload: dict[str, str] = {"user_id": user_id} - delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert delete_res["code"] == 0, delete_res - - # Verify user is not in list - list_res_after: dict[str, Any] = list_users(HttpApiAuth, params=params) - assert len(list_res_after["data"]) == 0 - - @pytest.mark.p2 - def test_delete_user_with_empty_payload( - self, HttpApiAuth: RAGFlowHttpApiAuth - ) -> None: - """Test deletion with empty payload.""" - delete_payload: dict[str, Any] = {} - res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) - assert res["code"] == 101 - assert "Either user_id or email must be provided" in res["message"] - @pytest.mark.p3 def test_delete_user_idempotency( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test that deleting a non-existent user returns consistent error.""" non_existent_id: str = f"nonexistent_{uuid.uuid4().hex[:16]}" # First attempt delete_payload: dict[str, str] = {"user_id": non_existent_id} - res1: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) + res1: dict[str, Any] = delete_user(WebApiAuth, delete_payload) assert res1["code"] == 102 # Second attempt (should return same error) - res2: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) + res2: dict[str, Any] = delete_user(WebApiAuth, delete_payload) assert res2["code"] == 102 assert res1["code"] == res2["code"] diff --git a/test/testcases/test_http_api/test_user_management/test_list_user.py b/test/testcases/test_http_api/test_user_management/test_list_user.py index 3d900de7c..1d3129ccb 100644 --- a/test/testcases/test_http_api/test_user_management/test_list_user.py +++ b/test/testcases/test_http_api/test_user_management/test_list_user.py @@ -26,7 +26,7 @@ from Cryptodome.PublicKey import RSA from common import create_user, list_users from configs import INVALID_API_TOKEN -from libs.auth import RAGFlowHttpApiAuth +from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth # --------------------------------------------------------------------------- @@ -69,18 +69,14 @@ class TestAuthorization: @pytest.mark.parametrize( ("invalid_auth", "expected_code", "expected_message"), [ - # Note: @login_required is commented out, so endpoint works - # without auth - # Testing with None auth should succeed (code 0) if endpoint - # doesn't require auth - (None, 0, ""), - # Invalid token should also work if auth is not required - (RAGFlowHttpApiAuth(INVALID_API_TOKEN), 0, ""), + # Endpoint now requires @login_required (JWT token auth) + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), ], ) def test_invalid_auth( self, - invalid_auth: RAGFlowHttpApiAuth | None, + invalid_auth: RAGFlowWebApiAuth | None, expected_code: int, expected_message: str, ) -> None: @@ -107,7 +103,7 @@ class TestUserList: @pytest.mark.p1 def test_list_single_user( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing a single user.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -116,12 +112,12 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) + create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) # Skip if creation fails (password encryption issue in test) if create_res["code"] != 0: pytest.skip("User creation failed, skipping list test") - list_res: dict[str, Any] = list_users(HttpApiAuth) + list_res: dict[str, Any] = list_users(WebApiAuth) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) >= 1 @@ -131,7 +127,7 @@ class TestUserList: @pytest.mark.p1 def test_list_multiple_users( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing multiple users.""" created_emails: list[str] = [] @@ -143,7 +139,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - HttpApiAuth, create_payload + WebApiAuth, create_payload ) if create_res["code"] == 0: created_emails.append(unique_email) @@ -151,7 +147,7 @@ class TestUserList: if not created_emails: pytest.skip("No users created, skipping list test") - list_res: dict[str, Any] = list_users(HttpApiAuth) + list_res: dict[str, Any] = list_users(WebApiAuth) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) >= len(created_emails) @@ -162,7 +158,7 @@ class TestUserList: @pytest.mark.p1 def test_list_users_with_email_filter( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing users filtered by email.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -171,13 +167,13 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) + create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) if create_res["code"] != 0: pytest.skip("User creation failed, skipping filter test") # List with email filter params: dict[str, str] = {"email": unique_email} - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) + list_res: dict[str, Any] = list_users(WebApiAuth, params=params) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) >= 1 @@ -187,22 +183,22 @@ class TestUserList: @pytest.mark.p1 def test_list_users_with_invalid_email_filter( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing users with invalid email filter.""" params: dict[str, str] = {"email": "invalid_email_format"} - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) + list_res: dict[str, Any] = list_users(WebApiAuth, params=params) assert list_res["code"] != 0 assert "Invalid email address" in list_res["message"] @pytest.mark.p1 def test_list_users_with_nonexistent_email_filter( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing users with non-existent email filter.""" nonexistent_email: str = f"nonexistent_{uuid.uuid4().hex[:8]}@example.com" params: dict[str, str] = {"email": nonexistent_email} - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) + list_res: dict[str, Any] = list_users(WebApiAuth, params=params) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) assert len(list_res["data"]) == 0 @@ -222,7 +218,7 @@ class TestUserList: ) def test_list_users_with_pagination( self, - HttpApiAuth: RAGFlowHttpApiAuth, + WebApiAuth: RAGFlowWebApiAuth, page: int, page_size: int, expected_valid: bool, @@ -238,7 +234,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - HttpApiAuth, create_payload + WebApiAuth, create_payload ) if create_res["code"] == 0: created_count += 1 @@ -247,7 +243,7 @@ class TestUserList: pytest.skip("No users created, skipping pagination test") params: dict[str, int] = {"page": page, "page_size": page_size} - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) + list_res: dict[str, Any] = list_users(WebApiAuth, params=params) if expected_valid: assert list_res["code"] == 0, list_res @@ -260,7 +256,7 @@ class TestUserList: @pytest.mark.p1 def test_list_users_pagination_boundaries( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test pagination boundary conditions.""" # Create 5 users with a unique email pattern for filtering @@ -274,7 +270,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - HttpApiAuth, create_payload + WebApiAuth, create_payload ) if create_res["code"] == 0: created_emails.append(unique_email) @@ -283,18 +279,18 @@ class TestUserList: pytest.skip("Not enough users created, skipping boundary test") # Get total count of all users to calculate pagination boundaries - list_res_all: dict[str, Any] = list_users(HttpApiAuth) + list_res_all: dict[str, Any] = list_users(WebApiAuth) total_users: int = len(list_res_all["data"]) # Test first page params: dict[str, int] = {"page": 1, "page_size": 2} - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) + list_res: dict[str, Any] = list_users(WebApiAuth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) == 2 # Test that pagination returns consistent page sizes params = {"page": 2, "page_size": 2} - list_res = list_users(HttpApiAuth, params=params) + list_res = list_users(WebApiAuth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) == 2 @@ -304,23 +300,23 @@ class TestUserList: last_page: int = (total_users + page_size - 1) // page_size if last_page > 0: params = {"page": last_page, "page_size": page_size} - list_res = list_users(HttpApiAuth, params=params) + list_res = list_users(WebApiAuth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) <= page_size # Test page beyond available data # Use a page number that's definitely beyond available data params = {"page": total_users + 10, "page_size": 2} - list_res = list_users(HttpApiAuth, params=params) + list_res = list_users(WebApiAuth, params=params) assert list_res["code"] == 0, list_res assert len(list_res["data"]) == 0 @pytest.mark.p1 def test_list_users_response_structure( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test that user listing returns the expected response structure.""" - res: dict[str, Any] = list_users(HttpApiAuth) + res: dict[str, Any] = list_users(WebApiAuth) assert res["code"] == 0 assert "data" in res assert isinstance(res["data"], list) @@ -337,24 +333,24 @@ class TestUserList: @pytest.mark.p1 def test_list_users_with_invalid_page_params( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing users with invalid pagination parameters.""" # Test invalid page (non-integer) params: dict[str, str] = {"page": "invalid", "page_size": "10"} - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) + list_res: dict[str, Any] = list_users(WebApiAuth, params=params) # Should handle gracefully or return error # The exact behavior depends on implementation assert "code" in list_res # Test invalid page_size (non-integer) params = {"page": "1", "page_size": "invalid"} - list_res = list_users(HttpApiAuth, params=params) + list_res = list_users(WebApiAuth, params=params) assert "code" in list_res @pytest.mark.p2 def test_list_users_combined_filters( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing users with combined filters.""" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" @@ -363,7 +359,7 @@ class TestUserList: "email": unique_email, "password": encrypt_password("test123"), } - create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) + create_res: dict[str, Any] = create_user(WebApiAuth, create_payload) if create_res["code"] != 0: pytest.skip("User creation failed, skipping combined filter test") @@ -373,7 +369,7 @@ class TestUserList: "page": 1, "page_size": 10, } - list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) + list_res: dict[str, Any] = list_users(WebApiAuth, params=params) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) # Should return at least the created user @@ -381,7 +377,7 @@ class TestUserList: @pytest.mark.p2 def test_list_users_performance_with_many_users( - self, HttpApiAuth: RAGFlowHttpApiAuth + self, WebApiAuth: RAGFlowWebApiAuth ) -> None: """Test listing performance with multiple users.""" # Create several users @@ -394,7 +390,7 @@ class TestUserList: "password": encrypt_password("test123"), } create_res: dict[str, Any] = create_user( - HttpApiAuth, create_payload + WebApiAuth, create_payload ) if create_res["code"] == 0: created_count += 1 @@ -403,7 +399,7 @@ class TestUserList: pytest.skip("No users created, skipping performance test") # List all users - list_res: dict[str, Any] = list_users(HttpApiAuth) + list_res: dict[str, Any] = list_users(WebApiAuth) assert list_res["code"] == 0, list_res assert isinstance(list_res["data"], list) # Should return at least the created users