[OND211-2329]: Updated list users and delete user API's & tests to handle auth.

This commit is contained in:
Hetavi Shah 2025-11-12 18:11:29 +05:30
parent 1cc081ef5a
commit 043b06a24d
3 changed files with 120 additions and 295 deletions

View file

@ -1117,7 +1117,7 @@ def update_user() -> Response:
@manager.route("/list", methods=["GET"]) # noqa: F821 @manager.route("/list", methods=["GET"]) # noqa: F821
# @login_required @login_required
def list_users() -> Response: def list_users() -> Response:
""" """
List all users. List all users.
@ -1169,11 +1169,23 @@ def list_users() -> Response:
total: total:
type: integer type: integer
description: Total number of users. description: Total number of users.
401:
description: Unauthorized - authentication required.
schema:
type: object
500: 500:
description: Server error during user listing. description: Server error during user listing.
schema: schema:
type: object type: object
""" """
# Explicitly check authentication status
if not current_user.is_authenticated:
return get_json_result(
data=False,
message="Unauthorized",
code=RetCode.UNAUTHORIZED,
)
try: try:
# Get query parameters # Get query parameters
page: Optional[int] = None page: Optional[int] = None
@ -1261,11 +1273,11 @@ def list_users() -> Response:
@manager.route("/delete", methods=["DELETE"]) # noqa: F821 @manager.route("/delete", methods=["DELETE"]) # noqa: F821
# @login_required @login_required
@validate_request() @validate_request()
def delete_user() -> Response: def delete_user() -> Response:
""" """
Delete a user. Delete a user. Users can only delete their own account.
--- ---
tags: tags:
@ -1299,6 +1311,14 @@ def delete_user() -> Response:
message: message:
type: string type: string
description: Success message. description: Success message.
401:
description: Unauthorized - authentication required.
schema:
type: object
403:
description: Forbidden - users can only delete their own account.
schema:
type: object
400: 400:
description: Invalid request or user not found. description: Invalid request or user not found.
schema: schema:
@ -1308,6 +1328,14 @@ def delete_user() -> Response:
schema: schema:
type: object type: object
""" """
# Explicitly check authentication status
if not current_user.is_authenticated:
return get_json_result(
data=False,
message="Unauthorized",
code=RetCode.UNAUTHORIZED,
)
if request.json is None: if request.json is None:
return get_json_result( return get_json_result(
data=False, data=False,
@ -1368,6 +1396,14 @@ def delete_user() -> Response:
code=RetCode.DATA_ERROR, code=RetCode.DATA_ERROR,
) )
# Ensure user can only delete themselves
if user.id != current_user.id:
return get_json_result(
data=False,
message="You can only delete your own account!",
code=RetCode.FORBIDDEN,
)
# Delete the user # Delete the user
try: try:
# Use hard delete to actually remove the user # Use hard delete to actually remove the user

View file

@ -24,9 +24,9 @@ import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA from Cryptodome.PublicKey import RSA
from common import create_user, delete_user, list_users from common import create_user, delete_user
from configs import INVALID_API_TOKEN from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -69,21 +69,17 @@ class TestAuthorization:
@pytest.mark.parametrize( @pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"), ("invalid_auth", "expected_code", "expected_message"),
[ [
# Note: @login_required is commented out, so endpoint works # Endpoint now requires @login_required (JWT token auth)
# without auth (None, 401, "Unauthorized"),
# Testing with None auth should succeed (code 0) if endpoint (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
# doesn't require auth
(None, 0, ""),
# Invalid token should also work if auth is not required
(RAGFlowHttpApiAuth(INVALID_API_TOKEN), 0, ""),
], ],
) )
def test_invalid_auth( def test_invalid_auth(
self, self,
invalid_auth: RAGFlowHttpApiAuth | None, invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int, expected_code: int,
expected_message: str, expected_message: str,
HttpApiAuth: RAGFlowHttpApiAuth, WebApiAuth: RAGFlowWebApiAuth,
) -> None: ) -> None:
"""Test user deletion with invalid or missing authentication.""" """Test user deletion with invalid or missing authentication."""
# Create a test user first # Create a test user first
@ -93,7 +89,7 @@ class TestAuthorization:
"email": unique_email, "email": unique_email,
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
if create_res["code"] != 0: if create_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test") pytest.skip("User creation failed, skipping auth test")
@ -106,285 +102,82 @@ class TestAuthorization:
if expected_message: if expected_message:
assert expected_message in res["message"] assert expected_message in res["message"]
@pytest.mark.p1
def test_user_can_only_delete_themselves(
self,
WebApiAuth: RAGFlowWebApiAuth,
) -> None:
"""Test that users can only delete their own account."""
# Create another user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": "another_user",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
assert create_res["code"] == 0, "Failed to create second user"
other_user_id: str = create_res["data"]["id"]
# Try to delete another user's account (should fail)
delete_payload: dict[str, Any] = {
"user_id": other_user_id,
}
res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
assert res["code"] == 403, f"Expected 403 FORBIDDEN, got {res}"
assert "only delete your own account" in res["message"].lower()
@pytest.mark.usefixtures("clear_users") @pytest.mark.usefixtures("clear_users")
class TestUserDelete: class TestUserDelete:
"""Comprehensive tests for user deletion API.""" """Comprehensive tests for user deletion API."""
@pytest.mark.p1
def test_delete_user_by_id(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test deleting a user by user_id."""
# Create a test user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": "test_user_delete_id",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0, create_res
user_id: str = create_res["data"]["id"]
# Delete the user
delete_payload: dict[str, str] = {"user_id": user_id}
delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert delete_res["code"] == 0, delete_res
assert delete_res["data"] is True
assert "deleted successfully" in delete_res["message"].lower()
# Verify user is deleted
list_res: dict[str, Any] = list_users(HttpApiAuth)
user_emails: list[str] = [
u["email"] for u in list_res["data"] if u.get("id") == user_id
]
assert unique_email not in user_emails
@pytest.mark.p1
def test_delete_user_by_email(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test deleting a user by email."""
# Create a test user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": "test_user_delete_email",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0, create_res
# Delete the user by email
delete_payload: dict[str, str] = {"email": unique_email}
delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert delete_res["code"] == 0, delete_res
assert delete_res["data"] is True
assert "deleted successfully" in delete_res["message"].lower()
# Verify user is deleted
params: dict[str, str] = {"email": unique_email}
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params)
assert len(list_res["data"]) == 0
@pytest.mark.p1 @pytest.mark.p1
def test_delete_user_missing_identifier( def test_delete_user_missing_identifier(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test deletion without user_id or email.""" """Test deletion without user_id or email."""
delete_payload: dict[str, str] = {} delete_payload: dict[str, str] = {}
res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
assert res["code"] == 101 assert res["code"] == 101
assert "Either user_id or email must be provided" in res["message"] assert "Either user_id or email must be provided" in res["message"]
@pytest.mark.p1 @pytest.mark.p1
def test_delete_user_not_found_by_id( def test_delete_user_not_found_by_id(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test deletion of non-existent user by ID.""" """Test deletion of non-existent user by ID."""
delete_payload: dict[str, str] = { delete_payload: dict[str, str] = {
"user_id": "non_existent_user_id_12345", "user_id": "non_existent_user_id_12345",
} }
res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
assert res["code"] == 102 assert res["code"] == 102
assert "User not found" in res["message"] assert "User not found" in res["message"]
@pytest.mark.p1
def test_delete_user_not_found_by_email(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test deletion of non-existent user by email."""
nonexistent_email: str = (
f"nonexistent_{uuid.uuid4().hex[:8]}@example.com"
)
delete_payload: dict[str, str] = {"email": nonexistent_email}
res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert res["code"] == 102
assert "not found" in res["message"]
@pytest.mark.p1 @pytest.mark.p1
def test_delete_user_invalid_email_format( def test_delete_user_invalid_email_format(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test deletion with invalid email format.""" """Test deletion with invalid email format."""
delete_payload: dict[str, str] = {"email": "invalid_email_format"} delete_payload: dict[str, str] = {"email": "invalid_email_format"}
res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) res: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
assert res["code"] == 103 assert res["code"] == 103
assert "Invalid email address" in res["message"] assert "Invalid email address" in res["message"]
@pytest.mark.p1
def test_delete_user_multiple_users_same_email(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test deletion when multiple users have the same email."""
# This scenario shouldn't happen in normal operation, but test it
# Create a user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": "test_user_1",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0
# Try to delete by email (should work if only one user exists)
delete_payload: dict[str, str] = {"email": unique_email}
res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
# Should succeed if only one user, or fail if multiple
assert res["code"] in (0, 102)
@pytest.mark.p1
def test_delete_user_twice(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test deleting the same user twice."""
# Create a test user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": "test_user_delete_twice",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0, create_res
user_id: str = create_res["data"]["id"]
# Delete the user first time
delete_payload: dict[str, str] = {"user_id": user_id}
delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert delete_res["code"] == 0, delete_res
# Try to delete again
delete_res2: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert delete_res2["code"] == 102
assert "not found" in delete_res2["message"]
@pytest.mark.p1
def test_delete_user_response_structure(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test that user deletion returns the expected response structure."""
# Create a test user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": "test_user_delete_structure",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0, create_res
user_id: str = create_res["data"]["id"]
# Delete the user
delete_payload: dict[str, str] = {"user_id": user_id}
res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert res["code"] == 0
assert "data" in res
assert res["data"] is True
assert "message" in res
assert "deleted successfully" in res["message"].lower()
@pytest.mark.p2
def test_delete_multiple_users_sequentially(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test deleting multiple users sequentially."""
created_user_ids: list[str] = []
for i in range(3):
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": f"test_user_seq_{i}",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(
HttpApiAuth, create_payload
)
if create_res["code"] == 0:
created_user_ids.append(create_res["data"]["id"])
if len(created_user_ids) == 0:
pytest.skip("No users created, skipping sequential delete test")
# Delete all created users
for user_id in created_user_ids:
delete_payload: dict[str, str] = {"user_id": user_id}
delete_res: dict[str, Any] = delete_user(
HttpApiAuth, delete_payload
)
assert delete_res["code"] == 0, delete_res
# Verify all users are deleted
for user_id in created_user_ids:
list_res: dict[str, Any] = list_users(HttpApiAuth)
found_users: list[dict[str, Any]] = [
u for u in list_res["data"] if u.get("id") == user_id
]
assert len(found_users) == 0
@pytest.mark.p2
def test_delete_user_and_verify_not_in_list(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test that deleted user is not in the user list."""
# Create a test user
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
create_payload: dict[str, str] = {
"nickname": "test_user_verify_delete",
"email": unique_email,
"password": encrypt_password("test123"),
}
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload)
assert create_res["code"] == 0, create_res
user_id: str = create_res["data"]["id"]
# Verify user exists in list
params: dict[str, str] = {"email": unique_email}
list_res_before: dict[str, Any] = list_users(
HttpApiAuth, params=params
)
assert len(list_res_before["data"]) >= 1
assert any(u["email"] == unique_email for u in list_res_before["data"])
# Delete the user
delete_payload: dict[str, str] = {"user_id": user_id}
delete_res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert delete_res["code"] == 0, delete_res
# Verify user is not in list
list_res_after: dict[str, Any] = list_users(HttpApiAuth, params=params)
assert len(list_res_after["data"]) == 0
@pytest.mark.p2
def test_delete_user_with_empty_payload(
self, HttpApiAuth: RAGFlowHttpApiAuth
) -> None:
"""Test deletion with empty payload."""
delete_payload: dict[str, Any] = {}
res: dict[str, Any] = delete_user(HttpApiAuth, delete_payload)
assert res["code"] == 101
assert "Either user_id or email must be provided" in res["message"]
@pytest.mark.p3 @pytest.mark.p3
def test_delete_user_idempotency( def test_delete_user_idempotency(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test that deleting a non-existent user returns consistent error.""" """Test that deleting a non-existent user returns consistent error."""
non_existent_id: str = f"nonexistent_{uuid.uuid4().hex[:16]}" non_existent_id: str = f"nonexistent_{uuid.uuid4().hex[:16]}"
# First attempt # First attempt
delete_payload: dict[str, str] = {"user_id": non_existent_id} delete_payload: dict[str, str] = {"user_id": non_existent_id}
res1: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) res1: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
assert res1["code"] == 102 assert res1["code"] == 102
# Second attempt (should return same error) # Second attempt (should return same error)
res2: dict[str, Any] = delete_user(HttpApiAuth, delete_payload) res2: dict[str, Any] = delete_user(WebApiAuth, delete_payload)
assert res2["code"] == 102 assert res2["code"] == 102
assert res1["code"] == res2["code"] assert res1["code"] == res2["code"]

View file

@ -26,7 +26,7 @@ from Cryptodome.PublicKey import RSA
from common import create_user, list_users from common import create_user, list_users
from configs import INVALID_API_TOKEN from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -69,18 +69,14 @@ class TestAuthorization:
@pytest.mark.parametrize( @pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"), ("invalid_auth", "expected_code", "expected_message"),
[ [
# Note: @login_required is commented out, so endpoint works # Endpoint now requires @login_required (JWT token auth)
# without auth (None, 401, "Unauthorized"),
# Testing with None auth should succeed (code 0) if endpoint (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
# doesn't require auth
(None, 0, ""),
# Invalid token should also work if auth is not required
(RAGFlowHttpApiAuth(INVALID_API_TOKEN), 0, ""),
], ],
) )
def test_invalid_auth( def test_invalid_auth(
self, self,
invalid_auth: RAGFlowHttpApiAuth | None, invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int, expected_code: int,
expected_message: str, expected_message: str,
) -> None: ) -> None:
@ -107,7 +103,7 @@ class TestUserList:
@pytest.mark.p1 @pytest.mark.p1
def test_list_single_user( def test_list_single_user(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing a single user.""" """Test listing a single user."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -116,12 +112,12 @@ class TestUserList:
"email": unique_email, "email": unique_email,
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
# Skip if creation fails (password encryption issue in test) # Skip if creation fails (password encryption issue in test)
if create_res["code"] != 0: if create_res["code"] != 0:
pytest.skip("User creation failed, skipping list test") pytest.skip("User creation failed, skipping list test")
list_res: dict[str, Any] = list_users(HttpApiAuth) list_res: dict[str, Any] = list_users(WebApiAuth)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list) assert isinstance(list_res["data"], list)
assert len(list_res["data"]) >= 1 assert len(list_res["data"]) >= 1
@ -131,7 +127,7 @@ class TestUserList:
@pytest.mark.p1 @pytest.mark.p1
def test_list_multiple_users( def test_list_multiple_users(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing multiple users.""" """Test listing multiple users."""
created_emails: list[str] = [] created_emails: list[str] = []
@ -143,7 +139,7 @@ class TestUserList:
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user( create_res: dict[str, Any] = create_user(
HttpApiAuth, create_payload WebApiAuth, create_payload
) )
if create_res["code"] == 0: if create_res["code"] == 0:
created_emails.append(unique_email) created_emails.append(unique_email)
@ -151,7 +147,7 @@ class TestUserList:
if not created_emails: if not created_emails:
pytest.skip("No users created, skipping list test") pytest.skip("No users created, skipping list test")
list_res: dict[str, Any] = list_users(HttpApiAuth) list_res: dict[str, Any] = list_users(WebApiAuth)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list) assert isinstance(list_res["data"], list)
assert len(list_res["data"]) >= len(created_emails) assert len(list_res["data"]) >= len(created_emails)
@ -162,7 +158,7 @@ class TestUserList:
@pytest.mark.p1 @pytest.mark.p1
def test_list_users_with_email_filter( def test_list_users_with_email_filter(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing users filtered by email.""" """Test listing users filtered by email."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -171,13 +167,13 @@ class TestUserList:
"email": unique_email, "email": unique_email,
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
if create_res["code"] != 0: if create_res["code"] != 0:
pytest.skip("User creation failed, skipping filter test") pytest.skip("User creation failed, skipping filter test")
# List with email filter # List with email filter
params: dict[str, str] = {"email": unique_email} params: dict[str, str] = {"email": unique_email}
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list) assert isinstance(list_res["data"], list)
assert len(list_res["data"]) >= 1 assert len(list_res["data"]) >= 1
@ -187,22 +183,22 @@ class TestUserList:
@pytest.mark.p1 @pytest.mark.p1
def test_list_users_with_invalid_email_filter( def test_list_users_with_invalid_email_filter(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing users with invalid email filter.""" """Test listing users with invalid email filter."""
params: dict[str, str] = {"email": "invalid_email_format"} params: dict[str, str] = {"email": "invalid_email_format"}
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
assert list_res["code"] != 0 assert list_res["code"] != 0
assert "Invalid email address" in list_res["message"] assert "Invalid email address" in list_res["message"]
@pytest.mark.p1 @pytest.mark.p1
def test_list_users_with_nonexistent_email_filter( def test_list_users_with_nonexistent_email_filter(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing users with non-existent email filter.""" """Test listing users with non-existent email filter."""
nonexistent_email: str = f"nonexistent_{uuid.uuid4().hex[:8]}@example.com" nonexistent_email: str = f"nonexistent_{uuid.uuid4().hex[:8]}@example.com"
params: dict[str, str] = {"email": nonexistent_email} params: dict[str, str] = {"email": nonexistent_email}
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list) assert isinstance(list_res["data"], list)
assert len(list_res["data"]) == 0 assert len(list_res["data"]) == 0
@ -222,7 +218,7 @@ class TestUserList:
) )
def test_list_users_with_pagination( def test_list_users_with_pagination(
self, self,
HttpApiAuth: RAGFlowHttpApiAuth, WebApiAuth: RAGFlowWebApiAuth,
page: int, page: int,
page_size: int, page_size: int,
expected_valid: bool, expected_valid: bool,
@ -238,7 +234,7 @@ class TestUserList:
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user( create_res: dict[str, Any] = create_user(
HttpApiAuth, create_payload WebApiAuth, create_payload
) )
if create_res["code"] == 0: if create_res["code"] == 0:
created_count += 1 created_count += 1
@ -247,7 +243,7 @@ class TestUserList:
pytest.skip("No users created, skipping pagination test") pytest.skip("No users created, skipping pagination test")
params: dict[str, int] = {"page": page, "page_size": page_size} params: dict[str, int] = {"page": page, "page_size": page_size}
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
if expected_valid: if expected_valid:
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
@ -260,7 +256,7 @@ class TestUserList:
@pytest.mark.p1 @pytest.mark.p1
def test_list_users_pagination_boundaries( def test_list_users_pagination_boundaries(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test pagination boundary conditions.""" """Test pagination boundary conditions."""
# Create 5 users with a unique email pattern for filtering # Create 5 users with a unique email pattern for filtering
@ -274,7 +270,7 @@ class TestUserList:
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user( create_res: dict[str, Any] = create_user(
HttpApiAuth, create_payload WebApiAuth, create_payload
) )
if create_res["code"] == 0: if create_res["code"] == 0:
created_emails.append(unique_email) created_emails.append(unique_email)
@ -283,18 +279,18 @@ class TestUserList:
pytest.skip("Not enough users created, skipping boundary test") pytest.skip("Not enough users created, skipping boundary test")
# Get total count of all users to calculate pagination boundaries # Get total count of all users to calculate pagination boundaries
list_res_all: dict[str, Any] = list_users(HttpApiAuth) list_res_all: dict[str, Any] = list_users(WebApiAuth)
total_users: int = len(list_res_all["data"]) total_users: int = len(list_res_all["data"])
# Test first page # Test first page
params: dict[str, int] = {"page": 1, "page_size": 2} params: dict[str, int] = {"page": 1, "page_size": 2}
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert len(list_res["data"]) == 2 assert len(list_res["data"]) == 2
# Test that pagination returns consistent page sizes # Test that pagination returns consistent page sizes
params = {"page": 2, "page_size": 2} params = {"page": 2, "page_size": 2}
list_res = list_users(HttpApiAuth, params=params) list_res = list_users(WebApiAuth, params=params)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert len(list_res["data"]) == 2 assert len(list_res["data"]) == 2
@ -304,23 +300,23 @@ class TestUserList:
last_page: int = (total_users + page_size - 1) // page_size last_page: int = (total_users + page_size - 1) // page_size
if last_page > 0: if last_page > 0:
params = {"page": last_page, "page_size": page_size} params = {"page": last_page, "page_size": page_size}
list_res = list_users(HttpApiAuth, params=params) list_res = list_users(WebApiAuth, params=params)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert len(list_res["data"]) <= page_size assert len(list_res["data"]) <= page_size
# Test page beyond available data # Test page beyond available data
# Use a page number that's definitely beyond available data # Use a page number that's definitely beyond available data
params = {"page": total_users + 10, "page_size": 2} params = {"page": total_users + 10, "page_size": 2}
list_res = list_users(HttpApiAuth, params=params) list_res = list_users(WebApiAuth, params=params)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert len(list_res["data"]) == 0 assert len(list_res["data"]) == 0
@pytest.mark.p1 @pytest.mark.p1
def test_list_users_response_structure( def test_list_users_response_structure(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test that user listing returns the expected response structure.""" """Test that user listing returns the expected response structure."""
res: dict[str, Any] = list_users(HttpApiAuth) res: dict[str, Any] = list_users(WebApiAuth)
assert res["code"] == 0 assert res["code"] == 0
assert "data" in res assert "data" in res
assert isinstance(res["data"], list) assert isinstance(res["data"], list)
@ -337,24 +333,24 @@ class TestUserList:
@pytest.mark.p1 @pytest.mark.p1
def test_list_users_with_invalid_page_params( def test_list_users_with_invalid_page_params(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing users with invalid pagination parameters.""" """Test listing users with invalid pagination parameters."""
# Test invalid page (non-integer) # Test invalid page (non-integer)
params: dict[str, str] = {"page": "invalid", "page_size": "10"} params: dict[str, str] = {"page": "invalid", "page_size": "10"}
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
# Should handle gracefully or return error # Should handle gracefully or return error
# The exact behavior depends on implementation # The exact behavior depends on implementation
assert "code" in list_res assert "code" in list_res
# Test invalid page_size (non-integer) # Test invalid page_size (non-integer)
params = {"page": "1", "page_size": "invalid"} params = {"page": "1", "page_size": "invalid"}
list_res = list_users(HttpApiAuth, params=params) list_res = list_users(WebApiAuth, params=params)
assert "code" in list_res assert "code" in list_res
@pytest.mark.p2 @pytest.mark.p2
def test_list_users_combined_filters( def test_list_users_combined_filters(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing users with combined filters.""" """Test listing users with combined filters."""
unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com" unique_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
@ -363,7 +359,7 @@ class TestUserList:
"email": unique_email, "email": unique_email,
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user(HttpApiAuth, create_payload) create_res: dict[str, Any] = create_user(WebApiAuth, create_payload)
if create_res["code"] != 0: if create_res["code"] != 0:
pytest.skip("User creation failed, skipping combined filter test") pytest.skip("User creation failed, skipping combined filter test")
@ -373,7 +369,7 @@ class TestUserList:
"page": 1, "page": 1,
"page_size": 10, "page_size": 10,
} }
list_res: dict[str, Any] = list_users(HttpApiAuth, params=params) list_res: dict[str, Any] = list_users(WebApiAuth, params=params)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list) assert isinstance(list_res["data"], list)
# Should return at least the created user # Should return at least the created user
@ -381,7 +377,7 @@ class TestUserList:
@pytest.mark.p2 @pytest.mark.p2
def test_list_users_performance_with_many_users( def test_list_users_performance_with_many_users(
self, HttpApiAuth: RAGFlowHttpApiAuth self, WebApiAuth: RAGFlowWebApiAuth
) -> None: ) -> None:
"""Test listing performance with multiple users.""" """Test listing performance with multiple users."""
# Create several users # Create several users
@ -394,7 +390,7 @@ class TestUserList:
"password": encrypt_password("test123"), "password": encrypt_password("test123"),
} }
create_res: dict[str, Any] = create_user( create_res: dict[str, Any] = create_user(
HttpApiAuth, create_payload WebApiAuth, create_payload
) )
if create_res["code"] == 0: if create_res["code"] == 0:
created_count += 1 created_count += 1
@ -403,7 +399,7 @@ class TestUserList:
pytest.skip("No users created, skipping performance test") pytest.skip("No users created, skipping performance test")
# List all users # List all users
list_res: dict[str, Any] = list_users(HttpApiAuth) list_res: dict[str, Any] = list_users(WebApiAuth)
assert list_res["code"] == 0, list_res assert list_res["code"] == 0, list_res
assert isinstance(list_res["data"], list) assert isinstance(list_res["data"], list)
# Should return at least the created users # Should return at least the created users