[OND211-2329]: Added new tests and updated existing tests for team API.

This commit is contained in:
Hetavi Shah 2025-11-19 18:14:49 +05:30
parent b54a6ace78
commit 0d48560e66
7 changed files with 1524 additions and 552 deletions

View file

@ -481,6 +481,7 @@ def remove_users_from_team(
def accept_team_invitation(
auth: Union[AuthBase, str, None],
tenant_id: str,
role: Optional[str] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
@ -489,6 +490,7 @@ def accept_team_invitation(
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
tenant_id: The tenant/team ID to accept invitation for.
role: Optional role to assign after acceptance (normal, admin). Defaults to normal.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
@ -498,7 +500,36 @@ def accept_team_invitation(
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/update-request/{tenant_id}"
payload: Dict[str, bool] = {"accept": True}
payload: Dict[str, Any] = {"accept": True}
if role:
payload["role"] = role
res: requests.Response = requests.put(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def reject_team_invitation(
auth: Union[AuthBase, str, None],
tenant_id: str,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Reject a team invitation.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
tenant_id: The tenant/team ID to reject invitation for.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the rejection result.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/update-request/{tenant_id}"
payload: Dict[str, bool] = {"accept": False}
res: requests.Response = requests.put(
url=url, headers=headers, auth=auth, json=payload
)

View file

@ -0,0 +1,357 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import time
import uuid
from typing import Any
import pytest
from common import (
accept_team_invitation,
add_users_to_team,
create_team,
create_user,
encrypt_password,
login_as_user,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestAuthorization:
"""Tests for authentication behavior when accepting team invitations."""
@pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test accepting invitation with invalid or missing authentication."""
# Create a team and send invitation first
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res["code"] != 0:
pytest.skip("Team creation failed, skipping auth test")
tenant_id: str = team_res["data"]["id"]
# Create and invite a user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
add_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id, add_payload)
# Try to accept invitation with invalid auth
res: dict[str, Any] = accept_team_invitation(invalid_auth, tenant_id)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.p1
class TestAcceptInvite:
"""Comprehensive tests for accepting team invitations."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def invited_user(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test user who will be invited."""
email = f"inviteduser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Invited User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
return {
"email": email,
"id": user_res["data"]["id"],
"password": password,
}
@pytest.fixture
def team_with_invitation(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
invited_user: dict[str, Any],
) -> dict[str, Any]:
"""Create a team and send invitation to a user."""
tenant_id: str = test_team["id"]
add_payload: dict[str, list[str]] = {"users": [invited_user["email"]]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
return {
"team": test_team,
"invited_user": invited_user,
}
@pytest.mark.p1
def test_accept_invitation_default_role(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test accepting an invitation with default role (normal)."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Accept the invitation
res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert res["code"] == 0, res
assert res["data"] is True
assert "joined" in res["message"].lower() or "successfully" in res["message"].lower()
assert "normal" in res["message"].lower() or "role" in res["message"].lower()
@pytest.mark.p1
def test_accept_invitation_with_admin_role(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test accepting an invitation with admin role."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Accept the invitation with admin role
res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id, role="admin")
assert res["code"] == 0, res
assert res["data"] is True
assert "joined" in res["message"].lower() or "successfully" in res["message"].lower()
assert "admin" in res["message"].lower() or "role" in res["message"].lower()
@pytest.mark.p1
def test_accept_invitation_with_normal_role(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test accepting an invitation with normal role explicitly."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Accept the invitation with normal role
res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id, role="normal")
assert res["code"] == 0, res
assert res["data"] is True
assert "joined" in res["message"].lower() or "successfully" in res["message"].lower()
@pytest.mark.p1
def test_accept_invitation_no_invitation(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test accepting an invitation when no invitation exists."""
# Create a user who is not invited
email = f"notinvited_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Not Invited User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
# Login as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
# Try to accept invitation for a team they're not invited to
tenant_id: str = test_team["id"]
res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert res["code"] != 0
assert "not found" in res["message"].lower() or "invitation" in res["message"].lower()
@pytest.mark.p1
def test_accept_invitation_already_accepted(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test accepting an invitation that has already been accepted."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Accept the invitation first time
res1: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert res1["code"] == 0
# Try to accept again
res2: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert res2["code"] != 0
assert "invite" in res2["message"].lower() or "role" in res2["message"].lower() or "not found" in res2["message"].lower()
@pytest.mark.p1
def test_accept_invitation_invalid_tenant_id(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test accepting an invitation with invalid team ID."""
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Try to accept invitation for non-existent team
invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}"
res: dict[str, Any] = accept_team_invitation(user_auth, invalid_tenant_id)
assert res["code"] != 0
assert "not found" in res["message"].lower() or "invitation" in res["message"].lower()
@pytest.mark.p1
def test_accept_invitation_response_structure(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test that accepting invitation returns the expected response structure."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Accept the invitation
res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert res["code"] == 0
assert "data" in res
assert res["data"] is True
assert "message" in res
assert isinstance(res["message"], str)
assert "successfully" in res["message"].lower() or "joined" in res["message"].lower()
@pytest.mark.p2
def test_accept_invitation_wrong_user(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test that a user cannot accept another user's invitation."""
# Create another user who is not invited
email = f"otheruser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Other User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
# Login as the other user
other_user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
# Try to accept invitation meant for another user
tenant_id: str = team_with_invitation["team"]["id"]
res: dict[str, Any] = accept_team_invitation(other_user_auth, tenant_id)
assert res["code"] != 0
assert "not found" in res["message"].lower() or "invitation" in res["message"].lower()
@pytest.mark.p2
def test_accept_invitation_multiple_invitations(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test accepting invitations to multiple teams."""
# Create two teams
team1_payload: dict[str, str] = {"name": f"Team 1 {uuid.uuid4().hex[:8]}"}
team1_res: dict[str, Any] = create_team(web_api_auth, team1_payload)
assert team1_res["code"] == 0
tenant_id_1: str = team1_res["data"]["id"]
team2_payload: dict[str, str] = {"name": f"Team 2 {uuid.uuid4().hex[:8]}"}
team2_res: dict[str, Any] = create_team(web_api_auth, team2_payload)
assert team2_res["code"] == 0
tenant_id_2: str = team2_res["data"]["id"]
# Create and invite a user to both teams
email = f"multiuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Multi User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
# Invite to both teams
add_payload1: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id_1, add_payload1)
add_payload2: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id_2, add_payload2)
# Login as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
# Accept both invitations
res1: dict[str, Any] = accept_team_invitation(user_auth, tenant_id_1)
assert res1["code"] == 0
res2: dict[str, Any] = accept_team_invitation(user_auth, tenant_id_2)
assert res2["code"] == 0

View file

@ -15,6 +15,7 @@
#
from __future__ import annotations
import time
import uuid
from typing import Any
@ -24,7 +25,8 @@ from common import (
add_users_to_team,
create_team,
create_user,
remove_users_from_team,
encrypt_password,
login_as_user,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
@ -36,32 +38,38 @@ from libs.auth import RAGFlowWebApiAuth
@pytest.mark.p1
class TestAddUsersAuthorization:
class TestAuthorization:
"""Tests for authentication behavior when adding users to a team."""
@pytest.mark.parametrize(
("invalid_auth", "expected_code"),
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401),
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_add_users_invalid_auth(
def test_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test adding users with invalid or missing authentication."""
# Create a team first
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res["code"] != 0:
pytest.skip("Team creation failed, skipping auth test")
tenant_id: str = team_res["data"]["id"]
# Try to add users with invalid auth
add_payload: dict[str, list[str]] = {"users": ["test@example.com"]}
res: dict[str, Any] = add_users_to_team(invalid_auth, tenant_id, add_payload)
assert res["code"] == expected_code
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.p1
@ -83,14 +91,15 @@ class TestAddUsers:
for i in range(5):
email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": password,
"password": encrypted_password,
"nickname": f"Test User {i}",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] == 0:
users.append({"email": email, "id": user_res["data"]["id"]})
users.append({"email": email, "id": user_res["data"]["id"], "password": password})
return users
@pytest.mark.p1
@ -98,18 +107,21 @@ class TestAddUsers:
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test adding a single user using email string format."""
if not test_users:
pytest.skip("No test users created")
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
add_payload: dict[str, list[str]] = {"users": [user_email]}
res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert res["code"] == 0
assert res["code"] == 0, res
assert "data" in res
assert "added" in res["data"]
assert len(res["data"]["added"]) == 1
assert res["data"]["added"][0]["email"] == user_email
assert res["data"]["added"][0]["role"] == "normal"
assert res["data"]["added"][0]["role"] == "invite" # Users are added with invite role initially
assert "failed" in res["data"]
assert len(res["data"]["failed"]) == 0
@ -118,6 +130,9 @@ class TestAddUsers:
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test adding a single user with admin role."""
if not test_users:
pytest.skip("No test users created")
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
@ -126,23 +141,27 @@ class TestAddUsers:
}
res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert res["code"] == 0
assert res["code"] == 0, res
assert len(res["data"]["added"]) == 1
assert res["data"]["added"][0]["email"] == user_email
assert res["data"]["added"][0]["role"] == "admin"
assert res["data"]["added"][0]["role"] == "invite" # Users are added with invite role initially
assert res["data"]["added"][0]["intended_role"] == "admin" # Intended role after acceptance
@pytest.mark.p1
def test_add_multiple_users(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test adding multiple users in bulk."""
if len(test_users) < 3:
pytest.skip("Need at least 3 test users")
tenant_id: str = test_team["id"]
user_emails: list[str] = [user["email"] for user in test_users[:3]]
add_payload: dict[str, list[str]] = {"users": user_emails}
res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert res["code"] == 0
assert res["code"] == 0, res
assert len(res["data"]["added"]) == 3
assert len(res["data"]["failed"]) == 0
added_emails = {user["email"] for user in res["data"]["added"]}
@ -153,6 +172,9 @@ class TestAddUsers:
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test adding users with mixed string and object formats."""
if len(test_users) < 3:
pytest.skip("Need at least 3 test users")
tenant_id: str = test_team["id"]
add_payload: dict[str, list[Any]] = {
@ -164,11 +186,14 @@ class TestAddUsers:
}
res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert res["code"] == 0
assert res["code"] == 0, res
assert len(res["data"]["added"]) == 3
assert res["data"]["added"][0]["role"] == "normal" # String format defaults to normal
assert res["data"]["added"][1]["role"] == "admin" # Object format with admin role
assert res["data"]["added"][2]["role"] == "normal" # String format defaults to normal
assert res["data"]["added"][0]["role"] == "invite" # String format defaults to invite role initially
assert res["data"]["added"][0].get("intended_role") == "normal" # Intended role after acceptance
assert res["data"]["added"][1]["role"] == "invite" # Object format - still invite initially
assert res["data"]["added"][1]["intended_role"] == "admin" # Intended role after acceptance
assert res["data"]["added"][2]["role"] == "invite" # String format defaults to invite role initially
assert res["data"]["added"][2].get("intended_role") == "normal" # Intended role after acceptance
@pytest.mark.p1
def test_add_user_unregistered_email(
@ -191,6 +216,9 @@ class TestAddUsers:
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test adding a user who is already a member of the team."""
if not test_users:
pytest.skip("No test users created")
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
@ -201,16 +229,20 @@ class TestAddUsers:
# Try to add same user again
res2: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert res2["code"] == 0 # Returns success but with failed entry
assert len(res2["data"]["added"]) == 0
assert len(res2["data"]["failed"]) == 1
assert "already a member" in res2["data"]["failed"][0]["error"].lower()
assert res2["code"] == 0 # Returns success - invitation is resent
# API resends invitation instead of failing
assert len(res2["data"]["added"]) == 1
assert res2["data"]["added"][0]["email"] == user_email
assert res2["data"]["added"][0].get("status") == "invitation_resent" or "intended_role" in res2["data"]["added"][0]
@pytest.mark.p1
def test_add_users_partial_success(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test adding users where some succeed and some fail."""
if len(test_users) < 2:
pytest.skip("Need at least 2 test users")
tenant_id: str = test_team["id"]
unregistered_email: str = f"unregistered_{uuid.uuid4().hex[:8]}@example.com"
@ -219,7 +251,7 @@ class TestAddUsers:
}
res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert res["code"] == 0
assert res["code"] == 0, res
assert len(res["data"]["added"]) == 2
assert len(res["data"]["failed"]) == 1
assert "not found" in res["data"]["failed"][0]["error"].lower()
@ -252,6 +284,9 @@ class TestAddUsers:
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test adding user with invalid role."""
if not test_users:
pytest.skip("No test users created")
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
@ -260,218 +295,71 @@ class TestAddUsers:
}
res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert res["code"] == 0 # Returns success but with failed entry
# API returns error code when all users fail
assert res["code"] == 102 # DATA_ERROR
assert len(res["data"]["added"]) == 0
assert len(res["data"]["failed"]) == 1
assert "invalid role" in res["data"]["failed"][0]["error"].lower()
assert "invalid role" in res["data"]["failed"][0]["error"].lower() or "invalid" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p2
@pytest.mark.p1
def test_add_users_not_owner_or_admin(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test that non-admin/non-owner users cannot add users."""
if not test_users:
pytest.skip("No test users created")
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
other_user_email: str = test_users[1]["email"] if len(test_users) > 1 else None
if not other_user_email:
pytest.skip("Need at least 2 test users")
# Add a normal user to the team
add_payload: dict[str, list[str]] = {"users": [user_email]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
# Create auth for the normal user (this would require getting their token)
# For now, we'll test that owner/admin can add users
# This test would need the normal user's auth token to fully test
pass
# Small delay to ensure user is fully added
time.sleep(0.5)
# Login as the normal user
normal_user_auth: RAGFlowWebApiAuth = login_as_user(user_email, test_users[0]["password"])
@pytest.mark.p1
class TestRemoveUsers:
"""Comprehensive tests for removing users from a team."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def test_users(self, web_api_auth: RAGFlowWebApiAuth) -> list[dict[str, Any]]:
"""Create test users for use in tests."""
users = []
for i in range(5):
email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
user_payload: dict[str, str] = {
"email": email,
"password": password,
"nickname": f"Test User {i}",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] == 0:
users.append({"email": email, "id": user_res["data"]["id"]})
return users
@pytest.fixture
def team_with_users(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> dict[str, Any]:
"""Create a team with users already added."""
tenant_id: str = test_team["id"]
user_emails: list[str] = [user["email"] for user in test_users[:3]]
add_payload: dict[str, list[str]] = {"users": user_emails}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
return {
"team": test_team,
"users": test_users[:3],
}
@pytest.mark.p1
def test_remove_single_user(
self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any]
) -> None:
"""Test removing a single user from a team."""
tenant_id: str = team_with_users["team"]["id"]
user_id: str = team_with_users["users"][0]["id"]
remove_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0
assert "data" in res
assert "removed" in res["data"]
assert len(res["data"]["removed"]) == 1
assert res["data"]["removed"][0]["user_id"] == user_id
assert "failed" in res["data"]
assert len(res["data"]["failed"]) == 0
@pytest.mark.p1
def test_remove_multiple_users(
self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any]
) -> None:
"""Test removing multiple users in bulk."""
tenant_id: str = team_with_users["team"]["id"]
user_ids: list[str] = [user["id"] for user in team_with_users["users"][:2]]
remove_payload: dict[str, list[str]] = {"user_ids": user_ids}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0
assert len(res["data"]["removed"]) == 2
assert len(res["data"]["failed"]) == 0
removed_ids = {user["user_id"] for user in res["data"]["removed"]}
assert removed_ids == set(user_ids)
@pytest.mark.p1
def test_remove_user_not_in_team(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test removing a user who is not a member of the team."""
tenant_id: str = test_team["id"]
# Use a user that was not added to the team
user_id: str = test_users[3]["id"]
remove_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0 # Returns success but with failed entry
assert len(res["data"]["removed"]) == 0
assert len(res["data"]["failed"]) == 1
assert "not a member" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_remove_owner(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test that owner cannot be removed."""
tenant_id: str = test_team["id"]
owner_id: str = test_team["owner_id"]
remove_payload: dict[str, list[str]] = {"user_ids": [owner_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0 # Returns success but with failed entry
assert len(res["data"]["removed"]) == 0
assert len(res["data"]["failed"]) == 1
assert "owner" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_remove_users_partial_success(
self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test removing users where some succeed and some fail."""
tenant_id: str = team_with_users["team"]["id"]
# Mix of valid and invalid user IDs
valid_user_id: str = team_with_users["users"][0]["id"]
invalid_user_id: str = test_users[3]["id"] # Not in team
remove_payload: dict[str, list[str]] = {"user_ids": [valid_user_id, invalid_user_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0
assert len(res["data"]["removed"]) == 1
assert len(res["data"]["failed"]) == 1
assert res["data"]["removed"][0]["user_id"] == valid_user_id
assert "not a member" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_remove_users_empty_list(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test removing users with empty list."""
tenant_id: str = test_team["id"]
remove_payload: dict[str, list[str]] = {"user_ids": []}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 101 # ARGUMENT_ERROR
assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower()
@pytest.mark.p1
def test_remove_users_missing_user_ids_field(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test removing users without 'user_ids' field."""
tenant_id: str = test_team["id"]
remove_payload: dict[str, Any] = {}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 101 # ARGUMENT_ERROR
@pytest.mark.p1
def test_remove_users_invalid_user_id_format(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test removing users with invalid user ID format."""
tenant_id: str = test_team["id"]
remove_payload: dict[str, list[Any]] = {"user_ids": [12345]} # Not a string
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0 # Returns success but with failed entry
assert len(res["data"]["removed"]) == 0
assert len(res["data"]["failed"]) == 1
assert "invalid" in res["data"]["failed"][0]["error"].lower()
# Try to add another user (normal user should not be able to)
add_payload2: dict[str, list[str]] = {"users": [other_user_email]}
res: dict[str, Any] = add_users_to_team(normal_user_auth, tenant_id, add_payload2)
assert res["code"] == 108 # PERMISSION_ERROR
assert "owner" in res["message"].lower() or "admin" in res["message"].lower()
@pytest.mark.p2
def test_remove_last_admin(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
def test_add_users_invalid_tenant_id(
self, web_api_auth: RAGFlowWebApiAuth, test_users: list[dict[str, Any]]
) -> None:
"""Test that the last admin cannot remove themselves."""
"""Test adding users to a non-existent team."""
if not test_users:
pytest.skip("No test users created")
invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}"
add_payload: dict[str, list[str]] = {"users": [test_users[0]["email"]]}
res: dict[str, Any] = add_users_to_team(web_api_auth, invalid_tenant_id, add_payload)
assert res["code"] != 0
assert "not found" in res["message"].lower() or res["code"] in [100, 102, 108]
@pytest.mark.p2
def test_add_users_invalid_email_format(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test adding users with invalid email format."""
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
invalid_email: str = "not_an_email"
# Add user as admin
add_payload: dict[str, list[dict[str, str]]] = {
"users": [{"email": user_email, "role": "admin"}]
}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
admin_user_id: str = add_res["data"]["added"][0]["id"]
add_payload: dict[str, list[str]] = {"users": [invalid_email]}
res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
# Try to remove the admin (would need admin's auth token to fully test)
# This test would require the admin user's authentication
pass
# Should fail - either validation error or user not found
assert res["code"] != 0
assert len(res["data"]["added"]) == 0

View file

@ -252,3 +252,297 @@ class TestTeamCreate:
# Should succeed if unicode is supported
assert res["code"] in (0, 101)
@pytest.mark.p2
def test_team_creation_with_custom_models(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test creating team with custom model configurations."""
team_name: str = f"Custom Models Team {uuid.uuid4().hex[:8]}"
# Attempt to create team with custom models
# Note: Model IDs need to be valid and added by the user
team_payload: dict[str, str] = {
"name": team_name,
# These would need to be actual model IDs added by the user
# "llm_id": "custom_llm_id",
# "embd_id": "custom_embd_id",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should succeed with defaults if custom models not specified
assert res["code"] == 0, res
assert res["data"]["name"] == team_name
assert "id" in res["data"]
@pytest.mark.p2
def test_multiple_teams_same_name_allowed(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that multiple teams can have the same name."""
team_name: str = f"Duplicate Name {uuid.uuid4().hex[:8]}"
# Create first team
res1: dict[str, Any] = create_team(web_api_auth, {"name": team_name})
assert res1["code"] == 0, res1
team_id_1: str = res1["data"]["id"]
# Create second team with same name
res2: dict[str, Any] = create_team(web_api_auth, {"name": team_name})
assert res2["code"] == 0, res2
team_id_2: str = res2["data"]["id"]
# Teams should have different IDs
assert team_id_1 != team_id_2, "Teams should have unique IDs"
assert res1["data"]["name"] == res2["data"]["name"], (
"Both teams should have the same name"
)
@pytest.mark.p2
def test_team_creation_with_credit_limit(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test creating team with custom credit limit."""
team_name: str = f"Credit Test Team {uuid.uuid4().hex[:8]}"
custom_credit: int = 1000
team_payload: dict[str, Any] = {
"name": team_name,
"credit": custom_credit,
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should succeed
assert res["code"] == 0, res
# Note: Credit may not be in response, but should be set internally
@pytest.mark.p2
def test_team_name_with_special_characters(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team names with special characters."""
special_names: list[str] = [
f"Team-{uuid.uuid4().hex[:4]}_Test!",
f"Team & Co. {uuid.uuid4().hex[:4]}",
f"Team @{uuid.uuid4().hex[:4]}",
f"团队{uuid.uuid4().hex[:4]}", # Unicode
]
for name in special_names:
res: dict[str, Any] = create_team(web_api_auth, {"name": name})
# Should either accept or reject with clear message
if res["code"] == 0:
assert res["data"]["name"] == name, (
f"Team name should be preserved: {name}"
)
# If rejected, should have clear error
# (Current implementation accepts special chars)
@pytest.mark.p2
def test_team_creation_default_owner(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that team creator is set as owner by default."""
team_name: str = f"Owner Test Team {uuid.uuid4().hex[:8]}"
res: dict[str, Any] = create_team(web_api_auth, {"name": team_name})
assert res["code"] == 0, res
assert "owner_id" in res["data"], "Owner ID should be in response"
# Owner should be the authenticated user
# (Cannot verify without knowing web_api_auth user ID)
@pytest.mark.p2
def test_concurrent_team_creation(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test concurrent team creation."""
import concurrent.futures
def create_test_team(index: int) -> dict[str, Any]:
team_name: str = f"Concurrent Team {index}_{uuid.uuid4().hex[:8]}"
return create_team(web_api_auth, {"name": team_name})
# Create 10 teams concurrently
count: int = 10
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures: list[concurrent.futures.Future[dict[str, Any]]] = [
executor.submit(create_test_team, i) for i in range(count)
]
results: list[dict[str, Any]] = [
f.result() for f in concurrent.futures.as_completed(futures)
]
# All should succeed
success_count: int = sum(1 for r in results if r["code"] == 0)
assert success_count == count, (
f"Expected {count} successful team creations, got {success_count}"
)
# All should have unique IDs
team_ids: list[str] = [r["data"]["id"] for r in results if r["code"] == 0]
assert len(team_ids) == len(set(team_ids)), (
"All team IDs should be unique"
)
@pytest.mark.p2
def test_team_with_invalid_model_id(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with invalid model ID."""
team_name: str = f"Invalid Model Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, str] = {
"name": team_name,
"llm_id": "invalid_nonexistent_model_id_12345",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should reject with clear error message
assert res["code"] != 0, "Invalid model ID should be rejected"
assert (
"model" in res["message"].lower()
or "not found" in res["message"].lower()
or "invalid" in res["message"].lower()
), "Error message should mention model issue"
@pytest.mark.p2
def test_team_creation_with_negative_credit(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with negative credit."""
team_name: str = f"Negative Credit Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, Any] = {
"name": team_name,
"credit": -100,
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should reject negative credit
assert res["code"] != 0, "Negative credit should be rejected"
assert "credit" in res["message"].lower(), (
"Error message should mention credit"
)
@pytest.mark.p2
def test_team_creation_empty_json_payload(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with completely empty payload."""
res: dict[str, Any] = create_team(web_api_auth, {})
# Should reject with clear error
assert res["code"] != 0, "Empty payload should be rejected"
assert (
"name" in res["message"].lower()
or "required" in res["message"].lower()
), "Error should mention missing name"
@pytest.mark.p3
def test_team_unicode_name(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with full unicode name."""
unicode_names: list[str] = [
f"团队{uuid.uuid4().hex[:4]}", # Chinese
f"チーム{uuid.uuid4().hex[:4]}", # Japanese
f"Команда{uuid.uuid4().hex[:4]}", # Russian
f"فريق{uuid.uuid4().hex[:4]}", # Arabic (RTL)
f"😀🎉{uuid.uuid4().hex[:4]}", # Emoji
]
for name in unicode_names:
res: dict[str, Any] = create_team(web_api_auth, {"name": name})
# Should handle unicode properly
if res["code"] == 0:
# Verify unicode is preserved (may be normalized)
assert len(res["data"]["name"]) > 0, (
"Team name should not be empty after unicode"
)
@pytest.mark.p3
def test_team_creation_with_all_optional_params(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with all optional parameters."""
team_name: str = f"Full Params Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, Any] = {
"name": team_name,
"credit": 2000,
# Note: Model IDs would need to be valid
# "llm_id": "valid_llm_id",
# "embd_id": "valid_embd_id",
# "asr_id": "valid_asr_id",
# "parser_ids": "valid_parser_ids",
# "img2txt_id": "valid_img2txt_id",
# "rerank_id": "valid_rerank_id",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should succeed
assert res["code"] == 0, res
assert res["data"]["name"] == team_name
@pytest.mark.p3
def test_team_max_name_length(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team with maximum allowed name length."""
# API spec says max 100 characters
max_name: str = "A" * 100
res: dict[str, Any] = create_team(web_api_auth, {"name": max_name})
# Should accept 100 characters
assert res["code"] == 0, "100-character name should be accepted"
assert res["data"]["name"] == max_name
@pytest.mark.p3
def test_team_name_just_over_limit(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team with name just over limit."""
# 101 characters (1 over limit)
long_name: str = "A" * 101
res: dict[str, Any] = create_team(web_api_auth, {"name": long_name})
# Should reject
assert res["code"] != 0, "101-character name should be rejected"
assert (
"100" in res["message"]
or "length" in res["message"].lower()
or "long" in res["message"].lower()
), "Error should mention length limit"
@pytest.mark.p3
def test_team_creation_idempotency(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that repeated team creation creates separate teams."""
team_name: str = f"Idempotency Test {uuid.uuid4().hex[:8]}"
payload: dict[str, str] = {"name": team_name}
# Create same team twice
res1: dict[str, Any] = create_team(web_api_auth, payload)
res2: dict[str, Any] = create_team(web_api_auth, payload)
# Both should succeed and create different teams
assert res1["code"] == 0, res1
assert res2["code"] == 0, res2
assert res1["data"]["id"] != res2["data"]["id"], (
"Should create different teams, not be idempotent"
)
@pytest.mark.p3
def test_team_with_parser_ids(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with custom parser IDs."""
team_name: str = f"Parser Test {uuid.uuid4().hex[:8]}"
# parser_ids is typically a comma-separated string
team_payload: dict[str, str] = {
"name": team_name,
"parser_ids": "naive,qa,table,paper,book,laws,presentation,manual,wiki",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should accept valid parser IDs
assert res["code"] == 0, res

View file

@ -0,0 +1,363 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import time
import uuid
from typing import Any
import pytest
from common import (
add_users_to_team,
create_team,
create_user,
encrypt_password,
login_as_user,
reject_team_invitation,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestAuthorization:
"""Tests for authentication behavior when rejecting team invitations."""
@pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test rejecting invitation with invalid or missing authentication."""
# Create a team and send invitation first
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res["code"] != 0:
pytest.skip("Team creation failed, skipping auth test")
tenant_id: str = team_res["data"]["id"]
# Create and invite a user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
add_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id, add_payload)
# Try to reject invitation with invalid auth
res: dict[str, Any] = reject_team_invitation(invalid_auth, tenant_id)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.p1
class TestRejectInvite:
"""Comprehensive tests for rejecting team invitations."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def invited_user(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test user who will be invited."""
email = f"inviteduser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Invited User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
return {
"email": email,
"id": user_res["data"]["id"],
"password": password,
}
@pytest.fixture
def team_with_invitation(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
invited_user: dict[str, Any],
) -> dict[str, Any]:
"""Create a team and send invitation to a user."""
tenant_id: str = test_team["id"]
add_payload: dict[str, list[str]] = {"users": [invited_user["email"]]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
return {
"team": test_team,
"invited_user": invited_user,
}
@pytest.mark.p1
def test_reject_invitation_success(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test successfully rejecting an invitation."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Reject the invitation
res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res["code"] == 0, res
assert res["data"] is True
assert "rejected" in res["message"].lower() or "successfully" in res["message"].lower()
@pytest.mark.p1
def test_reject_invitation_no_invitation(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test rejecting an invitation when no invitation exists."""
# Create a user who is not invited
email = f"notinvited_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Not Invited User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
# Login as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
# Try to reject invitation for a team they're not invited to
tenant_id: str = test_team["id"]
res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res["code"] != 0
assert "not found" in res["message"].lower() or "invitation" in res["message"].lower()
@pytest.mark.p1
def test_reject_invitation_already_rejected(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test rejecting an invitation that has already been rejected."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Reject the invitation first time
res1: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res1["code"] == 0
# Try to reject again (should fail - invitation no longer exists)
res2: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res2["code"] != 0
assert "not found" in res2["message"].lower() or "invitation" in res2["message"].lower()
@pytest.mark.p1
def test_reject_invitation_after_accepted(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test rejecting an invitation that has already been accepted."""
from common import accept_team_invitation
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Accept the invitation first
accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert accept_res["code"] == 0
# Try to reject after accepting (should fail - no longer has INVITE role)
res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res["code"] != 0
assert "invite" in res["message"].lower() or "role" in res["message"].lower() or "not found" in res["message"].lower()
@pytest.mark.p1
def test_reject_invitation_invalid_tenant_id(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test rejecting an invitation with invalid team ID."""
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Try to reject invitation for non-existent team
invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}"
res: dict[str, Any] = reject_team_invitation(user_auth, invalid_tenant_id)
assert res["code"] != 0
assert "not found" in res["message"].lower() or "invitation" in res["message"].lower()
@pytest.mark.p1
def test_reject_invitation_response_structure(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test that rejecting invitation returns the expected response structure."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Reject the invitation
res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res["code"] == 0
assert "data" in res
assert res["data"] is True
assert "message" in res
assert isinstance(res["message"], str)
assert "successfully" in res["message"].lower() or "rejected" in res["message"].lower()
@pytest.mark.p1
def test_reject_invitation_removes_relationship(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test that rejecting an invitation removes the user-tenant relationship."""
tenant_id: str = team_with_invitation["team"]["id"]
invited_user: dict[str, Any] = team_with_invitation["invited_user"]
# Login as the invited user
user_auth: RAGFlowWebApiAuth = login_as_user(invited_user["email"], invited_user["password"])
# Reject the invitation
res: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res["code"] == 0
# Try to reject again (should fail - relationship removed)
res2: dict[str, Any] = reject_team_invitation(user_auth, tenant_id)
assert res2["code"] != 0
assert "not found" in res2["message"].lower() or "invitation" in res2["message"].lower()
@pytest.mark.p2
def test_reject_invitation_wrong_user(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_invitation: dict[str, Any],
) -> None:
"""Test that a user cannot reject another user's invitation."""
# Create another user who is not invited
email = f"otheruser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Other User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
# Login as the other user
other_user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
# Try to reject invitation meant for another user
tenant_id: str = team_with_invitation["team"]["id"]
res: dict[str, Any] = reject_team_invitation(other_user_auth, tenant_id)
assert res["code"] != 0
assert "not found" in res["message"].lower() or "invitation" in res["message"].lower()
@pytest.mark.p2
def test_reject_invitation_multiple_invitations(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test rejecting invitations to multiple teams."""
# Create two teams
team1_payload: dict[str, str] = {"name": f"Team 1 {uuid.uuid4().hex[:8]}"}
team1_res: dict[str, Any] = create_team(web_api_auth, team1_payload)
assert team1_res["code"] == 0
tenant_id_1: str = team1_res["data"]["id"]
team2_payload: dict[str, str] = {"name": f"Team 2 {uuid.uuid4().hex[:8]}"}
team2_res: dict[str, Any] = create_team(web_api_auth, team2_payload)
assert team2_res["code"] == 0
tenant_id_2: str = team2_res["data"]["id"]
# Create and invite a user to both teams
email = f"multiuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Multi User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
# Invite to both teams
add_payload1: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id_1, add_payload1)
add_payload2: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id_2, add_payload2)
# Login as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
# Reject both invitations
res1: dict[str, Any] = reject_team_invitation(user_auth, tenant_id_1)
assert res1["code"] == 0
res2: dict[str, Any] = reject_team_invitation(user_auth, tenant_id_2)
assert res2["code"] == 0

View file

@ -0,0 +1,376 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import time
import uuid
from typing import Any
import pytest
from common import (
add_users_to_team,
create_team,
create_user,
encrypt_password,
login_as_user,
remove_users_from_team,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestAuthorization:
"""Tests for authentication behavior when removing users from a team."""
@pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test removing users with invalid or missing authentication."""
# Create a team and add a user first
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res["code"] != 0:
pytest.skip("Team creation failed, skipping auth test")
tenant_id: str = team_res["data"]["id"]
# Create and add a user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
user_id: str = user_res["data"]["id"]
add_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id, add_payload)
# Try to remove user with invalid auth
remove_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = remove_users_from_team(invalid_auth, tenant_id, remove_payload)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.p1
class TestRemoveUsers:
"""Comprehensive tests for removing users from a team."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def test_users(self, web_api_auth: RAGFlowWebApiAuth) -> list[dict[str, Any]]:
"""Create test users for use in tests."""
users = []
for i in range(5):
email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": f"Test User {i}",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] == 0:
users.append({"email": email, "id": user_res["data"]["id"], "password": password})
return users
@pytest.fixture
def team_with_users(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> dict[str, Any]:
"""Create a team with users already added."""
if not test_users:
return {"team": test_team, "users": []}
tenant_id: str = test_team["id"]
user_emails: list[str] = [user["email"] for user in test_users[:3]]
add_payload: dict[str, list[str]] = {"users": user_emails}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
return {
"team": test_team,
"users": test_users[:3],
}
@pytest.mark.p1
def test_remove_single_user(
self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any]
) -> None:
"""Test removing a single user from a team."""
if not team_with_users["users"]:
pytest.skip("No users in team")
tenant_id: str = team_with_users["team"]["id"]
user_id: str = team_with_users["users"][0]["id"]
remove_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0, res
assert "data" in res
assert "removed" in res["data"]
assert len(res["data"]["removed"]) == 1
assert res["data"]["removed"][0]["user_id"] == user_id
assert "failed" in res["data"]
assert len(res["data"]["failed"]) == 0
@pytest.mark.p1
def test_remove_multiple_users(
self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any]
) -> None:
"""Test removing multiple users in bulk."""
if len(team_with_users["users"]) < 2:
pytest.skip("Need at least 2 users in team")
tenant_id: str = team_with_users["team"]["id"]
user_ids: list[str] = [user["id"] for user in team_with_users["users"][:2]]
remove_payload: dict[str, list[str]] = {"user_ids": user_ids}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0, res
assert len(res["data"]["removed"]) == 2
assert len(res["data"]["failed"]) == 0
removed_ids = {user["user_id"] for user in res["data"]["removed"]}
assert removed_ids == set(user_ids)
@pytest.mark.p1
def test_remove_user_not_in_team(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test removing a user who is not a member of the team."""
if len(test_users) < 4:
pytest.skip("Need at least 4 test users")
tenant_id: str = test_team["id"]
# Use a user that was not added to the team
user_id: str = test_users[3]["id"]
remove_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
# API returns error code when all removals fail
assert res["code"] == 102 # DATA_ERROR
assert len(res["data"]["removed"]) == 0
assert len(res["data"]["failed"]) == 1
assert "not a member" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_remove_owner(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test that owner cannot be removed."""
tenant_id: str = test_team["id"]
owner_id: str = test_team["owner_id"]
remove_payload: dict[str, list[str]] = {"user_ids": [owner_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
# API returns error code when all removals fail
assert res["code"] == 102 # DATA_ERROR
assert len(res["data"]["removed"]) == 0
assert len(res["data"]["failed"]) == 1
assert "owner" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_remove_users_partial_success(
self, web_api_auth: RAGFlowWebApiAuth, team_with_users: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test removing users where some succeed and some fail."""
if not team_with_users["users"] or len(test_users) < 4:
pytest.skip("Need users in team and at least 4 test users")
tenant_id: str = team_with_users["team"]["id"]
# Mix of valid and invalid user IDs
valid_user_id: str = team_with_users["users"][0]["id"]
invalid_user_id: str = test_users[3]["id"] # Not in team
remove_payload: dict[str, list[str]] = {"user_ids": [valid_user_id, invalid_user_id]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 0, res
assert len(res["data"]["removed"]) == 1
assert len(res["data"]["failed"]) == 1
assert res["data"]["removed"][0]["user_id"] == valid_user_id
assert "not a member" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_remove_users_empty_list(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test removing users with empty list."""
tenant_id: str = test_team["id"]
remove_payload: dict[str, list[str]] = {"user_ids": []}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 101 # ARGUMENT_ERROR
assert "non-empty" in res["message"].lower() or "empty" in res["message"].lower()
@pytest.mark.p1
def test_remove_users_missing_user_ids_field(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test removing users without 'user_ids' field."""
tenant_id: str = test_team["id"]
remove_payload: dict[str, Any] = {}
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
assert res["code"] == 101 # ARGUMENT_ERROR
@pytest.mark.p1
def test_remove_users_invalid_user_id_format(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> None:
"""Test removing users with invalid user ID format."""
tenant_id: str = test_team["id"]
remove_payload: dict[str, list[Any]] = {"user_ids": [12345]} # Not a string
res: dict[str, Any] = remove_users_from_team(web_api_auth, tenant_id, remove_payload)
# API returns error code when all removals fail
assert res["code"] == 102 # DATA_ERROR
assert len(res["data"]["removed"]) == 0
assert len(res["data"]["failed"]) == 1
assert "invalid" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_remove_users_not_owner_or_admin(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test that non-admin/non-owner users cannot remove users."""
if len(test_users) < 2:
pytest.skip("Need at least 2 test users")
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
other_user_email: str = test_users[1]["email"]
# Add two users to the team
add_payload: dict[str, list[str]] = {"users": [user_email, other_user_email]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
# Small delay to ensure users are fully added
time.sleep(0.5)
# Login as the first normal user
normal_user_auth: RAGFlowWebApiAuth = login_as_user(user_email, test_users[0]["password"])
# Try to remove the other user (normal user should not be able to)
other_user_id: str = test_users[1]["id"]
remove_payload: dict[str, list[str]] = {"user_ids": [other_user_id]}
res: dict[str, Any] = remove_users_from_team(normal_user_auth, tenant_id, remove_payload)
assert res["code"] == 108 # PERMISSION_ERROR
assert "owner" in res["message"].lower() or "admin" in res["message"].lower()
@pytest.mark.p2
def test_remove_last_admin(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any], test_users: list[dict[str, Any]]
) -> None:
"""Test that the last admin cannot remove themselves."""
if not test_users:
pytest.skip("No test users created")
from common import accept_team_invitation
tenant_id: str = test_team["id"]
user_email: str = test_users[0]["email"]
# Add user as admin
add_payload: dict[str, list[dict[str, str]]] = {
"users": [{"email": user_email, "role": "admin"}]
}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
# Small delay
time.sleep(0.5)
# Login as the admin
admin_auth: RAGFlowWebApiAuth = login_as_user(user_email, test_users[0]["password"])
# Accept the invitation to become admin
accept_res: dict[str, Any] = accept_team_invitation(admin_auth, tenant_id, role="admin")
assert accept_res["code"] == 0
# Small delay to ensure role is updated
time.sleep(0.5)
admin_user_id: str = test_users[0]["id"]
# Try to remove the admin (should fail - last admin cannot remove themselves)
remove_payload: dict[str, list[str]] = {"user_ids": [admin_user_id]}
res: dict[str, Any] = remove_users_from_team(admin_auth, tenant_id, remove_payload)
# API may return error code when all removals fail, or permission error if role not updated
assert res["code"] in [102, 108] # DATA_ERROR or PERMISSION_ERROR
if res["code"] == 102:
# If we get DATA_ERROR, check the failed entry
assert len(res["data"]["removed"]) == 0
assert len(res["data"]["failed"]) == 1
assert "cannot remove yourself" in res["data"]["failed"][0]["error"].lower() or "at least one" in res["data"]["failed"][0]["error"].lower()
else:
# If we get PERMISSION_ERROR, the user might not have admin role yet
assert "owner" in res["message"].lower() or "admin" in res["message"].lower()
@pytest.mark.p2
def test_remove_users_invalid_tenant_id(
self, web_api_auth: RAGFlowWebApiAuth, test_users: list[dict[str, Any]]
) -> None:
"""Test removing users from a non-existent team."""
if not test_users:
pytest.skip("No test users created")
invalid_tenant_id: str = f"invalid_{uuid.uuid4().hex[:8]}"
remove_payload: dict[str, list[str]] = {"user_ids": [test_users[0]["id"]]}
res: dict[str, Any] = remove_users_from_team(web_api_auth, invalid_tenant_id, remove_payload)
assert res["code"] != 0
assert "not found" in res["message"].lower() or res["code"] in [100, 102, 108]

View file

@ -1,337 +0,0 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Advanced team management tests."""
from __future__ import annotations
import uuid
from typing import Any
import pytest
from common import create_team, create_user
from libs.auth import RAGFlowWebApiAuth
@pytest.mark.p2
class TestTeamAdvanced:
"""Advanced team management tests."""
def test_team_creation_with_custom_models(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test creating team with custom model configurations."""
team_name: str = f"Custom Models Team {uuid.uuid4().hex[:8]}"
# Attempt to create team with custom models
# Note: Model IDs need to be valid and added by the user
team_payload: dict[str, str] = {
"name": team_name,
# These would need to be actual model IDs added by the user
# "llm_id": "custom_llm_id",
# "embd_id": "custom_embd_id",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should succeed with defaults if custom models not specified
assert res["code"] == 0, res
assert res["data"]["name"] == team_name
assert "id" in res["data"]
def test_team_creation_response_structure(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that team creation returns complete response structure."""
team_name: str = f"Structure Test Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, str] = {"name": team_name}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0, res
assert "data" in res
# Check required fields
required_fields: list[str] = ["id", "name", "owner_id"]
for field in required_fields:
assert field in res["data"], (
f"Missing required field: {field}"
)
assert res["data"]["name"] == team_name
assert len(res["data"]["id"]) > 0, "Team ID should not be empty"
assert len(res["data"]["owner_id"]) > 0, "Owner ID should not be empty"
def test_multiple_teams_same_name_allowed(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that multiple teams can have the same name."""
team_name: str = f"Duplicate Name {uuid.uuid4().hex[:8]}"
# Create first team
res1: dict[str, Any] = create_team(web_api_auth, {"name": team_name})
assert res1["code"] == 0, res1
team_id_1: str = res1["data"]["id"]
# Create second team with same name
res2: dict[str, Any] = create_team(web_api_auth, {"name": team_name})
assert res2["code"] == 0, res2
team_id_2: str = res2["data"]["id"]
# Teams should have different IDs
assert team_id_1 != team_id_2, "Teams should have unique IDs"
assert res1["data"]["name"] == res2["data"]["name"], (
"Both teams should have the same name"
)
def test_team_creation_with_credit_limit(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test creating team with custom credit limit."""
team_name: str = f"Credit Test Team {uuid.uuid4().hex[:8]}"
custom_credit: int = 1000
team_payload: dict[str, Any] = {
"name": team_name,
"credit": custom_credit,
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should succeed
assert res["code"] == 0, res
# Note: Credit may not be in response, but should be set internally
def test_team_name_with_special_characters(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team names with special characters."""
special_names: list[str] = [
f"Team-{uuid.uuid4().hex[:4]}_Test!",
f"Team & Co. {uuid.uuid4().hex[:4]}",
f"Team @{uuid.uuid4().hex[:4]}",
f"团队{uuid.uuid4().hex[:4]}", # Unicode
]
for name in special_names:
res: dict[str, Any] = create_team(web_api_auth, {"name": name})
# Should either accept or reject with clear message
if res["code"] == 0:
assert res["data"]["name"] == name, (
f"Team name should be preserved: {name}"
)
# If rejected, should have clear error
# (Current implementation accepts special chars)
def test_team_creation_default_owner(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that team creator is set as owner by default."""
team_name: str = f"Owner Test Team {uuid.uuid4().hex[:8]}"
res: dict[str, Any] = create_team(web_api_auth, {"name": team_name})
assert res["code"] == 0, res
assert "owner_id" in res["data"], "Owner ID should be in response"
# Owner should be the authenticated user
# (Cannot verify without knowing web_api_auth user ID)
def test_concurrent_team_creation(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test concurrent team creation."""
import concurrent.futures
def create_test_team(index: int) -> dict[str, Any]:
team_name: str = f"Concurrent Team {index}_{uuid.uuid4().hex[:8]}"
return create_team(web_api_auth, {"name": team_name})
# Create 10 teams concurrently
count: int = 10
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures: list[concurrent.futures.Future[dict[str, Any]]] = [
executor.submit(create_test_team, i) for i in range(count)
]
results: list[dict[str, Any]] = [
f.result() for f in concurrent.futures.as_completed(futures)
]
# All should succeed
success_count: int = sum(1 for r in results if r["code"] == 0)
assert success_count == count, (
f"Expected {count} successful team creations, got {success_count}"
)
# All should have unique IDs
team_ids: list[str] = [r["data"]["id"] for r in results if r["code"] == 0]
assert len(team_ids) == len(set(team_ids)), (
"All team IDs should be unique"
)
def test_team_with_invalid_model_id(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with invalid model ID."""
team_name: str = f"Invalid Model Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, str] = {
"name": team_name,
"llm_id": "invalid_nonexistent_model_id_12345",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should reject with clear error message
assert res["code"] != 0, "Invalid model ID should be rejected"
assert (
"model" in res["message"].lower()
or "not found" in res["message"].lower()
or "invalid" in res["message"].lower()
), "Error message should mention model issue"
def test_team_creation_with_negative_credit(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with negative credit."""
team_name: str = f"Negative Credit Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, Any] = {
"name": team_name,
"credit": -100,
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should reject negative credit
assert res["code"] != 0, "Negative credit should be rejected"
assert "credit" in res["message"].lower(), (
"Error message should mention credit"
)
def test_team_creation_empty_json_payload(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with completely empty payload."""
res: dict[str, Any] = create_team(web_api_auth, {})
# Should reject with clear error
assert res["code"] != 0, "Empty payload should be rejected"
assert (
"name" in res["message"].lower()
or "required" in res["message"].lower()
), "Error should mention missing name"
def test_team_unicode_name(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with full unicode name."""
unicode_names: list[str] = [
f"团队{uuid.uuid4().hex[:4]}", # Chinese
f"チーム{uuid.uuid4().hex[:4]}", # Japanese
f"Команда{uuid.uuid4().hex[:4]}", # Russian
f"فريق{uuid.uuid4().hex[:4]}", # Arabic (RTL)
f"😀🎉{uuid.uuid4().hex[:4]}", # Emoji
]
for name in unicode_names:
res: dict[str, Any] = create_team(web_api_auth, {"name": name})
# Should handle unicode properly
if res["code"] == 0:
# Verify unicode is preserved (may be normalized)
assert len(res["data"]["name"]) > 0, (
"Team name should not be empty after unicode"
)
@pytest.mark.p3
def test_team_creation_with_all_optional_params(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with all optional parameters."""
team_name: str = f"Full Params Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, Any] = {
"name": team_name,
"credit": 2000,
# Note: Model IDs would need to be valid
# "llm_id": "valid_llm_id",
# "embd_id": "valid_embd_id",
# "asr_id": "valid_asr_id",
# "parser_ids": "valid_parser_ids",
# "img2txt_id": "valid_img2txt_id",
# "rerank_id": "valid_rerank_id",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should succeed
assert res["code"] == 0, res
assert res["data"]["name"] == team_name
@pytest.mark.p3
def test_team_max_name_length(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team with maximum allowed name length."""
# API spec says max 100 characters
max_name: str = "A" * 100
res: dict[str, Any] = create_team(web_api_auth, {"name": max_name})
# Should accept 100 characters
assert res["code"] == 0, "100-character name should be accepted"
assert res["data"]["name"] == max_name
@pytest.mark.p3
def test_team_name_just_over_limit(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team with name just over limit."""
# 101 characters (1 over limit)
long_name: str = "A" * 101
res: dict[str, Any] = create_team(web_api_auth, {"name": long_name})
# Should reject
assert res["code"] != 0, "101-character name should be rejected"
assert (
"100" in res["message"]
or "length" in res["message"].lower()
or "long" in res["message"].lower()
), "Error should mention length limit"
@pytest.mark.p3
def test_team_creation_idempotency(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that repeated team creation creates separate teams."""
team_name: str = f"Idempotency Test {uuid.uuid4().hex[:8]}"
payload: dict[str, str] = {"name": team_name}
# Create same team twice
res1: dict[str, Any] = create_team(web_api_auth, payload)
res2: dict[str, Any] = create_team(web_api_auth, payload)
# Both should succeed and create different teams
assert res1["code"] == 0, res1
assert res2["code"] == 0, res2
assert res1["data"]["id"] != res2["data"]["id"], (
"Should create different teams, not be idempotent"
)
@pytest.mark.p3
def test_team_with_parser_ids(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test team creation with custom parser IDs."""
team_name: str = f"Parser Test {uuid.uuid4().hex[:8]}"
# parser_ids is typically a comma-separated string
team_payload: dict[str, str] = {
"name": team_name,
"parser_ids": "naive,qa,table,paper,book,laws,presentation,manual,wiki",
}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
# Should accept valid parser IDs
assert res["code"] == 0, res