[OND211-2329]: Updated user tests to remove test data from db.

This commit is contained in:
Hetavi Shah 2025-11-25 13:19:17 +05:30
parent f710354b5c
commit f307a18f55
8 changed files with 294 additions and 82 deletions

View file

@ -20,8 +20,10 @@ from __future__ import annotations
import base64
import os
import uuid
from pathlib import Path
from typing import Any
import importlib.util
import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA
@ -30,6 +32,22 @@ from common import get_user_info
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Import helpers from root test configuration
# ---------------------------------------------------------------------------
_root_conftest_path = (
Path(__file__).parent.parent.parent / "conftest.py"
)
_root_spec = importlib.util.spec_from_file_location(
"root_test_conftest", _root_conftest_path
)
_root_conftest_module = importlib.util.module_from_spec(_root_spec)
assert _root_spec.loader is not None
_root_spec.loader.exec_module(_root_conftest_module)
delete_user_from_db = _root_conftest_module.delete_user_from_db
# ---------------------------------------------------------------------------
# Utility Functions
# ---------------------------------------------------------------------------
@ -81,17 +99,26 @@ def generate_unique_email(prefix: str = "test") -> str:
@pytest.fixture(scope="function")
def clear_users(request, http_api_auth):
"""Fixture to clean up users created during tests."""
created_user_ids: list[str] = []
def clear_users(request: pytest.FixtureRequest) -> list[str]:
"""Fixture to clean up users created during tests.
Tests should append test user *emails* to the returned list. After each
test, we hard-delete those users directly from the database via the
shared `delete_user_from_db` helper in the root test ``conftest.py``.
"""
created_user_emails: list[str] = []
def cleanup() -> None:
# Clean up users if delete endpoint exists
# For now, we'll just track them
pass
for email in created_user_emails:
try:
delete_user_from_db(email)
except Exception as exc: # pragma: no cover - best-effort cleanup
print(
f"[clear_users] Failed to delete test user {email}: {exc}"
)
request.addfinalizer(cleanup)
return created_user_ids
return created_user_emails
@pytest.fixture(name="test_user")

View file

@ -49,7 +49,12 @@ class TestAuthorization:
),
],
)
def test_invalid_auth(self, invalid_auth, expected_code, expected_message):
def test_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
) -> None:
"""Test user creation with invalid or missing authentication."""
# Use unique email to avoid conflicts
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -132,7 +137,8 @@ class TestUserCreate:
)
def test_required_fields(
self,
web_api_auth,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
payload: dict[str, Any],
expected_code: int,
expected_message: str,
@ -145,6 +151,7 @@ class TestUserCreate:
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
clear_users.append(payload["email"])
assert res["data"]["nickname"] == payload["nickname"]
assert res["data"]["email"] == payload["email"]
else:
@ -167,7 +174,8 @@ class TestUserCreate:
)
def test_email_validation(
self,
web_api_auth,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
email: str,
expected_code: int,
expected_message: str,
@ -184,6 +192,7 @@ class TestUserCreate:
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
clear_users.append(email)
assert res["data"]["email"] == email
else:
assert expected_message in res["message"]
@ -201,7 +210,8 @@ class TestUserCreate:
)
def test_nickname(
self,
web_api_auth,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
nickname: str,
expected_code: int,
expected_message: str,
@ -216,13 +226,16 @@ class TestUserCreate:
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == expected_code, res
if expected_code == 0:
clear_users.append(unique_email)
assert res["data"]["nickname"] == nickname
else:
assert expected_message in res["message"]
@pytest.mark.p1
def test_duplicate_email(
self, web_api_auth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test that creating a user with duplicate email fails."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -233,8 +246,10 @@ class TestUserCreate:
}
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
# Track the created user for cleanup
clear_users.append(unique_email)
# Try to create another user with the same email
# Try to create another user with the same email (should fail)
payload2: dict[str, str] = {
"nickname": "test_user_2",
"email": unique_email,
@ -255,7 +270,8 @@ class TestUserCreate:
)
def test_is_superuser(
self,
web_api_auth,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
is_superuser: bool | None,
expected_value: bool,
) -> None:
@ -271,11 +287,14 @@ class TestUserCreate:
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
clear_users.append(unique_email)
assert res["data"]["is_superuser"] == expected_value
@pytest.mark.p2
def test_password_hashing(
self, web_api_auth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test that password is properly hashed when stored."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -287,6 +306,7 @@ class TestUserCreate:
}
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
clear_users.append(unique_email)
# Password should be hashed in the response (not plain text)
assert "password" in res["data"], (
f"Password field not found in response: {res['data'].keys()}"
@ -299,7 +319,9 @@ class TestUserCreate:
@pytest.mark.p2
def test_plain_text_password_accepted(
self, web_api_auth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test that plain text password is accepted."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -311,23 +333,29 @@ class TestUserCreate:
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should succeed with plain text password
assert res["code"] == 0
clear_users.append(unique_email)
assert res["data"]["email"] == unique_email
@pytest.mark.p3
def test_concurrent_create(
self, web_api_auth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test concurrent user creation with multiple threads."""
count: int = 10
with ThreadPoolExecutor(max_workers=5) as executor:
futures: list[Future[dict[str, Any]]] = []
for i in range(count):
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
unique_email: str = (
f"test_{uuid.uuid4().hex[:8]}@example.com"
)
payload: dict[str, str] = {
"nickname": f"test_user_{i}",
"email": unique_email,
"password": "test123",
}
clear_users.append(unique_email)
futures.append(
executor.submit(create_user, web_api_auth, payload)
)
@ -341,7 +369,9 @@ class TestUserCreate:
@pytest.mark.p2
def test_user_creation_response_structure(
self, web_api_auth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test that user creation returns the expected response structure."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -352,6 +382,7 @@ class TestUserCreate:
}
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0
clear_users.append(unique_email)
assert "data" in res
assert "id" in res["data"]
assert "email" in res["data"]

View file

@ -20,7 +20,7 @@ from typing import Any
import pytest
from ..common import create_user, delete_user
from common import create_user, delete_user
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
@ -41,6 +41,7 @@ encrypt_password = conftest_module.encrypt_password
@pytest.mark.p1
@pytest.mark.usefixtures("clear_users")
class TestAuthorization:
"""Tests for authentication behavior during user deletion."""
@ -58,6 +59,7 @@ class TestAuthorization:
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test user deletion with invalid or missing authentication."""
# Create a test user first
@ -67,10 +69,14 @@ class TestAuthorization:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
create_res: dict[str, Any] = create_user(
web_api_auth, create_payload
)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
clear_users.append(unique_email)
user_id: str = create_res["data"]["id"]
# Try to delete with invalid auth
@ -84,6 +90,7 @@ class TestAuthorization:
def test_user_can_only_delete_themselves(
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test that users can only delete their own account."""
# Create another user
@ -93,9 +100,12 @@ class TestAuthorization:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
create_res: dict[str, Any] = create_user(
web_api_auth, create_payload
)
assert create_res["code"] == 0, "Failed to create second user"
other_user_id: str = create_res["data"]["id"]
clear_users.append(unique_email)
# Try to delete another user's account (should fail)
delete_payload: dict[str, Any] = {

View file

@ -20,7 +20,7 @@ from typing import Any
import pytest
from ..common import create_user, list_users
from common import create_user, list_users
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
@ -81,7 +81,9 @@ class TestUserList:
@pytest.mark.p1
def test_list_single_user(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str]
) -> None:
"""Test listing a single user."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -90,10 +92,13 @@ class TestUserList:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
create_res: dict[str, Any] = create_user(
web_api_auth, create_payload
)
# Skip if creation fails (password encryption issue in test)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping list test")
clear_users.append(unique_email)
list_res: dict[str, Any] = list_users(web_api_auth)
assert list_res["code"] == 0, list_res
@ -105,7 +110,9 @@ class TestUserList:
@pytest.mark.p1
def test_list_multiple_users(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test listing multiple users."""
created_emails: list[str] = []
@ -121,6 +128,7 @@ class TestUserList:
)
if create_res["code"] == 0:
created_emails.append(unique_email)
clear_users.append(unique_email)
if not created_emails:
pytest.skip("No users created, skipping list test")
@ -136,7 +144,9 @@ class TestUserList:
@pytest.mark.p1
def test_list_users_with_email_filter(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test listing users filtered by email."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -145,9 +155,12 @@ class TestUserList:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
create_res: dict[str, Any] = create_user(
web_api_auth, create_payload
)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping filter test")
clear_users.append(unique_email)
# List with email filter
params: dict[str, str] = {"email": unique_email}
@ -200,6 +213,7 @@ class TestUserList:
page: int,
page_size: int,
expected_valid: bool,
clear_users: list[str],
) -> None:
"""Test listing users with pagination."""
# Create some users first
@ -216,6 +230,7 @@ class TestUserList:
)
if create_res["code"] == 0:
created_count += 1
clear_users.append(unique_email)
if created_count == 0:
pytest.skip("No users created, skipping pagination test")
@ -234,7 +249,9 @@ class TestUserList:
@pytest.mark.p1
def test_list_users_pagination_boundaries(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test pagination boundary conditions."""
# Create 5 users with a unique email pattern for filtering
@ -252,6 +269,7 @@ class TestUserList:
)
if create_res["code"] == 0:
created_emails.append(unique_email)
clear_users.append(unique_email)
if len(created_emails) < 3:
pytest.skip("Not enough users created, skipping boundary test")
@ -328,7 +346,9 @@ class TestUserList:
@pytest.mark.p2
def test_list_users_combined_filters(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test listing users with combined filters."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -337,9 +357,12 @@ class TestUserList:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
create_res: dict[str, Any] = create_user(
web_api_auth, create_payload
)
if create_res["code"] != 0:
pytest.skip("User creation failed, skipping combined filter test")
clear_users.append(unique_email)
# Test with email filter and pagination
params: dict[str, Any] = {
@ -355,7 +378,9 @@ class TestUserList:
@pytest.mark.p2
def test_list_users_performance_with_many_users(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test listing performance with multiple users."""
# Create several users
@ -372,6 +397,7 @@ class TestUserList:
)
if create_res["code"] == 0:
created_count += 1
clear_users.append(unique_email)
if created_count == 0:
pytest.skip("No users created, skipping performance test")

View file

@ -21,7 +21,7 @@ from typing import Any
import pytest
from ..common import create_user, update_user
from common import create_user, update_user
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
@ -41,6 +41,7 @@ encrypt_password = conftest_module.encrypt_password
# ---------------------------------------------------------------------------
@pytest.mark.p1
@pytest.mark.usefixtures("clear_users")
class TestAuthorization:
"""Tests for authentication behavior during user updates."""
@ -73,6 +74,7 @@ class TestAuthorization:
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
clear_users: list[str],
) -> None:
"""Test that users can only update their own account."""
# Create another user
@ -82,9 +84,12 @@ class TestAuthorization:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
create_res: dict[str, Any] = create_user(
web_api_auth, create_payload
)
assert create_res["code"] == 0, "Failed to create second user"
other_user_id: str = create_res["data"]["id"]
clear_users.append(unique_email)
# Try to update another user's account (should fail)
payload: dict[str, Any] = {
@ -105,7 +110,9 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_with_user_id(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
payload: dict[str, Any] = {
"user_id": test_user["user_id"],
@ -119,7 +126,9 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_with_email(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
payload: dict[str, Any] = {
"email": test_user["email"],
@ -196,7 +205,9 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_password(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
new_password: str = "new_password_456"
payload: dict[str, str] = {
@ -209,7 +220,9 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_password_invalid_encryption(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
payload: dict[str, str] = {
"user_id": test_user["user_id"],
@ -256,7 +269,10 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_email_duplicate(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
clear_users: list[str],
) -> None:
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
@ -264,8 +280,11 @@ class TestUserUpdate:
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(web_api_auth, create_payload)
create_res: dict[str, Any] = create_user(
web_api_auth, create_payload
)
assert create_res["code"] == 0
clear_users.append(unique_email)
update_payload: dict[str, str] = {
"user_id": test_user["user_id"],
@ -297,7 +316,9 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_multiple_fields(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict[str, Any] = {
@ -314,7 +335,9 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_no_fields(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
payload: dict[str, str] = {"user_id": test_user["user_id"]}
res: dict[str, Any] = update_user(web_api_auth, payload)
@ -323,7 +346,9 @@ class TestUserUpdate:
@pytest.mark.p1
def test_update_email_using_email_field_when_user_id_provided(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
new_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
payload: dict[str, str] = {
@ -336,7 +361,9 @@ class TestUserUpdate:
@pytest.mark.p2
def test_update_response_structure(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
payload: dict[str, Any] = {
"user_id": test_user["user_id"],
@ -350,7 +377,9 @@ class TestUserUpdate:
@pytest.mark.p2
def test_concurrent_updates(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
"""Test concurrent updates to the same user (users can only update themselves)."""
# Test concurrent updates to the authenticated user's own account
@ -373,7 +402,9 @@ class TestUserUpdate:
@pytest.mark.p3
def test_update_same_user_multiple_times(
self, web_api_auth: RAGFlowWebApiAuth, test_user: dict[str, Any]
self,
web_api_auth: RAGFlowWebApiAuth,
test_user: dict[str, Any],
) -> None:
"""Test repeated updates on the same user."""
for nickname in (

View file

@ -22,7 +22,7 @@ from typing import Any
import pytest
from ..common import create_user, list_users
from common import create_user, list_users
from libs.auth import RAGFlowWebApiAuth
@ -33,7 +33,9 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_extremely_long_nickname(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test nickname with extreme length."""
long_nickname: str = "A" * 1000
@ -51,6 +53,7 @@ class TestUserEdgeCases:
assert len(res["data"]["nickname"]) <= 255, (
"Nickname should be truncated to max length"
)
clear_users.append(unique_email)
else:
# If rejected, should have clear error message
assert (
@ -61,7 +64,9 @@ class TestUserEdgeCases:
@pytest.mark.p2
def test_unicode_in_all_fields(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test unicode characters in various fields."""
# Use ASCII-safe email local part (unicode in display name)
@ -80,10 +85,13 @@ class TestUserEdgeCases:
"Unicode nickname should be preserved"
)
assert res["data"]["email"] == unique_email
clear_users.append(unique_email)
@pytest.mark.p2
def test_emoji_in_nickname(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test emoji characters in nickname."""
nickname_with_emoji: str = "Test User 😀🎉🔥"
@ -101,6 +109,7 @@ class TestUserEdgeCases:
assert "😀" in res["data"]["nickname"] or "?" in res["data"]["nickname"], (
"Emoji should be preserved or replaced with placeholder"
)
clear_users.append(unique_email)
@pytest.mark.p2
@pytest.mark.parametrize(
@ -114,7 +123,10 @@ class TestUserEdgeCases:
],
)
def test_special_characters_in_email(
self, web_api_auth: RAGFlowWebApiAuth, special_email: str
self,
web_api_auth: RAGFlowWebApiAuth,
special_email: str,
clear_users: list[str],
) -> None:
"""Test various special characters in email."""
payload: dict[str, str] = {
@ -125,10 +137,13 @@ class TestUserEdgeCases:
res: dict[str, Any] = create_user(web_api_auth, payload)
assert res["code"] == 0, f"Failed for email: {special_email}, {res}"
assert res["data"]["email"] == special_email
clear_users.append(special_email)
@pytest.mark.p2
def test_whitespace_handling_in_fields(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test whitespace handling in various fields."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -148,10 +163,13 @@ class TestUserEdgeCases:
# Nickname whitespace handling is flexible
nickname: str = res["data"]["nickname"]
assert nickname.strip() != "", "Nickname should not be only whitespace"
clear_users.append(unique_email)
@pytest.mark.p2
def test_null_byte_in_input(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test null byte injection in input fields."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -168,6 +186,7 @@ class TestUserEdgeCases:
assert "\x00" not in res["data"]["nickname"], (
"Null byte should be sanitized"
)
clear_users.append(unique_email)
@pytest.mark.p2
def test_very_long_email(
@ -247,7 +266,9 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_empty_string_vs_none_in_optional_fields(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test difference between empty string and None for optional fields."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -263,6 +284,7 @@ class TestUserEdgeCases:
# Empty nickname should be accepted per current API behavior
assert res_empty["code"] == 0, "Empty nickname should be accepted"
assert res_empty["data"]["nickname"] == ""
clear_users.append(unique_email)
@pytest.mark.p3
def test_pagination_with_no_results(
@ -281,12 +303,14 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_pagination_beyond_available_pages(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test requesting page beyond available data."""
# Create one user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_user(
create_res: dict[str, Any] = create_user(
web_api_auth,
{
"nickname": "test",
@ -294,6 +318,8 @@ class TestUserEdgeCases:
"password": "test123",
},
)
if create_res["code"] == 0:
clear_users.append(unique_email)
# Request page 100
res: dict[str, Any] = list_users(
@ -349,7 +375,9 @@ class TestUserEdgeCases:
@pytest.mark.p3
def test_special_characters_in_password(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test password with various special characters."""
special_passwords: list[str] = [
@ -368,15 +396,18 @@ class TestUserEdgeCases:
"password": password,
}
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should accept special characters in password
assert res["code"] == 0, (
f"Password with special chars should be accepted: {password}"
)
clear_users.append(unique_email)
@pytest.mark.p3
def test_json_injection_in_fields(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test JSON injection attempts."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -392,10 +423,13 @@ class TestUserEdgeCases:
assert res["data"]["nickname"] == '{"admin": true}', (
"Should store as literal string, not parse JSON"
)
clear_users.append(unique_email)
@pytest.mark.p3
def test_path_traversal_in_nickname(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test path traversal attempts in nickname."""
traversal_attempts: list[str] = [
@ -412,7 +446,7 @@ class TestUserEdgeCases:
"password": "test123",
}
res: dict[str, Any] = create_user(web_api_auth, payload)
# Should either reject or sanitize path traversal attempts
# At minimum, should not allow actual file system access
if res["code"] == 0:
@ -420,3 +454,4 @@ class TestUserEdgeCases:
stored_nickname: str = res["data"]["nickname"]
# Verify it's treated as literal string
assert isinstance(stored_nickname, str)
clear_users.append(unique_email)

View file

@ -24,7 +24,7 @@ from typing import Any
import pytest
from ..common import create_user, list_users
from common import create_user, list_users
from libs.auth import RAGFlowWebApiAuth
@ -35,7 +35,9 @@ class TestUserPerformance:
@pytest.mark.p2
def test_list_users_performance_small_dataset(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test list_users performance with small dataset."""
# Create 20 users
@ -52,6 +54,7 @@ class TestUserPerformance:
)
if res["code"] == 0:
created_users.append(res["data"]["id"])
clear_users.append(unique_email)
# Test list performance without pagination
start: float = time.time()
@ -65,7 +68,9 @@ class TestUserPerformance:
@pytest.mark.p2
def test_list_users_pagination_performance(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test pagination performance with moderate dataset."""
# Create 50 users
@ -79,6 +84,7 @@ class TestUserPerformance:
"password": "test123",
},
)
clear_users.append(unique_email)
# Test pagination performance
start: float = time.time()
@ -95,13 +101,16 @@ class TestUserPerformance:
@pytest.mark.p3
def test_concurrent_user_creation(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test concurrent user creation without conflicts."""
count: int = 20
def create_test_user(index: int) -> dict[str, Any]:
unique_email: str = f"concurrent_{index}_{uuid.uuid4().hex[:8]}@example.com"
clear_users.append(unique_email)
return create_user(
web_api_auth,
{
@ -136,7 +145,9 @@ class TestUserPerformance:
@pytest.mark.p3
def test_user_creation_response_time(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test individual user creation response time."""
response_times: list[float] = []
@ -156,6 +167,7 @@ class TestUserPerformance:
assert res["code"] == 0, f"User creation failed: {res}"
response_times.append(duration)
clear_users.append(unique_email)
# Calculate statistics
avg_time: float = sum(response_times) / len(response_times)
@ -172,7 +184,9 @@ class TestUserPerformance:
@pytest.mark.p3
def test_sequential_vs_concurrent_creation_comparison(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Compare sequential vs concurrent user creation performance."""
count: int = 10
@ -189,11 +203,13 @@ class TestUserPerformance:
"password": "test123",
},
)
clear_users.append(unique_email)
sequential_duration: float = time.time() - sequential_start
# Concurrent creation
def create_concurrent_user(index: int) -> dict[str, Any]:
unique_email: str = f"conc_{index}_{uuid.uuid4().hex[:8]}@example.com"
clear_users.append(unique_email)
return create_user(
web_api_auth,
{
@ -231,7 +247,9 @@ class TestUserPerformance:
@pytest.mark.p3
def test_pagination_consistency_under_load(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test pagination consistency during concurrent modifications."""
# Create initial set of users
@ -246,6 +264,7 @@ class TestUserPerformance:
"password": "test123",
},
)
clear_users.append(unique_email)
# Test pagination while users are being created
def paginate_users() -> dict[str, Any]:
@ -262,6 +281,7 @@ class TestUserPerformance:
"password": "test123",
},
)
clear_users.append(unique_email)
with ThreadPoolExecutor(max_workers=3) as executor:
# Start pagination requests
@ -284,7 +304,9 @@ class TestUserPerformance:
@pytest.mark.p3
def test_memory_efficiency_large_list(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test memory efficiency when listing many users."""
# Create 100 users
@ -298,6 +320,7 @@ class TestUserPerformance:
"password": "test123",
},
)
clear_users.append(unique_email)
# List all users (without pagination)
res: dict[str, Any] = list_users(web_api_auth)
@ -311,7 +334,9 @@ class TestUserPerformance:
@pytest.mark.p3
@pytest.mark.skip(reason="Stress test - run manually")
def test_sustained_load(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test system stability under sustained load (manual run)."""
duration_seconds: int = 60 # Run for 1 minute
@ -336,6 +361,7 @@ class TestUserPerformance:
},
)
request_count += 1
clear_users.append(unique_email)
if res["code"] != 0:
error_count += 1
@ -362,7 +388,9 @@ class TestUserPerformance:
@pytest.mark.p3
def test_large_payload_handling(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Test handling of large request payloads."""
# Create user with large nickname (but within limits)
@ -390,3 +418,4 @@ class TestUserPerformance:
assert len(res["data"]["nickname"]) <= 255, (
"Nickname should be capped at reasonable length"
)
clear_users.append(unique_email)

View file

@ -22,7 +22,7 @@ from typing import Any
import pytest
from ..common import create_user
from common import create_user
from libs.auth import RAGFlowWebApiAuth
@ -43,7 +43,9 @@ class TestUserSecurity:
],
)
def test_sql_injection_in_email(
self, web_api_auth: RAGFlowWebApiAuth, malicious_email: str
self,
web_api_auth: RAGFlowWebApiAuth,
malicious_email: str,
) -> None:
"""Test SQL injection attempts in email field are properly handled."""
payload: dict[str, str] = {
@ -70,7 +72,10 @@ class TestUserSecurity:
],
)
def test_xss_in_nickname(
self, web_api_auth: RAGFlowWebApiAuth, xss_payload: str
self,
web_api_auth: RAGFlowWebApiAuth,
xss_payload: str,
clear_users: list[str],
) -> None:
"""Test XSS attempts in nickname field are sanitized."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -92,10 +97,13 @@ class TestUserSecurity:
assert "<iframe" not in nickname.lower(), (
"Iframe tags should be sanitized"
)
clear_users.append(unique_email)
@pytest.mark.p1
def test_password_not_in_response(
self, web_api_auth: RAGFlowWebApiAuth
self,
web_api_auth: RAGFlowWebApiAuth,
clear_users: list[str],
) -> None:
"""Ensure plain password never appears in response."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -121,6 +129,7 @@ class TestUserSecurity:
assert stored_password != password, (
"Stored password should not match plain password"
)
clear_users.append(unique_email)
@pytest.mark.p2
@pytest.mark.parametrize(
@ -134,7 +143,10 @@ class TestUserSecurity:
],
)
def test_weak_password_handling(
self, web_api_auth: RAGFlowWebApiAuth, weak_password: str
self,
web_api_auth: RAGFlowWebApiAuth,
weak_password: str,
clear_users: list[str],
) -> None:
"""Test handling of weak passwords."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -149,10 +161,11 @@ class TestUserSecurity:
assert res["code"] != 0, (
f"Empty password should be rejected: '{weak_password}'"
)
clear_users.append(unique_email)
@pytest.mark.p2
def test_unauthorized_superuser_creation(
self, web_api_auth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str]
) -> None:
"""Test that regular users cannot escalate privileges."""
# Note: This test assumes web_api_auth represents a regular user
@ -172,10 +185,11 @@ class TestUserSecurity:
# Log that this privilege escalation is currently possible
# In a production system, this should be blocked
pass
clear_users.append(unique_email)
@pytest.mark.p2
def test_password_hashing_is_secure(
self, web_api_auth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str]
) -> None:
"""Verify passwords are hashed using secure algorithm."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -202,6 +216,7 @@ class TestUserSecurity:
)
# Should contain salt (indicated by multiple $ separators in scrypt format)
assert hashed.count("$") >= 2, "Should include salt in hash"
clear_users.append(unique_email)
@pytest.mark.p2
@pytest.mark.parametrize(
@ -213,7 +228,10 @@ class TestUserSecurity:
],
)
def test_control_character_injection(
self, web_api_auth: RAGFlowWebApiAuth, injection_attempt: dict[str, str]
self,
web_api_auth: RAGFlowWebApiAuth,
injection_attempt: dict[str, str],
clear_users: list[str],
) -> None:
"""Test protection against control character injection."""
# Complete the payload with required fields
@ -239,10 +257,11 @@ class TestUserSecurity:
assert "\n" not in res["data"]["nickname"], (
"Line feeds should be removed"
)
clear_users.append(payload["email"])
@pytest.mark.p3
def test_session_token_security(
self, web_api_auth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str]
) -> None:
"""Test that access tokens are properly secured."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -264,10 +283,11 @@ class TestUserSecurity:
assert not token.startswith("user_"), (
"Token should not use predictable pattern"
)
clear_users.append(unique_email)
@pytest.mark.p3
def test_email_case_sensitivity(
self, web_api_auth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str]
) -> None:
"""Test email uniqueness is case-insensitive."""
base_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -293,10 +313,12 @@ class TestUserSecurity:
# Note: Current implementation may allow this, but it should be fixed
# assert res2["code"] != 0, "Uppercase email should be treated as duplicate"
# assert "already registered" in res2["message"].lower()
clear_users.append(base_email.lower())
clear_users.append(base_email.upper())
@pytest.mark.p3
def test_concurrent_user_creation_same_email(
self, web_api_auth: RAGFlowWebApiAuth
self, web_api_auth: RAGFlowWebApiAuth, clear_users: list[str]
) -> None:
"""Test race condition protection for duplicate emails."""
import concurrent.futures
@ -340,3 +362,4 @@ class TestUserSecurity:
"already registered" in r.get("message", "").lower()
for r in failed_responses
), "Failure should be due to duplicate email"
clear_users.append(email)