From e0117f5cd1647952b4e823e8473955403981f4a4 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 19:31:42 -0400 Subject: [PATCH] update to routers --- frontend/components/knowledge-dropdown.tsx | 4 +- frontend/src/app/admin/page.tsx | 2 +- src/api/connector_router.py | 67 ++++++++++++++++++++++ src/api/router.py | 4 ++ src/main.py | 21 ++++++- 5 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 src/api/connector_router.py diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index 8088964b..481a45b1 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -133,8 +133,8 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD const formData = new FormData() formData.append('file', files[0]) - // Use unified upload and ingest endpoint - const uploadIngestRes = await fetch('/api/langflow/upload_ingest', { + // Use router upload and ingest endpoint (automatically routes based on configuration) + const uploadIngestRes = await fetch('/api/router/upload_ingest', { method: 'POST', body: formData, }) diff --git a/frontend/src/app/admin/page.tsx b/frontend/src/app/admin/page.tsx index c3262156..6cb8aa96 100644 --- a/frontend/src/app/admin/page.tsx +++ b/frontend/src/app/admin/page.tsx @@ -51,7 +51,7 @@ function AdminPage() { const formData = new FormData() formData.append("file", selectedFile) - const response = await fetch("/api/upload", { + const response = await fetch("/api/router/upload_ingest", { method: "POST", body: formData, }) diff --git a/src/api/connector_router.py b/src/api/connector_router.py new file mode 100644 index 00000000..2a692ae4 --- /dev/null +++ b/src/api/connector_router.py @@ -0,0 +1,67 @@ +"""Connector router that automatically routes based on configuration settings.""" + +from starlette.requests import Request + +from config.settings import DISABLE_INGEST_WITH_LANGFLOW +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +class ConnectorRouter: + """ + Router that automatically chooses between LangflowConnectorService and ConnectorService + based on the DISABLE_INGEST_WITH_LANGFLOW configuration. + + - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService + - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService + """ + + def __init__(self, langflow_connector_service, openrag_connector_service): + self.langflow_connector_service = langflow_connector_service + self.openrag_connector_service = openrag_connector_service + logger.debug( + "ConnectorRouter initialized", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW + ) + + def get_active_service(self): + """Get the currently active connector service based on configuration.""" + if DISABLE_INGEST_WITH_LANGFLOW: + logger.debug("Using traditional OpenRAG connector service") + return self.openrag_connector_service + else: + logger.debug("Using Langflow connector service") + return self.langflow_connector_service + + # Proxy all connector service methods to the active service + + async def initialize(self): + """Initialize the active connector service.""" + return await self.get_active_service().initialize() + + @property + def connection_manager(self): + """Get the connection manager from the active service.""" + return self.get_active_service().connection_manager + + async def get_connector(self, connection_id: str): + """Get a connector instance from the active service.""" + return await self.get_active_service().get_connector(connection_id) + + async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None): + """Sync specific files using the active service.""" + return await self.get_active_service().sync_specific_files( + connection_id, user_id, file_list, jwt_token + ) + + def __getattr__(self, name): + """ + Proxy any other method calls to the active service. + This ensures compatibility with any methods we might have missed. + """ + active_service = self.get_active_service() + if hasattr(active_service, name): + return getattr(active_service, name) + else: + raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'") diff --git a/src/api/router.py b/src/api/router.py index 0dbc3ee2..518217ee 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -33,6 +33,10 @@ async def upload_ingest_router( disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW ) + # Route directly without task creation + # Note: Single file uploads are processed synchronously and don't need task tracking + # Tasks are only used for bulk operations (folders, S3 buckets, etc.) + if DISABLE_INGEST_WITH_LANGFLOW: # Route to traditional OpenRAG upload logger.debug("Routing to traditional OpenRAG upload") diff --git a/src/main.py b/src/main.py index f59136aa..c941e53c 100644 --- a/src/main.py +++ b/src/main.py @@ -2,6 +2,7 @@ import sys # Configure structured logging early from connectors.langflow_connector_service import LangflowConnectorService +from connectors.service import ConnectorService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -44,6 +45,7 @@ from auth_middleware import optional_auth, require_auth # Configuration and setup from config.settings import ( DISABLE_INGEST_WITH_LANGFLOW, + EMBED_MODEL, INDEX_BODY, INDEX_NAME, SESSION_SECRET, @@ -52,6 +54,7 @@ from config.settings import ( ) # Existing services +from api.connector_router import ConnectorRouter from services.auth_service import AuthService from services.chat_service import ChatService @@ -390,10 +393,26 @@ async def initialize_services(): document_service.process_pool = process_pool # Initialize connector service - connector_service = LangflowConnectorService( + + # Initialize both connector services + langflow_connector_service = LangflowConnectorService( task_service=task_service, session_manager=session_manager, ) + openrag_connector_service = ConnectorService( + patched_async_client=clients.patched_async_client, + process_pool=process_pool, + embed_model=EMBED_MODEL, + index_name=INDEX_NAME, + task_service=task_service, + session_manager=session_manager, + ) + + # Create connector router that chooses based on configuration + connector_service = ConnectorRouter( + langflow_connector_service=langflow_connector_service, + openrag_connector_service=openrag_connector_service + ) # Initialize auth service auth_service = AuthService(session_manager, connector_service)