add langflow routers with langflow file processor

Edwin Jose 2025-09-09 03:20:21 -04:00
parent faac0c912e
commit 8dff280930
5 changed files with 378 additions and 38 deletions

View file

@@ -128,11 +128,11 @@ async def run_ingestion(
async def upload_and_ingest_user_file(
request: Request, langflow_file_service: LangflowFileService, session_manager
request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
):
"""Combined upload and ingest endpoint - uploads file then runs ingestion"""
"""Combined upload and ingest endpoint - uses task service for tracking and cancellation"""
try:
logger.debug("upload_and_ingest_user_file endpoint called")
logger.debug("upload_and_ingest_user_file endpoint called - using task service")
form = await request.form()
upload_file = form.get("file")
if upload_file is None:
@@ -165,39 +165,78 @@ async def upload_and_ingest_user_file(
logger.error("Invalid tweaks JSON", error=str(e))
return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)
# Get user info from request state
user = getattr(request.state, "user", None)
user_id = user.user_id if user else None
user_name = user.name if user else None
user_email = user.email if user else None
jwt_token = getattr(request.state, "jwt_token", None)
if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401)
logger.debug(
"Processing file for combined upload and ingest",
"Processing file for task-based upload and ingest",
filename=upload_file.filename,
size=upload_file.size,
session_id=session_id,
has_settings=bool(settings),
has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest
delete_after_ingest=delete_after_ingest,
user_id=user_id
)
# Prepare file tuple for upload
# Create temporary file for task processing
import tempfile
import os
# Read file content
content = await upload_file.read()
file_tuple = (
upload_file.filename,
content,
upload_file.content_type or "application/octet-stream",
)
jwt_token = getattr(request.state, "jwt_token", None)
logger.debug("JWT token status", jwt_present=jwt_token is not None)
logger.debug("Calling langflow_file_service.upload_and_ingest_file")
result = await langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
session_id=session_id,
tweaks=tweaks,
settings=settings,
jwt_token=jwt_token,
delete_after_ingest=delete_after_ingest
# Create temporary file
temp_fd, temp_path = tempfile.mkstemp(
suffix=f"_{upload_file.filename}",
prefix="langflow_upload_"
)
logger.debug("Upload and ingest successful", result=result)
return JSONResponse(result, status_code=201)
try:
# Write content to temp file
with os.fdopen(temp_fd, 'wb') as temp_file:
temp_file.write(content)
logger.debug("Created temporary file for task processing", temp_path=temp_path)
# Create langflow upload task for single file
task_id = await task_service.create_langflow_upload_task(
user_id=user_id,
file_paths=[temp_path],
langflow_file_service=langflow_file_service,
session_manager=session_manager,
jwt_token=jwt_token,
owner_name=user_name,
owner_email=user_email,
session_id=session_id,
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
)
logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse({
"task_id": task_id,
"message": f"Langflow upload task created for file '{upload_file.filename}'",
"filename": upload_file.filename
}, status_code=202) # 202 Accepted for async processing
except Exception:
# Clean up temp file on error
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
raise
except Exception as e:
logger.error(

View file

@@ -8,7 +8,6 @@ from utils.logging_config import get_logger
# Import the actual endpoint implementations
from .upload import upload as traditional_upload
from .langflow_files import upload_and_ingest_user_file as langflow_upload_ingest
logger = get_logger(__name__)
@@ -17,15 +16,17 @@ async def upload_ingest_router(
request: Request,
document_service=None,
langflow_file_service=None,
session_manager=None
session_manager=None,
task_service=None
):
"""
Router endpoint that automatically routes upload requests based on configuration.
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload)
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest (/langflow/upload_ingest)
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service
This provides a single endpoint that users can call regardless of backend configuration.
All langflow uploads are processed as background tasks for better scalability.
"""
try:
logger.debug(
@@ -33,18 +34,15 @@ async def upload_ingest_router(
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
)
# Route directly without task creation
# Note: Single file uploads are processed synchronously and don't need task tracking
# Tasks are only used for bulk operations (folders, S3 buckets, etc.)
# Route based on configuration
if DISABLE_INGEST_WITH_LANGFLOW:
# Route to traditional OpenRAG upload
logger.debug("Routing to traditional OpenRAG upload")
return await traditional_upload(request, document_service, session_manager)
else:
# Route to Langflow upload and ingest
logger.debug("Routing to Langflow upload-ingest pipeline")
return await langflow_upload_ingest(request, langflow_file_service, session_manager)
# Route to Langflow upload and ingest using task service
logger.debug("Routing to Langflow upload-ingest pipeline via task service")
return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service)
except Exception as e:
logger.error("Error in upload_ingest_router", error=str(e))
@@ -56,3 +54,130 @@ async def upload_ingest_router(
return JSONResponse({"error": error_msg}, status_code=403)
else:
return JSONResponse({"error": error_msg}, status_code=500)
async def langflow_upload_ingest_task(
request: Request,
langflow_file_service,
session_manager,
task_service
):
"""Task-based langflow upload and ingest for single/multiple files"""
try:
logger.debug("Task-based langflow upload_ingest endpoint called")
form = await request.form()
upload_files = form.getlist("file")
if not upload_files or len(upload_files) == 0:
logger.error("No files provided in task-based upload request")
return JSONResponse({"error": "Missing files"}, status_code=400)
# Extract optional parameters
session_id = form.get("session_id")
settings_json = form.get("settings")
tweaks_json = form.get("tweaks")
delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
# Parse JSON fields if provided
settings = None
tweaks = None
if settings_json:
try:
import json
settings = json.loads(settings_json)
except json.JSONDecodeError as e:
logger.error("Invalid settings JSON", error=str(e))
return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)
if tweaks_json:
try:
import json
tweaks = json.loads(tweaks_json)
except json.JSONDecodeError as e:
logger.error("Invalid tweaks JSON", error=str(e))
return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)
# Get user info from request state
user = getattr(request.state, "user", None)
user_id = user.user_id if user else None
user_name = user.name if user else None
user_email = user.email if user else None
jwt_token = getattr(request.state, "jwt_token", None)
if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401)
# Create temporary files for task processing
import tempfile
import os
temp_file_paths = []
try:
for upload_file in upload_files:
# Read file content
content = await upload_file.read()
# Create temporary file
temp_fd, temp_path = tempfile.mkstemp(
suffix=f"_{upload_file.filename}",
prefix="langflow_upload_"
)
# Write content to temp file
with os.fdopen(temp_fd, 'wb') as temp_file:
temp_file.write(content)
temp_file_paths.append(temp_path)
logger.debug(
"Created temporary files for task-based processing",
file_count=len(temp_file_paths),
user_id=user_id,
has_settings=bool(settings),
has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest
)
# Create langflow upload task
task_id = await task_service.create_langflow_upload_task(
user_id=user_id,
file_paths=temp_file_paths,
langflow_file_service=langflow_file_service,
session_manager=session_manager,
jwt_token=jwt_token,
owner_name=user_name,
owner_email=user_email,
session_id=session_id,
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
)
logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse({
"task_id": task_id,
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
"file_count": len(upload_files)
}, status_code=202) # 202 Accepted for async processing
except Exception:
# Clean up temp files on error
for temp_path in temp_file_paths:
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
raise
except Exception as e:
logger.error(
"Task-based langflow upload_ingest endpoint failed",
error_type=type(e).__name__,
error=str(e),
)
import traceback
logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500)
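For illustration, a minimal client-side sketch of the new flow. The /upload_ingest mount point and the /tasks/{task_id} polling path are assumptions made for this sketch, not paths taken from the commit; the 202 response with task_id is the shape returned by the endpoints above.

# Client sketch: upload a file through the unified router and, if the
# Langflow task path answered 202, poll for completion.
# Assumed (not from this commit): the router is mounted at POST /upload_ingest
# and task status can be read from GET /tasks/{task_id}.
import time
import requests

BASE_URL = "http://localhost:8000"  # assumed host/port

def upload_and_wait(path, session_id=None, timeout=300.0):
    with open(path, "rb") as f:
        data = {"delete_after_ingest": "true"}
        if session_id:
            data["session_id"] = session_id
        resp = requests.post(
            f"{BASE_URL}/upload_ingest",
            files={"file": (path.rsplit("/", 1)[-1], f)},
            data=data,
        )
    resp.raise_for_status()
    body = resp.json()
    if resp.status_code != 202 or "task_id" not in body:
        return body  # traditional path: upload was handled synchronously

    task_id = body["task_id"]
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = requests.get(f"{BASE_URL}/tasks/{task_id}").json()  # assumed endpoint
        if status.get("status") in ("completed", "failed"):
            return status
        time.sleep(2)
    raise TimeoutError(f"task {task_id} did not finish within {timeout}s")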

View file

@@ -10,6 +10,7 @@ logger = get_logger(__name__)
import asyncio
import atexit
import mimetypes
import multiprocessing
import os
import subprocess
@@ -278,6 +279,7 @@ async def ingest_default_documents_when_ready(services):
async def _ingest_default_documents_langflow(services, file_paths):
"""Ingest default documents using Langflow upload-ingest-delete pipeline."""
langflow_file_service = services["langflow_file_service"]
session_manager = services["session_manager"]
logger.info(
"Using Langflow ingestion pipeline for default documents",
@@ -298,17 +300,49 @@ async def _ingest_default_documents_langflow(services, file_paths):
# Create file tuple for upload
filename = os.path.basename(file_path)
# Determine content type based on file extension
import mimetypes
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = 'application/octet-stream'
file_tuple = (filename, content, content_type)
# Use langflow upload_and_ingest_file method
# Use AnonymousUser details for default documents
from session_manager import AnonymousUser
anonymous_user = AnonymousUser()
# Get JWT token using same logic as DocumentFileProcessor
# This will handle anonymous JWT creation if needed for anonymous user
effective_jwt = None
# Let session manager handle anonymous JWT creation if needed
if session_manager:
# This call will create anonymous JWT if needed (same as DocumentFileProcessor)
session_manager.get_user_opensearch_client(
anonymous_user.user_id, effective_jwt
)
# Get the JWT that was created by session manager
if hasattr(session_manager, '_anonymous_jwt'):
effective_jwt = session_manager._anonymous_jwt
# Prepare tweaks for default documents with anonymous user metadata
default_tweaks = {
"OpenSearchHybrid-Ve6bS": {
"docs_metadata": [
{"key": "owner", "value": None},
{"key": "owner_name", "value": anonymous_user.name},
{"key": "owner_email", "value": anonymous_user.email},
{"key": "connector_type", "value": "system_default"}
]
}
}
# Use langflow upload_and_ingest_file method with JWT token
result = await langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
jwt_token=None, # No auth for default documents
session_id=None, # No session for default documents
tweaks=default_tweaks, # Add anonymous user metadata
settings=None, # Use default ingestion settings
jwt_token=effective_jwt, # Use JWT token (anonymous if needed)
delete_after_ingest=True, # Clean up after ingestion
)
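The anonymous-user metadata above and the per-user metadata injected by LangflowFileProcessor further down share the same docs_metadata shape, keyed by the OpenSearch component id "OpenSearchHybrid-Ve6bS". A hypothetical helper, shown only to make that shared structure explicit (the helper itself is not part of this commit):

# Illustrative only: the component id is taken from this commit, the helper
# function and its defaults are not.
OPENSEARCH_COMPONENT_ID = "OpenSearchHybrid-Ve6bS"

def build_docs_metadata_tweaks(owner=None, owner_name=None, owner_email=None,
                               connector_type="local", extra_tweaks=None):
    metadata = [
        {"key": "owner", "value": owner},
        {"key": "owner_name", "value": owner_name},
        {"key": "owner_email", "value": owner_email},
        {"key": "connector_type", "value": connector_type},
    ]
    tweaks = dict(extra_tweaks or {})
    tweaks.setdefault(OPENSEARCH_COMPONENT_ID, {})["docs_metadata"] = metadata
    return tweaks

# The default-document case above corresponds roughly to:
# build_docs_metadata_tweaks(owner=None,
#                            owner_name=anonymous_user.name,
#                            owner_email=anonymous_user.email,
#                            connector_type="system_default")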
@@ -511,6 +545,7 @@ async def create_app():
langflow_files.upload_and_ingest_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
@@ -906,6 +941,7 @@ async def create_app():
document_service=services["document_service"],
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],

View file

@@ -323,3 +323,111 @@ class S3FileProcessor(TaskProcessor):
tmp.close()
os.remove(tmp.name)
file_task.updated_at = time.time()
class LangflowFileProcessor(TaskProcessor):
"""Processor for Langflow file uploads with upload and ingest"""
def __init__(
self,
langflow_file_service,
session_manager,
owner_user_id: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
session_id: str = None,
tweaks: dict = None,
settings: dict = None,
delete_after_ingest: bool = True,
):
self.langflow_file_service = langflow_file_service
self.session_manager = session_manager
self.owner_user_id = owner_user_id
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
self.session_id = session_id
self.tweaks = tweaks or {}
self.settings = settings
self.delete_after_ingest = delete_after_ingest
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a file path using LangflowFileService upload_and_ingest_file"""
import mimetypes
import os
from models.tasks import TaskStatus
import time
# Update task status
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Read file content
with open(item, 'rb') as f:
content = f.read()
# Create file tuple for upload
filename = os.path.basename(item)
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = 'application/octet-stream'
file_tuple = (filename, content, content_type)
# Get JWT token using same logic as DocumentFileProcessor
# This will handle anonymous JWT creation if needed
effective_jwt = self.jwt_token
if self.session_manager and not effective_jwt:
# Let session manager handle anonymous JWT creation if needed
self.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
# The session manager would have created anonymous JWT if needed
# Get it from the session manager's internal state
if hasattr(self.session_manager, '_anonymous_jwt'):
effective_jwt = self.session_manager._anonymous_jwt
# Prepare metadata tweaks similar to API endpoint
final_tweaks = self.tweaks.copy() if self.tweaks else {}
metadata_tweaks = []
if self.owner_user_id:
metadata_tweaks.append({"key": "owner", "value": self.owner_user_id})
if self.owner_name:
metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
if self.owner_email:
metadata_tweaks.append({"key": "owner_email", "value": self.owner_email})
# Mark as local upload for connector_type
metadata_tweaks.append({"key": "connector_type", "value": "local"})
if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in final_tweaks:
final_tweaks["OpenSearchHybrid-Ve6bS"] = {}
final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
# Process file using langflow service
result = await self.langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
session_id=self.session_id,
tweaks=final_tweaks,
settings=self.settings,
jwt_token=effective_jwt,
delete_after_ingest=self.delete_after_ingest
)
# Update task with success
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
except Exception as e:
# Update task with failure
file_task.status = TaskStatus.FAILED
file_task.error_message = str(e)
file_task.updated_at = time.time()
raise
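A rough test-style sketch of exercising the processor directly on one temporary file. It assumes the project's models.processors and models.tasks modules are importable; the UploadTask/FileTask stand-ins and the mocked service are guesses at shape, not taken from this commit.

# Sketch only: drives LangflowFileProcessor.process_item against a mocked
# LangflowFileService and checks that owner metadata is injected into tweaks.
import asyncio
import os
import tempfile
from unittest.mock import AsyncMock, MagicMock

from models.processors import LangflowFileProcessor  # assumes project modules are importable

async def main():
    service = AsyncMock()
    service.upload_and_ingest_file.return_value = {"status": "ingested"}

    processor = LangflowFileProcessor(
        langflow_file_service=service,
        session_manager=None,           # skips the anonymous-JWT fallback
        owner_user_id="user-123",
        owner_name="Test User",
        owner_email="test@example.com",
    )

    with tempfile.NamedTemporaryFile(suffix="_example.txt", delete=False) as tmp:
        tmp.write(b"hello")
        path = tmp.name

    upload_task, file_task = MagicMock(), MagicMock()   # stand-ins for UploadTask/FileTask
    await processor.process_item(upload_task, path, file_task)

    kwargs = service.upload_and_ingest_file.call_args.kwargs
    assert kwargs["tweaks"]["OpenSearchHybrid-Ve6bS"]["docs_metadata"][0] == {
        "key": "owner", "value": "user-123"
    }
    os.unlink(path)

asyncio.run(main())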

View file

@@ -51,6 +51,38 @@ class TaskService:
)
return await self.create_custom_task(user_id, file_paths, processor)
async def create_langflow_upload_task(
self,
user_id: str,
file_paths: list,
langflow_file_service,
session_manager,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
session_id: str = None,
tweaks: dict = None,
settings: dict = None,
delete_after_ingest: bool = True,
) -> str:
"""Create a new upload task for Langflow file processing with upload and ingest"""
# Use LangflowFileProcessor with user context
from models.processors import LangflowFileProcessor
processor = LangflowFileProcessor(
langflow_file_service=langflow_file_service,
session_manager=session_manager,
owner_user_id=user_id,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
session_id=session_id,
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
)
return await self.create_custom_task(user_id, file_paths, processor)
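Server-side, the new method is a thin wrapper over create_custom_task. A hedged sketch of calling it from other backend code, assuming the same services dict built in create_app() above and local file paths like those produced by the upload endpoints:

# Sketch only: enqueue a Langflow upload task from backend code.
async def enqueue_local_files(services, user, paths):
    task_service = services["task_service"]
    task_id = await task_service.create_langflow_upload_task(
        user_id=user.user_id,
        file_paths=list(paths),
        langflow_file_service=services["langflow_file_service"],
        session_manager=services["session_manager"],
        owner_name=user.name,
        owner_email=user.email,
        delete_after_ingest=True,
    )
    return task_id  # progress is then tracked through the app's task endpoints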
async def create_custom_task(self, user_id: str, items: list, processor) -> str:
"""Create a new task with custom processor for any type of items"""
task_id = str(uuid.uuid4())