class LangflowConnectorFileProcessor(TaskProcessor):
    """Processor for connector file uploads routed through the Langflow pipeline.

    Mirrors ConnectorFileProcessor, but delegates document ingestion to a
    Langflow connector service (``process_connector_document``) instead of the
    plain ConnectorService.
    """

    def __init__(
        self,
        langflow_connector_service,
        connection_id: str,
        files_to_process: list,
        user_id: str | None = None,
        jwt_token: str | None = None,
        owner_name: str | None = None,
        owner_email: str | None = None,
    ):
        """Store connection details and build the file-info lookup map.

        Args:
            langflow_connector_service: Service exposing ``get_connector``,
                ``connection_manager`` and ``process_connector_document``.
            connection_id: Identifier of the connector connection to use.
            files_to_process: Either full file-info dicts (each with an
                ``"id"`` key) or bare file-ID strings.
            user_id: Owner of the upload; required by ``process_item``.
            jwt_token: Optional auth token forwarded to the Langflow pipeline.
            owner_name: Optional display name forwarded to the pipeline.
            owner_email: Optional email forwarded to the pipeline.
        """
        self.langflow_connector_service = langflow_connector_service
        self.connection_id = connection_id
        self.files_to_process = files_to_process
        self.user_id = user_id
        self.jwt_token = jwt_token
        self.owner_name = owner_name
        self.owner_email = owner_email
        # Lookup map for file info — handles both full metadata dicts and
        # bare file IDs. Bare IDs map to None; the connector fetches their
        # metadata on demand during processing.
        self.file_info_map = {}
        for f in files_to_process:
            if isinstance(f, dict):
                # Full file info object, keyed by its ID.
                self.file_info_map[f["id"]] = f
            else:
                # Just a file ID — metadata fetched later by the connector.
                self.file_info_map[f] = None

    async def process_item(
        self, upload_task: UploadTask, item: str, file_task: FileTask
    ) -> None:
        """Process a single connector file through the Langflow pipeline.

        Args:
            upload_task: Parent task; ``successful_files`` is incremented on
                success.
            item: The connector file ID to process.
            file_task: Per-file task updated with status and result.

        Raises:
            ValueError: If the connection cannot be resolved, or no
                ``user_id`` was supplied at construction time.
        """
        # Local import avoids a circular dependency at module import time.
        from models.tasks import TaskStatus

        file_id = item  # item is the connector file ID

        # Resolve the connector and its connection record; both must exist.
        connector = await self.langflow_connector_service.get_connector(
            self.connection_id
        )
        connection = (
            await self.langflow_connector_service.connection_manager.get_connection(
                self.connection_id
            )
        )
        if not connector or not connection:
            raise ValueError(f"Connection '{self.connection_id}' not found")

        # Fail fast before any remote fetch: user_id is mandatory here.
        if not self.user_id:
            raise ValueError("user_id not provided to LangflowConnectorFileProcessor")

        # Get file content from the connector (the connector will fetch
        # metadata itself if only a bare file ID was supplied).
        document = await connector.get_file_content(file_id)

        # Hand the document to the Langflow processing pipeline.
        result = await self.langflow_connector_service.process_connector_document(
            document,
            self.user_id,
            connection.connector_type,
            jwt_token=self.jwt_token,
            owner_name=self.owner_name,
            owner_email=self.owner_email,
        )

        file_task.status = TaskStatus.COMPLETED
        file_task.result = result
        upload_task.successful_files += 1
S3FileProcessor(TaskProcessor): """Processor for files stored in S3 buckets"""