diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index f5ac3657..85b265cc 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -128,11 +128,11 @@ async def run_ingestion( async def upload_and_ingest_user_file( - request: Request, langflow_file_service: LangflowFileService, session_manager + request: Request, langflow_file_service: LangflowFileService, session_manager, task_service ): - """Combined upload and ingest endpoint - uploads file then runs ingestion""" + """Combined upload and ingest endpoint - uses task service for tracking and cancellation""" try: - logger.debug("upload_and_ingest_user_file endpoint called") + logger.debug("upload_and_ingest_user_file endpoint called - using task service") form = await request.form() upload_file = form.get("file") if upload_file is None: @@ -165,39 +165,78 @@ async def upload_and_ingest_user_file( logger.error("Invalid tweaks JSON", error=str(e)) return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) + # Get user info from request state + user = getattr(request.state, "user", None) + user_id = user.user_id if user else None + user_name = user.name if user else None + user_email = user.email if user else None + jwt_token = getattr(request.state, "jwt_token", None) + + if not user_id: + return JSONResponse({"error": "User authentication required"}, status_code=401) + logger.debug( - "Processing file for combined upload and ingest", + "Processing file for task-based upload and ingest", filename=upload_file.filename, size=upload_file.size, session_id=session_id, has_settings=bool(settings), has_tweaks=bool(tweaks), - delete_after_ingest=delete_after_ingest + delete_after_ingest=delete_after_ingest, + user_id=user_id ) - # Prepare file tuple for upload + # Create temporary file for task processing + import tempfile + import os + + # Read file content content = await upload_file.read() - file_tuple = ( - upload_file.filename, - content, - upload_file.content_type or "application/octet-stream", - ) - - jwt_token = getattr(request.state, "jwt_token", None) - logger.debug("JWT token status", jwt_present=jwt_token is not None) - - logger.debug("Calling langflow_file_service.upload_and_ingest_file") - result = await langflow_file_service.upload_and_ingest_file( - file_tuple=file_tuple, - session_id=session_id, - tweaks=tweaks, - settings=settings, - jwt_token=jwt_token, - delete_after_ingest=delete_after_ingest + + # Create temporary file + temp_fd, temp_path = tempfile.mkstemp( + suffix=f"_{upload_file.filename}", + prefix="langflow_upload_" ) - logger.debug("Upload and ingest successful", result=result) - return JSONResponse(result, status_code=201) + try: + # Write content to temp file + with os.fdopen(temp_fd, 'wb') as temp_file: + temp_file.write(content) + + logger.debug("Created temporary file for task processing", temp_path=temp_path) + + # Create langflow upload task for single file + task_id = await task_service.create_langflow_upload_task( + user_id=user_id, + file_paths=[temp_path], + langflow_file_service=langflow_file_service, + session_manager=session_manager, + jwt_token=jwt_token, + owner_name=user_name, + owner_email=user_email, + session_id=session_id, + tweaks=tweaks, + settings=settings, + delete_after_ingest=delete_after_ingest, + ) + + logger.debug("Langflow upload task created successfully", task_id=task_id) + + return JSONResponse({ + "task_id": task_id, + "message": f"Langflow upload task created for file '{upload_file.filename}'", + "filename": upload_file.filename + }, status_code=202) # 202 Accepted for async processing + + except Exception: + # Clean up temp file on error + try: + if os.path.exists(temp_path): + os.unlink(temp_path) + except Exception: + pass # Ignore cleanup errors + raise except Exception as e: logger.error( diff --git a/src/api/router.py b/src/api/router.py index 518217ee..5472e738 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -8,7 +8,6 @@ from utils.logging_config import get_logger # Import the actual endpoint implementations from .upload import upload as traditional_upload -from .langflow_files import upload_and_ingest_user_file as langflow_upload_ingest logger = get_logger(__name__) @@ -17,15 +16,17 @@ async def upload_ingest_router( request: Request, document_service=None, langflow_file_service=None, - session_manager=None + session_manager=None, + task_service=None ): """ Router endpoint that automatically routes upload requests based on configuration. - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload) - - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest (/langflow/upload_ingest) + - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service This provides a single endpoint that users can call regardless of backend configuration. + All langflow uploads are processed as background tasks for better scalability. """ try: logger.debug( @@ -33,18 +34,15 @@ async def upload_ingest_router( disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW ) - # Route directly without task creation - # Note: Single file uploads are processed synchronously and don't need task tracking - # Tasks are only used for bulk operations (folders, S3 buckets, etc.) - + # Route based on configuration if DISABLE_INGEST_WITH_LANGFLOW: # Route to traditional OpenRAG upload logger.debug("Routing to traditional OpenRAG upload") return await traditional_upload(request, document_service, session_manager) else: - # Route to Langflow upload and ingest - logger.debug("Routing to Langflow upload-ingest pipeline") - return await langflow_upload_ingest(request, langflow_file_service, session_manager) + # Route to Langflow upload and ingest using task service + logger.debug("Routing to Langflow upload-ingest pipeline via task service") + return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service) except Exception as e: logger.error("Error in upload_ingest_router", error=str(e)) @@ -56,3 +54,130 @@ async def upload_ingest_router( return JSONResponse({"error": error_msg}, status_code=403) else: return JSONResponse({"error": error_msg}, status_code=500) + + +async def langflow_upload_ingest_task( + request: Request, + langflow_file_service, + session_manager, + task_service +): + """Task-based langflow upload and ingest for single/multiple files""" + try: + logger.debug("Task-based langflow upload_ingest endpoint called") + form = await request.form() + upload_files = form.getlist("file") + + if not upload_files or len(upload_files) == 0: + logger.error("No files provided in task-based upload request") + return JSONResponse({"error": "Missing files"}, status_code=400) + + # Extract optional parameters + session_id = form.get("session_id") + settings_json = form.get("settings") + tweaks_json = form.get("tweaks") + delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true" + + # Parse JSON fields if provided + settings = None + tweaks = None + + if settings_json: + try: + import json + settings = json.loads(settings_json) + except json.JSONDecodeError as e: + logger.error("Invalid settings JSON", error=str(e)) + return JSONResponse({"error": "Invalid settings JSON"}, status_code=400) + + if tweaks_json: + try: + import json + tweaks = json.loads(tweaks_json) + except json.JSONDecodeError as e: + logger.error("Invalid tweaks JSON", error=str(e)) + return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) + + # Get user info from request state + user = getattr(request.state, "user", None) + user_id = user.user_id if user else None + user_name = user.name if user else None + user_email = user.email if user else None + jwt_token = getattr(request.state, "jwt_token", None) + + if not user_id: + return JSONResponse({"error": "User authentication required"}, status_code=401) + + # Create temporary files for task processing + import tempfile + import os + temp_file_paths = [] + + try: + for upload_file in upload_files: + # Read file content + content = await upload_file.read() + + # Create temporary file + temp_fd, temp_path = tempfile.mkstemp( + suffix=f"_{upload_file.filename}", + prefix="langflow_upload_" + ) + + # Write content to temp file + with os.fdopen(temp_fd, 'wb') as temp_file: + temp_file.write(content) + + temp_file_paths.append(temp_path) + + logger.debug( + "Created temporary files for task-based processing", + file_count=len(temp_file_paths), + user_id=user_id, + has_settings=bool(settings), + has_tweaks=bool(tweaks), + delete_after_ingest=delete_after_ingest + ) + + # Create langflow upload task + task_id = await task_service.create_langflow_upload_task( + user_id=user_id, + file_paths=temp_file_paths, + langflow_file_service=langflow_file_service, + session_manager=session_manager, + jwt_token=jwt_token, + owner_name=user_name, + owner_email=user_email, + session_id=session_id, + tweaks=tweaks, + settings=settings, + delete_after_ingest=delete_after_ingest, + ) + + logger.debug("Langflow upload task created successfully", task_id=task_id) + + return JSONResponse({ + "task_id": task_id, + "message": f"Langflow upload task created for {len(upload_files)} file(s)", + "file_count": len(upload_files) + }, status_code=202) # 202 Accepted for async processing + + except Exception: + # Clean up temp files on error + for temp_path in temp_file_paths: + try: + if os.path.exists(temp_path): + os.unlink(temp_path) + except Exception: + pass # Ignore cleanup errors + raise + + except Exception as e: + logger.error( + "Task-based langflow upload_ingest endpoint failed", + error_type=type(e).__name__, + error=str(e), + ) + import traceback + logger.error("Full traceback", traceback=traceback.format_exc()) + return JSONResponse({"error": str(e)}, status_code=500) diff --git a/src/main.py b/src/main.py index c941e53c..3b473e63 100644 --- a/src/main.py +++ b/src/main.py @@ -10,6 +10,7 @@ logger = get_logger(__name__) import asyncio import atexit +import mimetypes import multiprocessing import os import subprocess @@ -278,6 +279,7 @@ async def ingest_default_documents_when_ready(services): async def _ingest_default_documents_langflow(services, file_paths): """Ingest default documents using Langflow upload-ingest-delete pipeline.""" langflow_file_service = services["langflow_file_service"] + session_manager = services["session_manager"] logger.info( "Using Langflow ingestion pipeline for default documents", @@ -298,17 +300,49 @@ async def _ingest_default_documents_langflow(services, file_paths): # Create file tuple for upload filename = os.path.basename(file_path) # Determine content type based on file extension - import mimetypes content_type, _ = mimetypes.guess_type(filename) if not content_type: content_type = 'application/octet-stream' file_tuple = (filename, content, content_type) - # Use langflow upload_and_ingest_file method + # Use AnonymousUser details for default documents + from session_manager import AnonymousUser + anonymous_user = AnonymousUser() + + # Get JWT token using same logic as DocumentFileProcessor + # This will handle anonymous JWT creation if needed for anonymous user + effective_jwt = None + + # Let session manager handle anonymous JWT creation if needed + if session_manager: + # This call will create anonymous JWT if needed (same as DocumentFileProcessor) + session_manager.get_user_opensearch_client( + anonymous_user.user_id, effective_jwt + ) + # Get the JWT that was created by session manager + if hasattr(session_manager, '_anonymous_jwt'): + effective_jwt = session_manager._anonymous_jwt + + # Prepare tweaks for default documents with anonymous user metadata + default_tweaks = { + "OpenSearchHybrid-Ve6bS": { + "docs_metadata": [ + {"key": "owner", "value": None}, + {"key": "owner_name", "value": anonymous_user.name}, + {"key": "owner_email", "value": anonymous_user.email}, + {"key": "connector_type", "value": "system_default"} + ] + } + } + + # Use langflow upload_and_ingest_file method with JWT token result = await langflow_file_service.upload_and_ingest_file( file_tuple=file_tuple, - jwt_token=None, # No auth for default documents + session_id=None, # No session for default documents + tweaks=default_tweaks, # Add anonymous user metadata + settings=None, # Use default ingestion settings + jwt_token=effective_jwt, # Use JWT token (anonymous if needed) delete_after_ingest=True, # Clean up after ingestion ) @@ -511,6 +545,7 @@ async def create_app(): langflow_files.upload_and_ingest_user_file, langflow_file_service=services["langflow_file_service"], session_manager=services["session_manager"], + task_service=services["task_service"], ) ), methods=["POST"], @@ -906,6 +941,7 @@ async def create_app(): document_service=services["document_service"], langflow_file_service=services["langflow_file_service"], session_manager=services["session_manager"], + task_service=services["task_service"], ) ), methods=["POST"], diff --git a/src/models/processors.py b/src/models/processors.py index 02836020..0f011127 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -323,3 +323,111 @@ class S3FileProcessor(TaskProcessor): tmp.close() os.remove(tmp.name) file_task.updated_at = time.time() + + +class LangflowFileProcessor(TaskProcessor): + """Processor for Langflow file uploads with upload and ingest""" + + def __init__( + self, + langflow_file_service, + session_manager, + owner_user_id: str = None, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + session_id: str = None, + tweaks: dict = None, + settings: dict = None, + delete_after_ingest: bool = True, + ): + self.langflow_file_service = langflow_file_service + self.session_manager = session_manager + self.owner_user_id = owner_user_id + self.jwt_token = jwt_token + self.owner_name = owner_name + self.owner_email = owner_email + self.session_id = session_id + self.tweaks = tweaks or {} + self.settings = settings + self.delete_after_ingest = delete_after_ingest + + async def process_item( + self, upload_task: UploadTask, item: str, file_task: FileTask + ) -> None: + """Process a file path using LangflowFileService upload_and_ingest_file""" + import mimetypes + import os + from models.tasks import TaskStatus + import time + + # Update task status + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() + + try: + # Read file content + with open(item, 'rb') as f: + content = f.read() + + # Create file tuple for upload + filename = os.path.basename(item) + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = 'application/octet-stream' + + file_tuple = (filename, content, content_type) + + # Get JWT token using same logic as DocumentFileProcessor + # This will handle anonymous JWT creation if needed + effective_jwt = self.jwt_token + if self.session_manager and not effective_jwt: + # Let session manager handle anonymous JWT creation if needed + self.session_manager.get_user_opensearch_client( + self.owner_user_id, self.jwt_token + ) + # The session manager would have created anonymous JWT if needed + # Get it from the session manager's internal state + if hasattr(self.session_manager, '_anonymous_jwt'): + effective_jwt = self.session_manager._anonymous_jwt + + # Prepare metadata tweaks similar to API endpoint + final_tweaks = self.tweaks.copy() if self.tweaks else {} + + metadata_tweaks = [] + if self.owner_user_id: + metadata_tweaks.append({"key": "owner", "value": self.owner_user_id}) + if self.owner_name: + metadata_tweaks.append({"key": "owner_name", "value": self.owner_name}) + if self.owner_email: + metadata_tweaks.append({"key": "owner_email", "value": self.owner_email}) + # Mark as local upload for connector_type + metadata_tweaks.append({"key": "connector_type", "value": "local"}) + + if metadata_tweaks: + # Initialize the OpenSearch component tweaks if not already present + if "OpenSearchHybrid-Ve6bS" not in final_tweaks: + final_tweaks["OpenSearchHybrid-Ve6bS"] = {} + final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks + + # Process file using langflow service + result = await self.langflow_file_service.upload_and_ingest_file( + file_tuple=file_tuple, + session_id=self.session_id, + tweaks=final_tweaks, + settings=self.settings, + jwt_token=effective_jwt, + delete_after_ingest=self.delete_after_ingest + ) + + # Update task with success + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + + except Exception as e: + # Update task with failure + file_task.status = TaskStatus.FAILED + file_task.error_message = str(e) + file_task.updated_at = time.time() + raise diff --git a/src/services/task_service.py b/src/services/task_service.py index ace36c73..f3d22234 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -51,6 +51,38 @@ class TaskService: ) return await self.create_custom_task(user_id, file_paths, processor) + async def create_langflow_upload_task( + self, + user_id: str, + file_paths: list, + langflow_file_service, + session_manager, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + session_id: str = None, + tweaks: dict = None, + settings: dict = None, + delete_after_ingest: bool = True, + ) -> str: + """Create a new upload task for Langflow file processing with upload and ingest""" + # Use LangflowFileProcessor with user context + from models.processors import LangflowFileProcessor + + processor = LangflowFileProcessor( + langflow_file_service=langflow_file_service, + session_manager=session_manager, + owner_user_id=user_id, + jwt_token=jwt_token, + owner_name=owner_name, + owner_email=owner_email, + session_id=session_id, + tweaks=tweaks, + settings=settings, + delete_after_ingest=delete_after_ingest, + ) + return await self.create_custom_task(user_id, file_paths, processor) + async def create_custom_task(self, user_id: str, items: list, processor) -> str: """Create a new task with custom processor for any type of items""" task_id = str(uuid.uuid4())