[OND211-2329]: Updated user management tests, fixed typings.

This commit is contained in:
Hetavi Shah 2025-11-18 13:26:32 +05:30
parent f3dd4bd8ed
commit bd83d24a08
7 changed files with 192 additions and 192 deletions

View file

@ -132,7 +132,7 @@ class TestUserCreate:
)
def test_required_fields(
self,
WebApiAuth,
web_api_auth,
payload: dict[str, Any],
expected_code: int,
expected_message: str,
@ -142,7 +142,7 @@ class TestUserCreate:
# Use unique email to avoid conflicts
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload["email"] = unique_email
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["nickname"] == payload["nickname"]
@ -167,7 +167,7 @@ class TestUserCreate:
)
def test_email_validation(
self,
WebApiAuth,
web_api_auth,
email: str,
expected_code: int,
expected_message: str,
@ -181,7 +181,7 @@ class TestUserCreate:
"email": email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["email"] == email
@ -201,7 +201,7 @@ class TestUserCreate:
)
def test_nickname(
self,
WebApiAuth,
web_api_auth,
nickname: str,
expected_code: int,
expected_message: str,
@ -213,7 +213,7 @@ class TestUserCreate:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["nickname"] == nickname
@ -222,7 +222,7 @@ class TestUserCreate:
@pytest.mark.p1
def test_duplicate_email(
self, WebApiAuth
self, web_api_auth
) -> None:
"""Test that creating a user with duplicate email fails."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -231,7 +231,7 @@ class TestUserCreate:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
# Try to create another user with the same email
@ -240,7 +240,7 @@ class TestUserCreate:
"email": unique_email,
"password": "test123",
}
res2: dict[str, Any] = create_user(WebApiAuth, payload2)
res2: dict[str, Any] = create_user(web_api_auth, payload2)
assert res2["code"] == 103
assert "has already registered" in res2["message"]
@ -255,7 +255,7 @@ class TestUserCreate:
)
def test_is_superuser(
self,
WebApiAuth,
web_api_auth,
is_superuser: bool | None,
expected_value: bool,
) -> None:
@ -269,13 +269,13 @@ class TestUserCreate:
if is_superuser is not None:
payload["is_superuser"] = is_superuser
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
assert res["data"]["is_superuser"] == expected_value
@pytest.mark.p2
def test_password_hashing(
self, WebApiAuth
self, web_api_auth
) -> None:
"""Test that password is properly hashed when stored."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -285,7 +285,7 @@ class TestUserCreate:
"email": unique_email,
"password": password, # Plain text password
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
# Password should be hashed in the response (not plain text)
assert "password" in res["data"], (
@ -299,7 +299,7 @@ class TestUserCreate:
@pytest.mark.p2
def test_plain_text_password_accepted(
self, WebApiAuth
self, web_api_auth
) -> None:
"""Test that plain text password is accepted."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -308,14 +308,14 @@ class TestUserCreate:
"email": unique_email,
"password": "plain_text_password", # Plain text, no encryption
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should succeed with plain text password
assert res["code"] == 0
assert res["data"]["email"] == unique_email
@pytest.mark.p3
def test_concurrent_create(
self, WebApiAuth
self, web_api_auth
) -> None:
"""Test concurrent user creation with multiple threads."""
count: int = 10
@ -329,7 +329,7 @@ class TestUserCreate:
"password": "test123",
}
futures.append(
executor.submit(create_user, WebApiAuth, payload)
executor.submit(create_user, web_api_auth, payload)
)
responses: list[Future[dict[str, Any]]] = list(
as_completed(futures)
@ -341,7 +341,7 @@ class TestUserCreate:
@pytest.mark.p2
def test_user_creation_response_structure(
self, WebApiAuth
self, web_api_auth
) -> None:
"""Test that user creation returns the expected response structure."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -350,7 +350,7 @@ class TestUserCreate:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
assert "data" in res
assert "id" in res["data"]

View file

@ -57,7 +57,7 @@ class TestAuthorization:
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
WebApiAuth: RAGFlowWebApiAuth,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test user deletion with invalid or missing authentication."""
# Create a test user first
@ -67,7 +67,7 @@ class TestAuthorization:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
@ -83,7 +83,7 @@ class TestAuthorization:
@pytest.mark.p1
def test_user_can_only_delete_themselves(
self,
WebApiAuth: RAGFlowWebApiAuth,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test that users can only delete their own account."""
# Create another user
@ -93,7 +93,7 @@ class TestAuthorization:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
assert create_res["code"] == 0, "Failed to create second user"
other_user_id: str = create_res["data"]["id"]
@ -101,7 +101,7 @@ class TestAuthorization:
delete_payload: dict[str, Any] = {
"user_id": other_user_id,
}
res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
res: dict[str, Any] = delete_user(web_api_auth, delete_payload)
assert res["code"] == 403, f"Expected 403 FORBIDDEN, got {res}"
assert "only delete your own account" in res["message"].lower()
@ -112,50 +112,50 @@ class TestUserDelete:
@pytest.mark.p1
def test_delete_user_missing_identifier(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test deletion without user_id or email."""
delete_payload: dict[str, str] = {}
res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
res: dict[str, Any] = delete_user(web_api_auth, delete_payload)
assert res["code"] == 101
assert "Either user_id or email must be provided" in res["message"]
@pytest.mark.p1
def test_delete_user_not_found_by_id(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test deletion of non-existent user by ID."""
delete_payload: dict[str, str] = {
"user_id": "non_existent_user_id_12345",
}
res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
res: dict[str, Any] = delete_user(web_api_auth, delete_payload)
assert res["code"] == 102
assert "User not found" in res["message"]
@pytest.mark.p1
def test_delete_user_invalid_email_format(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test deletion with invalid email format."""
delete_payload: dict[str, str] = {"email": "invalid_email_format"}
res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
res: dict[str, Any] = delete_user(web_api_auth, delete_payload)
assert res["code"] == 103
assert "Invalid email address" in res["message"]
@pytest.mark.p3
def test_delete_user_idempotency(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that deleting a non-existent user returns consistent error."""
non_existent_id: str = f"nonexistent_{uuid.uuid4().hex[:16]}"
# First attempt
delete_payload: dict[str, str] = {"user_id": non_existent_id}
res1: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
res1: dict[str, Any] = delete_user(web_api_auth, delete_payload)
assert res1["code"] == 102
# Second attempt (should return same error)
res2: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
res2: dict[str, Any] = delete_user(web_api_auth, delete_payload)
assert res2["code"] == 102
assert res1["code"] == res2["code"]

View file

@ -71,17 +71,17 @@ class TestUserList:
# @pytest.mark.p1
# def test_list_empty_users(
# self, HttpApiAuth: RAGFlowHttpApiAuth
# self, http_api_auth: RAGFlowHttpApiAuth
# ) -> None:
# """Test listing users when no users exist."""
# res: dict[str, Any] = list_users(HttpApiAuth)
# res: dict[str, Any] = list_users(http_api_auth)
# assert res["code"] == 0, res
# assert isinstance(res["data"], list)
# assert len(res["data"]) == 0
@pytest.mark.p1
def test_list_single_user(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing a single user."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -90,12 +90,12 @@ class TestUserList:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
# Skip if creation fails (password encryption issue in test)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping list test")
list_res: dict[str, Any] = list_users(WebApiAuth)
list_res: dict[str, Any] = list_users(web_api_auth)
assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list)
assert len(list_res["data"]) >= 1
@ -105,7 +105,7 @@ class TestUserList:
@pytest.mark.p1
def test_list_multiple_users(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing multiple users."""
created_emails: list[str] = []
@ -117,7 +117,7 @@ class TestUserList:
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(
WebApiAuth, create_payload
web_api_auth, create_payload
)
if create_res["code"] == 0:
created_emails.append(unique_email)
@ -125,7 +125,7 @@ class TestUserList:
if not created_emails:
pytest.skip("No users created, skipping list test")
list_res: dict[str, Any] = list_users(WebApiAuth)
list_res: dict[str, Any] = list_users(web_api_auth)
assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list)
assert len(list_res["data"]) >= len(created_emails)
@ -136,7 +136,7 @@ class TestUserList:
@pytest.mark.p1
def test_list_users_with_email_filter(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing users filtered by email."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -145,13 +145,13 @@ class TestUserList:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping filter test")
# List with email filter
params: dict[str, str] = {"email": unique_email}
list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
list_res: dict[str, Any] = list_users(web_api_auth, params=params)
assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list)
assert len(list_res["data"]) >= 1
@ -161,22 +161,22 @@ class TestUserList:
@pytest.mark.p1
def test_list_users_with_invalid_email_filter(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing users with invalid email filter."""
params: dict[str, str] = {"email": "invalid_email_format"}
list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
list_res: dict[str, Any] = list_users(web_api_auth, params=params)
assert list_res["code"] != 0
assert "Invalid email address" in list_res["message"]
@pytest.mark.p1
def test_list_users_with_nonexistent_email_filter(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing users with non-existent email filter."""
nonexistent_email: str = f"nonexistent_{uuid.uuid4().hex[:8]}@example.com"
params: dict[str, str] = {"email": nonexistent_email}
list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
list_res: dict[str, Any] = list_users(web_api_auth, params=params)
assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list)
assert len(list_res["data"]) == 0
@ -196,7 +196,7 @@ class TestUserList:
)
def test_list_users_with_pagination(
self,
WebApiAuth: RAGFlowWebApiAuth,
web_api_auth: RAGFlowWebApiAuth,
page: int,
page_size: int,
expected_valid: bool,
@ -212,7 +212,7 @@ class TestUserList:
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(
WebApiAuth, create_payload
web_api_auth, create_payload
)
if create_res["code"] == 0:
created_count += 1
@ -221,7 +221,7 @@ class TestUserList:
pytest.skip("No users created, skipping pagination test")
params: dict[str, int] = {"page": page, "page_size": page_size}
list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
list_res: dict[str, Any] = list_users(web_api_auth, params=params)
if expected_valid:
assert list_res["code"] == 0, list_res
@ -234,7 +234,7 @@ class TestUserList:
@pytest.mark.p1
def test_list_users_pagination_boundaries(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test pagination boundary conditions."""
# Create 5 users with a unique email pattern for filtering
@ -248,7 +248,7 @@ class TestUserList:
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(
WebApiAuth, create_payload
web_api_auth, create_payload
)
if create_res["code"] == 0:
created_emails.append(unique_email)
@ -257,18 +257,18 @@ class TestUserList:
pytest.skip("Not enough users created, skipping boundary test")
# Get total count of all users to calculate pagination boundaries
list_res_all: dict[str, Any] = list_users(WebApiAuth)
list_res_all: dict[str, Any] = list_users(web_api_auth)
total_users: int = len(list_res_all["data"])
# Test first page
params: dict[str, int] = {"page": 1, "page_size": 2}
list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
list_res: dict[str, Any] = list_users(web_api_auth, params=params)
assert list_res["code"] == 0, list_res
assert len(list_res["data"]) == 2
# Test that pagination returns consistent page sizes
params = {"page": 2, "page_size": 2}
list_res = list_users(WebApiAuth, params=params)
list_res = list_users(web_api_auth, params=params)
assert list_res["code"] == 0, list_res
assert len(list_res["data"]) == 2
@ -278,23 +278,23 @@ class TestUserList:
last_page: int = (total_users + page_size - 1) // page_size
if last_page > 0:
params = {"page": last_page, "page_size": page_size}
list_res = list_users(WebApiAuth, params=params)
list_res = list_users(web_api_auth, params=params)
assert list_res["code"] == 0, list_res
assert len(list_res["data"]) <= page_size
# Test page beyond available data
# Use a page number that's definitely beyond available data
params = {"page": total_users + 10, "page_size": 2}
list_res = list_users(WebApiAuth, params=params)
list_res = list_users(web_api_auth, params=params)
assert list_res["code"] == 0, list_res
assert len(list_res["data"]) == 0
@pytest.mark.p1
def test_list_users_response_structure(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that user listing returns the expected response structure."""
res: dict[str, Any] = list_users(WebApiAuth)
res: dict[str, Any] = list_users(web_api_auth)
assert res["code"] == 0
assert "data" in res
assert isinstance(res["data"], list)
@ -311,24 +311,24 @@ class TestUserList:
@pytest.mark.p1
def test_list_users_with_invalid_page_params(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing users with invalid pagination parameters."""
# Test invalid page (non-integer)
params: dict[str, str] = {"page": "invalid", "page_size": "10"}
list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
list_res: dict[str, Any] = list_users(web_api_auth, params=params)
# Should handle gracefully or return error
# The exact behavior depends on implementation
assert "code" in list_res
# Test invalid page_size (non-integer)
params = {"page": "1", "page_size": "invalid"}
list_res = list_users(WebApiAuth, params=params)
list_res = list_users(web_api_auth, params=params)
assert "code" in list_res
@pytest.mark.p2
def test_list_users_combined_filters(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing users with combined filters."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -337,7 +337,7 @@ class TestUserList:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping combined filter test")
@ -347,7 +347,7 @@ class TestUserList:
"page": 1,
"page_size": 10,
}
list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
list_res: dict[str, Any] = list_users(web_api_auth, params=params)
assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list)
# Should return at least the created user
@ -355,7 +355,7 @@ class TestUserList:
@pytest.mark.p2
def test_list_users_performance_with_many_users(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test listing performance with multiple users."""
# Create several users
@ -368,7 +368,7 @@ class TestUserList:
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(
WebApiAuth, create_payload
web_api_auth, create_payload
)
if create_res["code"] == 0:
created_count += 1
@ -377,7 +377,7 @@ class TestUserList:
pytest.skip("No users created, skipping performance test")
# List all users
list_res: dict[str, Any] = list_users(WebApiAuth)
list_res: dict[str, Any] = list_users(web_api_auth)
assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list)
# Should return at least the created users

View file

@ -71,7 +71,7 @@ class TestAuthorization:
@pytest.mark.p1
def test_user_can_only_update_themselves(
self,
WebApiAuth: RAGFlowWebApiAuth,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
"""Test that users can only update their own account."""
@ -82,7 +82,7 @@ class TestAuthorization:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
assert create_res["code"] == 0, "Failed to create second user"
other_user_id: str = create_res["data"]["id"]
@ -91,7 +91,7 @@ class TestAuthorization:
"user_id": other_user_id,
"nickname": "hacked_nickname",
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 403, f"Expected 403 FORBIDDEN, got {res}"
assert "only update your own account" in res["message"].lower()
@ -105,13 +105,13 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_with_user_id(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
payload: dict[str, Any] = {
"user_id": test_user["user_id"],
"nickname": "updated_nickname",
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0, res
assert res["data"]["nickname"] == "updated_nickname"
assert res["data"]["email"] == test_user["email"]
@ -119,48 +119,48 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_with_email(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
payload: dict[str, Any] = {
"email": test_user["email"],
"nickname": "updated_nickname_email",
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0, res
assert res["data"]["nickname"] == "updated_nickname_email"
assert res["data"]["email"] == test_user["email"]
@pytest.mark.p1
def test_update_missing_identifier(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test update without user_id or email."""
payload: dict[str, str] = {"nickname": "updated_nickname"}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 101
assert "Either user_id or email must be provided" in res["message"]
@pytest.mark.p1
def test_update_user_not_found_by_id(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
payload: dict[str, str] = {
"user_id": "non_existent_user_id_12345",
"nickname": "updated_nickname",
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 102
assert "User not found" in res["message"]
@pytest.mark.p1
def test_update_user_not_found_by_email(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
payload: dict[str, str] = {
"email": "nonexistent@example.com",
"nickname": "updated_nickname",
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 102
assert "not found" in res["message"]
@ -177,7 +177,7 @@ class TestUserUpdate:
)
def test_update_nickname(
self,
WebApiAuth: RAGFlowWebApiAuth,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
nickname: str,
expected_code: int,
@ -187,7 +187,7 @@ class TestUserUpdate:
"user_id": test_user["user_id"],
"nickname": nickname,
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["nickname"] == nickname
@ -196,26 +196,26 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_password(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
new_password: str = "new_password_456"
payload: dict[str, str] = {
"user_id": test_user["user_id"],
"password": encrypt_password(new_password),
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0, res
assert "updated successfully" in res["message"].lower()
@pytest.mark.p1
def test_update_password_invalid_encryption(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
payload: dict[str, str] = {
"user_id": test_user["user_id"],
"password": "plain_text_password",
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 500
assert "Fail to decrypt password" in res["message"]
@ -235,7 +235,7 @@ class TestUserUpdate:
)
def test_update_email(
self,
WebApiAuth: RAGFlowWebApiAuth,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
new_email: str,
expected_code: int,
@ -247,7 +247,7 @@ class TestUserUpdate:
"user_id": test_user["user_id"],
"new_email": new_email,
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
assert res["data"]["email"] == new_email
@ -256,7 +256,7 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_email_duplicate(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
@ -264,14 +264,14 @@ class TestUserUpdate:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
assert create_res["code"] == 0
update_payload: dict[str, str] = {
"user_id": test_user["user_id"],
"new_email": unique_email,
}
res: dict[str, Any] = update_user(WebApiAuth, update_payload)
res: dict[str, Any] = update_user(web_api_auth, update_payload)
assert res["code"] == 103
assert "already in use" in res["message"]
@ -282,7 +282,7 @@ class TestUserUpdate:
)
def test_update_is_superuser(
self,
WebApiAuth: RAGFlowWebApiAuth,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
is_superuser: bool,
expected_value: bool,
@ -291,13 +291,13 @@ class TestUserUpdate:
"user_id": test_user["user_id"],
"is_superuser": is_superuser,
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0, res
assert res["data"]["is_superuser"] is expected_value
@pytest.mark.p1
def test_update_multiple_fields(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict[str, Any] = {
@ -306,7 +306,7 @@ class TestUserUpdate:
"new_email": new_email,
"is_superuser": True,
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0, res
assert res["data"]["nickname"] == "updated_multiple"
assert res["data"]["email"] == new_email
@ -314,35 +314,35 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_no_fields(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
payload: dict[str, str] = {"user_id": test_user["user_id"]}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 101
assert "No valid fields to update" in res["message"]
@pytest.mark.p1
def test_update_email_using_email_field_when_user_id_provided(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict[str, str] = {
"user_id": test_user["user_id"],
"email": new_email,
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0, res
assert res["data"]["email"] == new_email
@pytest.mark.p2
def test_update_response_structure(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
payload: dict[str, Any] = {
"user_id": test_user["user_id"],
"nickname": "response_test",
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0
assert set(("id", "email", "nickname")) <= res["data"].keys()
assert res["data"]["nickname"] == "response_test"
@ -350,7 +350,7 @@ class TestUserUpdate:
@pytest.mark.p2
def test_concurrent_updates(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
"""Test concurrent updates to the same user (users can only update themselves)."""
# Test concurrent updates to the authenticated user's own account
@ -358,7 +358,7 @@ class TestUserUpdate:
futures: list[Future[dict[str, Any]]] = [
executor.submit(
update_user,
WebApiAuth,
web_api_auth,
{
"user_id": test_user["user_id"],
"nickname": f"updated_user_{i}",
@ -373,7 +373,7 @@ class TestUserUpdate:
@pytest.mark.p3
def test_update_same_user_multiple_times(
self, WebApiAuth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
) -> None:
"""Test repeated updates on the same user."""
for nickname in (
@ -385,6 +385,6 @@ class TestUserUpdate:
"user_id": test_user["user_id"],
"nickname": nickname,
}
res: dict[str, Any] = update_user(WebApiAuth, payload)
res: dict[str, Any] = update_user(web_api_auth, payload)
assert res["code"] == 0
assert res["data"]["nickname"] == nickname

View file

@ -33,7 +33,7 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_extremely_long_nickname(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test nickname with extreme length."""
long_nickname: str = "A" * 1000
@ -43,7 +43,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should either accept (and possibly truncate) or reject with clear error
if res["code"] == 0:
@ -61,7 +61,7 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_unicode_in_all_fields(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test unicode characters in various fields."""
# Use ASCII-safe email local part (unicode in display name)
@ -72,7 +72,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": "密码123!", # Chinese + ASCII
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should handle unicode properly
if res["code"] == 0:
@ -83,7 +83,7 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_emoji_in_nickname(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test emoji characters in nickname."""
nickname_with_emoji: str = "Test User 😀🎉🔥"
@ -93,7 +93,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should handle emoji properly (either accept or reject gracefully)
if res["code"] == 0:
@ -114,7 +114,7 @@ class TestUserEdgeCases:
],
)
def test_special_characters_in_email(
self, WebApiAuth: RAGFlowWebApiAuth, special_email: str
self, web_api_auth: RAGFlowWebApiAuth, special_email: str
) -> None:
"""Test various special characters in email."""
payload: dict[str, str] = {
@ -122,13 +122,13 @@ class TestUserEdgeCases:
"email": special_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0, f"Failed for email: {special_email}, {res}"
assert res["data"]["email"] == special_email
@pytest.mark.p2
def test_whitespace_handling_in_fields(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test whitespace handling in various fields."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -137,7 +137,7 @@ class TestUserEdgeCases:
"email": f" {unique_email} ", # Spaces around email
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should handle whitespace (trim or accept)
if res["code"] == 0:
@ -151,7 +151,7 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_null_byte_in_input(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test null byte injection in input fields."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -160,7 +160,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should handle or reject null bytes
if res["code"] == 0:
@ -171,7 +171,7 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_very_long_email(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test email exceeding typical length limits."""
# Create email with very long local part (250 chars)
@ -182,7 +182,7 @@ class TestUserEdgeCases:
"email": email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should reject overly long emails (RFC 5321 limits local part to 64 chars)
assert res["code"] != 0, "Very long email should be rejected"
@ -193,7 +193,7 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_email_with_multiple_at_signs(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test email with multiple @ signs."""
payload: dict[str, str] = {
@ -201,7 +201,7 @@ class TestUserEdgeCases:
"email": "user@@example.com",
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should reject invalid email format
assert res["code"] != 0, "Email with multiple @ should be rejected"
@ -209,7 +209,7 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_email_with_spaces(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test email containing spaces."""
payload: dict[str, str] = {
@ -217,7 +217,7 @@ class TestUserEdgeCases:
"email": "user name@example.com",
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should reject email with spaces
assert res["code"] != 0, "Email with spaces should be rejected"
@ -225,7 +225,7 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_leading_trailing_dots_in_email(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test email with leading/trailing dots."""
invalid_emails: list[str] = [
@ -240,14 +240,14 @@ class TestUserEdgeCases:
"email": email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# These should be rejected as invalid
assert res["code"] != 0, f"Invalid email should be rejected: {email}"
@pytest.mark.p3
def test_empty_string_vs_none_in_optional_fields(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test difference between empty string and None for optional fields."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -258,7 +258,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": "test123",
}
res_empty: dict[str, Any] = create_user(WebApiAuth, payload_empty)
res_empty: dict[str, Any] = create_user(web_api_auth, payload_empty)
# Empty nickname should be accepted per current API behavior
assert res_empty["code"] == 0, "Empty nickname should be accepted"
@ -266,12 +266,12 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_pagination_with_no_results(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test list_users pagination when no users exist."""
# Assuming clear_users fixture has cleared all test users
res: dict[str, Any] = list_users(
WebApiAuth, params={"page": 1, "page_size": 10}
web_api_auth, params={"page": 1, "page_size": 10}
)
# Should return empty list, not error
@ -281,13 +281,13 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_pagination_beyond_available_pages(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test requesting page beyond available data."""
# Create one user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_user(
WebApiAuth,
web_api_auth,
{
"nickname": "test",
"email": unique_email,
@ -297,7 +297,7 @@ class TestUserEdgeCases:
# Request page 100
res: dict[str, Any] = list_users(
WebApiAuth, params={"page": 100, "page_size": 10}
web_api_auth, params={"page": 100, "page_size": 10}
)
# Should return empty results, not error
@ -306,11 +306,11 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_zero_page_size(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test pagination with page_size of 0."""
res: dict[str, Any] = list_users(
WebApiAuth, params={"page": 1, "page_size": 0}
web_api_auth, params={"page": 1, "page_size": 0}
)
# Should reject invalid page size
@ -319,11 +319,11 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_negative_page_number(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test pagination with negative page number."""
res: dict[str, Any] = list_users(
WebApiAuth, params={"page": -1, "page_size": 10}
web_api_auth, params={"page": -1, "page_size": 10}
)
# Should reject negative page number
@ -332,11 +332,11 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_excessive_page_size(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test pagination with very large page_size."""
res: dict[str, Any] = list_users(
WebApiAuth, params={"page": 1, "page_size": 10000}
web_api_auth, params={"page": 1, "page_size": 10000}
)
# Should either cap page size or reject
@ -349,7 +349,7 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_special_characters_in_password(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test password with various special characters."""
special_passwords: list[str] = [
@ -367,7 +367,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": password,
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should accept special characters in password
assert res["code"] == 0, (
@ -376,7 +376,7 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_json_injection_in_fields(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test JSON injection attempts."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -385,7 +385,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should treat as literal string, not parse as JSON
if res["code"] == 0:
@ -395,7 +395,7 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_path_traversal_in_nickname(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test path traversal attempts in nickname."""
traversal_attempts: list[str] = [
@ -411,7 +411,7 @@ class TestUserEdgeCases:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should either reject or sanitize path traversal attempts
# At minimum, should not allow actual file system access

View file

@ -35,7 +35,7 @@ class TestUserPerformance:
@pytest.mark.p2
def test_list_users_performance_small_dataset(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test list_users performance with small dataset."""
# Create 20 users
@ -43,7 +43,7 @@ class TestUserPerformance:
for i in range(20):
unique_email: str = f"perf_small_{i}_{uuid.uuid4().hex[:4]}@example.com"
res: dict[str, Any] = create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"user_{i}",
"email": unique_email,
@ -55,7 +55,7 @@ class TestUserPerformance:
# Test list performance without pagination
start: float = time.time()
res: dict[str, Any] = list_users(WebApiAuth)
res: dict[str, Any] = list_users(web_api_auth)
duration: float = time.time() - start
assert res["code"] == 0, res
@ -65,14 +65,14 @@ class TestUserPerformance:
@pytest.mark.p2
def test_list_users_pagination_performance(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test pagination performance with moderate dataset."""
# Create 50 users
for i in range(50):
unique_email: str = f"perf_test_{i}_{uuid.uuid4().hex[:4]}@example.com"
create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"user_{i}",
"email": unique_email,
@ -83,7 +83,7 @@ class TestUserPerformance:
# Test pagination performance
start: float = time.time()
res: dict[str, Any] = list_users(
WebApiAuth, params={"page": 1, "page_size": 10}
web_api_auth, params={"page": 1, "page_size": 10}
)
duration: float = time.time() - start
@ -95,7 +95,7 @@ class TestUserPerformance:
@pytest.mark.p3
def test_concurrent_user_creation(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test concurrent user creation without conflicts."""
count: int = 20
@ -103,7 +103,7 @@ class TestUserPerformance:
def create_test_user(index: int) -> dict[str, Any]:
unique_email: str = f"concurrent_{index}_{uuid.uuid4().hex[:8]}@example.com"
return create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"user_{index}",
"email": unique_email,
@ -136,7 +136,7 @@ class TestUserPerformance:
@pytest.mark.p3
def test_user_creation_response_time(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test individual user creation response time."""
response_times: list[float] = []
@ -145,7 +145,7 @@ class TestUserPerformance:
unique_email: str = f"timing_{i}_{uuid.uuid4().hex[:8]}@example.com"
start: float = time.time()
res: dict[str, Any] = create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"user_{i}",
"email": unique_email,
@ -172,7 +172,7 @@ class TestUserPerformance:
@pytest.mark.p3
def test_sequential_vs_concurrent_creation_comparison(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Compare sequential vs concurrent user creation performance."""
count: int = 10
@ -182,7 +182,7 @@ class TestUserPerformance:
for i in range(count):
unique_email: str = f"seq_{i}_{uuid.uuid4().hex[:8]}@example.com"
create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"seq_user_{i}",
"email": unique_email,
@ -195,7 +195,7 @@ class TestUserPerformance:
def create_concurrent_user(index: int) -> dict[str, Any]:
unique_email: str = f"conc_{index}_{uuid.uuid4().hex[:8]}@example.com"
return create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"conc_user_{index}",
"email": unique_email,
@ -231,7 +231,7 @@ class TestUserPerformance:
@pytest.mark.p3
def test_pagination_consistency_under_load(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test pagination consistency during concurrent modifications."""
# Create initial set of users
@ -239,7 +239,7 @@ class TestUserPerformance:
for i in range(initial_count):
unique_email: str = f"pag_{i}_{uuid.uuid4().hex[:8]}@example.com"
create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"user_{i}",
"email": unique_email,
@ -249,13 +249,13 @@ class TestUserPerformance:
# Test pagination while users are being created
def paginate_users() -> dict[str, Any]:
return list_users(WebApiAuth, params={"page": 1, "page_size": 10})
return list_users(web_api_auth, params={"page": 1, "page_size": 10})
def create_more_users() -> None:
for i in range(5):
unique_email: str = f"new_{i}_{uuid.uuid4().hex[:8]}@example.com"
create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"new_user_{i}",
"email": unique_email,
@ -284,14 +284,14 @@ class TestUserPerformance:
@pytest.mark.p3
def test_memory_efficiency_large_list(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test memory efficiency when listing many users."""
# Create 100 users
for i in range(100):
unique_email: str = f"mem_{i}_{uuid.uuid4().hex[:8]}@example.com"
create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"user_{i}",
"email": unique_email,
@ -300,7 +300,7 @@ class TestUserPerformance:
)
# List all users (without pagination)
res: dict[str, Any] = list_users(WebApiAuth)
res: dict[str, Any] = list_users(web_api_auth)
assert res["code"] == 0, res
# Should return results without memory issues
@ -311,7 +311,7 @@ class TestUserPerformance:
@pytest.mark.p3
@pytest.mark.skip(reason="Stress test - run manually")
def test_sustained_load(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test system stability under sustained load (manual run)."""
duration_seconds: int = 60 # Run for 1 minute
@ -328,7 +328,7 @@ class TestUserPerformance:
for i in range(requests_per_second):
unique_email: str = f"load_{request_count}_{uuid.uuid4().hex[:8]}@example.com"
res: dict[str, Any] = create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"user_{request_count}",
"email": unique_email,
@ -362,7 +362,7 @@ class TestUserPerformance:
@pytest.mark.p3
def test_large_payload_handling(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test handling of large request payloads."""
# Create user with large nickname (but within limits)
@ -371,7 +371,7 @@ class TestUserPerformance:
start: float = time.time()
res: dict[str, Any] = create_user(
WebApiAuth,
web_api_auth,
{
"nickname": large_nickname,
"email": unique_email,

View file

@ -43,7 +43,7 @@ class TestUserSecurity:
],
)
def test_sql_injection_in_email(
self, WebApiAuth: RAGFlowWebApiAuth, malicious_email: str
self, web_api_auth: RAGFlowWebApiAuth, malicious_email: str
) -> None:
"""Test SQL injection attempts in email field are properly handled."""
payload: dict[str, str] = {
@ -51,7 +51,7 @@ class TestUserSecurity:
"email": malicious_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should fail validation, not execute SQL
assert res["code"] != 0, (
f"SQL injection attempt should be rejected: {malicious_email}"
@ -70,7 +70,7 @@ class TestUserSecurity:
],
)
def test_xss_in_nickname(
self, WebApiAuth: RAGFlowWebApiAuth, xss_payload: str
self, web_api_auth: RAGFlowWebApiAuth, xss_payload: str
) -> None:
"""Test XSS attempts in nickname field are sanitized."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -79,7 +79,7 @@ class TestUserSecurity:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
if res["code"] == 0:
# Nickname should be sanitized
nickname: str = res["data"]["nickname"]
@ -95,7 +95,7 @@ class TestUserSecurity:
@pytest.mark.p1
def test_password_not_in_response(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Ensure plain password never appears in response."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -105,7 +105,7 @@ class TestUserSecurity:
"email": unique_email,
"password": password,
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0, res
# Response should contain hashed password, not plain text
response_str: str = str(res["data"])
@ -134,7 +134,7 @@ class TestUserSecurity:
],
)
def test_weak_password_handling(
self, WebApiAuth: RAGFlowWebApiAuth, weak_password: str
self, web_api_auth: RAGFlowWebApiAuth, weak_password: str
) -> None:
"""Test handling of weak passwords."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -143,7 +143,7 @@ class TestUserSecurity:
"email": unique_email,
"password": weak_password,
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should reject empty/whitespace passwords at minimum
if not weak_password or not weak_password.strip():
assert res["code"] != 0, (
@ -152,10 +152,10 @@ class TestUserSecurity:
@pytest.mark.p2
def test_unauthorized_superuser_creation(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that regular users cannot escalate privileges."""
# Note: This test assumes WebApiAuth represents a regular user
# Note: This test assumes web_api_auth represents a regular user
# In production, only admins should be able to create superusers
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict[str, Any] = {
@ -164,7 +164,7 @@ class TestUserSecurity:
"password": "test123",
"is_superuser": True,
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# The API currently allows this, but in production this should be restricted
# For now, we document the expected behavior
@ -175,7 +175,7 @@ class TestUserSecurity:
@pytest.mark.p2
def test_password_hashing_is_secure(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Verify passwords are hashed using secure algorithm."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -185,7 +185,7 @@ class TestUserSecurity:
"email": unique_email,
"password": password,
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0, res
# Check password is hashed
@ -213,7 +213,7 @@ class TestUserSecurity:
],
)
def test_control_character_injection(
self, WebApiAuth: RAGFlowWebApiAuth, injection_attempt: dict[str, str]
self, web_api_auth: RAGFlowWebApiAuth, injection_attempt: dict[str, str]
) -> None:
"""Test protection against control character injection."""
# Complete the payload with required fields
@ -224,7 +224,7 @@ class TestUserSecurity:
}
payload.update(injection_attempt)
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should either reject or sanitize control characters
if res["code"] == 0:
@ -242,7 +242,7 @@ class TestUserSecurity:
@pytest.mark.p3
def test_session_token_security(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that access tokens are properly secured."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -251,7 +251,7 @@ class TestUserSecurity:
"email": unique_email,
"password": "test123",
}
res: dict[str, Any] = create_user(WebApiAuth, payload)
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0, res
# Check that access_token exists and is properly formatted
@ -267,7 +267,7 @@ class TestUserSecurity:
@pytest.mark.p3
def test_email_case_sensitivity(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test email uniqueness is case-insensitive."""
base_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -278,7 +278,7 @@ class TestUserSecurity:
"email": base_email.lower(),
"password": "test123",
}
res1: dict[str, Any] = create_user(WebApiAuth, payload1)
res1: dict[str, Any] = create_user(web_api_auth, payload1)
assert res1["code"] == 0, res1
# Try to create another user with uppercase version of same email
@ -287,7 +287,7 @@ class TestUserSecurity:
"email": base_email.upper(),
"password": "test123",
}
res2: dict[str, Any] = create_user(WebApiAuth, payload2)
res2: dict[str, Any] = create_user(web_api_auth, payload2)
# Should reject duplicate email regardless of case
# Note: Current implementation may allow this, but it should be fixed
@ -296,7 +296,7 @@ class TestUserSecurity:
@pytest.mark.p3
def test_concurrent_user_creation_same_email(
self, WebApiAuth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test race condition protection for duplicate emails."""
import concurrent.futures
@ -305,7 +305,7 @@ class TestUserSecurity:
def create_with_email(index: int) -> dict[str, Any]:
return create_user(
WebApiAuth,
web_api_auth,
{
"nickname": f"test{index}",
"email": email,