From 6c99c1b61d97d2255a5850ee29bb0d76eafb9290 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 02:03:02 -0400 Subject: [PATCH] Implement unified upload and ingest endpoint in Langflow This commit introduces a new combined endpoint for uploading files and running ingestion in Langflow. The frontend component is updated to utilize this endpoint, streamlining the process by eliminating separate upload and ingestion calls. The response structure is adjusted to include deletion status and other relevant information, enhancing error handling and logging practices throughout the codebase. --- frontend/components/knowledge-dropdown.tsx | 58 ++++----- src/api/langflow_files.py | 83 +++++++++++++ src/main.py | 11 ++ src/services/langflow_file_service.py | 129 +++++++++++++++++++++ 4 files changed, 252 insertions(+), 29 deletions(-) diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index c30d5420..917d64f5 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -80,47 +80,47 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD const formData = new FormData() formData.append('file', files[0]) - // 1) Upload to Langflow - const upRes = await fetch('/api/langflow/files/upload', { + // Use unified upload and ingest endpoint + const uploadIngestRes = await fetch('/api/langflow/upload_ingest', { method: 'POST', body: formData, }) - const upJson = await upRes.json() - if (!upRes.ok) { - throw new Error(upJson?.error || 'Upload to Langflow failed') + const uploadIngestJson = await uploadIngestRes.json() + if (!uploadIngestRes.ok) { + throw new Error(uploadIngestJson?.error || 'Upload and ingest failed') } - const fileId = upJson?.id - const filePath = upJson?.path + // Extract results from the unified response + const fileId = uploadIngestJson?.upload?.id + const filePath = uploadIngestJson?.upload?.path + const runJson = uploadIngestJson?.ingestion + const deleteResult = uploadIngestJson?.deletion + if (!fileId || !filePath) { - throw new Error('Langflow did not return file id/path') + throw new Error('Upload successful but no file id/path returned') } - // 2) Run ingestion flow - const runRes = await fetch('/api/langflow/ingest', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ file_paths: [filePath] }), - }) - const runJson = await runRes.json() - if (!runRes.ok) { - throw new Error(runJson?.error || 'Langflow ingestion failed') - } - - // 3) Delete file from Langflow - const delRes = await fetch('/api/langflow/files', { - method: 'DELETE', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ file_ids: [fileId] }), - }) - const delJson = await delRes.json().catch(() => ({})) - if (!delRes.ok) { - throw new Error(delJson?.error || 'Langflow file delete failed') + // Log deletion status if provided + if (deleteResult) { + if (deleteResult.status === 'deleted') { + console.log('File successfully cleaned up from Langflow:', deleteResult.file_id) + } else if (deleteResult.status === 'delete_failed') { + console.warn('Failed to cleanup file from Langflow:', deleteResult.error) + } } // Notify UI window.dispatchEvent(new CustomEvent('fileUploaded', { - detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } } + detail: { + file: files[0], + result: { + file_id: fileId, + file_path: filePath, + run: runJson, + deletion: deleteResult, + unified: true + } + } })) // Trigger search refresh after successful ingestion window.dispatchEvent(new CustomEvent('knowledgeUpdated')) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 1fa1f9c7..5ac8b901 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -114,6 +114,89 @@ async def run_ingestion( return JSONResponse({"error": str(e)}, status_code=500) +async def upload_and_ingest_user_file( + request: Request, langflow_file_service: LangflowFileService, session_manager +): + """Combined upload and ingest endpoint - uploads file then runs ingestion""" + try: + logger.debug("upload_and_ingest_user_file endpoint called") + form = await request.form() + upload_file = form.get("file") + if upload_file is None: + logger.error("No file provided in upload_and_ingest request") + return JSONResponse({"error": "Missing file"}, status_code=400) + + # Extract optional parameters + session_id = form.get("session_id") + settings_json = form.get("settings") + tweaks_json = form.get("tweaks") + delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true" + + # Parse JSON fields if provided + settings = None + tweaks = None + + if settings_json: + try: + import json + settings = json.loads(settings_json) + except json.JSONDecodeError as e: + logger.error("Invalid settings JSON", error=str(e)) + return JSONResponse({"error": "Invalid settings JSON"}, status_code=400) + + if tweaks_json: + try: + import json + tweaks = json.loads(tweaks_json) + except json.JSONDecodeError as e: + logger.error("Invalid tweaks JSON", error=str(e)) + return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) + + logger.debug( + "Processing file for combined upload and ingest", + filename=upload_file.filename, + size=upload_file.size, + session_id=session_id, + has_settings=bool(settings), + has_tweaks=bool(tweaks), + delete_after_ingest=delete_after_ingest + ) + + # Prepare file tuple for upload + content = await upload_file.read() + file_tuple = ( + upload_file.filename, + content, + upload_file.content_type or "application/octet-stream", + ) + + jwt_token = getattr(request.state, "jwt_token", None) + logger.debug("JWT token status", jwt_present=jwt_token is not None) + + logger.debug("Calling langflow_file_service.upload_and_ingest_file") + result = await langflow_file_service.upload_and_ingest_file( + file_tuple=file_tuple, + session_id=session_id, + tweaks=tweaks, + settings=settings, + jwt_token=jwt_token, + delete_after_ingest=delete_after_ingest + ) + + logger.debug("Upload and ingest successful", result=result) + return JSONResponse(result, status_code=201) + + except Exception as e: + logger.error( + "upload_and_ingest_user_file endpoint failed", + error_type=type(e).__name__, + error=str(e), + ) + import traceback + logger.error("Full traceback", traceback=traceback.format_exc()) + return JSONResponse({"error": str(e)}, status_code=500) + + async def delete_user_files( request: Request, langflow_file_service: LangflowFileService, session_manager ): diff --git a/src/main.py b/src/main.py index a1282a9c..29dea7ca 100644 --- a/src/main.py +++ b/src/main.py @@ -406,6 +406,17 @@ async def create_app(): ), methods=["DELETE"], ), + Route( + "/langflow/upload_ingest", + require_auth(services["session_manager"])( + partial( + langflow_files.upload_and_ingest_user_file, + langflow_file_service=services["langflow_file_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), Route( "/upload_context", require_auth(services["session_manager"])( diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 452ecb86..1bc4da29 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -107,3 +107,132 @@ class LangflowFileService: ) resp.raise_for_status() return resp.json() + + async def upload_and_ingest_file( + self, + file_tuple, + session_id: Optional[str] = None, + tweaks: Optional[Dict[str, Any]] = None, + settings: Optional[Dict[str, Any]] = None, + jwt_token: Optional[str] = None, + delete_after_ingest: bool = True, + ) -> Dict[str, Any]: + """ + Combined upload, ingest, and delete operation. + First uploads the file, then runs ingestion on it, then optionally deletes the file. + + Args: + file_tuple: File tuple (filename, content, content_type) + session_id: Optional session ID for the ingestion flow + tweaks: Optional tweaks for the ingestion flow + settings: Optional UI settings to convert to component tweaks + jwt_token: Optional JWT token for authentication + delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True) + + Returns: + Combined result with upload info, ingestion result, and deletion status + """ + self.logger.debug("[LF] Starting combined upload and ingest operation") + + # Step 1: Upload the file + try: + upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token) + self.logger.debug( + "[LF] Upload completed successfully", + file_id=upload_result.get("id"), + file_path=upload_result.get("path"), + ) + except Exception as e: + self.logger.error("[LF] Upload failed during combined operation", error=str(e)) + raise Exception(f"Upload failed: {str(e)}") + + # Step 2: Prepare for ingestion + file_path = upload_result.get("path") + if not file_path: + raise ValueError("Upload successful but no file path returned") + + # Convert UI settings to component tweaks if provided + final_tweaks = tweaks.copy() if tweaks else {} + + if settings: + self.logger.debug("[LF] Applying ingestion settings", settings=settings) + + # Split Text component tweaks (SplitText-QIKhg) + if ( + settings.get("chunkSize") + or settings.get("chunkOverlap") + or settings.get("separator") + ): + if "SplitText-QIKhg" not in final_tweaks: + final_tweaks["SplitText-QIKhg"] = {} + if settings.get("chunkSize"): + final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] + if settings.get("chunkOverlap"): + final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ + "chunkOverlap" + ] + if settings.get("separator"): + final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"] + + # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6) + if settings.get("embeddingModel"): + if "OpenAIEmbeddings-joRJ6" not in final_tweaks: + final_tweaks["OpenAIEmbeddings-joRJ6"] = {} + final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] + + self.logger.debug("[LF] Final tweaks with settings applied", tweaks=final_tweaks) + + # Step 3: Run ingestion + try: + ingest_result = await self.run_ingestion_flow( + file_paths=[file_path], + session_id=session_id, + tweaks=final_tweaks, + jwt_token=jwt_token, + ) + self.logger.debug("[LF] Ingestion completed successfully") + except Exception as e: + self.logger.error( + "[LF] Ingestion failed during combined operation", + error=str(e), + file_path=file_path + ) + # Note: We could optionally delete the uploaded file here if ingestion fails + raise Exception(f"Ingestion failed: {str(e)}") + + # Step 4: Delete file from Langflow (optional) + file_id = upload_result.get("id") + delete_result = None + delete_error = None + + if delete_after_ingest and file_id: + try: + self.logger.debug("[LF] Deleting file after successful ingestion", file_id=file_id) + await self.delete_user_file(file_id) + delete_result = {"status": "deleted", "file_id": file_id} + self.logger.debug("[LF] File deleted successfully") + except Exception as e: + delete_error = str(e) + self.logger.warning( + "[LF] Failed to delete file after ingestion", + error=delete_error, + file_id=file_id + ) + delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} + + # Return combined result + result = { + "status": "success", + "upload": upload_result, + "ingestion": ingest_result, + "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully" + } + + if delete_after_ingest: + result["deletion"] = delete_result + if delete_result and delete_result.get("status") == "deleted": + result["message"] += " and cleaned up" + elif delete_error: + result["message"] += f" (cleanup warning: {delete_error})" + + return result \ No newline at end of file