added files on get all tasks on backend
This commit is contained in:
parent
526e4a9f48
commit
5f68fa1a9a
1 changed files with 53 additions and 13 deletions
|
|
@ -17,7 +17,9 @@ class TaskService:
|
|||
def __init__(self, document_service=None, process_pool=None):
|
||||
self.document_service = document_service
|
||||
self.process_pool = process_pool
|
||||
self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask}
|
||||
self.task_store: dict[
|
||||
str, dict[str, UploadTask]
|
||||
] = {} # user_id -> {task_id -> UploadTask}
|
||||
self.background_tasks = set()
|
||||
|
||||
if self.process_pool is None:
|
||||
|
|
@ -122,18 +124,27 @@ class TaskService:
|
|||
|
||||
# Process files with limited concurrency to avoid overwhelming the system
|
||||
max_workers = get_worker_count()
|
||||
semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O
|
||||
semaphore = asyncio.Semaphore(
|
||||
max_workers * 2
|
||||
) # Allow 2x process pool size for async I/O
|
||||
|
||||
async def process_with_semaphore(file_path: str):
|
||||
async with semaphore:
|
||||
await self.document_service.process_single_file_task(upload_task, file_path)
|
||||
await self.document_service.process_single_file_task(
|
||||
upload_task, file_path
|
||||
)
|
||||
|
||||
tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]
|
||||
tasks = [
|
||||
process_with_semaphore(file_path)
|
||||
for file_path in upload_task.file_tasks.keys()
|
||||
]
|
||||
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Background upload processor failed", task_id=task_id, error=str(e))
|
||||
logger.error(
|
||||
"Background upload processor failed", task_id=task_id, error=str(e)
|
||||
)
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
|
@ -141,7 +152,9 @@ class TaskService:
|
|||
self.task_store[user_id][task_id].status = TaskStatus.FAILED
|
||||
self.task_store[user_id][task_id].updated_at = time.time()
|
||||
|
||||
async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None:
|
||||
async def background_custom_processor(
|
||||
self, user_id: str, task_id: str, items: list
|
||||
) -> None:
|
||||
"""Background task to process items using custom processor"""
|
||||
try:
|
||||
upload_task = self.task_store[user_id][task_id]
|
||||
|
|
@ -163,7 +176,9 @@ class TaskService:
|
|||
try:
|
||||
await processor.process_item(upload_task, item, file_task)
|
||||
except Exception as e:
|
||||
logger.error("Failed to process item", item=str(item), error=str(e))
|
||||
logger.error(
|
||||
"Failed to process item", item=str(item), error=str(e)
|
||||
)
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
|
@ -190,7 +205,9 @@ class TaskService:
|
|||
pass
|
||||
raise # Re-raise to properly handle cancellation
|
||||
except Exception as e:
|
||||
logger.error("Background custom processor failed", task_id=task_id, error=str(e))
|
||||
logger.error(
|
||||
"Background custom processor failed", task_id=task_id, error=str(e)
|
||||
)
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
|
|
@ -212,7 +229,10 @@ class TaskService:
|
|||
|
||||
upload_task = None
|
||||
for candidate_user_id in candidate_user_ids:
|
||||
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
|
||||
if (
|
||||
candidate_user_id in self.task_store
|
||||
and task_id in self.task_store[candidate_user_id]
|
||||
):
|
||||
upload_task = self.task_store[candidate_user_id][task_id]
|
||||
break
|
||||
|
||||
|
|
@ -271,10 +291,23 @@ class TaskService:
|
|||
if task_id in tasks_by_id:
|
||||
continue
|
||||
|
||||
# Calculate running and pending counts
|
||||
# Calculate running and pending counts and build file statuses
|
||||
running_files_count = 0
|
||||
pending_files_count = 0
|
||||
for file_task in upload_task.file_tasks.values():
|
||||
file_statuses = {}
|
||||
|
||||
for file_path, file_task in upload_task.file_tasks.items():
|
||||
if file_task.status.value != "completed":
|
||||
file_statuses[file_path] = {
|
||||
"status": file_task.status.value,
|
||||
"result": file_task.result,
|
||||
"error": file_task.error,
|
||||
"retry_count": file_task.retry_count,
|
||||
"created_at": file_task.created_at,
|
||||
"updated_at": file_task.updated_at,
|
||||
"duration_seconds": file_task.duration_seconds,
|
||||
}
|
||||
|
||||
if file_task.status.value == "running":
|
||||
running_files_count += 1
|
||||
elif file_task.status.value == "pending":
|
||||
|
|
@ -292,6 +325,7 @@ class TaskService:
|
|||
"created_at": upload_task.created_at,
|
||||
"updated_at": upload_task.updated_at,
|
||||
"duration_seconds": upload_task.duration_seconds,
|
||||
"files": file_statuses,
|
||||
}
|
||||
|
||||
# First, add user-owned tasks; then shared anonymous;
|
||||
|
|
@ -312,7 +346,10 @@ class TaskService:
|
|||
|
||||
store_user_id = None
|
||||
for candidate_user_id in candidate_user_ids:
|
||||
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
|
||||
if (
|
||||
candidate_user_id in self.task_store
|
||||
and task_id in self.task_store[candidate_user_id]
|
||||
):
|
||||
store_user_id = candidate_user_id
|
||||
break
|
||||
|
||||
|
|
@ -326,7 +363,10 @@ class TaskService:
|
|||
return False
|
||||
|
||||
# Cancel the background task to stop scheduling new work
|
||||
if hasattr(upload_task, "background_task") and not upload_task.background_task.done():
|
||||
if (
|
||||
hasattr(upload_task, "background_task")
|
||||
and not upload_task.background_task.done()
|
||||
):
|
||||
upload_task.background_task.cancel()
|
||||
|
||||
# Mark task as failed (cancelled)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue