diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py index 858843ea..545c6190 100644 --- a/src/connectors/langflow_connector_service.py +++ b/src/connectors/langflow_connector_service.py @@ -72,27 +72,28 @@ class LangflowConnectorService: document.mimetype or "application/octet-stream", ) - upload_result = await self.langflow_service.upload_user_file( - file_tuple, jwt_token - ) - langflow_file_id = upload_result["id"] - langflow_file_path = upload_result["path"] - - logger.debug( - "File uploaded to Langflow", - file_id=langflow_file_id, - path=langflow_file_path, - ) - - # Step 2: Run ingestion flow with the uploaded file - logger.debug( - "Running Langflow ingestion flow", file_path=langflow_file_path - ) - - # Use the same tweaks pattern as LangflowFileService - tweaks = {} # Let Langflow handle the ingestion with default settings - + langflow_file_id = None # Initialize to track if upload succeeded try: + upload_result = await self.langflow_service.upload_user_file( + file_tuple, jwt_token + ) + langflow_file_id = upload_result["id"] + langflow_file_path = upload_result["path"] + + logger.debug( + "File uploaded to Langflow", + file_id=langflow_file_id, + path=langflow_file_path, + ) + + # Step 2: Run ingestion flow with the uploaded file + logger.debug( + "Running Langflow ingestion flow", file_path=langflow_file_path + ) + + # Use the same tweaks pattern as LangflowFileService + tweaks = {} # Let Langflow handle the ingestion with default settings + ingestion_result = await self.langflow_service.run_ingestion_flow( file_paths=[langflow_file_path], jwt_token=jwt_token, @@ -126,18 +127,19 @@ class LangflowConnectorService: error=str(e), ) # Try to clean up Langflow file if upload succeeded but processing failed - try: - await self.langflow_service.delete_user_file(langflow_file_id) - logger.debug( - "Cleaned up Langflow file after error", - file_id=langflow_file_id, - ) - except Exception as cleanup_error: - logger.warning( - "Failed to cleanup Langflow file", - file_id=langflow_file_id, - error=str(cleanup_error), - ) + if langflow_file_id is not None: + try: + await self.langflow_service.delete_user_file(langflow_file_id) + logger.debug( + "Cleaned up Langflow file after error", + file_id=langflow_file_id, + ) + except Exception as cleanup_error: + logger.warning( + "Failed to cleanup Langflow file", + file_id=langflow_file_id, + error=str(cleanup_error), + ) raise def _get_file_extension(self, mimetype: str) -> str: