[OND211-2329]: Added permissions for team users to CRUD on datasets and agents.

This commit is contained in:
Hetavi Shah 2025-11-21 19:11:11 +05:30
parent 60a6265b86
commit 89d27c75b6
10 changed files with 625 additions and 109 deletions

View file

@ -30,6 +30,7 @@ from api.db.services.pipeline_operation_log_service import PipelineOperationLogS
from api.db.services.task_service import queue_dataflow, CANVAS_DEBUG_DOC_ID, TaskService from api.db.services.task_service import queue_dataflow, CANVAS_DEBUG_DOC_ID, TaskService
from api.db.services.user_service import TenantService from api.db.services.user_service import TenantService
from api.db.services.user_canvas_version import UserCanvasVersionService from api.db.services.user_canvas_version import UserCanvasVersionService
from api.common.permission_utils import has_permission
from common.constants import RetCode from common.constants import RetCode
from common.misc_utils import get_uuid from common.misc_utils import get_uuid
from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result, \ from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result, \
@ -60,10 +61,13 @@ def templates():
async def rm(): async def rm():
req = await request_json() req = await request_json()
for i in req["canvas_ids"]: for i in req["canvas_ids"]:
if not UserCanvasService.accessible(i, current_user.id): # Check delete permission
if not UserCanvasService.accessible(i, current_user.id, required_permission="delete"):
return get_json_result( return get_json_result(
data=False, message='Only owner of canvas authorized for this operation.', data=False,
code=RetCode.OPERATING_ERROR) message='You do not have delete permission for this canvas.',
code=RetCode.PERMISSION_ERROR
)
UserCanvasService.delete_by_id(i) UserCanvasService.delete_by_id(i)
return get_json_result(data=True) return get_json_result(data=True)
@ -98,6 +102,17 @@ async def save() -> Any:
cate = req.get("canvas_category", CanvasCategory.Agent) cate = req.get("canvas_category", CanvasCategory.Agent)
if "id" not in req: if "id" not in req:
# Check create permission if sharing with team
if req.get("permission") == "team":
shared_tenant_id: Optional[str] = req.get("shared_tenant_id")
target_tenant_id: str = shared_tenant_id if shared_tenant_id else current_user.id
if not has_permission(target_tenant_id, current_user.id, "canvas", "create"):
return get_json_result(
data=False,
message='You do not have create permission for canvases in this team.',
code=RetCode.PERMISSION_ERROR
)
req["user_id"] = current_user.id req["user_id"] = current_user.id
if UserCanvasService.query(user_id=current_user.id, title=req["title"].strip(), canvas_category=cate): if UserCanvasService.query(user_id=current_user.id, title=req["title"].strip(), canvas_category=cate):
return get_data_error_result(message=f"{req['title'].strip()} already exists.") return get_data_error_result(message=f"{req['title'].strip()} already exists.")
@ -105,10 +120,13 @@ async def save() -> Any:
if not UserCanvasService.save(**req): if not UserCanvasService.save(**req):
return get_data_error_result(message="Fail to save canvas.") return get_data_error_result(message="Fail to save canvas.")
else: else:
if not UserCanvasService.accessible(req["id"], current_user.id): # Check update permission
if not UserCanvasService.accessible(req["id"], current_user.id, required_permission="update"):
return get_json_result( return get_json_result(
data=False, message='Only owner of canvas authorized for this operation.', data=False,
code=RetCode.OPERATING_ERROR) message='You do not have update permission for this canvas.',
code=RetCode.PERMISSION_ERROR
)
UserCanvasService.update_by_id(req["id"], req) UserCanvasService.update_by_id(req["id"], req)
# save version # save version
UserCanvasVersionService.insert(user_canvas_id=req["id"], dsl=req["dsl"], title="{0}_{1}".format(req["title"], time.strftime("%Y_%m_%d_%H_%M_%S"))) UserCanvasVersionService.insert(user_canvas_id=req["id"], dsl=req["dsl"], title="{0}_{1}".format(req["title"], time.strftime("%Y_%m_%d_%H_%M_%S")))
@ -156,10 +174,13 @@ async def run():
files = req.get("files", []) files = req.get("files", [])
inputs = req.get("inputs", {}) inputs = req.get("inputs", {})
user_id = req.get("user_id", current_user.id) user_id = req.get("user_id", current_user.id)
if not UserCanvasService.accessible(req["id"], current_user.id): # Check read permission (to run the canvas)
if not UserCanvasService.accessible(req["id"], current_user.id, required_permission="read"):
return get_json_result( return get_json_result(
data=False, message='Only owner of canvas authorized for this operation.', data=False,
code=RetCode.OPERATING_ERROR) message='You do not have read permission for this canvas.',
code=RetCode.PERMISSION_ERROR
)
e, cvs = UserCanvasService.get_by_id(req["id"]) e, cvs = UserCanvasService.get_by_id(req["id"])
if not e: if not e:
@ -247,10 +268,14 @@ def cancel(task_id):
@login_required @login_required
async def reset(): async def reset():
req = await request_json() req = await request_json()
if not UserCanvasService.accessible(req["id"], current_user.id): # Check update permission (to reset the canvas)
if not UserCanvasService.accessible(req["id"], current_user.id, required_permission="update"):
return get_json_result( return get_json_result(
data=False, message='Only owner of canvas authorized for this operation.', data=False,
code=RetCode.OPERATING_ERROR) message='You do not have update permission for this canvas.',
code=RetCode.PERMISSION_ERROR
)
try: try:
e, user_canvas = UserCanvasService.get_by_id(req["id"]) e, user_canvas = UserCanvasService.get_by_id(req["id"])
if not e: if not e:
@ -366,10 +391,13 @@ def input_form():
@login_required @login_required
async def debug(): async def debug():
req = await request_json() req = await request_json()
if not UserCanvasService.accessible(req["id"], current_user.id): # Check read permission (to debug the canvas)
if not UserCanvasService.accessible(req["id"], current_user.id, required_permission="read"):
return get_json_result( return get_json_result(
data=False, message='Only owner of canvas authorized for this operation.', data=False,
code=RetCode.OPERATING_ERROR) message='You do not have read permission for this canvas.',
code=RetCode.PERMISSION_ERROR
)
try: try:
e, user_canvas = UserCanvasService.get_by_id(req["id"]) e, user_canvas = UserCanvasService.get_by_id(req["id"])
canvas = Canvas(json.dumps(user_canvas.dsl), current_user.id) canvas = Canvas(json.dumps(user_canvas.dsl), current_user.id)
@ -545,10 +573,13 @@ async def setting():
req = await request_json() req = await request_json()
req["user_id"] = current_user.id req["user_id"] = current_user.id
if not UserCanvasService.accessible(req["id"], current_user.id): # Check update permission (to change settings)
if not UserCanvasService.accessible(req["id"], current_user.id, required_permission="update"):
return get_json_result( return get_json_result(
data=False, message='Only owner of canvas authorized for this operation.', data=False,
code=RetCode.OPERATING_ERROR) message='You do not have update permission for this canvas.',
code=RetCode.PERMISSION_ERROR
)
e,flow = UserCanvasService.get_by_id(req["id"]) e,flow = UserCanvasService.get_by_id(req["id"])
if not e: if not e:

View file

@ -43,7 +43,6 @@ from rag.utils.doc_store_conn import OrderByExpr
from common.constants import RetCode, PipelineTaskType, StatusEnum, VALID_TASK_STATUS, FileSource, LLMType, PAGERANK_FLD from common.constants import RetCode, PipelineTaskType, StatusEnum, VALID_TASK_STATUS, FileSource, LLMType, PAGERANK_FLD
from common import settings from common import settings
from api.apps import login_required, current_user from api.apps import login_required, current_user
from common.constants import StatusEnum
@manager.route('/create', methods=['post']) # noqa: F821 @manager.route('/create', methods=['post']) # noqa: F821
@ -105,19 +104,15 @@ async def update():
message=f"Dataset name length is {len(req['name'])} which is large than {DATASET_NAME_LIMIT}") message=f"Dataset name length is {len(req['name'])} which is large than {DATASET_NAME_LIMIT}")
req["name"] = req["name"].strip() req["name"] = req["name"].strip()
if not KnowledgebaseService.accessible4deletion(req["kb_id"], current_user.id): # Check if user has update permission
if not KnowledgebaseService.accessible(req["kb_id"], current_user.id, required_permission="update"):
return get_json_result( return get_json_result(
data=False, data=False,
message='No authorization.', message='You do not have update permission for this knowledge base.',
code=RetCode.AUTHENTICATION_ERROR code=RetCode.PERMISSION_ERROR
) )
try: try:
if not KnowledgebaseService.query(
created_by=current_user.id, id=req["kb_id"]):
return get_json_result(
data=False, message='Only owner of knowledgebase authorized for this operation.',
code=RetCode.OPERATING_ERROR)
e, kb = KnowledgebaseService.get_by_id(req["kb_id"]) e, kb = KnowledgebaseService.get_by_id(req["kb_id"])
if not e: if not e:
return get_data_error_result( return get_data_error_result(
@ -233,11 +228,12 @@ async def list_kbs():
@validate_request("kb_id") @validate_request("kb_id")
async def rm(): async def rm():
req = await request_json() req = await request_json()
# Check if user has delete permission
if not KnowledgebaseService.accessible4deletion(req["kb_id"], current_user.id): if not KnowledgebaseService.accessible4deletion(req["kb_id"], current_user.id):
return get_json_result( return get_json_result(
data=False, data=False,
message='No authorization.', message='You do not have delete permission for this knowledge base.',
code=RetCode.AUTHENTICATION_ERROR code=RetCode.PERMISSION_ERROR
) )
try: try:
kbs = KnowledgebaseService.query( kbs = KnowledgebaseService.query(

View file

@ -27,7 +27,7 @@ from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService from api.db.services.file_service import FileService
from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.task_service import GRAPH_RAPTOR_FAKE_DOC_ID, TaskService from api.db.services.task_service import GRAPH_RAPTOR_FAKE_DOC_ID, TaskService
from api.db.services.user_service import TenantService from api.db.services.user_service import TenantService, UserTenantService
from common.constants import RetCode, FileSource, StatusEnum from common.constants import RetCode, FileSource, StatusEnum
from api.utils.api_utils import ( from api.utils.api_utils import (
deep_merge, deep_merge,
@ -51,8 +51,6 @@ from api.utils.validation_utils import (
from rag.nlp import search from rag.nlp import search
from common.constants import PAGERANK_FLD from common.constants import PAGERANK_FLD
from common import settings from common import settings
from api.db.services.user_service import UserTenantService
from common.constants import StatusEnum
@manager.route("/datasets", methods=["POST"]) # noqa: F821 @manager.route("/datasets", methods=["POST"]) # noqa: F821
@token_required @token_required
@ -238,6 +236,11 @@ async def delete(tenant_id):
errors = [] errors = []
success_count = 0 success_count = 0
for kb_id, kb in kb_id_instance_pairs: for kb_id, kb in kb_id_instance_pairs:
# Check delete permission for each dataset
if not KnowledgebaseService.accessible4deletion(kb_id, tenant_id):
errors.append(f"User lacks delete permission for dataset '{kb_id}'")
continue
for doc in DocumentService.query(kb_id=kb_id): for doc in DocumentService.query(kb_id=kb_id):
if not DocumentService.remove_document(doc, tenant_id): if not DocumentService.remove_document(doc, tenant_id):
errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'") errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'")
@ -346,10 +349,15 @@ async def update(tenant_id, dataset_id):
return get_error_argument_result(message="No properties were modified") return get_error_argument_result(message="No properties were modified")
try: try:
# Check update permission
if not KnowledgebaseService.accessible(dataset_id, tenant_id, required_permission="update"):
return get_error_permission_result(
message=f"User lacks update permission for dataset '{dataset_id}'")
kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id) kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
if kb is None: if kb is None:
return get_error_permission_result( return get_error_permission_result(
message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'") message=f"Dataset '{dataset_id}' not found")
if req.get("parser_config"): if req.get("parser_config"):
req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"]) req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"])

View file

@ -17,10 +17,9 @@ import logging
from threading import Thread from threading import Thread
from typing import Any, Dict, List, Optional, Set, Union from typing import Any, Dict, List, Optional, Set, Union
from quart import request
from api.db import UserTenantRole
from flask import Response, request, Blueprint from flask import Response, request, Blueprint
from flask_login import current_user, login_required from flask_login import current_user, login_required
from quart import request
from api.apps import smtp_mail_server from api.apps import smtp_mail_server
from api.db import FileType, UserTenantRole from api.db import FileType, UserTenantRole
@ -33,10 +32,6 @@ from api.db.services.user_service import (
UserService, UserService,
UserTenantService, UserTenantService,
) )
from common.constants import RetCode, StatusEnum
from common.misc_utils import get_uuid
from common.time_utils import delta_seconds
from api.utils.api_utils import ( from api.utils.api_utils import (
get_data_error_result, get_data_error_result,
get_json_result, get_json_result,
@ -44,8 +39,11 @@ from api.utils.api_utils import (
validate_request, validate_request,
) )
from api.utils.web_utils import send_invite_email from api.utils.web_utils import send_invite_email
from api.common.permission_utils import get_user_permissions, update_user_permissions
from common import settings from common import settings
from api.apps import smtp_mail_server, login_required, current_user from common.constants import RetCode, StatusEnum
from common.misc_utils import get_uuid
from common.time_utils import delta_seconds
manager = Blueprint("tenant", __name__) manager = Blueprint("tenant", __name__)
def is_team_admin_or_owner(tenant_id: str, user_id: str) -> bool: def is_team_admin_or_owner(tenant_id: str, user_id: str) -> bool:
@ -310,6 +308,12 @@ def create_team() -> Response:
owner_user_id: Optional[str] = user_id owner_user_id: Optional[str] = user_id
if not owner_user_id: if not owner_user_id:
# Use current authenticated user as default # Use current authenticated user as default
if not current_user or not hasattr(current_user, 'id') or not current_user.id:
return get_json_result(
data=False,
message="Unable to determine user ID. Please ensure you are authenticated.",
code=RetCode.UNAUTHORIZED,
)
owner_user_id = current_user.id owner_user_id = current_user.id
# Verify user exists # Verify user exists
@ -781,11 +785,17 @@ def update_request(tenant_id: str) -> Response:
if accept: if accept:
# Accept invitation - update role from INVITE to the specified role # Accept invitation - update role from INVITE to the specified role
role: str = UserTenantRole.NORMAL.value role: str = UserTenantRole.NORMAL.value
# Set default permissions: read-only access to datasets and canvases
default_permissions: Dict[str, Any] = {
"dataset": {"create": False, "read": True, "update": False, "delete": False},
"canvas": {"create": False, "read": True, "update": False, "delete": False}
}
# Update role from INVITE to the specified role (defaults to NORMAL) # Update role from INVITE to the specified role (defaults to NORMAL) and set default permissions
UserTenantService.filter_update( UserTenantService.filter_update(
[UserTenant.tenant_id == tenant_id, UserTenant.user_id == current_user.id], [UserTenant.tenant_id == tenant_id, UserTenant.user_id == current_user.id],
{"role": role, "status": StatusEnum.VALID.value} {"role": role, "status": StatusEnum.VALID.value, "permissions": default_permissions}
) )
return get_json_result(data=True, message=f"Successfully joined the team with role '{role}'.") return get_json_result(data=True, message=f"Successfully joined the team with role '{role}'.")
else: else:
@ -964,6 +974,12 @@ def add_users(tenant_id: str) -> Response:
}) })
continue continue
# Set default permissions: read-only access to datasets and canvases
default_permissions: Dict[str, Any] = {
"dataset": {"create": False, "read": True, "update": False, "delete": False},
"canvas": {"create": False, "read": True, "update": False, "delete": False}
}
# Send invitation - create user with INVITE role (user must accept to join) # Send invitation - create user with INVITE role (user must accept to join)
UserTenantService.save( UserTenantService.save(
id=get_uuid(), id=get_uuid(),
@ -971,7 +987,8 @@ def add_users(tenant_id: str) -> Response:
tenant_id=tenant_id, tenant_id=tenant_id,
invited_by=current_user.id, invited_by=current_user.id,
role=UserTenantRole.INVITE, # Start with INVITE role role=UserTenantRole.INVITE, # Start with INVITE role
status=StatusEnum.VALID.value status=StatusEnum.VALID.value,
permissions=default_permissions
) )
# Send invitation email if configured # Send invitation email if configured
@ -1382,3 +1399,285 @@ def demote_admin(tenant_id: str, user_id: str) -> Response:
except Exception as e: except Exception as e:
logging.exception(e) logging.exception(e)
return server_error_response(e) return server_error_response(e)
@manager.route('/<tenant_id>/users/<user_id>/permissions', methods=['GET']) # noqa: F821
@login_required
def get_user_permissions(tenant_id: str, user_id: str) -> Response:
"""
Get CRUD permissions for a team member.
Only team owners or admins can view permissions.
---
tags:
- Team
security:
- ApiKeyAuth: []
parameters:
- in: path
name: tenant_id
required: true
type: string
description: Team ID
- in: path
name: user_id
required: true
type: string
description: User ID to get permissions for
responses:
200:
description: Permissions retrieved successfully.
schema:
type: object
properties:
data:
type: object
properties:
dataset:
type: object
properties:
create:
type: boolean
read:
type: boolean
update:
type: boolean
delete:
type: boolean
canvas:
type: object
properties:
create:
type: boolean
read:
type: boolean
update:
type: boolean
delete:
type: boolean
message:
type: string
401:
description: Unauthorized.
403:
description: Forbidden - not team owner or admin.
404:
description: User not found in team.
"""
# Check if current user is team owner or admin
if not is_team_admin_or_owner(tenant_id, current_user.id):
return get_json_result(
data=False,
message="Only team owners or admins can view permissions.",
code=RetCode.PERMISSION_ERROR,
)
try:
# Check if target user exists in the team
user_tenant: Optional[UserTenant] = UserTenantService.filter_by_tenant_and_user_id(
tenant_id, user_id
)
if not user_tenant:
return get_json_result(
data=False,
message="User is not a member of this team.",
code=RetCode.DATA_ERROR,
)
permissions: Dict[str, Dict[str, bool]] = get_user_permissions(tenant_id, user_id)
return get_json_result(
data=permissions,
message="Permissions retrieved successfully.",
)
except Exception as e:
logging.exception(e)
return server_error_response(e)
@manager.route('/<tenant_id>/users/<user_id>/permissions', methods=['PUT']) # noqa: F821
@login_required
@validate_request("permissions")
def update_user_permissions(tenant_id: str, user_id: str) -> Response:
"""
Update CRUD permissions for a team member.
Only team owners or admins can update permissions.
Owners and admins always have full permissions and cannot be restricted.
---
tags:
- Team
security:
- ApiKeyAuth: []
parameters:
- in: path
name: tenant_id
required: true
type: string
description: Team ID
- in: path
name: user_id
required: true
type: string
description: User ID to update permissions for
- in: body
name: body
required: true
schema:
type: object
required:
- permissions
properties:
permissions:
type: object
description: Permissions to update (only provided fields will be updated)
properties:
dataset:
type: object
properties:
create:
type: boolean
read:
type: boolean
update:
type: boolean
delete:
type: boolean
canvas:
type: object
properties:
create:
type: boolean
read:
type: boolean
update:
type: boolean
delete:
type: boolean
responses:
200:
description: Permissions updated successfully.
schema:
type: object
properties:
data:
type: object
description: Updated permissions
message:
type: string
400:
description: Invalid request.
401:
description: Unauthorized.
403:
description: Forbidden - not team owner or admin, or trying to restrict owner/admin.
404:
description: User not found in team.
"""
# Check if current user is team owner or admin
if not is_team_admin_or_owner(tenant_id, current_user.id):
return get_json_result(
data=False,
message="Only team owners or admins can update permissions.",
code=RetCode.PERMISSION_ERROR,
)
if request.json is None:
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
req: Dict[str, Any] = request.json
permissions: Optional[Dict[str, Any]] = req.get("permissions")
if not permissions or not isinstance(permissions, dict):
return get_json_result(
data=False,
message="'permissions' must be a non-empty object.",
code=RetCode.ARGUMENT_ERROR,
)
try:
# Check if target user exists in the team
user_tenant: Optional[UserTenant] = UserTenantService.filter_by_tenant_and_user_id(
tenant_id, user_id
)
if not user_tenant:
return get_json_result(
data=False,
message="User is not a member of this team.",
code=RetCode.DATA_ERROR,
)
# Owners and admins always have full permissions - cannot be restricted
if user_tenant.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN]:
return get_json_result(
data=False,
message="Cannot update permissions for team owners or admins. They always have full permissions.",
code=RetCode.DATA_ERROR,
)
# Validate permissions structure
valid_resource_types = {"dataset", "canvas"}
valid_permissions = {"create", "read", "update", "delete"}
validated_permissions: Dict[str, Dict[str, bool]] = {}
for resource_type, perms in permissions.items():
if resource_type not in valid_resource_types:
return get_json_result(
data=False,
message=f"Invalid resource type '{resource_type}'. Must be one of: {', '.join(valid_resource_types)}",
code=RetCode.ARGUMENT_ERROR,
)
if not isinstance(perms, dict):
return get_json_result(
data=False,
message=f"Permissions for '{resource_type}' must be an object.",
code=RetCode.ARGUMENT_ERROR,
)
validated_permissions[resource_type] = {}
for perm_name, perm_value in perms.items():
if perm_name not in valid_permissions:
return get_json_result(
data=False,
message=f"Invalid permission '{perm_name}' for '{resource_type}'. Must be one of: {', '.join(valid_permissions)}",
code=RetCode.ARGUMENT_ERROR,
)
if not isinstance(perm_value, bool):
return get_json_result(
data=False,
message=f"Permission value for '{resource_type}.{perm_name}' must be a boolean.",
code=RetCode.ARGUMENT_ERROR,
)
validated_permissions[resource_type][perm_name] = perm_value
# Update permissions
success: bool = update_user_permissions(tenant_id, user_id, validated_permissions)
if not success:
return get_json_result(
data=False,
message="Failed to update permissions.",
code=RetCode.EXCEPTION_ERROR,
)
# Get updated permissions for response
updated_permissions: Dict[str, Dict[str, bool]] = get_user_permissions(tenant_id, user_id)
return get_json_result(
data=updated_permissions,
message="Permissions updated successfully.",
)
except Exception as e:
logging.exception(e)
return server_error_response(e)

View file

@ -23,10 +23,8 @@ import time
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Match from typing import Any, Dict, List, Optional, Match
from quart import redirect, request, session, make_response from quart import redirect, request, session, make_response, Response
from flask import redirect, request, session, make_response, Response
from flask_login import current_user, login_required, login_user, logout_user
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
from api.apps.auth import get_auth_client from api.apps.auth import get_auth_client

View file

@ -14,17 +14,20 @@
# limitations under the License. # limitations under the License.
# #
from typing import Dict, Any, Optional, List from typing import Dict, Any, Optional, List, Literal
from api.db import TenantPermission from api.db import TenantPermission
from api.db.db_models import File, Knowledgebase from api.db.db_models import File, Knowledgebase
from api.db.services.file_service import FileService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.user_service import TenantService, UserTenantService from api.db.services.user_service import TenantService, UserTenantService
from api.common.permission_utils import has_permission
from common.constants import StatusEnum from common.constants import StatusEnum
def check_kb_team_permission(kb: Dict[str, Any] | Knowledgebase, other: str) -> bool: def check_kb_team_permission(
kb: Dict[str, Any] | Knowledgebase,
other: str,
required_permission: Literal["create", "read", "update", "delete"] = "read"
) -> bool:
kb = kb.to_dict() if isinstance(kb, Knowledgebase) else kb kb = kb.to_dict() if isinstance(kb, Knowledgebase) else kb
kb_tenant_id = kb["tenant_id"] kb_tenant_id = kb["tenant_id"]
@ -37,19 +40,23 @@ def check_kb_team_permission(kb: Dict[str, Any] | Knowledgebase, other: str) ->
if kb["permission"] != TenantPermission.TEAM: if kb["permission"] != TenantPermission.TEAM:
return False return False
# If shared_tenant_id is specified, check if user is a member of that specific tenant # Determine which tenant to check permissions for
shared_tenant_id: Optional[str] = kb.get("shared_tenant_id") shared_tenant_id: Optional[str] = kb.get("shared_tenant_id")
if shared_tenant_id: target_tenant_id: str = shared_tenant_id if shared_tenant_id else kb_tenant_id
# Check if user is a member of the shared tenant
user_tenant = UserTenantService.filter_by_tenant_and_user_id(shared_tenant_id, other) # Check if user is a member of the target tenant
return user_tenant is not None and user_tenant.status == StatusEnum.VALID.value user_tenant = UserTenantService.filter_by_tenant_and_user_id(target_tenant_id, other)
if not user_tenant or user_tenant.status != StatusEnum.VALID.value:
# Legacy behavior: if no shared_tenant_id, check if user is a member of the KB's tenant return False
joined_tenants: List[Dict[str, Any]] = TenantService.get_joined_tenants_by_user_id(other)
return any(tenant["tenant_id"] == kb_tenant_id for tenant in joined_tenants) # Check CRUD permissions
return has_permission(target_tenant_id, other, "dataset", required_permission)
def check_file_team_permission(file: Dict[str, Any] | File, other: str) -> bool: def check_file_team_permission(file: Dict[str, Any] | File, other: str) -> bool:
# Import here to avoid circular import
from api.db.services.file_service import FileService
file = file.to_dict() if isinstance(file, File) else file file = file.to_dict() if isinstance(file, File) else file
file_tenant_id = file["tenant_id"] file_tenant_id = file["tenant_id"]
@ -60,6 +67,9 @@ def check_file_team_permission(file: Dict[str, Any] | File, other: str) -> bool:
kb_ids: List[str] = [kb_info["kb_id"] for kb_info in FileService.get_kb_id_by_file_id(file_id)] kb_ids: List[str] = [kb_info["kb_id"] for kb_info in FileService.get_kb_id_by_file_id(file_id)]
# Import here to avoid circular import
from api.db.services.knowledgebase_service import KnowledgebaseService
for kb_id in kb_ids: for kb_id in kb_ids:
ok: bool ok: bool
kb: Optional[Knowledgebase] kb: Optional[Knowledgebase]

View file

@ -0,0 +1,147 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Dict, Any, Optional, Literal
from api.db.services.user_service import UserTenantService
from api.db.db_models import UserTenant
from api.db import UserTenantRole
from common.constants import StatusEnum
def get_user_permissions(tenant_id: str, user_id: str) -> Dict[str, Dict[str, bool]]:
"""
Get CRUD permissions for a user in a tenant.
Args:
tenant_id: The tenant ID.
user_id: The user ID.
Returns:
Dictionary with permissions for dataset and canvas:
{
"dataset": {"create": bool, "read": bool, "update": bool, "delete": bool},
"canvas": {"create": bool, "read": bool, "update": bool, "delete": bool}
}
Returns default permissions (read-only) if user not found or no permissions set.
"""
user_tenant = UserTenantService.filter_by_tenant_and_user_id(tenant_id, user_id)
if not user_tenant or user_tenant.status != StatusEnum.VALID.value:
# Return default read-only permissions
return {
"dataset": {"create": False, "read": True, "update": False, "delete": False},
"canvas": {"create": False, "read": True, "update": False, "delete": False}
}
# Owners and admins have full permissions
if user_tenant.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN]:
return {
"dataset": {"create": True, "read": True, "update": True, "delete": True},
"canvas": {"create": True, "read": True, "update": True, "delete": True}
}
# Get permissions from user_tenant.permissions, with defaults
permissions = user_tenant.permissions if user_tenant.permissions else {}
default_permissions = {
"dataset": {"create": False, "read": True, "update": False, "delete": False},
"canvas": {"create": False, "read": True, "update": False, "delete": False}
}
# Merge with defaults to ensure all fields are present
result: Dict[str, Dict[str, bool]] = {}
for resource_type in ["dataset", "canvas"]:
result[resource_type] = {
"create": permissions.get(resource_type, {}).get("create", default_permissions[resource_type]["create"]),
"read": permissions.get(resource_type, {}).get("read", default_permissions[resource_type]["read"]),
"update": permissions.get(resource_type, {}).get("update", default_permissions[resource_type]["update"]),
"delete": permissions.get(resource_type, {}).get("delete", default_permissions[resource_type]["delete"]),
}
return result
def has_permission(
tenant_id: str,
user_id: str,
resource_type: Literal["dataset", "canvas"],
permission: Literal["create", "read", "update", "delete"]
) -> bool:
"""
Check if a user has a specific permission for a resource type in a tenant.
Args:
tenant_id: The tenant ID.
user_id: The user ID.
resource_type: Type of resource ("dataset" or "canvas").
permission: The permission to check ("create", "read", "update", or "delete").
Returns:
True if user has the permission, False otherwise.
"""
permissions = get_user_permissions(tenant_id, user_id)
return permissions.get(resource_type, {}).get(permission, False)
def update_user_permissions(
tenant_id: str,
user_id: str,
permissions: Dict[str, Dict[str, bool]]
) -> bool:
"""
Update CRUD permissions for a user in a tenant.
Args:
tenant_id: The tenant ID.
user_id: The user ID.
permissions: Dictionary with permissions to update:
{
"dataset": {"create": bool, "read": bool, "update": bool, "delete": bool},
"canvas": {"create": bool, "read": bool, "update": bool, "delete": bool}
}
Only provided fields will be updated, others will remain unchanged.
Returns:
True if update was successful, False otherwise.
"""
user_tenant = UserTenantService.filter_by_tenant_and_user_id(tenant_id, user_id)
if not user_tenant:
return False
# Get current permissions or defaults
current_permissions = user_tenant.permissions if user_tenant.permissions else {
"dataset": {"create": False, "read": True, "update": False, "delete": False},
"canvas": {"create": False, "read": True, "update": False, "delete": False}
}
# Merge new permissions with current permissions
updated_permissions: Dict[str, Dict[str, bool]] = {}
for resource_type in ["dataset", "canvas"]:
updated_permissions[resource_type] = current_permissions.get(resource_type, {}).copy()
if resource_type in permissions:
updated_permissions[resource_type].update(permissions[resource_type])
# Update in database
from api.db.db_models import UserTenant as UserTenantModel
UserTenantService.filter_update(
[UserTenantModel.tenant_id == tenant_id, UserTenantModel.user_id == user_id],
{"permissions": updated_permissions}
)
return True

View file

@ -649,6 +649,7 @@ class UserTenant(DataBaseModel):
role = CharField(max_length=32, null=False, help_text="UserTenantRole", index=True) role = CharField(max_length=32, null=False, help_text="UserTenantRole", index=True)
invited_by = CharField(max_length=32, null=False, index=True) invited_by = CharField(max_length=32, null=False, index=True)
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True) status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
permissions = JSONField(null=True, help_text="CRUD permissions for datasets and canvases", default={"dataset": {"create": False, "read": True, "update": False, "delete": False}, "canvas": {"create": False, "read": True, "update": False, "delete": False}})
class Meta: class Meta:
db_table = "user_tenant" db_table = "user_tenant"
@ -1211,6 +1212,11 @@ def migrate_db():
migrate(migrator.add_column("user_canvas", "shared_tenant_id", CharField(max_length=32, null=True, help_text="Specific tenant ID to share with when permission is 'team'", index=True))) migrate(migrator.add_column("user_canvas", "shared_tenant_id", CharField(max_length=32, null=True, help_text="Specific tenant ID to share with when permission is 'team'", index=True)))
except Exception: except Exception:
pass pass
try:
default_permissions = {"dataset": {"create": False, "read": True, "update": False, "delete": False}, "canvas": {"create": False, "read": True, "update": False, "delete": False}}
migrate(migrator.add_column("user_tenant", "permissions", JSONField(null=True, help_text="CRUD permissions for datasets and canvases", default=default_permissions)))
except Exception:
pass
try: try:
migrate(migrator.add_column("llm", "is_tools", BooleanField(null=False, help_text="support tools", default=False))) migrate(migrator.add_column("llm", "is_tools", BooleanField(null=False, help_text="support tools", default=False)))
except Exception: except Exception:

View file

@ -16,7 +16,7 @@
import json import json
import logging import logging
import time import time
from typing import List, Dict, Any, Optional, Tuple from typing import List, Dict, Any, Optional, Tuple, Literal
from uuid import uuid4 from uuid import uuid4
from agent.canvas import Canvas from agent.canvas import Canvas
from api.db import CanvasCategory, TenantPermission from api.db import CanvasCategory, TenantPermission
@ -28,10 +28,9 @@ from api.utils.api_utils import get_data_openai
import tiktoken import tiktoken
from peewee import fn from peewee import fn
from api.db.services.user_service import UserTenantService from api.db.services.user_service import UserTenantService
from common.constants import StatusEnum
from api.db.services.user_service import UserTenantService
from api.db import TenantPermission from api.db import TenantPermission
from common.constants import StatusEnum from common.constants import StatusEnum
from api.common.permission_utils import has_permission
class CanvasTemplateService(CommonService): class CanvasTemplateService(CommonService):
model = CanvasTemplate model = CanvasTemplate
@ -199,8 +198,12 @@ class UserCanvasService(CommonService):
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
def accessible(cls, canvas_id: str, tenant_id: str) -> bool: def accessible(
cls,
canvas_id: str,
tenant_id: str,
required_permission: Literal["create", "read", "update", "delete"] = "read"
) -> bool:
e: bool e: bool
c: Optional[Dict[str, Any]] c: Optional[Dict[str, Any]]
@ -208,7 +211,7 @@ class UserCanvasService(CommonService):
if not e or c is None: if not e or c is None:
return False return False
# If user owns the canvas, always allow # If user owns the canvas, always allow (full permissions)
if c["user_id"] == tenant_id: if c["user_id"] == tenant_id:
return True return True
@ -216,15 +219,17 @@ class UserCanvasService(CommonService):
if c.get("permission") != TenantPermission.TEAM.value: if c.get("permission") != TenantPermission.TEAM.value:
return False return False
# If shared_tenant_id is specified, check if user is a member of that specific tenant # Determine which tenant to check permissions for
shared_tenant_id: Optional[str] = c.get("shared_tenant_id") shared_tenant_id: Optional[str] = c.get("shared_tenant_id")
if shared_tenant_id: target_tenant_id: str = shared_tenant_id if shared_tenant_id else str(c["user_id"])
user_tenant = UserTenantService.filter_by_tenant_and_user_id(shared_tenant_id, tenant_id)
return user_tenant is not None and user_tenant.status == StatusEnum.VALID.value # Check if user is a member of the target tenant
user_tenant = UserTenantService.filter_by_tenant_and_user_id(target_tenant_id, tenant_id)
# Legacy behavior: check if user is a member of the canvas owner's tenant if not user_tenant or user_tenant.status != StatusEnum.VALID.value:
tids: List[str] = [str(t.tenant_id) for t in UserTenantService.query(user_id=tenant_id, status=StatusEnum.VALID.value)] return False
return str(c["user_id"]) in tids
# Check CRUD permissions
return has_permission(target_tenant_id, tenant_id, "canvas", required_permission)
async def completion(tenant_id, agent_id, session_id=None, **kwargs): async def completion(tenant_id, agent_id, session_id=None, **kwargs):

View file

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
# #
from datetime import datetime from datetime import datetime
from typing import Optional, Literal
from peewee import fn, JOIN from peewee import fn, JOIN
@ -27,6 +28,8 @@ from common.misc_utils import get_uuid
from common.constants import StatusEnum from common.constants import StatusEnum
from api.constants import DATASET_NAME_LIMIT from api.constants import DATASET_NAME_LIMIT
from api.utils.api_utils import get_parser_config, get_data_error_result from api.utils.api_utils import get_parser_config, get_data_error_result
from api.common.check_team_permission import check_kb_team_permission
from common.constants import TaskStatus
class KnowledgebaseService(CommonService): class KnowledgebaseService(CommonService):
@ -50,37 +53,37 @@ class KnowledgebaseService(CommonService):
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
def accessible4deletion(cls, kb_id, user_id): def accessible4deletion(cls, kb_id: str, user_id: str) -> bool:
"""Check if a knowledge base can be deleted by a specific user. """Check if a knowledge base can be deleted by a specific user.
This method verifies whether a user has permission to delete a knowledge base This method verifies whether a user has permission to delete a knowledge base
by checking if they are the creator of that knowledge base. by checking if they are the creator or have delete permission via CRUD permissions.
Args: Args:
kb_id (str): The unique identifier of the knowledge base to check. kb_id: The unique identifier of the knowledge base to check.
user_id (str): The unique identifier of the user attempting the deletion. user_id: The unique identifier of the user attempting the deletion.
Returns: Returns:
bool: True if the user has permission to delete the knowledge base, bool: True if the user has permission to delete the knowledge base,
False if the user doesn't have permission or the knowledge base doesn't exist. False if the user doesn't have permission or the knowledge base doesn't exist.
Example:
>>> KnowledgebaseService.accessible4deletion("kb123", "user456")
True
Note:
- This method only checks creator permissions
- A return value of False can mean either:
1. The knowledge base doesn't exist
2. The user is not the creator of the knowledge base
""" """
# Check if a knowledge base can be deleted by a user
docs = cls.model.select( # Get the knowledge base
cls.model.id).where(cls.model.id == kb_id, cls.model.created_by == user_id).paginate(0, 1) ok: bool
docs = docs.dicts() kb: Optional[Knowledgebase]
if not docs: ok, kb = cls.get_by_id(kb_id)
if not ok or kb is None:
return False return False
return True
# If user is the creator, always allow
if kb.created_by == user_id:
return True
# Check team permissions with delete permission required
if kb.permission == "team":
return check_kb_team_permission(kb, user_id, required_permission="delete")
return False
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
@ -93,9 +96,9 @@ class KnowledgebaseService(CommonService):
# Returns: # Returns:
# If all documents are parsed successfully, returns (True, None) # If all documents are parsed successfully, returns (True, None)
# If any document is not fully parsed, returns (False, error_message) # If any document is not fully parsed, returns (False, error_message)
from common.constants import TaskStatus # Import here to avoid circular import
from api.db.services.document_service import DocumentService from api.db.services.document_service import DocumentService
# Get knowledge base information # Get knowledge base information
kbs = cls.query(id=kb_id) kbs = cls.query(id=kb_id)
if not kbs: if not kbs:
@ -470,20 +473,33 @@ class KnowledgebaseService(CommonService):
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
def accessible(cls, kb_id, user_id): def accessible(cls, kb_id: str, user_id: str, required_permission: Literal["create", "read", "update", "delete"] = "read") -> bool:
# Check if a knowledge base is accessible by a user """Check if a knowledge base is accessible by a user with the required permission.
# Args:
# kb_id: Knowledge base ID Args:
# user_id: User ID kb_id: Knowledge base ID
# Returns: user_id: User ID
# Boolean indicating accessibility required_permission: Required permission level ("create", "read", "update", "delete")
docs = cls.model.select( Returns:
cls.model.id).join(UserTenant, on=(UserTenant.tenant_id == Knowledgebase.tenant_id) Boolean indicating accessibility
).where(cls.model.id == kb_id, UserTenant.user_id == user_id).paginate(0, 1) """
docs = docs.dicts()
if not docs: # Get the knowledge base
ok: bool
kb: Optional[Knowledgebase]
ok, kb = cls.get_by_id(kb_id)
if not ok or kb is None:
return False return False
return True
# If user is the creator, always allow
if kb.created_by == user_id:
return True
# Check team permissions
if kb.permission == "team":
return check_kb_team_permission(kb, user_id, required_permission=required_permission)
return False
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()