From a80b2ff529c99c1c4484a05c08f8a307826acf64 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 13 Nov 2025 18:22:37 +0000 Subject: [PATCH] Add comprehensive test coverage: security, performance, edge cases, and advanced team tests Co-authored-by: teddius <890232+teddius@users.noreply.github.com> --- .../test_team_advanced.py | 337 ++++++++++++++ .../test_user_edge_cases.py | 422 ++++++++++++++++++ .../test_user_performance.py | 392 ++++++++++++++++ .../test_user_security.py | 342 ++++++++++++++ 4 files changed, 1493 insertions(+) create mode 100644 test/testcases/test_http_api/test_team_management/test_team_advanced.py create mode 100644 test/testcases/test_http_api/test_user_management/test_user_edge_cases.py create mode 100644 test/testcases/test_http_api/test_user_management/test_user_performance.py create mode 100644 test/testcases/test_http_api/test_user_management/test_user_security.py diff --git a/test/testcases/test_http_api/test_team_management/test_team_advanced.py b/test/testcases/test_http_api/test_team_management/test_team_advanced.py new file mode 100644 index 000000000..a14d3257d --- /dev/null +++ b/test/testcases/test_http_api/test_team_management/test_team_advanced.py @@ -0,0 +1,337 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Advanced team management tests.""" + +from __future__ import annotations + +import uuid +from typing import Any + +import pytest + +from common import create_team, create_user +from libs.auth import RAGFlowWebApiAuth + + +@pytest.mark.p2 +class TestTeamAdvanced: + """Advanced team management tests.""" + + def test_team_creation_with_custom_models( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test creating team with custom model configurations.""" + team_name: str = f"Custom Models Team {uuid.uuid4().hex[:8]}" + + # Attempt to create team with custom models + # Note: Model IDs need to be valid and added by the user + team_payload: dict[str, str] = { + "name": team_name, + # These would need to be actual model IDs added by the user + # "llm_id": "custom_llm_id", + # "embd_id": "custom_embd_id", + } + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + + # Should succeed with defaults if custom models not specified + assert res["code"] == 0, res + assert res["data"]["name"] == team_name + assert "id" in res["data"] + + def test_team_creation_response_structure( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test that team creation returns complete response structure.""" + team_name: str = f"Structure Test Team {uuid.uuid4().hex[:8]}" + team_payload: dict[str, str] = {"name": team_name} + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + + assert res["code"] == 0, res + assert "data" in res + + # Check required fields + required_fields: list[str] = ["id", "name", "owner_id"] + for field in required_fields: + assert field in res["data"], ( + f"Missing required field: {field}" + ) + + assert res["data"]["name"] == team_name + assert len(res["data"]["id"]) > 0, "Team ID should not be empty" + assert len(res["data"]["owner_id"]) > 0, "Owner ID should not be empty" + + def test_multiple_teams_same_name_allowed( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test that multiple teams can have the same name.""" + team_name: str = f"Duplicate Name {uuid.uuid4().hex[:8]}" + + # Create first team + res1: dict[str, Any] = create_team(WebApiAuth, {"name": team_name}) + assert res1["code"] == 0, res1 + team_id_1: str = res1["data"]["id"] + + # Create second team with same name + res2: dict[str, Any] = create_team(WebApiAuth, {"name": team_name}) + assert res2["code"] == 0, res2 + team_id_2: str = res2["data"]["id"] + + # Teams should have different IDs + assert team_id_1 != team_id_2, "Teams should have unique IDs" + assert res1["data"]["name"] == res2["data"]["name"], ( + "Both teams should have the same name" + ) + + def test_team_creation_with_credit_limit( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test creating team with custom credit limit.""" + team_name: str = f"Credit Test Team {uuid.uuid4().hex[:8]}" + custom_credit: int = 1000 + + team_payload: dict[str, Any] = { + "name": team_name, + "credit": custom_credit, + } + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + + # Should succeed + assert res["code"] == 0, res + # Note: Credit may not be in response, but should be set internally + + def test_team_name_with_special_characters( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team names with special characters.""" + special_names: list[str] = [ + f"Team-{uuid.uuid4().hex[:4]}_Test!", + f"Team & Co. {uuid.uuid4().hex[:4]}", + f"Team @{uuid.uuid4().hex[:4]}", + f"团队{uuid.uuid4().hex[:4]}", # Unicode + ] + + for name in special_names: + res: dict[str, Any] = create_team(WebApiAuth, {"name": name}) + # Should either accept or reject with clear message + if res["code"] == 0: + assert res["data"]["name"] == name, ( + f"Team name should be preserved: {name}" + ) + # If rejected, should have clear error + # (Current implementation accepts special chars) + + def test_team_creation_default_owner( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test that team creator is set as owner by default.""" + team_name: str = f"Owner Test Team {uuid.uuid4().hex[:8]}" + res: dict[str, Any] = create_team(WebApiAuth, {"name": team_name}) + + assert res["code"] == 0, res + assert "owner_id" in res["data"], "Owner ID should be in response" + # Owner should be the authenticated user + # (Cannot verify without knowing WebApiAuth user ID) + + def test_concurrent_team_creation( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test concurrent team creation.""" + import concurrent.futures + + def create_test_team(index: int) -> dict[str, Any]: + team_name: str = f"Concurrent Team {index}_{uuid.uuid4().hex[:8]}" + return create_team(WebApiAuth, {"name": team_name}) + + # Create 10 teams concurrently + count: int = 10 + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures: list[concurrent.futures.Future[dict[str, Any]]] = [ + executor.submit(create_test_team, i) for i in range(count) + ] + results: list[dict[str, Any]] = [ + f.result() for f in concurrent.futures.as_completed(futures) + ] + + # All should succeed + success_count: int = sum(1 for r in results if r["code"] == 0) + assert success_count == count, ( + f"Expected {count} successful team creations, got {success_count}" + ) + + # All should have unique IDs + team_ids: list[str] = [r["data"]["id"] for r in results if r["code"] == 0] + assert len(team_ids) == len(set(team_ids)), ( + "All team IDs should be unique" + ) + + def test_team_with_invalid_model_id( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with invalid model ID.""" + team_name: str = f"Invalid Model Team {uuid.uuid4().hex[:8]}" + team_payload: dict[str, str] = { + "name": team_name, + "llm_id": "invalid_nonexistent_model_id_12345", + } + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + + # Should reject with clear error message + assert res["code"] != 0, "Invalid model ID should be rejected" + assert ( + "model" in res["message"].lower() + or "not found" in res["message"].lower() + or "invalid" in res["message"].lower() + ), "Error message should mention model issue" + + def test_team_creation_with_negative_credit( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with negative credit.""" + team_name: str = f"Negative Credit Team {uuid.uuid4().hex[:8]}" + team_payload: dict[str, Any] = { + "name": team_name, + "credit": -100, + } + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + + # Should reject negative credit + assert res["code"] != 0, "Negative credit should be rejected" + assert "credit" in res["message"].lower(), ( + "Error message should mention credit" + ) + + def test_team_creation_empty_json_payload( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with completely empty payload.""" + res: dict[str, Any] = create_team(WebApiAuth, {}) + + # Should reject with clear error + assert res["code"] != 0, "Empty payload should be rejected" + assert ( + "name" in res["message"].lower() + or "required" in res["message"].lower() + ), "Error should mention missing name" + + def test_team_unicode_name( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with full unicode name.""" + unicode_names: list[str] = [ + f"团队{uuid.uuid4().hex[:4]}", # Chinese + f"チーム{uuid.uuid4().hex[:4]}", # Japanese + f"Команда{uuid.uuid4().hex[:4]}", # Russian + f"فريق{uuid.uuid4().hex[:4]}", # Arabic (RTL) + f"😀🎉{uuid.uuid4().hex[:4]}", # Emoji + ] + + for name in unicode_names: + res: dict[str, Any] = create_team(WebApiAuth, {"name": name}) + + # Should handle unicode properly + if res["code"] == 0: + # Verify unicode is preserved (may be normalized) + assert len(res["data"]["name"]) > 0, ( + "Team name should not be empty after unicode" + ) + + @pytest.mark.p3 + def test_team_creation_with_all_optional_params( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with all optional parameters.""" + team_name: str = f"Full Params Team {uuid.uuid4().hex[:8]}" + team_payload: dict[str, Any] = { + "name": team_name, + "credit": 2000, + # Note: Model IDs would need to be valid + # "llm_id": "valid_llm_id", + # "embd_id": "valid_embd_id", + # "asr_id": "valid_asr_id", + # "parser_ids": "valid_parser_ids", + # "img2txt_id": "valid_img2txt_id", + # "rerank_id": "valid_rerank_id", + } + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + + # Should succeed + assert res["code"] == 0, res + assert res["data"]["name"] == team_name + + @pytest.mark.p3 + def test_team_max_name_length( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team with maximum allowed name length.""" + # API spec says max 100 characters + max_name: str = "A" * 100 + res: dict[str, Any] = create_team(WebApiAuth, {"name": max_name}) + + # Should accept 100 characters + assert res["code"] == 0, "100-character name should be accepted" + assert res["data"]["name"] == max_name + + @pytest.mark.p3 + def test_team_name_just_over_limit( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team with name just over limit.""" + # 101 characters (1 over limit) + long_name: str = "A" * 101 + res: dict[str, Any] = create_team(WebApiAuth, {"name": long_name}) + + # Should reject + assert res["code"] != 0, "101-character name should be rejected" + assert ( + "100" in res["message"] + or "length" in res["message"].lower() + or "long" in res["message"].lower() + ), "Error should mention length limit" + + @pytest.mark.p3 + def test_team_creation_idempotency( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test that repeated team creation creates separate teams.""" + team_name: str = f"Idempotency Test {uuid.uuid4().hex[:8]}" + payload: dict[str, str] = {"name": team_name} + + # Create same team twice + res1: dict[str, Any] = create_team(WebApiAuth, payload) + res2: dict[str, Any] = create_team(WebApiAuth, payload) + + # Both should succeed and create different teams + assert res1["code"] == 0, res1 + assert res2["code"] == 0, res2 + assert res1["data"]["id"] != res2["data"]["id"], ( + "Should create different teams, not be idempotent" + ) + + @pytest.mark.p3 + def test_team_with_parser_ids( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with custom parser IDs.""" + team_name: str = f"Parser Test {uuid.uuid4().hex[:8]}" + # parser_ids is typically a comma-separated string + team_payload: dict[str, str] = { + "name": team_name, + "parser_ids": "naive,qa,table,paper,book,laws,presentation,manual,wiki", + } + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + + # Should accept valid parser IDs + assert res["code"] == 0, res diff --git a/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py new file mode 100644 index 000000000..4b8486ab5 --- /dev/null +++ b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py @@ -0,0 +1,422 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Edge case tests for user management APIs.""" + +from __future__ import annotations + +import uuid +from typing import Any + +import pytest + +from common import create_user, list_users +from libs.auth import RAGFlowWebApiAuth + + +@pytest.mark.edge_cases +@pytest.mark.usefixtures("clear_users") +class TestUserEdgeCases: + """Edge case tests for user management.""" + + @pytest.mark.p2 + def test_extremely_long_nickname( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test nickname with extreme length.""" + long_nickname: str = "A" * 1000 + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": long_nickname, + "email": unique_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should either accept (and possibly truncate) or reject with clear error + if res["code"] == 0: + # If accepted, nickname should be truncated to reasonable length + assert len(res["data"]["nickname"]) <= 255, ( + "Nickname should be truncated to max length" + ) + else: + # If rejected, should have clear error message + assert ( + "length" in res["message"].lower() + or "long" in res["message"].lower() + or "characters" in res["message"].lower() + ), "Error message should mention length issue" + + @pytest.mark.p2 + def test_unicode_in_all_fields( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test unicode characters in various fields.""" + # Use ASCII-safe email local part (unicode in display name) + local_part: str = f"test_{uuid.uuid4().hex[:8]}" + unique_email: str = f"{local_part}@example.com" + payload: dict[str, str] = { + "nickname": "用户名测试", # Chinese characters + "email": unique_email, + "password": "密码123!", # Chinese + ASCII + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should handle unicode properly + if res["code"] == 0: + assert res["data"]["nickname"] == "用户名测试", ( + "Unicode nickname should be preserved" + ) + assert res["data"]["email"] == unique_email + + @pytest.mark.p2 + def test_emoji_in_nickname( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test emoji characters in nickname.""" + nickname_with_emoji: str = "Test User 😀🎉🔥" + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": nickname_with_emoji, + "email": unique_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should handle emoji properly (either accept or reject gracefully) + if res["code"] == 0: + # Emoji should be preserved + assert "😀" in res["data"]["nickname"] or "?" in res["data"]["nickname"], ( + "Emoji should be preserved or replaced with placeholder" + ) + + @pytest.mark.p2 + @pytest.mark.parametrize( + "special_email", + [ + f"user+tag_{uuid.uuid4().hex[:4]}@example.com", + f"user.name_{uuid.uuid4().hex[:4]}@example.com", + f"user_name_{uuid.uuid4().hex[:4]}@example.com", + f"user-name_{uuid.uuid4().hex[:4]}@example.com", + f"123_{uuid.uuid4().hex[:4]}@example.com", # Starts with number + ], + ) + def test_special_characters_in_email( + self, WebApiAuth: RAGFlowWebApiAuth, special_email: str + ) -> None: + """Test various special characters in email.""" + payload: dict[str, str] = { + "nickname": "test", + "email": special_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + assert res["code"] == 0, f"Failed for email: {special_email}, {res}" + assert res["data"]["email"] == special_email + + @pytest.mark.p2 + def test_whitespace_handling_in_fields( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test whitespace handling in various fields.""" + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": " test user ", # Leading/trailing spaces + "email": f" {unique_email} ", # Spaces around email + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should handle whitespace (trim or accept) + if res["code"] == 0: + # Email should be trimmed + assert res["data"]["email"] == unique_email, ( + "Email should have whitespace trimmed" + ) + # Nickname whitespace handling is flexible + nickname: str = res["data"]["nickname"] + assert nickname.strip() != "", "Nickname should not be only whitespace" + + @pytest.mark.p2 + def test_null_byte_in_input( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test null byte injection in input fields.""" + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": "test\x00admin", # Null byte in nickname + "email": unique_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should handle or reject null bytes + if res["code"] == 0: + # Null byte should be removed or replaced + assert "\x00" not in res["data"]["nickname"], ( + "Null byte should be sanitized" + ) + + @pytest.mark.p2 + def test_very_long_email( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test email exceeding typical length limits.""" + # Create email with very long local part (250 chars) + long_local: str = "a" * 250 + email: str = f"{long_local}@example.com" + payload: dict[str, str] = { + "nickname": "test", + "email": email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should reject overly long emails (RFC 5321 limits local part to 64 chars) + assert res["code"] != 0, "Very long email should be rejected" + assert ( + "invalid" in res["message"].lower() + or "long" in res["message"].lower() + ), "Error should mention invalid/long email" + + @pytest.mark.p2 + def test_email_with_multiple_at_signs( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test email with multiple @ signs.""" + payload: dict[str, str] = { + "nickname": "test", + "email": "user@@example.com", + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should reject invalid email format + assert res["code"] != 0, "Email with multiple @ should be rejected" + assert "invalid" in res["message"].lower() + + @pytest.mark.p2 + def test_email_with_spaces( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test email containing spaces.""" + payload: dict[str, str] = { + "nickname": "test", + "email": "user name@example.com", + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should reject email with spaces + assert res["code"] != 0, "Email with spaces should be rejected" + assert "invalid" in res["message"].lower() + + @pytest.mark.p3 + def test_leading_trailing_dots_in_email( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test email with leading/trailing dots.""" + invalid_emails: list[str] = [ + ".user@example.com", # Leading dot + "user.@example.com", # Trailing dot + "user..name@example.com", # Consecutive dots + ] + + for email in invalid_emails: + payload: dict[str, str] = { + "nickname": "test", + "email": email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # These should be rejected as invalid + assert res["code"] != 0, f"Invalid email should be rejected: {email}" + + @pytest.mark.p3 + def test_empty_string_vs_none_in_optional_fields( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test difference between empty string and None for optional fields.""" + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + + # Test with empty string nickname (should be accepted) + payload_empty: dict[str, str] = { + "nickname": "", + "email": unique_email, + "password": "test123", + } + res_empty: dict[str, Any] = create_user(WebApiAuth, payload_empty) + + # Empty nickname should be accepted per current API behavior + assert res_empty["code"] == 0, "Empty nickname should be accepted" + assert res_empty["data"]["nickname"] == "" + + @pytest.mark.p3 + def test_pagination_with_no_results( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test list_users pagination when no users exist.""" + # Assuming clear_users fixture has cleared all test users + res: dict[str, Any] = list_users( + WebApiAuth, params={"page": 1, "page_size": 10} + ) + + # Should return empty list, not error + assert res["code"] == 0, "Empty result should not cause error" + # Data might be empty or contain only system users + assert isinstance(res["data"], list), "Should return list even if empty" + + @pytest.mark.p3 + def test_pagination_beyond_available_pages( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test requesting page beyond available data.""" + # Create one user + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + create_user( + WebApiAuth, + { + "nickname": "test", + "email": unique_email, + "password": "test123", + }, + ) + + # Request page 100 + res: dict[str, Any] = list_users( + WebApiAuth, params={"page": 100, "page_size": 10} + ) + + # Should return empty results, not error + assert res["code"] == 0, "High page number should not cause error" + assert isinstance(res["data"], list), "Should return empty list" + + @pytest.mark.p3 + def test_zero_page_size( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test pagination with page_size of 0.""" + res: dict[str, Any] = list_users( + WebApiAuth, params={"page": 1, "page_size": 0} + ) + + # Should reject invalid page size + assert res["code"] != 0, "Zero page size should be rejected" + assert "page" in res["message"].lower() or "size" in res["message"].lower() + + @pytest.mark.p3 + def test_negative_page_number( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test pagination with negative page number.""" + res: dict[str, Any] = list_users( + WebApiAuth, params={"page": -1, "page_size": 10} + ) + + # Should reject negative page number + assert res["code"] != 0, "Negative page number should be rejected" + assert "page" in res["message"].lower() + + @pytest.mark.p3 + def test_excessive_page_size( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test pagination with very large page_size.""" + res: dict[str, Any] = list_users( + WebApiAuth, params={"page": 1, "page_size": 10000} + ) + + # Should either cap page size or reject + # Most APIs cap at 100 as per spec + if res["code"] == 0: + # If accepted, should return limited results + assert len(res["data"]) <= 100, ( + "Page size should be capped at reasonable limit" + ) + + @pytest.mark.p3 + def test_special_characters_in_password( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test password with various special characters.""" + special_passwords: list[str] = [ + "Test@123!", + "Pass#$%^&*()", + "Pwd[]{}\\|;:'\",<>?/", + "测试密码123", # Unicode + "😀🎉🔥123", # Emoji + ] + + for password in special_passwords: + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": "test", + "email": unique_email, + "password": password, + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should accept special characters in password + assert res["code"] == 0, ( + f"Password with special chars should be accepted: {password}" + ) + + @pytest.mark.p3 + def test_json_injection_in_fields( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test JSON injection attempts.""" + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": '{"admin": true}', + "email": unique_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should treat as literal string, not parse as JSON + if res["code"] == 0: + assert res["data"]["nickname"] == '{"admin": true}', ( + "Should store as literal string, not parse JSON" + ) + + @pytest.mark.p3 + def test_path_traversal_in_nickname( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test path traversal attempts in nickname.""" + traversal_attempts: list[str] = [ + "../../../etc/passwd", + "..\\..\\..\\windows\\system32", + "....//....//....//etc/passwd", + ] + + for nickname in traversal_attempts: + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": nickname, + "email": unique_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + + # Should either reject or sanitize path traversal attempts + # At minimum, should not allow actual file system access + if res["code"] == 0: + # Nickname should be stored safely + stored_nickname: str = res["data"]["nickname"] + # Verify it's treated as literal string + assert isinstance(stored_nickname, str) diff --git a/test/testcases/test_http_api/test_user_management/test_user_performance.py b/test/testcases/test_http_api/test_user_management/test_user_performance.py new file mode 100644 index 000000000..6b8f3d26d --- /dev/null +++ b/test/testcases/test_http_api/test_user_management/test_user_performance.py @@ -0,0 +1,392 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Performance and load tests for user management APIs.""" + +from __future__ import annotations + +import time +import uuid +from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from typing import Any + +import pytest + +from common import create_user, list_users +from libs.auth import RAGFlowWebApiAuth + + +@pytest.mark.performance +@pytest.mark.usefixtures("clear_users") +class TestUserPerformance: + """Performance and load tests for user management.""" + + @pytest.mark.p2 + def test_list_users_performance_small_dataset( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test list_users performance with small dataset.""" + # Create 20 users + created_users: list[str] = [] + for i in range(20): + unique_email: str = f"perf_small_{i}_{uuid.uuid4().hex[:4]}@example.com" + res: dict[str, Any] = create_user( + WebApiAuth, + { + "nickname": f"user_{i}", + "email": unique_email, + "password": "test123", + }, + ) + if res["code"] == 0: + created_users.append(res["data"]["id"]) + + # Test list performance without pagination + start: float = time.time() + res: dict[str, Any] = list_users(WebApiAuth) + duration: float = time.time() - start + + assert res["code"] == 0, res + assert duration < 2.0, ( + f"List operation took {duration}s, should be under 2s" + ) + + @pytest.mark.p2 + def test_list_users_pagination_performance( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test pagination performance with moderate dataset.""" + # Create 50 users + for i in range(50): + unique_email: str = f"perf_test_{i}_{uuid.uuid4().hex[:4]}@example.com" + create_user( + WebApiAuth, + { + "nickname": f"user_{i}", + "email": unique_email, + "password": "test123", + }, + ) + + # Test pagination performance + start: float = time.time() + res: dict[str, Any] = list_users( + WebApiAuth, params={"page": 1, "page_size": 10} + ) + duration: float = time.time() - start + + assert res["code"] == 0, res + assert len(res["data"]) <= 10, "Should return requested page size" + assert duration < 1.0, ( + f"Paginated list took {duration}s, should be under 1s" + ) + + @pytest.mark.p3 + def test_concurrent_user_creation( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test concurrent user creation without conflicts.""" + count: int = 20 + + def create_test_user(index: int) -> dict[str, Any]: + unique_email: str = f"concurrent_{index}_{uuid.uuid4().hex[:8]}@example.com" + return create_user( + WebApiAuth, + { + "nickname": f"user_{index}", + "email": unique_email, + "password": "test123", + }, + ) + + # Create 20 users concurrently with 5 workers + start: float = time.time() + with ThreadPoolExecutor(max_workers=5) as executor: + futures: list[Future[dict[str, Any]]] = [ + executor.submit(create_test_user, i) for i in range(count) + ] + results: list[dict[str, Any]] = [ + f.result() for f in as_completed(futures) + ] + duration: float = time.time() - start + + # All should succeed + success_count: int = sum(1 for r in results if r["code"] == 0) + assert success_count == count, ( + f"Expected {count} successful creations, got {success_count}" + ) + + # Should complete reasonably quickly + # 20 users with 5 workers ~= 4 batches, allow 10 seconds + assert duration < 10.0, ( + f"Concurrent creation took {duration}s, should be under 10s" + ) + + @pytest.mark.p3 + def test_user_creation_response_time( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test individual user creation response time.""" + response_times: list[float] = [] + + for i in range(10): + unique_email: str = f"timing_{i}_{uuid.uuid4().hex[:8]}@example.com" + start: float = time.time() + res: dict[str, Any] = create_user( + WebApiAuth, + { + "nickname": f"user_{i}", + "email": unique_email, + "password": "test123", + }, + ) + duration: float = time.time() - start + + assert res["code"] == 0, f"User creation failed: {res}" + response_times.append(duration) + + # Calculate statistics + avg_time: float = sum(response_times) / len(response_times) + max_time: float = max(response_times) + + # Average response time should be reasonable + assert avg_time < 1.0, ( + f"Average user creation time {avg_time}s should be under 1s" + ) + # Max response time shouldn't spike too high + assert max_time < 3.0, ( + f"Max user creation time {max_time}s should be under 3s" + ) + + @pytest.mark.p3 + def test_sequential_vs_concurrent_creation_comparison( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Compare sequential vs concurrent user creation performance.""" + count: int = 10 + + # Sequential creation + sequential_start: float = time.time() + for i in range(count): + unique_email: str = f"seq_{i}_{uuid.uuid4().hex[:8]}@example.com" + create_user( + WebApiAuth, + { + "nickname": f"seq_user_{i}", + "email": unique_email, + "password": "test123", + }, + ) + sequential_duration: float = time.time() - sequential_start + + # Concurrent creation + def create_concurrent_user(index: int) -> dict[str, Any]: + unique_email: str = f"conc_{index}_{uuid.uuid4().hex[:8]}@example.com" + return create_user( + WebApiAuth, + { + "nickname": f"conc_user_{index}", + "email": unique_email, + "password": "test123", + }, + ) + + concurrent_start: float = time.time() + with ThreadPoolExecutor(max_workers=5) as executor: + futures: list[Future[dict[str, Any]]] = [ + executor.submit(create_concurrent_user, i) for i in range(count) + ] + concurrent_results: list[dict[str, Any]] = [ + f.result() for f in as_completed(futures) + ] + concurrent_duration: float = time.time() - concurrent_start + + # Concurrent should be faster (or at least not significantly slower) + # Allow some overhead for thread management + speedup_ratio: float = sequential_duration / concurrent_duration + + # Log performance metrics for analysis + print(f"\nPerformance Comparison ({count} users):") + print(f"Sequential: {sequential_duration:.2f}s") + print(f"Concurrent: {concurrent_duration:.2f}s") + print(f"Speedup: {speedup_ratio:.2f}x") + + # Concurrent should provide some benefit (at least not be slower) + # With 5 workers, expect at least some improvement + assert concurrent_duration <= sequential_duration * 1.2, ( + "Concurrent creation should not be significantly slower than sequential" + ) + + @pytest.mark.p3 + def test_pagination_consistency_under_load( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test pagination consistency during concurrent modifications.""" + # Create initial set of users + initial_count: int = 30 + for i in range(initial_count): + unique_email: str = f"pag_{i}_{uuid.uuid4().hex[:8]}@example.com" + create_user( + WebApiAuth, + { + "nickname": f"user_{i}", + "email": unique_email, + "password": "test123", + }, + ) + + # Test pagination while users are being created + def paginate_users() -> dict[str, Any]: + return list_users(WebApiAuth, params={"page": 1, "page_size": 10}) + + def create_more_users() -> None: + for i in range(5): + unique_email: str = f"new_{i}_{uuid.uuid4().hex[:8]}@example.com" + create_user( + WebApiAuth, + { + "nickname": f"new_user_{i}", + "email": unique_email, + "password": "test123", + }, + ) + + with ThreadPoolExecutor(max_workers=3) as executor: + # Start pagination requests + pag_futures: list[Future] = [ + executor.submit(paginate_users) for _ in range(5) + ] + # Start creation requests + create_future: Future = executor.submit(create_more_users) + + # Wait for all to complete + pag_results: list[dict[str, Any]] = [ + f.result() for f in pag_futures + ] + create_future.result() + + # All pagination requests should succeed + assert all(r["code"] == 0 for r in pag_results), ( + "Pagination should remain stable during concurrent modifications" + ) + + @pytest.mark.p3 + def test_memory_efficiency_large_list( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test memory efficiency when listing many users.""" + # Create 100 users + for i in range(100): + unique_email: str = f"mem_{i}_{uuid.uuid4().hex[:8]}@example.com" + create_user( + WebApiAuth, + { + "nickname": f"user_{i}", + "email": unique_email, + "password": "test123", + }, + ) + + # List all users (without pagination) + res: dict[str, Any] = list_users(WebApiAuth) + + assert res["code"] == 0, res + # Should return results without memory issues + assert isinstance(res["data"], list), "Should return list" + # Response should not be excessively large + # (This is a basic check; real memory profiling would need additional tools) + + @pytest.mark.p3 + @pytest.mark.skip(reason="Stress test - run manually") + def test_sustained_load( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test system stability under sustained load (manual run).""" + duration_seconds: int = 60 # Run for 1 minute + requests_per_second: int = 5 + + start_time: float = time.time() + request_count: int = 0 + error_count: int = 0 + + while time.time() - start_time < duration_seconds: + batch_start: float = time.time() + + # Send requests_per_second requests + for i in range(requests_per_second): + unique_email: str = f"load_{request_count}_{uuid.uuid4().hex[:8]}@example.com" + res: dict[str, Any] = create_user( + WebApiAuth, + { + "nickname": f"user_{request_count}", + "email": unique_email, + "password": "test123", + }, + ) + request_count += 1 + if res["code"] != 0: + error_count += 1 + + # Wait to maintain requests_per_second rate + elapsed: float = time.time() - batch_start + sleep_time: float = 1.0 - elapsed + if sleep_time > 0: + time.sleep(sleep_time) + + total_duration: float = time.time() - start_time + actual_rps: float = request_count / total_duration + error_rate: float = error_count / request_count if request_count > 0 else 0 + + print(f"\nSustained Load Test Results:") + print(f"Duration: {total_duration:.2f}s") + print(f"Total Requests: {request_count}") + print(f"Actual RPS: {actual_rps:.2f}") + print(f"Error Rate: {error_rate:.2%}") + + # Error rate should be low + assert error_rate < 0.05, ( + f"Error rate {error_rate:.2%} should be under 5%" + ) + + @pytest.mark.p3 + def test_large_payload_handling( + self, WebApiAuth: RAGFlowWebApiAuth + ) -> None: + """Test handling of large request payloads.""" + # Create user with large nickname (but within limits) + large_nickname: str = "A" * 200 # 200 characters + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + + start: float = time.time() + res: dict[str, Any] = create_user( + WebApiAuth, + { + "nickname": large_nickname, + "email": unique_email, + "password": "test123" * 10, # Longer password + }, + ) + duration: float = time.time() - start + + # Should handle large payloads efficiently + assert duration < 2.0, ( + f"Large payload took {duration}s, should be under 2s" + ) + + if res["code"] == 0: + # Verify data was stored correctly + assert len(res["data"]["nickname"]) <= 255, ( + "Nickname should be capped at reasonable length" + ) diff --git a/test/testcases/test_http_api/test_user_management/test_user_security.py b/test/testcases/test_http_api/test_user_management/test_user_security.py new file mode 100644 index 000000000..ab171cb5b --- /dev/null +++ b/test/testcases/test_http_api/test_user_management/test_user_security.py @@ -0,0 +1,342 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Security-focused tests for user management APIs.""" + +from __future__ import annotations + +import uuid +from typing import Any + +import pytest + +from common import create_user +from libs.auth import RAGFlowWebApiAuth + + +@pytest.mark.security +@pytest.mark.usefixtures("clear_users") +class TestUserSecurity: + """Security-focused tests for user management.""" + + @pytest.mark.p1 + @pytest.mark.parametrize( + "malicious_email", + [ + "'; DROP TABLE users; --", + "admin@example.com' OR '1'='1", + "test@example.com'; UPDATE users SET is_superuser=true; --", + "admin' --", + "' OR 1=1 --", + ], + ) + def test_sql_injection_in_email( + self, WebApiAuth: RAGFlowWebApiAuth, malicious_email: str + ) -> None: + """Test SQL injection attempts in email field are properly handled.""" + payload: dict[str, str] = { + "nickname": "test", + "email": malicious_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + # Should fail validation, not execute SQL + assert res["code"] != 0, ( + f"SQL injection attempt should be rejected: {malicious_email}" + ) + assert "invalid" in res["message"].lower() + + @pytest.mark.p1 + @pytest.mark.parametrize( + "xss_payload", + [ + "", + "", + "javascript:alert('xss')", + "", + "<", + ], + ) + def test_xss_in_nickname( + self, WebApiAuth: RAGFlowWebApiAuth, xss_payload: str + ) -> None: + """Test XSS attempts in nickname field are sanitized.""" + unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" + payload: dict[str, str] = { + "nickname": xss_payload, + "email": unique_email, + "password": "test123", + } + res: dict[str, Any] = create_user(WebApiAuth, payload) + if res["code"] == 0: + # Nickname should be sanitized + nickname: str = res["data"]["nickname"] + assert "