update to routers

This commit is contained in:
Edwin Jose 2025-09-08 19:31:42 -04:00
parent ed41ce0a67
commit e0117f5cd1
5 changed files with 94 additions and 4 deletions

View file

@ -133,8 +133,8 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
const formData = new FormData()
formData.append('file', files[0])
// Use unified upload and ingest endpoint
const uploadIngestRes = await fetch('/api/langflow/upload_ingest', {
// Use router upload and ingest endpoint (automatically routes based on configuration)
const uploadIngestRes = await fetch('/api/router/upload_ingest', {
method: 'POST',
body: formData,
})

View file

@ -51,7 +51,7 @@ function AdminPage() {
const formData = new FormData()
formData.append("file", selectedFile)
const response = await fetch("/api/upload", {
const response = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
})

View file

@ -0,0 +1,67 @@
"""Connector router that automatically routes based on configuration settings."""
from starlette.requests import Request
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
from utils.logging_config import get_logger
logger = get_logger(__name__)
class ConnectorRouter:
"""
Router that automatically chooses between LangflowConnectorService and ConnectorService
based on the DISABLE_INGEST_WITH_LANGFLOW configuration.
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService
"""
def __init__(self, langflow_connector_service, openrag_connector_service):
self.langflow_connector_service = langflow_connector_service
self.openrag_connector_service = openrag_connector_service
logger.debug(
"ConnectorRouter initialized",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
)
def get_active_service(self):
"""Get the currently active connector service based on configuration."""
if DISABLE_INGEST_WITH_LANGFLOW:
logger.debug("Using traditional OpenRAG connector service")
return self.openrag_connector_service
else:
logger.debug("Using Langflow connector service")
return self.langflow_connector_service
# Proxy all connector service methods to the active service
async def initialize(self):
"""Initialize the active connector service."""
return await self.get_active_service().initialize()
@property
def connection_manager(self):
"""Get the connection manager from the active service."""
return self.get_active_service().connection_manager
async def get_connector(self, connection_id: str):
"""Get a connector instance from the active service."""
return await self.get_active_service().get_connector(connection_id)
async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
"""Sync specific files using the active service."""
return await self.get_active_service().sync_specific_files(
connection_id, user_id, file_list, jwt_token
)
def __getattr__(self, name):
"""
Proxy any other method calls to the active service.
This ensures compatibility with any methods we might have missed.
"""
active_service = self.get_active_service()
if hasattr(active_service, name):
return getattr(active_service, name)
else:
raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'")

View file

@ -33,6 +33,10 @@ async def upload_ingest_router(
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
)
# Route directly without task creation
# Note: Single file uploads are processed synchronously and don't need task tracking
# Tasks are only used for bulk operations (folders, S3 buckets, etc.)
if DISABLE_INGEST_WITH_LANGFLOW:
# Route to traditional OpenRAG upload
logger.debug("Routing to traditional OpenRAG upload")

View file

@ -2,6 +2,7 @@ import sys
# Configure structured logging early
from connectors.langflow_connector_service import LangflowConnectorService
from connectors.service import ConnectorService
from utils.logging_config import configure_from_env, get_logger
configure_from_env()
@ -44,6 +45,7 @@ from auth_middleware import optional_auth, require_auth
# Configuration and setup
from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
EMBED_MODEL,
INDEX_BODY,
INDEX_NAME,
SESSION_SECRET,
@ -52,6 +54,7 @@ from config.settings import (
)
# Existing services
from api.connector_router import ConnectorRouter
from services.auth_service import AuthService
from services.chat_service import ChatService
@ -390,10 +393,26 @@ async def initialize_services():
document_service.process_pool = process_pool
# Initialize connector service
connector_service = LangflowConnectorService(
# Initialize both connector services
langflow_connector_service = LangflowConnectorService(
task_service=task_service,
session_manager=session_manager,
)
openrag_connector_service = ConnectorService(
patched_async_client=clients.patched_async_client,
process_pool=process_pool,
embed_model=EMBED_MODEL,
index_name=INDEX_NAME,
task_service=task_service,
session_manager=session_manager,
)
# Create connector router that chooses based on configuration
connector_service = ConnectorRouter(
langflow_connector_service=langflow_connector_service,
openrag_connector_service=openrag_connector_service
)
# Initialize auth service
auth_service = AuthService(session_manager, connector_service)