[OND211-2329]: Added department model and API to create department, add members to a department and remove members from a department.

This commit is contained in:
Hetavi Shah 2025-11-14 19:13:17 +05:30
parent 113620ea80
commit 7253068b0f
3 changed files with 628 additions and 1 deletions

450
api/apps/department_app.py Normal file
View file

@ -0,0 +1,450 @@
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Department management API endpoints."""
import logging
from typing import Any, Dict, List, Optional
from flask import Blueprint, Response, request
from flask_login import current_user, login_required
from api.db import UserTenantRole
from api.db.db_models import Department, Tenant, User, UserDepartment, UserTenant
from api.db.services.user_service import (
DepartmentService,
TenantService,
UserDepartmentService,
UserService,
UserTenantService,
)
from api.utils.api_utils import (
get_data_error_result,
get_json_result,
server_error_response,
validate_request,
)
from common.constants import RetCode, StatusEnum
from common.misc_utils import get_uuid
manager = Blueprint("department", __name__)
def is_team_member(tenant_id: str, user_id: str) -> bool:
"""Check if a user is a member of a team (tenant).
Args:
tenant_id: The team/tenant ID.
user_id: The user ID to check.
Returns:
True if user is a member of the team, False otherwise.
"""
user_tenant: Optional[UserTenant] = UserTenantService.filter_by_tenant_and_user_id(
tenant_id, user_id
)
return user_tenant is not None and user_tenant.status == StatusEnum.VALID.value
def is_team_admin_or_owner(tenant_id: str, user_id: str) -> bool:
"""Check if a user is an OWNER or ADMIN of a team.
Args:
tenant_id: The team/tenant ID.
user_id: The user ID to check.
Returns:
True if user is OWNER or ADMIN, False otherwise.
"""
user_tenant: Optional[UserTenant] = UserTenantService.filter_by_tenant_and_user_id(
tenant_id, user_id
)
if not user_tenant:
return False
return user_tenant.role in [UserTenantRole.OWNER, UserTenantRole.ADMIN]
@manager.route("/create", methods=["POST"]) # noqa: F821
@login_required
@validate_request("name", "tenant_id")
def create_department() -> Response:
"""Create a new department within a team.
Only team owners or admins can create departments.
---
tags:
- Department
security:
- ApiKeyAuth: []
parameters:
- in: body
name: body
required: true
schema:
type: object
required:
- name
- tenant_id
properties:
name:
type: string
description: Department name.
tenant_id:
type: string
description: Team/tenant ID that the department belongs to.
description:
type: string
description: Optional department description.
responses:
200:
description: Department created successfully.
schema:
type: object
properties:
data:
type: object
description: Created department information.
message:
type: string
description: Success message.
400:
description: Invalid request or team not found.
401:
description: Unauthorized.
403:
description: Forbidden - not team owner or admin.
"""
if request.json is None:
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
req: Dict[str, Any] = request.json
name: str = req.get("name", "").strip()
tenant_id: str = req.get("tenant_id", "").strip()
description: Optional[str] = req.get("description", "").strip() or None
if not name:
return get_json_result(
data=False,
message="Department name cannot be empty!",
code=RetCode.ARGUMENT_ERROR,
)
if not tenant_id:
return get_json_result(
data=False,
message="Tenant ID is required!",
code=RetCode.ARGUMENT_ERROR,
)
# Check if user is team owner or admin
if not is_team_admin_or_owner(tenant_id, current_user.id):
return get_json_result(
data=False,
message="Only team owners or admins can create departments.",
code=RetCode.PERMISSION_ERROR,
)
# Verify tenant exists
tenants_query = TenantService.query(id=tenant_id, status=StatusEnum.VALID.value)
tenants: List[Tenant] = list(tenants_query)
if not tenants:
return get_data_error_result(message="Team not found.")
try:
# Create department
department_id: str = get_uuid()
department_data: Dict[str, Any] = {
"id": department_id,
"tenant_id": tenant_id,
"name": name,
"description": description,
"created_by": current_user.id,
"status": StatusEnum.VALID.value,
}
DepartmentService.save(**department_data)
# Get created department
success: bool
department: Optional[Department]
success, department = DepartmentService.get_by_id(department_id)
if not success or not department:
return get_data_error_result(message="Failed to create department.")
return get_json_result(
data=department.to_dict(),
message=f"Department '{name}' created successfully!",
)
except Exception as e:
logging.exception(e)
return server_error_response(e)
@manager.route("/<department_id>/members/add", methods=["POST"]) # noqa: F821
@login_required
@validate_request("user_ids")
def add_members(department_id: str) -> Response:
"""Add members to a department.
Users must be members of the team (tenant) that the department belongs to.
Only team owners or admins can add members to departments.
---
tags:
- Department
security:
- ApiKeyAuth: []
parameters:
- in: path
name: department_id
required: true
type: string
description: Department ID
- in: body
name: body
required: true
schema:
type: object
required:
- user_ids
properties:
user_ids:
type: array
description: List of user IDs to add to the department.
items:
type: string
responses:
200:
description: Members added successfully.
schema:
type: object
properties:
data:
type: object
properties:
added:
type: array
description: Successfully added user IDs
failed:
type: array
description: Users that failed to be added with error messages
message:
type: string
400:
description: Invalid request
401:
description: Unauthorized
403:
description: Forbidden - not team owner or admin
"""
if request.json is None:
return get_json_result(
data=False,
message="Request body is required!",
code=RetCode.ARGUMENT_ERROR,
)
req: Dict[str, Any] = request.json
user_ids: List[str] = req.get("user_ids", [])
if not isinstance(user_ids, list) or len(user_ids) == 0:
return get_json_result(
data=False,
message="'user_ids' must be a non-empty array.",
code=RetCode.ARGUMENT_ERROR,
)
# Get department and verify it exists
success: bool
department: Optional[Department]
success, department = DepartmentService.get_by_id(department_id)
if not success or not department:
return get_data_error_result(message="Department not found.")
# Check if user is team owner or admin
if not is_team_admin_or_owner(department.tenant_id, current_user.id):
return get_json_result(
data=False,
message="Only team owners or admins can add members to departments.",
code=RetCode.PERMISSION_ERROR,
)
added_user_ids: List[str] = []
failed_users: List[Dict[str, Any]] = []
try:
for user_id in user_ids:
if not isinstance(user_id, str) or not user_id.strip():
failed_users.append({
"user_id": user_id,
"error": "Invalid user ID format."
})
continue
user_id = user_id.strip()
# Verify user exists
user_exists: bool
user: Optional[User]
user_exists, user = UserService.get_by_id(user_id)
if not user_exists or not user:
failed_users.append({
"user_id": user_id,
"error": "User not found."
})
continue
# Verify user is a member of the team
if not is_team_member(department.tenant_id, user_id):
failed_users.append({
"user_id": user_id,
"error": f"User {user.email} is not a member of the team."
})
continue
# Check if user is already in the department
existing_member: Optional[UserDepartment] = UserDepartmentService.filter_by_department_and_user_id(
department_id, user_id
)
if existing_member and existing_member.status == StatusEnum.VALID.value:
failed_users.append({
"user_id": user_id,
"error": f"User {user.email} is already a member of this department."
})
continue
# Add user to department
try:
UserDepartmentService.save(
id=get_uuid(),
department_id=department_id,
user_id=user_id,
status=StatusEnum.VALID.value,
)
added_user_ids.append(user_id)
except Exception as e:
logging.exception(e)
failed_users.append({
"user_id": user_id,
"error": f"Failed to add user: {str(e)}"
})
return get_json_result(
data={
"added": added_user_ids,
"failed": failed_users,
},
message=f"Added {len(added_user_ids)} member(s) to department.",
)
except Exception as e:
logging.exception(e)
return server_error_response(e)
@manager.route("/<department_id>/members/<user_id>", methods=["DELETE"]) # noqa: F821
@login_required
def remove_member(department_id: str, user_id: str) -> Response:
"""Remove a user from a department.
Only team owners or admins can remove members from departments.
---
tags:
- Department
security:
- ApiKeyAuth: []
parameters:
- in: path
name: department_id
required: true
type: string
description: Department ID
- in: path
name: user_id
required: true
type: string
description: User ID to remove
responses:
200:
description: User removed successfully.
schema:
type: object
properties:
data:
type: boolean
description: Removal success status.
message:
type: string
description: Success message.
400:
description: Invalid request or user not found in department.
401:
description: Unauthorized
403:
description: Forbidden - not team owner or admin
"""
# Get department and verify it exists
success: bool
department: Optional[Department]
success, department = DepartmentService.get_by_id(department_id)
if not success or not department:
return get_data_error_result(message="Department not found.")
# Check if user is team owner or admin
if not is_team_admin_or_owner(department.tenant_id, current_user.id):
return get_json_result(
data=False,
message="Only team owners or admins can remove members from departments.",
code=RetCode.PERMISSION_ERROR,
)
try:
from api.db.db_models import DB
# Check if user is in the department
user_department: Optional[UserDepartment] = UserDepartmentService.filter_by_department_and_user_id(
department_id, user_id
)
if not user_department:
return get_data_error_result(
message="User is not a member of this department."
)
# Soft delete by setting status to invalid
with DB.connection_context():
UserDepartment.model.update({"status": StatusEnum.INVALID.value}).where(
(UserDepartment.id == user_department.id)
).execute()
return get_json_result(
data=True,
message="User removed from department successfully!",
)
except Exception as e:
logging.exception(e)
return server_error_response(e)

View file

@ -653,6 +653,28 @@ class UserTenant(DataBaseModel):
db_table = "user_tenant"
class Department(DataBaseModel):
id = CharField(max_length=32, primary_key=True)
tenant_id = CharField(max_length=32, null=False, help_text="Tenant ID", index=True)
name = CharField(max_length=128, null=False, help_text="Department name", index=True)
description = TextField(null=True, help_text="Department description")
created_by = CharField(max_length=32, null=False, help_text="User who created the department", index=True)
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
class Meta:
db_table = "department"
class UserDepartment(DataBaseModel):
id = CharField(max_length=32, primary_key=True)
department_id = CharField(max_length=32, null=False, help_text="Department ID", index=True)
user_id = CharField(max_length=32, null=False, help_text="User ID", index=True)
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
class Meta:
db_table = "user_department"
class InvitationCode(DataBaseModel):
id = CharField(max_length=32, primary_key=True)
code = CharField(max_length=32, null=False, index=True)

View file

@ -16,13 +16,14 @@
import hashlib
from datetime import datetime
import logging
from typing import Any, Dict, List, Optional, Tuple
import peewee
from werkzeug.security import generate_password_hash, check_password_hash
from api.db import UserTenantRole
from api.db.db_models import DB, UserTenant
from api.db.db_models import User, Tenant
from api.db.db_models import User, Tenant, Department, UserDepartment
from api.db.services.common_service import CommonService
from common.misc_utils import get_uuid
from common.time_utils import current_timestamp, datetime_format
@ -317,3 +318,157 @@ class UserTenantService(CommonService):
return user_tenant
except peewee.DoesNotExist:
return None
class DepartmentService(CommonService):
"""Service class for managing department-related database operations.
This class extends CommonService to provide functionality for department management,
including department creation, retrieval, and tenant-based queries.
Attributes:
model: The Department model class for database operations.
"""
model = Department
@classmethod
@DB.connection_context()
def get_by_tenant_id(cls, tenant_id: str) -> List[Dict[str, Any]]:
"""Get all departments for a given tenant.
Args:
tenant_id: The tenant ID to query departments for.
Returns:
List of department dictionaries with all fields.
"""
return list(cls.model.select()
.where(
(cls.model.tenant_id == tenant_id) &
(cls.model.status == StatusEnum.VALID.value)
)
.order_by(cls.model.create_time.desc())
.dicts())
@classmethod
@DB.connection_context()
def get_by_id(cls, department_id: str) -> Tuple[bool, Optional[Department]]:
"""Get a department by its ID.
Args:
department_id: The department ID.
Returns:
Tuple of (success: bool, department: Department or None).
"""
try:
department: Optional[Department] = cls.model.get_or_none(
(cls.model.id == department_id) &
(cls.model.status == StatusEnum.VALID.value)
)
if department:
return True, department
except Exception:
pass
return False, None
@classmethod
@DB.connection_context()
def filter_by_tenant_and_id(cls, tenant_id: str, department_id: str) -> Optional[Department]:
"""Get a department by tenant ID and department ID.
Args:
tenant_id: The tenant ID.
department_id: The department ID.
Returns:
Department instance or None if not found.
"""
try:
department: Optional[Department] = cls.model.select().where(
(cls.model.tenant_id == tenant_id) &
(cls.model.id == department_id) &
(cls.model.status == StatusEnum.VALID.value)
).first()
return department
except peewee.DoesNotExist:
return None
class UserDepartmentService(CommonService):
"""Service class for managing user-department relationship operations.
This class extends CommonService to handle the many-to-many relationship
between users and departments.
Attributes:
model: The UserDepartment model class for database operations.
"""
model = UserDepartment
@classmethod
@DB.connection_context()
def save(cls, **kwargs) -> UserDepartment:
"""Save a user-department relationship.
Args:
**kwargs: UserDepartment fields (id, department_id, user_id, status).
Returns:
Created UserDepartment instance.
"""
if "id" not in kwargs:
kwargs["id"] = get_uuid()
obj = cls.model(**kwargs).save(force_insert=True)
return obj
@classmethod
@DB.connection_context()
def filter_by_department_and_user_id(
cls, department_id: str, user_id: str
) -> Optional[UserDepartment]:
"""Get a user-department relationship by department ID and user ID.
Args:
department_id: The department ID.
user_id: The user ID.
Returns:
UserDepartment instance or None if not found.
"""
try:
user_department: Optional[UserDepartment] = cls.model.select().where(
(cls.model.department_id == department_id) &
(cls.model.user_id == user_id)
).first()
return user_department
except peewee.DoesNotExist:
return None
@classmethod
@DB.connection_context()
def get_by_department_id(cls, department_id: str) -> List[Dict[str, Any]]:
"""Get all users in a department.
Args:
department_id: The department ID.
Returns:
List of user dictionaries with department relationship info.
"""
fields = [
cls.model.id,
cls.model.user_id,
cls.model.status,
User.nickname,
User.email,
User.avatar,
User.is_active,
User.status,
]
return list(
cls.model.select(*fields)
.join(User, on=((cls.model.user_id == User.id) & (cls.model.status == StatusEnum.VALID.value)))
.where(cls.model.department_id == department_id)
.dicts()
)