Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Edwin Jose
42a18bb0e9 Refactor file upload task to use filename mapping
Changed the handling of original filenames in Langflow upload tasks to use a mapping from file paths to original filenames instead of a list. Updated both the API router and TaskService to support this change, improving reliability when associating uploaded files with their original names.
2025-10-03 15:53:11 -04:00
2 changed files with 60 additions and 48 deletions

View file

@ -13,27 +13,27 @@ logger = get_logger(__name__)
async def upload_ingest_router( async def upload_ingest_router(
request: Request, request: Request,
document_service=None, document_service=None,
langflow_file_service=None, langflow_file_service=None,
session_manager=None, session_manager=None,
task_service=None task_service=None,
): ):
""" """
Router endpoint that automatically routes upload requests based on configuration. Router endpoint that automatically routes upload requests based on configuration.
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload) - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload)
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service
This provides a single endpoint that users can call regardless of backend configuration. This provides a single endpoint that users can call regardless of backend configuration.
All langflow uploads are processed as background tasks for better scalability. All langflow uploads are processed as background tasks for better scalability.
""" """
try: try:
logger.debug( logger.debug(
"Router upload_ingest endpoint called", "Router upload_ingest endpoint called",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
) )
# Route based on configuration # Route based on configuration
if DISABLE_INGEST_WITH_LANGFLOW: if DISABLE_INGEST_WITH_LANGFLOW:
# Route to traditional OpenRAG upload # Route to traditional OpenRAG upload
@ -42,8 +42,10 @@ async def upload_ingest_router(
else: else:
# Route to Langflow upload and ingest using task service # Route to Langflow upload and ingest using task service
logger.debug("Routing to Langflow upload-ingest pipeline via task service") logger.debug("Routing to Langflow upload-ingest pipeline via task service")
return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service) return await langflow_upload_ingest_task(
request, langflow_file_service, session_manager, task_service
)
except Exception as e: except Exception as e:
logger.error("Error in upload_ingest_router", error=str(e)) logger.error("Error in upload_ingest_router", error=str(e))
error_msg = str(e) error_msg = str(e)
@ -57,17 +59,14 @@ async def upload_ingest_router(
async def langflow_upload_ingest_task( async def langflow_upload_ingest_task(
request: Request, request: Request, langflow_file_service, session_manager, task_service
langflow_file_service,
session_manager,
task_service
): ):
"""Task-based langflow upload and ingest for single/multiple files""" """Task-based langflow upload and ingest for single/multiple files"""
try: try:
logger.debug("Task-based langflow upload_ingest endpoint called") logger.debug("Task-based langflow upload_ingest endpoint called")
form = await request.form() form = await request.form()
upload_files = form.getlist("file") upload_files = form.getlist("file")
if not upload_files or len(upload_files) == 0: if not upload_files or len(upload_files) == 0:
logger.error("No files provided in task-based upload request") logger.error("No files provided in task-based upload request")
return JSONResponse({"error": "Missing files"}, status_code=400) return JSONResponse({"error": "Missing files"}, status_code=400)
@ -82,10 +81,11 @@ async def langflow_upload_ingest_task(
# Parse JSON fields if provided # Parse JSON fields if provided
settings = None settings = None
tweaks = None tweaks = None
if settings_json: if settings_json:
try: try:
import json import json
settings = json.loads(settings_json) settings = json.loads(settings_json)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error("Invalid settings JSON", error=str(e)) logger.error("Invalid settings JSON", error=str(e))
@ -94,6 +94,7 @@ async def langflow_upload_ingest_task(
if tweaks_json: if tweaks_json:
try: try:
import json import json
tweaks = json.loads(tweaks_json) tweaks = json.loads(tweaks_json)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error("Invalid tweaks JSON", error=str(e)) logger.error("Invalid tweaks JSON", error=str(e))
@ -107,11 +108,14 @@ async def langflow_upload_ingest_task(
jwt_token = getattr(request.state, "jwt_token", None) jwt_token = getattr(request.state, "jwt_token", None)
if not user_id: if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401) return JSONResponse(
{"error": "User authentication required"}, status_code=401
)
# Create temporary files for task processing # Create temporary files for task processing
import tempfile import tempfile
import os import os
temp_file_paths = [] temp_file_paths = []
original_filenames = [] original_filenames = []
@ -132,7 +136,7 @@ async def langflow_upload_ingest_task(
temp_path = os.path.join(temp_dir, safe_filename) temp_path = os.path.join(temp_dir, safe_filename)
# Write content to temp file # Write content to temp file
with open(temp_path, 'wb') as temp_file: with open(temp_path, "wb") as temp_file:
temp_file.write(content) temp_file.write(content)
temp_file_paths.append(temp_path) temp_file_paths.append(temp_path)
@ -143,22 +147,22 @@ async def langflow_upload_ingest_task(
user_id=user_id, user_id=user_id,
has_settings=bool(settings), has_settings=bool(settings),
has_tweaks=bool(tweaks), has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest delete_after_ingest=delete_after_ingest,
) )
# Create langflow upload task # Create langflow upload task
print(f"tweaks: {tweaks}") logger.debug(
print(f"settings: {settings}") f"Preparing to create langflow upload task: tweaks={tweaks}, settings={settings}, jwt_token={jwt_token}, user_name={user_name}, user_email={user_email}, session_id={session_id}, delete_after_ingest={delete_after_ingest}, temp_file_paths={temp_file_paths}",
print(f"jwt_token: {jwt_token}") )
print(f"user_name: {user_name}") # Create a map between temp_file_paths and original_filenames
print(f"user_email: {user_email}") file_path_to_original_filename = dict(zip(temp_file_paths, original_filenames))
print(f"session_id: {session_id}") logger.debug(
print(f"delete_after_ingest: {delete_after_ingest}") f"File path to original filename map: {file_path_to_original_filename}",
print(f"temp_file_paths: {temp_file_paths}") )
task_id = await task_service.create_langflow_upload_task( task_id = await task_service.create_langflow_upload_task(
user_id=user_id, user_id=user_id,
file_paths=temp_file_paths, file_paths=temp_file_paths,
original_filenames=original_filenames, original_filenames=file_path_to_original_filename,
langflow_file_service=langflow_file_service, langflow_file_service=langflow_file_service,
session_manager=session_manager, session_manager=session_manager,
jwt_token=jwt_token, jwt_token=jwt_token,
@ -172,20 +176,24 @@ async def langflow_upload_ingest_task(
) )
logger.debug("Langflow upload task created successfully", task_id=task_id) logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse({ return JSONResponse(
"task_id": task_id, {
"message": f"Langflow upload task created for {len(upload_files)} file(s)", "task_id": task_id,
"file_count": len(upload_files) "message": f"Langflow upload task created for {len(upload_files)} file(s)",
}, status_code=202) # 202 Accepted for async processing "file_count": len(upload_files),
},
status_code=202,
) # 202 Accepted for async processing
except Exception: except Exception:
# Clean up temp files on error # Clean up temp files on error
from utils.file_utils import safe_unlink from utils.file_utils import safe_unlink
for temp_path in temp_file_paths: for temp_path in temp_file_paths:
safe_unlink(temp_path) safe_unlink(temp_path)
raise raise
except Exception as e: except Exception as e:
logger.error( logger.error(
"Task-based langflow upload_ingest endpoint failed", "Task-based langflow upload_ingest endpoint failed",
@ -193,5 +201,6 @@ async def langflow_upload_ingest_task(
error=str(e), error=str(e),
) )
import traceback import traceback
logger.error("Full traceback", traceback=traceback.format_exc()) logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500) return JSONResponse({"error": str(e)}, status_code=500)

View file

@ -1,6 +1,5 @@
import asyncio import asyncio
import random import random
from typing import Dict, Optional
import time import time
import uuid import uuid
@ -59,7 +58,7 @@ class TaskService:
file_paths: list, file_paths: list,
langflow_file_service, langflow_file_service,
session_manager, session_manager,
original_filenames: list = None, original_filenames: dict | None = None,
jwt_token: str = None, jwt_token: str = None,
owner_name: str = None, owner_name: str = None,
owner_email: str = None, owner_email: str = None,
@ -88,7 +87,7 @@ class TaskService:
) )
return await self.create_custom_task(user_id, file_paths, processor, original_filenames) return await self.create_custom_task(user_id, file_paths, processor, original_filenames)
async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: list = None) -> str: async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: dict | None = None) -> str:
"""Create a new task with custom processor for any type of items""" """Create a new task with custom processor for any type of items"""
import os import os
# Store anonymous tasks under a stable key so they can be retrieved later # Store anonymous tasks under a stable key so they can be retrieved later
@ -96,14 +95,18 @@ class TaskService:
task_id = str(uuid.uuid4()) task_id = str(uuid.uuid4())
# Create file tasks with original filenames if provided # Create file tasks with original filenames if provided
file_tasks = {} normalized_originals = (
for i, item in enumerate(items): {str(k): v for k, v in original_filenames.items()} if original_filenames else {}
if original_filenames and i < len(original_filenames): )
filename = original_filenames[i] file_tasks = {
else: str(item): FileTask(
filename = os.path.basename(str(item)) file_path=str(item),
filename=normalized_originals.get(
file_tasks[str(item)] = FileTask(file_path=str(item), filename=filename) str(item), os.path.basename(str(item))
),
)
for item in items
}
upload_task = UploadTask( upload_task = UploadTask(
task_id=task_id, task_id=task_id,