diff --git a/src/services/task_service.py b/src/services/task_service.py index 0537e933..705f6f3c 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -1,10 +1,11 @@ import asyncio import random -from typing import Dict, Optional +import time +import uuid -from models.tasks import TaskStatus, UploadTask, FileTask -from utils.gpu_detection import get_worker_count +from models.tasks import FileTask, TaskStatus, UploadTask from session_manager import AnonymousUser +from utils.gpu_detection import get_worker_count from utils.logging_config import get_logger logger = get_logger(__name__) @@ -14,9 +15,7 @@ class TaskService: def __init__(self, document_service=None, process_pool=None): self.document_service = document_service self.process_pool = process_pool - self.task_store: Dict[ - str, Dict[str, UploadTask] - ] = {} # user_id -> {task_id -> UploadTask} + self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask} self.background_tasks = set() if self.process_pool is None: @@ -67,9 +66,7 @@ class TaskService: self.task_store[user_id][task_id] = upload_task # Start background processing - background_task = asyncio.create_task( - self.background_custom_processor(user_id, task_id, items) - ) + background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items)) self.background_tasks.add(background_task) background_task.add_done_callback(self.background_tasks.discard) @@ -87,27 +84,18 @@ class TaskService: # Process files with limited concurrency to avoid overwhelming the system max_workers = get_worker_count() - semaphore = asyncio.Semaphore( - max_workers * 2 - ) # Allow 2x process pool size for async I/O + semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O async def process_with_semaphore(file_path: str): async with semaphore: - await self.document_service.process_single_file_task( - upload_task, file_path - ) + await self.document_service.process_single_file_task(upload_task, file_path) - tasks = [ - process_with_semaphore(file_path) - for file_path in upload_task.file_tasks.keys() - ] + tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()] await asyncio.gather(*tasks, return_exceptions=True) except Exception as e: - logger.error( - "Background upload processor failed", task_id=task_id, error=str(e) - ) + logger.error("Background upload processor failed", task_id=task_id, error=str(e)) import traceback traceback.print_exc() @@ -115,9 +103,7 @@ class TaskService: self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].updated_at = time.time() - async def background_custom_processor( - self, user_id: str, task_id: str, items: list - ) -> None: + async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None: """Background task to process items using custom processor""" try: upload_task = self.task_store[user_id][task_id] @@ -139,9 +125,7 @@ class TaskService: try: await processor.process_item(upload_task, item, file_task) except Exception as e: - logger.error( - "Failed to process item", item=str(item), error=str(e) - ) + logger.error("Failed to process item", item=str(item), error=str(e)) import traceback traceback.print_exc() @@ -168,9 +152,7 @@ class TaskService: pass raise # Re-raise to properly handle cancellation except Exception as e: - logger.error( - "Background custom processor failed", task_id=task_id, error=str(e) - ) + logger.error("Background custom processor failed", task_id=task_id, error=str(e)) import traceback traceback.print_exc() @@ -178,7 +160,7 @@ class TaskService: self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].updated_at = time.time() - def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]: + def get_task_status(self, user_id: str, task_id: str) -> dict | None: """Get the status of a specific upload task Includes fallback to shared tasks stored under the "anonymous" user key @@ -192,10 +174,7 @@ class TaskService: upload_task = None for candidate_user_id in candidate_user_ids: - if ( - candidate_user_id in self.task_store - and task_id in self.task_store[candidate_user_id] - ): + if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]: upload_task = self.task_store[candidate_user_id][task_id] break @@ -269,10 +248,7 @@ class TaskService: store_user_id = None for candidate_user_id in candidate_user_ids: - if ( - candidate_user_id in self.task_store - and task_id in self.task_store[candidate_user_id] - ): + if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]: store_user_id = candidate_user_id break @@ -286,10 +262,7 @@ class TaskService: return False # Cancel the background task to stop scheduling new work - if ( - hasattr(upload_task, "background_task") - and not upload_task.background_task.done() - ): + if hasattr(upload_task, "background_task") and not upload_task.background_task.done(): upload_task.background_task.cancel() # Mark task as failed (cancelled)