diff --git a/test/testcases/test_http_api/test_team_management/test_team_advanced.py b/test/testcases/test_http_api/test_team_management/test_team_advanced.py
new file mode 100644
index 000000000..a14d3257d
--- /dev/null
+++ b/test/testcases/test_http_api/test_team_management/test_team_advanced.py
@@ -0,0 +1,337 @@
+#
+# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Advanced team management tests."""
+
+from __future__ import annotations
+
+import uuid
+from typing import Any
+
+import pytest
+
+from common import create_team, create_user
+from libs.auth import RAGFlowWebApiAuth
+
+
+@pytest.mark.p2
+class TestTeamAdvanced:
+ """Advanced team management tests."""
+
+ def test_team_creation_with_custom_models(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test creating team with custom model configurations."""
+ team_name: str = f"Custom Models Team {uuid.uuid4().hex[:8]}"
+
+ # Attempt to create team with custom models
+ # Note: Model IDs need to be valid and added by the user
+ team_payload: dict[str, str] = {
+ "name": team_name,
+ # These would need to be actual model IDs added by the user
+ # "llm_id": "custom_llm_id",
+ # "embd_id": "custom_embd_id",
+ }
+ res: dict[str, Any] = create_team(WebApiAuth, team_payload)
+
+ # Should succeed with defaults if custom models not specified
+ assert res["code"] == 0, res
+ assert res["data"]["name"] == team_name
+ assert "id" in res["data"]
+
+ def test_team_creation_response_structure(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test that team creation returns complete response structure."""
+ team_name: str = f"Structure Test Team {uuid.uuid4().hex[:8]}"
+ team_payload: dict[str, str] = {"name": team_name}
+ res: dict[str, Any] = create_team(WebApiAuth, team_payload)
+
+ assert res["code"] == 0, res
+ assert "data" in res
+
+ # Check required fields
+ required_fields: list[str] = ["id", "name", "owner_id"]
+ for field in required_fields:
+ assert field in res["data"], (
+ f"Missing required field: {field}"
+ )
+
+ assert res["data"]["name"] == team_name
+ assert len(res["data"]["id"]) > 0, "Team ID should not be empty"
+ assert len(res["data"]["owner_id"]) > 0, "Owner ID should not be empty"
+
+ def test_multiple_teams_same_name_allowed(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test that multiple teams can have the same name."""
+ team_name: str = f"Duplicate Name {uuid.uuid4().hex[:8]}"
+
+ # Create first team
+ res1: dict[str, Any] = create_team(WebApiAuth, {"name": team_name})
+ assert res1["code"] == 0, res1
+ team_id_1: str = res1["data"]["id"]
+
+ # Create second team with same name
+ res2: dict[str, Any] = create_team(WebApiAuth, {"name": team_name})
+ assert res2["code"] == 0, res2
+ team_id_2: str = res2["data"]["id"]
+
+ # Teams should have different IDs
+ assert team_id_1 != team_id_2, "Teams should have unique IDs"
+ assert res1["data"]["name"] == res2["data"]["name"], (
+ "Both teams should have the same name"
+ )
+
+ def test_team_creation_with_credit_limit(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test creating team with custom credit limit."""
+ team_name: str = f"Credit Test Team {uuid.uuid4().hex[:8]}"
+ custom_credit: int = 1000
+
+ team_payload: dict[str, Any] = {
+ "name": team_name,
+ "credit": custom_credit,
+ }
+ res: dict[str, Any] = create_team(WebApiAuth, team_payload)
+
+ # Should succeed
+ assert res["code"] == 0, res
+ # Note: Credit may not be in response, but should be set internally
+
+ def test_team_name_with_special_characters(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team names with special characters."""
+ special_names: list[str] = [
+ f"Team-{uuid.uuid4().hex[:4]}_Test!",
+ f"Team & Co. {uuid.uuid4().hex[:4]}",
+ f"Team @{uuid.uuid4().hex[:4]}",
+ f"团队{uuid.uuid4().hex[:4]}", # Unicode
+ ]
+
+ for name in special_names:
+ res: dict[str, Any] = create_team(WebApiAuth, {"name": name})
+ # Should either accept or reject with clear message
+ if res["code"] == 0:
+ assert res["data"]["name"] == name, (
+ f"Team name should be preserved: {name}"
+ )
+ # If rejected, should have clear error
+ # (Current implementation accepts special chars)
+
+ def test_team_creation_default_owner(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test that team creator is set as owner by default."""
+ team_name: str = f"Owner Test Team {uuid.uuid4().hex[:8]}"
+ res: dict[str, Any] = create_team(WebApiAuth, {"name": team_name})
+
+ assert res["code"] == 0, res
+ assert "owner_id" in res["data"], "Owner ID should be in response"
+ # Owner should be the authenticated user
+ # (Cannot verify without knowing WebApiAuth user ID)
+
+ def test_concurrent_team_creation(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test concurrent team creation."""
+ import concurrent.futures
+
+ def create_test_team(index: int) -> dict[str, Any]:
+ team_name: str = f"Concurrent Team {index}_{uuid.uuid4().hex[:8]}"
+ return create_team(WebApiAuth, {"name": team_name})
+
+ # Create 10 teams concurrently
+ count: int = 10
+ with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
+ futures: list[concurrent.futures.Future[dict[str, Any]]] = [
+ executor.submit(create_test_team, i) for i in range(count)
+ ]
+ results: list[dict[str, Any]] = [
+ f.result() for f in concurrent.futures.as_completed(futures)
+ ]
+
+ # All should succeed
+ success_count: int = sum(1 for r in results if r["code"] == 0)
+ assert success_count == count, (
+ f"Expected {count} successful team creations, got {success_count}"
+ )
+
+ # All should have unique IDs
+ team_ids: list[str] = [r["data"]["id"] for r in results if r["code"] == 0]
+ assert len(team_ids) == len(set(team_ids)), (
+ "All team IDs should be unique"
+ )
+
+ def test_team_with_invalid_model_id(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team creation with invalid model ID."""
+ team_name: str = f"Invalid Model Team {uuid.uuid4().hex[:8]}"
+ team_payload: dict[str, str] = {
+ "name": team_name,
+ "llm_id": "invalid_nonexistent_model_id_12345",
+ }
+ res: dict[str, Any] = create_team(WebApiAuth, team_payload)
+
+ # Should reject with clear error message
+ assert res["code"] != 0, "Invalid model ID should be rejected"
+ assert (
+ "model" in res["message"].lower()
+ or "not found" in res["message"].lower()
+ or "invalid" in res["message"].lower()
+ ), "Error message should mention model issue"
+
+ def test_team_creation_with_negative_credit(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team creation with negative credit."""
+ team_name: str = f"Negative Credit Team {uuid.uuid4().hex[:8]}"
+ team_payload: dict[str, Any] = {
+ "name": team_name,
+ "credit": -100,
+ }
+ res: dict[str, Any] = create_team(WebApiAuth, team_payload)
+
+ # Should reject negative credit
+ assert res["code"] != 0, "Negative credit should be rejected"
+ assert "credit" in res["message"].lower(), (
+ "Error message should mention credit"
+ )
+
+ def test_team_creation_empty_json_payload(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team creation with completely empty payload."""
+ res: dict[str, Any] = create_team(WebApiAuth, {})
+
+ # Should reject with clear error
+ assert res["code"] != 0, "Empty payload should be rejected"
+ assert (
+ "name" in res["message"].lower()
+ or "required" in res["message"].lower()
+ ), "Error should mention missing name"
+
+ def test_team_unicode_name(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team creation with full unicode name."""
+ unicode_names: list[str] = [
+ f"团队{uuid.uuid4().hex[:4]}", # Chinese
+ f"チーム{uuid.uuid4().hex[:4]}", # Japanese
+ f"Команда{uuid.uuid4().hex[:4]}", # Russian
+ f"فريق{uuid.uuid4().hex[:4]}", # Arabic (RTL)
+ f"😀🎉{uuid.uuid4().hex[:4]}", # Emoji
+ ]
+
+ for name in unicode_names:
+ res: dict[str, Any] = create_team(WebApiAuth, {"name": name})
+
+ # Should handle unicode properly
+ if res["code"] == 0:
+ # Verify unicode is preserved (may be normalized)
+ assert len(res["data"]["name"]) > 0, (
+ "Team name should not be empty after unicode"
+ )
+
+ @pytest.mark.p3
+ def test_team_creation_with_all_optional_params(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team creation with all optional parameters."""
+ team_name: str = f"Full Params Team {uuid.uuid4().hex[:8]}"
+ team_payload: dict[str, Any] = {
+ "name": team_name,
+ "credit": 2000,
+ # Note: Model IDs would need to be valid
+ # "llm_id": "valid_llm_id",
+ # "embd_id": "valid_embd_id",
+ # "asr_id": "valid_asr_id",
+ # "parser_ids": "valid_parser_ids",
+ # "img2txt_id": "valid_img2txt_id",
+ # "rerank_id": "valid_rerank_id",
+ }
+ res: dict[str, Any] = create_team(WebApiAuth, team_payload)
+
+ # Should succeed
+ assert res["code"] == 0, res
+ assert res["data"]["name"] == team_name
+
+ @pytest.mark.p3
+ def test_team_max_name_length(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team with maximum allowed name length."""
+ # API spec says max 100 characters
+ max_name: str = "A" * 100
+ res: dict[str, Any] = create_team(WebApiAuth, {"name": max_name})
+
+ # Should accept 100 characters
+ assert res["code"] == 0, "100-character name should be accepted"
+ assert res["data"]["name"] == max_name
+
+ @pytest.mark.p3
+ def test_team_name_just_over_limit(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team with name just over limit."""
+ # 101 characters (1 over limit)
+ long_name: str = "A" * 101
+ res: dict[str, Any] = create_team(WebApiAuth, {"name": long_name})
+
+ # Should reject
+ assert res["code"] != 0, "101-character name should be rejected"
+ assert (
+ "100" in res["message"]
+ or "length" in res["message"].lower()
+ or "long" in res["message"].lower()
+ ), "Error should mention length limit"
+
+ @pytest.mark.p3
+ def test_team_creation_idempotency(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test that repeated team creation creates separate teams."""
+ team_name: str = f"Idempotency Test {uuid.uuid4().hex[:8]}"
+ payload: dict[str, str] = {"name": team_name}
+
+ # Create same team twice
+ res1: dict[str, Any] = create_team(WebApiAuth, payload)
+ res2: dict[str, Any] = create_team(WebApiAuth, payload)
+
+ # Both should succeed and create different teams
+ assert res1["code"] == 0, res1
+ assert res2["code"] == 0, res2
+ assert res1["data"]["id"] != res2["data"]["id"], (
+ "Should create different teams, not be idempotent"
+ )
+
+ @pytest.mark.p3
+ def test_team_with_parser_ids(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test team creation with custom parser IDs."""
+ team_name: str = f"Parser Test {uuid.uuid4().hex[:8]}"
+ # parser_ids is typically a comma-separated string
+ team_payload: dict[str, str] = {
+ "name": team_name,
+ "parser_ids": "naive,qa,table,paper,book,laws,presentation,manual,wiki",
+ }
+ res: dict[str, Any] = create_team(WebApiAuth, team_payload)
+
+ # Should accept valid parser IDs
+ assert res["code"] == 0, res
diff --git a/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py
new file mode 100644
index 000000000..4b8486ab5
--- /dev/null
+++ b/test/testcases/test_http_api/test_user_management/test_user_edge_cases.py
@@ -0,0 +1,422 @@
+#
+# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Edge case tests for user management APIs."""
+
+from __future__ import annotations
+
+import uuid
+from typing import Any
+
+import pytest
+
+from common import create_user, list_users
+from libs.auth import RAGFlowWebApiAuth
+
+
+@pytest.mark.edge_cases
+@pytest.mark.usefixtures("clear_users")
+class TestUserEdgeCases:
+ """Edge case tests for user management."""
+
+ @pytest.mark.p2
+ def test_extremely_long_nickname(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test nickname with extreme length."""
+ long_nickname: str = "A" * 1000
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": long_nickname,
+ "email": unique_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should either accept (and possibly truncate) or reject with clear error
+ if res["code"] == 0:
+ # If accepted, nickname should be truncated to reasonable length
+ assert len(res["data"]["nickname"]) <= 255, (
+ "Nickname should be truncated to max length"
+ )
+ else:
+ # If rejected, should have clear error message
+ assert (
+ "length" in res["message"].lower()
+ or "long" in res["message"].lower()
+ or "characters" in res["message"].lower()
+ ), "Error message should mention length issue"
+
+ @pytest.mark.p2
+ def test_unicode_in_all_fields(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test unicode characters in various fields."""
+ # Use ASCII-safe email local part (unicode in display name)
+ local_part: str = f"test_{uuid.uuid4().hex[:8]}"
+ unique_email: str = f"{local_part}@example.com"
+ payload: dict[str, str] = {
+ "nickname": "用户名测试", # Chinese characters
+ "email": unique_email,
+ "password": "密码123!", # Chinese + ASCII
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should handle unicode properly
+ if res["code"] == 0:
+ assert res["data"]["nickname"] == "用户名测试", (
+ "Unicode nickname should be preserved"
+ )
+ assert res["data"]["email"] == unique_email
+
+ @pytest.mark.p2
+ def test_emoji_in_nickname(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test emoji characters in nickname."""
+ nickname_with_emoji: str = "Test User 😀🎉🔥"
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": nickname_with_emoji,
+ "email": unique_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should handle emoji properly (either accept or reject gracefully)
+ if res["code"] == 0:
+ # Emoji should be preserved
+ assert "😀" in res["data"]["nickname"] or "?" in res["data"]["nickname"], (
+ "Emoji should be preserved or replaced with placeholder"
+ )
+
+ @pytest.mark.p2
+ @pytest.mark.parametrize(
+ "special_email",
+ [
+ f"user+tag_{uuid.uuid4().hex[:4]}@example.com",
+ f"user.name_{uuid.uuid4().hex[:4]}@example.com",
+ f"user_name_{uuid.uuid4().hex[:4]}@example.com",
+ f"user-name_{uuid.uuid4().hex[:4]}@example.com",
+ f"123_{uuid.uuid4().hex[:4]}@example.com", # Starts with number
+ ],
+ )
+ def test_special_characters_in_email(
+ self, WebApiAuth: RAGFlowWebApiAuth, special_email: str
+ ) -> None:
+ """Test various special characters in email."""
+ payload: dict[str, str] = {
+ "nickname": "test",
+ "email": special_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+ assert res["code"] == 0, f"Failed for email: {special_email}, {res}"
+ assert res["data"]["email"] == special_email
+
+ @pytest.mark.p2
+ def test_whitespace_handling_in_fields(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test whitespace handling in various fields."""
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": " test user ", # Leading/trailing spaces
+ "email": f" {unique_email} ", # Spaces around email
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should handle whitespace (trim or accept)
+ if res["code"] == 0:
+ # Email should be trimmed
+ assert res["data"]["email"] == unique_email, (
+ "Email should have whitespace trimmed"
+ )
+ # Nickname whitespace handling is flexible
+ nickname: str = res["data"]["nickname"]
+ assert nickname.strip() != "", "Nickname should not be only whitespace"
+
+ @pytest.mark.p2
+ def test_null_byte_in_input(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test null byte injection in input fields."""
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": "test\x00admin", # Null byte in nickname
+ "email": unique_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should handle or reject null bytes
+ if res["code"] == 0:
+ # Null byte should be removed or replaced
+ assert "\x00" not in res["data"]["nickname"], (
+ "Null byte should be sanitized"
+ )
+
+ @pytest.mark.p2
+ def test_very_long_email(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test email exceeding typical length limits."""
+ # Create email with very long local part (250 chars)
+ long_local: str = "a" * 250
+ email: str = f"{long_local}@example.com"
+ payload: dict[str, str] = {
+ "nickname": "test",
+ "email": email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should reject overly long emails (RFC 5321 limits local part to 64 chars)
+ assert res["code"] != 0, "Very long email should be rejected"
+ assert (
+ "invalid" in res["message"].lower()
+ or "long" in res["message"].lower()
+ ), "Error should mention invalid/long email"
+
+ @pytest.mark.p2
+ def test_email_with_multiple_at_signs(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test email with multiple @ signs."""
+ payload: dict[str, str] = {
+ "nickname": "test",
+ "email": "user@@example.com",
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should reject invalid email format
+ assert res["code"] != 0, "Email with multiple @ should be rejected"
+ assert "invalid" in res["message"].lower()
+
+ @pytest.mark.p2
+ def test_email_with_spaces(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test email containing spaces."""
+ payload: dict[str, str] = {
+ "nickname": "test",
+ "email": "user name@example.com",
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should reject email with spaces
+ assert res["code"] != 0, "Email with spaces should be rejected"
+ assert "invalid" in res["message"].lower()
+
+ @pytest.mark.p3
+ def test_leading_trailing_dots_in_email(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test email with leading/trailing dots."""
+ invalid_emails: list[str] = [
+ ".user@example.com", # Leading dot
+ "user.@example.com", # Trailing dot
+ "user..name@example.com", # Consecutive dots
+ ]
+
+ for email in invalid_emails:
+ payload: dict[str, str] = {
+ "nickname": "test",
+ "email": email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # These should be rejected as invalid
+ assert res["code"] != 0, f"Invalid email should be rejected: {email}"
+
+ @pytest.mark.p3
+ def test_empty_string_vs_none_in_optional_fields(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test difference between empty string and None for optional fields."""
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+
+ # Test with empty string nickname (should be accepted)
+ payload_empty: dict[str, str] = {
+ "nickname": "",
+ "email": unique_email,
+ "password": "test123",
+ }
+ res_empty: dict[str, Any] = create_user(WebApiAuth, payload_empty)
+
+ # Empty nickname should be accepted per current API behavior
+ assert res_empty["code"] == 0, "Empty nickname should be accepted"
+ assert res_empty["data"]["nickname"] == ""
+
+ @pytest.mark.p3
+ def test_pagination_with_no_results(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test list_users pagination when no users exist."""
+ # Assuming clear_users fixture has cleared all test users
+ res: dict[str, Any] = list_users(
+ WebApiAuth, params={"page": 1, "page_size": 10}
+ )
+
+ # Should return empty list, not error
+ assert res["code"] == 0, "Empty result should not cause error"
+ # Data might be empty or contain only system users
+ assert isinstance(res["data"], list), "Should return list even if empty"
+
+ @pytest.mark.p3
+ def test_pagination_beyond_available_pages(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test requesting page beyond available data."""
+ # Create one user
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ create_user(
+ WebApiAuth,
+ {
+ "nickname": "test",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+
+ # Request page 100
+ res: dict[str, Any] = list_users(
+ WebApiAuth, params={"page": 100, "page_size": 10}
+ )
+
+ # Should return empty results, not error
+ assert res["code"] == 0, "High page number should not cause error"
+ assert isinstance(res["data"], list), "Should return empty list"
+
+ @pytest.mark.p3
+ def test_zero_page_size(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test pagination with page_size of 0."""
+ res: dict[str, Any] = list_users(
+ WebApiAuth, params={"page": 1, "page_size": 0}
+ )
+
+ # Should reject invalid page size
+ assert res["code"] != 0, "Zero page size should be rejected"
+ assert "page" in res["message"].lower() or "size" in res["message"].lower()
+
+ @pytest.mark.p3
+ def test_negative_page_number(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test pagination with negative page number."""
+ res: dict[str, Any] = list_users(
+ WebApiAuth, params={"page": -1, "page_size": 10}
+ )
+
+ # Should reject negative page number
+ assert res["code"] != 0, "Negative page number should be rejected"
+ assert "page" in res["message"].lower()
+
+ @pytest.mark.p3
+ def test_excessive_page_size(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test pagination with very large page_size."""
+ res: dict[str, Any] = list_users(
+ WebApiAuth, params={"page": 1, "page_size": 10000}
+ )
+
+ # Should either cap page size or reject
+ # Most APIs cap at 100 as per spec
+ if res["code"] == 0:
+ # If accepted, should return limited results
+ assert len(res["data"]) <= 100, (
+ "Page size should be capped at reasonable limit"
+ )
+
+ @pytest.mark.p3
+ def test_special_characters_in_password(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test password with various special characters."""
+ special_passwords: list[str] = [
+ "Test@123!",
+ "Pass#$%^&*()",
+ "Pwd[]{}\\|;:'\",<>?/",
+ "测试密码123", # Unicode
+ "😀🎉🔥123", # Emoji
+ ]
+
+ for password in special_passwords:
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": "test",
+ "email": unique_email,
+ "password": password,
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should accept special characters in password
+ assert res["code"] == 0, (
+ f"Password with special chars should be accepted: {password}"
+ )
+
+ @pytest.mark.p3
+ def test_json_injection_in_fields(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test JSON injection attempts."""
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": '{"admin": true}',
+ "email": unique_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should treat as literal string, not parse as JSON
+ if res["code"] == 0:
+ assert res["data"]["nickname"] == '{"admin": true}', (
+ "Should store as literal string, not parse JSON"
+ )
+
+ @pytest.mark.p3
+ def test_path_traversal_in_nickname(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test path traversal attempts in nickname."""
+ traversal_attempts: list[str] = [
+ "../../../etc/passwd",
+ "..\\..\\..\\windows\\system32",
+ "....//....//....//etc/passwd",
+ ]
+
+ for nickname in traversal_attempts:
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": nickname,
+ "email": unique_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+
+ # Should either reject or sanitize path traversal attempts
+ # At minimum, should not allow actual file system access
+ if res["code"] == 0:
+ # Nickname should be stored safely
+ stored_nickname: str = res["data"]["nickname"]
+ # Verify it's treated as literal string
+ assert isinstance(stored_nickname, str)
diff --git a/test/testcases/test_http_api/test_user_management/test_user_performance.py b/test/testcases/test_http_api/test_user_management/test_user_performance.py
new file mode 100644
index 000000000..6b8f3d26d
--- /dev/null
+++ b/test/testcases/test_http_api/test_user_management/test_user_performance.py
@@ -0,0 +1,392 @@
+#
+# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Performance and load tests for user management APIs."""
+
+from __future__ import annotations
+
+import time
+import uuid
+from concurrent.futures import Future, ThreadPoolExecutor, as_completed
+from typing import Any
+
+import pytest
+
+from common import create_user, list_users
+from libs.auth import RAGFlowWebApiAuth
+
+
+@pytest.mark.performance
+@pytest.mark.usefixtures("clear_users")
+class TestUserPerformance:
+ """Performance and load tests for user management."""
+
+ @pytest.mark.p2
+ def test_list_users_performance_small_dataset(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test list_users performance with small dataset."""
+ # Create 20 users
+ created_users: list[str] = []
+ for i in range(20):
+ unique_email: str = f"perf_small_{i}_{uuid.uuid4().hex[:4]}@example.com"
+ res: dict[str, Any] = create_user(
+ WebApiAuth,
+ {
+ "nickname": f"user_{i}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+ if res["code"] == 0:
+ created_users.append(res["data"]["id"])
+
+ # Test list performance without pagination
+ start: float = time.time()
+ res: dict[str, Any] = list_users(WebApiAuth)
+ duration: float = time.time() - start
+
+ assert res["code"] == 0, res
+ assert duration < 2.0, (
+ f"List operation took {duration}s, should be under 2s"
+ )
+
+ @pytest.mark.p2
+ def test_list_users_pagination_performance(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test pagination performance with moderate dataset."""
+ # Create 50 users
+ for i in range(50):
+ unique_email: str = f"perf_test_{i}_{uuid.uuid4().hex[:4]}@example.com"
+ create_user(
+ WebApiAuth,
+ {
+ "nickname": f"user_{i}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+
+ # Test pagination performance
+ start: float = time.time()
+ res: dict[str, Any] = list_users(
+ WebApiAuth, params={"page": 1, "page_size": 10}
+ )
+ duration: float = time.time() - start
+
+ assert res["code"] == 0, res
+ assert len(res["data"]) <= 10, "Should return requested page size"
+ assert duration < 1.0, (
+ f"Paginated list took {duration}s, should be under 1s"
+ )
+
+ @pytest.mark.p3
+ def test_concurrent_user_creation(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test concurrent user creation without conflicts."""
+ count: int = 20
+
+ def create_test_user(index: int) -> dict[str, Any]:
+ unique_email: str = f"concurrent_{index}_{uuid.uuid4().hex[:8]}@example.com"
+ return create_user(
+ WebApiAuth,
+ {
+ "nickname": f"user_{index}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+
+ # Create 20 users concurrently with 5 workers
+ start: float = time.time()
+ with ThreadPoolExecutor(max_workers=5) as executor:
+ futures: list[Future[dict[str, Any]]] = [
+ executor.submit(create_test_user, i) for i in range(count)
+ ]
+ results: list[dict[str, Any]] = [
+ f.result() for f in as_completed(futures)
+ ]
+ duration: float = time.time() - start
+
+ # All should succeed
+ success_count: int = sum(1 for r in results if r["code"] == 0)
+ assert success_count == count, (
+ f"Expected {count} successful creations, got {success_count}"
+ )
+
+ # Should complete reasonably quickly
+ # 20 users with 5 workers ~= 4 batches, allow 10 seconds
+ assert duration < 10.0, (
+ f"Concurrent creation took {duration}s, should be under 10s"
+ )
+
+ @pytest.mark.p3
+ def test_user_creation_response_time(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test individual user creation response time."""
+ response_times: list[float] = []
+
+ for i in range(10):
+ unique_email: str = f"timing_{i}_{uuid.uuid4().hex[:8]}@example.com"
+ start: float = time.time()
+ res: dict[str, Any] = create_user(
+ WebApiAuth,
+ {
+ "nickname": f"user_{i}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+ duration: float = time.time() - start
+
+ assert res["code"] == 0, f"User creation failed: {res}"
+ response_times.append(duration)
+
+ # Calculate statistics
+ avg_time: float = sum(response_times) / len(response_times)
+ max_time: float = max(response_times)
+
+ # Average response time should be reasonable
+ assert avg_time < 1.0, (
+ f"Average user creation time {avg_time}s should be under 1s"
+ )
+ # Max response time shouldn't spike too high
+ assert max_time < 3.0, (
+ f"Max user creation time {max_time}s should be under 3s"
+ )
+
+ @pytest.mark.p3
+ def test_sequential_vs_concurrent_creation_comparison(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Compare sequential vs concurrent user creation performance."""
+ count: int = 10
+
+ # Sequential creation
+ sequential_start: float = time.time()
+ for i in range(count):
+ unique_email: str = f"seq_{i}_{uuid.uuid4().hex[:8]}@example.com"
+ create_user(
+ WebApiAuth,
+ {
+ "nickname": f"seq_user_{i}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+ sequential_duration: float = time.time() - sequential_start
+
+ # Concurrent creation
+ def create_concurrent_user(index: int) -> dict[str, Any]:
+ unique_email: str = f"conc_{index}_{uuid.uuid4().hex[:8]}@example.com"
+ return create_user(
+ WebApiAuth,
+ {
+ "nickname": f"conc_user_{index}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+
+ concurrent_start: float = time.time()
+ with ThreadPoolExecutor(max_workers=5) as executor:
+ futures: list[Future[dict[str, Any]]] = [
+ executor.submit(create_concurrent_user, i) for i in range(count)
+ ]
+ concurrent_results: list[dict[str, Any]] = [
+ f.result() for f in as_completed(futures)
+ ]
+ concurrent_duration: float = time.time() - concurrent_start
+
+ # Concurrent should be faster (or at least not significantly slower)
+ # Allow some overhead for thread management
+ speedup_ratio: float = sequential_duration / concurrent_duration
+
+ # Log performance metrics for analysis
+ print(f"\nPerformance Comparison ({count} users):")
+ print(f"Sequential: {sequential_duration:.2f}s")
+ print(f"Concurrent: {concurrent_duration:.2f}s")
+ print(f"Speedup: {speedup_ratio:.2f}x")
+
+ # Concurrent should provide some benefit (at least not be slower)
+ # With 5 workers, expect at least some improvement
+ assert concurrent_duration <= sequential_duration * 1.2, (
+ "Concurrent creation should not be significantly slower than sequential"
+ )
+
+ @pytest.mark.p3
+ def test_pagination_consistency_under_load(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test pagination consistency during concurrent modifications."""
+ # Create initial set of users
+ initial_count: int = 30
+ for i in range(initial_count):
+ unique_email: str = f"pag_{i}_{uuid.uuid4().hex[:8]}@example.com"
+ create_user(
+ WebApiAuth,
+ {
+ "nickname": f"user_{i}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+
+ # Test pagination while users are being created
+ def paginate_users() -> dict[str, Any]:
+ return list_users(WebApiAuth, params={"page": 1, "page_size": 10})
+
+ def create_more_users() -> None:
+ for i in range(5):
+ unique_email: str = f"new_{i}_{uuid.uuid4().hex[:8]}@example.com"
+ create_user(
+ WebApiAuth,
+ {
+ "nickname": f"new_user_{i}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+
+ with ThreadPoolExecutor(max_workers=3) as executor:
+ # Start pagination requests
+ pag_futures: list[Future] = [
+ executor.submit(paginate_users) for _ in range(5)
+ ]
+ # Start creation requests
+ create_future: Future = executor.submit(create_more_users)
+
+ # Wait for all to complete
+ pag_results: list[dict[str, Any]] = [
+ f.result() for f in pag_futures
+ ]
+ create_future.result()
+
+ # All pagination requests should succeed
+ assert all(r["code"] == 0 for r in pag_results), (
+ "Pagination should remain stable during concurrent modifications"
+ )
+
+ @pytest.mark.p3
+ def test_memory_efficiency_large_list(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test memory efficiency when listing many users."""
+ # Create 100 users
+ for i in range(100):
+ unique_email: str = f"mem_{i}_{uuid.uuid4().hex[:8]}@example.com"
+ create_user(
+ WebApiAuth,
+ {
+ "nickname": f"user_{i}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+
+ # List all users (without pagination)
+ res: dict[str, Any] = list_users(WebApiAuth)
+
+ assert res["code"] == 0, res
+ # Should return results without memory issues
+ assert isinstance(res["data"], list), "Should return list"
+ # Response should not be excessively large
+ # (This is a basic check; real memory profiling would need additional tools)
+
+ @pytest.mark.p3
+ @pytest.mark.skip(reason="Stress test - run manually")
+ def test_sustained_load(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test system stability under sustained load (manual run)."""
+ duration_seconds: int = 60 # Run for 1 minute
+ requests_per_second: int = 5
+
+ start_time: float = time.time()
+ request_count: int = 0
+ error_count: int = 0
+
+ while time.time() - start_time < duration_seconds:
+ batch_start: float = time.time()
+
+ # Send requests_per_second requests
+ for i in range(requests_per_second):
+ unique_email: str = f"load_{request_count}_{uuid.uuid4().hex[:8]}@example.com"
+ res: dict[str, Any] = create_user(
+ WebApiAuth,
+ {
+ "nickname": f"user_{request_count}",
+ "email": unique_email,
+ "password": "test123",
+ },
+ )
+ request_count += 1
+ if res["code"] != 0:
+ error_count += 1
+
+ # Wait to maintain requests_per_second rate
+ elapsed: float = time.time() - batch_start
+ sleep_time: float = 1.0 - elapsed
+ if sleep_time > 0:
+ time.sleep(sleep_time)
+
+ total_duration: float = time.time() - start_time
+ actual_rps: float = request_count / total_duration
+ error_rate: float = error_count / request_count if request_count > 0 else 0
+
+ print(f"\nSustained Load Test Results:")
+ print(f"Duration: {total_duration:.2f}s")
+ print(f"Total Requests: {request_count}")
+ print(f"Actual RPS: {actual_rps:.2f}")
+ print(f"Error Rate: {error_rate:.2%}")
+
+ # Error rate should be low
+ assert error_rate < 0.05, (
+ f"Error rate {error_rate:.2%} should be under 5%"
+ )
+
+ @pytest.mark.p3
+ def test_large_payload_handling(
+ self, WebApiAuth: RAGFlowWebApiAuth
+ ) -> None:
+ """Test handling of large request payloads."""
+ # Create user with large nickname (but within limits)
+ large_nickname: str = "A" * 200 # 200 characters
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+
+ start: float = time.time()
+ res: dict[str, Any] = create_user(
+ WebApiAuth,
+ {
+ "nickname": large_nickname,
+ "email": unique_email,
+ "password": "test123" * 10, # Longer password
+ },
+ )
+ duration: float = time.time() - start
+
+ # Should handle large payloads efficiently
+ assert duration < 2.0, (
+ f"Large payload took {duration}s, should be under 2s"
+ )
+
+ if res["code"] == 0:
+ # Verify data was stored correctly
+ assert len(res["data"]["nickname"]) <= 255, (
+ "Nickname should be capped at reasonable length"
+ )
diff --git a/test/testcases/test_http_api/test_user_management/test_user_security.py b/test/testcases/test_http_api/test_user_management/test_user_security.py
new file mode 100644
index 000000000..ab171cb5b
--- /dev/null
+++ b/test/testcases/test_http_api/test_user_management/test_user_security.py
@@ -0,0 +1,342 @@
+#
+# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Security-focused tests for user management APIs."""
+
+from __future__ import annotations
+
+import uuid
+from typing import Any
+
+import pytest
+
+from common import create_user
+from libs.auth import RAGFlowWebApiAuth
+
+
+@pytest.mark.security
+@pytest.mark.usefixtures("clear_users")
+class TestUserSecurity:
+ """Security-focused tests for user management."""
+
+ @pytest.mark.p1
+ @pytest.mark.parametrize(
+ "malicious_email",
+ [
+ "'; DROP TABLE users; --",
+ "admin@example.com' OR '1'='1",
+ "test@example.com'; UPDATE users SET is_superuser=true; --",
+ "admin' --",
+ "' OR 1=1 --",
+ ],
+ )
+ def test_sql_injection_in_email(
+ self, WebApiAuth: RAGFlowWebApiAuth, malicious_email: str
+ ) -> None:
+ """Test SQL injection attempts in email field are properly handled."""
+ payload: dict[str, str] = {
+ "nickname": "test",
+ "email": malicious_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+ # Should fail validation, not execute SQL
+ assert res["code"] != 0, (
+ f"SQL injection attempt should be rejected: {malicious_email}"
+ )
+ assert "invalid" in res["message"].lower()
+
+ @pytest.mark.p1
+ @pytest.mark.parametrize(
+ "xss_payload",
+ [
+ "",
+ "
",
+ "javascript:alert('xss')",
+ "",
+ "<",
+ ],
+ )
+ def test_xss_in_nickname(
+ self, WebApiAuth: RAGFlowWebApiAuth, xss_payload: str
+ ) -> None:
+ """Test XSS attempts in nickname field are sanitized."""
+ unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
+ payload: dict[str, str] = {
+ "nickname": xss_payload,
+ "email": unique_email,
+ "password": "test123",
+ }
+ res: dict[str, Any] = create_user(WebApiAuth, payload)
+ if res["code"] == 0:
+ # Nickname should be sanitized
+ nickname: str = res["data"]["nickname"]
+ assert "