[OND211-2329]: Added API and tests to add/remove users in a team.
This commit is contained in:
parent
d7b9925b8c
commit
b388a3dca0
2 changed files with 875 additions and 0 deletions
|
|
@ -44,6 +44,23 @@ from api.utils.web_utils import send_invite_email
|
|||
from common import settings
|
||||
|
||||
|
||||
def is_team_admin_or_owner(tenant_id: str, user_id: str) -> bool:
|
||||
"""
|
||||
Check if a user is an OWNER or ADMIN of a team.
|
||||
|
||||
Args:
|
||||
tenant_id: The team/tenant ID
|
||||
user_id: The user ID to check
|
||||
|
||||
Returns:
|
||||
True if user is OWNER or ADMIN, False otherwise
|
||||
"""
|
||||
user_tenant = UserTenantService.filter_by_tenant_and_user_id(tenant_id, user_id)
|
||||
if not user_tenant:
|
||||
return False
|
||||
return user_tenant.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN]
|
||||
|
||||
|
||||
@manager.route("/<tenant_id>/user/list", methods=["GET"]) # noqa: F821
|
||||
@login_required
|
||||
def user_list(tenant_id):
|
||||
|
|
@ -435,3 +452,384 @@ def agree(tenant_id):
|
|||
return get_json_result(data=True)
|
||||
except Exception as e:
|
||||
return server_error_response(e)
|
||||
|
||||
|
||||
@manager.route('/<tenant_id>/users/add', methods=['POST']) # noqa: F821
|
||||
@login_required
|
||||
@validate_request("users")
|
||||
def add_users(tenant_id):
|
||||
"""
|
||||
Add one or more users to a team. Only OWNER or ADMIN can add users.
|
||||
Supports both single user and bulk operations.
|
||||
|
||||
---
|
||||
tags:
|
||||
- Team
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
parameters:
|
||||
- in: path
|
||||
name: tenant_id
|
||||
required: true
|
||||
type: string
|
||||
description: Team ID
|
||||
- in: body
|
||||
name: body
|
||||
required: true
|
||||
schema:
|
||||
type: object
|
||||
required:
|
||||
- users
|
||||
properties:
|
||||
users:
|
||||
type: array
|
||||
description: List of users to add. Each user can be an email string or an object with email and role.
|
||||
items:
|
||||
oneOf:
|
||||
- type: string
|
||||
description: User email (will be added with 'normal' role)
|
||||
- type: object
|
||||
properties:
|
||||
email:
|
||||
type: string
|
||||
description: User email
|
||||
role:
|
||||
type: string
|
||||
description: Role to assign (normal, admin). Defaults to normal.
|
||||
enum: [normal, admin]
|
||||
responses:
|
||||
200:
|
||||
description: Users added successfully
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
data:
|
||||
type: object
|
||||
properties:
|
||||
added:
|
||||
type: array
|
||||
description: Successfully added users
|
||||
failed:
|
||||
type: array
|
||||
description: Users that failed to be added with error messages
|
||||
message:
|
||||
type: string
|
||||
400:
|
||||
description: Invalid request
|
||||
401:
|
||||
description: Unauthorized
|
||||
403:
|
||||
description: Forbidden - not owner or admin
|
||||
"""
|
||||
# Check if current user is OWNER or ADMIN of the team
|
||||
if not is_team_admin_or_owner(tenant_id, current_user.id):
|
||||
return get_json_result(
|
||||
data=False,
|
||||
message='Only team owners or admins can add users.',
|
||||
code=RetCode.PERMISSION_ERROR
|
||||
)
|
||||
|
||||
req = request.json
|
||||
users_input = req.get("users", [])
|
||||
|
||||
if not isinstance(users_input, list) or len(users_input) == 0:
|
||||
return get_json_result(
|
||||
data=False,
|
||||
message="'users' must be a non-empty array.",
|
||||
code=RetCode.ARGUMENT_ERROR
|
||||
)
|
||||
|
||||
added_users = []
|
||||
failed_users = []
|
||||
|
||||
for user_input in users_input:
|
||||
# Handle both string (email) and object formats
|
||||
if isinstance(user_input, str):
|
||||
email = user_input
|
||||
role = UserTenantRole.NORMAL.value
|
||||
elif isinstance(user_input, dict):
|
||||
email = user_input.get("email")
|
||||
role = user_input.get("role", UserTenantRole.NORMAL.value)
|
||||
else:
|
||||
failed_users.append({
|
||||
"email": str(user_input),
|
||||
"error": "Invalid format. Must be a string (email) or object with 'email' and optional 'role'."
|
||||
})
|
||||
continue
|
||||
|
||||
if not email:
|
||||
failed_users.append({
|
||||
"email": str(user_input),
|
||||
"error": "Email is required."
|
||||
})
|
||||
continue
|
||||
|
||||
# Validate role
|
||||
if role not in [UserTenantRole.NORMAL.value, UserTenantRole.ADMIN.value]:
|
||||
failed_users.append({
|
||||
"email": email,
|
||||
"error": f"Invalid role '{role}'. Allowed roles: {UserTenantRole.NORMAL.value}, {UserTenantRole.ADMIN.value}"
|
||||
})
|
||||
continue
|
||||
|
||||
try:
|
||||
# Find user by email
|
||||
invite_users = UserService.query(email=email)
|
||||
if not invite_users:
|
||||
failed_users.append({
|
||||
"email": email,
|
||||
"error": f"User with email '{email}' not found."
|
||||
})
|
||||
continue
|
||||
|
||||
user_id_to_add = invite_users[0].id
|
||||
|
||||
# Check if user is already in the team
|
||||
existing_user_tenants = UserTenantService.query(user_id=user_id_to_add, tenant_id=tenant_id)
|
||||
if existing_user_tenants:
|
||||
existing_role = existing_user_tenants[0].role
|
||||
if existing_role in [UserTenantRole.NORMAL, UserTenantRole.ADMIN]:
|
||||
failed_users.append({
|
||||
"email": email,
|
||||
"error": f"User is already a member of the team with role '{existing_role}'."
|
||||
})
|
||||
continue
|
||||
if existing_role == UserTenantRole.OWNER:
|
||||
failed_users.append({
|
||||
"email": email,
|
||||
"error": "User is the owner of the team and cannot be added again."
|
||||
})
|
||||
continue
|
||||
# If user has INVITE role, update to the requested role
|
||||
if existing_role == UserTenantRole.INVITE:
|
||||
UserTenantService.filter_update(
|
||||
[UserTenant.tenant_id == tenant_id, UserTenant.user_id == user_id_to_add],
|
||||
{"role": role, "status": StatusEnum.VALID.value}
|
||||
)
|
||||
usr = invite_users[0].to_dict()
|
||||
usr = {k: v for k, v in usr.items() if k in ["id", "avatar", "email", "nickname"]}
|
||||
usr["role"] = role
|
||||
added_users.append(usr)
|
||||
continue
|
||||
|
||||
# Add user to team
|
||||
UserTenantService.save(
|
||||
id=get_uuid(),
|
||||
user_id=user_id_to_add,
|
||||
tenant_id=tenant_id,
|
||||
invited_by=current_user.id,
|
||||
role=role,
|
||||
status=StatusEnum.VALID.value
|
||||
)
|
||||
|
||||
# Send invitation email if configured
|
||||
if smtp_mail_server and settings.SMTP_CONF:
|
||||
from threading import Thread
|
||||
user_name = ""
|
||||
_, user = UserService.get_by_id(current_user.id)
|
||||
if user:
|
||||
user_name = user.nickname
|
||||
Thread(
|
||||
target=send_invite_email,
|
||||
args=(email, settings.MAIL_FRONTEND_URL, tenant_id, user_name or current_user.email),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
usr = invite_users[0].to_dict()
|
||||
usr = {k: v for k, v in usr.items() if k in ["id", "avatar", "email", "nickname"]}
|
||||
usr["role"] = role
|
||||
added_users.append(usr)
|
||||
|
||||
except Exception as e:
|
||||
logging.exception(f"Error adding user {email}: {e}")
|
||||
failed_users.append({
|
||||
"email": email,
|
||||
"error": f"Failed to add user: {str(e)}"
|
||||
})
|
||||
|
||||
result = {
|
||||
"added": added_users,
|
||||
"failed": failed_users
|
||||
}
|
||||
|
||||
if failed_users and not added_users:
|
||||
return get_json_result(
|
||||
data=result,
|
||||
message=f"Failed to add all users. {len(failed_users)} error(s).",
|
||||
code=RetCode.DATA_ERROR
|
||||
)
|
||||
elif failed_users:
|
||||
return get_json_result(
|
||||
data=result,
|
||||
message=f"Added {len(added_users)} user(s). {len(failed_users)} user(s) failed."
|
||||
)
|
||||
else:
|
||||
return get_json_result(
|
||||
data=result,
|
||||
message=f"Successfully added {len(added_users)} user(s)."
|
||||
)
|
||||
|
||||
|
||||
@manager.route('/<tenant_id>/users/remove', methods=['POST']) # noqa: F821
|
||||
@login_required
|
||||
@validate_request("user_ids")
|
||||
def remove_users(tenant_id):
|
||||
"""
|
||||
Remove one or more users from a team. Only OWNER or ADMIN can remove users.
|
||||
Owners cannot be removed. Supports both single user and bulk operations.
|
||||
|
||||
---
|
||||
tags:
|
||||
- Team
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
parameters:
|
||||
- in: path
|
||||
name: tenant_id
|
||||
required: true
|
||||
type: string
|
||||
description: Team ID
|
||||
- in: body
|
||||
name: body
|
||||
required: true
|
||||
schema:
|
||||
type: object
|
||||
required:
|
||||
- user_ids
|
||||
properties:
|
||||
user_ids:
|
||||
type: array
|
||||
description: List of user IDs to remove
|
||||
items:
|
||||
type: string
|
||||
responses:
|
||||
200:
|
||||
description: Users removed successfully
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
data:
|
||||
type: object
|
||||
properties:
|
||||
removed:
|
||||
type: array
|
||||
description: Successfully removed user IDs
|
||||
failed:
|
||||
type: array
|
||||
description: Users that failed to be removed with error messages
|
||||
message:
|
||||
type: string
|
||||
400:
|
||||
description: Invalid request
|
||||
401:
|
||||
description: Unauthorized
|
||||
403:
|
||||
description: Forbidden - not owner or admin
|
||||
"""
|
||||
# Check if current user is OWNER or ADMIN of the team
|
||||
if not is_team_admin_or_owner(tenant_id, current_user.id):
|
||||
return get_json_result(
|
||||
data=False,
|
||||
message='Only team owners or admins can remove users.',
|
||||
code=RetCode.PERMISSION_ERROR
|
||||
)
|
||||
|
||||
req = request.json
|
||||
user_ids = req.get("user_ids", [])
|
||||
|
||||
if not isinstance(user_ids, list) or len(user_ids) == 0:
|
||||
return get_json_result(
|
||||
data=False,
|
||||
message="'user_ids' must be a non-empty array.",
|
||||
code=RetCode.ARGUMENT_ERROR
|
||||
)
|
||||
|
||||
removed_users = []
|
||||
failed_users = []
|
||||
|
||||
# Get all admins/owners for validation (check if removing would leave team without admin/owner)
|
||||
all_user_tenants = UserTenantService.query(tenant_id=tenant_id)
|
||||
admin_owner_ids = {
|
||||
ut.user_id for ut in all_user_tenants
|
||||
if ut.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN] and ut.status == StatusEnum.VALID.value
|
||||
}
|
||||
|
||||
for user_id in user_ids:
|
||||
if not isinstance(user_id, str):
|
||||
failed_users.append({
|
||||
"user_id": str(user_id),
|
||||
"error": "Invalid user ID format."
|
||||
})
|
||||
continue
|
||||
|
||||
try:
|
||||
# Check if user exists in the team
|
||||
user_tenant = UserTenantService.filter_by_tenant_and_user_id(tenant_id, user_id)
|
||||
if not user_tenant:
|
||||
failed_users.append({
|
||||
"user_id": user_id,
|
||||
"error": "User is not a member of this team."
|
||||
})
|
||||
continue
|
||||
|
||||
# Prevent removing the owner
|
||||
if user_tenant.role == UserTenantRole.OWNER:
|
||||
failed_users.append({
|
||||
"user_id": user_id,
|
||||
"error": "Cannot remove the team owner."
|
||||
})
|
||||
continue
|
||||
|
||||
# Prevent removing yourself if you're the only admin
|
||||
if user_id == current_user.id and user_tenant.role == UserTenantRole.ADMIN:
|
||||
remaining_admins = admin_owner_ids - {user_id}
|
||||
if len(remaining_admins) == 0:
|
||||
failed_users.append({
|
||||
"user_id": user_id,
|
||||
"error": "Cannot remove yourself. At least one owner or admin must remain in the team."
|
||||
})
|
||||
continue
|
||||
|
||||
# Remove user from team
|
||||
UserTenantService.filter_delete([
|
||||
UserTenant.tenant_id == tenant_id,
|
||||
UserTenant.user_id == user_id
|
||||
])
|
||||
|
||||
# Get user info for response
|
||||
user = UserService.filter_by_id(user_id)
|
||||
user_email = user.email if user else "Unknown"
|
||||
|
||||
removed_users.append({
|
||||
"user_id": user_id,
|
||||
"email": user_email
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logging.exception(f"Error removing user {user_id}: {e}")
|
||||
failed_users.append({
|
||||
"user_id": user_id,
|
||||
"error": f"Failed to remove user: {str(e)}"
|
||||
})
|
||||
|
||||
result = {
|
||||
"removed": removed_users,
|
||||
"failed": failed_users
|
||||
}
|
||||
|
||||
if failed_users and not removed_users:
|
||||
return get_json_result(
|
||||
data=result,
|
||||
message=f"Failed to remove all users. {len(failed_users)} error(s).",
|
||||
code=RetCode.DATA_ERROR
|
||||
)
|
||||
elif failed_users:
|
||||
return get_json_result(
|
||||
data=result,
|
||||
message=f"Removed {len(removed_users)} user(s). {len(failed_users)} user(s) failed."
|
||||
)
|
||||
else:
|
||||
return get_json_result(
|
||||
data=result,
|
||||
message=f"Successfully removed {len(removed_users)} user(s)."
|
||||
)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,477 @@
|
|||
#
|
||||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from common import (
|
||||
add_users_to_team,
|
||||
create_team,
|
||||
create_user,
|
||||
remove_users_from_team,
|
||||
)
|
||||
from configs import INVALID_API_TOKEN
|
||||
from libs.auth import RAGFlowWebApiAuth
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test Classes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.p1
|
||||
class TestAddUsersAuthorization:
|
||||
"""Tests for authentication behavior when adding users to a team."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("invalid_auth", "expected_code"),
|
||||
[
|
||||
(None, 401),
|
||||
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401),
|
||||
],
|
||||
)
|
||||
def test_add_users_invalid_auth(
|
||||
self,
|
||||
invalid_auth: RAGFlowWebApiAuth | None,
|
||||
expected_code: int,
|
||||
WebApiAuth: RAGFlowWebApiAuth,
|
||||
) -> None:
|
||||
"""Test adding users with invalid or missing authentication."""
|
||||
# Create a team first
|
||||
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
|
||||
team_res: dict[str, Any] = create_team(WebApiAuth, team_payload)
|
||||
tenant_id: str = team_res["data"]["id"]
|
||||
|
||||
# Try to add users with invalid auth
|
||||
add_payload: dict[str, list[str]] = {"users": ["test@example.com"]}
|
||||
res: dict[str, Any] = add_users_to_team(invalid_auth, tenant_id, add_payload)
|
||||
assert res["code"] == expected_code
|
||||
|
||||
|
||||
@pytest.mark.p1
|
||||
class TestAddUsers:
|
||||
"""Comprehensive tests for adding users to a team."""
|
||||
|
||||
@pytest.fixture
|
||||
def test_team(self, WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]:
|
||||
"""Create a test team for use in tests."""
|
||||
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
|
||||
res: dict[str, Any] = create_team(WebApiAuth, team_payload)
|
||||
assert res["code"] == 0
|
||||
return res["data"]
|
||||
|
||||
@pytest.fixture
|
||||
def test_users(self, WebApiAuth: RAGFlowWebApiAuth) -> list[dict[str, Any]]:
|
||||
"""Create test users for use in tests."""
|
||||
users = []
|
||||
for i in range(5):
|
||||
email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com"
|
||||
password = "TestPassword123!"
|
||||
user_payload: dict[str, str] = {
|
||||
"email": email,
|
||||
"password": password,
|
||||
"nickname": f"Test User {i}",
|
||||
}
|
||||
user_res: dict[str, Any] = create_user(WebApiAuth, user_payload)
|
||||
if user_res["code"] == 0:
|
||||
users.append({"email": email, "id": user_res["data"]["id"]})
|
||||
return users
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_single_user_with_email_string(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test adding a single user using email string format."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_email: str = test_users[0]["email"]
|
||||
|
||||
add_payload: dict[str, list[str]] = {"users": [user_email]}
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert "data" in res
|
||||
assert "added" in res["data"]
|
||||
assert len(res["data"]["added"]) == 1
|
||||
assert res["data"]["added"][0]["email"] == user_email
|
||||
assert res["data"]["added"][0]["role"] == "normal"
|
||||
assert "failed" in res["data"]
|
||||
assert len(res["data"]["failed"]) == 0
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_single_user_with_role(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test adding a single user with admin role."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_email: str = test_users[0]["email"]
|
||||
|
||||
add_payload: dict[str, list[dict[str, str]]] = {
|
||||
"users": [{"email": user_email, "role": "admin"}]
|
||||
}
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert len(res["data"]["added"]) == 1
|
||||
assert res["data"]["added"][0]["email"] == user_email
|
||||
assert res["data"]["added"][0]["role"] == "admin"
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_multiple_users(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test adding multiple users in bulk."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_emails: list[str] = [user["email"] for user in test_users[:3]]
|
||||
|
||||
add_payload: dict[str, list[str]] = {"users": user_emails}
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert len(res["data"]["added"]) == 3
|
||||
assert len(res["data"]["failed"]) == 0
|
||||
added_emails = {user["email"] for user in res["data"]["added"]}
|
||||
assert added_emails == set(user_emails)
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_users_mixed_formats(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test adding users with mixed string and object formats."""
|
||||
tenant_id: str = test_team["id"]
|
||||
|
||||
add_payload: dict[str, list[Any]] = {
|
||||
"users": [
|
||||
test_users[0]["email"], # String format
|
||||
{"email": test_users[1]["email"], "role": "admin"}, # Object format
|
||||
test_users[2]["email"], # String format
|
||||
]
|
||||
}
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert len(res["data"]["added"]) == 3
|
||||
assert res["data"]["added"][0]["role"] == "normal" # String format defaults to normal
|
||||
assert res["data"]["added"][1]["role"] == "admin" # Object format with admin role
|
||||
assert res["data"]["added"][2]["role"] == "normal" # String format defaults to normal
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_user_unregistered_email(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test adding a user with unregistered email."""
|
||||
tenant_id: str = test_team["id"]
|
||||
unregistered_email: str = f"unregistered_{uuid.uuid4().hex[:8]}@example.com"
|
||||
|
||||
add_payload: dict[str, list[str]] = {"users": [unregistered_email]}
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
|
||||
assert res["code"] == 102 # DATA_ERROR
|
||||
assert len(res["data"]["added"]) == 0
|
||||
assert len(res["data"]["failed"]) == 1
|
||||
assert "not found" in res["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_user_already_member(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test adding a user who is already a member of the team."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_email: str = test_users[0]["email"]
|
||||
|
||||
# Add user first time
|
||||
add_payload: dict[str, list[str]] = {"users": [user_email]}
|
||||
res1: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
assert res1["code"] == 0
|
||||
|
||||
# Try to add same user again
|
||||
res2: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
assert res2["code"] == 0 # Returns success but with failed entry
|
||||
assert len(res2["data"]["added"]) == 0
|
||||
assert len(res2["data"]["failed"]) == 1
|
||||
assert "already a member" in res2["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_users_partial_success(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test adding users where some succeed and some fail."""
|
||||
tenant_id: str = test_team["id"]
|
||||
unregistered_email: str = f"unregistered_{uuid.uuid4().hex[:8]}@example.com"
|
||||
|
||||
add_payload: dict[str, list[str]] = {
|
||||
"users": [test_users[0]["email"], unregistered_email, test_users[1]["email"]]
|
||||
}
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert len(res["data"]["added"]) == 2
|
||||
assert len(res["data"]["failed"]) == 1
|
||||
assert "not found" in res["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_users_empty_list(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test adding users with empty list."""
|
||||
tenant_id: str = test_team["id"]
|
||||
add_payload: dict[str, list[str]] = {"users": []}
|
||||
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
assert res["code"] == 101 # ARGUMENT_ERROR
|
||||
assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_users_missing_users_field(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test adding users without 'users' field."""
|
||||
tenant_id: str = test_team["id"]
|
||||
add_payload: dict[str, Any] = {}
|
||||
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
assert res["code"] == 101 # ARGUMENT_ERROR
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_add_users_invalid_role(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test adding user with invalid role."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_email: str = test_users[0]["email"]
|
||||
|
||||
add_payload: dict[str, list[dict[str, str]]] = {
|
||||
"users": [{"email": user_email, "role": "invalid_role"}]
|
||||
}
|
||||
res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
|
||||
assert res["code"] == 0 # Returns success but with failed entry
|
||||
assert len(res["data"]["added"]) == 0
|
||||
assert len(res["data"]["failed"]) == 1
|
||||
assert "invalid role" in res["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p2
|
||||
def test_add_users_not_owner_or_admin(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test that non-admin/non-owner users cannot add users."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_email: str = test_users[0]["email"]
|
||||
|
||||
# Add a normal user to the team
|
||||
add_payload: dict[str, list[str]] = {"users": [user_email]}
|
||||
add_res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
assert add_res["code"] == 0
|
||||
|
||||
# Create auth for the normal user (this would require getting their token)
|
||||
# For now, we'll test that owner/admin can add users
|
||||
# This test would need the normal user's auth token to fully test
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.p1
|
||||
class TestRemoveUsers:
|
||||
"""Comprehensive tests for removing users from a team."""
|
||||
|
||||
@pytest.fixture
|
||||
def test_team(self, WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]:
|
||||
"""Create a test team for use in tests."""
|
||||
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
|
||||
res: dict[str, Any] = create_team(WebApiAuth, team_payload)
|
||||
assert res["code"] == 0
|
||||
return res["data"]
|
||||
|
||||
@pytest.fixture
|
||||
def test_users(self, WebApiAuth: RAGFlowWebApiAuth) -> list[dict[str, Any]]:
|
||||
"""Create test users for use in tests."""
|
||||
users = []
|
||||
for i in range(5):
|
||||
email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com"
|
||||
password = "TestPassword123!"
|
||||
user_payload: dict[str, str] = {
|
||||
"email": email,
|
||||
"password": password,
|
||||
"nickname": f"Test User {i}",
|
||||
}
|
||||
user_res: dict[str, Any] = create_user(WebApiAuth, user_payload)
|
||||
if user_res["code"] == 0:
|
||||
users.append({"email": email, "id": user_res["data"]["id"]})
|
||||
return users
|
||||
|
||||
@pytest.fixture
|
||||
def team_with_users(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> dict[str, Any]:
|
||||
"""Create a team with users already added."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_emails: list[str] = [user["email"] for user in test_users[:3]]
|
||||
|
||||
add_payload: dict[str, list[str]] = {"users": user_emails}
|
||||
add_res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
assert add_res["code"] == 0
|
||||
|
||||
return {
|
||||
"team": test_team,
|
||||
"users": test_users[:3],
|
||||
}
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_single_user(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, team_with_users: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test removing a single user from a team."""
|
||||
tenant_id: str = team_with_users["team"]["id"]
|
||||
user_id: str = team_with_users["users"][0]["id"]
|
||||
|
||||
remove_payload: dict[str, list[str]] = {"user_ids": [user_id]}
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert "data" in res
|
||||
assert "removed" in res["data"]
|
||||
assert len(res["data"]["removed"]) == 1
|
||||
assert res["data"]["removed"][0]["user_id"] == user_id
|
||||
assert "failed" in res["data"]
|
||||
assert len(res["data"]["failed"]) == 0
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_multiple_users(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, team_with_users: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test removing multiple users in bulk."""
|
||||
tenant_id: str = team_with_users["team"]["id"]
|
||||
user_ids: list[str] = [user["id"] for user in team_with_users["users"][:2]]
|
||||
|
||||
remove_payload: dict[str, list[str]] = {"user_ids": user_ids}
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert len(res["data"]["removed"]) == 2
|
||||
assert len(res["data"]["failed"]) == 0
|
||||
removed_ids = {user["user_id"] for user in res["data"]["removed"]}
|
||||
assert removed_ids == set(user_ids)
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_user_not_in_team(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test removing a user who is not a member of the team."""
|
||||
tenant_id: str = test_team["id"]
|
||||
# Use a user that was not added to the team
|
||||
user_id: str = test_users[3]["id"]
|
||||
|
||||
remove_payload: dict[str, list[str]] = {"user_ids": [user_id]}
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
|
||||
assert res["code"] == 0 # Returns success but with failed entry
|
||||
assert len(res["data"]["removed"]) == 0
|
||||
assert len(res["data"]["failed"]) == 1
|
||||
assert "not a member" in res["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_owner(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test that owner cannot be removed."""
|
||||
tenant_id: str = test_team["id"]
|
||||
owner_id: str = test_team["owner_id"]
|
||||
|
||||
remove_payload: dict[str, list[str]] = {"user_ids": [owner_id]}
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
|
||||
assert res["code"] == 0 # Returns success but with failed entry
|
||||
assert len(res["data"]["removed"]) == 0
|
||||
assert len(res["data"]["failed"]) == 1
|
||||
assert "owner" in res["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_users_partial_success(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, team_with_users: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test removing users where some succeed and some fail."""
|
||||
tenant_id: str = team_with_users["team"]["id"]
|
||||
# Mix of valid and invalid user IDs
|
||||
valid_user_id: str = team_with_users["users"][0]["id"]
|
||||
invalid_user_id: str = test_users[3]["id"] # Not in team
|
||||
|
||||
remove_payload: dict[str, list[str]] = {"user_ids": [valid_user_id, invalid_user_id]}
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
|
||||
assert res["code"] == 0
|
||||
assert len(res["data"]["removed"]) == 1
|
||||
assert len(res["data"]["failed"]) == 1
|
||||
assert res["data"]["removed"][0]["user_id"] == valid_user_id
|
||||
assert "not a member" in res["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_users_empty_list(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test removing users with empty list."""
|
||||
tenant_id: str = test_team["id"]
|
||||
remove_payload: dict[str, list[str]] = {"user_ids": []}
|
||||
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
assert res["code"] == 101 # ARGUMENT_ERROR
|
||||
assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower()
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_users_missing_user_ids_field(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test removing users without 'user_ids' field."""
|
||||
tenant_id: str = test_team["id"]
|
||||
remove_payload: dict[str, Any] = {}
|
||||
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
assert res["code"] == 101 # ARGUMENT_ERROR
|
||||
|
||||
@pytest.mark.p1
|
||||
def test_remove_users_invalid_user_id_format(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test removing users with invalid user ID format."""
|
||||
tenant_id: str = test_team["id"]
|
||||
remove_payload: dict[str, list[Any]] = {"user_ids": [12345]} # Not a string
|
||||
|
||||
res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload)
|
||||
assert res["code"] == 0 # Returns success but with failed entry
|
||||
assert len(res["data"]["removed"]) == 0
|
||||
assert len(res["data"]["failed"]) == 1
|
||||
assert "invalid" in res["data"]["failed"][0]["error"].lower()
|
||||
|
||||
@pytest.mark.p2
|
||||
def test_remove_last_admin(
|
||||
self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Test that the last admin cannot remove themselves."""
|
||||
tenant_id: str = test_team["id"]
|
||||
user_email: str = test_users[0]["email"]
|
||||
|
||||
# Add user as admin
|
||||
add_payload: dict[str, list[dict[str, str]]] = {
|
||||
"users": [{"email": user_email, "role": "admin"}]
|
||||
}
|
||||
add_res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload)
|
||||
assert add_res["code"] == 0
|
||||
admin_user_id: str = add_res["data"]["added"][0]["id"]
|
||||
|
||||
# Try to remove the admin (would need admin's auth token to fully test)
|
||||
# This test would require the admin user's authentication
|
||||
pass
|
||||
|
||||
Loading…
Add table
Reference in a new issue