fix imports in task_service
This commit is contained in:
parent
1df2594698
commit
327b435814
1 changed files with 17 additions and 44 deletions
|
|
@ -1,10 +1,11 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import random
|
import random
|
||||||
from typing import Dict, Optional
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
from models.tasks import TaskStatus, UploadTask, FileTask
|
from models.tasks import FileTask, TaskStatus, UploadTask
|
||||||
from utils.gpu_detection import get_worker_count
|
|
||||||
from session_manager import AnonymousUser
|
from session_manager import AnonymousUser
|
||||||
|
from utils.gpu_detection import get_worker_count
|
||||||
from utils.logging_config import get_logger
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
@ -14,9 +15,7 @@ class TaskService:
|
||||||
def __init__(self, document_service=None, process_pool=None):
|
def __init__(self, document_service=None, process_pool=None):
|
||||||
self.document_service = document_service
|
self.document_service = document_service
|
||||||
self.process_pool = process_pool
|
self.process_pool = process_pool
|
||||||
self.task_store: Dict[
|
self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask}
|
||||||
str, Dict[str, UploadTask]
|
|
||||||
] = {} # user_id -> {task_id -> UploadTask}
|
|
||||||
self.background_tasks = set()
|
self.background_tasks = set()
|
||||||
|
|
||||||
if self.process_pool is None:
|
if self.process_pool is None:
|
||||||
|
|
@ -67,9 +66,7 @@ class TaskService:
|
||||||
self.task_store[user_id][task_id] = upload_task
|
self.task_store[user_id][task_id] = upload_task
|
||||||
|
|
||||||
# Start background processing
|
# Start background processing
|
||||||
background_task = asyncio.create_task(
|
background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items))
|
||||||
self.background_custom_processor(user_id, task_id, items)
|
|
||||||
)
|
|
||||||
self.background_tasks.add(background_task)
|
self.background_tasks.add(background_task)
|
||||||
background_task.add_done_callback(self.background_tasks.discard)
|
background_task.add_done_callback(self.background_tasks.discard)
|
||||||
|
|
||||||
|
|
@ -87,27 +84,18 @@ class TaskService:
|
||||||
|
|
||||||
# Process files with limited concurrency to avoid overwhelming the system
|
# Process files with limited concurrency to avoid overwhelming the system
|
||||||
max_workers = get_worker_count()
|
max_workers = get_worker_count()
|
||||||
semaphore = asyncio.Semaphore(
|
semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O
|
||||||
max_workers * 2
|
|
||||||
) # Allow 2x process pool size for async I/O
|
|
||||||
|
|
||||||
async def process_with_semaphore(file_path: str):
|
async def process_with_semaphore(file_path: str):
|
||||||
async with semaphore:
|
async with semaphore:
|
||||||
await self.document_service.process_single_file_task(
|
await self.document_service.process_single_file_task(upload_task, file_path)
|
||||||
upload_task, file_path
|
|
||||||
)
|
|
||||||
|
|
||||||
tasks = [
|
tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]
|
||||||
process_with_semaphore(file_path)
|
|
||||||
for file_path in upload_task.file_tasks.keys()
|
|
||||||
]
|
|
||||||
|
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error("Background upload processor failed", task_id=task_id, error=str(e))
|
||||||
"Background upload processor failed", task_id=task_id, error=str(e)
|
|
||||||
)
|
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
@ -115,9 +103,7 @@ class TaskService:
|
||||||
self.task_store[user_id][task_id].status = TaskStatus.FAILED
|
self.task_store[user_id][task_id].status = TaskStatus.FAILED
|
||||||
self.task_store[user_id][task_id].updated_at = time.time()
|
self.task_store[user_id][task_id].updated_at = time.time()
|
||||||
|
|
||||||
async def background_custom_processor(
|
async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None:
|
||||||
self, user_id: str, task_id: str, items: list
|
|
||||||
) -> None:
|
|
||||||
"""Background task to process items using custom processor"""
|
"""Background task to process items using custom processor"""
|
||||||
try:
|
try:
|
||||||
upload_task = self.task_store[user_id][task_id]
|
upload_task = self.task_store[user_id][task_id]
|
||||||
|
|
@ -139,9 +125,7 @@ class TaskService:
|
||||||
try:
|
try:
|
||||||
await processor.process_item(upload_task, item, file_task)
|
await processor.process_item(upload_task, item, file_task)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error("Failed to process item", item=str(item), error=str(e))
|
||||||
"Failed to process item", item=str(item), error=str(e)
|
|
||||||
)
|
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
@ -168,9 +152,7 @@ class TaskService:
|
||||||
pass
|
pass
|
||||||
raise # Re-raise to properly handle cancellation
|
raise # Re-raise to properly handle cancellation
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error("Background custom processor failed", task_id=task_id, error=str(e))
|
||||||
"Background custom processor failed", task_id=task_id, error=str(e)
|
|
||||||
)
|
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
@ -178,7 +160,7 @@ class TaskService:
|
||||||
self.task_store[user_id][task_id].status = TaskStatus.FAILED
|
self.task_store[user_id][task_id].status = TaskStatus.FAILED
|
||||||
self.task_store[user_id][task_id].updated_at = time.time()
|
self.task_store[user_id][task_id].updated_at = time.time()
|
||||||
|
|
||||||
def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]:
|
def get_task_status(self, user_id: str, task_id: str) -> dict | None:
|
||||||
"""Get the status of a specific upload task
|
"""Get the status of a specific upload task
|
||||||
|
|
||||||
Includes fallback to shared tasks stored under the "anonymous" user key
|
Includes fallback to shared tasks stored under the "anonymous" user key
|
||||||
|
|
@ -192,10 +174,7 @@ class TaskService:
|
||||||
|
|
||||||
upload_task = None
|
upload_task = None
|
||||||
for candidate_user_id in candidate_user_ids:
|
for candidate_user_id in candidate_user_ids:
|
||||||
if (
|
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
|
||||||
candidate_user_id in self.task_store
|
|
||||||
and task_id in self.task_store[candidate_user_id]
|
|
||||||
):
|
|
||||||
upload_task = self.task_store[candidate_user_id][task_id]
|
upload_task = self.task_store[candidate_user_id][task_id]
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
@ -269,10 +248,7 @@ class TaskService:
|
||||||
|
|
||||||
store_user_id = None
|
store_user_id = None
|
||||||
for candidate_user_id in candidate_user_ids:
|
for candidate_user_id in candidate_user_ids:
|
||||||
if (
|
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
|
||||||
candidate_user_id in self.task_store
|
|
||||||
and task_id in self.task_store[candidate_user_id]
|
|
||||||
):
|
|
||||||
store_user_id = candidate_user_id
|
store_user_id = candidate_user_id
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
@ -286,10 +262,7 @@ class TaskService:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Cancel the background task to stop scheduling new work
|
# Cancel the background task to stop scheduling new work
|
||||||
if (
|
if hasattr(upload_task, "background_task") and not upload_task.background_task.done():
|
||||||
hasattr(upload_task, "background_task")
|
|
||||||
and not upload_task.background_task.done()
|
|
||||||
):
|
|
||||||
upload_task.background_task.cancel()
|
upload_task.background_task.cancel()
|
||||||
|
|
||||||
# Mark task as failed (cancelled)
|
# Mark task as failed (cancelled)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue