Add LangflowConnectorService for managing connector documents

This commit introduces the LangflowConnectorService class, which facilitates the management and processing of connector documents using the Langflow service. It includes methods for initializing the service, retrieving connectors, processing documents, and syncing files. The implementation emphasizes asynchronous processing and robust error handling, enhancing the overall maintainability and documentation of the codebase.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-08 16:40:24 -03:00
parent 542082e5bf
commit beee47068c

View file

@ -0,0 +1,290 @@
import os
import tempfile
from typing import Any, Dict, List, Optional
# Create custom processor for connector files using Langflow
from models.processors import LangflowConnectorFileProcessor
from services.langflow_file_service import LangflowFileService
from utils.logging_config import get_logger
from .base import BaseConnector, ConnectorDocument
from .connection_manager import ConnectionManager
logger = get_logger(__name__)
class LangflowConnectorService:
"""Service to manage connector documents and process them via Langflow"""
def __init__(
self,
task_service=None,
session_manager=None,
):
self.task_service = task_service
self.session_manager = session_manager
self.connection_manager = ConnectionManager()
# Initialize LangflowFileService for processing connector documents
self.langflow_service = LangflowFileService()
async def initialize(self):
"""Initialize the service by loading existing connections"""
await self.connection_manager.load_connections()
async def get_connector(self, connection_id: str) -> Optional[BaseConnector]:
"""Get a connector by connection ID"""
return await self.connection_manager.get_connector(connection_id)
async def process_connector_document(
self,
document: ConnectorDocument,
owner_user_id: str,
connector_type: str,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
) -> Dict[str, Any]:
"""Process a document from a connector using LangflowFileService pattern"""
logger.debug(
"Processing connector document via Langflow",
document_id=document.id,
filename=document.filename,
)
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=self._get_file_extension(document.mimetype)
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
try:
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename,
content,
document.mimetype or "application/octet-stream",
)
upload_result = await self.langflow_service.upload_user_file(
file_tuple, jwt_token
)
langflow_file_id = upload_result["id"]
langflow_file_path = upload_result["path"]
logger.debug(
"File uploaded to Langflow",
file_id=langflow_file_id,
path=langflow_file_path,
)
# Step 2: Run ingestion flow with the uploaded file
logger.debug(
"Running Langflow ingestion flow", file_path=langflow_file_path
)
# Use the same tweaks pattern as LangflowFileService
tweaks = {} # Let Langflow handle the ingestion with default settings
ingestion_result = await self.langflow_service.run_ingestion_flow(
file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks
)
logger.debug("Ingestion flow completed", result=ingestion_result)
# Step 3: Delete the file from Langflow
logger.debug("Deleting file from Langflow", file_id=langflow_file_id)
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug("File deleted from Langflow", file_id=langflow_file_id)
return {
"status": "indexed",
"filename": document.filename,
"source_url": document.source_url,
"document_id": document.id,
"connector_type": connector_type,
"langflow_result": ingestion_result,
}
except Exception as e:
logger.error(
"Failed to process connector document via Langflow",
document_id=document.id,
error=str(e),
)
# Try to clean up Langflow file if upload succeeded but processing failed
if "langflow_file_id" in locals():
try:
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug(
"Cleaned up Langflow file after error",
file_id=langflow_file_id,
)
except Exception as cleanup_error:
logger.warning(
"Failed to cleanup Langflow file",
file_id=langflow_file_id,
error=str(cleanup_error),
)
raise
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
def _get_file_extension(self, mimetype: str) -> str:
"""Get file extension based on MIME type"""
mime_to_ext = {
"application/pdf": ".pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/msword": ".doc",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
"application/vnd.ms-powerpoint": ".ppt",
"text/plain": ".txt",
"text/html": ".html",
"application/rtf": ".rtf",
"application/vnd.google-apps.document": ".pdf", # Exported as PDF
"application/vnd.google-apps.presentation": ".pdf",
"application/vnd.google-apps.spreadsheet": ".pdf",
}
return mime_to_ext.get(mimetype, ".bin")
async def sync_connector_files(
self,
connection_id: str,
user_id: str,
max_files: int = None,
jwt_token: str = None,
) -> str:
"""Sync files from a connector connection using Langflow processing"""
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
)
logger.debug(
"Starting Langflow-based sync for connection",
connection_id=connection_id,
max_files=max_files,
)
connector = await self.get_connector(connection_id)
if not connector:
raise ValueError(
f"Connection '{connection_id}' not found or not authenticated"
)
logger.debug("Got connector", authenticated=connector.is_authenticated)
if not connector.is_authenticated:
raise ValueError(f"Connection '{connection_id}' not authenticated")
# Collect files to process (limited by max_files)
files_to_process = []
page_token = None
# Calculate page size to minimize API calls
page_size = min(max_files or 100, 1000) if max_files else 100
while True:
# List files from connector with limit
logger.debug(
"Calling list_files", page_size=page_size, page_token=page_token
)
file_list = await connector.list_files(page_token, limit=page_size)
logger.debug(
"Got files from connector", file_count=len(file_list.get("files", []))
)
files = file_list["files"]
if not files:
break
for file_info in files:
if max_files and len(files_to_process) >= max_files:
break
files_to_process.append(file_info)
# Stop if we have enough files or no more pages
if (max_files and len(files_to_process) >= max_files) or not file_list.get(
"nextPageToken"
):
break
page_token = file_list.get("nextPageToken")
# Get user information
user = self.session_manager.get_user(user_id) if self.session_manager else None
owner_name = user.name if user else None
owner_email = user.email if user else None
processor = LangflowConnectorFileProcessor(
self,
connection_id,
files_to_process,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
)
# Use file IDs as items
file_ids = [file_info["id"] for file_info in files_to_process]
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
user_id, file_ids, processor
)
return task_id
async def sync_specific_files(
self,
connection_id: str,
user_id: str,
file_ids: List[str],
jwt_token: str = None,
) -> str:
"""Sync specific files by their IDs using Langflow processing"""
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
)
connector = await self.get_connector(connection_id)
if not connector:
raise ValueError(
f"Connection '{connection_id}' not found or not authenticated"
)
if not connector.is_authenticated:
raise ValueError(f"Connection '{connection_id}' not authenticated")
if not file_ids:
raise ValueError("No file IDs provided")
# Get user information
user = self.session_manager.get_user(user_id) if self.session_manager else None
owner_name = user.name if user else None
owner_email = user.email if user else None
processor = LangflowConnectorFileProcessor(
self,
connection_id,
file_ids,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
)
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
user_id, file_ids, processor
)
return task_id