format langflow_file_service.py

This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-22 19:32:26 -03:00
parent b19890fa4d
commit 8d1b4f7cf6

View file

@ -113,7 +113,9 @@ class LangflowFileService:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchVectorStoreComponent-YnJox" not in tweaks:
tweaks["OpenSearchVectorStoreComponent-YnJox"] = {}
tweaks["OpenSearchVectorStoreComponent-YnJox"]["docs_metadata"] = metadata_tweaks
tweaks["OpenSearchVectorStoreComponent-YnJox"]["docs_metadata"] = (
metadata_tweaks
)
logger.debug(
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
)
@ -170,7 +172,7 @@ class LangflowFileService:
"""
Combined upload, ingest, and delete operation.
First uploads the file, then runs ingestion on it, then optionally deletes the file.
Args:
file_tuple: File tuple (filename, content, content_type)
session_id: Optional session ID for the ingestion flow
@ -178,12 +180,12 @@ class LangflowFileService:
settings: Optional UI settings to convert to component tweaks
jwt_token: Optional JWT token for authentication
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
Returns:
Combined result with upload info, ingestion result, and deletion status
"""
logger.debug("[LF] Starting combined upload and ingest operation")
# Step 1: Upload the file
try:
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
@ -192,10 +194,12 @@ class LangflowFileService:
extra={
"file_id": upload_result.get("id"),
"file_path": upload_result.get("path"),
}
},
)
except Exception as e:
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
logger.error(
"[LF] Upload failed during combined operation", extra={"error": str(e)}
)
raise Exception(f"Upload failed: {str(e)}")
# Step 2: Prepare for ingestion
@ -205,9 +209,11 @@ class LangflowFileService:
# Convert UI settings to component tweaks if provided
final_tweaks = tweaks.copy() if tweaks else {}
if settings:
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
logger.debug(
"[LF] Applying ingestion settings", extra={"settings": settings}
)
# Split Text component tweaks (SplitText-PC36h)
if (
@ -218,7 +224,9 @@ class LangflowFileService:
if "SplitText-PC36h" not in final_tweaks:
final_tweaks["SplitText-PC36h"] = {}
if settings.get("chunkSize"):
final_tweaks["SplitText-PC36h"]["chunk_size"] = settings["chunkSize"]
final_tweaks["SplitText-PC36h"]["chunk_size"] = settings[
"chunkSize"
]
if settings.get("chunkOverlap"):
final_tweaks["SplitText-PC36h"]["chunk_overlap"] = settings[
"chunkOverlap"
@ -230,9 +238,14 @@ class LangflowFileService:
if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings[
"embeddingModel"
]
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})
logger.debug(
"[LF] Final tweaks with settings applied",
extra={"tweaks": final_tweaks},
)
# Step 3: Run ingestion
try:
@ -246,10 +259,7 @@ class LangflowFileService:
except Exception as e:
logger.error(
"[LF] Ingestion failed during combined operation",
extra={
"error": str(e),
"file_path": file_path
}
extra={"error": str(e), "file_path": file_path},
)
# Note: We could optionally delete the uploaded file here if ingestion fails
raise Exception(f"Ingestion failed: {str(e)}")
@ -258,10 +268,13 @@ class LangflowFileService:
file_id = upload_result.get("id")
delete_result = None
delete_error = None
if delete_after_ingest and file_id:
try:
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
logger.debug(
"[LF] Deleting file after successful ingestion",
extra={"file_id": file_id},
)
await self.delete_user_file(file_id)
delete_result = {"status": "deleted", "file_id": file_id}
logger.debug("[LF] File deleted successfully")
@ -269,26 +282,27 @@ class LangflowFileService:
delete_error = str(e)
logger.warning(
"[LF] Failed to delete file after ingestion",
extra={
"error": delete_error,
"file_id": file_id
}
extra={"error": delete_error, "file_id": file_id},
)
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
delete_result = {
"status": "delete_failed",
"file_id": file_id,
"error": delete_error,
}
# Return combined result
result = {
"status": "success",
"upload": upload_result,
"ingestion": ingest_result,
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
}
if delete_after_ingest:
result["deletion"] = delete_result
if delete_result and delete_result.get("status") == "deleted":
result["message"] += " and cleaned up"
elif delete_error:
result["message"] += f" (cleanup warning: {delete_error})"
return result