Use display name instead of id for tweaks

This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-09 16:18:25 -03:00
parent ab10e01286
commit 0e8bc3090c
3 changed files with 105 additions and 85 deletions

View file

@ -70,35 +70,33 @@ async def run_ingestion(
if settings: if settings:
logger.debug("Applying ingestion settings", settings=settings) logger.debug("Applying ingestion settings", settings=settings)
# Split Text component tweaks (SplitText-QIKhg) # Split Text component tweaks
if ( if (
settings.get("chunkSize") settings.get("chunkSize")
or settings.get("chunkOverlap") or settings.get("chunkOverlap")
or settings.get("separator") or settings.get("separator")
): ):
if "SplitText-QIKhg" not in tweaks: if "Split Text" not in tweaks:
tweaks["SplitText-QIKhg"] = {} tweaks["Split Text"] = {}
if settings.get("chunkSize"): if settings.get("chunkSize"):
tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"): if settings.get("chunkOverlap"):
tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ tweaks["Split Text"]["chunk_overlap"] = settings["chunkOverlap"]
"chunkOverlap"
]
if settings.get("separator"): if settings.get("separator"):
tweaks["SplitText-QIKhg"]["separator"] = settings["separator"] tweaks["Split Text"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6) # OpenAI Embeddings component tweaks
if settings.get("embeddingModel"): if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in tweaks: if "OpenAI Embeddings" not in tweaks:
tweaks["OpenAIEmbeddings-joRJ6"] = {} tweaks["OpenAI Embeddings"] = {}
tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
# Note: OpenSearch component tweaks not needed for ingestion # Note: OpenSearch component tweaks not needed for ingestion
# (search parameters are for retrieval, not document processing) # (search parameters are for retrieval, not document processing)
logger.debug("Final tweaks with settings applied", tweaks=tweaks) logger.debug("Final tweaks with settings applied", tweaks=tweaks)
# Include user JWT if available # Include user JWT if available
jwt_token = getattr(request.state, "jwt_token", None) jwt_token: str | None = getattr(request.state, "jwt_token", None)
# Extract user info from User object # Extract user info from User object
user = getattr(request.state, "user", None) user = getattr(request.state, "user", None)
@ -128,7 +126,10 @@ async def run_ingestion(
async def upload_and_ingest_user_file( async def upload_and_ingest_user_file(
request: Request, langflow_file_service: LangflowFileService, session_manager, task_service request: Request,
langflow_file_service: LangflowFileService,
session_manager,
task_service,
): ):
"""Combined upload and ingest endpoint - uses task service for tracking and cancellation""" """Combined upload and ingest endpoint - uses task service for tracking and cancellation"""
try: try:
@ -148,10 +149,11 @@ async def upload_and_ingest_user_file(
# Parse JSON fields if provided # Parse JSON fields if provided
settings = None settings = None
tweaks = None tweaks = None
if settings_json: if settings_json:
try: try:
import json import json
settings = json.loads(settings_json) settings = json.loads(settings_json)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error("Invalid settings JSON", error=str(e)) logger.error("Invalid settings JSON", error=str(e))
@ -160,6 +162,7 @@ async def upload_and_ingest_user_file(
if tweaks_json: if tweaks_json:
try: try:
import json import json
tweaks = json.loads(tweaks_json) tweaks = json.loads(tweaks_json)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error("Invalid tweaks JSON", error=str(e)) logger.error("Invalid tweaks JSON", error=str(e))
@ -173,7 +176,9 @@ async def upload_and_ingest_user_file(
jwt_token = getattr(request.state, "jwt_token", None) jwt_token = getattr(request.state, "jwt_token", None)
if not user_id: if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401) return JSONResponse(
{"error": "User authentication required"}, status_code=401
)
logger.debug( logger.debug(
"Processing file for task-based upload and ingest", "Processing file for task-based upload and ingest",
@ -183,28 +188,28 @@ async def upload_and_ingest_user_file(
has_settings=bool(settings), has_settings=bool(settings),
has_tweaks=bool(tweaks), has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest, delete_after_ingest=delete_after_ingest,
user_id=user_id user_id=user_id,
) )
# Create temporary file for task processing # Create temporary file for task processing
import tempfile
import os import os
import tempfile
# Read file content # Read file content
content = await upload_file.read() content = await upload_file.read()
# Create temporary file # Create temporary file
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
temp_fd, temp_path = tempfile.mkstemp( temp_fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}")
suffix=f"_{safe_filename}"
)
try: try:
# Write content to temp file # Write content to temp file
with os.fdopen(temp_fd, 'wb') as temp_file: with os.fdopen(temp_fd, "wb") as temp_file:
temp_file.write(content) temp_file.write(content)
logger.debug("Created temporary file for task processing", temp_path=temp_path) logger.debug(
"Created temporary file for task processing", temp_path=temp_path
)
# Create langflow upload task for single file # Create langflow upload task for single file
task_id = await task_service.create_langflow_upload_task( task_id = await task_service.create_langflow_upload_task(
@ -222,12 +227,15 @@ async def upload_and_ingest_user_file(
) )
logger.debug("Langflow upload task created successfully", task_id=task_id) logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse({ return JSONResponse(
"task_id": task_id, {
"message": f"Langflow upload task created for file '{upload_file.filename}'", "task_id": task_id,
"filename": upload_file.filename "message": f"Langflow upload task created for file '{upload_file.filename}'",
}, status_code=202) # 202 Accepted for async processing "filename": upload_file.filename,
},
status_code=202,
) # 202 Accepted for async processing
except Exception: except Exception:
# Clean up temp file on error # Clean up temp file on error
@ -237,7 +245,7 @@ async def upload_and_ingest_user_file(
except Exception: except Exception:
pass # Ignore cleanup errors pass # Ignore cleanup errors
raise raise
except Exception as e: except Exception as e:
logger.error( logger.error(
"upload_and_ingest_user_file endpoint failed", "upload_and_ingest_user_file endpoint failed",
@ -245,6 +253,7 @@ async def upload_and_ingest_user_file(
error=str(e), error=str(e),
) )
import traceback import traceback
logger.error("Full traceback", traceback=traceback.format_exc()) logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500) return JSONResponse({"error": str(e)}, status_code=500)

View file

@ -367,20 +367,22 @@ class LangflowFileProcessor(TaskProcessor):
try: try:
# Read file content # Read file content
with open(item, 'rb') as f: with open(item, "rb") as f:
content = f.read() content = f.read()
# Create file tuple for upload # Create file tuple for upload
temp_filename = os.path.basename(item) temp_filename = os.path.basename(item)
# Extract original filename from temp file suffix (remove tmp prefix) # Extract original filename from temp file suffix (remove tmp prefix)
if "_" in temp_filename: if "_" in temp_filename:
filename = temp_filename.split("_", 1)[1] # Get everything after first _ filename = temp_filename.split("_", 1)[
1
] # Get everything after first _
else: else:
filename = temp_filename filename = temp_filename
content_type, _ = mimetypes.guess_type(filename) content_type, _ = mimetypes.guess_type(filename)
if not content_type: if not content_type:
content_type = 'application/octet-stream' content_type = "application/octet-stream"
file_tuple = (filename, content, content_type) file_tuple = (filename, content, content_type)
# Get JWT token using same logic as DocumentFileProcessor # Get JWT token using same logic as DocumentFileProcessor
@ -393,27 +395,29 @@ class LangflowFileProcessor(TaskProcessor):
) )
# The session manager would have created anonymous JWT if needed # The session manager would have created anonymous JWT if needed
# Get it from the session manager's internal state # Get it from the session manager's internal state
if hasattr(self.session_manager, '_anonymous_jwt'): if hasattr(self.session_manager, "_anonymous_jwt"):
effective_jwt = self.session_manager._anonymous_jwt effective_jwt = self.session_manager._anonymous_jwt
# Prepare metadata tweaks similar to API endpoint # Prepare metadata tweaks similar to API endpoint
final_tweaks = self.tweaks.copy() if self.tweaks else {} final_tweaks = self.tweaks.copy() if self.tweaks else {}
metadata_tweaks = [] metadata_tweaks = []
if self.owner_user_id: if self.owner_user_id:
metadata_tweaks.append({"key": "owner", "value": self.owner_user_id}) metadata_tweaks.append({"key": "owner", "value": self.owner_user_id})
if self.owner_name: if self.owner_name:
metadata_tweaks.append({"key": "owner_name", "value": self.owner_name}) metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
if self.owner_email: if self.owner_email:
metadata_tweaks.append({"key": "owner_email", "value": self.owner_email}) metadata_tweaks.append(
{"key": "owner_email", "value": self.owner_email}
)
# Mark as local upload for connector_type # Mark as local upload for connector_type
metadata_tweaks.append({"key": "connector_type", "value": "local"}) metadata_tweaks.append({"key": "connector_type", "value": "local"})
if metadata_tweaks: if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present # Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in final_tweaks: if "OpenSearch (Hybrid)" not in final_tweaks:
final_tweaks["OpenSearchHybrid-Ve6bS"] = {} final_tweaks["OpenSearch (Hybrid)"] = {}
final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks final_tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
# Process file using langflow service # Process file using langflow service
result = await self.langflow_file_service.upload_and_ingest_file( result = await self.langflow_file_service.upload_and_ingest_file(
@ -422,7 +426,7 @@ class LangflowFileProcessor(TaskProcessor):
tweaks=final_tweaks, tweaks=final_tweaks,
settings=self.settings, settings=self.settings,
jwt_token=effective_jwt, jwt_token=effective_jwt,
delete_after_ingest=self.delete_after_ingest delete_after_ingest=self.delete_after_ingest,
) )
# Update task with success # Update task with success

View file

@ -84,14 +84,13 @@ class LangflowFileService:
if not tweaks: if not tweaks:
tweaks = {} tweaks = {}
# Pass files via tweaks to File component (File-PSU37 from the flow) # Pass files via tweaks to File component
if file_paths: if file_paths:
tweaks["File-PSU37"] = {"path": file_paths} tweaks["File"] = {"path": file_paths}
# Pass JWT token via tweaks using the x-langflow-global-var- pattern # Pass JWT token via tweaks to OpenSearch component
if jwt_token: if jwt_token:
# Using the global variable pattern that Langflow expects for OpenSearch components tweaks["OpenSearch (Hybrid)"] = {"jwt_token": jwt_token}
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components") logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
else: else:
logger.warning("[LF] No JWT token provided") logger.warning("[LF] No JWT token provided")
@ -109,9 +108,9 @@ class LangflowFileService:
if metadata_tweaks: if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present # Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in tweaks: if "OpenSearch (Hybrid)" not in tweaks:
tweaks["OpenSearchHybrid-Ve6bS"] = {} tweaks["OpenSearch (Hybrid)"] = {}
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
logger.debug( logger.debug(
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks) "[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
) )
@ -168,7 +167,7 @@ class LangflowFileService:
""" """
Combined upload, ingest, and delete operation. Combined upload, ingest, and delete operation.
First uploads the file, then runs ingestion on it, then optionally deletes the file. First uploads the file, then runs ingestion on it, then optionally deletes the file.
Args: Args:
file_tuple: File tuple (filename, content, content_type) file_tuple: File tuple (filename, content, content_type)
session_id: Optional session ID for the ingestion flow session_id: Optional session ID for the ingestion flow
@ -176,12 +175,12 @@ class LangflowFileService:
settings: Optional UI settings to convert to component tweaks settings: Optional UI settings to convert to component tweaks
jwt_token: Optional JWT token for authentication jwt_token: Optional JWT token for authentication
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True) delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
Returns: Returns:
Combined result with upload info, ingestion result, and deletion status Combined result with upload info, ingestion result, and deletion status
""" """
logger.debug("[LF] Starting combined upload and ingest operation") logger.debug("[LF] Starting combined upload and ingest operation")
# Step 1: Upload the file # Step 1: Upload the file
try: try:
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token) upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
@ -190,10 +189,12 @@ class LangflowFileService:
extra={ extra={
"file_id": upload_result.get("id"), "file_id": upload_result.get("id"),
"file_path": upload_result.get("path"), "file_path": upload_result.get("path"),
} },
) )
except Exception as e: except Exception as e:
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)}) logger.error(
"[LF] Upload failed during combined operation", extra={"error": str(e)}
)
raise Exception(f"Upload failed: {str(e)}") raise Exception(f"Upload failed: {str(e)}")
# Step 2: Prepare for ingestion # Step 2: Prepare for ingestion
@ -203,34 +204,39 @@ class LangflowFileService:
# Convert UI settings to component tweaks if provided # Convert UI settings to component tweaks if provided
final_tweaks = tweaks.copy() if tweaks else {} final_tweaks = tweaks.copy() if tweaks else {}
if settings:
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
# Split Text component tweaks (SplitText-QIKhg) if settings:
logger.debug(
"[LF] Applying ingestion settings", extra={"settings": settings}
)
# Split Text component tweaks
if ( if (
settings.get("chunkSize") settings.get("chunkSize")
or settings.get("chunkOverlap") or settings.get("chunkOverlap")
or settings.get("separator") or settings.get("separator")
): ):
if "SplitText-QIKhg" not in final_tweaks: if "Split Text" not in final_tweaks:
final_tweaks["SplitText-QIKhg"] = {} final_tweaks["Split Text"] = {}
if settings.get("chunkSize"): if settings.get("chunkSize"):
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] final_tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"): if settings.get("chunkOverlap"):
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ final_tweaks["Split Text"]["chunk_overlap"] = settings[
"chunkOverlap" "chunkOverlap"
] ]
if settings.get("separator"): if settings.get("separator"):
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"] final_tweaks["Split Text"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6) # OpenAI Embeddings component tweaks
if settings.get("embeddingModel"): if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in final_tweaks: if "OpenAI Embeddings" not in final_tweaks:
final_tweaks["OpenAIEmbeddings-joRJ6"] = {} final_tweaks["OpenAI Embeddings"] = {}
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] final_tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks}) logger.debug(
"[LF] Final tweaks with settings applied",
extra={"tweaks": final_tweaks},
)
# Step 3: Run ingestion # Step 3: Run ingestion
try: try:
@ -244,10 +250,7 @@ class LangflowFileService:
except Exception as e: except Exception as e:
logger.error( logger.error(
"[LF] Ingestion failed during combined operation", "[LF] Ingestion failed during combined operation",
extra={ extra={"error": str(e), "file_path": file_path},
"error": str(e),
"file_path": file_path
}
) )
# Note: We could optionally delete the uploaded file here if ingestion fails # Note: We could optionally delete the uploaded file here if ingestion fails
raise Exception(f"Ingestion failed: {str(e)}") raise Exception(f"Ingestion failed: {str(e)}")
@ -256,10 +259,13 @@ class LangflowFileService:
file_id = upload_result.get("id") file_id = upload_result.get("id")
delete_result = None delete_result = None
delete_error = None delete_error = None
if delete_after_ingest and file_id: if delete_after_ingest and file_id:
try: try:
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id}) logger.debug(
"[LF] Deleting file after successful ingestion",
extra={"file_id": file_id},
)
await self.delete_user_file(file_id) await self.delete_user_file(file_id)
delete_result = {"status": "deleted", "file_id": file_id} delete_result = {"status": "deleted", "file_id": file_id}
logger.debug("[LF] File deleted successfully") logger.debug("[LF] File deleted successfully")
@ -267,26 +273,27 @@ class LangflowFileService:
delete_error = str(e) delete_error = str(e)
logger.warning( logger.warning(
"[LF] Failed to delete file after ingestion", "[LF] Failed to delete file after ingestion",
extra={ extra={"error": delete_error, "file_id": file_id},
"error": delete_error,
"file_id": file_id
}
) )
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} delete_result = {
"status": "delete_failed",
"file_id": file_id,
"error": delete_error,
}
# Return combined result # Return combined result
result = { result = {
"status": "success", "status": "success",
"upload": upload_result, "upload": upload_result,
"ingestion": ingest_result, "ingestion": ingest_result,
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully" "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
} }
if delete_after_ingest: if delete_after_ingest:
result["deletion"] = delete_result result["deletion"] = delete_result
if delete_result and delete_result.get("status") == "deleted": if delete_result and delete_result.get("status") == "deleted":
result["message"] += " and cleaned up" result["message"] += " and cleaned up"
elif delete_error: elif delete_error:
result["message"] += f" (cleanup warning: {delete_error})" result["message"] += f" (cleanup warning: {delete_error})"
return result return result