[OND211-2329]: Updated create group tests and added tests for add/remove members to group.

This commit is contained in:
Hetavi Shah 2025-11-19 16:53:43 +05:30
parent 3e7d57b208
commit c50f7b39d9
4 changed files with 1061 additions and 33 deletions

View file

@ -22,13 +22,14 @@ from requests.auth import AuthBase
from requests_toolbelt import MultipartEncoder
from utils.file_utils import create_txt_file
# Import login_as_user from root conftest
# Import login_as_user and encrypt_password from root conftest
import importlib.util
_root_conftest_path = Path(__file__).parent.parent / "conftest.py"
_root_spec = importlib.util.spec_from_file_location("root_conftest", _root_conftest_path)
_root_conftest_module = importlib.util.module_from_spec(_root_spec)
_root_spec.loader.exec_module(_root_conftest_module)
login_as_user = _root_conftest_module.login_as_user
encrypt_password = _root_conftest_module.encrypt_password
HEADERS = {"Content-Type": "application/json"}
DATASETS_API_URL = f"/api/{VERSION}/datasets"
@ -696,3 +697,87 @@ def create_group(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def add_group_members(
auth: Union[AuthBase, str, None],
group_id: str,
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Add members to a group.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
group_id: The group ID to add members to.
payload: Optional JSON payload containing user_ids list.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing added and failed user lists.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{GROUP_API_URL}/{group_id}/members/add"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def remove_group_member(
auth: Union[AuthBase, str, None],
group_id: str,
user_id: str,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Remove a user from a group.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
group_id: The group ID to remove member from.
user_id: The user ID to remove from the group.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the removal result.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{GROUP_API_URL}/{group_id}/members/{user_id}"
res: requests.Response = requests.delete(
url=url, headers=headers, auth=auth
)
return res.json()
def list_group_members(
auth: Union[AuthBase, str, None],
group_id: str,
params: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""List all members of a group.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
group_id: The group ID to list members for.
params: Optional query parameters for filtering/pagination.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the list of members.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{GROUP_API_URL}/{group_id}/members"
res: requests.Response = requests.get(
url=url, headers=headers, auth=auth, params=params
)
return res.json()

View file

@ -0,0 +1,497 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import time
import uuid
from typing import Any
import pytest
from common import (
add_group_members,
add_users_to_team,
create_group,
create_team,
create_user,
encrypt_password,
login_as_user,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestAuthorization:
"""Tests for authentication behavior when adding members to a group."""
@pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test adding members with invalid or missing authentication."""
# Create a team and group first
team_name: str = f"Test Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, str] = {"name": team_name}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res["code"] != 0:
pytest.skip("Team creation failed, skipping auth test")
tenant_id: str = team_res["data"]["id"]
group_name: str = f"Test Group {uuid.uuid4().hex[:8]}"
group_payload: dict[str, str] = {
"name": group_name,
"tenant_id": tenant_id,
}
group_res: dict[str, Any] = create_group(web_api_auth, group_payload)
if group_res["code"] != 0:
pytest.skip("Group creation failed, skipping auth test")
group_id: str = group_res["data"]["id"]
# Try to add members with invalid auth
add_payload: dict[str, list[str]] = {"user_ids": ["test_user_id"]}
res: dict[str, Any] = add_group_members(invalid_auth, group_id, add_payload)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.p1
class TestAddMembers:
"""Comprehensive tests for adding members to a group."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def test_group(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> dict[str, Any]:
"""Create a test group for use in tests."""
group_payload: dict[str, str] = {
"name": f"Test Group {uuid.uuid4().hex[:8]}",
"tenant_id": test_team["id"],
}
res: dict[str, Any] = create_group(web_api_auth, group_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def test_users(self, web_api_auth: RAGFlowWebApiAuth) -> list[dict[str, Any]]:
"""Create test users for use in tests."""
users = []
for i in range(3):
email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": f"Test User {i}",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] == 0:
users.append({"email": email, "id": user_res["data"]["id"], "password": password})
return users
@pytest.fixture
def team_with_users(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
test_users: list[dict[str, Any]],
) -> dict[str, Any]:
"""Add test users to the team."""
for user in test_users:
add_payload: dict[str, list[str]] = {"users": [user["email"]]}
add_users_to_team(web_api_auth, test_team["id"], add_payload)
return test_team
@pytest.mark.p1
def test_add_single_member(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
team_with_users: dict[str, Any],
test_users: list[dict[str, Any]],
) -> None:
"""Test adding a single member to a group."""
if not test_users:
pytest.skip("No test users created")
user_id: str = test_users[0]["id"]
add_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res["code"] == 0, res
assert "data" in res
assert "added" in res["data"]
assert "failed" in res["data"]
assert len(res["data"]["added"]) == 1
assert res["data"]["added"][0] == user_id
assert len(res["data"]["failed"]) == 0
@pytest.mark.p1
def test_add_multiple_members(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
team_with_users: dict[str, Any],
test_users: list[dict[str, Any]],
) -> None:
"""Test adding multiple members to a group."""
if len(test_users) < 2:
pytest.skip("Need at least 2 test users")
user_ids: list[str] = [user["id"] for user in test_users[:2]]
add_payload: dict[str, list[str]] = {"user_ids": user_ids}
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res["code"] == 0, res
assert len(res["data"]["added"]) == 2
assert set(res["data"]["added"]) == set(user_ids)
assert len(res["data"]["failed"]) == 0
@pytest.mark.p1
def test_add_member_missing_request_body(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
) -> None:
"""Test adding members without request body."""
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], None
)
assert res["code"] == 101
assert "required" in res["message"].lower() or "body" in res["message"].lower()
@pytest.mark.p1
def test_add_member_missing_user_ids(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
) -> None:
"""Test adding members without user_ids."""
add_payload: dict[str, Any] = {}
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res["code"] == 101
assert "user_ids" in res["message"].lower() or "non-empty array" in res[
"message"
].lower()
@pytest.mark.p1
def test_add_member_empty_user_ids(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
) -> None:
"""Test adding members with empty user_ids array."""
add_payload: dict[str, list[str]] = {"user_ids": []}
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res["code"] == 101
assert "non-empty array" in res["message"].lower() or "empty" in res[
"message"
].lower()
@pytest.mark.p1
def test_add_member_invalid_user_id(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
) -> None:
"""Test adding a non-existent user."""
add_payload: dict[str, list[str]] = {
"user_ids": ["non_existent_user_id_12345"]
}
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res["code"] == 0 # API returns success with failed list
assert len(res["data"]["added"]) == 0
assert len(res["data"]["failed"]) == 1
assert "not found" in res["data"]["failed"][0]["error"].lower() or "invalid" in res[
"data"
]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_add_member_user_not_in_team(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
) -> None:
"""Test adding a user who is not a member of the team."""
# Create a user but don't add to team
email = f"notinteam_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Not In Team User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
# Try to add to group (should fail - user not in team)
add_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res["code"] == 0 # API returns success with failed list
assert len(res["data"]["added"]) == 0
assert len(res["data"]["failed"]) == 1
assert "not a member of the team" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_add_duplicate_member(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
team_with_users: dict[str, Any],
test_users: list[dict[str, Any]],
) -> None:
"""Test adding a user who is already in the group."""
if not test_users:
pytest.skip("No test users created")
user_id: str = test_users[0]["id"]
# Add user first time
add_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res1: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res1["code"] == 0
assert len(res1["data"]["added"]) == 1
# Try to add same user again
res2: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res2["code"] == 0 # API returns success with failed list
assert len(res2["data"]["added"]) == 0
assert len(res2["data"]["failed"]) == 1
assert "already a member" in res2["data"]["failed"][0]["error"].lower()
@pytest.mark.p1
def test_add_member_invalid_group_id(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_users: dict[str, Any],
test_users: list[dict[str, Any]],
) -> None:
"""Test adding members to a non-existent group."""
if not test_users:
pytest.skip("No test users created")
user_id: str = test_users[0]["id"]
add_payload: dict[str, list[str]] = {"user_ids": [user_id]}
res: dict[str, Any] = add_group_members(
web_api_auth, "non_existent_group_id_12345", add_payload
)
assert res["code"] == 102
assert "group not found" in res["message"].lower() or "not found" in res[
"message"
].lower()
@pytest.mark.p1
def test_add_member_invalid_user_id_format(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
) -> None:
"""Test adding members with invalid user ID format."""
add_payload: dict[str, list[Any]] = {"user_ids": [12345]} # Not a string
res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert res["code"] == 0 # API returns success with failed list
assert len(res["data"]["added"]) == 0
assert len(res["data"]["failed"]) == 1
assert "invalid" in res["data"]["failed"][0]["error"].lower()
@pytest.mark.p2
def test_add_member_not_team_owner_or_admin(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that non-admin/non-owner users cannot add members to groups."""
# Create a team with the main user (owner)
team_name: str = f"Owner Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, str] = {"name": team_name}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert team_res["code"] == 0, team_res
tenant_id: str = team_res["data"]["id"]
# Create a group
group_name: str = f"Test Group {uuid.uuid4().hex[:8]}"
group_payload: dict[str, str] = {
"name": group_name,
"tenant_id": tenant_id,
}
group_res: dict[str, Any] = create_group(web_api_auth, group_payload)
assert group_res["code"] == 0, group_res
group_id: str = group_res["data"]["id"]
# Create a second user with encrypted password
other_user_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
other_user_password: str = "test123"
encrypted_password: str = encrypt_password(other_user_password)
user_payload: dict[str, str] = {
"nickname": "Other User",
"email": other_user_email,
"password": encrypted_password,
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0, user_res
# Add user to team
add_team_payload: dict[str, list[str]] = {"users": [other_user_email]}
add_team_res: dict[str, Any] = add_users_to_team(
web_api_auth, tenant_id, add_team_payload
)
assert add_team_res["code"] == 0
# Small delay to ensure user is fully created
time.sleep(0.5)
# Login as the other user
other_user_auth: RAGFlowWebApiAuth = login_as_user(
other_user_email, other_user_password
)
# Try to add a member to the group as the other user
# First, we need another user in the team to add
third_user_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
third_user_password: str = "test123"
third_encrypted_password: str = encrypt_password(third_user_password)
third_user_payload: dict[str, str] = {
"nickname": "Third User",
"email": third_user_email,
"password": third_encrypted_password,
}
third_user_res: dict[str, Any] = create_user(web_api_auth, third_user_payload)
assert third_user_res["code"] == 0
# Add third user to team
add_third_team_payload: dict[str, list[str]] = {"users": [third_user_email]}
add_third_team_res: dict[str, Any] = add_users_to_team(
web_api_auth, tenant_id, add_third_team_payload
)
assert add_third_team_res["code"] == 0
third_user_id: str = third_user_res["data"]["id"]
# Try to add member as non-admin user
add_payload: dict[str, list[str]] = {"user_ids": [third_user_id]}
res: dict[str, Any] = add_group_members(other_user_auth, group_id, add_payload)
# Should fail - user is not the team owner or admin
assert res["code"] != 0, (
"Non-owner/non-admin should not be able to add members to groups"
)
# Verify it's a permission-related error
assert res["code"] in [108, 403, 104, 102], (
f"Expected permission error code (108, 403, 104, or 102), got: {res}"
)
# Verify the error message indicates permission issue
assert "owner" in res["message"].lower() or "admin" in res["message"].lower() or "permission" in res["message"].lower(), (
f"Error message should indicate permission issue, got: {res['message']}"
)
@pytest.mark.p2
def test_add_and_list_members(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
team_with_users: dict[str, Any],
test_users: list[dict[str, Any]],
) -> None:
"""Test adding members and then listing them."""
if not test_users:
pytest.skip("No test users created")
from common import list_group_members
# List members before adding
list_res_before: dict[str, Any] = list_group_members(
web_api_auth, test_group["id"]
)
assert list_res_before["code"] == 0
initial_count: int = len(list_res_before["data"])
# Add a user
user_id: str = test_users[0]["id"]
add_payload: dict[str, list[str]] = {"user_ids": [user_id]}
add_res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert add_res["code"] == 0
assert len(add_res["data"]["added"]) == 1
# List members after adding
list_res_after: dict[str, Any] = list_group_members(
web_api_auth, test_group["id"]
)
assert list_res_after["code"] == 0
assert len(list_res_after["data"]) == initial_count + 1
# Verify the added user is in the list
member_user_ids: set[str] = {member["user_id"] for member in list_res_after["data"]}
assert user_id in member_user_ids

View file

@ -15,47 +15,16 @@
#
from __future__ import annotations
import base64
import os
import uuid
from typing import Any
import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA
from common import create_group, create_team, create_user, login_as_user
from common import create_group, create_team, create_user, encrypt_password, login_as_user
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Helper Functions
# ---------------------------------------------------------------------------
def encrypt_password(password: str) -> str:
"""Encrypt password using RSA for API calls.
Args:
password: Plain text password to encrypt.
Returns:
Encrypted password as a base64-encoded string.
"""
current_dir: str = os.path.dirname(os.path.abspath(__file__))
project_base: str = os.path.abspath(os.path.join(current_dir, "..", "..", "..", ".."))
file_path: str = os.path.join(project_base, "conf", "public.pem")
with open(file_path, encoding="utf-8") as pem_file:
rsa_key: RSA.RsaKey = RSA.import_key(pem_file.read(), passphrase="Welcome")
cipher: Cipher_pkcs1_v1_5.PKCS115_Cipher = Cipher_pkcs1_v1_5.new(rsa_key)
password_base64: str = base64.b64encode(password.encode()).decode()
encrypted_password: bytes = cipher.encrypt(password_base64.encode())
return base64.b64encode(encrypted_password).decode()
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------

View file

@ -0,0 +1,477 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import time
import uuid
from typing import Any
import pytest
from common import (
add_group_members,
add_users_to_team,
create_group,
create_team,
create_user,
encrypt_password,
login_as_user,
remove_group_member,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestAuthorization:
"""Tests for authentication behavior when removing members from a group."""
@pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test removing members with invalid or missing authentication."""
# Create a team and group first
team_name: str = f"Test Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, str] = {"name": team_name}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res["code"] != 0:
pytest.skip("Team creation failed, skipping auth test")
tenant_id: str = team_res["data"]["id"]
group_name: str = f"Test Group {uuid.uuid4().hex[:8]}"
group_payload: dict[str, str] = {
"name": group_name,
"tenant_id": tenant_id,
}
group_res: dict[str, Any] = create_group(web_api_auth, group_payload)
if group_res["code"] != 0:
pytest.skip("Group creation failed, skipping auth test")
group_id: str = group_res["data"]["id"]
# Create a user and add to team and group
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
user_id: str = user_res["data"]["id"]
# Add user to team
add_team_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id, add_team_payload)
# Add user to group
add_group_payload: dict[str, list[str]] = {"user_ids": [user_id]}
add_group_members(web_api_auth, group_id, add_group_payload)
# Try to remove member with invalid auth
res: dict[str, Any] = remove_group_member(
invalid_auth, group_id, user_id
)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.p1
class TestRemoveMember:
"""Comprehensive tests for removing members from a group."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def test_group(
self, web_api_auth: RAGFlowWebApiAuth, test_team: dict[str, Any]
) -> dict[str, Any]:
"""Create a test group for use in tests."""
group_payload: dict[str, str] = {
"name": f"Test Group {uuid.uuid4().hex[:8]}",
"tenant_id": test_team["id"],
}
res: dict[str, Any] = create_group(web_api_auth, group_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def test_user_with_member(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
test_group: dict[str, Any],
) -> dict[str, Any]:
"""Create a test user and add them to team and group."""
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
# Add user to team
add_team_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, test_team["id"], add_team_payload)
# Add user to group
add_group_payload: dict[str, list[str]] = {"user_ids": [user_id]}
add_res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_group_payload
)
assert add_res["code"] == 0
assert len(add_res["data"]["added"]) == 1
return {"id": user_id, "email": email, "password": password}
@pytest.mark.p1
def test_remove_single_member(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
test_user_with_member: dict[str, Any],
) -> None:
"""Test removing a single member from a group."""
user_id: str = test_user_with_member["id"]
res: dict[str, Any] = remove_group_member(
web_api_auth, test_group["id"], user_id
)
assert res["code"] == 0, res
assert "data" in res
assert res["data"] is True
assert "removed" in res["message"].lower() or "success" in res[
"message"
].lower()
@pytest.mark.p1
def test_remove_member_invalid_group_id(
self,
web_api_auth: RAGFlowWebApiAuth,
test_user_with_member: dict[str, Any],
) -> None:
"""Test removing a member from a non-existent group."""
user_id: str = test_user_with_member["id"]
res: dict[str, Any] = remove_group_member(
web_api_auth, "non_existent_group_id_12345", user_id
)
assert res["code"] == 102
assert "group not found" in res["message"].lower() or "not found" in res[
"message"
].lower()
@pytest.mark.p1
def test_remove_member_user_not_in_group(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
test_team: dict[str, Any],
) -> None:
"""Test removing a user who is not in the group."""
# Create a user and add to team but not group
email = f"notingroup_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Not In Group User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
# Add user to team but not group
add_team_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, test_team["id"], add_team_payload)
# Try to remove from group
res: dict[str, Any] = remove_group_member(
web_api_auth, test_group["id"], user_id
)
assert res["code"] == 102
assert "not a member" in res["message"].lower() or "not found" in res[
"message"
].lower()
@pytest.mark.p1
def test_remove_member_invalid_user_id(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
) -> None:
"""Test removing a non-existent user from a group."""
res: dict[str, Any] = remove_group_member(
web_api_auth, test_group["id"], "non_existent_user_id_12345"
)
assert res["code"] == 102
assert "not a member" in res["message"].lower() or "not found" in res[
"message"
].lower()
@pytest.mark.p2
def test_remove_member_not_team_owner_or_admin(
self, web_api_auth: RAGFlowWebApiAuth
) -> None:
"""Test that non-admin/non-owner users cannot remove members from groups."""
# Create a team with the main user (owner)
team_name: str = f"Owner Team {uuid.uuid4().hex[:8]}"
team_payload: dict[str, str] = {"name": team_name}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert team_res["code"] == 0, team_res
tenant_id: str = team_res["data"]["id"]
# Create a group
group_name: str = f"Test Group {uuid.uuid4().hex[:8]}"
group_payload: dict[str, str] = {
"name": group_name,
"tenant_id": tenant_id,
}
group_res: dict[str, Any] = create_group(web_api_auth, group_payload)
assert group_res["code"] == 0, group_res
group_id: str = group_res["data"]["id"]
# Create a second user with encrypted password
other_user_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
other_user_password: str = "test123"
encrypted_password: str = encrypt_password(other_user_password)
user_payload: dict[str, str] = {
"nickname": "Other User",
"email": other_user_email,
"password": encrypted_password,
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0, user_res
# Add user to team
add_team_payload: dict[str, list[str]] = {"users": [other_user_email]}
add_team_res: dict[str, Any] = add_users_to_team(
web_api_auth, tenant_id, add_team_payload
)
assert add_team_res["code"] == 0
# Create a third user to add to group and then try to remove
third_user_email: str = f"test_{uuid.uuid4().hex[:8]}@example.com"
third_user_password: str = "test123"
third_encrypted_password: str = encrypt_password(third_user_password)
third_user_payload: dict[str, str] = {
"nickname": "Third User",
"email": third_user_email,
"password": third_encrypted_password,
}
third_user_res: dict[str, Any] = create_user(web_api_auth, third_user_payload)
assert third_user_res["code"] == 0
# Add third user to team
add_third_team_payload: dict[str, list[str]] = {"users": [third_user_email]}
add_third_team_res: dict[str, Any] = add_users_to_team(
web_api_auth, tenant_id, add_third_team_payload
)
assert add_third_team_res["code"] == 0
third_user_id: str = third_user_res["data"]["id"]
# Add third user to group (as owner)
add_group_payload: dict[str, list[str]] = {"user_ids": [third_user_id]}
add_group_res: dict[str, Any] = add_group_members(
web_api_auth, group_id, add_group_payload
)
assert add_group_res["code"] == 0
# Small delay to ensure user is fully created
time.sleep(0.5)
# Login as the other user (non-admin)
other_user_auth: RAGFlowWebApiAuth = login_as_user(
other_user_email, other_user_password
)
# Try to remove member as non-admin user
res: dict[str, Any] = remove_group_member(
other_user_auth, group_id, third_user_id
)
# Should fail - user is not the team owner or admin
assert res["code"] != 0, (
"Non-owner/non-admin should not be able to remove members from groups"
)
# Verify it's a permission-related error
assert res["code"] in [108, 403, 104, 102], (
f"Expected permission error code (108, 403, 104, or 102), got: {res}"
)
# Verify the error message indicates permission issue
assert "owner" in res["message"].lower() or "admin" in res["message"].lower() or "permission" in res["message"].lower(), (
f"Error message should indicate permission issue, got: {res['message']}"
)
@pytest.mark.p2
def test_remove_and_re_add_member(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
test_user_with_member: dict[str, Any],
) -> None:
"""Test removing a member and then adding them back."""
user_id: str = test_user_with_member["id"]
# Remove member
remove_res: dict[str, Any] = remove_group_member(
web_api_auth, test_group["id"], user_id
)
assert remove_res["code"] == 0
# Add member back
add_payload: dict[str, list[str]] = {"user_ids": [user_id]}
add_res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert add_res["code"] == 0
assert len(add_res["data"]["added"]) == 1
assert add_res["data"]["added"][0] == user_id
@pytest.mark.p2
def test_remove_multiple_members_sequentially(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
test_team: dict[str, Any],
) -> None:
"""Test removing multiple members from a group sequentially."""
# Create and add multiple users
user_ids = []
for i in range(3):
email = f"testuser{i}_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": f"Test User {i}",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
user_ids.append(user_id)
# Add to team
add_team_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, test_team["id"], add_team_payload)
# Add all users to group
add_payload: dict[str, list[str]] = {"user_ids": user_ids}
add_res: dict[str, Any] = add_group_members(
web_api_auth, test_group["id"], add_payload
)
assert add_res["code"] == 0
assert len(add_res["data"]["added"]) == 3
# Remove each user sequentially
for user_id in user_ids:
remove_res: dict[str, Any] = remove_group_member(
web_api_auth, test_group["id"], user_id
)
assert remove_res["code"] == 0
# Verify all removed by listing members
from common import list_group_members
list_res: dict[str, Any] = list_group_members(web_api_auth, test_group["id"])
assert list_res["code"] == 0
member_user_ids: set[str] = {member["user_id"] for member in list_res["data"]}
for user_id in user_ids:
assert user_id not in member_user_ids
@pytest.mark.p2
def test_remove_member_and_list(
self,
web_api_auth: RAGFlowWebApiAuth,
test_group: dict[str, Any],
test_user_with_member: dict[str, Any],
) -> None:
"""Test removing a member and verifying via list."""
from common import list_group_members
user_id: str = test_user_with_member["id"]
# List members before removal
list_res_before: dict[str, Any] = list_group_members(
web_api_auth, test_group["id"]
)
assert list_res_before["code"] == 0
member_user_ids_before: set[str] = {
member["user_id"] for member in list_res_before["data"]
}
assert user_id in member_user_ids_before
# Remove member
remove_res: dict[str, Any] = remove_group_member(
web_api_auth, test_group["id"], user_id
)
assert remove_res["code"] == 0
# List members after removal
list_res_after: dict[str, Any] = list_group_members(
web_api_auth, test_group["id"]
)
assert list_res_after["code"] == 0
member_user_ids_after: set[str] = {
member["user_id"] for member in list_res_after["data"]
}
assert user_id not in member_user_ids_after
assert len(member_user_ids_after) == len(member_user_ids_before) - 1