diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index a5595813..f53692a4 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -70,35 +70,33 @@ async def run_ingestion( if settings: logger.debug("Applying ingestion settings", settings=settings) - # Split Text component tweaks (SplitText-QIKhg) + # Split Text component tweaks if ( settings.get("chunkSize") or settings.get("chunkOverlap") or settings.get("separator") ): - if "SplitText-QIKhg" not in tweaks: - tweaks["SplitText-QIKhg"] = {} + if "Split Text" not in tweaks: + tweaks["Split Text"] = {} if settings.get("chunkSize"): - tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] + tweaks["Split Text"]["chunk_size"] = settings["chunkSize"] if settings.get("chunkOverlap"): - tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ - "chunkOverlap" - ] + tweaks["Split Text"]["chunk_overlap"] = settings["chunkOverlap"] if settings.get("separator"): - tweaks["SplitText-QIKhg"]["separator"] = settings["separator"] + tweaks["Split Text"]["separator"] = settings["separator"] - # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6) + # OpenAI Embeddings component tweaks if settings.get("embeddingModel"): - if "OpenAIEmbeddings-joRJ6" not in tweaks: - tweaks["OpenAIEmbeddings-joRJ6"] = {} - tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] + if "OpenAI Embeddings" not in tweaks: + tweaks["OpenAI Embeddings"] = {} + tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"] # Note: OpenSearch component tweaks not needed for ingestion # (search parameters are for retrieval, not document processing) logger.debug("Final tweaks with settings applied", tweaks=tweaks) # Include user JWT if available - jwt_token = getattr(request.state, "jwt_token", None) + jwt_token: str | None = getattr(request.state, "jwt_token", None) # Extract user info from User object user = getattr(request.state, "user", None) @@ -128,7 +126,10 @@ async def run_ingestion( async def upload_and_ingest_user_file( - request: Request, langflow_file_service: LangflowFileService, session_manager, task_service + request: Request, + langflow_file_service: LangflowFileService, + session_manager, + task_service, ): """Combined upload and ingest endpoint - uses task service for tracking and cancellation""" try: @@ -148,10 +149,11 @@ async def upload_and_ingest_user_file( # Parse JSON fields if provided settings = None tweaks = None - + if settings_json: try: import json + settings = json.loads(settings_json) except json.JSONDecodeError as e: logger.error("Invalid settings JSON", error=str(e)) @@ -160,6 +162,7 @@ async def upload_and_ingest_user_file( if tweaks_json: try: import json + tweaks = json.loads(tweaks_json) except json.JSONDecodeError as e: logger.error("Invalid tweaks JSON", error=str(e)) @@ -173,7 +176,9 @@ async def upload_and_ingest_user_file( jwt_token = getattr(request.state, "jwt_token", None) if not user_id: - return JSONResponse({"error": "User authentication required"}, status_code=401) + return JSONResponse( + {"error": "User authentication required"}, status_code=401 + ) logger.debug( "Processing file for task-based upload and ingest", @@ -183,28 +188,28 @@ async def upload_and_ingest_user_file( has_settings=bool(settings), has_tweaks=bool(tweaks), delete_after_ingest=delete_after_ingest, - user_id=user_id + user_id=user_id, ) # Create temporary file for task processing - import tempfile import os - + import tempfile + # Read file content content = await upload_file.read() - + # Create temporary file safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") - temp_fd, temp_path = tempfile.mkstemp( - suffix=f"_{safe_filename}" - ) - + temp_fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}") + try: # Write content to temp file - with os.fdopen(temp_fd, 'wb') as temp_file: + with os.fdopen(temp_fd, "wb") as temp_file: temp_file.write(content) - logger.debug("Created temporary file for task processing", temp_path=temp_path) + logger.debug( + "Created temporary file for task processing", temp_path=temp_path + ) # Create langflow upload task for single file task_id = await task_service.create_langflow_upload_task( @@ -222,12 +227,15 @@ async def upload_and_ingest_user_file( ) logger.debug("Langflow upload task created successfully", task_id=task_id) - - return JSONResponse({ - "task_id": task_id, - "message": f"Langflow upload task created for file '{upload_file.filename}'", - "filename": upload_file.filename - }, status_code=202) # 202 Accepted for async processing + + return JSONResponse( + { + "task_id": task_id, + "message": f"Langflow upload task created for file '{upload_file.filename}'", + "filename": upload_file.filename, + }, + status_code=202, + ) # 202 Accepted for async processing except Exception: # Clean up temp file on error @@ -237,7 +245,7 @@ async def upload_and_ingest_user_file( except Exception: pass # Ignore cleanup errors raise - + except Exception as e: logger.error( "upload_and_ingest_user_file endpoint failed", @@ -245,6 +253,7 @@ async def upload_and_ingest_user_file( error=str(e), ) import traceback + logger.error("Full traceback", traceback=traceback.format_exc()) return JSONResponse({"error": str(e)}, status_code=500) diff --git a/src/models/processors.py b/src/models/processors.py index a817f8d4..b82e3fce 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -367,20 +367,22 @@ class LangflowFileProcessor(TaskProcessor): try: # Read file content - with open(item, 'rb') as f: + with open(item, "rb") as f: content = f.read() # Create file tuple for upload temp_filename = os.path.basename(item) # Extract original filename from temp file suffix (remove tmp prefix) if "_" in temp_filename: - filename = temp_filename.split("_", 1)[1] # Get everything after first _ + filename = temp_filename.split("_", 1)[ + 1 + ] # Get everything after first _ else: filename = temp_filename content_type, _ = mimetypes.guess_type(filename) if not content_type: - content_type = 'application/octet-stream' - + content_type = "application/octet-stream" + file_tuple = (filename, content, content_type) # Get JWT token using same logic as DocumentFileProcessor @@ -393,27 +395,29 @@ class LangflowFileProcessor(TaskProcessor): ) # The session manager would have created anonymous JWT if needed # Get it from the session manager's internal state - if hasattr(self.session_manager, '_anonymous_jwt'): + if hasattr(self.session_manager, "_anonymous_jwt"): effective_jwt = self.session_manager._anonymous_jwt # Prepare metadata tweaks similar to API endpoint final_tweaks = self.tweaks.copy() if self.tweaks else {} - + metadata_tweaks = [] if self.owner_user_id: metadata_tweaks.append({"key": "owner", "value": self.owner_user_id}) if self.owner_name: metadata_tweaks.append({"key": "owner_name", "value": self.owner_name}) if self.owner_email: - metadata_tweaks.append({"key": "owner_email", "value": self.owner_email}) + metadata_tweaks.append( + {"key": "owner_email", "value": self.owner_email} + ) # Mark as local upload for connector_type metadata_tweaks.append({"key": "connector_type", "value": "local"}) if metadata_tweaks: # Initialize the OpenSearch component tweaks if not already present - if "OpenSearchHybrid-Ve6bS" not in final_tweaks: - final_tweaks["OpenSearchHybrid-Ve6bS"] = {} - final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks + if "OpenSearch (Hybrid)" not in final_tweaks: + final_tweaks["OpenSearch (Hybrid)"] = {} + final_tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks # Process file using langflow service result = await self.langflow_file_service.upload_and_ingest_file( @@ -422,7 +426,7 @@ class LangflowFileProcessor(TaskProcessor): tweaks=final_tweaks, settings=self.settings, jwt_token=effective_jwt, - delete_after_ingest=self.delete_after_ingest + delete_after_ingest=self.delete_after_ingest, ) # Update task with success diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 132cd45e..bc522973 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -84,14 +84,13 @@ class LangflowFileService: if not tweaks: tweaks = {} - # Pass files via tweaks to File component (File-PSU37 from the flow) + # Pass files via tweaks to File component if file_paths: - tweaks["File-PSU37"] = {"path": file_paths} + tweaks["File"] = {"path": file_paths} - # Pass JWT token via tweaks using the x-langflow-global-var- pattern + # Pass JWT token via tweaks to OpenSearch component if jwt_token: - # Using the global variable pattern that Langflow expects for OpenSearch components - tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token} + tweaks["OpenSearch (Hybrid)"] = {"jwt_token": jwt_token} logger.debug("[LF] Added JWT token to tweaks for OpenSearch components") else: logger.warning("[LF] No JWT token provided") @@ -109,9 +108,9 @@ class LangflowFileService: if metadata_tweaks: # Initialize the OpenSearch component tweaks if not already present - if "OpenSearchHybrid-Ve6bS" not in tweaks: - tweaks["OpenSearchHybrid-Ve6bS"] = {} - tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks + if "OpenSearch (Hybrid)" not in tweaks: + tweaks["OpenSearch (Hybrid)"] = {} + tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks logger.debug( "[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks) ) @@ -168,7 +167,7 @@ class LangflowFileService: """ Combined upload, ingest, and delete operation. First uploads the file, then runs ingestion on it, then optionally deletes the file. - + Args: file_tuple: File tuple (filename, content, content_type) session_id: Optional session ID for the ingestion flow @@ -176,12 +175,12 @@ class LangflowFileService: settings: Optional UI settings to convert to component tweaks jwt_token: Optional JWT token for authentication delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True) - + Returns: Combined result with upload info, ingestion result, and deletion status """ logger.debug("[LF] Starting combined upload and ingest operation") - + # Step 1: Upload the file try: upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token) @@ -190,10 +189,12 @@ class LangflowFileService: extra={ "file_id": upload_result.get("id"), "file_path": upload_result.get("path"), - } + }, ) except Exception as e: - logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)}) + logger.error( + "[LF] Upload failed during combined operation", extra={"error": str(e)} + ) raise Exception(f"Upload failed: {str(e)}") # Step 2: Prepare for ingestion @@ -203,34 +204,39 @@ class LangflowFileService: # Convert UI settings to component tweaks if provided final_tweaks = tweaks.copy() if tweaks else {} - - if settings: - logger.debug("[LF] Applying ingestion settings", extra={"settings": settings}) - # Split Text component tweaks (SplitText-QIKhg) + if settings: + logger.debug( + "[LF] Applying ingestion settings", extra={"settings": settings} + ) + + # Split Text component tweaks if ( settings.get("chunkSize") or settings.get("chunkOverlap") or settings.get("separator") ): - if "SplitText-QIKhg" not in final_tweaks: - final_tweaks["SplitText-QIKhg"] = {} + if "Split Text" not in final_tweaks: + final_tweaks["Split Text"] = {} if settings.get("chunkSize"): - final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] + final_tweaks["Split Text"]["chunk_size"] = settings["chunkSize"] if settings.get("chunkOverlap"): - final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ + final_tweaks["Split Text"]["chunk_overlap"] = settings[ "chunkOverlap" ] if settings.get("separator"): - final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"] + final_tweaks["Split Text"]["separator"] = settings["separator"] - # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6) + # OpenAI Embeddings component tweaks if settings.get("embeddingModel"): - if "OpenAIEmbeddings-joRJ6" not in final_tweaks: - final_tweaks["OpenAIEmbeddings-joRJ6"] = {} - final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] + if "OpenAI Embeddings" not in final_tweaks: + final_tweaks["OpenAI Embeddings"] = {} + final_tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"] - logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks}) + logger.debug( + "[LF] Final tweaks with settings applied", + extra={"tweaks": final_tweaks}, + ) # Step 3: Run ingestion try: @@ -244,10 +250,7 @@ class LangflowFileService: except Exception as e: logger.error( "[LF] Ingestion failed during combined operation", - extra={ - "error": str(e), - "file_path": file_path - } + extra={"error": str(e), "file_path": file_path}, ) # Note: We could optionally delete the uploaded file here if ingestion fails raise Exception(f"Ingestion failed: {str(e)}") @@ -256,10 +259,13 @@ class LangflowFileService: file_id = upload_result.get("id") delete_result = None delete_error = None - + if delete_after_ingest and file_id: try: - logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id}) + logger.debug( + "[LF] Deleting file after successful ingestion", + extra={"file_id": file_id}, + ) await self.delete_user_file(file_id) delete_result = {"status": "deleted", "file_id": file_id} logger.debug("[LF] File deleted successfully") @@ -267,26 +273,27 @@ class LangflowFileService: delete_error = str(e) logger.warning( "[LF] Failed to delete file after ingestion", - extra={ - "error": delete_error, - "file_id": file_id - } + extra={"error": delete_error, "file_id": file_id}, ) - delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} + delete_result = { + "status": "delete_failed", + "file_id": file_id, + "error": delete_error, + } # Return combined result result = { "status": "success", "upload": upload_result, "ingestion": ingest_result, - "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully" + "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully", } - + if delete_after_ingest: result["deletion"] = delete_result if delete_result and delete_result.get("status") == "deleted": result["message"] += " and cleaned up" elif delete_error: result["message"] += f" (cleanup warning: {delete_error})" - + return result