From 531ca7cd497f6b0bc1f2ac18388043bc26fce268 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 08:44:00 -0300 Subject: [PATCH] Refactor TaskService for improved readability and maintainability This commit enhances the TaskService class by reorganizing import statements, updating type hints to use `dict` instead of `Dict`, and improving the formatting of method definitions for better clarity. Additionally, minor adjustments were made to comments and error handling, contributing to a more robust and well-documented codebase. --- src/services/task_service.py | 153 ++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 74 deletions(-) diff --git a/src/services/task_service.py b/src/services/task_service.py index 8fa1ed2b..6f088100 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -1,60 +1,69 @@ import asyncio -import uuid -import time import random -from typing import Dict +import time +import uuid -from models.tasks import TaskStatus, UploadTask, FileTask - -from src.utils.gpu_detection import get_worker_count +from models.tasks import FileTask, TaskStatus, UploadTask +from utils.gpu_detection import get_worker_count class TaskService: def __init__(self, document_service=None, process_pool=None): self.document_service = document_service self.process_pool = process_pool - self.task_store: Dict[str, Dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask} + self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask} self.background_tasks = set() - + if self.process_pool is None: raise ValueError("TaskService requires a process_pool parameter") - async def exponential_backoff_delay(self, retry_count: int, base_delay: float = 1.0, max_delay: float = 60.0) -> None: + async def exponential_backoff_delay( + self, retry_count: int, base_delay: float = 1.0, max_delay: float = 60.0 + ) -> None: """Apply exponential backoff with jitter""" - delay = min(base_delay * (2 ** retry_count) + random.uniform(0, 1), max_delay) + delay = min(base_delay * (2**retry_count) + random.uniform(0, 1), max_delay) await asyncio.sleep(delay) - async def create_upload_task(self, user_id: str, file_paths: list, jwt_token: str = None, owner_name: str = None, owner_email: str = None) -> str: + async def create_upload_task( + self, user_id: str, file_paths: list, jwt_token: str = None, owner_name: str = None, owner_email: str = None + ) -> str: """Create a new upload task for bulk file processing""" # Use default DocumentFileProcessor with user context from models.processors import DocumentFileProcessor - processor = DocumentFileProcessor(self.document_service, owner_user_id=user_id, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email) + + processor = DocumentFileProcessor( + self.document_service, + owner_user_id=user_id, + jwt_token=jwt_token, + owner_name=owner_name, + owner_email=owner_email, + ) return await self.create_custom_task(user_id, file_paths, processor) - + async def create_custom_task(self, user_id: str, items: list, processor) -> str: """Create a new task with custom processor for any type of items""" task_id = str(uuid.uuid4()) upload_task = UploadTask( task_id=task_id, total_files=len(items), - file_tasks={str(item): FileTask(file_path=str(item)) for item in items} + file_tasks={str(item): FileTask(file_path=str(item)) for item in items}, ) - + # Attach the custom processor to the task upload_task.processor = processor - + if user_id not in self.task_store: self.task_store[user_id] = {} self.task_store[user_id][task_id] = upload_task - + # Start background processing background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items)) self.background_tasks.add(background_task) background_task.add_done_callback(self.background_tasks.discard) - + # Store reference to background task for cancellation upload_task.background_task = background_task - + return task_id async def background_upload_processor(self, user_id: str, task_id: str) -> None: @@ -63,25 +72,23 @@ class TaskService: upload_task = self.task_store[user_id][task_id] upload_task.status = TaskStatus.RUNNING upload_task.updated_at = time.time() - + # Process files with limited concurrency to avoid overwhelming the system max_workers = get_worker_count() semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O - + async def process_with_semaphore(file_path: str): async with semaphore: await self.document_service.process_single_file_task(upload_task, file_path) - - tasks = [ - process_with_semaphore(file_path) - for file_path in upload_task.file_tasks.keys() - ] - + + tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()] + await asyncio.gather(*tasks, return_exceptions=True) - + except Exception as e: print(f"[ERROR] Background upload processor failed for task {task_id}: {e}") import traceback + traceback.print_exc() if user_id in self.task_store and task_id in self.task_store[user_id]: self.task_store[user_id][task_id].status = TaskStatus.FAILED @@ -93,24 +100,25 @@ class TaskService: upload_task = self.task_store[user_id][task_id] upload_task.status = TaskStatus.RUNNING upload_task.updated_at = time.time() - + processor = upload_task.processor - + # Process items with limited concurrency max_workers = get_worker_count() semaphore = asyncio.Semaphore(max_workers * 2) - + async def process_with_semaphore(item, item_key: str): async with semaphore: file_task = upload_task.file_tasks[item_key] file_task.status = TaskStatus.RUNNING file_task.updated_at = time.time() - + try: await processor.process_item(upload_task, item, file_task) except Exception as e: print(f"[ERROR] Failed to process item {item}: {e}") import traceback + traceback.print_exc() file_task.status = TaskStatus.FAILED file_task.error = str(e) @@ -119,18 +127,15 @@ class TaskService: file_task.updated_at = time.time() upload_task.processed_files += 1 upload_task.updated_at = time.time() - - tasks = [ - process_with_semaphore(item, str(item)) - for item in items - ] - + + tasks = [process_with_semaphore(item, str(item)) for item in items] + await asyncio.gather(*tasks, return_exceptions=True) - + # Mark task as completed upload_task.status = TaskStatus.COMPLETED upload_task.updated_at = time.time() - + except asyncio.CancelledError: print(f"[INFO] Background processor for task {task_id} was cancelled") if user_id in self.task_store and task_id in self.task_store[user_id]: @@ -140,6 +145,7 @@ class TaskService: except Exception as e: print(f"[ERROR] Background custom processor failed for task {task_id}: {e}") import traceback + traceback.print_exc() if user_id in self.task_store and task_id in self.task_store[user_id]: self.task_store[user_id][task_id].status = TaskStatus.FAILED @@ -147,13 +153,11 @@ class TaskService: def get_task_status(self, user_id: str, task_id: str) -> dict: """Get the status of a specific upload task""" - if (not task_id or - user_id not in self.task_store or - task_id not in self.task_store[user_id]): + if not task_id or user_id not in self.task_store or task_id not in self.task_store[user_id]: return None - + upload_task = self.task_store[user_id][task_id] - + file_statuses = {} for file_path, file_task in upload_task.file_tasks.items(): file_statuses[file_path] = { @@ -162,9 +166,9 @@ class TaskService: "error": file_task.error, "retry_count": file_task.retry_count, "created_at": file_task.created_at, - "updated_at": file_task.updated_at + "updated_at": file_task.updated_at, } - + return { "task_id": upload_task.task_id, "status": upload_task.status.value, @@ -174,51 +178,52 @@ class TaskService: "failed_files": upload_task.failed_files, "created_at": upload_task.created_at, "updated_at": upload_task.updated_at, - "files": file_statuses + "files": file_statuses, } - + def get_all_tasks(self, user_id: str) -> list: """Get all tasks for a user""" if user_id not in self.task_store: return [] - + tasks = [] for task_id, upload_task in self.task_store[user_id].items(): - tasks.append({ - "task_id": upload_task.task_id, - "status": upload_task.status.value, - "total_files": upload_task.total_files, - "processed_files": upload_task.processed_files, - "successful_files": upload_task.successful_files, - "failed_files": upload_task.failed_files, - "created_at": upload_task.created_at, - "updated_at": upload_task.updated_at - }) - + tasks.append( + { + "task_id": upload_task.task_id, + "status": upload_task.status.value, + "total_files": upload_task.total_files, + "processed_files": upload_task.processed_files, + "successful_files": upload_task.successful_files, + "failed_files": upload_task.failed_files, + "created_at": upload_task.created_at, + "updated_at": upload_task.updated_at, + } + ) + # Sort by creation time, most recent first tasks.sort(key=lambda x: x["created_at"], reverse=True) return tasks - + def cancel_task(self, user_id: str, task_id: str) -> bool: """Cancel a task if it exists and is not already completed""" - if (user_id not in self.task_store or - task_id not in self.task_store[user_id]): + if user_id not in self.task_store or task_id not in self.task_store[user_id]: return False - + upload_task = self.task_store[user_id][task_id] - + # Can only cancel pending or running tasks if upload_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]: return False - + # Cancel the background task to stop scheduling new work - if hasattr(upload_task, 'background_task') and not upload_task.background_task.done(): + if hasattr(upload_task, "background_task") and not upload_task.background_task.done(): upload_task.background_task.cancel() - + # Mark task as failed (cancelled) upload_task.status = TaskStatus.FAILED upload_task.updated_at = time.time() - + # Mark all pending file tasks as failed for file_task in upload_task.file_tasks.values(): if file_task.status == TaskStatus.PENDING: @@ -226,10 +231,10 @@ class TaskService: file_task.error = "Task cancelled by user" file_task.updated_at = time.time() upload_task.failed_files += 1 - + return True - + def shutdown(self): """Cleanup process pool""" - if hasattr(self, 'process_pool'): - self.process_pool.shutdown(wait=True) \ No newline at end of file + if hasattr(self, "process_pool"): + self.process_pool.shutdown(wait=True)