Merge pull request #18 from langflow-ai/shared-tasks

feat: Enhance task management to support shared tasks
This commit is contained in:
Sebastián Estévez 2025-09-05 12:25:38 -04:00 committed by GitHub
commit 6debe0b0bf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -2,9 +2,10 @@ import asyncio
import uuid
import time
import random
from typing import Dict
from typing import Dict, Optional
from models.tasks import TaskStatus, UploadTask, FileTask
from session_manager import AnonymousUser
from src.utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
@ -179,16 +180,29 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
def get_task_status(self, user_id: str, task_id: str) -> dict:
"""Get the status of a specific upload task"""
if (
not task_id
or user_id not in self.task_store
or task_id not in self.task_store[user_id]
):
def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]:
"""Get the status of a specific upload task
Includes fallback to shared tasks stored under the "anonymous" user key
so default system tasks are visible to all users.
"""
if not task_id:
return None
upload_task = self.task_store[user_id][task_id]
# Prefer the caller's user_id; otherwise check shared/anonymous tasks
candidate_user_ids = [user_id, AnonymousUser().user_id]
upload_task = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
upload_task = self.task_store[candidate_user_id][task_id]
break
if upload_task is None:
return None
file_statuses = {}
for file_path, file_task in upload_task.file_tasks.items():
@ -214,14 +228,21 @@ class TaskService:
}
def get_all_tasks(self, user_id: str) -> list:
"""Get all tasks for a user"""
if user_id not in self.task_store:
return []
"""Get all tasks for a user
tasks = []
for task_id, upload_task in self.task_store[user_id].items():
tasks.append(
{
Returns the union of the user's own tasks and shared default tasks stored
under the "anonymous" user key. User-owned tasks take precedence
if a task_id overlaps.
"""
tasks_by_id = {}
def add_tasks_from_store(store_user_id):
if store_user_id not in self.task_store:
return
for task_id, upload_task in self.task_store[store_user_id].items():
if task_id in tasks_by_id:
continue
tasks_by_id[task_id] = {
"task_id": upload_task.task_id,
"status": upload_task.status.value,
"total_files": upload_task.total_files,
@ -231,18 +252,36 @@ class TaskService:
"created_at": upload_task.created_at,
"updated_at": upload_task.updated_at,
}
)
# Sort by creation time, most recent first
# First, add user-owned tasks; then shared anonymous;
add_tasks_from_store(user_id)
add_tasks_from_store(AnonymousUser().user_id)
tasks = list(tasks_by_id.values())
tasks.sort(key=lambda x: x["created_at"], reverse=True)
return tasks
def cancel_task(self, user_id: str, task_id: str) -> bool:
"""Cancel a task if it exists and is not already completed"""
if user_id not in self.task_store or task_id not in self.task_store[user_id]:
"""Cancel a task if it exists and is not already completed.
Supports cancellation of shared default tasks stored under the anonymous user.
"""
# Check candidate user IDs first, then anonymous to find which user ID the task is mapped to
candidate_user_ids = [user_id, AnonymousUser().user_id]
store_user_id = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
store_user_id = candidate_user_id
break
if store_user_id is None:
return False
upload_task = self.task_store[user_id][task_id]
upload_task = self.task_store[store_user_id][task_id]
# Can only cancel pending or running tasks
if upload_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]: