From 796fd28b8fe13293483d44b12eb979f788ae3ed6 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Fri, 5 Sep 2025 12:14:33 -0400 Subject: [PATCH 1/2] Enhance task management to support shared tasks for anonymous users. Updated `get_task_status` and `get_all_tasks` methods to include fallback to tasks stored under the "anonymous" user key. Improved task cancellation logic to handle shared tasks. Refactored code for clarity and maintainability. --- src/services/task_service.py | 81 ++++++++++++++++++++++++++---------- 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/src/services/task_service.py b/src/services/task_service.py index bd2a73d7..257ae2ce 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -2,9 +2,10 @@ import asyncio import uuid import time import random -from typing import Dict +from typing import Dict, Optional from models.tasks import TaskStatus, UploadTask, FileTask +from session_manager import AnonymousUser from src.utils.gpu_detection import get_worker_count from utils.logging_config import get_logger @@ -179,16 +180,29 @@ class TaskService: self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].updated_at = time.time() - def get_task_status(self, user_id: str, task_id: str) -> dict: - """Get the status of a specific upload task""" - if ( - not task_id - or user_id not in self.task_store - or task_id not in self.task_store[user_id] - ): + def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]: + """Get the status of a specific upload task + + Includes fallback to shared tasks stored under the "anonymous" user key + so default system tasks are visible to all users. + """ + if not task_id: return None - upload_task = self.task_store[user_id][task_id] + # Prefer the caller's user_id; otherwise check shared/anonymous tasks + candidate_user_ids = [user_id, AnonymousUser().user_id] + + upload_task = None + for candidate_user_id in candidate_user_ids: + if ( + candidate_user_id in self.task_store + and task_id in self.task_store[candidate_user_id] + ): + upload_task = self.task_store[candidate_user_id][task_id] + break + + if upload_task is None: + return None file_statuses = {} for file_path, file_task in upload_task.file_tasks.items(): @@ -214,14 +228,21 @@ class TaskService: } def get_all_tasks(self, user_id: str) -> list: - """Get all tasks for a user""" - if user_id not in self.task_store: - return [] + """Get all tasks for a user - tasks = [] - for task_id, upload_task in self.task_store[user_id].items(): - tasks.append( - { + Returns the union of the user's own tasks and shared default tasks stored + under the "anonymous" user key. User-owned tasks take precedence + if a task_id overlaps. + """ + tasks_by_id = {} + + def add_tasks_from_store(store_user_id): + if store_user_id not in self.task_store: + return + for task_id, upload_task in self.task_store[store_user_id].items(): + if task_id in tasks_by_id: + continue + tasks_by_id[task_id] = { "task_id": upload_task.task_id, "status": upload_task.status.value, "total_files": upload_task.total_files, @@ -231,18 +252,36 @@ class TaskService: "created_at": upload_task.created_at, "updated_at": upload_task.updated_at, } - ) - # Sort by creation time, most recent first + # First, add user-owned tasks; then shared anonymous; + add_tasks_from_store(user_id) + add_tasks_from_store(AnonymousUser().user_id) + + tasks = list(tasks_by_id.values()) tasks.sort(key=lambda x: x["created_at"], reverse=True) return tasks def cancel_task(self, user_id: str, task_id: str) -> bool: - """Cancel a task if it exists and is not already completed""" - if user_id not in self.task_store or task_id not in self.task_store[user_id]: + """Cancel a task if it exists and is not already completed. + + Supports cancellation of shared default tasks stored under the anonymous user. + """ + # Try user's own tasks first, then shared anonymous tasks + candidate_user_ids = [user_id, AnonymousUser().user_id] + + store_user_id = None + for candidate_user_id in candidate_user_ids: + if ( + candidate_user_id in self.task_store + and task_id in self.task_store[candidate_user_id] + ): + store_user_id = candidate_user_id + break + + if store_user_id is None: return False - upload_task = self.task_store[user_id][task_id] + upload_task = self.task_store[store_user_id][task_id] # Can only cancel pending or running tasks if upload_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]: From 947c7f8f3b00f1b9232099cc6920720589228028 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Fri, 5 Sep 2025 12:24:28 -0400 Subject: [PATCH 2/2] shared tasks --- src/services/task_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/task_service.py b/src/services/task_service.py index 257ae2ce..695752d8 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -266,7 +266,7 @@ class TaskService: Supports cancellation of shared default tasks stored under the anonymous user. """ - # Try user's own tasks first, then shared anonymous tasks + # Check candidate user IDs first, then anonymous to find which user ID the task is mapped to candidate_user_ids = [user_id, AnonymousUser().user_id] store_user_id = None