[OND211-2329]: Updated permissions API and added tests.

This commit is contained in:
Hetavi Shah 2025-11-24 19:12:14 +05:30
parent 13b8f0cf41
commit e462d5f23a
14 changed files with 2126 additions and 71 deletions

View file

@ -103,11 +103,18 @@ T = TypeVar("T")
P = ParamSpec("P")
def _load_user():
jwt = Serializer(secret_key=settings.SECRET_KEY)
authorization = request.headers.get("Authorization")
try:
jwt = Serializer(secret_key=settings.SECRET_KEY)
authorization = request.headers.get("Authorization")
except RuntimeError as e:
# Working outside of request context - no user authenticated
if "request context" in str(e).lower():
return None
# Re-raise other RuntimeErrors
raise
g.user = None
if not authorization:
return
return None
try:
access_token = str(jwt.loads(authorization))

View file

@ -138,7 +138,11 @@ async def save() -> Any:
@login_required
def get(canvas_id):
if not UserCanvasService.accessible(canvas_id, current_user.id):
return get_data_error_result(message="canvas not found.")
return get_json_result(
data=False,
message='You do not have read permission for this canvas.',
code=RetCode.PERMISSION_ERROR
)
e, c = UserCanvasService.get_by_canvas_id(canvas_id)
return get_json_result(data=c)

View file

@ -32,10 +32,11 @@ from api.db.services.task_service import TaskService, GRAPH_RAPTOR_FAKE_DOC_ID
from api.db.services.user_service import TenantService, UserTenantService
from api.utils.api_utils import get_error_data_result, server_error_response, get_data_error_result, validate_request, not_allowed_parameters, \
request_json
from api.db import VALID_FILE_TYPES
from api.db import VALID_FILE_TYPES, UserTenantRole
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.db_models import File
from api.utils.api_utils import get_json_result
from api.common.permission_utils import has_permission
from rag.nlp import search
from api.constants import DATASET_NAME_LIMIT
from rag.utils.redis_conn import REDIS_CONN
@ -53,6 +54,8 @@ async def create() -> Any:
# Validate shared_tenant_id if provided
shared_tenant_id: Optional[str] = req.get("shared_tenant_id")
user_tenant: Optional[Any] = None
if shared_tenant_id:
if req.get("permission") != "team":
return get_json_result(
@ -61,14 +64,92 @@ async def create() -> Any:
code=RetCode.ARGUMENT_ERROR
)
# Verify user is a member of the shared tenant
user_tenant = UserTenantService.filter_by_tenant_and_user_id(shared_tenant_id, current_user.id)
if not user_tenant or user_tenant.status != StatusEnum.VALID.value:
try:
user_tenant = UserTenantService.filter_by_tenant_and_user_id(shared_tenant_id, current_user.id)
if not user_tenant or user_tenant.status != StatusEnum.VALID.value:
return get_json_result(
data=False,
message=f"You are not a member of the selected team",
code=RetCode.PERMISSION_ERROR
)
except Exception as e:
logging.exception(e)
return get_json_result(
data=False,
message=f"You are not a member of the selected team",
code=RetCode.PERMISSION_ERROR
message=f"Error verifying team membership: {str(e)}",
code=RetCode.EXCEPTION_ERROR
)
# Check create permission if sharing with team
if req.get("permission") == "team":
target_tenant_id: Optional[str] = shared_tenant_id
# Auto-resolve team if shared_tenant_id not provided
if not target_tenant_id:
try:
user_tenant_relations = list(UserTenantService.query(
user_id=current_user.id,
status=StatusEnum.VALID.value
))
if not user_tenant_relations:
return get_json_result(
data=False,
message="You are not a member of any team. Please specify shared_tenant_id or join a team first.",
code=RetCode.DATA_ERROR
)
# Prefer owner teams
owner_teams = [ut for ut in user_tenant_relations if ut.role == UserTenantRole.OWNER]
if owner_teams:
target_tenant_id = owner_teams[0].tenant_id
else:
target_tenant_id = user_tenant_relations[0].tenant_id
req["shared_tenant_id"] = target_tenant_id
except Exception as e:
logging.exception(e)
return get_json_result(
data=False,
message=f"Error finding team: {str(e)}",
code=RetCode.EXCEPTION_ERROR
)
# Ensure user_tenant is set for permission check
if not user_tenant:
try:
user_tenant = UserTenantService.filter_by_tenant_and_user_id(target_tenant_id, current_user.id)
if not user_tenant or user_tenant.status != StatusEnum.VALID.value:
return get_json_result(
data=False,
message=f"You are not a member of the selected team.",
code=RetCode.PERMISSION_ERROR
)
except Exception as e:
logging.exception(e)
return get_json_result(
data=False,
message=f"Error verifying team membership: {str(e)}",
code=RetCode.EXCEPTION_ERROR
)
# Owners and admins bypass permission check
if user_tenant.role not in [UserTenantRole.OWNER, UserTenantRole.ADMIN]:
try:
if not has_permission(target_tenant_id, current_user.id, "dataset", "create"):
return get_json_result(
data=False,
message='You do not have create permission for datasets in this team.',
code=RetCode.PERMISSION_ERROR
)
except Exception as e:
logging.exception(e)
return get_json_result(
data=False,
message=f"Error checking permissions: {str(e)}",
code=RetCode.EXCEPTION_ERROR
)
e: bool
res: Any
e, res = KnowledgebaseService.create_with_name(
@ -162,15 +243,16 @@ async def update():
def detail():
kb_id = request.args["kb_id"]
try:
tenants = UserTenantService.query(user_id=current_user.id)
for tenant in tenants:
if KnowledgebaseService.query(
tenant_id=tenant.tenant_id, id=kb_id):
break
else:
# Unified access control:
# - Owners always have access
# - Team members must have appropriate CRUD permissions
# - Non-existent datasets return the same permission-style error as before
if not KnowledgebaseService.accessible(kb_id, current_user.id, required_permission="read"):
return get_json_result(
data=False, message='Only owner of knowledgebase authorized for this operation.',
code=RetCode.OPERATING_ERROR)
data=False,
message='Only owner of knowledgebase authorized for this operation.',
code=RetCode.OPERATING_ERROR,
)
kb = KnowledgebaseService.get_detail(kb_id)
if not kb:
return get_data_error_result(
@ -236,15 +318,13 @@ async def rm():
code=RetCode.PERMISSION_ERROR
)
try:
kbs = KnowledgebaseService.query(
created_by=current_user.id, id=req["kb_id"])
if not kbs:
return get_json_result(
data=False, message='Only owner of knowledgebase authorized for this operation.',
code=RetCode.OPERATING_ERROR)
ok, kb = KnowledgebaseService.get_by_id(req["kb_id"])
if not ok or kb is None:
return get_data_error_result(
message="Can't find this knowledgebase!")
for doc in DocumentService.query(kb_id=req["kb_id"]):
if not DocumentService.remove_document(doc, kbs[0].tenant_id):
if not DocumentService.remove_document(doc, kb.tenant_id):
return get_data_error_result(
message="Database error (Document removal)!")
f2d = File2DocumentService.get_by_document_id(doc.id)
@ -252,15 +332,14 @@ async def rm():
FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
File2DocumentService.delete_by_document_id(doc.id)
FileService.filter_delete(
[File.source_type == FileSource.KNOWLEDGEBASE, File.type == "folder", File.name == kbs[0].name])
[File.source_type == FileSource.KNOWLEDGEBASE, File.type == "folder", File.name == kb.name])
if not KnowledgebaseService.delete_by_id(req["kb_id"]):
return get_data_error_result(
message="Database error (Knowledgebase removal)!")
for kb in kbs:
settings.docStoreConn.delete({"kb_id": kb.id}, search.index_name(kb.tenant_id), kb.id)
settings.docStoreConn.deleteIdx(search.index_name(kb.tenant_id), kb.id)
if hasattr(settings.STORAGE_IMPL, 'remove_bucket'):
settings.STORAGE_IMPL.remove_bucket(kb.id)
settings.docStoreConn.delete({"kb_id": kb.id}, search.index_name(kb.tenant_id), kb.id)
settings.docStoreConn.deleteIdx(search.index_name(kb.tenant_id), kb.id)
if hasattr(settings.STORAGE_IMPL, 'remove_bucket'):
settings.STORAGE_IMPL.remove_bucket(kb.id)
return get_json_result(data=True)
except Exception as e:
return server_error_response(e)

View file

@ -37,7 +37,10 @@ from api.utils.api_utils import (
validate_request,
)
from api.utils.web_utils import send_invite_email
from api.common.permission_utils import get_user_permissions, update_user_permissions
from api.common.permission_utils import (
get_user_permissions as get_member_permissions,
update_user_permissions as update_member_permissions,
)
from common import settings
from common.constants import RetCode, StatusEnum
from common.misc_utils import get_uuid
@ -565,14 +568,23 @@ async def update_team(tenant_id: str) -> Response:
code=RetCode.DATA_ERROR
)
req_json = await request.json
try:
req_json = await request.json
except Exception:
# Handle malformed or missing JSON body explicitly
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
if req_json is None:
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
req: Dict[str, Any] = req_json
# Extract update fields (all optional)
@ -1304,7 +1316,7 @@ def demote_admin(tenant_id: str, user_id: str) -> Response:
- ApiKeyAuth: []
parameters:
- in: path
name: tenant_iderify they are actually an admin; otherwise return:
name: tenant_id
required: true
type: string
description: Team ID
@ -1510,7 +1522,7 @@ def get_user_permissions(tenant_id: str, user_id: str) -> Response:
code=RetCode.DATA_ERROR,
)
permissions: Dict[str, Dict[str, bool]] = get_user_permissions(tenant_id, user_id)
permissions: Dict[str, Dict[str, bool]] = get_member_permissions(tenant_id, user_id)
return get_json_result(
data=permissions,
@ -1687,7 +1699,7 @@ async def update_user_permissions(tenant_id: str, user_id: str) -> Response:
validated_permissions[resource_type][perm_name] = perm_value
# Update permissions
success: bool = update_user_permissions(tenant_id, user_id, validated_permissions)
success: bool = update_member_permissions(tenant_id, user_id, validated_permissions)
if not success:
return get_json_result(
@ -1697,7 +1709,7 @@ async def update_user_permissions(tenant_id: str, user_id: str) -> Response:
)
# Get updated permissions for response
updated_permissions: Dict[str, Dict[str, bool]] = get_user_permissions(tenant_id, user_id)
updated_permissions: Dict[str, Dict[str, bool]] = get_member_permissions(tenant_id, user_id)
return get_json_result(
data=updated_permissions,

View file

@ -794,7 +794,7 @@ async def user_add():
@manager.route("/create", methods=["POST"]) # noqa: F821
@login_required
@validate_request("nickname", "email", "password")
def create_user() -> Response:
async def create_user() -> Response:
"""
Create a new user.
@ -848,14 +848,15 @@ def create_user() -> Response:
schema:
type: object
"""
if request.json is None:
req_json = await request.json
if req_json is None:
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
req: Dict[str, Any] = request.json
req: Dict[str, Any] = req_json
email_address: str = str(req.get("email", ""))
# Validate email is provided
@ -979,8 +980,7 @@ def create_user() -> Response:
@manager.route("/update", methods=["PUT"]) # noqa: F821
@login_required
@validate_request()
def update_user() -> Response:
async def update_user() -> Response:
"""
Update an existing user. Users can only update their own account.
---
@ -1041,14 +1041,15 @@ def update_user() -> Response:
schema:
type: object
"""
if request.json is None:
req_json = await request.json
if req_json is None:
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
req: Dict[str, Any] = request.json
req: Dict[str, Any] = req_json
user_id: Optional[str] = req.get("user_id")
email: Optional[str] = req.get("email")
identified_by_user_id: bool = bool(user_id)
@ -1362,8 +1363,7 @@ def list_users() -> Response:
@manager.route("/delete", methods=["DELETE"]) # noqa: F821
@login_required
@validate_request()
def delete_user() -> Response:
async def delete_user() -> Response:
"""
Delete a user. Users can only delete their own account.
@ -1424,14 +1424,15 @@ def delete_user() -> Response:
code=RetCode.UNAUTHORIZED,
)
if request.json is None:
req_json = await request.json
if req_json is None:
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
req: Dict[str, Any] = request.json
req: Dict[str, Any] = req_json
user_id: Optional[str] = req.get("user_id")
email: Optional[str] = req.get("email")

View file

@ -170,21 +170,30 @@ class KnowledgebaseService(CommonService):
User.avatar.alias('tenant_avatar'),
cls.model.update_time
]
# A KB is visible to a user if:
# - It is shared with one of the user's joined tenants (via shared_tenant_id)
# and has TEAM permission, or
# - It is owned by the user (tenant_id == user_id)
visibility_expr = (
(
(cls.model.shared_tenant_id.in_(joined_tenant_ids))
& (cls.model.permission == TenantPermission.TEAM.value)
)
| (cls.model.tenant_id == user_id)
)
base_query = (
cls.model.select(*fields)
.join(User, on=(cls.model.tenant_id == User.id))
.where(visibility_expr & (cls.model.status == StatusEnum.VALID.value))
)
if keywords:
kbs = cls.model.select(*fields).join(User, on=(cls.model.tenant_id == User.id)).where(
((cls.model.tenant_id.in_(joined_tenant_ids) & (cls.model.permission ==
TenantPermission.TEAM.value)) | (
cls.model.tenant_id == user_id))
& (cls.model.status == StatusEnum.VALID.value),
(fn.LOWER(cls.model.name).contains(keywords.lower()))
kbs = base_query.where(
fn.LOWER(cls.model.name).contains(keywords.lower())
)
else:
kbs = cls.model.select(*fields).join(User, on=(cls.model.tenant_id == User.id)).where(
((cls.model.tenant_id.in_(joined_tenant_ids) & (cls.model.permission ==
TenantPermission.TEAM.value)) | (
cls.model.tenant_id == user_id))
& (cls.model.status == StatusEnum.VALID.value)
)
kbs = base_query
if parser_id:
kbs = kbs.where(cls.model.parser_id == parser_id)
if desc:
@ -192,12 +201,21 @@ class KnowledgebaseService(CommonService):
else:
kbs = kbs.order_by(cls.model.getter_by(orderby).asc())
count = kbs.count()
# Apply pagination at query level
total = kbs.count()
if page_number and items_per_page:
kbs = kbs.paginate(page_number, items_per_page)
return list(kbs.dicts()), count
# Convert to list of dicts and enforce per-user read permission,
# so revoking dataset read permission hides team datasets from lists.
kb_dicts = list(kbs.dicts())
filtered_kbs = [
kb for kb in kb_dicts
if cls.accessible(kb["id"], user_id, required_permission="read")
]
# When filtering is applied, total should reflect visible datasets
return filtered_kbs, len(filtered_kbs)
@classmethod
@DB.connection_context()

View file

@ -92,6 +92,9 @@ def server_error_response(e):
logging.exception(e)
try:
msg = repr(e).lower()
# Check if it's a RuntimeError about request context - treat as Unauthorized
if isinstance(e, RuntimeError) and "request context" in msg:
return get_json_result(code=RetCode.UNAUTHORIZED, message="Unauthorized")
if getattr(e, "code", None) == 401 or ("unauthorized" in msg) or ("401" in msg):
return get_json_result(code=RetCode.UNAUTHORIZED, message=repr(e))
except Exception as ex:

View file

@ -152,6 +152,7 @@ dependencies = [
"pypandoc>=1.16",
"pyobvector==0.2.18",
"ruff>=0.14.4",
"reportlab>=4.4.3",
]
[dependency-groups]

View file

@ -21,6 +21,7 @@ from configs import HOST_ADDRESS, VERSION
from requests.auth import AuthBase
from requests_toolbelt import MultipartEncoder
from utils.file_utils import create_txt_file
from libs.auth import RAGFlowWebApiAuth
# Import login_as_user and encrypt_password from root conftest
import importlib.util
@ -39,26 +40,137 @@ CHUNK_API_URL = f"/api/{VERSION}/datasets/{{dataset_id}}/documents/{{document_id
CHAT_ASSISTANT_API_URL = f"/api/{VERSION}/chats"
SESSION_WITH_CHAT_ASSISTANT_API_URL = f"/api/{VERSION}/chats/{{chat_id}}/sessions"
SESSION_WITH_AGENT_API_URL = f"/api/{VERSION}/agents/{{agent_id}}/sessions"
CANVAS_API_URL = f"/{VERSION}/canvas"
# DATASET MANAGEMENT
def create_dataset(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.post(url=f"{HOST_ADDRESS}{DATASETS_API_URL}", headers=headers, auth=auth, json=payload, data=data)
"""
Create dataset.
- For HTTP API token auth (`RAGFlowHttpApiAuth`), use the RESTful datasets API.
- For web JWT auth (`RAGFlowWebApiAuth`), proxy to the KB web endpoint `/v1/kb/create`
and normalize its response shape to match the datasets API (`data.id`).
"""
# Web (JWT) flow: go through KB web endpoint with permission-aware logic
if isinstance(auth, RAGFlowWebApiAuth):
url = f"{HOST_ADDRESS}/{VERSION}/kb/create"
res = requests.post(url=url, headers=headers, auth=auth, json=payload, data=data)
body = res.json()
# KB create returns {"kb_id": ...}; normalize to {"id": ..., "kb_id": ...}
if body.get("code") == 0 and isinstance(body.get("data"), dict) and "kb_id" in body["data"]:
kb_id = body["data"]["kb_id"]
body["data"] = {"id": kb_id, "kb_id": kb_id}
return body
# HTTP API (API key) flow: original datasets REST endpoint
res = requests.post(
url=f"{HOST_ADDRESS}{DATASETS_API_URL}",
headers=headers,
auth=auth,
json=payload,
data=data,
)
return res.json()
def list_datasets(auth, params=None, *, headers=HEADERS):
res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_API_URL}", headers=headers, auth=auth, params=params)
"""
List datasets.
- Web JWT auth: call `/v1/kb/list` and project KBs to a simple `[{"id": ...}, ...]` list.
- HTTP API token auth: use `/api/{version}/datasets` as before.
"""
if isinstance(auth, RAGFlowWebApiAuth):
url = f"{HOST_ADDRESS}/{VERSION}/kb/list"
# `list_kbs` expects POST with optional body (owner_ids etc.) and query params for paging.
res = requests.post(url=url, headers=headers, auth=auth, params=params or {}, json={})
body = res.json()
if body.get("code") == 0:
data = body.get("data") or {}
kbs = data.get("kbs", [])
# Normalize to the datasets API shape: list of objects with "id"
body["data"] = [{"id": kb["id"], **kb} for kb in kbs]
return body
res = requests.get(
url=f"{HOST_ADDRESS}{DATASETS_API_URL}",
headers=headers,
auth=auth,
params=params,
)
return res.json()
def update_dataset(auth, dataset_id, payload=None, *, headers=HEADERS, data=None):
res = requests.put(url=f"{HOST_ADDRESS}{DATASETS_API_URL}/{dataset_id}", headers=headers, auth=auth, json=payload, data=data)
"""
Update dataset.
- Web JWT auth: call `/v1/kb/update` with `kb_id` and normalize response.
- HTTP API token auth: use `/api/{version}/datasets/{id}`.
"""
if isinstance(auth, RAGFlowWebApiAuth):
url = f"{HOST_ADDRESS}/{VERSION}/kb/update"
# KB update expects "kb_id" instead of "id"
kb_payload = dict(payload or {})
kb_payload["kb_id"] = dataset_id
res = requests.post(url=url, headers=headers, auth=auth, json=kb_payload, data=data)
body = res.json()
if body.get("code") == 0 and isinstance(body.get("data"), dict):
kb = body["data"]
# Ensure an "id" field is present for tests
if "id" not in kb and "kb_id" in kb:
kb["id"] = kb["kb_id"]
body["data"] = kb
return body
res = requests.put(
url=f"{HOST_ADDRESS}{DATASETS_API_URL}/{dataset_id}",
headers=headers,
auth=auth,
json=payload,
data=data,
)
return res.json()
def delete_datasets(auth, payload=None, *, headers=HEADERS, data=None):
res = requests.delete(url=f"{HOST_ADDRESS}{DATASETS_API_URL}", headers=headers, auth=auth, json=payload, data=data)
"""
Delete datasets.
- Web JWT auth: call `/v1/kb/rm` with single `kb_id` when exactly one id is provided,
and treat other cases as unsupported for this test suite.
- HTTP API token auth: original RESTful batch delete.
"""
if isinstance(auth, RAGFlowWebApiAuth):
ids = (payload or {}).get("ids")
# For the permission tests we only ever delete a single dataset by id.
if not ids or len(ids) != 1:
return {
"code": 101,
"message": "Only single-id delete is supported via web API helper",
"data": False,
}
kb_id = ids[0]
url = f"{HOST_ADDRESS}/{VERSION}/kb/rm"
res = requests.post(
url=url,
headers=headers,
auth=auth,
json={"kb_id": kb_id},
data=data,
)
body = res.json()
# KB rm returns data=True/False; keep that shape but keep code semantics.
return body
res = requests.delete(
url=f"{HOST_ADDRESS}{DATASETS_API_URL}",
headers=headers,
auth=auth,
json=payload,
data=data,
)
return res.json()
@ -592,6 +704,64 @@ def promote_admin(
return res.json()
def get_user_permissions(
auth: Union[AuthBase, str, None],
tenant_id: str,
user_id: str,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Get CRUD permissions for a team member.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
tenant_id: The team ID.
user_id: The user ID to get permissions for.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing permissions.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/{tenant_id}/users/{user_id}/permissions"
res: requests.Response = requests.get(
url=url, headers=headers, auth=auth
)
return res.json()
def update_user_permissions(
auth: Union[AuthBase, str, None],
tenant_id: str,
user_id: str,
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Update CRUD permissions for a team member.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
tenant_id: The team ID.
user_id: The user ID to update permissions for.
payload: JSON payload containing permissions to update.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing updated permissions.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/{tenant_id}/users/{user_id}/permissions"
res: requests.Response = requests.put(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def demote_admin(
auth: Union[AuthBase, str, None],
tenant_id: str,
@ -620,3 +790,212 @@ def demote_admin(
return res.json()
# CANVAS MANAGEMENT
def create_canvas(
auth: Union[AuthBase, str, None],
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Create or update a canvas.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: JSON payload containing canvas data (dsl, title, permission, etc.).
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the canvas data.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/set"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def get_canvas(
auth: Union[AuthBase, str, None],
canvas_id: str,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Get a canvas by ID.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
canvas_id: The canvas ID to retrieve.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing the canvas data.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/get/{canvas_id}"
res: requests.Response = requests.get(
url=url, headers=headers, auth=auth
)
return res.json()
def list_canvases(
auth: Union[AuthBase, str, None],
params: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""List canvases.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
params: Optional query parameters (page_number, page_size, keywords, etc.).
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing canvas list and total count.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/list"
res: requests.Response = requests.get(
url=url, headers=headers, auth=auth, params=params
)
return res.json()
def delete_canvas(
auth: Union[AuthBase, str, None],
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Delete one or more canvases.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: JSON payload containing canvas_ids list.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing deletion result.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/rm"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def update_canvas_setting(
auth: Union[AuthBase, str, None],
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Update canvas settings (title, permission, etc.).
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: JSON payload containing canvas settings (id, title, permission, etc.).
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing updated canvas data.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/setting"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def reset_canvas(
auth: Union[AuthBase, str, None],
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Reset a canvas to a previous version.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: JSON payload containing canvas id.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing reset result.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/reset"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def run_canvas(
auth: Union[AuthBase, str, None],
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Run a canvas (completion).
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: JSON payload containing canvas id, query, files, inputs, etc.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing run result.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/completion"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()
def debug_canvas(
auth: Union[AuthBase, str, None],
payload: Optional[Dict[str, Any]] = None,
*,
headers: Dict[str, str] = HEADERS,
) -> Dict[str, Any]:
"""Debug a canvas component.
Args:
auth: Authentication object (AuthBase subclass), token string, or None.
payload: JSON payload containing canvas id, component_id, params.
headers: Optional HTTP headers. Defaults to HEADERS.
Returns:
JSON response as a dictionary containing debug result.
Raises:
requests.RequestException: If the HTTP request fails.
"""
url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/debug"
res: requests.Response = requests.post(
url=url, headers=headers, auth=auth, json=payload
)
return res.json()

View file

@ -0,0 +1,505 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import json
import time
import uuid
from typing import Any
import pytest
from common import (
accept_team_invitation,
add_users_to_team,
create_canvas,
create_team,
create_user,
debug_canvas,
delete_canvas,
encrypt_password,
get_canvas,
get_user_permissions,
list_canvases,
login_as_user,
reset_canvas,
run_canvas,
update_canvas_setting,
update_user_permissions,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestCanvasPermissions:
"""Comprehensive tests for canvas permissions with CRUD operations."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def team_with_user(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> dict[str, Any]:
"""Create a team with a user who has accepted the invitation."""
tenant_id: str = test_team["id"]
# Create user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
# Add user to team
add_payload: dict[str, list[str]] = {"users": [email]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
# Small delay
time.sleep(0.5)
# Accept invitation as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert accept_res["code"] == 0
return {
"team": test_team,
"user": {"id": user_id, "email": email, "password": password},
}
@pytest.fixture
def team_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> dict[str, Any]:
"""Create a canvas shared with team."""
# Simple canvas DSL
canvas_dsl = {
"nodes": [
{
"id": "start",
"type": "start",
"position": {"x": 100, "y": 100},
}
],
"edges": [],
}
canvas_payload: dict[str, Any] = {
"title": f"Test Canvas {uuid.uuid4().hex[:8]}",
"dsl": json.dumps(canvas_dsl),
"permission": "team",
"shared_tenant_id": test_team["id"],
"canvas_category": "Agent",
}
res: dict[str, Any] = create_canvas(web_api_auth, canvas_payload)
assert res["code"] == 0
return res["data"]
@pytest.mark.p1
def test_read_permission_allows_get_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user with read permission can get canvas."""
canvas_id: str = team_canvas["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# User should have read permission by default
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
res: dict[str, Any] = get_canvas(user_auth, canvas_id)
assert res["code"] == 0, res
assert "data" in res
assert res["data"]["id"] == canvas_id
@pytest.mark.p1
def test_no_read_permission_denies_get_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user without read permission cannot get canvas."""
canvas_id: str = team_canvas["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Revoke read permission
update_payload: dict[str, Any] = {
"permissions": {
"canvas": {"read": False},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should not be able to get canvas
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
res: dict[str, Any] = get_canvas(user_auth, canvas_id)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "read" in res["message"].lower()
@pytest.mark.p1
def test_update_permission_allows_update_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user with update permission can update canvas."""
canvas_id: str = team_canvas["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Grant update permission
update_payload: dict[str, Any] = {
"permissions": {
"canvas": {"update": True},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should be able to update canvas settings
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
setting_payload: dict[str, Any] = {
"id": canvas_id,
"title": "Updated Title",
"permission": "team",
}
res: dict[str, Any] = update_canvas_setting(user_auth, setting_payload)
assert res["code"] == 0, res
@pytest.mark.p1
def test_no_update_permission_denies_update_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user without update permission cannot update canvas."""
canvas_id: str = team_canvas["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Ensure update permission is False (default)
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id)
assert permissions_res["code"] == 0
assert permissions_res["data"]["canvas"]["update"] is False
# User should not be able to update canvas
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
setting_payload: dict[str, Any] = {
"id": canvas_id,
"title": "Updated Title",
"permission": "team",
}
res: dict[str, Any] = update_canvas_setting(user_auth, setting_payload)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "update" in res["message"].lower()
@pytest.mark.p1
def test_delete_permission_allows_delete_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user with delete permission can delete canvas."""
# Create a new canvas for deletion
canvas_dsl = {
"nodes": [{"id": "start", "type": "start", "position": {"x": 100, "y": 100}}],
"edges": [],
}
tenant_id: str = team_with_user["team"]["id"]
canvas_payload: dict[str, Any] = {
"title": f"Test Canvas Delete {uuid.uuid4().hex[:8]}",
"dsl": json.dumps(canvas_dsl),
"permission": "team",
"shared_tenant_id": tenant_id,
"canvas_category": "Agent",
}
create_res: dict[str, Any] = create_canvas(web_api_auth, canvas_payload)
assert create_res["code"] == 0
canvas_id: str = create_res["data"]["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id = team_with_user["team"]["id"]
# Grant delete permission
update_payload: dict[str, Any] = {
"permissions": {
"canvas": {"delete": True},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should be able to delete canvas
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
delete_payload: dict[str, Any] = {"canvas_ids": [canvas_id]}
res: dict[str, Any] = delete_canvas(user_auth, delete_payload)
assert res["code"] == 0, res
@pytest.mark.p1
def test_no_delete_permission_denies_delete_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user without delete permission cannot delete canvas."""
canvas_id: str = team_canvas["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Ensure delete permission is False (default)
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id)
assert permissions_res["code"] == 0
assert permissions_res["data"]["canvas"]["delete"] is False
# User should not be able to delete canvas
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
delete_payload: dict[str, Any] = {"canvas_ids": [canvas_id]}
res: dict[str, Any] = delete_canvas(user_auth, delete_payload)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "delete" in res["message"].lower()
@pytest.mark.p1
def test_read_permission_allows_run_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user with read permission can run canvas."""
canvas_id: str = team_canvas["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
# User should have read permission by default (allows running)
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
run_payload: dict[str, Any] = {
"id": canvas_id,
"query": "test query",
}
# Note: This might fail for other reasons (missing components, etc.)
# but should not fail due to permission
res: dict[str, Any] = run_canvas(user_auth, run_payload)
# Permission check should pass (code != PERMISSION_ERROR)
if res["code"] != 0:
assert "permission" not in res["message"].lower() or "read" not in res["message"].lower()
@pytest.mark.p1
def test_read_permission_allows_debug_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user with read permission can debug canvas."""
canvas_id: str = team_canvas["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
# User should have read permission by default (allows debugging)
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
debug_payload: dict[str, Any] = {
"id": canvas_id,
"component_id": "start",
"params": {},
}
# Note: This might fail for other reasons (missing components, etc.)
# but should not fail due to permission
res: dict[str, Any] = debug_canvas(user_auth, debug_payload)
# Permission check should pass (code != PERMISSION_ERROR)
if res["code"] != 0:
assert "permission" not in res["message"].lower() or "read" not in res["message"].lower()
@pytest.mark.p1
def test_update_permission_allows_reset_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that user with update permission can reset canvas."""
canvas_id: str = team_canvas["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Grant update permission
update_payload: dict[str, Any] = {
"permissions": {
"canvas": {"update": True},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should be able to reset canvas
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
reset_payload: dict[str, Any] = {"id": canvas_id}
# Note: This might fail if there are no versions, but should not fail due to permission
res: dict[str, Any] = reset_canvas(user_auth, reset_payload)
# Permission check should pass (code != PERMISSION_ERROR)
if res["code"] != 0:
assert "permission" not in res["message"].lower() or "update" not in res["message"].lower()
@pytest.mark.p1
def test_create_permission_allows_create_team_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test that user with create permission can create team canvas."""
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Grant create permission
update_payload: dict[str, Any] = {
"permissions": {
"canvas": {"create": True},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should be able to create team canvas
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
canvas_dsl = {
"nodes": [{"id": "start", "type": "start", "position": {"x": 100, "y": 100}}],
"edges": [],
}
canvas_payload: dict[str, Any] = {
"title": f"User Created Canvas {uuid.uuid4().hex[:8]}",
"dsl": json.dumps(canvas_dsl),
"permission": "team",
"shared_tenant_id": tenant_id,
"canvas_category": "Agent",
}
res: dict[str, Any] = create_canvas(user_auth, canvas_payload)
assert res["code"] == 0, res
@pytest.mark.p1
def test_no_create_permission_denies_create_team_canvas(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test that user without create permission cannot create team canvas."""
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Ensure create permission is False (default)
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id)
assert permissions_res["code"] == 0
assert permissions_res["data"]["canvas"]["create"] is False
# User should not be able to create team canvas
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
canvas_dsl = {
"nodes": [{"id": "start", "type": "start", "position": {"x": 100, "y": 100}}],
"edges": [],
}
canvas_payload: dict[str, Any] = {
"title": f"User Created Canvas {uuid.uuid4().hex[:8]}",
"dsl": json.dumps(canvas_dsl),
"permission": "team",
"shared_tenant_id": tenant_id,
"canvas_category": "Agent",
}
res: dict[str, Any] = create_canvas(user_auth, canvas_payload)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "create" in res["message"].lower()
@pytest.mark.p1
def test_owner_always_has_full_permissions(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
team_canvas: dict[str, Any],
) -> None:
"""Test that team owner always has full permissions regardless of settings."""
canvas_id: str = team_canvas["id"]
owner_id: str = test_team["owner_id"]
tenant_id: str = test_team["id"]
# Owner should have full permissions
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, owner_id)
assert permissions_res["code"] == 0
permissions: dict[str, Any] = permissions_res["data"]
assert permissions["canvas"]["create"] is True
assert permissions["canvas"]["read"] is True
assert permissions["canvas"]["update"] is True
assert permissions["canvas"]["delete"] is True
# Owner should be able to perform all operations
setting_payload: dict[str, Any] = {
"id": canvas_id,
"title": "Owner Updated Title",
"permission": "team",
}
res: dict[str, Any] = update_canvas_setting(web_api_auth, setting_payload)
assert res["code"] == 0, res

View file

@ -0,0 +1,458 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import time
import uuid
from typing import Any
import pytest
import requests
from common import (
accept_team_invitation,
add_users_to_team,
create_dataset,
create_team,
create_user,
delete_datasets,
encrypt_password,
get_user_permissions,
list_datasets,
login_as_user,
update_dataset,
update_user_permissions,
)
from configs import HOST_ADDRESS, INVALID_API_TOKEN, VERSION
from libs.auth import RAGFlowWebApiAuth
def get_dataset_detail(auth: RAGFlowWebApiAuth, kb_id: str) -> dict[str, Any]:
"""Get dataset details by ID.
Args:
auth: Authentication object.
kb_id: Knowledge base (dataset) ID.
Returns:
JSON response as a dictionary containing the dataset data.
"""
# Use the web KB detail endpoint, which is JWT-authenticated and
# enforces dataset/KB read permissions consistently with canvas tests.
url: str = f"{HOST_ADDRESS}/{VERSION}/kb/detail"
res: requests.Response = requests.get(
url=url, headers={"Content-Type": "application/json"}, auth=auth, params={"kb_id": kb_id}
)
return res.json()
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestDatasetPermissions:
"""Comprehensive tests for dataset permissions with CRUD operations."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0
return res["data"]
@pytest.fixture
def team_with_user(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> dict[str, Any]:
"""Create a team with a user who has accepted the invitation."""
tenant_id: str = test_team["id"]
# Create user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
# Add user to team
add_payload: dict[str, list[str]] = {"users": [email]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
# Small delay
time.sleep(0.5)
# Accept invitation as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert accept_res["code"] == 0
return {
"team": test_team,
"user": {"id": user_id, "email": email, "password": password},
}
@pytest.fixture
def team_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> dict[str, Any]:
"""Create a dataset shared with team."""
dataset_payload: dict[str, Any] = {
"name": f"Test Dataset {uuid.uuid4().hex[:8]}",
"permission": "team",
"shared_tenant_id": test_team["id"],
}
res: dict[str, Any] = create_dataset(web_api_auth, dataset_payload)
assert res["code"] == 0
return res["data"]
@pytest.mark.p1
def test_read_permission_allows_get_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user with read permission can get dataset."""
dataset_id: str = team_dataset["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
# User should have read permission by default
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
res: dict[str, Any] = get_dataset_detail(user_auth, dataset_id)
assert res["code"] == 0, res
assert "data" in res
assert res["data"]["id"] == dataset_id
@pytest.mark.p1
def test_no_read_permission_denies_get_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user without read permission cannot get dataset."""
dataset_id: str = team_dataset["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Revoke read permission
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"read": False},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should not be able to get dataset
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
res: dict[str, Any] = get_dataset_detail(user_auth, dataset_id)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "read" in res["message"].lower() or "authorized" in res["message"].lower()
@pytest.mark.p1
def test_update_permission_allows_update_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user with update permission can update dataset."""
dataset_id: str = team_dataset["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Grant update permission
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"update": True},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should be able to update dataset
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
update_dataset_payload: dict[str, Any] = {
"name": f"Updated Dataset {uuid.uuid4().hex[:8]}",
"description": "Updated description",
"parser_id": team_dataset.get("parser_id", ""),
}
res: dict[str, Any] = update_dataset(user_auth, dataset_id, update_dataset_payload)
assert res["code"] == 0, res
@pytest.mark.p1
def test_no_update_permission_denies_update_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user without update permission cannot update dataset."""
dataset_id: str = team_dataset["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Ensure update permission is False (default)
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id)
assert permissions_res["code"] == 0
assert permissions_res["data"]["dataset"]["update"] is False
# User should not be able to update dataset
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
update_dataset_payload: dict[str, Any] = {
"name": f"Updated Dataset {uuid.uuid4().hex[:8]}",
"description": "Updated description",
"parser_id": team_dataset.get("parser_id", ""),
}
res: dict[str, Any] = update_dataset(user_auth, dataset_id, update_dataset_payload)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "update" in res["message"].lower()
@pytest.mark.p1
def test_delete_permission_allows_delete_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user with delete permission can delete dataset."""
# Create a new dataset for deletion
tenant_id: str = team_with_user["team"]["id"]
dataset_payload: dict[str, Any] = {
"name": f"Test Dataset Delete {uuid.uuid4().hex[:8]}",
"permission": "team",
"shared_tenant_id": tenant_id,
}
create_res: dict[str, Any] = create_dataset(web_api_auth, dataset_payload)
assert create_res["code"] == 0
dataset_id: str = create_res["data"]["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
# Grant delete permission
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"delete": True},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should be able to delete dataset
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
delete_payload: dict[str, Any] = {"ids": [dataset_id]}
res: dict[str, Any] = delete_datasets(user_auth, delete_payload)
assert res["code"] == 0, res
@pytest.mark.p1
def test_no_delete_permission_denies_delete_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user without delete permission cannot delete dataset."""
dataset_id: str = team_dataset["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Ensure delete permission is False (default)
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id)
assert permissions_res["code"] == 0
assert permissions_res["data"]["dataset"]["delete"] is False
# User should not be able to delete dataset
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
delete_payload: dict[str, Any] = {"ids": [dataset_id]}
res: dict[str, Any] = delete_datasets(user_auth, delete_payload)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "delete" in res["message"].lower()
@pytest.mark.p1
def test_read_permission_allows_list_datasets(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user with read permission can list datasets."""
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
dataset_id: str = team_dataset["id"]
# User should have read permission by default (allows listing)
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
res: dict[str, Any] = list_datasets(user_auth)
# Permission check should pass (code != PERMISSION_ERROR)
if res["code"] == 0:
# If listing succeeds, check that the dataset is visible
dataset_ids = [ds["id"] for ds in res.get("data", [])]
assert dataset_id in dataset_ids, "Team dataset should be visible to user with read permission"
else:
# If it fails, it should not be due to permission error
assert "permission" not in res["message"].lower() or "read" not in res["message"].lower()
@pytest.mark.p1
def test_no_read_permission_denies_list_datasets(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that user without read permission cannot see team datasets in list."""
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
dataset_id: str = team_dataset["id"]
# Revoke read permission
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"read": False},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should not see team datasets in list
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
res: dict[str, Any] = list_datasets(user_auth)
# If listing succeeds, the team dataset should not be visible
if res["code"] == 0:
dataset_ids = [ds["id"] for ds in res.get("data", [])]
assert dataset_id not in dataset_ids, "Team dataset should not be visible to user without read permission"
@pytest.mark.p1
def test_create_permission_allows_create_team_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test that user with create permission can create team dataset."""
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Grant create permission
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": True},
}
}
update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert update_res["code"] == 0
# User should be able to create team dataset
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
dataset_payload: dict[str, Any] = {
"name": f"User Created Dataset {uuid.uuid4().hex[:8]}",
"permission": "team",
"shared_tenant_id": tenant_id,
}
res: dict[str, Any] = create_dataset(user_auth, dataset_payload)
assert res["code"] == 0, res
@pytest.mark.p1
def test_no_create_permission_denies_create_team_dataset(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test that user without create permission cannot create team dataset."""
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
tenant_id: str = team_with_user["team"]["id"]
# Ensure create permission is False (default)
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id)
assert permissions_res["code"] == 0
assert permissions_res["data"]["dataset"]["create"] is False
# User should not be able to create team dataset
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
dataset_payload: dict[str, Any] = {
"name": f"User Created Dataset {uuid.uuid4().hex[:8]}",
"permission": "team",
"shared_tenant_id": tenant_id,
}
res: dict[str, Any] = create_dataset(user_auth, dataset_payload)
assert res["code"] != 0
assert "permission" in res["message"].lower() or "create" in res["message"].lower()
@pytest.mark.p1
def test_owner_always_has_full_permissions(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
team_dataset: dict[str, Any],
) -> None:
"""Test that team owner always has full permissions regardless of settings."""
dataset_id: str = team_dataset["id"]
owner_id: str = test_team["owner_id"]
tenant_id: str = test_team["id"]
# Owner should have full permissions
permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, owner_id)
assert permissions_res["code"] == 0
permissions: dict[str, Any] = permissions_res["data"]
assert permissions["dataset"]["create"] is True
assert permissions["dataset"]["read"] is True
assert permissions["dataset"]["update"] is True
assert permissions["dataset"]["delete"] is True
# Owner should be able to perform all operations
update_payload: dict[str, Any] = {
"name": f"Owner Updated Dataset {uuid.uuid4().hex[:8]}",
"description": "Owner updated description",
"parser_id": team_dataset.get("parser_id", ""),
}
res: dict[str, Any] = update_dataset(web_api_auth, dataset_id, update_payload)
assert res["code"] == 0, res

View file

@ -0,0 +1,586 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import time
import uuid
from typing import Any
import pytest
from common import (
accept_team_invitation,
add_users_to_team,
create_team,
create_user,
encrypt_password,
get_user_permissions,
login_as_user,
update_user_permissions,
)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
# ---------------------------------------------------------------------------
# Test Classes
# ---------------------------------------------------------------------------
@pytest.mark.p1
class TestAuthorization:
"""Tests for authentication behavior when managing user permissions."""
@pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_get_permissions_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test getting permissions with invalid or missing authentication."""
# Create a team and add a user first
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res.get("code", -1) != 0:
pytest.skip(f"Team creation failed with code {team_res.get('code')}: {team_res.get('message', 'Unknown error')}. Full response: {team_res}")
tenant_id: str = team_res["data"]["id"]
# Create and add a user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
user_id: str = user_res["data"]["id"]
add_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id, add_payload)
# Small delay
time.sleep(0.5)
# Accept invitation as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
accept_team_invitation(user_auth, tenant_id)
# Try to get permissions with invalid auth
res: dict[str, Any] = get_user_permissions(invalid_auth, tenant_id, user_id)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.parametrize(
("invalid_auth", "expected_code", "expected_message"),
[
(None, 401, "Unauthorized"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"),
],
)
def test_update_permissions_invalid_auth(
self,
invalid_auth: RAGFlowWebApiAuth | None,
expected_code: int,
expected_message: str,
web_api_auth: RAGFlowWebApiAuth,
) -> None:
"""Test updating permissions with invalid or missing authentication."""
# Create a team and add a user first
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
team_res: dict[str, Any] = create_team(web_api_auth, team_payload)
if team_res.get("code", -1) != 0:
pytest.skip(f"Team creation failed with code {team_res.get('code')}: {team_res.get('message', 'Unknown error')}. Full response: {team_res}")
tenant_id: str = team_res["data"]["id"]
# Create and add a user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
if user_res["code"] != 0:
pytest.skip("User creation failed, skipping auth test")
user_id: str = user_res["data"]["id"]
add_payload: dict[str, list[str]] = {"users": [email]}
add_users_to_team(web_api_auth, tenant_id, add_payload)
# Small delay
time.sleep(0.5)
# Accept invitation as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
accept_team_invitation(user_auth, tenant_id)
# Try to update permissions with invalid auth
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": True, "read": True, "update": False, "delete": False},
"canvas": {"create": False, "read": True, "update": False, "delete": False}
}
}
res: dict[str, Any] = update_user_permissions(invalid_auth, tenant_id, user_id, update_payload)
assert res["code"] == expected_code, res
if expected_message:
assert expected_message in res["message"]
@pytest.mark.p1
class TestGetUserPermissions:
"""Comprehensive tests for getting user permissions."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0, f"Team creation failed with code {res.get('code')}: {res.get('message', 'Unknown error')}. Full response: {res}"
return res["data"]
@pytest.fixture
def team_with_user(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> dict[str, Any]:
"""Create a team with a user who has accepted the invitation."""
tenant_id: str = test_team["id"]
# Create user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
# Add user to team
add_payload: dict[str, list[str]] = {"users": [email]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
# Small delay
time.sleep(0.5)
# Accept invitation as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert accept_res["code"] == 0
return {
"team": test_team,
"user": {"id": user_id, "email": email, "password": password},
}
@pytest.mark.p1
def test_get_default_permissions(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test getting default permissions for a new team member."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id)
assert res["code"] == 0, res
assert "data" in res
permissions: dict[str, Any] = res["data"]
# Check structure
assert "dataset" in permissions
assert "canvas" in permissions
# Check default permissions (read-only)
assert permissions["dataset"]["read"] is True
assert permissions["dataset"]["create"] is False
assert permissions["dataset"]["update"] is False
assert permissions["dataset"]["delete"] is False
assert permissions["canvas"]["read"] is True
assert permissions["canvas"]["create"] is False
assert permissions["canvas"]["update"] is False
assert permissions["canvas"]["delete"] is False
@pytest.mark.p1
def test_get_owner_permissions(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> None:
"""Test getting permissions for team owner (should have full permissions)."""
tenant_id: str = test_team["id"]
owner_id: str = test_team["owner_id"]
res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, owner_id)
assert res["code"] == 0, res
assert "data" in res
permissions: dict[str, Any] = res["data"]
# Owner should have full permissions
assert permissions["dataset"]["create"] is True
assert permissions["dataset"]["read"] is True
assert permissions["dataset"]["update"] is True
assert permissions["dataset"]["delete"] is True
assert permissions["canvas"]["create"] is True
assert permissions["canvas"]["read"] is True
assert permissions["canvas"]["update"] is True
assert permissions["canvas"]["delete"] is True
@pytest.mark.p1
def test_get_permissions_user_not_in_team(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> None:
"""Test getting permissions for a user not in the team."""
tenant_id: str = test_team["id"]
invalid_user_id: str = f"invalid_{uuid.uuid4().hex[:8]}"
res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, invalid_user_id)
assert res["code"] != 0
assert "not a member" in res["message"].lower() or res["code"] in [100, 102]
@pytest.mark.p1
def test_get_permissions_not_owner_or_admin(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test that normal users cannot view permissions."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
# Login as normal user
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
# Try to get permissions (normal user should not be able to)
res: dict[str, Any] = get_user_permissions(user_auth, tenant_id, user_id)
assert res["code"] == 108 # PERMISSION_ERROR
assert "owner" in res["message"].lower() or "admin" in res["message"].lower()
@pytest.mark.p1
class TestUpdateUserPermissions:
"""Comprehensive tests for updating user permissions."""
@pytest.fixture
def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]:
"""Create a test team for use in tests."""
team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"}
res: dict[str, Any] = create_team(web_api_auth, team_payload)
assert res["code"] == 0, f"Team creation failed with code {res.get('code')}: {res.get('message', 'Unknown error')}. Full response: {res}"
return res["data"]
@pytest.fixture
def team_with_user(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> dict[str, Any]:
"""Create a team with a user who has accepted the invitation."""
tenant_id: str = test_team["id"]
# Create user
email = f"testuser_{uuid.uuid4().hex[:8]}@example.com"
password = "TestPassword123!"
encrypted_password = encrypt_password(password)
user_payload: dict[str, str] = {
"email": email,
"password": encrypted_password,
"nickname": "Test User",
}
user_res: dict[str, Any] = create_user(web_api_auth, user_payload)
assert user_res["code"] == 0
user_id: str = user_res["data"]["id"]
# Add user to team
add_payload: dict[str, list[str]] = {"users": [email]}
add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload)
assert add_res["code"] == 0
# Small delay
time.sleep(0.5)
# Accept invitation as the user
user_auth: RAGFlowWebApiAuth = login_as_user(email, password)
accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id)
assert accept_res["code"] == 0
return {
"team": test_team,
"user": {"id": user_id, "email": email, "password": password},
}
@pytest.mark.p1
def test_update_permissions_grant_create(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test updating permissions to grant create permission."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
# Update permissions to grant create for dataset
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": True},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert res["code"] == 0, res
assert "data" in res
permissions: dict[str, Any] = res["data"]
# Check that create is now True, but other permissions remain default
assert permissions["dataset"]["create"] is True
assert permissions["dataset"]["read"] is True # Default
assert permissions["dataset"]["update"] is False # Default
assert permissions["dataset"]["delete"] is False # Default
@pytest.mark.p1
def test_update_permissions_grant_all(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test updating permissions to grant all CRUD permissions."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
# Update permissions to grant all for both dataset and canvas
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": True, "read": True, "update": True, "delete": True},
"canvas": {"create": True, "read": True, "update": True, "delete": True}
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert res["code"] == 0, res
assert "data" in res
permissions: dict[str, Any] = res["data"]
# Verify all permissions are True
for resource_type in ["dataset", "canvas"]:
assert permissions[resource_type]["create"] is True
assert permissions[resource_type]["read"] is True
assert permissions[resource_type]["update"] is True
assert permissions[resource_type]["delete"] is True
@pytest.mark.p1
def test_update_permissions_revoke_read(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test updating permissions to revoke read permission."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
# First grant all permissions
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": True, "read": True, "update": True, "delete": True},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert res["code"] == 0
# Then revoke read
revoke_payload: dict[str, Any] = {
"permissions": {
"dataset": {"read": False},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, revoke_payload)
assert res["code"] == 0, res
permissions: dict[str, Any] = res["data"]
# Read should be False, but other permissions should remain
assert permissions["dataset"]["read"] is False
assert permissions["dataset"]["create"] is True
assert permissions["dataset"]["update"] is True
assert permissions["dataset"]["delete"] is True
@pytest.mark.p1
def test_update_permissions_partial_update(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test that partial permission updates only change specified fields."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
# Update only canvas create permission
update_payload: dict[str, Any] = {
"permissions": {
"canvas": {"create": True},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert res["code"] == 0, res
permissions: dict[str, Any] = res["data"]
# Canvas create should be True, but dataset should remain default
assert permissions["canvas"]["create"] is True
assert permissions["dataset"]["create"] is False # Still default
@pytest.mark.p1
def test_update_permissions_owner_or_admin(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> None:
"""Test that owner/admin permissions cannot be updated."""
tenant_id: str = test_team["id"]
owner_id: str = test_team["owner_id"]
# Try to update owner permissions
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": False},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, owner_id, update_payload)
assert res["code"] != 0
assert "owner" in res["message"].lower() or "admin" in res["message"].lower()
@pytest.mark.p1
def test_update_permissions_user_not_in_team(
self,
web_api_auth: RAGFlowWebApiAuth,
test_team: dict[str, Any],
) -> None:
"""Test updating permissions for a user not in the team."""
tenant_id: str = test_team["id"]
invalid_user_id: str = f"invalid_{uuid.uuid4().hex[:8]}"
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": True},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, invalid_user_id, update_payload)
assert res["code"] != 0
assert "not a member" in res["message"].lower() or res["code"] in [100, 102]
@pytest.mark.p1
def test_update_permissions_not_owner_or_admin(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test that normal users cannot update permissions."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
user_email: str = team_with_user["user"]["email"]
user_password: str = team_with_user["user"]["password"]
# Login as normal user
user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password)
# Try to update permissions (normal user should not be able to)
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"create": True},
}
}
res: dict[str, Any] = update_user_permissions(user_auth, tenant_id, user_id, update_payload)
assert res["code"] == 108 # PERMISSION_ERROR
assert "owner" in res["message"].lower() or "admin" in res["message"].lower()
@pytest.mark.p1
def test_update_permissions_invalid_resource_type(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test updating permissions with invalid resource type."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
update_payload: dict[str, Any] = {
"permissions": {
"invalid_resource": {"create": True},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert res["code"] != 0
assert "invalid" in res["message"].lower() or "resource" in res["message"].lower()
@pytest.mark.p1
def test_update_permissions_invalid_permission_name(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test updating permissions with invalid permission name."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
update_payload: dict[str, Any] = {
"permissions": {
"dataset": {"invalid_permission": True},
}
}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert res["code"] != 0
assert "invalid" in res["message"].lower() or "permission" in res["message"].lower()
@pytest.mark.p1
def test_update_permissions_missing_permissions_field(
self,
web_api_auth: RAGFlowWebApiAuth,
team_with_user: dict[str, Any],
) -> None:
"""Test updating permissions without permissions field."""
tenant_id: str = team_with_user["team"]["id"]
user_id: str = team_with_user["user"]["id"]
update_payload: dict[str, Any] = {}
res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload)
assert res["code"] != 0
assert "required" in res["message"].lower() or "permissions" in res["message"].lower()

View file

@ -25,7 +25,7 @@ import pytest
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Cryptodome.PublicKey import RSA
from ..common import create_user
from common import create_user
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth
@ -95,8 +95,8 @@ class TestUserCreate:
"email": "",
"password": "test123",
},
103,
"Invalid email address",
101,
"Email is required!",
),
(
{
@ -162,7 +162,7 @@ class TestUserCreate:
("user@", 103, "Invalid email address"),
("user@example", 103, "Invalid email address"),
("user@.com", 103, "Invalid email address"),
("", 103, "Invalid email address"),
("", 101, "Email is required!"),
],
)
def test_email_validation(

2
uv.lock generated
View file

@ -5449,6 +5449,7 @@ dependencies = [
{ name = "ranx" },
{ name = "readability-lxml" },
{ name = "replicate" },
{ name = "reportlab" },
{ name = "requests" },
{ name = "roman-numbers" },
{ name = "ruamel-base" },
@ -5611,6 +5612,7 @@ requires-dist = [
{ name = "ranx", specifier = "==0.3.20" },
{ name = "readability-lxml", specifier = "==0.8.1" },
{ name = "replicate", specifier = "==0.31.0" },
{ name = "reportlab", specifier = ">=4.4.3" },
{ name = "requests", specifier = "==2.32.2" },
{ name = "roman-numbers", specifier = "==1.0.2" },
{ name = "ruamel-base", specifier = "==1.0.0" },