diff --git a/src/models/processors.py b/src/models/processors.py index ed5a1bb4..02836020 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Any, Dict +from typing import Any from .tasks import UploadTask, FileTask from utils.logging_config import get_logger @@ -91,10 +91,9 @@ class ConnectorFileProcessor(TaskProcessor): ) -> None: """Process a connector file using ConnectorService""" from models.tasks import TaskStatus - import time file_id = item # item is the connector file ID - file_info = self.file_info_map.get(file_id) + self.file_info_map.get(file_id) # Get the connector and connection info connector = await self.connector_service.get_connector(self.connection_id) @@ -126,6 +125,79 @@ class ConnectorFileProcessor(TaskProcessor): upload_task.successful_files += 1 +class LangflowConnectorFileProcessor(TaskProcessor): + """Processor for connector file uploads using Langflow""" + + def __init__( + self, + langflow_connector_service, + connection_id: str, + files_to_process: list, + user_id: str = None, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + ): + self.langflow_connector_service = langflow_connector_service + self.connection_id = connection_id + self.files_to_process = files_to_process + self.user_id = user_id + self.jwt_token = jwt_token + self.owner_name = owner_name + self.owner_email = owner_email + # Create lookup map for file info - handle both file objects and file IDs + self.file_info_map = {} + for f in files_to_process: + if isinstance(f, dict): + # Full file info objects + self.file_info_map[f["id"]] = f + else: + # Just file IDs - will need to fetch metadata during processing + self.file_info_map[f] = None + + async def process_item( + self, upload_task: UploadTask, item: str, file_task: FileTask + ) -> None: + """Process a connector file using LangflowConnectorService""" + from models.tasks import TaskStatus + + file_id = item # item is the connector file ID + self.file_info_map.get(file_id) + + # Get the connector and connection info + connector = await self.langflow_connector_service.get_connector( + self.connection_id + ) + connection = ( + await self.langflow_connector_service.connection_manager.get_connection( + self.connection_id + ) + ) + if not connector or not connection: + raise ValueError(f"Connection '{self.connection_id}' not found") + + # Get file content from connector (the connector will fetch metadata if needed) + document = await connector.get_file_content(file_id) + + # Use the user_id passed during initialization + if not self.user_id: + raise ValueError("user_id not provided to LangflowConnectorFileProcessor") + + # Process using Langflow pipeline + result = await self.langflow_connector_service.process_connector_document( + document, + self.user_id, + connection.connector_type, + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + ) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + upload_task.successful_files += 1 + + class S3FileProcessor(TaskProcessor): """Processor for files stored in S3 buckets"""