Implement unified upload and ingest endpoint in Langflow

This commit introduces a new combined endpoint for uploading files and running ingestion in Langflow. The frontend component is updated to utilize this endpoint, streamlining the process by eliminating separate upload and ingestion calls. The response structure is adjusted to include deletion status and other relevant information, enhancing error handling and logging practices throughout the codebase.
This commit is contained in:
Edwin Jose 2025-09-08 02:03:02 -04:00
parent 7172e95d98
commit 6c99c1b61d
4 changed files with 252 additions and 29 deletions

View file

@ -80,47 +80,47 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
const formData = new FormData()
formData.append('file', files[0])
// 1) Upload to Langflow
const upRes = await fetch('/api/langflow/files/upload', {
// Use unified upload and ingest endpoint
const uploadIngestRes = await fetch('/api/langflow/upload_ingest', {
method: 'POST',
body: formData,
})
const upJson = await upRes.json()
if (!upRes.ok) {
throw new Error(upJson?.error || 'Upload to Langflow failed')
const uploadIngestJson = await uploadIngestRes.json()
if (!uploadIngestRes.ok) {
throw new Error(uploadIngestJson?.error || 'Upload and ingest failed')
}
const fileId = upJson?.id
const filePath = upJson?.path
// Extract results from the unified response
const fileId = uploadIngestJson?.upload?.id
const filePath = uploadIngestJson?.upload?.path
const runJson = uploadIngestJson?.ingestion
const deleteResult = uploadIngestJson?.deletion
if (!fileId || !filePath) {
throw new Error('Langflow did not return file id/path')
throw new Error('Upload successful but no file id/path returned')
}
// 2) Run ingestion flow
const runRes = await fetch('/api/langflow/ingest', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_paths: [filePath] }),
})
const runJson = await runRes.json()
if (!runRes.ok) {
throw new Error(runJson?.error || 'Langflow ingestion failed')
}
// 3) Delete file from Langflow
const delRes = await fetch('/api/langflow/files', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_ids: [fileId] }),
})
const delJson = await delRes.json().catch(() => ({}))
if (!delRes.ok) {
throw new Error(delJson?.error || 'Langflow file delete failed')
// Log deletion status if provided
if (deleteResult) {
if (deleteResult.status === 'deleted') {
console.log('File successfully cleaned up from Langflow:', deleteResult.file_id)
} else if (deleteResult.status === 'delete_failed') {
console.warn('Failed to cleanup file from Langflow:', deleteResult.error)
}
}
// Notify UI
window.dispatchEvent(new CustomEvent('fileUploaded', {
detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } }
detail: {
file: files[0],
result: {
file_id: fileId,
file_path: filePath,
run: runJson,
deletion: deleteResult,
unified: true
}
}
}))
// Trigger search refresh after successful ingestion
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))

View file

@ -114,6 +114,89 @@ async def run_ingestion(
return JSONResponse({"error": str(e)}, status_code=500)
async def upload_and_ingest_user_file(
request: Request, langflow_file_service: LangflowFileService, session_manager
):
"""Combined upload and ingest endpoint - uploads file then runs ingestion"""
try:
logger.debug("upload_and_ingest_user_file endpoint called")
form = await request.form()
upload_file = form.get("file")
if upload_file is None:
logger.error("No file provided in upload_and_ingest request")
return JSONResponse({"error": "Missing file"}, status_code=400)
# Extract optional parameters
session_id = form.get("session_id")
settings_json = form.get("settings")
tweaks_json = form.get("tweaks")
delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
# Parse JSON fields if provided
settings = None
tweaks = None
if settings_json:
try:
import json
settings = json.loads(settings_json)
except json.JSONDecodeError as e:
logger.error("Invalid settings JSON", error=str(e))
return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)
if tweaks_json:
try:
import json
tweaks = json.loads(tweaks_json)
except json.JSONDecodeError as e:
logger.error("Invalid tweaks JSON", error=str(e))
return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)
logger.debug(
"Processing file for combined upload and ingest",
filename=upload_file.filename,
size=upload_file.size,
session_id=session_id,
has_settings=bool(settings),
has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest
)
# Prepare file tuple for upload
content = await upload_file.read()
file_tuple = (
upload_file.filename,
content,
upload_file.content_type or "application/octet-stream",
)
jwt_token = getattr(request.state, "jwt_token", None)
logger.debug("JWT token status", jwt_present=jwt_token is not None)
logger.debug("Calling langflow_file_service.upload_and_ingest_file")
result = await langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
session_id=session_id,
tweaks=tweaks,
settings=settings,
jwt_token=jwt_token,
delete_after_ingest=delete_after_ingest
)
logger.debug("Upload and ingest successful", result=result)
return JSONResponse(result, status_code=201)
except Exception as e:
logger.error(
"upload_and_ingest_user_file endpoint failed",
error_type=type(e).__name__,
error=str(e),
)
import traceback
logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500)
async def delete_user_files(
request: Request, langflow_file_service: LangflowFileService, session_manager
):

View file

@ -406,6 +406,17 @@ async def create_app():
),
methods=["DELETE"],
),
Route(
"/langflow/upload_ingest",
require_auth(services["session_manager"])(
partial(
langflow_files.upload_and_ingest_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
Route(
"/upload_context",
require_auth(services["session_manager"])(

View file

@ -107,3 +107,132 @@ class LangflowFileService:
)
resp.raise_for_status()
return resp.json()
async def upload_and_ingest_file(
self,
file_tuple,
session_id: Optional[str] = None,
tweaks: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = None,
jwt_token: Optional[str] = None,
delete_after_ingest: bool = True,
) -> Dict[str, Any]:
"""
Combined upload, ingest, and delete operation.
First uploads the file, then runs ingestion on it, then optionally deletes the file.
Args:
file_tuple: File tuple (filename, content, content_type)
session_id: Optional session ID for the ingestion flow
tweaks: Optional tweaks for the ingestion flow
settings: Optional UI settings to convert to component tweaks
jwt_token: Optional JWT token for authentication
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
Returns:
Combined result with upload info, ingestion result, and deletion status
"""
self.logger.debug("[LF] Starting combined upload and ingest operation")
# Step 1: Upload the file
try:
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
self.logger.debug(
"[LF] Upload completed successfully",
file_id=upload_result.get("id"),
file_path=upload_result.get("path"),
)
except Exception as e:
self.logger.error("[LF] Upload failed during combined operation", error=str(e))
raise Exception(f"Upload failed: {str(e)}")
# Step 2: Prepare for ingestion
file_path = upload_result.get("path")
if not file_path:
raise ValueError("Upload successful but no file path returned")
# Convert UI settings to component tweaks if provided
final_tweaks = tweaks.copy() if tweaks else {}
if settings:
self.logger.debug("[LF] Applying ingestion settings", settings=settings)
# Split Text component tweaks (SplitText-QIKhg)
if (
settings.get("chunkSize")
or settings.get("chunkOverlap")
or settings.get("separator")
):
if "SplitText-QIKhg" not in final_tweaks:
final_tweaks["SplitText-QIKhg"] = {}
if settings.get("chunkSize"):
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"):
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
"chunkOverlap"
]
if settings.get("separator"):
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
self.logger.debug("[LF] Final tweaks with settings applied", tweaks=final_tweaks)
# Step 3: Run ingestion
try:
ingest_result = await self.run_ingestion_flow(
file_paths=[file_path],
session_id=session_id,
tweaks=final_tweaks,
jwt_token=jwt_token,
)
self.logger.debug("[LF] Ingestion completed successfully")
except Exception as e:
self.logger.error(
"[LF] Ingestion failed during combined operation",
error=str(e),
file_path=file_path
)
# Note: We could optionally delete the uploaded file here if ingestion fails
raise Exception(f"Ingestion failed: {str(e)}")
# Step 4: Delete file from Langflow (optional)
file_id = upload_result.get("id")
delete_result = None
delete_error = None
if delete_after_ingest and file_id:
try:
self.logger.debug("[LF] Deleting file after successful ingestion", file_id=file_id)
await self.delete_user_file(file_id)
delete_result = {"status": "deleted", "file_id": file_id}
self.logger.debug("[LF] File deleted successfully")
except Exception as e:
delete_error = str(e)
self.logger.warning(
"[LF] Failed to delete file after ingestion",
error=delete_error,
file_id=file_id
)
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
# Return combined result
result = {
"status": "success",
"upload": upload_result,
"ingestion": ingest_result,
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
}
if delete_after_ingest:
result["deletion"] = delete_result
if delete_result and delete_result.get("status") == "deleted":
result["message"] += " and cleaned up"
elif delete_error:
result["message"] += f" (cleanup warning: {delete_error})"
return result