diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx
index c30d5420..917d64f5 100644
--- a/frontend/components/knowledge-dropdown.tsx
+++ b/frontend/components/knowledge-dropdown.tsx
@@ -80,47 +80,47 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
         const formData = new FormData()
         formData.append('file', files[0])
 
-        // 1) Upload to Langflow
-        const upRes = await fetch('/api/langflow/files/upload', {
+        // Use unified upload and ingest endpoint
+        const uploadIngestRes = await fetch('/api/langflow/upload_ingest', {
           method: 'POST',
           body: formData,
         })
-        const upJson = await upRes.json()
-        if (!upRes.ok) {
-          throw new Error(upJson?.error || 'Upload to Langflow failed')
+        const uploadIngestJson = await uploadIngestRes.json()
+        if (!uploadIngestRes.ok) {
+          throw new Error(uploadIngestJson?.error || 'Upload and ingest failed')
         }
 
-        const fileId = upJson?.id
-        const filePath = upJson?.path
+        // Extract results from the unified response
+        const fileId = uploadIngestJson?.upload?.id
+        const filePath = uploadIngestJson?.upload?.path
+        const runJson = uploadIngestJson?.ingestion
+        const deleteResult = uploadIngestJson?.deletion
+
         if (!fileId || !filePath) {
-          throw new Error('Langflow did not return file id/path')
+          throw new Error('Upload successful but no file id/path returned')
         }
 
-        // 2) Run ingestion flow
-        const runRes = await fetch('/api/langflow/ingest', {
-          method: 'POST',
-          headers: { 'Content-Type': 'application/json' },
-          body: JSON.stringify({ file_paths: [filePath] }),
-        })
-        const runJson = await runRes.json()
-        if (!runRes.ok) {
-          throw new Error(runJson?.error || 'Langflow ingestion failed')
-        }
-
-        // 3) Delete file from Langflow
-        const delRes = await fetch('/api/langflow/files', {
-          method: 'DELETE',
-          headers: { 'Content-Type': 'application/json' },
-          body: JSON.stringify({ file_ids: [fileId] }),
-        })
-        const delJson = await delRes.json().catch(() => ({}))
-        if (!delRes.ok) {
-          throw new Error(delJson?.error || 'Langflow file delete failed')
+        // Log deletion status if provided
+        if (deleteResult) {
+          if (deleteResult.status === 'deleted') {
+            console.log('File successfully cleaned up from Langflow:', deleteResult.file_id)
+          } else if (deleteResult.status === 'delete_failed') {
+            console.warn('Failed to cleanup file from Langflow:', deleteResult.error)
+          }
         }
 
         // Notify UI
         window.dispatchEvent(new CustomEvent('fileUploaded', {
-          detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } }
+          detail: {
+            file: files[0],
+            result: {
+              file_id: fileId,
+              file_path: filePath,
+              run: runJson,
+              deletion: deleteResult,
+              unified: true
+            }
+          }
         }))
         // Trigger search refresh after successful ingestion
         window.dispatchEvent(new CustomEvent('knowledgeUpdated'))
diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py
index 1fa1f9c7..5ac8b901 100644
--- a/src/api/langflow_files.py
+++ b/src/api/langflow_files.py
@@ -114,6 +114,89 @@ async def run_ingestion(
         return JSONResponse({"error": str(e)}, status_code=500)
 
 
+async def upload_and_ingest_user_file(
+    request: Request, langflow_file_service: LangflowFileService, session_manager
+):
+    """Combined upload and ingest endpoint - uploads file then runs ingestion"""
+    try:
+        logger.debug("upload_and_ingest_user_file endpoint called")
+        form = await request.form()
+        upload_file = form.get("file")
+        if upload_file is None:
+            logger.error("No file provided in upload_and_ingest request")
+            return JSONResponse({"error": "Missing file"}, status_code=400)
+
+        # Extract optional parameters
+        session_id = form.get("session_id")
+        settings_json = form.get("settings")
+        tweaks_json = form.get("tweaks")
+        delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
+
+        # Parse JSON fields if provided
+        settings = None
+        tweaks = None
+
+        if settings_json:
+            try:
+                import json
+                settings = json.loads(settings_json)
+            except json.JSONDecodeError as e:
+                logger.error("Invalid settings JSON", error=str(e))
+                return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)
+
+        if tweaks_json:
+            try:
+                import json
+                tweaks = json.loads(tweaks_json)
+            except json.JSONDecodeError as e:
+                logger.error("Invalid tweaks JSON", error=str(e))
+                return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)
+
+        logger.debug(
+            "Processing file for combined upload and ingest",
+            filename=upload_file.filename,
+            size=upload_file.size,
+            session_id=session_id,
+            has_settings=bool(settings),
+            has_tweaks=bool(tweaks),
+            delete_after_ingest=delete_after_ingest
+        )
+
+        # Prepare file tuple for upload
+        content = await upload_file.read()
+        file_tuple = (
+            upload_file.filename,
+            content,
+            upload_file.content_type or "application/octet-stream",
+        )
+
+        jwt_token = getattr(request.state, "jwt_token", None)
+        logger.debug("JWT token status", jwt_present=jwt_token is not None)
+
+        logger.debug("Calling langflow_file_service.upload_and_ingest_file")
+        result = await langflow_file_service.upload_and_ingest_file(
+            file_tuple=file_tuple,
+            session_id=session_id,
+            tweaks=tweaks,
+            settings=settings,
+            jwt_token=jwt_token,
+            delete_after_ingest=delete_after_ingest
+        )
+
+        logger.debug("Upload and ingest successful", result=result)
+        return JSONResponse(result, status_code=201)
+
+    except Exception as e:
+        logger.error(
+            "upload_and_ingest_user_file endpoint failed",
+            error_type=type(e).__name__,
+            error=str(e),
+        )
+        import traceback
+        logger.error("Full traceback", traceback=traceback.format_exc())
+        return JSONResponse({"error": str(e)}, status_code=500)
+
+
 async def delete_user_files(
     request: Request, langflow_file_service: LangflowFileService, session_manager
 ):
diff --git a/src/main.py b/src/main.py
index a1282a9c..29dea7ca 100644
--- a/src/main.py
+++ b/src/main.py
@@ -406,6 +406,17 @@ async def create_app():
             ),
             methods=["DELETE"],
         ),
+        Route(
+            "/langflow/upload_ingest",
+            require_auth(services["session_manager"])(
+                partial(
+                    langflow_files.upload_and_ingest_user_file,
+                    langflow_file_service=services["langflow_file_service"],
+                    session_manager=services["session_manager"],
+                )
+            ),
+            methods=["POST"],
+        ),
         Route(
             "/upload_context",
             require_auth(services["session_manager"])(
diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py
index 452ecb86..1bc4da29 100644
--- a/src/services/langflow_file_service.py
+++ b/src/services/langflow_file_service.py
@@ -107,3 +107,132 @@ class LangflowFileService:
         )
         resp.raise_for_status()
         return resp.json()
+
+    async def upload_and_ingest_file(
+        self,
+        file_tuple,
+        session_id: Optional[str] = None,
+        tweaks: Optional[Dict[str, Any]] = None,
+        settings: Optional[Dict[str, Any]] = None,
+        jwt_token: Optional[str] = None,
+        delete_after_ingest: bool = True,
+    ) -> Dict[str, Any]:
+        """
+        Combined upload, ingest, and delete operation.
+        First uploads the file, then runs ingestion on it, then optionally deletes the file.
+
+        Args:
+            file_tuple: File tuple (filename, content, content_type)
+            session_id: Optional session ID for the ingestion flow
+            tweaks: Optional tweaks for the ingestion flow
+            settings: Optional UI settings to convert to component tweaks
+            jwt_token: Optional JWT token for authentication
+            delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
+
+        Returns:
+            Combined result with upload info, ingestion result, and deletion status
+        """
+        self.logger.debug("[LF] Starting combined upload and ingest operation")
+
+        # Step 1: Upload the file
+        try:
+            upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
+            self.logger.debug(
+                "[LF] Upload completed successfully",
+                file_id=upload_result.get("id"),
+                file_path=upload_result.get("path"),
+            )
+        except Exception as e:
+            self.logger.error("[LF] Upload failed during combined operation", error=str(e))
+            raise Exception(f"Upload failed: {str(e)}")
+
+        # Step 2: Prepare for ingestion
+        file_path = upload_result.get("path")
+        if not file_path:
+            raise ValueError("Upload successful but no file path returned")
+
+        # Convert UI settings to component tweaks if provided
+        final_tweaks = tweaks.copy() if tweaks else {}
+
+        if settings:
+            self.logger.debug("[LF] Applying ingestion settings", settings=settings)
+
+            # Split Text component tweaks (SplitText-QIKhg)
+            if (
+                settings.get("chunkSize")
+                or settings.get("chunkOverlap")
+                or settings.get("separator")
+            ):
+                if "SplitText-QIKhg" not in final_tweaks:
+                    final_tweaks["SplitText-QIKhg"] = {}
+                if settings.get("chunkSize"):
+                    final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
+                if settings.get("chunkOverlap"):
+                    final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
+                        "chunkOverlap"
+                    ]
+                if settings.get("separator"):
+                    final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
+
+            # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
+            if settings.get("embeddingModel"):
+                if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
+                    final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
+                final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
+
+            self.logger.debug("[LF] Final tweaks with settings applied", tweaks=final_tweaks)
+
+        # Step 3: Run ingestion
+        try:
+            ingest_result = await self.run_ingestion_flow(
+                file_paths=[file_path],
+                session_id=session_id,
+                tweaks=final_tweaks,
+                jwt_token=jwt_token,
+            )
+            self.logger.debug("[LF] Ingestion completed successfully")
+        except Exception as e:
+            self.logger.error(
+                "[LF] Ingestion failed during combined operation",
+                error=str(e),
+                file_path=file_path
+            )
+            # Note: We could optionally delete the uploaded file here if ingestion fails
+            raise Exception(f"Ingestion failed: {str(e)}")
+
+        # Step 4: Delete file from Langflow (optional)
+        file_id = upload_result.get("id")
+        delete_result = None
+        delete_error = None
+
+        if delete_after_ingest and file_id:
+            try:
+                self.logger.debug("[LF] Deleting file after successful ingestion", file_id=file_id)
+                await self.delete_user_file(file_id)
+                delete_result = {"status": "deleted", "file_id": file_id}
+                self.logger.debug("[LF] File deleted successfully")
+            except Exception as e:
+                delete_error = str(e)
+                self.logger.warning(
+                    "[LF] Failed to delete file after ingestion",
+                    error=delete_error,
+                    file_id=file_id
+                )
+                delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
+
+        # Return combined result
+        result = {
+            "status": "success",
+            "upload": upload_result,
+            "ingestion": ingest_result,
+            "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
+        }
+
+        if delete_after_ingest:
+            result["deletion"] = delete_result
+            if delete_result and delete_result.get("status") == "deleted":
+                result["message"] += " and cleaned up"
+            elif delete_error:
+                result["message"] += f" (cleanup warning: {delete_error})"
+
+        return result
\ No newline at end of file