From 0d48560e66e8e795a5f391e78a8731ffa9aa2a89 Mon Sep 17 00:00:00 2001 From: Hetavi Shah Date: Wed, 19 Nov 2025 18:14:49 +0530 Subject: [PATCH] [OND211-2329]: Added new tests and updated existing tests for team API. --- test/testcases/test_http_api/common.py | 33 +- .../test_accept_invite.py | 357 +++++++++++++++++ .../{test_team_users.py => test_add_users.py} | 316 +++++---------- .../test_team_management/test_create_team.py | 294 ++++++++++++++ .../test_reject_invite.py | 363 +++++++++++++++++ .../test_team_management/test_remove_users.py | 376 ++++++++++++++++++ .../test_team_advanced.py | 337 ---------------- 7 files changed, 1524 insertions(+), 552 deletions(-) create mode 100644 test/testcases/test_http_api/test_team_management/test_accept_invite.py rename test/testcases/test_http_api/test_team_management/{test_team_users.py => test_add_users.py} (52%) create mode 100644 test/testcases/test_http_api/test_team_management/test_reject_invite.py create mode 100644 test/testcases/test_http_api/test_team_management/test_remove_users.py delete mode 100644 test/testcases/test_http_api/test_team_management/test_team_advanced.py diff --git a/test/testcases/test_http_api/common.py b/test/testcases/test_http_api/common.py index beb8a0fe9..4f0949c4e 100644 --- a/test/testcases/test_http_api/common.py +++ b/test/testcases/test_http_api/common.py @@ -481,6 +481,7 @@ def remove_users_from_team( def accept_team_invitation( auth: Union[AuthBase, str, None], tenant_id: str, + role: Optional[str] = None, *, headers: Dict[str, str] = HEADERS, ) -> Dict[str, Any]: @@ -489,6 +490,7 @@ def accept_team_invitation( Args: auth: Authentication object (AuthBase subclass), token string, or None. tenant_id: The tenant/team ID to accept invitation for. + role: Optional role to assign after acceptance (normal, admin). Defaults to normal. headers: Optional HTTP headers. Defaults to HEADERS. Returns: @@ -498,7 +500,36 @@ def accept_team_invitation( requests.RequestException: If the HTTP request fails. """ url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/update-request/{tenant_id}" - payload: Dict[str, bool] = {"accept": True} + payload: Dict[str, Any] = {"accept": True} + if role: + payload["role"] = role + res: requests.Response = requests.put( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def reject_team_invitation( + auth: Union[AuthBase, str, None], + tenant_id: str, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Reject a team invitation. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + tenant_id: The tenant/team ID to reject invitation for. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the rejection result. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/update-request/{tenant_id}" + payload: Dict[str, bool] = {"accept": False} res: requests.Response = requests.put( url=url, headers=headers, auth=auth, json=payload ) diff --git a/test/testcases/test_http_api/test_team_management/test_accept_invite.py b/test/testcases/test_http_api/test_team_management/test_accept_invite.py new file mode 100644 index 000000000..ad99b46ae --- /dev/null +++ b/test/testcases/test_http_api/test_team_management/test_accept_invite.py @@ -0,0 +1,357 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import time +import uuid +from typing import Any + +import pytest + +from common import ( + accept_team_invitation, + add_users_to_team, + create_team, + create_user, + encrypt_password, + login_as_user, +) +from configs import INVALID_API_TOKEN +from libs.auth import RAGFlowWebApiAuth + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- + + +@pytest.mark.p1 +class TestAuthorization: + """Tests for authentication behavior when accepting team invitations.""" + + @pytest.mark.parametrize( + ("invalid_auth", "expected_code", "expected_message"), + [ + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), + ], + ) + def test_invalid_auth( + self, + invalid_auth: RAGFlowWebApiAuth | None, + expected_code: int, + expected_message: str, + web_api_auth: RAGFlowWebApiAuth, + ) -> None: + """Test accepting invitation with invalid or missing authentication.""" + # Create a team and send invitation first + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + team_res: dict[str, Any] = create_team(web_api_auth, team_payload) + if team_res["code"] != 0: + pytest.skip("Team creation failed, skipping auth test") + + tenant_id: str = team_res["data"]["id"] + + # Create and invite a user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + if user_res["code"] != 0: + pytest.skip("User creation failed, skipping auth test") + + add_payload: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id, add_payload) + + # Try to accept invitation with invalid auth + res: dict[str, Any] = accept_team_invitation(invalid_auth, tenant_id) + assert res["code"] == expected_code, res + if expected_message: + assert expected_message in res["message"] + + +@pytest.mark.p1 +class TestAcceptInvite: + """Comprehensive tests for accepting team invitations.""" + + @pytest.fixture + def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(web_api_auth, team_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.fixture + def invited_user(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test user who will be invited.""" + email = f"inviteduser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Invited User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + return { + "email": email, + "id": user_res["data"]["id"], + "password": password, + } + + @pytest.fixture + def team_with_invitation( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + invited_user: dict[str, Any], + ) -> dict[str, Any]: + """Create a team and send invitation to a user.""" + tenant_id: str = test_team["id"] + add_payload: dict[str, list[str]] = {"users": [invited_user["email"]]} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + return { + "team": test_team, + "invited_user": invited_user, + } + + @pytest.mark.p1 + def test_accept_invitation_default_role( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test accepting an invitation with default role (normal).""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Accept the invitation + res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert res["code"] == 0, res + assert res["data"] is True + assert "joined" in res["message"].lower() or "successfully" in res["message"].lower() + assert "normal" in res["message"].lower() or "role" in res["message"].lower() + + @pytest.mark.p1 + def test_accept_invitation_with_admin_role( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test accepting an invitation with admin role.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Accept the invitation with admin role + res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id, role="admin") + assert res["code"] == 0, res + assert res["data"] is True + assert "joined" in res["message"].lower() or "successfully" in res["message"].lower() + assert "admin" in res["message"].lower() or "role" in res["message"].lower() + + @pytest.mark.p1 + def test_accept_invitation_with_normal_role( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test accepting an invitation with normal role explicitly.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Accept the invitation with normal role + res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id, role="normal") + assert res["code"] == 0, res + assert res["data"] is True + assert "joined" in res["message"].lower() or "successfully" in res["message"].lower() + + @pytest.mark.p1 + def test_accept_invitation_no_invitation( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test accepting an invitation when no invitation exists.""" + # Create a user who is not invited + email = f"notinvited_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Not Invited User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + + # Login as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + + # Try to accept invitation for a team they're not invited to + tenant_id: str = test_team["id"] + res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or "invitation" in res["message"].lower() + + @pytest.mark.p1 + def test_accept_invitation_already_accepted( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test accepting an invitation that has already been accepted.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Accept the invitation first time + res1: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert res1["code"] == 0 + + # Try to accept again + res2: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert res2["code"] != 0 + assert "invite" in res2["message"].lower() or "role" in res2["message"].lower() or "not found" in res2["message"].lower() + + @pytest.mark.p1 + def test_accept_invitation_invalid_tenant_id( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test accepting an invitation with invalid team ID.""" + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Try to accept invitation for non-existent team + invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}" + res: dict[str, Any] = accept_team_invitation(user_auth, invalid_tenant_id) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or "invitation" in res["message"].lower() + + @pytest.mark.p1 + def test_accept_invitation_response_structure( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test that accepting invitation returns the expected response structure.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Accept the invitation + res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert res["code"] == 0 + assert "data" in res + assert res["data"] is True + assert "message" in res + assert isinstance(res["message"], str) + assert "successfully" in res["message"].lower() or "joined" in res["message"].lower() + + @pytest.mark.p2 + def test_accept_invitation_wrong_user( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test that a user cannot accept another user's invitation.""" + # Create another user who is not invited + email = f"otheruser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Other User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + + # Login as the other user + other_user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + + # Try to accept invitation meant for another user + tenant_id: str = team_with_invitation["team"]["id"] + res: dict[str, Any] = accept_team_invitation(other_user_auth, tenant_id) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or "invitation" in res["message"].lower() + + @pytest.mark.p2 + def test_accept_invitation_multiple_invitations( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test accepting invitations to multiple teams.""" + # Create two teams + team1_payload: dict[str, str] = {"name": f"Team 1 {uuid.uuid4().hex[:8]}"} + team1_res: dict[str, Any] = create_team(web_api_auth, team1_payload) + assert team1_res["code"] == 0 + tenant_id_1: str = team1_res["data"]["id"] + + team2_payload: dict[str, str] = {"name": f"Team 2 {uuid.uuid4().hex[:8]}"} + team2_res: dict[str, Any] = create_team(web_api_auth, team2_payload) + assert team2_res["code"] == 0 + tenant_id_2: str = team2_res["data"]["id"] + + # Create and invite a user to both teams + email = f"multiuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Multi User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + + # Invite to both teams + add_payload1: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id_1, add_payload1) + add_payload2: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id_2, add_payload2) + + # Login as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + + # Accept both invitations + res1: dict[str, Any] = accept_team_invitation(user_auth, tenant_id_1) + assert res1["code"] == 0 + + res2: dict[str, Any] = accept_team_invitation(user_auth, tenant_id_2) + assert res2["code"] == 0 + diff --git a/test/testcases/test_http_api/test_team_management/test_team_users.py b/test/testcases/test_http_api/test_team_management/test_add_users.py similarity index 52% rename from test/testcases/test_http_api/test_team_management/test_team_users.py rename to test/testcases/test_http_api/test_team_management/test_add_users.py index 1eef6c001..4a2752e5d 100644 --- a/test/testcases/test_http_api/test_team_management/test_team_users.py +++ b/test/testcases/test_http_api/test_team_management/test_add_users.py @@ -15,6 +15,7 @@ # from __future__ import annotations +import time import uuid from typing import Any @@ -24,7 +25,8 @@ from common import ( add_users_to_team, create_team, create_user, - remove_users_from_team, + encrypt_password, + login_as_user, ) from configs import INVALID_API_TOKEN from libs.auth import RAGFlowWebApiAuth @@ -36,32 +38,38 @@ from libs.auth import RAGFlowWebApiAuth @pytest.mark.p1 -class TestAddUsersAuthorization: +class TestAuthorization: """Tests for authentication behavior when adding users to a team.""" @pytest.mark.parametrize( - ("invalid_auth", "expected_code"), + ("invalid_auth", "expected_code", "expected_message"), [ - (None, 401), - (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401), + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), ], ) - def test_add_users_invalid_auth( + def test_invalid_auth( self, invalid_auth: RAGFlowWebApiAuth | None, expected_code: int, + expected_message: str, web_api_auth: RAGFlowWebApiAuth, ) -> None: """Test adding users with invalid or missing authentication.""" # Create a team first team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} team_res: dict[str, Any] = create_team(web_api_auth, team_payload) + if team_res["code"] != 0: + pytest.skip("Team creation failed, skipping auth test") + tenant_id: str = team_res["data"]["id"] # Try to add users with invalid auth add_payload: dict[str, list[str]] = {"users": ["test@example.com"]} res: dict[str, Any] = add_users_to_team(invalid_auth, tenant_id, add_payload) - assert res["code"] == expected_code + assert res["code"] == expected_code, res + if expected_message: + assert expected_message in res["message"] @pytest.mark.p1 @@ -83,14 +91,15 @@ class TestAddUsers: for i in range(5): email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com" password = "TestPassword123!" + encrypted_password = encrypt_password(password) user_payload: dict[str, str] = { "email": email, - "password": password, + "password": encrypted_password, "nickname": f"Test User {i}", } user_res: dict[str, Any] = create_user(web_api_auth, user_payload) if user_res["code"] == 0: - users.append({"email": email, "id": user_res["data"]["id"]}) + users.append({"email": email, "id": user_res["data"]["id"], "password": password}) return users @pytest.mark.p1 @@ -98,18 +107,21 @@ class TestAddUsers: self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test adding a single user using email string format.""" + if not test_users: + pytest.skip("No test users created") + tenant_id: str = test_team["id"] user_email: str = test_users[0]["email"] add_payload: dict[str, list[str]] = {"users": [user_email]} res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert res["code"] == 0 + assert res["code"] == 0, res assert "data" in res assert "added" in res["data"] assert len(res["data"]["added"]) == 1 assert res["data"]["added"][0]["email"] == user_email - assert res["data"]["added"][0]["role"] == "normal" + assert res["data"]["added"][0]["role"] == "invite" # Users are added with invite role initially assert "failed" in res["data"] assert len(res["data"]["failed"]) == 0 @@ -118,6 +130,9 @@ class TestAddUsers: self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test adding a single user with admin role.""" + if not test_users: + pytest.skip("No test users created") + tenant_id: str = test_team["id"] user_email: str = test_users[0]["email"] @@ -126,23 +141,27 @@ class TestAddUsers: } res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert res["code"] == 0 + assert res["code"] == 0, res assert len(res["data"]["added"]) == 1 assert res["data"]["added"][0]["email"] == user_email - assert res["data"]["added"][0]["role"] == "admin" + assert res["data"]["added"][0]["role"] == "invite" # Users are added with invite role initially + assert res["data"]["added"][0]["intended_role"] == "admin" # Intended role after acceptance @pytest.mark.p1 def test_add_multiple_users( self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test adding multiple users in bulk.""" + if len(test_users) < 3: + pytest.skip("Need at least 3 test users") + tenant_id: str = test_team["id"] user_emails: list[str] = [user["email"] for user in test_users[:3]] add_payload: dict[str, list[str]] = {"users": user_emails} res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert res["code"] == 0 + assert res["code"] == 0, res assert len(res["data"]["added"]) == 3 assert len(res["data"]["failed"]) == 0 added_emails = {user["email"] for user in res["data"]["added"]} @@ -153,6 +172,9 @@ class TestAddUsers: self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test adding users with mixed string and object formats.""" + if len(test_users) < 3: + pytest.skip("Need at least 3 test users") + tenant_id: str = test_team["id"] add_payload: dict[str, list[Any]] = { @@ -164,11 +186,14 @@ class TestAddUsers: } res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert res["code"] == 0 + assert res["code"] == 0, res assert len(res["data"]["added"]) == 3 - assert res["data"]["added"][0]["role"] == "normal" # String format defaults to normal - assert res["data"]["added"][1]["role"] == "admin" # Object format with admin role - assert res["data"]["added"][2]["role"] == "normal" # String format defaults to normal + assert res["data"]["added"][0]["role"] == "invite" # String format defaults to invite role initially + assert res["data"]["added"][0].get("intended_role") == "normal" # Intended role after acceptance + assert res["data"]["added"][1]["role"] == "invite" # Object format - still invite initially + assert res["data"]["added"][1]["intended_role"] == "admin" # Intended role after acceptance + assert res["data"]["added"][2]["role"] == "invite" # String format defaults to invite role initially + assert res["data"]["added"][2].get("intended_role") == "normal" # Intended role after acceptance @pytest.mark.p1 def test_add_user_unregistered_email( @@ -191,6 +216,9 @@ class TestAddUsers: self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test adding a user who is already a member of the team.""" + if not test_users: + pytest.skip("No test users created") + tenant_id: str = test_team["id"] user_email: str = test_users[0]["email"] @@ -201,16 +229,20 @@ class TestAddUsers: # Try to add same user again res2: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert res2["code"] == 0 # Returns success but with failed entry - assert len(res2["data"]["added"]) == 0 - assert len(res2["data"]["failed"]) == 1 - assert "already a member" in res2["data"]["failed"][0]["error"].lower() + assert res2["code"] == 0 # Returns success - invitation is resent + # API resends invitation instead of failing + assert len(res2["data"]["added"]) == 1 + assert res2["data"]["added"][0]["email"] == user_email + assert res2["data"]["added"][0].get("status") == "invitation_resent" or "intended_role" in res2["data"]["added"][0] @pytest.mark.p1 def test_add_users_partial_success( self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test adding users where some succeed and some fail.""" + if len(test_users) < 2: + pytest.skip("Need at least 2 test users") + tenant_id: str = test_team["id"] unregistered_email: str = f"unregistered_{uuid.uuid4().hex[:8]}@example.com" @@ -219,7 +251,7 @@ class TestAddUsers: } res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert res["code"] == 0 + assert res["code"] == 0, res assert len(res["data"]["added"]) == 2 assert len(res["data"]["failed"]) == 1 assert "not found" in res["data"]["failed"][0]["error"].lower() @@ -252,6 +284,9 @@ class TestAddUsers: self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test adding user with invalid role.""" + if not test_users: + pytest.skip("No test users created") + tenant_id: str = test_team["id"] user_email: str = test_users[0]["email"] @@ -260,218 +295,71 @@ class TestAddUsers: } res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert res["code"] == 0 # Returns success but with failed entry + # API returns error code when all users fail + assert res["code"] == 102 # DATA_ERROR assert len(res["data"]["added"]) == 0 assert len(res["data"]["failed"]) == 1 - assert "invalid role" in res["data"]["failed"][0]["error"].lower() + assert "invalid role" in res["data"]["failed"][0]["error"].lower() or "invalid" in res["data"]["failed"][0]["error"].lower() - @pytest.mark.p2 + @pytest.mark.p1 def test_add_users_not_owner_or_admin( self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] ) -> None: """Test that non-admin/non-owner users cannot add users.""" + if not test_users: + pytest.skip("No test users created") + tenant_id: str = test_team["id"] user_email: str = test_users[0]["email"] + other_user_email: str = test_users[1]["email"] if len(test_users) > 1 else None + + if not other_user_email: + pytest.skip("Need at least 2 test users") # Add a normal user to the team add_payload: dict[str, list[str]] = {"users": [user_email]} add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) assert add_res["code"] == 0 - # Create auth for the normal user (this would require getting their token) - # For now, we'll test that owner/admin can add users - # This test would need the normal user's auth token to fully test - pass + # Small delay to ensure user is fully added + time.sleep(0.5) + # Login as the normal user + normal_user_auth: RAGFlowWebApiAuth = login_as_user(user_email, test_users[0]["password"]) -@pytest.mark.p1 -class TestRemoveUsers: - """Comprehensive tests for removing users from a team.""" - - @pytest.fixture - def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: - """Create a test team for use in tests.""" - team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} - res: dict[str, Any] = create_team(web_api_auth, team_payload) - assert res["code"] == 0 - return res["data"] - - @pytest.fixture - def test_users(self, web_api_auth: RAGFlowWebApiAuth) -> list[dict[str, Any]]: - """Create test users for use in tests.""" - users = [] - for i in range(5): - email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com" - password = "TestPassword123!" - user_payload: dict[str, str] = { - "email": email, - "password": password, - "nickname": f"Test User {i}", - } - user_res: dict[str, Any] = create_user(web_api_auth, user_payload) - if user_res["code"] == 0: - users.append({"email": email, "id": user_res["data"]["id"]}) - return users - - @pytest.fixture - def team_with_users( - self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] - ) -> dict[str, Any]: - """Create a team with users already added.""" - tenant_id: str = test_team["id"] - user_emails: list[str] = [user["email"] for user in test_users[:3]] - - add_payload: dict[str, list[str]] = {"users": user_emails} - add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert add_res["code"] == 0 - - return { - "team": test_team, - "users": test_users[:3], - } - - @pytest.mark.p1 - def test_remove_single_user( - self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any] - ) -> None: - """Test removing a single user from a team.""" - tenant_id: str = team_with_users["team"]["id"] - user_id: str = team_with_users["users"][0]["id"] - - remove_payload: dict[str, list[str]] = {"user_ids": [user_id]} - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - - assert res["code"] == 0 - assert "data" in res - assert "removed" in res["data"] - assert len(res["data"]["removed"]) == 1 - assert res["data"]["removed"][0]["user_id"] == user_id - assert "failed" in res["data"] - assert len(res["data"]["failed"]) == 0 - - @pytest.mark.p1 - def test_remove_multiple_users( - self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any] - ) -> None: - """Test removing multiple users in bulk.""" - tenant_id: str = team_with_users["team"]["id"] - user_ids: list[str] = [user["id"] for user in team_with_users["users"][:2]] - - remove_payload: dict[str, list[str]] = {"user_ids": user_ids} - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - - assert res["code"] == 0 - assert len(res["data"]["removed"]) == 2 - assert len(res["data"]["failed"]) == 0 - removed_ids = {user["user_id"] for user in res["data"]["removed"]} - assert removed_ids == set(user_ids) - - @pytest.mark.p1 - def test_remove_user_not_in_team( - self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] - ) -> None: - """Test removing a user who is not a member of the team.""" - tenant_id: str = test_team["id"] - # Use a user that was not added to the team - user_id: str = test_users[3]["id"] - - remove_payload: dict[str, list[str]] = {"user_ids": [user_id]} - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - - assert res["code"] == 0 # Returns success but with failed entry - assert len(res["data"]["removed"]) == 0 - assert len(res["data"]["failed"]) == 1 - assert "not a member" in res["data"]["failed"][0]["error"].lower() - - @pytest.mark.p1 - def test_remove_owner( - self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] - ) -> None: - """Test that owner cannot be removed.""" - tenant_id: str = test_team["id"] - owner_id: str = test_team["owner_id"] - - remove_payload: dict[str, list[str]] = {"user_ids": [owner_id]} - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - - assert res["code"] == 0 # Returns success but with failed entry - assert len(res["data"]["removed"]) == 0 - assert len(res["data"]["failed"]) == 1 - assert "owner" in res["data"]["failed"][0]["error"].lower() - - @pytest.mark.p1 - def test_remove_users_partial_success( - self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any], test_users: list[dict[str, Any]] - ) -> None: - """Test removing users where some succeed and some fail.""" - tenant_id: str = team_with_users["team"]["id"] - # Mix of valid and invalid user IDs - valid_user_id: str = team_with_users["users"][0]["id"] - invalid_user_id: str = test_users[3]["id"] # Not in team - - remove_payload: dict[str, list[str]] = {"user_ids": [valid_user_id, invalid_user_id]} - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - - assert res["code"] == 0 - assert len(res["data"]["removed"]) == 1 - assert len(res["data"]["failed"]) == 1 - assert res["data"]["removed"][0]["user_id"] == valid_user_id - assert "not a member" in res["data"]["failed"][0]["error"].lower() - - @pytest.mark.p1 - def test_remove_users_empty_list( - self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] - ) -> None: - """Test removing users with empty list.""" - tenant_id: str = test_team["id"] - remove_payload: dict[str, list[str]] = {"user_ids": []} - - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - assert res["code"] == 101 # ARGUMENT_ERROR - assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower() - - @pytest.mark.p1 - def test_remove_users_missing_user_ids_field( - self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] - ) -> None: - """Test removing users without 'user_ids' field.""" - tenant_id: str = test_team["id"] - remove_payload: dict[str, Any] = {} - - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - assert res["code"] == 101 # ARGUMENT_ERROR - - @pytest.mark.p1 - def test_remove_users_invalid_user_id_format( - self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] - ) -> None: - """Test removing users with invalid user ID format.""" - tenant_id: str = test_team["id"] - remove_payload: dict[str, list[Any]] = {"user_ids": [12345]} # Not a string - - res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) - assert res["code"] == 0 # Returns success but with failed entry - assert len(res["data"]["removed"]) == 0 - assert len(res["data"]["failed"]) == 1 - assert "invalid" in res["data"]["failed"][0]["error"].lower() + # Try to add another user (normal user should not be able to) + add_payload2: dict[str, list[str]] = {"users": [other_user_email]} + res: dict[str, Any] = add_users_to_team(normal_user_auth, tenant_id, add_payload2) + assert res["code"] == 108 # PERMISSION_ERROR + assert "owner" in res["message"].lower() or "admin" in res["message"].lower() @pytest.mark.p2 - def test_remove_last_admin( - self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + def test_add_users_invalid_tenant_id( + self, web_api_auth: RAGFlowWebApiAuth, test_users: list[dict[str, Any]] ) -> None: - """Test that the last admin cannot remove themselves.""" + """Test adding users to a non-existent team.""" + if not test_users: + pytest.skip("No test users created") + + invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}" + add_payload: dict[str, list[str]] = {"users": [test_users[0]["email"]]} + + res: dict[str, Any] = add_users_to_team(web_api_auth, invalid_tenant_id, add_payload) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or res["code"] in [100, 102, 108] + + @pytest.mark.p2 + def test_add_users_invalid_email_format( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test adding users with invalid email format.""" tenant_id: str = test_team["id"] - user_email: str = test_users[0]["email"] + invalid_email: str = "not_an_email" - # Add user as admin - add_payload: dict[str, list[dict[str, str]]] = { - "users": [{"email": user_email, "role": "admin"}] - } - add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - assert add_res["code"] == 0 - admin_user_id: str = add_res["data"]["added"][0]["id"] + add_payload: dict[str, list[str]] = {"users": [invalid_email]} + res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) - # Try to remove the admin (would need admin's auth token to fully test) - # This test would require the admin user's authentication - pass + # Should fail - either validation error or user not found + assert res["code"] != 0 + assert len(res["data"]["added"]) == 0 diff --git a/test/testcases/test_http_api/test_team_management/test_create_team.py b/test/testcases/test_http_api/test_team_management/test_create_team.py index 4d5ee05fd..f4d1e709f 100644 --- a/test/testcases/test_http_api/test_team_management/test_create_team.py +++ b/test/testcases/test_http_api/test_team_management/test_create_team.py @@ -252,3 +252,297 @@ class TestTeamCreate: # Should succeed if unicode is supported assert res["code"] in (0, 101) + @pytest.mark.p2 + def test_team_creation_with_custom_models( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test creating team with custom model configurations.""" + team_name: str = f"Custom Models Team {uuid.uuid4().hex[:8]}" + + # Attempt to create team with custom models + # Note: Model IDs need to be valid and added by the user + team_payload: dict[str, str] = { + "name": team_name, + # These would need to be actual model IDs added by the user + # "llm_id": "custom_llm_id", + # "embd_id": "custom_embd_id", + } + res: dict[str, Any] = create_team(web_api_auth, team_payload) + + # Should succeed with defaults if custom models not specified + assert res["code"] == 0, res + assert res["data"]["name"] == team_name + assert "id" in res["data"] + + @pytest.mark.p2 + def test_multiple_teams_same_name_allowed( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test that multiple teams can have the same name.""" + team_name: str = f"Duplicate Name {uuid.uuid4().hex[:8]}" + + # Create first team + res1: dict[str, Any] = create_team(web_api_auth, {"name": team_name}) + assert res1["code"] == 0, res1 + team_id_1: str = res1["data"]["id"] + + # Create second team with same name + res2: dict[str, Any] = create_team(web_api_auth, {"name": team_name}) + assert res2["code"] == 0, res2 + team_id_2: str = res2["data"]["id"] + + # Teams should have different IDs + assert team_id_1 != team_id_2, "Teams should have unique IDs" + assert res1["data"]["name"] == res2["data"]["name"], ( + "Both teams should have the same name" + ) + + @pytest.mark.p2 + def test_team_creation_with_credit_limit( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test creating team with custom credit limit.""" + team_name: str = f"Credit Test Team {uuid.uuid4().hex[:8]}" + custom_credit: int = 1000 + + team_payload: dict[str, Any] = { + "name": team_name, + "credit": custom_credit, + } + res: dict[str, Any] = create_team(web_api_auth, team_payload) + + # Should succeed + assert res["code"] == 0, res + # Note: Credit may not be in response, but should be set internally + + @pytest.mark.p2 + def test_team_name_with_special_characters( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team names with special characters.""" + special_names: list[str] = [ + f"Team-{uuid.uuid4().hex[:4]}_Test!", + f"Team & Co. {uuid.uuid4().hex[:4]}", + f"Team @{uuid.uuid4().hex[:4]}", + f"团队{uuid.uuid4().hex[:4]}", # Unicode + ] + + for name in special_names: + res: dict[str, Any] = create_team(web_api_auth, {"name": name}) + # Should either accept or reject with clear message + if res["code"] == 0: + assert res["data"]["name"] == name, ( + f"Team name should be preserved: {name}" + ) + # If rejected, should have clear error + # (Current implementation accepts special chars) + + @pytest.mark.p2 + def test_team_creation_default_owner( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test that team creator is set as owner by default.""" + team_name: str = f"Owner Test Team {uuid.uuid4().hex[:8]}" + res: dict[str, Any] = create_team(web_api_auth, {"name": team_name}) + + assert res["code"] == 0, res + assert "owner_id" in res["data"], "Owner ID should be in response" + # Owner should be the authenticated user + # (Cannot verify without knowing web_api_auth user ID) + + @pytest.mark.p2 + def test_concurrent_team_creation( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test concurrent team creation.""" + import concurrent.futures + + def create_test_team(index: int) -> dict[str, Any]: + team_name: str = f"Concurrent Team {index}_{uuid.uuid4().hex[:8]}" + return create_team(web_api_auth, {"name": team_name}) + + # Create 10 teams concurrently + count: int = 10 + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures: list[concurrent.futures.Future[dict[str, Any]]] = [ + executor.submit(create_test_team, i) for i in range(count) + ] + results: list[dict[str, Any]] = [ + f.result() for f in concurrent.futures.as_completed(futures) + ] + + # All should succeed + success_count: int = sum(1 for r in results if r["code"] == 0) + assert success_count == count, ( + f"Expected {count} successful team creations, got {success_count}" + ) + + # All should have unique IDs + team_ids: list[str] = [r["data"]["id"] for r in results if r["code"] == 0] + assert len(team_ids) == len(set(team_ids)), ( + "All team IDs should be unique" + ) + + @pytest.mark.p2 + def test_team_with_invalid_model_id( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with invalid model ID.""" + team_name: str = f"Invalid Model Team {uuid.uuid4().hex[:8]}" + team_payload: dict[str, str] = { + "name": team_name, + "llm_id": "invalid_nonexistent_model_id_12345", + } + res: dict[str, Any] = create_team(web_api_auth, team_payload) + + # Should reject with clear error message + assert res["code"] != 0, "Invalid model ID should be rejected" + assert ( + "model" in res["message"].lower() + or "not found" in res["message"].lower() + or "invalid" in res["message"].lower() + ), "Error message should mention model issue" + + @pytest.mark.p2 + def test_team_creation_with_negative_credit( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with negative credit.""" + team_name: str = f"Negative Credit Team {uuid.uuid4().hex[:8]}" + team_payload: dict[str, Any] = { + "name": team_name, + "credit": -100, + } + res: dict[str, Any] = create_team(web_api_auth, team_payload) + + # Should reject negative credit + assert res["code"] != 0, "Negative credit should be rejected" + assert "credit" in res["message"].lower(), ( + "Error message should mention credit" + ) + + @pytest.mark.p2 + def test_team_creation_empty_json_payload( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with completely empty payload.""" + res: dict[str, Any] = create_team(web_api_auth, {}) + + # Should reject with clear error + assert res["code"] != 0, "Empty payload should be rejected" + assert ( + "name" in res["message"].lower() + or "required" in res["message"].lower() + ), "Error should mention missing name" + + @pytest.mark.p3 + def test_team_unicode_name( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with full unicode name.""" + unicode_names: list[str] = [ + f"团队{uuid.uuid4().hex[:4]}", # Chinese + f"チーム{uuid.uuid4().hex[:4]}", # Japanese + f"Команда{uuid.uuid4().hex[:4]}", # Russian + f"فريق{uuid.uuid4().hex[:4]}", # Arabic (RTL) + f"😀🎉{uuid.uuid4().hex[:4]}", # Emoji + ] + + for name in unicode_names: + res: dict[str, Any] = create_team(web_api_auth, {"name": name}) + + # Should handle unicode properly + if res["code"] == 0: + # Verify unicode is preserved (may be normalized) + assert len(res["data"]["name"]) > 0, ( + "Team name should not be empty after unicode" + ) + + @pytest.mark.p3 + def test_team_creation_with_all_optional_params( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with all optional parameters.""" + team_name: str = f"Full Params Team {uuid.uuid4().hex[:8]}" + team_payload: dict[str, Any] = { + "name": team_name, + "credit": 2000, + # Note: Model IDs would need to be valid + # "llm_id": "valid_llm_id", + # "embd_id": "valid_embd_id", + # "asr_id": "valid_asr_id", + # "parser_ids": "valid_parser_ids", + # "img2txt_id": "valid_img2txt_id", + # "rerank_id": "valid_rerank_id", + } + res: dict[str, Any] = create_team(web_api_auth, team_payload) + + # Should succeed + assert res["code"] == 0, res + assert res["data"]["name"] == team_name + + @pytest.mark.p3 + def test_team_max_name_length( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team with maximum allowed name length.""" + # API spec says max 100 characters + max_name: str = "A" * 100 + res: dict[str, Any] = create_team(web_api_auth, {"name": max_name}) + + # Should accept 100 characters + assert res["code"] == 0, "100-character name should be accepted" + assert res["data"]["name"] == max_name + + @pytest.mark.p3 + def test_team_name_just_over_limit( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team with name just over limit.""" + # 101 characters (1 over limit) + long_name: str = "A" * 101 + res: dict[str, Any] = create_team(web_api_auth, {"name": long_name}) + + # Should reject + assert res["code"] != 0, "101-character name should be rejected" + assert ( + "100" in res["message"] + or "length" in res["message"].lower() + or "long" in res["message"].lower() + ), "Error should mention length limit" + + @pytest.mark.p3 + def test_team_creation_idempotency( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test that repeated team creation creates separate teams.""" + team_name: str = f"Idempotency Test {uuid.uuid4().hex[:8]}" + payload: dict[str, str] = {"name": team_name} + + # Create same team twice + res1: dict[str, Any] = create_team(web_api_auth, payload) + res2: dict[str, Any] = create_team(web_api_auth, payload) + + # Both should succeed and create different teams + assert res1["code"] == 0, res1 + assert res2["code"] == 0, res2 + assert res1["data"]["id"] != res2["data"]["id"], ( + "Should create different teams, not be idempotent" + ) + + @pytest.mark.p3 + def test_team_with_parser_ids( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test team creation with custom parser IDs.""" + team_name: str = f"Parser Test {uuid.uuid4().hex[:8]}" + # parser_ids is typically a comma-separated string + team_payload: dict[str, str] = { + "name": team_name, + "parser_ids": "naive,qa,table,paper,book,laws,presentation,manual,wiki", + } + res: dict[str, Any] = create_team(web_api_auth, team_payload) + + # Should accept valid parser IDs + assert res["code"] == 0, res + diff --git a/test/testcases/test_http_api/test_team_management/test_reject_invite.py b/test/testcases/test_http_api/test_team_management/test_reject_invite.py new file mode 100644 index 000000000..370f0c8fb --- /dev/null +++ b/test/testcases/test_http_api/test_team_management/test_reject_invite.py @@ -0,0 +1,363 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import time +import uuid +from typing import Any + +import pytest + +from common import ( + add_users_to_team, + create_team, + create_user, + encrypt_password, + login_as_user, + reject_team_invitation, +) +from configs import INVALID_API_TOKEN +from libs.auth import RAGFlowWebApiAuth + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- + + +@pytest.mark.p1 +class TestAuthorization: + """Tests for authentication behavior when rejecting team invitations.""" + + @pytest.mark.parametrize( + ("invalid_auth", "expected_code", "expected_message"), + [ + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), + ], + ) + def test_invalid_auth( + self, + invalid_auth: RAGFlowWebApiAuth | None, + expected_code: int, + expected_message: str, + web_api_auth: RAGFlowWebApiAuth, + ) -> None: + """Test rejecting invitation with invalid or missing authentication.""" + # Create a team and send invitation first + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + team_res: dict[str, Any] = create_team(web_api_auth, team_payload) + if team_res["code"] != 0: + pytest.skip("Team creation failed, skipping auth test") + + tenant_id: str = team_res["data"]["id"] + + # Create and invite a user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + if user_res["code"] != 0: + pytest.skip("User creation failed, skipping auth test") + + add_payload: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id, add_payload) + + # Try to reject invitation with invalid auth + res: dict[str, Any] = reject_team_invitation(invalid_auth, tenant_id) + assert res["code"] == expected_code, res + if expected_message: + assert expected_message in res["message"] + + +@pytest.mark.p1 +class TestRejectInvite: + """Comprehensive tests for rejecting team invitations.""" + + @pytest.fixture + def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(web_api_auth, team_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.fixture + def invited_user(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test user who will be invited.""" + email = f"inviteduser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Invited User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + return { + "email": email, + "id": user_res["data"]["id"], + "password": password, + } + + @pytest.fixture + def team_with_invitation( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + invited_user: dict[str, Any], + ) -> dict[str, Any]: + """Create a team and send invitation to a user.""" + tenant_id: str = test_team["id"] + add_payload: dict[str, list[str]] = {"users": [invited_user["email"]]} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + return { + "team": test_team, + "invited_user": invited_user, + } + + @pytest.mark.p1 + def test_reject_invitation_success( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test successfully rejecting an invitation.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Reject the invitation + res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res["code"] == 0, res + assert res["data"] is True + assert "rejected" in res["message"].lower() or "successfully" in res["message"].lower() + + @pytest.mark.p1 + def test_reject_invitation_no_invitation( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test rejecting an invitation when no invitation exists.""" + # Create a user who is not invited + email = f"notinvited_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Not Invited User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + + # Login as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + + # Try to reject invitation for a team they're not invited to + tenant_id: str = test_team["id"] + res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or "invitation" in res["message"].lower() + + @pytest.mark.p1 + def test_reject_invitation_already_rejected( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test rejecting an invitation that has already been rejected.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Reject the invitation first time + res1: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res1["code"] == 0 + + # Try to reject again (should fail - invitation no longer exists) + res2: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res2["code"] != 0 + assert "not found" in res2["message"].lower() or "invitation" in res2["message"].lower() + + @pytest.mark.p1 + def test_reject_invitation_after_accepted( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test rejecting an invitation that has already been accepted.""" + from common import accept_team_invitation + + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Accept the invitation first + accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert accept_res["code"] == 0 + + # Try to reject after accepting (should fail - no longer has INVITE role) + res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res["code"] != 0 + assert "invite" in res["message"].lower() or "role" in res["message"].lower() or "not found" in res["message"].lower() + + @pytest.mark.p1 + def test_reject_invitation_invalid_tenant_id( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test rejecting an invitation with invalid team ID.""" + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Try to reject invitation for non-existent team + invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}" + res: dict[str, Any] = reject_team_invitation(user_auth, invalid_tenant_id) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or "invitation" in res["message"].lower() + + @pytest.mark.p1 + def test_reject_invitation_response_structure( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test that rejecting invitation returns the expected response structure.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Reject the invitation + res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res["code"] == 0 + assert "data" in res + assert res["data"] is True + assert "message" in res + assert isinstance(res["message"], str) + assert "successfully" in res["message"].lower() or "rejected" in res["message"].lower() + + @pytest.mark.p1 + def test_reject_invitation_removes_relationship( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test that rejecting an invitation removes the user-tenant relationship.""" + tenant_id: str = team_with_invitation["team"]["id"] + invited_user: dict[str, Any] = team_with_invitation["invited_user"] + + # Login as the invited user + user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"]) + + # Reject the invitation + res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res["code"] == 0 + + # Try to reject again (should fail - relationship removed) + res2: dict[str, Any] = reject_team_invitation(user_auth, tenant_id) + assert res2["code"] != 0 + assert "not found" in res2["message"].lower() or "invitation" in res2["message"].lower() + + @pytest.mark.p2 + def test_reject_invitation_wrong_user( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_invitation: dict[str, Any], + ) -> None: + """Test that a user cannot reject another user's invitation.""" + # Create another user who is not invited + email = f"otheruser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Other User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + + # Login as the other user + other_user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + + # Try to reject invitation meant for another user + tenant_id: str = team_with_invitation["team"]["id"] + res: dict[str, Any] = reject_team_invitation(other_user_auth, tenant_id) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or "invitation" in res["message"].lower() + + @pytest.mark.p2 + def test_reject_invitation_multiple_invitations( + self, web_api_auth: RAGFlowWebApiAuth + ) -> None: + """Test rejecting invitations to multiple teams.""" + # Create two teams + team1_payload: dict[str, str] = {"name": f"Team 1 {uuid.uuid4().hex[:8]}"} + team1_res: dict[str, Any] = create_team(web_api_auth, team1_payload) + assert team1_res["code"] == 0 + tenant_id_1: str = team1_res["data"]["id"] + + team2_payload: dict[str, str] = {"name": f"Team 2 {uuid.uuid4().hex[:8]}"} + team2_res: dict[str, Any] = create_team(web_api_auth, team2_payload) + assert team2_res["code"] == 0 + tenant_id_2: str = team2_res["data"]["id"] + + # Create and invite a user to both teams + email = f"multiuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Multi User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + + # Invite to both teams + add_payload1: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id_1, add_payload1) + add_payload2: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id_2, add_payload2) + + # Login as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + + # Reject both invitations + res1: dict[str, Any] = reject_team_invitation(user_auth, tenant_id_1) + assert res1["code"] == 0 + + res2: dict[str, Any] = reject_team_invitation(user_auth, tenant_id_2) + assert res2["code"] == 0 + diff --git a/test/testcases/test_http_api/test_team_management/test_remove_users.py b/test/testcases/test_http_api/test_team_management/test_remove_users.py new file mode 100644 index 000000000..22729248e --- /dev/null +++ b/test/testcases/test_http_api/test_team_management/test_remove_users.py @@ -0,0 +1,376 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import time +import uuid +from typing import Any + +import pytest + +from common import ( + add_users_to_team, + create_team, + create_user, + encrypt_password, + login_as_user, + remove_users_from_team, +) +from configs import INVALID_API_TOKEN +from libs.auth import RAGFlowWebApiAuth + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- + + +@pytest.mark.p1 +class TestAuthorization: + """Tests for authentication behavior when removing users from a team.""" + + @pytest.mark.parametrize( + ("invalid_auth", "expected_code", "expected_message"), + [ + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), + ], + ) + def test_invalid_auth( + self, + invalid_auth: RAGFlowWebApiAuth | None, + expected_code: int, + expected_message: str, + web_api_auth: RAGFlowWebApiAuth, + ) -> None: + """Test removing users with invalid or missing authentication.""" + # Create a team and add a user first + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + team_res: dict[str, Any] = create_team(web_api_auth, team_payload) + if team_res["code"] != 0: + pytest.skip("Team creation failed, skipping auth test") + + tenant_id: str = team_res["data"]["id"] + + # Create and add a user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + if user_res["code"] != 0: + pytest.skip("User creation failed, skipping auth test") + + user_id: str = user_res["data"]["id"] + add_payload: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id, add_payload) + + # Try to remove user with invalid auth + remove_payload: dict[str, list[str]] = {"user_ids": [user_id]} + res: dict[str, Any] = remove_users_from_team(invalid_auth, tenant_id, remove_payload) + assert res["code"] == expected_code, res + if expected_message: + assert expected_message in res["message"] + + +@pytest.mark.p1 +class TestRemoveUsers: + """Comprehensive tests for removing users from a team.""" + + @pytest.fixture + def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(web_api_auth, team_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.fixture + def test_users(self, web_api_auth: RAGFlowWebApiAuth) -> list[dict[str, Any]]: + """Create test users for use in tests.""" + users = [] + for i in range(5): + email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": f"Test User {i}", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + if user_res["code"] == 0: + users.append({"email": email, "id": user_res["data"]["id"], "password": password}) + return users + + @pytest.fixture + def team_with_users( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> dict[str, Any]: + """Create a team with users already added.""" + if not test_users: + return {"team": test_team, "users": []} + + tenant_id: str = test_team["id"] + user_emails: list[str] = [user["email"] for user in test_users[:3]] + + add_payload: dict[str, list[str]] = {"users": user_emails} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + + return { + "team": test_team, + "users": test_users[:3], + } + + @pytest.mark.p1 + def test_remove_single_user( + self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any] + ) -> None: + """Test removing a single user from a team.""" + if not team_with_users["users"]: + pytest.skip("No users in team") + + tenant_id: str = team_with_users["team"]["id"] + user_id: str = team_with_users["users"][0]["id"] + + remove_payload: dict[str, list[str]] = {"user_ids": [user_id]} + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + + assert res["code"] == 0, res + assert "data" in res + assert "removed" in res["data"] + assert len(res["data"]["removed"]) == 1 + assert res["data"]["removed"][0]["user_id"] == user_id + assert "failed" in res["data"] + assert len(res["data"]["failed"]) == 0 + + @pytest.mark.p1 + def test_remove_multiple_users( + self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any] + ) -> None: + """Test removing multiple users in bulk.""" + if len(team_with_users["users"]) < 2: + pytest.skip("Need at least 2 users in team") + + tenant_id: str = team_with_users["team"]["id"] + user_ids: list[str] = [user["id"] for user in team_with_users["users"][:2]] + + remove_payload: dict[str, list[str]] = {"user_ids": user_ids} + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + + assert res["code"] == 0, res + assert len(res["data"]["removed"]) == 2 + assert len(res["data"]["failed"]) == 0 + removed_ids = {user["user_id"] for user in res["data"]["removed"]} + assert removed_ids == set(user_ids) + + @pytest.mark.p1 + def test_remove_user_not_in_team( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test removing a user who is not a member of the team.""" + if len(test_users) < 4: + pytest.skip("Need at least 4 test users") + + tenant_id: str = test_team["id"] + # Use a user that was not added to the team + user_id: str = test_users[3]["id"] + + remove_payload: dict[str, list[str]] = {"user_ids": [user_id]} + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + + # API returns error code when all removals fail + assert res["code"] == 102 # DATA_ERROR + assert len(res["data"]["removed"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "not a member" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_remove_owner( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test that owner cannot be removed.""" + tenant_id: str = test_team["id"] + owner_id: str = test_team["owner_id"] + + remove_payload: dict[str, list[str]] = {"user_ids": [owner_id]} + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + + # API returns error code when all removals fail + assert res["code"] == 102 # DATA_ERROR + assert len(res["data"]["removed"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "owner" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_remove_users_partial_success( + self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test removing users where some succeed and some fail.""" + if not team_with_users["users"] or len(test_users) < 4: + pytest.skip("Need users in team and at least 4 test users") + + tenant_id: str = team_with_users["team"]["id"] + # Mix of valid and invalid user IDs + valid_user_id: str = team_with_users["users"][0]["id"] + invalid_user_id: str = test_users[3]["id"] # Not in team + + remove_payload: dict[str, list[str]] = {"user_ids": [valid_user_id, invalid_user_id]} + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + + assert res["code"] == 0, res + assert len(res["data"]["removed"]) == 1 + assert len(res["data"]["failed"]) == 1 + assert res["data"]["removed"][0]["user_id"] == valid_user_id + assert "not a member" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_remove_users_empty_list( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test removing users with empty list.""" + tenant_id: str = test_team["id"] + remove_payload: dict[str, list[str]] = {"user_ids": []} + + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + assert res["code"] == 101 # ARGUMENT_ERROR + assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower() + + @pytest.mark.p1 + def test_remove_users_missing_user_ids_field( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test removing users without 'user_ids' field.""" + tenant_id: str = test_team["id"] + remove_payload: dict[str, Any] = {} + + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + assert res["code"] == 101 # ARGUMENT_ERROR + + @pytest.mark.p1 + def test_remove_users_invalid_user_id_format( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any] + ) -> None: + """Test removing users with invalid user ID format.""" + tenant_id: str = test_team["id"] + remove_payload: dict[str, list[Any]] = {"user_ids": [12345]} # Not a string + + res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload) + # API returns error code when all removals fail + assert res["code"] == 102 # DATA_ERROR + assert len(res["data"]["removed"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "invalid" in res["data"]["failed"][0]["error"].lower() + + @pytest.mark.p1 + def test_remove_users_not_owner_or_admin( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test that non-admin/non-owner users cannot remove users.""" + if len(test_users) < 2: + pytest.skip("Need at least 2 test users") + + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + other_user_email: str = test_users[1]["email"] + + # Add two users to the team + add_payload: dict[str, list[str]] = {"users": [user_email, other_user_email]} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + + # Small delay to ensure users are fully added + time.sleep(0.5) + + # Login as the first normal user + normal_user_auth: RAGFlowWebApiAuth = login_as_user(user_email, test_users[0]["password"]) + + # Try to remove the other user (normal user should not be able to) + other_user_id: str = test_users[1]["id"] + remove_payload: dict[str, list[str]] = {"user_ids": [other_user_id]} + res: dict[str, Any] = remove_users_from_team(normal_user_auth, tenant_id, remove_payload) + assert res["code"] == 108 # PERMISSION_ERROR + assert "owner" in res["message"].lower() or "admin" in res["message"].lower() + + @pytest.mark.p2 + def test_remove_last_admin( + self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]] + ) -> None: + """Test that the last admin cannot remove themselves.""" + if not test_users: + pytest.skip("No test users created") + + from common import accept_team_invitation + + tenant_id: str = test_team["id"] + user_email: str = test_users[0]["email"] + + # Add user as admin + add_payload: dict[str, list[dict[str, str]]] = { + "users": [{"email": user_email, "role": "admin"}] + } + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + + # Small delay + time.sleep(0.5) + + # Login as the admin + admin_auth: RAGFlowWebApiAuth = login_as_user(user_email, test_users[0]["password"]) + + # Accept the invitation to become admin + accept_res: dict[str, Any] = accept_team_invitation(admin_auth, tenant_id, role="admin") + assert accept_res["code"] == 0 + + # Small delay to ensure role is updated + time.sleep(0.5) + + admin_user_id: str = test_users[0]["id"] + + # Try to remove the admin (should fail - last admin cannot remove themselves) + remove_payload: dict[str, list[str]] = {"user_ids": [admin_user_id]} + res: dict[str, Any] = remove_users_from_team(admin_auth, tenant_id, remove_payload) + # API may return error code when all removals fail, or permission error if role not updated + assert res["code"] in [102, 108] # DATA_ERROR or PERMISSION_ERROR + if res["code"] == 102: + # If we get DATA_ERROR, check the failed entry + assert len(res["data"]["removed"]) == 0 + assert len(res["data"]["failed"]) == 1 + assert "cannot remove yourself" in res["data"]["failed"][0]["error"].lower() or "at least one" in res["data"]["failed"][0]["error"].lower() + else: + # If we get PERMISSION_ERROR, the user might not have admin role yet + assert "owner" in res["message"].lower() or "admin" in res["message"].lower() + + @pytest.mark.p2 + def test_remove_users_invalid_tenant_id( + self, web_api_auth: RAGFlowWebApiAuth, test_users: list[dict[str, Any]] + ) -> None: + """Test removing users from a non-existent team.""" + if not test_users: + pytest.skip("No test users created") + + invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}" + remove_payload: dict[str, list[str]] = {"user_ids": [test_users[0]["id"]]} + + res: dict[str, Any] = remove_users_from_team(web_api_auth, invalid_tenant_id, remove_payload) + assert res["code"] != 0 + assert "not found" in res["message"].lower() or res["code"] in [100, 102, 108] + diff --git a/test/testcases/test_http_api/test_team_management/test_team_advanced.py b/test/testcases/test_http_api/test_team_management/test_team_advanced.py deleted file mode 100644 index 35a812250..000000000 --- a/test/testcases/test_http_api/test_team_management/test_team_advanced.py +++ /dev/null @@ -1,337 +0,0 @@ -# -# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -"""Advanced team management tests.""" - -from __future__ import annotations - -import uuid -from typing import Any - -import pytest - -from common import create_team, create_user -from libs.auth import RAGFlowWebApiAuth - - -@pytest.mark.p2 -class TestTeamAdvanced: - """Advanced team management tests.""" - - def test_team_creation_with_custom_models( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test creating team with custom model configurations.""" - team_name: str = f"Custom Models Team {uuid.uuid4().hex[:8]}" - - # Attempt to create team with custom models - # Note: Model IDs need to be valid and added by the user - team_payload: dict[str, str] = { - "name": team_name, - # These would need to be actual model IDs added by the user - # "llm_id": "custom_llm_id", - # "embd_id": "custom_embd_id", - } - res: dict[str, Any] = create_team(web_api_auth, team_payload) - - # Should succeed with defaults if custom models not specified - assert res["code"] == 0, res - assert res["data"]["name"] == team_name - assert "id" in res["data"] - - def test_team_creation_response_structure( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test that team creation returns complete response structure.""" - team_name: str = f"Structure Test Team {uuid.uuid4().hex[:8]}" - team_payload: dict[str, str] = {"name": team_name} - res: dict[str, Any] = create_team(web_api_auth, team_payload) - - assert res["code"] == 0, res - assert "data" in res - - # Check required fields - required_fields: list[str] = ["id", "name", "owner_id"] - for field in required_fields: - assert field in res["data"], ( - f"Missing required field: {field}" - ) - - assert res["data"]["name"] == team_name - assert len(res["data"]["id"]) > 0, "Team ID should not be empty" - assert len(res["data"]["owner_id"]) > 0, "Owner ID should not be empty" - - def test_multiple_teams_same_name_allowed( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test that multiple teams can have the same name.""" - team_name: str = f"Duplicate Name {uuid.uuid4().hex[:8]}" - - # Create first team - res1: dict[str, Any] = create_team(web_api_auth, {"name": team_name}) - assert res1["code"] == 0, res1 - team_id_1: str = res1["data"]["id"] - - # Create second team with same name - res2: dict[str, Any] = create_team(web_api_auth, {"name": team_name}) - assert res2["code"] == 0, res2 - team_id_2: str = res2["data"]["id"] - - # Teams should have different IDs - assert team_id_1 != team_id_2, "Teams should have unique IDs" - assert res1["data"]["name"] == res2["data"]["name"], ( - "Both teams should have the same name" - ) - - def test_team_creation_with_credit_limit( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test creating team with custom credit limit.""" - team_name: str = f"Credit Test Team {uuid.uuid4().hex[:8]}" - custom_credit: int = 1000 - - team_payload: dict[str, Any] = { - "name": team_name, - "credit": custom_credit, - } - res: dict[str, Any] = create_team(web_api_auth, team_payload) - - # Should succeed - assert res["code"] == 0, res - # Note: Credit may not be in response, but should be set internally - - def test_team_name_with_special_characters( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team names with special characters.""" - special_names: list[str] = [ - f"Team-{uuid.uuid4().hex[:4]}_Test!", - f"Team & Co. {uuid.uuid4().hex[:4]}", - f"Team @{uuid.uuid4().hex[:4]}", - f"团队{uuid.uuid4().hex[:4]}", # Unicode - ] - - for name in special_names: - res: dict[str, Any] = create_team(web_api_auth, {"name": name}) - # Should either accept or reject with clear message - if res["code"] == 0: - assert res["data"]["name"] == name, ( - f"Team name should be preserved: {name}" - ) - # If rejected, should have clear error - # (Current implementation accepts special chars) - - def test_team_creation_default_owner( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test that team creator is set as owner by default.""" - team_name: str = f"Owner Test Team {uuid.uuid4().hex[:8]}" - res: dict[str, Any] = create_team(web_api_auth, {"name": team_name}) - - assert res["code"] == 0, res - assert "owner_id" in res["data"], "Owner ID should be in response" - # Owner should be the authenticated user - # (Cannot verify without knowing web_api_auth user ID) - - def test_concurrent_team_creation( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test concurrent team creation.""" - import concurrent.futures - - def create_test_team(index: int) -> dict[str, Any]: - team_name: str = f"Concurrent Team {index}_{uuid.uuid4().hex[:8]}" - return create_team(web_api_auth, {"name": team_name}) - - # Create 10 teams concurrently - count: int = 10 - with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: - futures: list[concurrent.futures.Future[dict[str, Any]]] = [ - executor.submit(create_test_team, i) for i in range(count) - ] - results: list[dict[str, Any]] = [ - f.result() for f in concurrent.futures.as_completed(futures) - ] - - # All should succeed - success_count: int = sum(1 for r in results if r["code"] == 0) - assert success_count == count, ( - f"Expected {count} successful team creations, got {success_count}" - ) - - # All should have unique IDs - team_ids: list[str] = [r["data"]["id"] for r in results if r["code"] == 0] - assert len(team_ids) == len(set(team_ids)), ( - "All team IDs should be unique" - ) - - def test_team_with_invalid_model_id( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team creation with invalid model ID.""" - team_name: str = f"Invalid Model Team {uuid.uuid4().hex[:8]}" - team_payload: dict[str, str] = { - "name": team_name, - "llm_id": "invalid_nonexistent_model_id_12345", - } - res: dict[str, Any] = create_team(web_api_auth, team_payload) - - # Should reject with clear error message - assert res["code"] != 0, "Invalid model ID should be rejected" - assert ( - "model" in res["message"].lower() - or "not found" in res["message"].lower() - or "invalid" in res["message"].lower() - ), "Error message should mention model issue" - - def test_team_creation_with_negative_credit( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team creation with negative credit.""" - team_name: str = f"Negative Credit Team {uuid.uuid4().hex[:8]}" - team_payload: dict[str, Any] = { - "name": team_name, - "credit": -100, - } - res: dict[str, Any] = create_team(web_api_auth, team_payload) - - # Should reject negative credit - assert res["code"] != 0, "Negative credit should be rejected" - assert "credit" in res["message"].lower(), ( - "Error message should mention credit" - ) - - def test_team_creation_empty_json_payload( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team creation with completely empty payload.""" - res: dict[str, Any] = create_team(web_api_auth, {}) - - # Should reject with clear error - assert res["code"] != 0, "Empty payload should be rejected" - assert ( - "name" in res["message"].lower() - or "required" in res["message"].lower() - ), "Error should mention missing name" - - def test_team_unicode_name( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team creation with full unicode name.""" - unicode_names: list[str] = [ - f"团队{uuid.uuid4().hex[:4]}", # Chinese - f"チーム{uuid.uuid4().hex[:4]}", # Japanese - f"Команда{uuid.uuid4().hex[:4]}", # Russian - f"فريق{uuid.uuid4().hex[:4]}", # Arabic (RTL) - f"😀🎉{uuid.uuid4().hex[:4]}", # Emoji - ] - - for name in unicode_names: - res: dict[str, Any] = create_team(web_api_auth, {"name": name}) - - # Should handle unicode properly - if res["code"] == 0: - # Verify unicode is preserved (may be normalized) - assert len(res["data"]["name"]) > 0, ( - "Team name should not be empty after unicode" - ) - - @pytest.mark.p3 - def test_team_creation_with_all_optional_params( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team creation with all optional parameters.""" - team_name: str = f"Full Params Team {uuid.uuid4().hex[:8]}" - team_payload: dict[str, Any] = { - "name": team_name, - "credit": 2000, - # Note: Model IDs would need to be valid - # "llm_id": "valid_llm_id", - # "embd_id": "valid_embd_id", - # "asr_id": "valid_asr_id", - # "parser_ids": "valid_parser_ids", - # "img2txt_id": "valid_img2txt_id", - # "rerank_id": "valid_rerank_id", - } - res: dict[str, Any] = create_team(web_api_auth, team_payload) - - # Should succeed - assert res["code"] == 0, res - assert res["data"]["name"] == team_name - - @pytest.mark.p3 - def test_team_max_name_length( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team with maximum allowed name length.""" - # API spec says max 100 characters - max_name: str = "A" * 100 - res: dict[str, Any] = create_team(web_api_auth, {"name": max_name}) - - # Should accept 100 characters - assert res["code"] == 0, "100-character name should be accepted" - assert res["data"]["name"] == max_name - - @pytest.mark.p3 - def test_team_name_just_over_limit( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team with name just over limit.""" - # 101 characters (1 over limit) - long_name: str = "A" * 101 - res: dict[str, Any] = create_team(web_api_auth, {"name": long_name}) - - # Should reject - assert res["code"] != 0, "101-character name should be rejected" - assert ( - "100" in res["message"] - or "length" in res["message"].lower() - or "long" in res["message"].lower() - ), "Error should mention length limit" - - @pytest.mark.p3 - def test_team_creation_idempotency( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test that repeated team creation creates separate teams.""" - team_name: str = f"Idempotency Test {uuid.uuid4().hex[:8]}" - payload: dict[str, str] = {"name": team_name} - - # Create same team twice - res1: dict[str, Any] = create_team(web_api_auth, payload) - res2: dict[str, Any] = create_team(web_api_auth, payload) - - # Both should succeed and create different teams - assert res1["code"] == 0, res1 - assert res2["code"] == 0, res2 - assert res1["data"]["id"] != res2["data"]["id"], ( - "Should create different teams, not be idempotent" - ) - - @pytest.mark.p3 - def test_team_with_parser_ids( - self, web_api_auth: RAGFlowWebApiAuth - ) -> None: - """Test team creation with custom parser IDs.""" - team_name: str = f"Parser Test {uuid.uuid4().hex[:8]}" - # parser_ids is typically a comma-separated string - team_payload: dict[str, str] = { - "name": team_name, - "parser_ids": "naive,qa,table,paper,book,laws,presentation,manual,wiki", - } - res: dict[str, Any] = create_team(web_api_auth, team_payload) - - # Should accept valid parser IDs - assert res["code"] == 0, res