diff --git a/api/apps/tenant_app.py b/api/apps/tenant_app.py index e00a91c86..f8a060a09 100644 --- a/api/apps/tenant_app.py +++ b/api/apps/tenant_app.py @@ -44,6 +44,23 @@ from api.utils.web_utils import send_invite_email from common import settings +def is_team_admin_or_owner(tenant_id: str, user_id: str) -> bool: + """ + Check if a user is an OWNER or ADMIN of a team. + + Args: + tenant_id: The team/tenant ID + user_id: The user ID to check + + Returns: + True if user is OWNER or ADMIN, False otherwise + """ + user_tenant = UserTenantService.filter_by_tenant_and_user_id(tenant_id, user_id) + if not user_tenant: + return False + return user_tenant.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN] + + @manager.route("//user/list", methods=["GET"]) # noqa: F821 @login_required def user_list(tenant_id): @@ -435,3 +452,384 @@ def agree(tenant_id): return get_json_result(data=True) except Exception as e: return server_error_response(e) + + +@manager.route('//users/add', methods=['POST']) # noqa: F821 +@login_required +@validate_request("users") +def add_users(tenant_id): + """ + Add one or more users to a team. Only OWNER or ADMIN can add users. + Supports both single user and bulk operations. + + --- + tags: + - Team + security: + - ApiKeyAuth: [] + parameters: + - in: path + name: tenant_id + required: true + type: string + description: Team ID + - in: body + name: body + required: true + schema: + type: object + required: + - users + properties: + users: + type: array + description: List of users to add. Each user can be an email string or an object with email and role. + items: + oneOf: + - type: string + description: User email (will be added with 'normal' role) + - type: object + properties: + email: + type: string + description: User email + role: + type: string + description: Role to assign (normal, admin). Defaults to normal. + enum: [normal, admin] + responses: + 200: + description: Users added successfully + schema: + type: object + properties: + data: + type: object + properties: + added: + type: array + description: Successfully added users + failed: + type: array + description: Users that failed to be added with error messages + message: + type: string + 400: + description: Invalid request + 401: + description: Unauthorized + 403: + description: Forbidden - not owner or admin + """ + # Check if current user is OWNER or ADMIN of the team + if not is_team_admin_or_owner(tenant_id, current_user.id): + return get_json_result( + data=False, + message='Only team owners or admins can add users.', + code=RetCode.PERMISSION_ERROR + ) + + req = request.json + users_input = req.get("users", []) + + if not isinstance(users_input, list) or len(users_input) == 0: + return get_json_result( + data=False, + message="'users' must be a non-empty array.", + code=RetCode.ARGUMENT_ERROR + ) + + added_users = [] + failed_users = [] + + for user_input in users_input: + # Handle both string (email) and object formats + if isinstance(user_input, str): + email = user_input + role = UserTenantRole.NORMAL.value + elif isinstance(user_input, dict): + email = user_input.get("email") + role = user_input.get("role", UserTenantRole.NORMAL.value) + else: + failed_users.append({ + "email": str(user_input), + "error": "Invalid format. Must be a string (email) or object with 'email' and optional 'role'." + }) + continue + + if not email: + failed_users.append({ + "email": str(user_input), + "error": "Email is required." + }) + continue + + # Validate role + if role not in [UserTenantRole.NORMAL.value, UserTenantRole.ADMIN.value]: + failed_users.append({ + "email": email, + "error": f"Invalid role '{role}'. Allowed roles: {UserTenantRole.NORMAL.value}, {UserTenantRole.ADMIN.value}" + }) + continue + + try: + # Find user by email + invite_users = UserService.query(email=email) + if not invite_users: + failed_users.append({ + "email": email, + "error": f"User with email '{email}' not found." + }) + continue + + user_id_to_add = invite_users[0].id + + # Check if user is already in the team + existing_user_tenants = UserTenantService.query(user_id=user_id_to_add, tenant_id=tenant_id) + if existing_user_tenants: + existing_role = existing_user_tenants[0].role + if existing_role in [UserTenantRole.NORMAL, UserTenantRole.ADMIN]: + failed_users.append({ + "email": email, + "error": f"User is already a member of the team with role '{existing_role}'." + }) + continue + if existing_role == UserTenantRole.OWNER: + failed_users.append({ + "email": email, + "error": "User is the owner of the team and cannot be added again." + }) + continue + # If user has INVITE role, update to the requested role + if existing_role == UserTenantRole.INVITE: + UserTenantService.filter_update( + [UserTenant.tenant_id == tenant_id, UserTenant.user_id == user_id_to_add], + {"role": role, "status": StatusEnum.VALID.value} + ) + usr = invite_users[0].to_dict() + usr = {k: v for k, v in usr.items() if k in ["id", "avatar", "email", "nickname"]} + usr["role"] = role + added_users.append(usr) + continue + + # Add user to team + UserTenantService.save( + id=get_uuid(), + user_id=user_id_to_add, + tenant_id=tenant_id, + invited_by=current_user.id, + role=role, + status=StatusEnum.VALID.value + ) + + # Send invitation email if configured + if smtp_mail_server and settings.SMTP_CONF: + from threading import Thread + user_name = "" + _, user = UserService.get_by_id(current_user.id) + if user: + user_name = user.nickname + Thread( + target=send_invite_email, + args=(email, settings.MAIL_FRONTEND_URL, tenant_id, user_name or current_user.email), + daemon=True + ).start() + + usr = invite_users[0].to_dict() + usr = {k: v for k, v in usr.items() if k in ["id", "avatar", "email", "nickname"]} + usr["role"] = role + added_users.append(usr) + + except Exception as e: + logging.exception(f"Error adding user {email}: {e}") + failed_users.append({ + "email": email, + "error": f"Failed to add user: {str(e)}" + }) + + result = { + "added": added_users, + "failed": failed_users + } + + if failed_users and not added_users: + return get_json_result( + data=result, + message=f"Failed to add all users. {len(failed_users)} error(s).", + code=RetCode.DATA_ERROR + ) + elif failed_users: + return get_json_result( + data=result, + message=f"Added {len(added_users)} user(s). {len(failed_users)} user(s) failed." + ) + else: + return get_json_result( + data=result, + message=f"Successfully added {len(added_users)} user(s)." + ) + + +@manager.route('//users/remove', methods=['POST']) # noqa: F821 +@login_required +@validate_request("user_ids") +def remove_users(tenant_id): + """ + Remove one or more users from a team. Only OWNER or ADMIN can remove users. + Owners cannot be removed. Supports both single user and bulk operations. + + --- + tags: + - Team + security: + - ApiKeyAuth: [] + parameters: + - in: path + name: tenant_id + required: true + type: string + description: Team ID + - in: body + name: body + required: true + schema: + type: object + required: + - user_ids + properties: + user_ids: + type: array + description: List of user IDs to remove + items: + type: string + responses: + 200: + description: Users removed successfully + schema: + type: object + properties: + data: + type: object + properties: + removed: + type: array + description: Successfully removed user IDs + failed: + type: array + description: Users that failed to be removed with error messages + message: + type: string + 400: + description: Invalid request + 401: + description: Unauthorized + 403: + description: Forbidden - not owner or admin + """ + # Check if current user is OWNER or ADMIN of the team + if not is_team_admin_or_owner(tenant_id, current_user.id): + return get_json_result( + data=False, + message='Only team owners or admins can remove users.', + code=RetCode.PERMISSION_ERROR + ) + + req = request.json + user_ids = req.get("user_ids", []) + + if not isinstance(user_ids, list) or len(user_ids) == 0: + return get_json_result( + data=False, + message="'user_ids' must be a non-empty array.", + code=RetCode.ARGUMENT_ERROR + ) + + removed_users = [] + failed_users = [] + + # Get all admins/owners for validation (check if removing would leave team without admin/owner) + all_user_tenants = UserTenantService.query(tenant_id=tenant_id) + admin_owner_ids = { + ut.user_id for ut in all_user_tenants + if ut.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN] and ut.status == StatusEnum.VALID.value + } + + for user_id in user_ids: + if not isinstance(user_id, str): + failed_users.append({ + "user_id": str(user_id), + "error": "Invalid user ID format." + }) + continue + + try: + # Check if user exists in the team + user_tenant = UserTenantService.filter_by_tenant_and_user_id(tenant_id, user_id) + if not user_tenant: + failed_users.append({ + "user_id": user_id, + "error": "User is not a member of this team." + }) + continue + + # Prevent removing the owner + if user_tenant.role == UserTenantRole.OWNER: + failed_users.append({ + "user_id": user_id, + "error": "Cannot remove the team owner." + }) + continue + + # Prevent removing yourself if you're the only admin + if user_id == current_user.id and user_tenant.role == UserTenantRole.ADMIN: + remaining_admins = admin_owner_ids - {user_id} + if len(remaining_admins) == 0: + failed_users.append({ + "user_id": user_id, + "error": "Cannot remove yourself. At least one owner or admin must remain in the team." + }) + continue + + # Remove user from team + UserTenantService.filter_delete([ + UserTenant.tenant_id == tenant_id, + UserTenant.user_id == user_id + ]) + + # Get user info for response + user = UserService.filter_by_id(user_id) + user_email = user.email if user else "Unknown" + + removed_users.append({ + "user_id": user_id, + "email": user_email + }) + + except Exception as e: + logging.exception(f"Error removing user {user_id}: {e}") + failed_users.append({ + "user_id": user_id, + "error": f"Failed to remove user: {str(e)}" + }) + + result = { + "removed": removed_users, + "failed": failed_users + } + + if failed_users and not removed_users: + return get_json_result( + data=result, + message=f"Failed to remove all users. {len(failed_users)} error(s).", + code=RetCode.DATA_ERROR + ) + elif failed_users: + return get_json_result( + data=result, + message=f"Removed {len(removed_users)} user(s). {len(failed_users)} user(s) failed." + ) + else: + return get_json_result( + data=result, + message=f"Successfully removed {len(removed_users)} user(s)." + ) diff --git a/test/testcases/test_http_api/test_team_management/test_team_users.py b/test/testcases/test_http_api/test_team_management/test_team_users.py new file mode 100644 index 000000000..e0294856a --- /dev/null +++ b/test/testcases/test_http_api/test_team_management/test_team_users.py @@ -0,0 +1,477 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import uuid +from typing import Any + +import pytest + +from common import ( + add_users_to_team, + create_team, + create_user, + remove_users_from_team, +) +from configs import INVALID_API_TOKEN +from libs.auth import RAGFlowWebApiAuth + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- + + +@pytest.mark.p1 +class TestAddUsersAuthorization: + """Tests for authentication behavior when adding users to a team.""" + + @pytest.mark.parametrize( + ("invalid_auth", "expected_code"), + [ + (None, 401), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401), + ], + ) + def test_add_users_invalid_auth( + self, + invalid_auth: RAGFlowWebApiAuth | None, + expected_code: int, + WebApiAuth: RAGFlowWebApiAuth, + ) -> None: + """Test adding users with invalid or missing authentication.""" + # Create a team first + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + team_res: dict[str, Any] = create_team(WebApiAuth, team_payload) + tenant_id: str = team_res["data"]["id"] + + # Try to add users with invalid auth + add_payload: dict[str, list[str]] = {"users": ["test@example.com"]} + res: dict[str, Any] = add_users_to_team(invalid_auth, tenant_id, add_payload) + assert res["code"] == expected_code + + +@pytest.mark.p1 +class TestAddUsers: + """Comprehensive tests for adding users to a team.""" + + @pytest.fixture + def test_team(self, WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.fixture + def test_users(self, WebApiAuth: RAGFlowWebApiAuth) -> list[dict[str, Any]]: + """Create test users for use in tests.""" + users = [] + for i in range(5): + email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + user_payload: dict[str, str] = { + "email": email, + "password": password, + "nickname": f"Test User {i}", + } + user_res: dict[str, Any] = create_user(WebApiAuth, user_payload) + if user_res["code"] == 0: + users.append({"email": email, "id": user_res["data"]["id"]}) + return users + + @pytest.mark.p1 + def test_add_single_user_with_email_string( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test adding a single user using email string format.""" + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + + add_payload: dict[str, list[str]] = {"users": [user_email]} + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + + assert res["code"] == 0 + assert "data" in res + assert "added" in res["data"] + assert len(res["data"]["added"]) == 1 + assert res["data"]["added"][0]["email"] == user_email + assert res["data"]["added"][0]["role"] == "normal" + assert "failed" in res["data"] + assert len(res["data"]["failed"]) == 0 + + @pytest.mark.p1 + def test_add_single_user_with_role( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test adding a single user with admin role.""" + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + + add_payload: dict[str, list[dict[str, str]]] = { + "users": [{"email": user_email, "role": "admin"}] + } + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + + assert res["code"] == 0 + assert len(res["data"]["added"]) == 1 + assert res["data"]["added"][0]["email"] == user_email + assert res["data"]["added"][0]["role"] == "admin" + + @pytest.mark.p1 + def test_add_multiple_users( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test adding multiple users in bulk.""" + tenant_id: str = test_team["id"] + user_emails: list[str] = [user["email"] for user in test_users[:3]] + + add_payload: dict[str, list[str]] = {"users": user_emails} + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + + assert res["code"] == 0 + assert len(res["data"]["added"]) == 3 + assert len(res["data"]["failed"]) == 0 + added_emails = {user["email"] for user in res["data"]["added"]} + assert added_emails == set(user_emails) + + @pytest.mark.p1 + def test_add_users_mixed_formats( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test adding users with mixed string and object formats.""" + tenant_id: str = test_team["id"] + + add_payload: dict[str, list[Any]] = { + "users": [ + test_users[0]["email"], # String format + {"email": test_users[1]["email"], "role": "admin"}, # Object format + test_users[2]["email"], # String format + ] + } + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + + assert res["code"] == 0 + assert len(res["data"]["added"]) == 3 + assert res["data"]["added"][0]["role"] == "normal" # String format defaults to normal + assert res["data"]["added"][1]["role"] == "admin" # Object format with admin role + assert res["data"]["added"][2]["role"] == "normal" # String format defaults to normal + + @pytest.mark.p1 + def test_add_user_unregistered_email( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test adding a user with unregistered email.""" + tenant_id: str = test_team["id"] + unregistered_email: str = f"unregistered_{uuid.uuid4().hex[:8]}@example.com" + + add_payload: dict[str, list[str]] = {"users": [unregistered_email]} + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + + assert res["code"] == 102 # DATA_ERROR + assert len(res["data"]["added"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "not found" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_add_user_already_member( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test adding a user who is already a member of the team.""" + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + + # Add user first time + add_payload: dict[str, list[str]] = {"users": [user_email]} + res1: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + assert res1["code"] == 0 + + # Try to add same user again + res2: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + assert res2["code"] == 0 # Returns success but with failed entry + assert len(res2["data"]["added"]) == 0 + assert len(res2["data"]["failed"]) == 1 + assert "already a member" in res2["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_add_users_partial_success( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test adding users where some succeed and some fail.""" + tenant_id: str = test_team["id"] + unregistered_email: str = f"unregistered_{uuid.uuid4().hex[:8]}@example.com" + + add_payload: dict[str, list[str]] = { + "users": [test_users[0]["email"], unregistered_email, test_users[1]["email"]] + } + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + + assert res["code"] == 0 + assert len(res["data"]["added"]) == 2 + assert len(res["data"]["failed"]) == 1 + assert "not found" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_add_users_empty_list( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test adding users with empty list.""" + tenant_id: str = test_team["id"] + add_payload: dict[str, list[str]] = {"users": []} + + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + assert res["code"] == 101 # ARGUMENT_ERROR + assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower() + + @pytest.mark.p1 + def test_add_users_missing_users_field( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test adding users without 'users' field.""" + tenant_id: str = test_team["id"] + add_payload: dict[str, Any] = {} + + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + assert res["code"] == 101 # ARGUMENT_ERROR + + @pytest.mark.p1 + def test_add_users_invalid_role( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test adding user with invalid role.""" + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + + add_payload: dict[str, list[dict[str, str]]] = { + "users": [{"email": user_email, "role": "invalid_role"}] + } + res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + + assert res["code"] == 0 # Returns success but with failed entry + assert len(res["data"]["added"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "invalid role" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p2 + def test_add_users_not_owner_or_admin( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test that non-admin/non-owner users cannot add users.""" + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + + # Add a normal user to the team + add_payload: dict[str, list[str]] = {"users": [user_email]} + add_res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + assert add_res["code"] == 0 + + # Create auth for the normal user (this would require getting their token) + # For now, we'll test that owner/admin can add users + # This test would need the normal user's auth token to fully test + pass + + +@pytest.mark.p1 +class TestRemoveUsers: + """Comprehensive tests for removing users from a team.""" + + @pytest.fixture + def test_team(self, WebApiAuth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(WebApiAuth, team_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.fixture + def test_users(self, WebApiAuth: RAGFlowWebApiAuth) -> list[dict[str, Any]]: + """Create test users for use in tests.""" + users = [] + for i in range(5): + email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + user_payload: dict[str, str] = { + "email": email, + "password": password, + "nickname": f"Test User {i}", + } + user_res: dict[str, Any] = create_user(WebApiAuth, user_payload) + if user_res["code"] == 0: + users.append({"email": email, "id": user_res["data"]["id"]}) + return users + + @pytest.fixture + def team_with_users( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> dict[str, Any]: + """Create a team with users already added.""" + tenant_id: str = test_team["id"] + user_emails: list[str] = [user["email"] for user in test_users[:3]] + + add_payload: dict[str, list[str]] = {"users": user_emails} + add_res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + assert add_res["code"] == 0 + + return { + "team": test_team, + "users": test_users[:3], + } + + @pytest.mark.p1 + def test_remove_single_user( + self, WebApiAuth: RAGFlowWebApiAuth, team_with_users: dict[str, Any] + ) -> None: + """Test removing a single user from a team.""" + tenant_id: str = team_with_users["team"]["id"] + user_id: str = team_with_users["users"][0]["id"] + + remove_payload: dict[str, list[str]] = {"user_ids": [user_id]} + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + + assert res["code"] == 0 + assert "data" in res + assert "removed" in res["data"] + assert len(res["data"]["removed"]) == 1 + assert res["data"]["removed"][0]["user_id"] == user_id + assert "failed" in res["data"] + assert len(res["data"]["failed"]) == 0 + + @pytest.mark.p1 + def test_remove_multiple_users( + self, WebApiAuth: RAGFlowWebApiAuth, team_with_users: dict[str, Any] + ) -> None: + """Test removing multiple users in bulk.""" + tenant_id: str = team_with_users["team"]["id"] + user_ids: list[str] = [user["id"] for user in team_with_users["users"][:2]] + + remove_payload: dict[str, list[str]] = {"user_ids": user_ids} + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + + assert res["code"] == 0 + assert len(res["data"]["removed"]) == 2 + assert len(res["data"]["failed"]) == 0 + removed_ids = {user["user_id"] for user in res["data"]["removed"]} + assert removed_ids == set(user_ids) + + @pytest.mark.p1 + def test_remove_user_not_in_team( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test removing a user who is not a member of the team.""" + tenant_id: str = test_team["id"] + # Use a user that was not added to the team + user_id: str = test_users[3]["id"] + + remove_payload: dict[str, list[str]] = {"user_ids": [user_id]} + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + + assert res["code"] == 0 # Returns success but with failed entry + assert len(res["data"]["removed"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "not a member" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_remove_owner( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test that owner cannot be removed.""" + tenant_id: str = test_team["id"] + owner_id: str = test_team["owner_id"] + + remove_payload: dict[str, list[str]] = {"user_ids": [owner_id]} + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + + assert res["code"] == 0 # Returns success but with failed entry + assert len(res["data"]["removed"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "owner" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_remove_users_partial_success( + self, WebApiAuth: RAGFlowWebApiAuth, team_with_users: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test removing users where some succeed and some fail.""" + tenant_id: str = team_with_users["team"]["id"] + # Mix of valid and invalid user IDs + valid_user_id: str = team_with_users["users"][0]["id"] + invalid_user_id: str = test_users[3]["id"] # Not in team + + remove_payload: dict[str, list[str]] = {"user_ids": [valid_user_id, invalid_user_id]} + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + + assert res["code"] == 0 + assert len(res["data"]["removed"]) == 1 + assert len(res["data"]["failed"]) == 1 + assert res["data"]["removed"][0]["user_id"] == valid_user_id + assert "not a member" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_remove_users_empty_list( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test removing users with empty list.""" + tenant_id: str = test_team["id"] + remove_payload: dict[str, list[str]] = {"user_ids": []} + + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + assert res["code"] == 101 # ARGUMENT_ERROR + assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower() + + @pytest.mark.p1 + def test_remove_users_missing_user_ids_field( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test removing users without 'user_ids' field.""" + tenant_id: str = test_team["id"] + remove_payload: dict[str, Any] = {} + + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + assert res["code"] == 101 # ARGUMENT_ERROR + + @pytest.mark.p1 + def test_remove_users_invalid_user_id_format( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test removing users with invalid user ID format.""" + tenant_id: str = test_team["id"] + remove_payload: dict[str, list[Any]] = {"user_ids": [12345]} # Not a string + + res: dict[str, Any] = remove_users_from_team(WebApiAuth, tenant_id, remove_payload) + assert res["code"] == 0 # Returns success but with failed entry + assert len(res["data"]["removed"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "invalid" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p2 + def test_remove_last_admin( + self, WebApiAuth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test that the last admin cannot remove themselves.""" + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + + # Add user as admin + add_payload: dict[str, list[dict[str, str]]] = { + "users": [{"email": user_email, "role": "admin"}] + } + add_res: dict[str, Any] = add_users_to_team(WebApiAuth, tenant_id, add_payload) + assert add_res["code"] == 0 + admin_user_id: str = add_res["data"]["added"][0]["id"] + + # Try to remove the admin (would need admin's auth token to fully test) + # This test would require the admin user's authentication + pass +