import os import tempfile from typing import Any, Dict, List, Optional # Create custom processor for connector files using Langflow from models.processors import LangflowConnectorFileProcessor from services.langflow_file_service import LangflowFileService from utils.logging_config import get_logger from .base import BaseConnector, ConnectorDocument from .connection_manager import ConnectionManager logger = get_logger(__name__) class LangflowConnectorService: """Service to manage connector documents and process them via Langflow""" def __init__( self, task_service=None, session_manager=None, ): self.task_service = task_service self.session_manager = session_manager self.connection_manager = ConnectionManager() # Initialize LangflowFileService for processing connector documents self.langflow_service = LangflowFileService() async def initialize(self): """Initialize the service by loading existing connections""" await self.connection_manager.load_connections() async def get_connector(self, connection_id: str) -> Optional[BaseConnector]: """Get a connector by connection ID""" return await self.connection_manager.get_connector(connection_id) async def process_connector_document( self, document: ConnectorDocument, owner_user_id: str, connector_type: str, jwt_token: str = None, owner_name: str = None, owner_email: str = None, ) -> Dict[str, Any]: """Process a document from a connector using LangflowFileService pattern""" logger.debug( "Processing connector document via Langflow", document_id=document.id, filename=document.filename, ) suffix = self._get_file_extension(document.mimetype) # Create temporary file from document content with tempfile.NamedTemporaryFile( delete=False, suffix=suffix ) as tmp_file: tmp_file.write(document.content) tmp_file.flush() try: # Step 1: Upload file to Langflow logger.debug("Uploading file to Langflow", filename=document.filename) content = document.content file_tuple = ( document.filename.replace(" ", "_").replace("/", "_")+suffix, content, document.mimetype or "application/octet-stream", ) upload_result = await self.langflow_service.upload_user_file( file_tuple, jwt_token ) langflow_file_id = upload_result["id"] langflow_file_path = upload_result["path"] logger.debug( "File uploaded to Langflow", file_id=langflow_file_id, path=langflow_file_path, ) # Step 2: Run ingestion flow with the uploaded file logger.debug( "Running Langflow ingestion flow", file_path=langflow_file_path ) # Use the same tweaks pattern as LangflowFileService tweaks = {} # Let Langflow handle the ingestion with default settings ingestion_result = await self.langflow_service.run_ingestion_flow( file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks, owner=owner_user_id, owner_name=owner_name, owner_email=owner_email, connector_type=connector_type, ) logger.debug("Ingestion flow completed", result=ingestion_result) # Step 3: Delete the file from Langflow logger.debug("Deleting file from Langflow", file_id=langflow_file_id) await self.langflow_service.delete_user_file(langflow_file_id) logger.debug("File deleted from Langflow", file_id=langflow_file_id) return { "status": "indexed", "filename": document.filename, "source_url": document.source_url, "document_id": document.id, "connector_type": connector_type, "langflow_result": ingestion_result, } except Exception as e: logger.error( "Failed to process connector document via Langflow", document_id=document.id, error=str(e), ) # Try to clean up Langflow file if upload succeeded but processing failed if "langflow_file_id" in locals(): try: await self.langflow_service.delete_user_file(langflow_file_id) logger.debug( "Cleaned up Langflow file after error", file_id=langflow_file_id, ) except Exception as cleanup_error: logger.warning( "Failed to cleanup Langflow file", file_id=langflow_file_id, error=str(cleanup_error), ) raise finally: # Clean up temporary file os.unlink(tmp_file.name) def _get_file_extension(self, mimetype: str) -> str: """Get file extension based on MIME type""" mime_to_ext = { "application/pdf": ".pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", "application/msword": ".doc", "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx", "application/vnd.ms-powerpoint": ".ppt", "text/plain": ".txt", "text/html": ".html", "application/rtf": ".rtf", "application/vnd.google-apps.document": ".pdf", # Exported as PDF "application/vnd.google-apps.presentation": ".pdf", "application/vnd.google-apps.spreadsheet": ".pdf", } return mime_to_ext.get(mimetype, ".bin") async def sync_connector_files( self, connection_id: str, user_id: str, max_files: int = None, jwt_token: str = None, ) -> str: """Sync files from a connector connection using Langflow processing""" if not self.task_service: raise ValueError( "TaskService not available - connector sync requires task service dependency" ) logger.debug( "Starting Langflow-based sync for connection", connection_id=connection_id, max_files=max_files, ) connector = await self.get_connector(connection_id) if not connector: raise ValueError( f"Connection '{connection_id}' not found or not authenticated" ) logger.debug("Got connector", authenticated=connector.is_authenticated) if not connector.is_authenticated: raise ValueError(f"Connection '{connection_id}' not authenticated") # Collect files to process (limited by max_files) files_to_process = [] page_token = None # Calculate page size to minimize API calls page_size = min(max_files or 100, 1000) if max_files else 100 while True: # List files from connector with limit logger.debug( "Calling list_files", page_size=page_size, page_token=page_token ) file_list = await connector.list_files(page_token, limit=page_size) logger.debug( "Got files from connector", file_count=len(file_list.get("files", [])) ) files = file_list["files"] if not files: break for file_info in files: if max_files and len(files_to_process) >= max_files: break files_to_process.append(file_info) # Stop if we have enough files or no more pages if (max_files and len(files_to_process) >= max_files) or not file_list.get( "nextPageToken" ): break page_token = file_list.get("nextPageToken") # Get user information user = self.session_manager.get_user(user_id) if self.session_manager else None owner_name = user.name if user else None owner_email = user.email if user else None processor = LangflowConnectorFileProcessor( self, connection_id, files_to_process, user_id, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email, ) # Use file IDs as items file_ids = [file_info["id"] for file_info in files_to_process] # Create custom task using TaskService task_id = await self.task_service.create_custom_task( user_id, file_ids, processor ) return task_id async def sync_specific_files( self, connection_id: str, user_id: str, file_ids: List[str], jwt_token: str = None, ) -> str: """Sync specific files by their IDs using Langflow processing""" if not self.task_service: raise ValueError( "TaskService not available - connector sync requires task service dependency" ) connector = await self.get_connector(connection_id) if not connector: raise ValueError( f"Connection '{connection_id}' not found or not authenticated" ) if not connector.is_authenticated: raise ValueError(f"Connection '{connection_id}' not authenticated") if not file_ids: raise ValueError("No file IDs provided") # Get user information user = self.session_manager.get_user(user_id) if self.session_manager else None owner_name = user.name if user else None owner_email = user.email if user else None processor = LangflowConnectorFileProcessor( self, connection_id, file_ids, user_id, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email, ) # Create custom task using TaskService task_id = await self.task_service.create_custom_task( user_id, file_ids, processor ) return task_id async def _get_connector(self, connection_id: str) -> Optional[BaseConnector]: """Get a connector by connection ID (alias for get_connector)""" return await self.get_connector(connection_id)