# # Copyright 2024 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Department management API endpoints.""" import logging from typing import Any, Dict, List, Optional from flask import Blueprint, Response, request from flask_login import current_user, login_required from api.db import UserTenantRole from api.db.db_models import Department, Tenant, User, UserDepartment, UserTenant from api.db.services.user_service import ( DepartmentService, TenantService, UserDepartmentService, UserService, UserTenantService, ) from api.utils.api_utils import ( get_data_error_result, get_json_result, server_error_response, validate_request, ) from common.constants import RetCode, StatusEnum from common.misc_utils import get_uuid from common.time_utils import current_timestamp, datetime_format from datetime import datetime manager = Blueprint("department", __name__) def is_team_member(tenant_id: str, user_id: str) -> bool: """Check if a user is a member of a team (tenant). Args: tenant_id: The team/tenant ID. user_id: The user ID to check. Returns: True if user is a member of the team, False otherwise. """ user_tenant: Optional[UserTenant] = UserTenantService.filter_by_tenant_and_user_id( tenant_id, user_id ) return user_tenant is not None and user_tenant.status == StatusEnum.VALID.value def is_team_admin_or_owner(tenant_id: str, user_id: str) -> bool: """Check if a user is an OWNER or ADMIN of a team. Args: tenant_id: The team/tenant ID. user_id: The user ID to check. Returns: True if user is OWNER or ADMIN, False otherwise. """ user_tenant: Optional[UserTenant] = UserTenantService.filter_by_tenant_and_user_id( tenant_id, user_id ) if not user_tenant: return False return user_tenant.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN] @manager.route("/create", methods=["POST"]) # noqa: F821 @login_required @validate_request("name", "tenant_id") def create_department() -> Response: """Create a new department within a team. Only team owners or admins can create departments. --- tags: - Department security: - ApiKeyAuth: [] parameters: - in: body name: body required: true schema: type: object required: - name - tenant_id properties: name: type: string description: Department name. tenant_id: type: string description: Team/tenant ID that the department belongs to. description: type: string description: Optional department description. responses: 200: description: Department created successfully. schema: type: object properties: data: type: object description: Created department information. message: type: string description: Success message. 400: description: Invalid request or team not found. 401: description: Unauthorized. 403: description: Forbidden - not team owner or admin. """ if request.json is None: return get_json_result( data=False, message="Request body is required!", code=RetCode.ARGUMENT_ERROR, ) req: Dict[str, Any] = request.json name: str = req.get("name", "").strip() tenant_id: str = req.get("tenant_id", "").strip() description: Optional[str] = req.get("description", "").strip() or None if not name: return get_json_result( data=False, message="Department name cannot be empty!", code=RetCode.ARGUMENT_ERROR, ) if not tenant_id: return get_json_result( data=False, message="Tenant ID is required!", code=RetCode.ARGUMENT_ERROR, ) # Check if user is team owner or admin if not is_team_admin_or_owner(tenant_id, current_user.id): return get_json_result( data=False, message="Only team owners or admins can create departments.", code=RetCode.PERMISSION_ERROR, ) # Verify tenant exists tenants_query = TenantService.query(id=tenant_id, status=StatusEnum.VALID.value) tenants: List[Tenant] = list(tenants_query) if not tenants: return get_data_error_result(message="Team not found.") try: # Create department department_id: str = get_uuid() department_data: Dict[str, Any] = { "id": department_id, "tenant_id": tenant_id, "name": name, "description": description, "created_by": current_user.id, "status": StatusEnum.VALID.value, } DepartmentService.save(**department_data) # Get created department success: bool department: Optional[Department] success, department = DepartmentService.get_by_id(department_id) if not success or not department: return get_data_error_result(message="Failed to create department.") return get_json_result( data=department.to_dict(), message=f"Department '{name}' created successfully!", ) except Exception as e: logging.exception(e) return server_error_response(e) @manager.route("/", methods=["PUT"]) # noqa: F821 @login_required def update_department(department_id: str) -> Response: """Update a department's details. Only team owners, admins, or department members can update departments. --- tags: - Department security: - ApiKeyAuth: [] parameters: - in: path name: department_id required: true type: string description: Department ID - in: body name: body required: true schema: type: object properties: name: type: string description: Department name (optional). description: type: string description: Department description (optional). responses: 200: description: Department updated successfully. schema: type: object properties: data: type: object description: Updated department information. message: type: string description: Success message. 400: description: Invalid request. 401: description: Unauthorized. 403: description: Forbidden - not team owner, admin, or department member. 404: description: Department not found. """ if request.json is None: return get_json_result( data=False, message="Request body is required!", code=RetCode.ARGUMENT_ERROR, ) # Get department and verify it exists success: bool department: Optional[Department] success, department = DepartmentService.get_by_id(department_id) if not success or not department: return get_data_error_result(message="Department not found.") # Check if user is team owner or admin is_admin_or_owner: bool = is_team_admin_or_owner(department.tenant_id, current_user.id) # Check if user is a member of the department user_department: Optional[UserDepartment] = UserDepartmentService.filter_by_department_and_user_id( department_id, current_user.id ) is_department_member: bool = ( user_department is not None and user_department.status == StatusEnum.VALID.value ) # User must be either team owner/admin OR department member if not is_admin_or_owner and not is_department_member: return get_json_result( data=False, message="Only team owners, admins, or department members can update departments.", code=RetCode.PERMISSION_ERROR, ) req: Dict[str, Any] = request.json update_data: Dict[str, Any] = { "update_time": current_timestamp(), "update_date": datetime_format(datetime.now()), } # Update name if provided if "name" in req: name: str = req.get("name", "").strip() if name: if len(name) > 128: return get_json_result( data=False, message="Department name must be 128 characters or less!", code=RetCode.ARGUMENT_ERROR, ) update_data["name"] = name else: return get_json_result( data=False, message="Department name cannot be empty!", code=RetCode.ARGUMENT_ERROR, ) # Update description if provided if "description" in req: description: Optional[str] = req.get("description") if description is not None: description = description.strip() if isinstance(description, str) else None update_data["description"] = description if description else None # If no fields to update (only update_time and update_date were set), return error if len(update_data) == 2: # Only update_time and update_date return get_json_result( data=False, message="No fields provided to update. Please provide 'name' and/or 'description'.", code=RetCode.ARGUMENT_ERROR, ) try: from api.db.db_models import DB # Update the department with DB.connection_context(): Department.update(update_data).where( (Department.id == department_id) & (Department.status == StatusEnum.VALID.value) ).execute() # Get updated department success, updated_department = DepartmentService.get_by_id(department_id) if not success or not updated_department: return get_data_error_result(message="Department updated but could not retrieve updated data.") return get_json_result( data=updated_department.to_dict(), message="Department updated successfully!", ) except Exception as e: logging.exception(e) return server_error_response(e) @manager.route("//members/add", methods=["POST"]) # noqa: F821 @login_required @validate_request("user_ids") def add_members(department_id: str) -> Response: """Add members to a department. Users must be members of the team (tenant) that the department belongs to. Only team owners or admins can add members to departments. --- tags: - Department security: - ApiKeyAuth: [] parameters: - in: path name: department_id required: true type: string description: Department ID - in: body name: body required: true schema: type: object required: - user_ids properties: user_ids: type: array description: List of user IDs to add to the department. items: type: string responses: 200: description: Members added successfully. schema: type: object properties: data: type: object properties: added: type: array description: Successfully added user IDs failed: type: array description: Users that failed to be added with error messages message: type: string 400: description: Invalid request 401: description: Unauthorized 403: description: Forbidden - not team owner or admin """ if request.json is None: return get_json_result( data=False, message="Request body is required!", code=RetCode.ARGUMENT_ERROR, ) req: Dict[str, Any] = request.json user_ids: List[str] = req.get("user_ids", []) if not isinstance(user_ids, list) or len(user_ids) == 0: return get_json_result( data=False, message="'user_ids' must be a non-empty array.", code=RetCode.ARGUMENT_ERROR, ) # Get department and verify it exists success: bool department: Optional[Department] success, department = DepartmentService.get_by_id(department_id) if not success or not department: return get_data_error_result(message="Department not found.") # Check if user is team owner or admin if not is_team_admin_or_owner(department.tenant_id, current_user.id): return get_json_result( data=False, message="Only team owners or admins can add members to departments.", code=RetCode.PERMISSION_ERROR, ) added_user_ids: List[str] = [] failed_users: List[Dict[str, Any]] = [] try: for user_id in user_ids: if not isinstance(user_id, str) or not user_id.strip(): failed_users.append({ "user_id": user_id, "error": "Invalid user ID format." }) continue user_id = user_id.strip() # Verify user exists user_exists: bool user: Optional[User] user_exists, user = UserService.get_by_id(user_id) if not user_exists or not user: failed_users.append({ "user_id": user_id, "error": "User not found." }) continue # Verify user is a member of the team if not is_team_member(department.tenant_id, user_id): failed_users.append({ "user_id": user_id, "error": f"User {user.email} is not a member of the team." }) continue # Check if user is already in the department existing_member: Optional[UserDepartment] = UserDepartmentService.filter_by_department_and_user_id( department_id, user_id ) if existing_member and existing_member.status == StatusEnum.VALID.value: failed_users.append({ "user_id": user_id, "error": f"User {user.email} is already a member of this department." }) continue # Add user to department try: UserDepartmentService.save( id=get_uuid(), department_id=department_id, user_id=user_id, status=StatusEnum.VALID.value, ) added_user_ids.append(user_id) except Exception as e: logging.exception(e) failed_users.append({ "user_id": user_id, "error": f"Failed to add user: {str(e)}" }) return get_json_result( data={ "added": added_user_ids, "failed": failed_users, }, message=f"Added {len(added_user_ids)} member(s) to department.", ) except Exception as e: logging.exception(e) return server_error_response(e) @manager.route("//members/", methods=["DELETE"]) # noqa: F821 @login_required def remove_member(department_id: str, user_id: str) -> Response: """Remove a user from a department. Only team owners or admins can remove members from departments. --- tags: - Department security: - ApiKeyAuth: [] parameters: - in: path name: department_id required: true type: string description: Department ID - in: path name: user_id required: true type: string description: User ID to remove responses: 200: description: User removed successfully. schema: type: object properties: data: type: boolean description: Removal success status. message: type: string description: Success message. 400: description: Invalid request or user not found in department. 401: description: Unauthorized 403: description: Forbidden - not team owner or admin """ # Get department and verify it exists success: bool department: Optional[Department] success, department = DepartmentService.get_by_id(department_id) if not success or not department: return get_data_error_result(message="Department not found.") # Check if user is team owner or admin if not is_team_admin_or_owner(department.tenant_id, current_user.id): return get_json_result( data=False, message="Only team owners or admins can remove members from departments.", code=RetCode.PERMISSION_ERROR, ) try: from api.db.db_models import DB # Check if user is in the department user_department: Optional[UserDepartment] = UserDepartmentService.filter_by_department_and_user_id( department_id, user_id ) if not user_department: return get_data_error_result( message="User is not a member of this department." ) # Soft delete by setting status to invalid with DB.connection_context(): UserDepartment.update({"status": StatusEnum.INVALID.value}).where( (UserDepartment.id == user_department.id) ).execute() return get_json_result( data=True, message="User removed from department successfully!", ) except Exception as e: logging.exception(e) return server_error_response(e)