Refactor file upload task to use filename mapping (#195)
Changed the handling of original filenames in Langflow upload tasks to use a mapping from file paths to original filenames instead of a list. Updated both the API router and TaskService to support this change, improving reliability when associating uploaded files with their original names.
This commit is contained in:
parent
7201a914be
commit
5ea70cc779
2 changed files with 60 additions and 48 deletions
|
|
@ -13,27 +13,27 @@ logger = get_logger(__name__)
|
|||
|
||||
|
||||
async def upload_ingest_router(
|
||||
request: Request,
|
||||
document_service=None,
|
||||
langflow_file_service=None,
|
||||
request: Request,
|
||||
document_service=None,
|
||||
langflow_file_service=None,
|
||||
session_manager=None,
|
||||
task_service=None
|
||||
task_service=None,
|
||||
):
|
||||
"""
|
||||
Router endpoint that automatically routes upload requests based on configuration.
|
||||
|
||||
|
||||
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload)
|
||||
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service
|
||||
|
||||
|
||||
This provides a single endpoint that users can call regardless of backend configuration.
|
||||
All langflow uploads are processed as background tasks for better scalability.
|
||||
"""
|
||||
try:
|
||||
logger.debug(
|
||||
"Router upload_ingest endpoint called",
|
||||
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
|
||||
"Router upload_ingest endpoint called",
|
||||
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
|
||||
)
|
||||
|
||||
|
||||
# Route based on configuration
|
||||
if DISABLE_INGEST_WITH_LANGFLOW:
|
||||
# Route to traditional OpenRAG upload
|
||||
|
|
@ -42,8 +42,10 @@ async def upload_ingest_router(
|
|||
else:
|
||||
# Route to Langflow upload and ingest using task service
|
||||
logger.debug("Routing to Langflow upload-ingest pipeline via task service")
|
||||
return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service)
|
||||
|
||||
return await langflow_upload_ingest_task(
|
||||
request, langflow_file_service, session_manager, task_service
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in upload_ingest_router", error=str(e))
|
||||
error_msg = str(e)
|
||||
|
|
@ -57,17 +59,14 @@ async def upload_ingest_router(
|
|||
|
||||
|
||||
async def langflow_upload_ingest_task(
|
||||
request: Request,
|
||||
langflow_file_service,
|
||||
session_manager,
|
||||
task_service
|
||||
request: Request, langflow_file_service, session_manager, task_service
|
||||
):
|
||||
"""Task-based langflow upload and ingest for single/multiple files"""
|
||||
try:
|
||||
logger.debug("Task-based langflow upload_ingest endpoint called")
|
||||
form = await request.form()
|
||||
upload_files = form.getlist("file")
|
||||
|
||||
|
||||
if not upload_files or len(upload_files) == 0:
|
||||
logger.error("No files provided in task-based upload request")
|
||||
return JSONResponse({"error": "Missing files"}, status_code=400)
|
||||
|
|
@ -82,10 +81,11 @@ async def langflow_upload_ingest_task(
|
|||
# Parse JSON fields if provided
|
||||
settings = None
|
||||
tweaks = None
|
||||
|
||||
|
||||
if settings_json:
|
||||
try:
|
||||
import json
|
||||
|
||||
settings = json.loads(settings_json)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error("Invalid settings JSON", error=str(e))
|
||||
|
|
@ -94,6 +94,7 @@ async def langflow_upload_ingest_task(
|
|||
if tweaks_json:
|
||||
try:
|
||||
import json
|
||||
|
||||
tweaks = json.loads(tweaks_json)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error("Invalid tweaks JSON", error=str(e))
|
||||
|
|
@ -107,11 +108,14 @@ async def langflow_upload_ingest_task(
|
|||
jwt_token = getattr(request.state, "jwt_token", None)
|
||||
|
||||
if not user_id:
|
||||
return JSONResponse({"error": "User authentication required"}, status_code=401)
|
||||
return JSONResponse(
|
||||
{"error": "User authentication required"}, status_code=401
|
||||
)
|
||||
|
||||
# Create temporary files for task processing
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
temp_file_paths = []
|
||||
original_filenames = []
|
||||
|
||||
|
|
@ -132,7 +136,7 @@ async def langflow_upload_ingest_task(
|
|||
temp_path = os.path.join(temp_dir, safe_filename)
|
||||
|
||||
# Write content to temp file
|
||||
with open(temp_path, 'wb') as temp_file:
|
||||
with open(temp_path, "wb") as temp_file:
|
||||
temp_file.write(content)
|
||||
|
||||
temp_file_paths.append(temp_path)
|
||||
|
|
@ -143,22 +147,22 @@ async def langflow_upload_ingest_task(
|
|||
user_id=user_id,
|
||||
has_settings=bool(settings),
|
||||
has_tweaks=bool(tweaks),
|
||||
delete_after_ingest=delete_after_ingest
|
||||
delete_after_ingest=delete_after_ingest,
|
||||
)
|
||||
|
||||
# Create langflow upload task
|
||||
print(f"tweaks: {tweaks}")
|
||||
print(f"settings: {settings}")
|
||||
print(f"jwt_token: {jwt_token}")
|
||||
print(f"user_name: {user_name}")
|
||||
print(f"user_email: {user_email}")
|
||||
print(f"session_id: {session_id}")
|
||||
print(f"delete_after_ingest: {delete_after_ingest}")
|
||||
print(f"temp_file_paths: {temp_file_paths}")
|
||||
logger.debug(
|
||||
f"Preparing to create langflow upload task: tweaks={tweaks}, settings={settings}, jwt_token={jwt_token}, user_name={user_name}, user_email={user_email}, session_id={session_id}, delete_after_ingest={delete_after_ingest}, temp_file_paths={temp_file_paths}",
|
||||
)
|
||||
# Create a map between temp_file_paths and original_filenames
|
||||
file_path_to_original_filename = dict(zip(temp_file_paths, original_filenames))
|
||||
logger.debug(
|
||||
f"File path to original filename map: {file_path_to_original_filename}",
|
||||
)
|
||||
task_id = await task_service.create_langflow_upload_task(
|
||||
user_id=user_id,
|
||||
file_paths=temp_file_paths,
|
||||
original_filenames=original_filenames,
|
||||
original_filenames=file_path_to_original_filename,
|
||||
langflow_file_service=langflow_file_service,
|
||||
session_manager=session_manager,
|
||||
jwt_token=jwt_token,
|
||||
|
|
@ -172,20 +176,24 @@ async def langflow_upload_ingest_task(
|
|||
)
|
||||
|
||||
logger.debug("Langflow upload task created successfully", task_id=task_id)
|
||||
|
||||
return JSONResponse({
|
||||
"task_id": task_id,
|
||||
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
|
||||
"file_count": len(upload_files)
|
||||
}, status_code=202) # 202 Accepted for async processing
|
||||
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
"task_id": task_id,
|
||||
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
|
||||
"file_count": len(upload_files),
|
||||
},
|
||||
status_code=202,
|
||||
) # 202 Accepted for async processing
|
||||
|
||||
except Exception:
|
||||
# Clean up temp files on error
|
||||
from utils.file_utils import safe_unlink
|
||||
|
||||
for temp_path in temp_file_paths:
|
||||
safe_unlink(temp_path)
|
||||
raise
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Task-based langflow upload_ingest endpoint failed",
|
||||
|
|
@ -193,5 +201,6 @@ async def langflow_upload_ingest_task(
|
|||
error=str(e),
|
||||
)
|
||||
import traceback
|
||||
|
||||
logger.error("Full traceback", traceback=traceback.format_exc())
|
||||
return JSONResponse({"error": str(e)}, status_code=500)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
import asyncio
|
||||
import random
|
||||
from typing import Dict, Optional
|
||||
import time
|
||||
import uuid
|
||||
|
||||
|
|
@ -59,7 +58,7 @@ class TaskService:
|
|||
file_paths: list,
|
||||
langflow_file_service,
|
||||
session_manager,
|
||||
original_filenames: list = None,
|
||||
original_filenames: dict | None = None,
|
||||
jwt_token: str = None,
|
||||
owner_name: str = None,
|
||||
owner_email: str = None,
|
||||
|
|
@ -88,7 +87,7 @@ class TaskService:
|
|||
)
|
||||
return await self.create_custom_task(user_id, file_paths, processor, original_filenames)
|
||||
|
||||
async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: list = None) -> str:
|
||||
async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: dict | None = None) -> str:
|
||||
"""Create a new task with custom processor for any type of items"""
|
||||
import os
|
||||
# Store anonymous tasks under a stable key so they can be retrieved later
|
||||
|
|
@ -96,14 +95,18 @@ class TaskService:
|
|||
task_id = str(uuid.uuid4())
|
||||
|
||||
# Create file tasks with original filenames if provided
|
||||
file_tasks = {}
|
||||
for i, item in enumerate(items):
|
||||
if original_filenames and i < len(original_filenames):
|
||||
filename = original_filenames[i]
|
||||
else:
|
||||
filename = os.path.basename(str(item))
|
||||
|
||||
file_tasks[str(item)] = FileTask(file_path=str(item), filename=filename)
|
||||
normalized_originals = (
|
||||
{str(k): v for k, v in original_filenames.items()} if original_filenames else {}
|
||||
)
|
||||
file_tasks = {
|
||||
str(item): FileTask(
|
||||
file_path=str(item),
|
||||
filename=normalized_originals.get(
|
||||
str(item), os.path.basename(str(item))
|
||||
),
|
||||
)
|
||||
for item in items
|
||||
}
|
||||
|
||||
upload_task = UploadTask(
|
||||
task_id=task_id,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue