Add LangflowConnectorFileProcessor for handling connector file uploads
This commit introduces the LangflowConnectorFileProcessor class, which processes connector file uploads using the Langflow service. It includes initialization parameters for user and connection details, and implements the process_item method to handle file processing asynchronously. Additionally, it cleans up the existing ConnectorFileProcessor by removing unused imports and streamlining file info retrieval. These changes enhance the code's robustness and maintainability in line with async development practices.
This commit is contained in:
parent
327b435814
commit
542082e5bf
1 changed files with 75 additions and 3 deletions
|
|
@ -1,5 +1,5 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
from .tasks import UploadTask, FileTask
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
|
|
@ -91,10 +91,9 @@ class ConnectorFileProcessor(TaskProcessor):
|
|||
) -> None:
|
||||
"""Process a connector file using ConnectorService"""
|
||||
from models.tasks import TaskStatus
|
||||
import time
|
||||
|
||||
file_id = item # item is the connector file ID
|
||||
file_info = self.file_info_map.get(file_id)
|
||||
self.file_info_map.get(file_id)
|
||||
|
||||
# Get the connector and connection info
|
||||
connector = await self.connector_service.get_connector(self.connection_id)
|
||||
|
|
@ -126,6 +125,79 @@ class ConnectorFileProcessor(TaskProcessor):
|
|||
upload_task.successful_files += 1
|
||||
|
||||
|
||||
class LangflowConnectorFileProcessor(TaskProcessor):
|
||||
"""Processor for connector file uploads using Langflow"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
langflow_connector_service,
|
||||
connection_id: str,
|
||||
files_to_process: list,
|
||||
user_id: str = None,
|
||||
jwt_token: str = None,
|
||||
owner_name: str = None,
|
||||
owner_email: str = None,
|
||||
):
|
||||
self.langflow_connector_service = langflow_connector_service
|
||||
self.connection_id = connection_id
|
||||
self.files_to_process = files_to_process
|
||||
self.user_id = user_id
|
||||
self.jwt_token = jwt_token
|
||||
self.owner_name = owner_name
|
||||
self.owner_email = owner_email
|
||||
# Create lookup map for file info - handle both file objects and file IDs
|
||||
self.file_info_map = {}
|
||||
for f in files_to_process:
|
||||
if isinstance(f, dict):
|
||||
# Full file info objects
|
||||
self.file_info_map[f["id"]] = f
|
||||
else:
|
||||
# Just file IDs - will need to fetch metadata during processing
|
||||
self.file_info_map[f] = None
|
||||
|
||||
async def process_item(
|
||||
self, upload_task: UploadTask, item: str, file_task: FileTask
|
||||
) -> None:
|
||||
"""Process a connector file using LangflowConnectorService"""
|
||||
from models.tasks import TaskStatus
|
||||
|
||||
file_id = item # item is the connector file ID
|
||||
self.file_info_map.get(file_id)
|
||||
|
||||
# Get the connector and connection info
|
||||
connector = await self.langflow_connector_service.get_connector(
|
||||
self.connection_id
|
||||
)
|
||||
connection = (
|
||||
await self.langflow_connector_service.connection_manager.get_connection(
|
||||
self.connection_id
|
||||
)
|
||||
)
|
||||
if not connector or not connection:
|
||||
raise ValueError(f"Connection '{self.connection_id}' not found")
|
||||
|
||||
# Get file content from connector (the connector will fetch metadata if needed)
|
||||
document = await connector.get_file_content(file_id)
|
||||
|
||||
# Use the user_id passed during initialization
|
||||
if not self.user_id:
|
||||
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
|
||||
|
||||
# Process using Langflow pipeline
|
||||
result = await self.langflow_connector_service.process_connector_document(
|
||||
document,
|
||||
self.user_id,
|
||||
connection.connector_type,
|
||||
jwt_token=self.jwt_token,
|
||||
owner_name=self.owner_name,
|
||||
owner_email=self.owner_email,
|
||||
)
|
||||
|
||||
file_task.status = TaskStatus.COMPLETED
|
||||
file_task.result = result
|
||||
upload_task.successful_files += 1
|
||||
|
||||
|
||||
class S3FileProcessor(TaskProcessor):
|
||||
"""Processor for files stored in S3 buckets"""
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue