Enhance task management to support shared tasks for anonymous users. Updated get_task_status and get_all_tasks methods to include fallback to tasks stored under the "anonymous" user key. Improved task cancellation logic to handle shared tasks. Refactored code for clarity and maintainability.

This commit is contained in:
Edwin Jose 2025-09-05 12:14:33 -04:00
parent e983d3c27a
commit 796fd28b8f

View file

@ -2,9 +2,10 @@ import asyncio
import uuid
import time
import random
from typing import Dict
from typing import Dict, Optional
from models.tasks import TaskStatus, UploadTask, FileTask
from session_manager import AnonymousUser
from src.utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
@ -179,16 +180,29 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
def get_task_status(self, user_id: str, task_id: str) -> dict:
"""Get the status of a specific upload task"""
if (
not task_id
or user_id not in self.task_store
or task_id not in self.task_store[user_id]
):
def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]:
"""Get the status of a specific upload task
Includes fallback to shared tasks stored under the "anonymous" user key
so default system tasks are visible to all users.
"""
if not task_id:
return None
upload_task = self.task_store[user_id][task_id]
# Prefer the caller's user_id; otherwise check shared/anonymous tasks
candidate_user_ids = [user_id, AnonymousUser().user_id]
upload_task = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
upload_task = self.task_store[candidate_user_id][task_id]
break
if upload_task is None:
return None
file_statuses = {}
for file_path, file_task in upload_task.file_tasks.items():
@ -214,14 +228,21 @@ class TaskService:
}
def get_all_tasks(self, user_id: str) -> list:
"""Get all tasks for a user"""
if user_id not in self.task_store:
return []
"""Get all tasks for a user
tasks = []
for task_id, upload_task in self.task_store[user_id].items():
tasks.append(
{
Returns the union of the user's own tasks and shared default tasks stored
under the "anonymous" user key. User-owned tasks take precedence
if a task_id overlaps.
"""
tasks_by_id = {}
def add_tasks_from_store(store_user_id):
if store_user_id not in self.task_store:
return
for task_id, upload_task in self.task_store[store_user_id].items():
if task_id in tasks_by_id:
continue
tasks_by_id[task_id] = {
"task_id": upload_task.task_id,
"status": upload_task.status.value,
"total_files": upload_task.total_files,
@ -231,18 +252,36 @@ class TaskService:
"created_at": upload_task.created_at,
"updated_at": upload_task.updated_at,
}
)
# Sort by creation time, most recent first
# First, add user-owned tasks; then shared anonymous;
add_tasks_from_store(user_id)
add_tasks_from_store(AnonymousUser().user_id)
tasks = list(tasks_by_id.values())
tasks.sort(key=lambda x: x["created_at"], reverse=True)
return tasks
def cancel_task(self, user_id: str, task_id: str) -> bool:
"""Cancel a task if it exists and is not already completed"""
if user_id not in self.task_store or task_id not in self.task_store[user_id]:
"""Cancel a task if it exists and is not already completed.
Supports cancellation of shared default tasks stored under the anonymous user.
"""
# Try user's own tasks first, then shared anonymous tasks
candidate_user_ids = [user_id, AnonymousUser().user_id]
store_user_id = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
store_user_id = candidate_user_id
break
if store_user_id is None:
return False
upload_task = self.task_store[user_id][task_id]
upload_task = self.task_store[store_user_id][task_id]
# Can only cancel pending or running tasks
if upload_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]: