From 3ed0bcef3faa10c38ade27d109bb1e61ae1c6e8f Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 22:35:18 -0400 Subject: [PATCH] create a task service --- src/api/langflow_files.py | 30 +++++++++++++++++++++++++----- src/main.py | 1 + src/models/processors.py | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 23bd33ec..faa1b85a 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -8,7 +8,7 @@ logger = get_logger(__name__) async def upload_user_file( - request: Request, langflow_file_service: LangflowFileService, session_manager + request: Request, langflow_file_service: LangflowFileService, task_service, session_manager ): try: logger.debug("upload_user_file endpoint called") @@ -33,10 +33,30 @@ async def upload_user_file( jwt_token = getattr(request.state, "jwt_token", None) logger.debug("JWT token status", jwt_present=jwt_token is not None) - logger.debug("Calling langflow_file_service.upload_user_file") - result = await langflow_file_service.upload_user_file(file_tuple, jwt_token) - logger.debug("Upload successful", result=result) - return JSONResponse(result, status_code=201) + # Get user info for task management + user = getattr(request.state, "user", None) + user_id = user.user_id if user else "anonymous" + + # Create processor for Langflow file upload + from models.processors import LangflowFileProcessor + processor = LangflowFileProcessor( + langflow_file_service=langflow_file_service, + jwt_token=jwt_token, + ) + + # Create task for file upload + logger.debug("Creating task for langflow file upload") + task_id = await task_service.create_custom_task( + user_id, [file_tuple], processor + ) + + logger.debug("Task created successfully", task_id=task_id) + return JSONResponse({ + "task_id": task_id, + "total_files": 1, + "status": "accepted" + }, status_code=201) + except Exception as e: logger.error( "upload_user_file endpoint failed", diff --git a/src/main.py b/src/main.py index 285e94ed..78fbdc64 100644 --- a/src/main.py +++ b/src/main.py @@ -381,6 +381,7 @@ async def create_app(): partial( langflow_files.upload_user_file, langflow_file_service=services["langflow_file_service"], + task_service=services["task_service"], session_manager=services["session_manager"], ) ), diff --git a/src/models/processors.py b/src/models/processors.py index 02836020..f563f4ad 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -198,6 +198,44 @@ class LangflowConnectorFileProcessor(TaskProcessor): upload_task.successful_files += 1 +class LangflowFileProcessor(TaskProcessor): + """Processor for files uploaded via Langflow API""" + + def __init__( + self, + langflow_file_service, + jwt_token: str = None, + ): + self.langflow_file_service = langflow_file_service + self.jwt_token = jwt_token + + async def process_item( + self, upload_task: UploadTask, item: tuple, file_task: FileTask + ) -> None: + """Process a file upload via Langflow API""" + from models.tasks import TaskStatus + import time + + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() + + try: + # item is the file_tuple: (filename, content, content_type) + result = await self.langflow_file_service.upload_user_file(item, self.jwt_token) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + upload_task.successful_files += 1 + + except Exception as e: + logger.error("Failed to process langflow file upload", error=str(e)) + file_task.status = TaskStatus.FAILED + file_task.error = str(e) + upload_task.failed_files += 1 + finally: + file_task.updated_at = time.time() + + class S3FileProcessor(TaskProcessor): """Processor for files stored in S3 buckets"""