diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 132cd45e..63353874 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -130,9 +130,16 @@ class LangflowFileService: ) # Avoid logging full payload to prevent leaking sensitive data (e.g., JWT) + headers = { + "X-Langflow-Global-Var-JWT": jwt_token, + "X-Langflow-Global-Var-Owner": owner, + "X-Langflow-Global-Var-Owner-Name": owner_name, + "X-Langflow-Global-Var-Owner-Email": owner_email, + "X-Langflow-Global-Var-Connector-Type": connector_type, + } resp = await clients.langflow_request( - "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload + "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload, headers=headers ) logger.debug( "[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase @@ -168,7 +175,7 @@ class LangflowFileService: """ Combined upload, ingest, and delete operation. First uploads the file, then runs ingestion on it, then optionally deletes the file. - + Args: file_tuple: File tuple (filename, content, content_type) session_id: Optional session ID for the ingestion flow @@ -176,12 +183,12 @@ class LangflowFileService: settings: Optional UI settings to convert to component tweaks jwt_token: Optional JWT token for authentication delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True) - + Returns: Combined result with upload info, ingestion result, and deletion status """ logger.debug("[LF] Starting combined upload and ingest operation") - + # Step 1: Upload the file try: upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token) @@ -190,10 +197,12 @@ class LangflowFileService: extra={ "file_id": upload_result.get("id"), "file_path": upload_result.get("path"), - } + }, ) except Exception as e: - logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)}) + logger.error( + "[LF] Upload failed during combined operation", extra={"error": str(e)} + ) raise Exception(f"Upload failed: {str(e)}") # Step 2: Prepare for ingestion @@ -203,9 +212,11 @@ class LangflowFileService: # Convert UI settings to component tweaks if provided final_tweaks = tweaks.copy() if tweaks else {} - + if settings: - logger.debug("[LF] Applying ingestion settings", extra={"settings": settings}) + logger.debug( + "[LF] Applying ingestion settings", extra={"settings": settings} + ) # Split Text component tweaks (SplitText-QIKhg) if ( @@ -216,7 +227,9 @@ class LangflowFileService: if "SplitText-QIKhg" not in final_tweaks: final_tweaks["SplitText-QIKhg"] = {} if settings.get("chunkSize"): - final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] + final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings[ + "chunkSize" + ] if settings.get("chunkOverlap"): final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ "chunkOverlap" @@ -228,9 +241,14 @@ class LangflowFileService: if settings.get("embeddingModel"): if "OpenAIEmbeddings-joRJ6" not in final_tweaks: final_tweaks["OpenAIEmbeddings-joRJ6"] = {} - final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] + final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings[ + "embeddingModel" + ] - logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks}) + logger.debug( + "[LF] Final tweaks with settings applied", + extra={"tweaks": final_tweaks}, + ) # Step 3: Run ingestion try: @@ -244,10 +262,7 @@ class LangflowFileService: except Exception as e: logger.error( "[LF] Ingestion failed during combined operation", - extra={ - "error": str(e), - "file_path": file_path - } + extra={"error": str(e), "file_path": file_path}, ) # Note: We could optionally delete the uploaded file here if ingestion fails raise Exception(f"Ingestion failed: {str(e)}") @@ -256,10 +271,13 @@ class LangflowFileService: file_id = upload_result.get("id") delete_result = None delete_error = None - + if delete_after_ingest and file_id: try: - logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id}) + logger.debug( + "[LF] Deleting file after successful ingestion", + extra={"file_id": file_id}, + ) await self.delete_user_file(file_id) delete_result = {"status": "deleted", "file_id": file_id} logger.debug("[LF] File deleted successfully") @@ -267,26 +285,27 @@ class LangflowFileService: delete_error = str(e) logger.warning( "[LF] Failed to delete file after ingestion", - extra={ - "error": delete_error, - "file_id": file_id - } + extra={"error": delete_error, "file_id": file_id}, ) - delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} + delete_result = { + "status": "delete_failed", + "file_id": file_id, + "error": delete_error, + } # Return combined result result = { "status": "success", "upload": upload_result, "ingestion": ingest_result, - "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully" + "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully", } - + if delete_after_ingest: result["deletion"] = delete_result if delete_result and delete_result.get("status") == "deleted": result["message"] += " and cleaned up" elif delete_error: result["message"] += f" (cleanup warning: {delete_error})" - + return result