Compare commits
1 commit
main
...
fix/tasks_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
42a18bb0e9 |
2 changed files with 60 additions and 48 deletions
|
|
@ -13,27 +13,27 @@ logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def upload_ingest_router(
|
async def upload_ingest_router(
|
||||||
request: Request,
|
request: Request,
|
||||||
document_service=None,
|
document_service=None,
|
||||||
langflow_file_service=None,
|
langflow_file_service=None,
|
||||||
session_manager=None,
|
session_manager=None,
|
||||||
task_service=None
|
task_service=None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Router endpoint that automatically routes upload requests based on configuration.
|
Router endpoint that automatically routes upload requests based on configuration.
|
||||||
|
|
||||||
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload)
|
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload)
|
||||||
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service
|
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service
|
||||||
|
|
||||||
This provides a single endpoint that users can call regardless of backend configuration.
|
This provides a single endpoint that users can call regardless of backend configuration.
|
||||||
All langflow uploads are processed as background tasks for better scalability.
|
All langflow uploads are processed as background tasks for better scalability.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Router upload_ingest endpoint called",
|
"Router upload_ingest endpoint called",
|
||||||
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
|
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Route based on configuration
|
# Route based on configuration
|
||||||
if DISABLE_INGEST_WITH_LANGFLOW:
|
if DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
# Route to traditional OpenRAG upload
|
# Route to traditional OpenRAG upload
|
||||||
|
|
@ -42,8 +42,10 @@ async def upload_ingest_router(
|
||||||
else:
|
else:
|
||||||
# Route to Langflow upload and ingest using task service
|
# Route to Langflow upload and ingest using task service
|
||||||
logger.debug("Routing to Langflow upload-ingest pipeline via task service")
|
logger.debug("Routing to Langflow upload-ingest pipeline via task service")
|
||||||
return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service)
|
return await langflow_upload_ingest_task(
|
||||||
|
request, langflow_file_service, session_manager, task_service
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error in upload_ingest_router", error=str(e))
|
logger.error("Error in upload_ingest_router", error=str(e))
|
||||||
error_msg = str(e)
|
error_msg = str(e)
|
||||||
|
|
@ -57,17 +59,14 @@ async def upload_ingest_router(
|
||||||
|
|
||||||
|
|
||||||
async def langflow_upload_ingest_task(
|
async def langflow_upload_ingest_task(
|
||||||
request: Request,
|
request: Request, langflow_file_service, session_manager, task_service
|
||||||
langflow_file_service,
|
|
||||||
session_manager,
|
|
||||||
task_service
|
|
||||||
):
|
):
|
||||||
"""Task-based langflow upload and ingest for single/multiple files"""
|
"""Task-based langflow upload and ingest for single/multiple files"""
|
||||||
try:
|
try:
|
||||||
logger.debug("Task-based langflow upload_ingest endpoint called")
|
logger.debug("Task-based langflow upload_ingest endpoint called")
|
||||||
form = await request.form()
|
form = await request.form()
|
||||||
upload_files = form.getlist("file")
|
upload_files = form.getlist("file")
|
||||||
|
|
||||||
if not upload_files or len(upload_files) == 0:
|
if not upload_files or len(upload_files) == 0:
|
||||||
logger.error("No files provided in task-based upload request")
|
logger.error("No files provided in task-based upload request")
|
||||||
return JSONResponse({"error": "Missing files"}, status_code=400)
|
return JSONResponse({"error": "Missing files"}, status_code=400)
|
||||||
|
|
@ -82,10 +81,11 @@ async def langflow_upload_ingest_task(
|
||||||
# Parse JSON fields if provided
|
# Parse JSON fields if provided
|
||||||
settings = None
|
settings = None
|
||||||
tweaks = None
|
tweaks = None
|
||||||
|
|
||||||
if settings_json:
|
if settings_json:
|
||||||
try:
|
try:
|
||||||
import json
|
import json
|
||||||
|
|
||||||
settings = json.loads(settings_json)
|
settings = json.loads(settings_json)
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error("Invalid settings JSON", error=str(e))
|
logger.error("Invalid settings JSON", error=str(e))
|
||||||
|
|
@ -94,6 +94,7 @@ async def langflow_upload_ingest_task(
|
||||||
if tweaks_json:
|
if tweaks_json:
|
||||||
try:
|
try:
|
||||||
import json
|
import json
|
||||||
|
|
||||||
tweaks = json.loads(tweaks_json)
|
tweaks = json.loads(tweaks_json)
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error("Invalid tweaks JSON", error=str(e))
|
logger.error("Invalid tweaks JSON", error=str(e))
|
||||||
|
|
@ -107,11 +108,14 @@ async def langflow_upload_ingest_task(
|
||||||
jwt_token = getattr(request.state, "jwt_token", None)
|
jwt_token = getattr(request.state, "jwt_token", None)
|
||||||
|
|
||||||
if not user_id:
|
if not user_id:
|
||||||
return JSONResponse({"error": "User authentication required"}, status_code=401)
|
return JSONResponse(
|
||||||
|
{"error": "User authentication required"}, status_code=401
|
||||||
|
)
|
||||||
|
|
||||||
# Create temporary files for task processing
|
# Create temporary files for task processing
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
|
|
||||||
temp_file_paths = []
|
temp_file_paths = []
|
||||||
original_filenames = []
|
original_filenames = []
|
||||||
|
|
||||||
|
|
@ -132,7 +136,7 @@ async def langflow_upload_ingest_task(
|
||||||
temp_path = os.path.join(temp_dir, safe_filename)
|
temp_path = os.path.join(temp_dir, safe_filename)
|
||||||
|
|
||||||
# Write content to temp file
|
# Write content to temp file
|
||||||
with open(temp_path, 'wb') as temp_file:
|
with open(temp_path, "wb") as temp_file:
|
||||||
temp_file.write(content)
|
temp_file.write(content)
|
||||||
|
|
||||||
temp_file_paths.append(temp_path)
|
temp_file_paths.append(temp_path)
|
||||||
|
|
@ -143,22 +147,22 @@ async def langflow_upload_ingest_task(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
has_settings=bool(settings),
|
has_settings=bool(settings),
|
||||||
has_tweaks=bool(tweaks),
|
has_tweaks=bool(tweaks),
|
||||||
delete_after_ingest=delete_after_ingest
|
delete_after_ingest=delete_after_ingest,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create langflow upload task
|
# Create langflow upload task
|
||||||
print(f"tweaks: {tweaks}")
|
logger.debug(
|
||||||
print(f"settings: {settings}")
|
f"Preparing to create langflow upload task: tweaks={tweaks}, settings={settings}, jwt_token={jwt_token}, user_name={user_name}, user_email={user_email}, session_id={session_id}, delete_after_ingest={delete_after_ingest}, temp_file_paths={temp_file_paths}",
|
||||||
print(f"jwt_token: {jwt_token}")
|
)
|
||||||
print(f"user_name: {user_name}")
|
# Create a map between temp_file_paths and original_filenames
|
||||||
print(f"user_email: {user_email}")
|
file_path_to_original_filename = dict(zip(temp_file_paths, original_filenames))
|
||||||
print(f"session_id: {session_id}")
|
logger.debug(
|
||||||
print(f"delete_after_ingest: {delete_after_ingest}")
|
f"File path to original filename map: {file_path_to_original_filename}",
|
||||||
print(f"temp_file_paths: {temp_file_paths}")
|
)
|
||||||
task_id = await task_service.create_langflow_upload_task(
|
task_id = await task_service.create_langflow_upload_task(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
file_paths=temp_file_paths,
|
file_paths=temp_file_paths,
|
||||||
original_filenames=original_filenames,
|
original_filenames=file_path_to_original_filename,
|
||||||
langflow_file_service=langflow_file_service,
|
langflow_file_service=langflow_file_service,
|
||||||
session_manager=session_manager,
|
session_manager=session_manager,
|
||||||
jwt_token=jwt_token,
|
jwt_token=jwt_token,
|
||||||
|
|
@ -172,20 +176,24 @@ async def langflow_upload_ingest_task(
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Langflow upload task created successfully", task_id=task_id)
|
logger.debug("Langflow upload task created successfully", task_id=task_id)
|
||||||
|
|
||||||
return JSONResponse({
|
return JSONResponse(
|
||||||
"task_id": task_id,
|
{
|
||||||
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
|
"task_id": task_id,
|
||||||
"file_count": len(upload_files)
|
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
|
||||||
}, status_code=202) # 202 Accepted for async processing
|
"file_count": len(upload_files),
|
||||||
|
},
|
||||||
|
status_code=202,
|
||||||
|
) # 202 Accepted for async processing
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
# Clean up temp files on error
|
# Clean up temp files on error
|
||||||
from utils.file_utils import safe_unlink
|
from utils.file_utils import safe_unlink
|
||||||
|
|
||||||
for temp_path in temp_file_paths:
|
for temp_path in temp_file_paths:
|
||||||
safe_unlink(temp_path)
|
safe_unlink(temp_path)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Task-based langflow upload_ingest endpoint failed",
|
"Task-based langflow upload_ingest endpoint failed",
|
||||||
|
|
@ -193,5 +201,6 @@ async def langflow_upload_ingest_task(
|
||||||
error=str(e),
|
error=str(e),
|
||||||
)
|
)
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
logger.error("Full traceback", traceback=traceback.format_exc())
|
logger.error("Full traceback", traceback=traceback.format_exc())
|
||||||
return JSONResponse({"error": str(e)}, status_code=500)
|
return JSONResponse({"error": str(e)}, status_code=500)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import random
|
import random
|
||||||
from typing import Dict, Optional
|
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
|
@ -59,7 +58,7 @@ class TaskService:
|
||||||
file_paths: list,
|
file_paths: list,
|
||||||
langflow_file_service,
|
langflow_file_service,
|
||||||
session_manager,
|
session_manager,
|
||||||
original_filenames: list = None,
|
original_filenames: dict | None = None,
|
||||||
jwt_token: str = None,
|
jwt_token: str = None,
|
||||||
owner_name: str = None,
|
owner_name: str = None,
|
||||||
owner_email: str = None,
|
owner_email: str = None,
|
||||||
|
|
@ -88,7 +87,7 @@ class TaskService:
|
||||||
)
|
)
|
||||||
return await self.create_custom_task(user_id, file_paths, processor, original_filenames)
|
return await self.create_custom_task(user_id, file_paths, processor, original_filenames)
|
||||||
|
|
||||||
async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: list = None) -> str:
|
async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: dict | None = None) -> str:
|
||||||
"""Create a new task with custom processor for any type of items"""
|
"""Create a new task with custom processor for any type of items"""
|
||||||
import os
|
import os
|
||||||
# Store anonymous tasks under a stable key so they can be retrieved later
|
# Store anonymous tasks under a stable key so they can be retrieved later
|
||||||
|
|
@ -96,14 +95,18 @@ class TaskService:
|
||||||
task_id = str(uuid.uuid4())
|
task_id = str(uuid.uuid4())
|
||||||
|
|
||||||
# Create file tasks with original filenames if provided
|
# Create file tasks with original filenames if provided
|
||||||
file_tasks = {}
|
normalized_originals = (
|
||||||
for i, item in enumerate(items):
|
{str(k): v for k, v in original_filenames.items()} if original_filenames else {}
|
||||||
if original_filenames and i < len(original_filenames):
|
)
|
||||||
filename = original_filenames[i]
|
file_tasks = {
|
||||||
else:
|
str(item): FileTask(
|
||||||
filename = os.path.basename(str(item))
|
file_path=str(item),
|
||||||
|
filename=normalized_originals.get(
|
||||||
file_tasks[str(item)] = FileTask(file_path=str(item), filename=filename)
|
str(item), os.path.basename(str(item))
|
||||||
|
),
|
||||||
|
)
|
||||||
|
for item in items
|
||||||
|
}
|
||||||
|
|
||||||
upload_task = UploadTask(
|
upload_task = UploadTask(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue