diff --git a/src/services/task_service.py b/src/services/task_service.py index c9328b90..de297dff 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -17,7 +17,9 @@ class TaskService: def __init__(self, document_service=None, process_pool=None): self.document_service = document_service self.process_pool = process_pool - self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask} + self.task_store: dict[ + str, dict[str, UploadTask] + ] = {} # user_id -> {task_id -> UploadTask} self.background_tasks = set() if self.process_pool is None: @@ -122,18 +124,27 @@ class TaskService: # Process files with limited concurrency to avoid overwhelming the system max_workers = get_worker_count() - semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O + semaphore = asyncio.Semaphore( + max_workers * 2 + ) # Allow 2x process pool size for async I/O async def process_with_semaphore(file_path: str): async with semaphore: - await self.document_service.process_single_file_task(upload_task, file_path) + await self.document_service.process_single_file_task( + upload_task, file_path + ) - tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()] + tasks = [ + process_with_semaphore(file_path) + for file_path in upload_task.file_tasks.keys() + ] await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: - logger.error("Background upload processor failed", task_id=task_id, error=str(e)) + logger.error( + "Background upload processor failed", task_id=task_id, error=str(e) + ) import traceback traceback.print_exc() @@ -141,7 +152,9 @@ class TaskService: self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].updated_at = time.time() - async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None: + async def background_custom_processor( + self, user_id: str, task_id: str, items: list + ) -> None: """Background task to process items using custom processor""" try: upload_task = self.task_store[user_id][task_id] @@ -163,7 +176,9 @@ class TaskService: try: await processor.process_item(upload_task, item, file_task) except Exception as e: - logger.error("Failed to process item", item=str(item), error=str(e)) + logger.error( + "Failed to process item", item=str(item), error=str(e) + ) import traceback traceback.print_exc() @@ -190,7 +205,9 @@ class TaskService: pass raise # Re-raise to properly handle cancellation except Exception as e: - logger.error("Background custom processor failed", task_id=task_id, error=str(e)) + logger.error( + "Background custom processor failed", task_id=task_id, error=str(e) + ) import traceback traceback.print_exc() @@ -212,7 +229,10 @@ class TaskService: upload_task = None for candidate_user_id in candidate_user_ids: - if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]: + if ( + candidate_user_id in self.task_store + and task_id in self.task_store[candidate_user_id] + ): upload_task = self.task_store[candidate_user_id][task_id] break @@ -271,10 +291,23 @@ class TaskService: if task_id in tasks_by_id: continue - # Calculate running and pending counts + # Calculate running and pending counts and build file statuses running_files_count = 0 pending_files_count = 0 - for file_task in upload_task.file_tasks.values(): + file_statuses = {} + + for file_path, file_task in upload_task.file_tasks.items(): + if file_task.status.value != "completed": + file_statuses[file_path] = { + "status": file_task.status.value, + "result": file_task.result, + "error": file_task.error, + "retry_count": file_task.retry_count, + "created_at": file_task.created_at, + "updated_at": file_task.updated_at, + "duration_seconds": file_task.duration_seconds, + } + if file_task.status.value == "running": running_files_count += 1 elif file_task.status.value == "pending": @@ -292,6 +325,7 @@ class TaskService: "created_at": upload_task.created_at, "updated_at": upload_task.updated_at, "duration_seconds": upload_task.duration_seconds, + "files": file_statuses, } # First, add user-owned tasks; then shared anonymous; @@ -312,7 +346,10 @@ class TaskService: store_user_id = None for candidate_user_id in candidate_user_ids: - if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]: + if ( + candidate_user_id in self.task_store + and task_id in self.task_store[candidate_user_id] + ): store_user_id = candidate_user_id break @@ -326,7 +363,10 @@ class TaskService: return False # Cancel the background task to stop scheduling new work - if hasattr(upload_task, "background_task") and not upload_task.background_task.done(): + if ( + hasattr(upload_task, "background_task") + and not upload_task.background_task.done() + ): upload_task.background_task.cancel() # Mark task as failed (cancelled)