diff --git a/api/apps/tenant_app.py b/api/apps/tenant_app.py index 39dadc35b..5def0595a 100644 --- a/api/apps/tenant_app.py +++ b/api/apps/tenant_app.py @@ -14,7 +14,7 @@ # limitations under the License. # import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Set, Union from flask import Response, request from flask_login import current_user, login_required @@ -445,7 +445,7 @@ def tenant_list(): @manager.route("/update-request/", methods=["PUT"]) # noqa: F821 @login_required -def update_request(tenant_id): +def update_request(tenant_id: str) -> Response: """ Accept or reject a team invitation. User must have INVITE role. Takes an 'accept' boolean in the request body to accept (true) or reject (false) the invitation. @@ -505,8 +505,8 @@ def update_request(tenant_id): ) # Get accept boolean from request body - req = request.json or {} - accept = req.get("accept") + req: Dict[str, Any] = request.json if request.json is not None else {} + accept: Optional[bool] = req.get("accept") # Validate accept parameter if accept is None: @@ -525,7 +525,7 @@ def update_request(tenant_id): if accept: # Accept invitation - update role from INVITE to the specified role - role = UserTenantRole.NORMAL.value + role: str = UserTenantRole.NORMAL.value # Update role from INVITE to the specified role (defaults to NORMAL) UserTenantService.filter_update( @@ -547,7 +547,7 @@ def update_request(tenant_id): @manager.route('//users/add', methods=['POST']) # noqa: F821 @login_required @validate_request("users") -def add_users(tenant_id): +def add_users(tenant_id: str) -> Response: """ Send invitations to one or more users to join a team. Only OWNER or ADMIN can send invitations. Users must accept the invitation before they are added to the team. @@ -620,8 +620,8 @@ def add_users(tenant_id): code=RetCode.PERMISSION_ERROR ) - req = request.json - users_input = req.get("users", []) + req: Dict[str, Any] = request.json if request.json is not None else {} + users_input: List[Union[str, Dict[str, Any]]] = req.get("users", []) if not isinstance(users_input, list) or len(users_input) == 0: return get_json_result( @@ -630,11 +630,13 @@ def add_users(tenant_id): code=RetCode.ARGUMENT_ERROR ) - added_users = [] - failed_users = [] + added_users: List[Dict[str, Any]] = [] + failed_users: List[Dict[str, Any]] = [] for user_input in users_input: # Handle both string (email) and object formats + email: Optional[str] = None + role: str = UserTenantRole.NORMAL.value if isinstance(user_input, str): email = user_input role = UserTenantRole.NORMAL.value @@ -665,7 +667,7 @@ def add_users(tenant_id): try: # Find user by email - invite_users = UserService.query(email=email) + invite_users: List[Any] = UserService.query(email=email) if not invite_users: failed_users.append({ "email": email, @@ -673,12 +675,12 @@ def add_users(tenant_id): }) continue - user_id_to_add = invite_users[0].id + user_id_to_add: str = invite_users[0].id # Check if user is already in the team - existing_user_tenants = UserTenantService.query(user_id=user_id_to_add, tenant_id=tenant_id) + existing_user_tenants: List[Any] = UserTenantService.query(user_id=user_id_to_add, tenant_id=tenant_id) if existing_user_tenants: - existing_role = existing_user_tenants[0].role + existing_role: Any = existing_user_tenants[0].role if existing_role in [UserTenantRole.NORMAL, UserTenantRole.ADMIN]: failed_users.append({ "email": email, @@ -696,7 +698,7 @@ def add_users(tenant_id): # Update invitation - keep INVITE role, user needs to accept again # Note: The intended role will be applied when user accepts via /agree endpoint # For now, we'll store it by updating the invitation (user will need to accept) - usr = invite_users[0].to_dict() + usr: Dict[str, Any] = invite_users[0].to_dict() usr = {k: v for k, v in usr.items() if k in ["id", "avatar", "email", "nickname"]} usr["role"] = "invite" # Still pending acceptance usr["intended_role"] = role # Store intended role for reference @@ -720,7 +722,7 @@ def add_users(tenant_id): # Send invitation email if configured if smtp_mail_server and settings.SMTP_CONF: from threading import Thread - user_name = "" + user_name: str = "" _, user = UserService.get_by_id(current_user.id) if user: user_name = user.nickname @@ -730,7 +732,7 @@ def add_users(tenant_id): daemon=True ).start() - usr = invite_users[0].to_dict() + usr: Dict[str, Any] = invite_users[0].to_dict() usr = {k: v for k, v in usr.items() if k in ["id", "avatar", "email", "nickname"]} usr["role"] = "invite" # User is invited, not yet added usr["intended_role"] = role # Role they will get after acceptance @@ -743,7 +745,7 @@ def add_users(tenant_id): "error": f"Failed to add user: {str(e)}" }) - result = { + result: Dict[str, List[Dict[str, Any]]] = { "added": added_users, "failed": failed_users } @@ -769,7 +771,7 @@ def add_users(tenant_id): @manager.route('//users/remove', methods=['POST']) # noqa: F821 @login_required @validate_request("user_ids") -def remove_users(tenant_id): +def remove_users(tenant_id: str) -> Response: """ Remove one or more users from a team. Only OWNER or ADMIN can remove users. Owners cannot be removed. Supports both single user and bulk operations. @@ -830,8 +832,8 @@ def remove_users(tenant_id): code=RetCode.PERMISSION_ERROR ) - req = request.json - user_ids = req.get("user_ids", []) + req: Dict[str, Any] = request.json if request.json is not None else {} + user_ids: List[str] = req.get("user_ids", []) if not isinstance(user_ids, list) or len(user_ids) == 0: return get_json_result( @@ -840,12 +842,12 @@ def remove_users(tenant_id): code=RetCode.ARGUMENT_ERROR ) - removed_users = [] - failed_users = [] + removed_users: List[Dict[str, str]] = [] + failed_users: List[Dict[str, str]] = [] # Get all admins/owners for validation (check if removing would leave team without admin/owner) - all_user_tenants = UserTenantService.query(tenant_id=tenant_id) - admin_owner_ids = { + all_user_tenants: List[Any] = UserTenantService.query(tenant_id=tenant_id) + admin_owner_ids: Set[str] = { ut.user_id for ut in all_user_tenants if ut.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN] and ut.status == StatusEnum.VALID.value } @@ -878,7 +880,7 @@ def remove_users(tenant_id): # Prevent removing yourself if you're the only admin if user_id == current_user.id and user_tenant.role == UserTenantRole.ADMIN: - remaining_admins = admin_owner_ids - {user_id} + remaining_admins: Set[str] = admin_owner_ids - {user_id} if len(remaining_admins) == 0: failed_users.append({ "user_id": user_id, @@ -893,8 +895,8 @@ def remove_users(tenant_id): ]) # Get user info for response - user = UserService.filter_by_id(user_id) - user_email = user.email if user else "Unknown" + user: Optional[Any] = UserService.filter_by_id(user_id) + user_email: str = user.email if user else "Unknown" removed_users.append({ "user_id": user_id, @@ -908,7 +910,7 @@ def remove_users(tenant_id): "error": f"Failed to remove user: {str(e)}" }) - result = { + result: Dict[str, List[Dict[str, str]]] = { "removed": removed_users, "failed": failed_users }