diff --git a/api/apps/__init__.py b/api/apps/__init__.py index a6e33c13b..ea7f559f9 100644 --- a/api/apps/__init__.py +++ b/api/apps/__init__.py @@ -103,11 +103,18 @@ T = TypeVar("T") P = ParamSpec("P") def _load_user(): - jwt = Serializer(secret_key=settings.SECRET_KEY) - authorization = request.headers.get("Authorization") + try: + jwt = Serializer(secret_key=settings.SECRET_KEY) + authorization = request.headers.get("Authorization") + except RuntimeError as e: + # Working outside of request context - no user authenticated + if "request context" in str(e).lower(): + return None + # Re-raise other RuntimeErrors + raise g.user = None if not authorization: - return + return None try: access_token = str(jwt.loads(authorization)) diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index 37d86b7fd..5d055e204 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -138,7 +138,11 @@ async def save() -> Any: @login_required def get(canvas_id): if not UserCanvasService.accessible(canvas_id, current_user.id): - return get_data_error_result(message="canvas not found.") + return get_json_result( + data=False, + message='You do not have read permission for this canvas.', + code=RetCode.PERMISSION_ERROR + ) e, c = UserCanvasService.get_by_canvas_id(canvas_id) return get_json_result(data=c) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index 30fb7aed8..1c87218a9 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -32,10 +32,11 @@ from api.db.services.task_service import TaskService, GRAPH_RAPTOR_FAKE_DOC_ID from api.db.services.user_service import TenantService, UserTenantService from api.utils.api_utils import get_error_data_result, server_error_response, get_data_error_result, validate_request, not_allowed_parameters, \ request_json -from api.db import VALID_FILE_TYPES +from api.db import VALID_FILE_TYPES, UserTenantRole from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.db_models import File from api.utils.api_utils import get_json_result +from api.common.permission_utils import has_permission from rag.nlp import search from api.constants import DATASET_NAME_LIMIT from rag.utils.redis_conn import REDIS_CONN @@ -53,6 +54,8 @@ async def create() -> Any: # Validate shared_tenant_id if provided shared_tenant_id: Optional[str] = req.get("shared_tenant_id") + user_tenant: Optional[Any] = None + if shared_tenant_id: if req.get("permission") != "team": return get_json_result( @@ -61,14 +64,92 @@ async def create() -> Any: code=RetCode.ARGUMENT_ERROR ) # Verify user is a member of the shared tenant - user_tenant = UserTenantService.filter_by_tenant_and_user_id(shared_tenant_id, current_user.id) - if not user_tenant or user_tenant.status != StatusEnum.VALID.value: + try: + user_tenant = UserTenantService.filter_by_tenant_and_user_id(shared_tenant_id, current_user.id) + if not user_tenant or user_tenant.status != StatusEnum.VALID.value: + return get_json_result( + data=False, + message=f"You are not a member of the selected team", + code=RetCode.PERMISSION_ERROR + ) + except Exception as e: + logging.exception(e) return get_json_result( data=False, - message=f"You are not a member of the selected team", - code=RetCode.PERMISSION_ERROR + message=f"Error verifying team membership: {str(e)}", + code=RetCode.EXCEPTION_ERROR ) + # Check create permission if sharing with team + if req.get("permission") == "team": + target_tenant_id: Optional[str] = shared_tenant_id + + # Auto-resolve team if shared_tenant_id not provided + if not target_tenant_id: + try: + user_tenant_relations = list(UserTenantService.query( + user_id=current_user.id, + status=StatusEnum.VALID.value + )) + + if not user_tenant_relations: + return get_json_result( + data=False, + message="You are not a member of any team. Please specify shared_tenant_id or join a team first.", + code=RetCode.DATA_ERROR + ) + + # Prefer owner teams + owner_teams = [ut for ut in user_tenant_relations if ut.role == UserTenantRole.OWNER] + if owner_teams: + target_tenant_id = owner_teams[0].tenant_id + else: + target_tenant_id = user_tenant_relations[0].tenant_id + + req["shared_tenant_id"] = target_tenant_id + except Exception as e: + logging.exception(e) + return get_json_result( + data=False, + message=f"Error finding team: {str(e)}", + code=RetCode.EXCEPTION_ERROR + ) + + # Ensure user_tenant is set for permission check + if not user_tenant: + try: + user_tenant = UserTenantService.filter_by_tenant_and_user_id(target_tenant_id, current_user.id) + if not user_tenant or user_tenant.status != StatusEnum.VALID.value: + return get_json_result( + data=False, + message=f"You are not a member of the selected team.", + code=RetCode.PERMISSION_ERROR + ) + except Exception as e: + logging.exception(e) + return get_json_result( + data=False, + message=f"Error verifying team membership: {str(e)}", + code=RetCode.EXCEPTION_ERROR + ) + + # Owners and admins bypass permission check + if user_tenant.role not in [UserTenantRole.OWNER, UserTenantRole.ADMIN]: + try: + if not has_permission(target_tenant_id, current_user.id, "dataset", "create"): + return get_json_result( + data=False, + message='You do not have create permission for datasets in this team.', + code=RetCode.PERMISSION_ERROR + ) + except Exception as e: + logging.exception(e) + return get_json_result( + data=False, + message=f"Error checking permissions: {str(e)}", + code=RetCode.EXCEPTION_ERROR + ) + e: bool res: Any e, res = KnowledgebaseService.create_with_name( @@ -162,15 +243,16 @@ async def update(): def detail(): kb_id = request.args["kb_id"] try: - tenants = UserTenantService.query(user_id=current_user.id) - for tenant in tenants: - if KnowledgebaseService.query( - tenant_id=tenant.tenant_id, id=kb_id): - break - else: + # Unified access control: + # - Owners always have access + # - Team members must have appropriate CRUD permissions + # - Non-existent datasets return the same permission-style error as before + if not KnowledgebaseService.accessible(kb_id, current_user.id, required_permission="read"): return get_json_result( - data=False, message='Only owner of knowledgebase authorized for this operation.', - code=RetCode.OPERATING_ERROR) + data=False, + message='Only owner of knowledgebase authorized for this operation.', + code=RetCode.OPERATING_ERROR, + ) kb = KnowledgebaseService.get_detail(kb_id) if not kb: return get_data_error_result( @@ -236,15 +318,13 @@ async def rm(): code=RetCode.PERMISSION_ERROR ) try: - kbs = KnowledgebaseService.query( - created_by=current_user.id, id=req["kb_id"]) - if not kbs: - return get_json_result( - data=False, message='Only owner of knowledgebase authorized for this operation.', - code=RetCode.OPERATING_ERROR) + ok, kb = KnowledgebaseService.get_by_id(req["kb_id"]) + if not ok or kb is None: + return get_data_error_result( + message="Can't find this knowledgebase!") for doc in DocumentService.query(kb_id=req["kb_id"]): - if not DocumentService.remove_document(doc, kbs[0].tenant_id): + if not DocumentService.remove_document(doc, kb.tenant_id): return get_data_error_result( message="Database error (Document removal)!") f2d = File2DocumentService.get_by_document_id(doc.id) @@ -252,15 +332,14 @@ async def rm(): FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id]) File2DocumentService.delete_by_document_id(doc.id) FileService.filter_delete( - [File.source_type == FileSource.KNOWLEDGEBASE, File.type == "folder", File.name == kbs[0].name]) + [File.source_type == FileSource.KNOWLEDGEBASE, File.type == "folder", File.name == kb.name]) if not KnowledgebaseService.delete_by_id(req["kb_id"]): return get_data_error_result( message="Database error (Knowledgebase removal)!") - for kb in kbs: - settings.docStoreConn.delete({"kb_id": kb.id}, search.index_name(kb.tenant_id), kb.id) - settings.docStoreConn.deleteIdx(search.index_name(kb.tenant_id), kb.id) - if hasattr(settings.STORAGE_IMPL, 'remove_bucket'): - settings.STORAGE_IMPL.remove_bucket(kb.id) + settings.docStoreConn.delete({"kb_id": kb.id}, search.index_name(kb.tenant_id), kb.id) + settings.docStoreConn.deleteIdx(search.index_name(kb.tenant_id), kb.id) + if hasattr(settings.STORAGE_IMPL, 'remove_bucket'): + settings.STORAGE_IMPL.remove_bucket(kb.id) return get_json_result(data=True) except Exception as e: return server_error_response(e) diff --git a/api/apps/tenant_app.py b/api/apps/tenant_app.py index a05ddda34..f4dbeea6c 100644 --- a/api/apps/tenant_app.py +++ b/api/apps/tenant_app.py @@ -37,7 +37,10 @@ from api.utils.api_utils import ( validate_request, ) from api.utils.web_utils import send_invite_email -from api.common.permission_utils import get_user_permissions, update_user_permissions +from api.common.permission_utils import ( + get_user_permissions as get_member_permissions, + update_user_permissions as update_member_permissions, +) from common import settings from common.constants import RetCode, StatusEnum from common.misc_utils import get_uuid @@ -565,14 +568,23 @@ async def update_team(tenant_id: str) -> Response: code=RetCode.DATA_ERROR ) - req_json = await request.json + try: + req_json = await request.json + except Exception: + # Handle malformed or missing JSON body explicitly + return get_json_result( + data=False, + message="Request body is required!", + code=RetCode.ARGUMENT_ERROR, + ) + if req_json is None: return get_json_result( data=False, message="Request body is required!", code=RetCode.ARGUMENT_ERROR, ) - + req: Dict[str, Any] = req_json # Extract update fields (all optional) @@ -1304,7 +1316,7 @@ def demote_admin(tenant_id: str, user_id: str) -> Response: - ApiKeyAuth: [] parameters: - in: path - name: tenant_iderify they are actually an admin; otherwise return: + name: tenant_id required: true type: string description: Team ID @@ -1510,7 +1522,7 @@ def get_user_permissions(tenant_id: str, user_id: str) -> Response: code=RetCode.DATA_ERROR, ) - permissions: Dict[str, Dict[str, bool]] = get_user_permissions(tenant_id, user_id) + permissions: Dict[str, Dict[str, bool]] = get_member_permissions(tenant_id, user_id) return get_json_result( data=permissions, @@ -1687,7 +1699,7 @@ async def update_user_permissions(tenant_id: str, user_id: str) -> Response: validated_permissions[resource_type][perm_name] = perm_value # Update permissions - success: bool = update_user_permissions(tenant_id, user_id, validated_permissions) + success: bool = update_member_permissions(tenant_id, user_id, validated_permissions) if not success: return get_json_result( @@ -1697,7 +1709,7 @@ async def update_user_permissions(tenant_id: str, user_id: str) -> Response: ) # Get updated permissions for response - updated_permissions: Dict[str, Dict[str, bool]] = get_user_permissions(tenant_id, user_id) + updated_permissions: Dict[str, Dict[str, bool]] = get_member_permissions(tenant_id, user_id) return get_json_result( data=updated_permissions, diff --git a/api/apps/user_app.py b/api/apps/user_app.py index c3d87e953..ce83c3f47 100644 --- a/api/apps/user_app.py +++ b/api/apps/user_app.py @@ -794,7 +794,7 @@ async def user_add(): @manager.route("/create", methods=["POST"]) # noqa: F821 @login_required @validate_request("nickname", "email", "password") -def create_user() -> Response: +async def create_user() -> Response: """ Create a new user. @@ -848,14 +848,15 @@ def create_user() -> Response: schema: type: object """ - if request.json is None: + req_json = await request.json + if req_json is None: return get_json_result( data=False, message="Request body is required!", code=RetCode.ARGUMENT_ERROR, ) - req: Dict[str, Any] = request.json + req: Dict[str, Any] = req_json email_address: str = str(req.get("email", "")) # Validate email is provided @@ -979,8 +980,7 @@ def create_user() -> Response: @manager.route("/update", methods=["PUT"]) # noqa: F821 @login_required -@validate_request() -def update_user() -> Response: +async def update_user() -> Response: """ Update an existing user. Users can only update their own account. --- @@ -1041,14 +1041,15 @@ def update_user() -> Response: schema: type: object """ - if request.json is None: + req_json = await request.json + if req_json is None: return get_json_result( data=False, message="Request body is required!", code=RetCode.ARGUMENT_ERROR, ) - req: Dict[str, Any] = request.json + req: Dict[str, Any] = req_json user_id: Optional[str] = req.get("user_id") email: Optional[str] = req.get("email") identified_by_user_id: bool = bool(user_id) @@ -1362,8 +1363,7 @@ def list_users() -> Response: @manager.route("/delete", methods=["DELETE"]) # noqa: F821 @login_required -@validate_request() -def delete_user() -> Response: +async def delete_user() -> Response: """ Delete a user. Users can only delete their own account. @@ -1424,14 +1424,15 @@ def delete_user() -> Response: code=RetCode.UNAUTHORIZED, ) - if request.json is None: + req_json = await request.json + if req_json is None: return get_json_result( data=False, message="Request body is required!", code=RetCode.ARGUMENT_ERROR, ) - req: Dict[str, Any] = request.json + req: Dict[str, Any] = req_json user_id: Optional[str] = req.get("user_id") email: Optional[str] = req.get("email") diff --git a/api/db/services/knowledgebase_service.py b/api/db/services/knowledgebase_service.py index 380272ec5..f7dc4a573 100644 --- a/api/db/services/knowledgebase_service.py +++ b/api/db/services/knowledgebase_service.py @@ -170,21 +170,30 @@ class KnowledgebaseService(CommonService): User.avatar.alias('tenant_avatar'), cls.model.update_time ] + # A KB is visible to a user if: + # - It is shared with one of the user's joined tenants (via shared_tenant_id) + # and has TEAM permission, or + # - It is owned by the user (tenant_id == user_id) + visibility_expr = ( + ( + (cls.model.shared_tenant_id.in_(joined_tenant_ids)) + & (cls.model.permission == TenantPermission.TEAM.value) + ) + | (cls.model.tenant_id == user_id) + ) + + base_query = ( + cls.model.select(*fields) + .join(User, on=(cls.model.tenant_id == User.id)) + .where(visibility_expr & (cls.model.status == StatusEnum.VALID.value)) + ) + if keywords: - kbs = cls.model.select(*fields).join(User, on=(cls.model.tenant_id == User.id)).where( - ((cls.model.tenant_id.in_(joined_tenant_ids) & (cls.model.permission == - TenantPermission.TEAM.value)) | ( - cls.model.tenant_id == user_id)) - & (cls.model.status == StatusEnum.VALID.value), - (fn.LOWER(cls.model.name).contains(keywords.lower())) + kbs = base_query.where( + fn.LOWER(cls.model.name).contains(keywords.lower()) ) else: - kbs = cls.model.select(*fields).join(User, on=(cls.model.tenant_id == User.id)).where( - ((cls.model.tenant_id.in_(joined_tenant_ids) & (cls.model.permission == - TenantPermission.TEAM.value)) | ( - cls.model.tenant_id == user_id)) - & (cls.model.status == StatusEnum.VALID.value) - ) + kbs = base_query if parser_id: kbs = kbs.where(cls.model.parser_id == parser_id) if desc: @@ -192,12 +201,21 @@ class KnowledgebaseService(CommonService): else: kbs = kbs.order_by(cls.model.getter_by(orderby).asc()) - count = kbs.count() - + # Apply pagination at query level + total = kbs.count() if page_number and items_per_page: kbs = kbs.paginate(page_number, items_per_page) - return list(kbs.dicts()), count + # Convert to list of dicts and enforce per-user read permission, + # so revoking dataset read permission hides team datasets from lists. + kb_dicts = list(kbs.dicts()) + filtered_kbs = [ + kb for kb in kb_dicts + if cls.accessible(kb["id"], user_id, required_permission="read") + ] + + # When filtering is applied, total should reflect visible datasets + return filtered_kbs, len(filtered_kbs) @classmethod @DB.connection_context() diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 8911406c6..164077f9f 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -92,6 +92,9 @@ def server_error_response(e): logging.exception(e) try: msg = repr(e).lower() + # Check if it's a RuntimeError about request context - treat as Unauthorized + if isinstance(e, RuntimeError) and "request context" in msg: + return get_json_result(code=RetCode.UNAUTHORIZED, message="Unauthorized") if getattr(e, "code", None) == 401 or ("unauthorized" in msg) or ("401" in msg): return get_json_result(code=RetCode.UNAUTHORIZED, message=repr(e)) except Exception as ex: diff --git a/pyproject.toml b/pyproject.toml index 4dfc4c1e2..0cb42a8e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -152,6 +152,7 @@ dependencies = [ "pypandoc>=1.16", "pyobvector==0.2.18", "ruff>=0.14.4", + "reportlab>=4.4.3", ] [dependency-groups] diff --git a/test/testcases/test_http_api/common.py b/test/testcases/test_http_api/common.py index 0e7b8dfdd..c7324591c 100644 --- a/test/testcases/test_http_api/common.py +++ b/test/testcases/test_http_api/common.py @@ -21,6 +21,7 @@ from configs import HOST_ADDRESS, VERSION from requests.auth import AuthBase from requests_toolbelt import MultipartEncoder from utils.file_utils import create_txt_file +from libs.auth import RAGFlowWebApiAuth # Import login_as_user and encrypt_password from root conftest import importlib.util @@ -39,26 +40,137 @@ CHUNK_API_URL = f"/api/{VERSION}/datasets/{{dataset_id}}/documents/{{document_id CHAT_ASSISTANT_API_URL = f"/api/{VERSION}/chats" SESSION_WITH_CHAT_ASSISTANT_API_URL = f"/api/{VERSION}/chats/{{chat_id}}/sessions" SESSION_WITH_AGENT_API_URL = f"/api/{VERSION}/agents/{{agent_id}}/sessions" +CANVAS_API_URL = f"/{VERSION}/canvas" # DATASET MANAGEMENT def create_dataset(auth, payload=None, *, headers=HEADERS, data=None): - res = requests.post(url=f"{HOST_ADDRESS}{DATASETS_API_URL}", headers=headers, auth=auth, json=payload, data=data) + """ + Create dataset. + + - For HTTP API token auth (`RAGFlowHttpApiAuth`), use the RESTful datasets API. + - For web JWT auth (`RAGFlowWebApiAuth`), proxy to the KB web endpoint `/v1/kb/create` + and normalize its response shape to match the datasets API (`data.id`). + """ + # Web (JWT) flow: go through KB web endpoint with permission-aware logic + if isinstance(auth, RAGFlowWebApiAuth): + url = f"{HOST_ADDRESS}/{VERSION}/kb/create" + res = requests.post(url=url, headers=headers, auth=auth, json=payload, data=data) + body = res.json() + # KB create returns {"kb_id": ...}; normalize to {"id": ..., "kb_id": ...} + if body.get("code") == 0 and isinstance(body.get("data"), dict) and "kb_id" in body["data"]: + kb_id = body["data"]["kb_id"] + body["data"] = {"id": kb_id, "kb_id": kb_id} + return body + + # HTTP API (API key) flow: original datasets REST endpoint + res = requests.post( + url=f"{HOST_ADDRESS}{DATASETS_API_URL}", + headers=headers, + auth=auth, + json=payload, + data=data, + ) return res.json() def list_datasets(auth, params=None, *, headers=HEADERS): - res = requests.get(url=f"{HOST_ADDRESS}{DATASETS_API_URL}", headers=headers, auth=auth, params=params) + """ + List datasets. + + - Web JWT auth: call `/v1/kb/list` and project KBs to a simple `[{"id": ...}, ...]` list. + - HTTP API token auth: use `/api/{version}/datasets` as before. + """ + if isinstance(auth, RAGFlowWebApiAuth): + url = f"{HOST_ADDRESS}/{VERSION}/kb/list" + # `list_kbs` expects POST with optional body (owner_ids etc.) and query params for paging. + res = requests.post(url=url, headers=headers, auth=auth, params=params or {}, json={}) + body = res.json() + if body.get("code") == 0: + data = body.get("data") or {} + kbs = data.get("kbs", []) + # Normalize to the datasets API shape: list of objects with "id" + body["data"] = [{"id": kb["id"], **kb} for kb in kbs] + return body + + res = requests.get( + url=f"{HOST_ADDRESS}{DATASETS_API_URL}", + headers=headers, + auth=auth, + params=params, + ) return res.json() def update_dataset(auth, dataset_id, payload=None, *, headers=HEADERS, data=None): - res = requests.put(url=f"{HOST_ADDRESS}{DATASETS_API_URL}/{dataset_id}", headers=headers, auth=auth, json=payload, data=data) + """ + Update dataset. + + - Web JWT auth: call `/v1/kb/update` with `kb_id` and normalize response. + - HTTP API token auth: use `/api/{version}/datasets/{id}`. + """ + if isinstance(auth, RAGFlowWebApiAuth): + url = f"{HOST_ADDRESS}/{VERSION}/kb/update" + # KB update expects "kb_id" instead of "id" + kb_payload = dict(payload or {}) + kb_payload["kb_id"] = dataset_id + res = requests.post(url=url, headers=headers, auth=auth, json=kb_payload, data=data) + body = res.json() + if body.get("code") == 0 and isinstance(body.get("data"), dict): + kb = body["data"] + # Ensure an "id" field is present for tests + if "id" not in kb and "kb_id" in kb: + kb["id"] = kb["kb_id"] + body["data"] = kb + return body + + res = requests.put( + url=f"{HOST_ADDRESS}{DATASETS_API_URL}/{dataset_id}", + headers=headers, + auth=auth, + json=payload, + data=data, + ) return res.json() def delete_datasets(auth, payload=None, *, headers=HEADERS, data=None): - res = requests.delete(url=f"{HOST_ADDRESS}{DATASETS_API_URL}", headers=headers, auth=auth, json=payload, data=data) + """ + Delete datasets. + + - Web JWT auth: call `/v1/kb/rm` with single `kb_id` when exactly one id is provided, + and treat other cases as unsupported for this test suite. + - HTTP API token auth: original RESTful batch delete. + """ + if isinstance(auth, RAGFlowWebApiAuth): + ids = (payload or {}).get("ids") + # For the permission tests we only ever delete a single dataset by id. + if not ids or len(ids) != 1: + return { + "code": 101, + "message": "Only single-id delete is supported via web API helper", + "data": False, + } + kb_id = ids[0] + url = f"{HOST_ADDRESS}/{VERSION}/kb/rm" + res = requests.post( + url=url, + headers=headers, + auth=auth, + json={"kb_id": kb_id}, + data=data, + ) + body = res.json() + # KB rm returns data=True/False; keep that shape but keep code semantics. + return body + + res = requests.delete( + url=f"{HOST_ADDRESS}{DATASETS_API_URL}", + headers=headers, + auth=auth, + json=payload, + data=data, + ) return res.json() @@ -592,6 +704,64 @@ def promote_admin( return res.json() +def get_user_permissions( + auth: Union[AuthBase, str, None], + tenant_id: str, + user_id: str, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Get CRUD permissions for a team member. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + tenant_id: The team ID. + user_id: The user ID to get permissions for. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing permissions. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/{tenant_id}/users/{user_id}/permissions" + res: requests.Response = requests.get( + url=url, headers=headers, auth=auth + ) + return res.json() + + +def update_user_permissions( + auth: Union[AuthBase, str, None], + tenant_id: str, + user_id: str, + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Update CRUD permissions for a team member. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + tenant_id: The team ID. + user_id: The user ID to update permissions for. + payload: JSON payload containing permissions to update. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing updated permissions. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{TEAM_API_URL}/{tenant_id}/users/{user_id}/permissions" + res: requests.Response = requests.put( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + def demote_admin( auth: Union[AuthBase, str, None], tenant_id: str, @@ -620,3 +790,212 @@ def demote_admin( return res.json() +# CANVAS MANAGEMENT +def create_canvas( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Create or update a canvas. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: JSON payload containing canvas data (dsl, title, permission, etc.). + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the canvas data. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/set" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def get_canvas( + auth: Union[AuthBase, str, None], + canvas_id: str, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Get a canvas by ID. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + canvas_id: The canvas ID to retrieve. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing the canvas data. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/get/{canvas_id}" + res: requests.Response = requests.get( + url=url, headers=headers, auth=auth + ) + return res.json() + + +def list_canvases( + auth: Union[AuthBase, str, None], + params: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """List canvases. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + params: Optional query parameters (page_number, page_size, keywords, etc.). + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing canvas list and total count. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/list" + res: requests.Response = requests.get( + url=url, headers=headers, auth=auth, params=params + ) + return res.json() + + +def delete_canvas( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Delete one or more canvases. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: JSON payload containing canvas_ids list. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing deletion result. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/rm" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def update_canvas_setting( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Update canvas settings (title, permission, etc.). + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: JSON payload containing canvas settings (id, title, permission, etc.). + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing updated canvas data. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/setting" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def reset_canvas( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Reset a canvas to a previous version. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: JSON payload containing canvas id. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing reset result. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/reset" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def run_canvas( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Run a canvas (completion). + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: JSON payload containing canvas id, query, files, inputs, etc. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing run result. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/completion" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + +def debug_canvas( + auth: Union[AuthBase, str, None], + payload: Optional[Dict[str, Any]] = None, + *, + headers: Dict[str, str] = HEADERS, +) -> Dict[str, Any]: + """Debug a canvas component. + + Args: + auth: Authentication object (AuthBase subclass), token string, or None. + payload: JSON payload containing canvas id, component_id, params. + headers: Optional HTTP headers. Defaults to HEADERS. + + Returns: + JSON response as a dictionary containing debug result. + + Raises: + requests.RequestException: If the HTTP request fails. + """ + url: str = f"{HOST_ADDRESS}{CANVAS_API_URL}/debug" + res: requests.Response = requests.post( + url=url, headers=headers, auth=auth, json=payload + ) + return res.json() + + diff --git a/test/testcases/test_http_api/test_canvas_management/test_canvas_permissions.py b/test/testcases/test_http_api/test_canvas_management/test_canvas_permissions.py new file mode 100644 index 000000000..1a1c76d91 --- /dev/null +++ b/test/testcases/test_http_api/test_canvas_management/test_canvas_permissions.py @@ -0,0 +1,505 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import json +import time +import uuid +from typing import Any + +import pytest + +from common import ( + accept_team_invitation, + add_users_to_team, + create_canvas, + create_team, + create_user, + debug_canvas, + delete_canvas, + encrypt_password, + get_canvas, + get_user_permissions, + list_canvases, + login_as_user, + reset_canvas, + run_canvas, + update_canvas_setting, + update_user_permissions, +) +from configs import INVALID_API_TOKEN +from libs.auth import RAGFlowWebApiAuth + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- + + +@pytest.mark.p1 +class TestCanvasPermissions: + """Comprehensive tests for canvas permissions with CRUD operations.""" + + @pytest.fixture + def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(web_api_auth, team_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.fixture + def team_with_user( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> dict[str, Any]: + """Create a team with a user who has accepted the invitation.""" + tenant_id: str = test_team["id"] + + # Create user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + user_id: str = user_res["data"]["id"] + + # Add user to team + add_payload: dict[str, list[str]] = {"users": [email]} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + + # Small delay + time.sleep(0.5) + + # Accept invitation as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert accept_res["code"] == 0 + + return { + "team": test_team, + "user": {"id": user_id, "email": email, "password": password}, + } + + @pytest.fixture + def team_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> dict[str, Any]: + """Create a canvas shared with team.""" + # Simple canvas DSL + canvas_dsl = { + "nodes": [ + { + "id": "start", + "type": "start", + "position": {"x": 100, "y": 100}, + } + ], + "edges": [], + } + + canvas_payload: dict[str, Any] = { + "title": f"Test Canvas {uuid.uuid4().hex[:8]}", + "dsl": json.dumps(canvas_dsl), + "permission": "team", + "shared_tenant_id": test_team["id"], + "canvas_category": "Agent", + } + + res: dict[str, Any] = create_canvas(web_api_auth, canvas_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.mark.p1 + def test_read_permission_allows_get_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user with read permission can get canvas.""" + canvas_id: str = team_canvas["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # User should have read permission by default + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + + res: dict[str, Any] = get_canvas(user_auth, canvas_id) + assert res["code"] == 0, res + assert "data" in res + assert res["data"]["id"] == canvas_id + + @pytest.mark.p1 + def test_no_read_permission_denies_get_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user without read permission cannot get canvas.""" + canvas_id: str = team_canvas["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Revoke read permission + update_payload: dict[str, Any] = { + "permissions": { + "canvas": {"read": False}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should not be able to get canvas + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + res: dict[str, Any] = get_canvas(user_auth, canvas_id) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "read" in res["message"].lower() + + @pytest.mark.p1 + def test_update_permission_allows_update_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user with update permission can update canvas.""" + canvas_id: str = team_canvas["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Grant update permission + update_payload: dict[str, Any] = { + "permissions": { + "canvas": {"update": True}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should be able to update canvas settings + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + setting_payload: dict[str, Any] = { + "id": canvas_id, + "title": "Updated Title", + "permission": "team", + } + res: dict[str, Any] = update_canvas_setting(user_auth, setting_payload) + assert res["code"] == 0, res + + @pytest.mark.p1 + def test_no_update_permission_denies_update_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user without update permission cannot update canvas.""" + canvas_id: str = team_canvas["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Ensure update permission is False (default) + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id) + assert permissions_res["code"] == 0 + assert permissions_res["data"]["canvas"]["update"] is False + + # User should not be able to update canvas + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + setting_payload: dict[str, Any] = { + "id": canvas_id, + "title": "Updated Title", + "permission": "team", + } + res: dict[str, Any] = update_canvas_setting(user_auth, setting_payload) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "update" in res["message"].lower() + + @pytest.mark.p1 + def test_delete_permission_allows_delete_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user with delete permission can delete canvas.""" + # Create a new canvas for deletion + canvas_dsl = { + "nodes": [{"id": "start", "type": "start", "position": {"x": 100, "y": 100}}], + "edges": [], + } + tenant_id: str = team_with_user["team"]["id"] + canvas_payload: dict[str, Any] = { + "title": f"Test Canvas Delete {uuid.uuid4().hex[:8]}", + "dsl": json.dumps(canvas_dsl), + "permission": "team", + "shared_tenant_id": tenant_id, + "canvas_category": "Agent", + } + create_res: dict[str, Any] = create_canvas(web_api_auth, canvas_payload) + assert create_res["code"] == 0 + canvas_id: str = create_res["data"]["id"] + + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id = team_with_user["team"]["id"] + + # Grant delete permission + update_payload: dict[str, Any] = { + "permissions": { + "canvas": {"delete": True}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should be able to delete canvas + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + delete_payload: dict[str, Any] = {"canvas_ids": [canvas_id]} + res: dict[str, Any] = delete_canvas(user_auth, delete_payload) + assert res["code"] == 0, res + + @pytest.mark.p1 + def test_no_delete_permission_denies_delete_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user without delete permission cannot delete canvas.""" + canvas_id: str = team_canvas["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Ensure delete permission is False (default) + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id) + assert permissions_res["code"] == 0 + assert permissions_res["data"]["canvas"]["delete"] is False + + # User should not be able to delete canvas + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + delete_payload: dict[str, Any] = {"canvas_ids": [canvas_id]} + res: dict[str, Any] = delete_canvas(user_auth, delete_payload) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "delete" in res["message"].lower() + + @pytest.mark.p1 + def test_read_permission_allows_run_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user with read permission can run canvas.""" + canvas_id: str = team_canvas["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + + # User should have read permission by default (allows running) + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + + run_payload: dict[str, Any] = { + "id": canvas_id, + "query": "test query", + } + # Note: This might fail for other reasons (missing components, etc.) + # but should not fail due to permission + res: dict[str, Any] = run_canvas(user_auth, run_payload) + # Permission check should pass (code != PERMISSION_ERROR) + if res["code"] != 0: + assert "permission" not in res["message"].lower() or "read" not in res["message"].lower() + + @pytest.mark.p1 + def test_read_permission_allows_debug_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user with read permission can debug canvas.""" + canvas_id: str = team_canvas["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + + # User should have read permission by default (allows debugging) + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + + debug_payload: dict[str, Any] = { + "id": canvas_id, + "component_id": "start", + "params": {}, + } + # Note: This might fail for other reasons (missing components, etc.) + # but should not fail due to permission + res: dict[str, Any] = debug_canvas(user_auth, debug_payload) + # Permission check should pass (code != PERMISSION_ERROR) + if res["code"] != 0: + assert "permission" not in res["message"].lower() or "read" not in res["message"].lower() + + @pytest.mark.p1 + def test_update_permission_allows_reset_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that user with update permission can reset canvas.""" + canvas_id: str = team_canvas["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Grant update permission + update_payload: dict[str, Any] = { + "permissions": { + "canvas": {"update": True}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should be able to reset canvas + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + reset_payload: dict[str, Any] = {"id": canvas_id} + # Note: This might fail if there are no versions, but should not fail due to permission + res: dict[str, Any] = reset_canvas(user_auth, reset_payload) + # Permission check should pass (code != PERMISSION_ERROR) + if res["code"] != 0: + assert "permission" not in res["message"].lower() or "update" not in res["message"].lower() + + @pytest.mark.p1 + def test_create_permission_allows_create_team_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test that user with create permission can create team canvas.""" + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Grant create permission + update_payload: dict[str, Any] = { + "permissions": { + "canvas": {"create": True}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should be able to create team canvas + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + canvas_dsl = { + "nodes": [{"id": "start", "type": "start", "position": {"x": 100, "y": 100}}], + "edges": [], + } + canvas_payload: dict[str, Any] = { + "title": f"User Created Canvas {uuid.uuid4().hex[:8]}", + "dsl": json.dumps(canvas_dsl), + "permission": "team", + "shared_tenant_id": tenant_id, + "canvas_category": "Agent", + } + res: dict[str, Any] = create_canvas(user_auth, canvas_payload) + assert res["code"] == 0, res + + @pytest.mark.p1 + def test_no_create_permission_denies_create_team_canvas( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test that user without create permission cannot create team canvas.""" + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Ensure create permission is False (default) + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id) + assert permissions_res["code"] == 0 + assert permissions_res["data"]["canvas"]["create"] is False + + # User should not be able to create team canvas + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + canvas_dsl = { + "nodes": [{"id": "start", "type": "start", "position": {"x": 100, "y": 100}}], + "edges": [], + } + canvas_payload: dict[str, Any] = { + "title": f"User Created Canvas {uuid.uuid4().hex[:8]}", + "dsl": json.dumps(canvas_dsl), + "permission": "team", + "shared_tenant_id": tenant_id, + "canvas_category": "Agent", + } + res: dict[str, Any] = create_canvas(user_auth, canvas_payload) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "create" in res["message"].lower() + + @pytest.mark.p1 + def test_owner_always_has_full_permissions( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + team_canvas: dict[str, Any], + ) -> None: + """Test that team owner always has full permissions regardless of settings.""" + canvas_id: str = team_canvas["id"] + owner_id: str = test_team["owner_id"] + tenant_id: str = test_team["id"] + + # Owner should have full permissions + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, owner_id) + assert permissions_res["code"] == 0 + permissions: dict[str, Any] = permissions_res["data"] + + assert permissions["canvas"]["create"] is True + assert permissions["canvas"]["read"] is True + assert permissions["canvas"]["update"] is True + assert permissions["canvas"]["delete"] is True + + # Owner should be able to perform all operations + setting_payload: dict[str, Any] = { + "id": canvas_id, + "title": "Owner Updated Title", + "permission": "team", + } + res: dict[str, Any] = update_canvas_setting(web_api_auth, setting_payload) + assert res["code"] == 0, res + diff --git a/test/testcases/test_http_api/test_dataset_mangement/test_dataset_permissions.py b/test/testcases/test_http_api/test_dataset_mangement/test_dataset_permissions.py new file mode 100644 index 000000000..268dbd311 --- /dev/null +++ b/test/testcases/test_http_api/test_dataset_mangement/test_dataset_permissions.py @@ -0,0 +1,458 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import time +import uuid +from typing import Any + +import pytest +import requests + +from common import ( + accept_team_invitation, + add_users_to_team, + create_dataset, + create_team, + create_user, + delete_datasets, + encrypt_password, + get_user_permissions, + list_datasets, + login_as_user, + update_dataset, + update_user_permissions, +) +from configs import HOST_ADDRESS, INVALID_API_TOKEN, VERSION +from libs.auth import RAGFlowWebApiAuth + + +def get_dataset_detail(auth: RAGFlowWebApiAuth, kb_id: str) -> dict[str, Any]: + """Get dataset details by ID. + + Args: + auth: Authentication object. + kb_id: Knowledge base (dataset) ID. + + Returns: + JSON response as a dictionary containing the dataset data. + """ + # Use the web KB detail endpoint, which is JWT-authenticated and + # enforces dataset/KB read permissions consistently with canvas tests. + url: str = f"{HOST_ADDRESS}/{VERSION}/kb/detail" + res: requests.Response = requests.get( + url=url, headers={"Content-Type": "application/json"}, auth=auth, params={"kb_id": kb_id} + ) + return res.json() + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- + + +@pytest.mark.p1 +class TestDatasetPermissions: + """Comprehensive tests for dataset permissions with CRUD operations.""" + + @pytest.fixture + def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(web_api_auth, team_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.fixture + def team_with_user( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> dict[str, Any]: + """Create a team with a user who has accepted the invitation.""" + tenant_id: str = test_team["id"] + + # Create user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + user_id: str = user_res["data"]["id"] + + # Add user to team + add_payload: dict[str, list[str]] = {"users": [email]} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + + # Small delay + time.sleep(0.5) + + # Accept invitation as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert accept_res["code"] == 0 + + return { + "team": test_team, + "user": {"id": user_id, "email": email, "password": password}, + } + + @pytest.fixture + def team_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> dict[str, Any]: + """Create a dataset shared with team.""" + dataset_payload: dict[str, Any] = { + "name": f"Test Dataset {uuid.uuid4().hex[:8]}", + "permission": "team", + "shared_tenant_id": test_team["id"], + } + + res: dict[str, Any] = create_dataset(web_api_auth, dataset_payload) + assert res["code"] == 0 + return res["data"] + + @pytest.mark.p1 + def test_read_permission_allows_get_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user with read permission can get dataset.""" + dataset_id: str = team_dataset["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + + # User should have read permission by default + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + + res: dict[str, Any] = get_dataset_detail(user_auth, dataset_id) + assert res["code"] == 0, res + assert "data" in res + assert res["data"]["id"] == dataset_id + + @pytest.mark.p1 + def test_no_read_permission_denies_get_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user without read permission cannot get dataset.""" + dataset_id: str = team_dataset["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Revoke read permission + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"read": False}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should not be able to get dataset + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + res: dict[str, Any] = get_dataset_detail(user_auth, dataset_id) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "read" in res["message"].lower() or "authorized" in res["message"].lower() + + @pytest.mark.p1 + def test_update_permission_allows_update_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user with update permission can update dataset.""" + dataset_id: str = team_dataset["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Grant update permission + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"update": True}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should be able to update dataset + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + update_dataset_payload: dict[str, Any] = { + "name": f"Updated Dataset {uuid.uuid4().hex[:8]}", + "description": "Updated description", + "parser_id": team_dataset.get("parser_id", ""), + } + res: dict[str, Any] = update_dataset(user_auth, dataset_id, update_dataset_payload) + assert res["code"] == 0, res + + @pytest.mark.p1 + def test_no_update_permission_denies_update_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user without update permission cannot update dataset.""" + dataset_id: str = team_dataset["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Ensure update permission is False (default) + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id) + assert permissions_res["code"] == 0 + assert permissions_res["data"]["dataset"]["update"] is False + + # User should not be able to update dataset + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + update_dataset_payload: dict[str, Any] = { + "name": f"Updated Dataset {uuid.uuid4().hex[:8]}", + "description": "Updated description", + "parser_id": team_dataset.get("parser_id", ""), + } + res: dict[str, Any] = update_dataset(user_auth, dataset_id, update_dataset_payload) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "update" in res["message"].lower() + + @pytest.mark.p1 + def test_delete_permission_allows_delete_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user with delete permission can delete dataset.""" + # Create a new dataset for deletion + tenant_id: str = team_with_user["team"]["id"] + dataset_payload: dict[str, Any] = { + "name": f"Test Dataset Delete {uuid.uuid4().hex[:8]}", + "permission": "team", + "shared_tenant_id": tenant_id, + } + create_res: dict[str, Any] = create_dataset(web_api_auth, dataset_payload) + assert create_res["code"] == 0 + dataset_id: str = create_res["data"]["id"] + + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + + # Grant delete permission + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"delete": True}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should be able to delete dataset + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + delete_payload: dict[str, Any] = {"ids": [dataset_id]} + res: dict[str, Any] = delete_datasets(user_auth, delete_payload) + assert res["code"] == 0, res + + @pytest.mark.p1 + def test_no_delete_permission_denies_delete_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user without delete permission cannot delete dataset.""" + dataset_id: str = team_dataset["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Ensure delete permission is False (default) + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id) + assert permissions_res["code"] == 0 + assert permissions_res["data"]["dataset"]["delete"] is False + + # User should not be able to delete dataset + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + delete_payload: dict[str, Any] = {"ids": [dataset_id]} + res: dict[str, Any] = delete_datasets(user_auth, delete_payload) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "delete" in res["message"].lower() + + @pytest.mark.p1 + def test_read_permission_allows_list_datasets( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user with read permission can list datasets.""" + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + dataset_id: str = team_dataset["id"] + + # User should have read permission by default (allows listing) + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + + res: dict[str, Any] = list_datasets(user_auth) + # Permission check should pass (code != PERMISSION_ERROR) + if res["code"] == 0: + # If listing succeeds, check that the dataset is visible + dataset_ids = [ds["id"] for ds in res.get("data", [])] + assert dataset_id in dataset_ids, "Team dataset should be visible to user with read permission" + else: + # If it fails, it should not be due to permission error + assert "permission" not in res["message"].lower() or "read" not in res["message"].lower() + + @pytest.mark.p1 + def test_no_read_permission_denies_list_datasets( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that user without read permission cannot see team datasets in list.""" + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + dataset_id: str = team_dataset["id"] + + # Revoke read permission + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"read": False}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should not see team datasets in list + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + res: dict[str, Any] = list_datasets(user_auth) + # If listing succeeds, the team dataset should not be visible + if res["code"] == 0: + dataset_ids = [ds["id"] for ds in res.get("data", [])] + assert dataset_id not in dataset_ids, "Team dataset should not be visible to user without read permission" + + @pytest.mark.p1 + def test_create_permission_allows_create_team_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test that user with create permission can create team dataset.""" + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Grant create permission + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": True}, + } + } + update_res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert update_res["code"] == 0 + + # User should be able to create team dataset + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + dataset_payload: dict[str, Any] = { + "name": f"User Created Dataset {uuid.uuid4().hex[:8]}", + "permission": "team", + "shared_tenant_id": tenant_id, + } + res: dict[str, Any] = create_dataset(user_auth, dataset_payload) + assert res["code"] == 0, res + + @pytest.mark.p1 + def test_no_create_permission_denies_create_team_dataset( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test that user without create permission cannot create team dataset.""" + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + tenant_id: str = team_with_user["team"]["id"] + + # Ensure create permission is False (default) + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id) + assert permissions_res["code"] == 0 + assert permissions_res["data"]["dataset"]["create"] is False + + # User should not be able to create team dataset + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + dataset_payload: dict[str, Any] = { + "name": f"User Created Dataset {uuid.uuid4().hex[:8]}", + "permission": "team", + "shared_tenant_id": tenant_id, + } + res: dict[str, Any] = create_dataset(user_auth, dataset_payload) + assert res["code"] != 0 + assert "permission" in res["message"].lower() or "create" in res["message"].lower() + + @pytest.mark.p1 + def test_owner_always_has_full_permissions( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + team_dataset: dict[str, Any], + ) -> None: + """Test that team owner always has full permissions regardless of settings.""" + dataset_id: str = team_dataset["id"] + owner_id: str = test_team["owner_id"] + tenant_id: str = test_team["id"] + + # Owner should have full permissions + permissions_res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, owner_id) + assert permissions_res["code"] == 0 + permissions: dict[str, Any] = permissions_res["data"] + + assert permissions["dataset"]["create"] is True + assert permissions["dataset"]["read"] is True + assert permissions["dataset"]["update"] is True + assert permissions["dataset"]["delete"] is True + + # Owner should be able to perform all operations + update_payload: dict[str, Any] = { + "name": f"Owner Updated Dataset {uuid.uuid4().hex[:8]}", + "description": "Owner updated description", + "parser_id": team_dataset.get("parser_id", ""), + } + res: dict[str, Any] = update_dataset(web_api_auth, dataset_id, update_payload) + assert res["code"] == 0, res + + diff --git a/test/testcases/test_http_api/test_team_management/test_user_permissions.py b/test/testcases/test_http_api/test_team_management/test_user_permissions.py new file mode 100644 index 000000000..a8e63a402 --- /dev/null +++ b/test/testcases/test_http_api/test_team_management/test_user_permissions.py @@ -0,0 +1,586 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import time +import uuid +from typing import Any + +import pytest + +from common import ( + accept_team_invitation, + add_users_to_team, + create_team, + create_user, + encrypt_password, + get_user_permissions, + login_as_user, + update_user_permissions, +) +from configs import INVALID_API_TOKEN +from libs.auth import RAGFlowWebApiAuth + + +# --------------------------------------------------------------------------- +# Test Classes +# --------------------------------------------------------------------------- + + +@pytest.mark.p1 +class TestAuthorization: + """Tests for authentication behavior when managing user permissions.""" + + @pytest.mark.parametrize( + ("invalid_auth", "expected_code", "expected_message"), + [ + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), + ], + ) + def test_get_permissions_invalid_auth( + self, + invalid_auth: RAGFlowWebApiAuth | None, + expected_code: int, + expected_message: str, + web_api_auth: RAGFlowWebApiAuth, + ) -> None: + """Test getting permissions with invalid or missing authentication.""" + # Create a team and add a user first + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + team_res: dict[str, Any] = create_team(web_api_auth, team_payload) + if team_res.get("code", -1) != 0: + pytest.skip(f"Team creation failed with code {team_res.get('code')}: {team_res.get('message', 'Unknown error')}. Full response: {team_res}") + + tenant_id: str = team_res["data"]["id"] + + # Create and add a user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + if user_res["code"] != 0: + pytest.skip("User creation failed, skipping auth test") + + user_id: str = user_res["data"]["id"] + add_payload: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id, add_payload) + + # Small delay + time.sleep(0.5) + + # Accept invitation as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + accept_team_invitation(user_auth, tenant_id) + + # Try to get permissions with invalid auth + res: dict[str, Any] = get_user_permissions(invalid_auth, tenant_id, user_id) + assert res["code"] == expected_code, res + if expected_message: + assert expected_message in res["message"] + + @pytest.mark.parametrize( + ("invalid_auth", "expected_code", "expected_message"), + [ + (None, 401, "Unauthorized"), + (RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "Unauthorized"), + ], + ) + def test_update_permissions_invalid_auth( + self, + invalid_auth: RAGFlowWebApiAuth | None, + expected_code: int, + expected_message: str, + web_api_auth: RAGFlowWebApiAuth, + ) -> None: + """Test updating permissions with invalid or missing authentication.""" + # Create a team and add a user first + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + team_res: dict[str, Any] = create_team(web_api_auth, team_payload) + if team_res.get("code", -1) != 0: + pytest.skip(f"Team creation failed with code {team_res.get('code')}: {team_res.get('message', 'Unknown error')}. Full response: {team_res}") + + tenant_id: str = team_res["data"]["id"] + + # Create and add a user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + if user_res["code"] != 0: + pytest.skip("User creation failed, skipping auth test") + + user_id: str = user_res["data"]["id"] + add_payload: dict[str, list[str]] = {"users": [email]} + add_users_to_team(web_api_auth, tenant_id, add_payload) + + # Small delay + time.sleep(0.5) + + # Accept invitation as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + accept_team_invitation(user_auth, tenant_id) + + # Try to update permissions with invalid auth + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": True, "read": True, "update": False, "delete": False}, + "canvas": {"create": False, "read": True, "update": False, "delete": False} + } + } + res: dict[str, Any] = update_user_permissions(invalid_auth, tenant_id, user_id, update_payload) + assert res["code"] == expected_code, res + if expected_message: + assert expected_message in res["message"] + + +@pytest.mark.p1 +class TestGetUserPermissions: + """Comprehensive tests for getting user permissions.""" + + @pytest.fixture + def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(web_api_auth, team_payload) + assert res["code"] == 0, f"Team creation failed with code {res.get('code')}: {res.get('message', 'Unknown error')}. Full response: {res}" + return res["data"] + + @pytest.fixture + def team_with_user( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> dict[str, Any]: + """Create a team with a user who has accepted the invitation.""" + tenant_id: str = test_team["id"] + + # Create user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + user_id: str = user_res["data"]["id"] + + # Add user to team + add_payload: dict[str, list[str]] = {"users": [email]} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + + # Small delay + time.sleep(0.5) + + # Accept invitation as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert accept_res["code"] == 0 + + return { + "team": test_team, + "user": {"id": user_id, "email": email, "password": password}, + } + + @pytest.mark.p1 + def test_get_default_permissions( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test getting default permissions for a new team member.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, user_id) + assert res["code"] == 0, res + assert "data" in res + permissions: dict[str, Any] = res["data"] + + # Check structure + assert "dataset" in permissions + assert "canvas" in permissions + + # Check default permissions (read-only) + assert permissions["dataset"]["read"] is True + assert permissions["dataset"]["create"] is False + assert permissions["dataset"]["update"] is False + assert permissions["dataset"]["delete"] is False + + assert permissions["canvas"]["read"] is True + assert permissions["canvas"]["create"] is False + assert permissions["canvas"]["update"] is False + assert permissions["canvas"]["delete"] is False + + @pytest.mark.p1 + def test_get_owner_permissions( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> None: + """Test getting permissions for team owner (should have full permissions).""" + tenant_id: str = test_team["id"] + owner_id: str = test_team["owner_id"] + + res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, owner_id) + assert res["code"] == 0, res + assert "data" in res + permissions: dict[str, Any] = res["data"] + + # Owner should have full permissions + assert permissions["dataset"]["create"] is True + assert permissions["dataset"]["read"] is True + assert permissions["dataset"]["update"] is True + assert permissions["dataset"]["delete"] is True + + assert permissions["canvas"]["create"] is True + assert permissions["canvas"]["read"] is True + assert permissions["canvas"]["update"] is True + assert permissions["canvas"]["delete"] is True + + @pytest.mark.p1 + def test_get_permissions_user_not_in_team( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> None: + """Test getting permissions for a user not in the team.""" + tenant_id: str = test_team["id"] + invalid_user_id: str = f"invalid_{uuid.uuid4().hex[:8]}" + + res: dict[str, Any] = get_user_permissions(web_api_auth, tenant_id, invalid_user_id) + assert res["code"] != 0 + assert "not a member" in res["message"].lower() or res["code"] in [100, 102] + + @pytest.mark.p1 + def test_get_permissions_not_owner_or_admin( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test that normal users cannot view permissions.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + + # Login as normal user + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + + # Try to get permissions (normal user should not be able to) + res: dict[str, Any] = get_user_permissions(user_auth, tenant_id, user_id) + assert res["code"] == 108 # PERMISSION_ERROR + assert "owner" in res["message"].lower() or "admin" in res["message"].lower() + + +@pytest.mark.p1 +class TestUpdateUserPermissions: + """Comprehensive tests for updating user permissions.""" + + @pytest.fixture + def test_team(self, web_api_auth: RAGFlowWebApiAuth) -> dict[str, Any]: + """Create a test team for use in tests.""" + team_payload: dict[str, str] = {"name": f"Test Team {uuid.uuid4().hex[:8]}"} + res: dict[str, Any] = create_team(web_api_auth, team_payload) + assert res["code"] == 0, f"Team creation failed with code {res.get('code')}: {res.get('message', 'Unknown error')}. Full response: {res}" + return res["data"] + + @pytest.fixture + def team_with_user( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> dict[str, Any]: + """Create a team with a user who has accepted the invitation.""" + tenant_id: str = test_team["id"] + + # Create user + email = f"testuser_{uuid.uuid4().hex[:8]}@example.com" + password = "TestPassword123!" + encrypted_password = encrypt_password(password) + user_payload: dict[str, str] = { + "email": email, + "password": encrypted_password, + "nickname": "Test User", + } + user_res: dict[str, Any] = create_user(web_api_auth, user_payload) + assert user_res["code"] == 0 + user_id: str = user_res["data"]["id"] + + # Add user to team + add_payload: dict[str, list[str]] = {"users": [email]} + add_res: dict[str, Any] = add_users_to_team(web_api_auth, tenant_id, add_payload) + assert add_res["code"] == 0 + + # Small delay + time.sleep(0.5) + + # Accept invitation as the user + user_auth: RAGFlowWebApiAuth = login_as_user(email, password) + accept_res: dict[str, Any] = accept_team_invitation(user_auth, tenant_id) + assert accept_res["code"] == 0 + + return { + "team": test_team, + "user": {"id": user_id, "email": email, "password": password}, + } + + @pytest.mark.p1 + def test_update_permissions_grant_create( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test updating permissions to grant create permission.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + # Update permissions to grant create for dataset + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": True}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert res["code"] == 0, res + assert "data" in res + permissions: dict[str, Any] = res["data"] + + # Check that create is now True, but other permissions remain default + assert permissions["dataset"]["create"] is True + assert permissions["dataset"]["read"] is True # Default + assert permissions["dataset"]["update"] is False # Default + assert permissions["dataset"]["delete"] is False # Default + + @pytest.mark.p1 + def test_update_permissions_grant_all( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test updating permissions to grant all CRUD permissions.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + # Update permissions to grant all for both dataset and canvas + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": True, "read": True, "update": True, "delete": True}, + "canvas": {"create": True, "read": True, "update": True, "delete": True} + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert res["code"] == 0, res + assert "data" in res + permissions: dict[str, Any] = res["data"] + + # Verify all permissions are True + for resource_type in ["dataset", "canvas"]: + assert permissions[resource_type]["create"] is True + assert permissions[resource_type]["read"] is True + assert permissions[resource_type]["update"] is True + assert permissions[resource_type]["delete"] is True + + @pytest.mark.p1 + def test_update_permissions_revoke_read( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test updating permissions to revoke read permission.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + # First grant all permissions + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": True, "read": True, "update": True, "delete": True}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert res["code"] == 0 + + # Then revoke read + revoke_payload: dict[str, Any] = { + "permissions": { + "dataset": {"read": False}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, revoke_payload) + assert res["code"] == 0, res + permissions: dict[str, Any] = res["data"] + + # Read should be False, but other permissions should remain + assert permissions["dataset"]["read"] is False + assert permissions["dataset"]["create"] is True + assert permissions["dataset"]["update"] is True + assert permissions["dataset"]["delete"] is True + + @pytest.mark.p1 + def test_update_permissions_partial_update( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test that partial permission updates only change specified fields.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + # Update only canvas create permission + update_payload: dict[str, Any] = { + "permissions": { + "canvas": {"create": True}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert res["code"] == 0, res + permissions: dict[str, Any] = res["data"] + + # Canvas create should be True, but dataset should remain default + assert permissions["canvas"]["create"] is True + assert permissions["dataset"]["create"] is False # Still default + + @pytest.mark.p1 + def test_update_permissions_owner_or_admin( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> None: + """Test that owner/admin permissions cannot be updated.""" + tenant_id: str = test_team["id"] + owner_id: str = test_team["owner_id"] + + # Try to update owner permissions + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": False}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, owner_id, update_payload) + assert res["code"] != 0 + assert "owner" in res["message"].lower() or "admin" in res["message"].lower() + + @pytest.mark.p1 + def test_update_permissions_user_not_in_team( + self, + web_api_auth: RAGFlowWebApiAuth, + test_team: dict[str, Any], + ) -> None: + """Test updating permissions for a user not in the team.""" + tenant_id: str = test_team["id"] + invalid_user_id: str = f"invalid_{uuid.uuid4().hex[:8]}" + + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": True}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, invalid_user_id, update_payload) + assert res["code"] != 0 + assert "not a member" in res["message"].lower() or res["code"] in [100, 102] + + @pytest.mark.p1 + def test_update_permissions_not_owner_or_admin( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test that normal users cannot update permissions.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + user_email: str = team_with_user["user"]["email"] + user_password: str = team_with_user["user"]["password"] + + # Login as normal user + user_auth: RAGFlowWebApiAuth = login_as_user(user_email, user_password) + + # Try to update permissions (normal user should not be able to) + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"create": True}, + } + } + res: dict[str, Any] = update_user_permissions(user_auth, tenant_id, user_id, update_payload) + assert res["code"] == 108 # PERMISSION_ERROR + assert "owner" in res["message"].lower() or "admin" in res["message"].lower() + + @pytest.mark.p1 + def test_update_permissions_invalid_resource_type( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test updating permissions with invalid resource type.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + update_payload: dict[str, Any] = { + "permissions": { + "invalid_resource": {"create": True}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert res["code"] != 0 + assert "invalid" in res["message"].lower() or "resource" in res["message"].lower() + + @pytest.mark.p1 + def test_update_permissions_invalid_permission_name( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test updating permissions with invalid permission name.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + update_payload: dict[str, Any] = { + "permissions": { + "dataset": {"invalid_permission": True}, + } + } + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert res["code"] != 0 + assert "invalid" in res["message"].lower() or "permission" in res["message"].lower() + + @pytest.mark.p1 + def test_update_permissions_missing_permissions_field( + self, + web_api_auth: RAGFlowWebApiAuth, + team_with_user: dict[str, Any], + ) -> None: + """Test updating permissions without permissions field.""" + tenant_id: str = team_with_user["team"]["id"] + user_id: str = team_with_user["user"]["id"] + + update_payload: dict[str, Any] = {} + res: dict[str, Any] = update_user_permissions(web_api_auth, tenant_id, user_id, update_payload) + assert res["code"] != 0 + assert "required" in res["message"].lower() or "permissions" in res["message"].lower() + diff --git a/test/testcases/test_http_api/test_user_management/test_create_user.py b/test/testcases/test_http_api/test_user_management/test_create_user.py index 32f67f020..c9c891bbf 100644 --- a/test/testcases/test_http_api/test_user_management/test_create_user.py +++ b/test/testcases/test_http_api/test_user_management/test_create_user.py @@ -25,7 +25,7 @@ import pytest from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from Cryptodome.PublicKey import RSA -from ..common import create_user +from common import create_user from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth, RAGFlowWebApiAuth @@ -95,8 +95,8 @@ class TestUserCreate: "email": "", "password": "test123", }, - 103, - "Invalid email address", + 101, + "Email is required!", ), ( { @@ -162,7 +162,7 @@ class TestUserCreate: ("user@", 103, "Invalid email address"), ("user@example", 103, "Invalid email address"), ("user@.com", 103, "Invalid email address"), - ("", 103, "Invalid email address"), + ("", 101, "Email is required!"), ], ) def test_email_validation( diff --git a/uv.lock b/uv.lock index f68b2262d..80ae46cc2 100644 --- a/uv.lock +++ b/uv.lock @@ -5449,6 +5449,7 @@ dependencies = [ { name = "ranx" }, { name = "readability-lxml" }, { name = "replicate" }, + { name = "reportlab" }, { name = "requests" }, { name = "roman-numbers" }, { name = "ruamel-base" }, @@ -5611,6 +5612,7 @@ requires-dist = [ { name = "ranx", specifier = "==0.3.20" }, { name = "readability-lxml", specifier = "==0.8.1" }, { name = "replicate", specifier = "==0.31.0" }, + { name = "reportlab", specifier = ">=4.4.3" }, { name = "requests", specifier = "==2.32.2" }, { name = "roman-numbers", specifier = "==1.0.2" }, { name = "ruamel-base", specifier = "==1.0.0" },