From 5ea70cc7797b652eb829617d466d18e706a8adae Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Fri, 3 Oct 2025 16:17:05 -0400 Subject: [PATCH] Refactor file upload task to use filename mapping (#195) Changed the handling of original filenames in Langflow upload tasks to use a mapping from file paths to original filenames instead of a list. Updated both the API router and TaskService to support this change, improving reliability when associating uploaded files with their original names. --- src/api/router.py | 83 ++++++++++++++++++++---------------- src/services/task_service.py | 25 ++++++----- 2 files changed, 60 insertions(+), 48 deletions(-) diff --git a/src/api/router.py b/src/api/router.py index 327757be..15a9b116 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -13,27 +13,27 @@ logger = get_logger(__name__) async def upload_ingest_router( - request: Request, - document_service=None, - langflow_file_service=None, + request: Request, + document_service=None, + langflow_file_service=None, session_manager=None, - task_service=None + task_service=None, ): """ Router endpoint that automatically routes upload requests based on configuration. - + - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload) - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service - + This provides a single endpoint that users can call regardless of backend configuration. All langflow uploads are processed as background tasks for better scalability. """ try: logger.debug( - "Router upload_ingest endpoint called", - disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW + "Router upload_ingest endpoint called", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, ) - + # Route based on configuration if DISABLE_INGEST_WITH_LANGFLOW: # Route to traditional OpenRAG upload @@ -42,8 +42,10 @@ async def upload_ingest_router( else: # Route to Langflow upload and ingest using task service logger.debug("Routing to Langflow upload-ingest pipeline via task service") - return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service) - + return await langflow_upload_ingest_task( + request, langflow_file_service, session_manager, task_service + ) + except Exception as e: logger.error("Error in upload_ingest_router", error=str(e)) error_msg = str(e) @@ -57,17 +59,14 @@ async def upload_ingest_router( async def langflow_upload_ingest_task( - request: Request, - langflow_file_service, - session_manager, - task_service + request: Request, langflow_file_service, session_manager, task_service ): """Task-based langflow upload and ingest for single/multiple files""" try: logger.debug("Task-based langflow upload_ingest endpoint called") form = await request.form() upload_files = form.getlist("file") - + if not upload_files or len(upload_files) == 0: logger.error("No files provided in task-based upload request") return JSONResponse({"error": "Missing files"}, status_code=400) @@ -82,10 +81,11 @@ async def langflow_upload_ingest_task( # Parse JSON fields if provided settings = None tweaks = None - + if settings_json: try: import json + settings = json.loads(settings_json) except json.JSONDecodeError as e: logger.error("Invalid settings JSON", error=str(e)) @@ -94,6 +94,7 @@ async def langflow_upload_ingest_task( if tweaks_json: try: import json + tweaks = json.loads(tweaks_json) except json.JSONDecodeError as e: logger.error("Invalid tweaks JSON", error=str(e)) @@ -107,11 +108,14 @@ async def langflow_upload_ingest_task( jwt_token = getattr(request.state, "jwt_token", None) if not user_id: - return JSONResponse({"error": "User authentication required"}, status_code=401) + return JSONResponse( + {"error": "User authentication required"}, status_code=401 + ) # Create temporary files for task processing import tempfile import os + temp_file_paths = [] original_filenames = [] @@ -132,7 +136,7 @@ async def langflow_upload_ingest_task( temp_path = os.path.join(temp_dir, safe_filename) # Write content to temp file - with open(temp_path, 'wb') as temp_file: + with open(temp_path, "wb") as temp_file: temp_file.write(content) temp_file_paths.append(temp_path) @@ -143,22 +147,22 @@ async def langflow_upload_ingest_task( user_id=user_id, has_settings=bool(settings), has_tweaks=bool(tweaks), - delete_after_ingest=delete_after_ingest + delete_after_ingest=delete_after_ingest, ) # Create langflow upload task - print(f"tweaks: {tweaks}") - print(f"settings: {settings}") - print(f"jwt_token: {jwt_token}") - print(f"user_name: {user_name}") - print(f"user_email: {user_email}") - print(f"session_id: {session_id}") - print(f"delete_after_ingest: {delete_after_ingest}") - print(f"temp_file_paths: {temp_file_paths}") + logger.debug( + f"Preparing to create langflow upload task: tweaks={tweaks}, settings={settings}, jwt_token={jwt_token}, user_name={user_name}, user_email={user_email}, session_id={session_id}, delete_after_ingest={delete_after_ingest}, temp_file_paths={temp_file_paths}", + ) + # Create a map between temp_file_paths and original_filenames + file_path_to_original_filename = dict(zip(temp_file_paths, original_filenames)) + logger.debug( + f"File path to original filename map: {file_path_to_original_filename}", + ) task_id = await task_service.create_langflow_upload_task( user_id=user_id, file_paths=temp_file_paths, - original_filenames=original_filenames, + original_filenames=file_path_to_original_filename, langflow_file_service=langflow_file_service, session_manager=session_manager, jwt_token=jwt_token, @@ -172,20 +176,24 @@ async def langflow_upload_ingest_task( ) logger.debug("Langflow upload task created successfully", task_id=task_id) - - return JSONResponse({ - "task_id": task_id, - "message": f"Langflow upload task created for {len(upload_files)} file(s)", - "file_count": len(upload_files) - }, status_code=202) # 202 Accepted for async processing - + + return JSONResponse( + { + "task_id": task_id, + "message": f"Langflow upload task created for {len(upload_files)} file(s)", + "file_count": len(upload_files), + }, + status_code=202, + ) # 202 Accepted for async processing + except Exception: # Clean up temp files on error from utils.file_utils import safe_unlink + for temp_path in temp_file_paths: safe_unlink(temp_path) raise - + except Exception as e: logger.error( "Task-based langflow upload_ingest endpoint failed", @@ -193,5 +201,6 @@ async def langflow_upload_ingest_task( error=str(e), ) import traceback + logger.error("Full traceback", traceback=traceback.format_exc()) return JSONResponse({"error": str(e)}, status_code=500) diff --git a/src/services/task_service.py b/src/services/task_service.py index eb5825c0..735ad483 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -1,6 +1,5 @@ import asyncio import random -from typing import Dict, Optional import time import uuid @@ -59,7 +58,7 @@ class TaskService: file_paths: list, langflow_file_service, session_manager, - original_filenames: list = None, + original_filenames: dict | None = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None, @@ -88,7 +87,7 @@ class TaskService: ) return await self.create_custom_task(user_id, file_paths, processor, original_filenames) - async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: list = None) -> str: + async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: dict | None = None) -> str: """Create a new task with custom processor for any type of items""" import os # Store anonymous tasks under a stable key so they can be retrieved later @@ -96,14 +95,18 @@ class TaskService: task_id = str(uuid.uuid4()) # Create file tasks with original filenames if provided - file_tasks = {} - for i, item in enumerate(items): - if original_filenames and i < len(original_filenames): - filename = original_filenames[i] - else: - filename = os.path.basename(str(item)) - - file_tasks[str(item)] = FileTask(file_path=str(item), filename=filename) + normalized_originals = ( + {str(k): v for k, v in original_filenames.items()} if original_filenames else {} + ) + file_tasks = { + str(item): FileTask( + file_path=str(item), + filename=normalized_originals.get( + str(item), os.path.basename(str(item)) + ), + ) + for item in items + } upload_task = UploadTask( task_id=task_id,