From 343dc442dd660e61cc56ede51d2c10cf4fa51f9c Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Fri, 26 Sep 2025 17:13:53 -0300 Subject: [PATCH] initialize index on startup when feature flag is enabled --- src/api/connector_router.py | 60 +++++++++++++------------------------ src/main.py | 57 ++++++++++++++++++++++++++++------- 2 files changed, 67 insertions(+), 50 deletions(-) diff --git a/src/api/connector_router.py b/src/api/connector_router.py index c7ceb756..dd98e474 100644 --- a/src/api/connector_router.py +++ b/src/api/connector_router.py @@ -2,7 +2,12 @@ from starlette.requests import Request -from config.settings import DISABLE_INGEST_WITH_LANGFLOW, clients, INDEX_NAME, INDEX_BODY +from config.settings import ( + DISABLE_INGEST_WITH_LANGFLOW, + clients, + INDEX_NAME, + INDEX_BODY, +) from utils.logging_config import get_logger logger = get_logger(__name__) @@ -12,19 +17,19 @@ class ConnectorRouter: """ Router that automatically chooses between LangflowConnectorService and ConnectorService based on the DISABLE_INGEST_WITH_LANGFLOW configuration. 
- + - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService """ - + def __init__(self, langflow_connector_service, openrag_connector_service): self.langflow_connector_service = langflow_connector_service self.openrag_connector_service = openrag_connector_service logger.debug( "ConnectorRouter initialized", - disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, ) - + def get_active_service(self): """Get the currently active connector service based on configuration.""" if DISABLE_INGEST_WITH_LANGFLOW: @@ -33,57 +38,32 @@ class ConnectorRouter: else: logger.debug("Using Langflow connector service") return self.langflow_connector_service - + # Proxy all connector service methods to the active service - + async def initialize(self): """Initialize the active connector service.""" # Initialize OpenSearch index if using traditional OpenRAG connector service - if DISABLE_INGEST_WITH_LANGFLOW: - await self._ensure_opensearch_index() return await self.get_active_service().initialize() - async def _ensure_opensearch_index(self): - """Ensure OpenSearch index exists when using traditional connector service.""" - try: - # Check if index already exists - if await clients.opensearch.indices.exists(index=INDEX_NAME): - logger.debug("OpenSearch index already exists", index_name=INDEX_NAME) - return - - # Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions) - await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY) - logger.info( - "Created OpenSearch index for traditional connector service", - index_name=INDEX_NAME, - vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"]["dimension"] - ) - - except Exception as e: - logger.error( - "Failed to initialize OpenSearch index for traditional connector service", - error=str(e), - index_name=INDEX_NAME - ) - # Don't 
raise the exception to avoid breaking the initialization - # The service can still function, document operations might fail later - @property def connection_manager(self): """Get the connection manager from the active service.""" return self.get_active_service().connection_manager - + async def get_connector(self, connection_id: str): """Get a connector instance from the active service.""" return await self.get_active_service().get_connector(connection_id) - - async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None): + + async def sync_specific_files( + self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None + ): """Sync specific files using the active service.""" return await self.get_active_service().sync_specific_files( connection_id, user_id, file_list, jwt_token ) - + def __getattr__(self, name): """ Proxy any other method calls to the active service. @@ -93,4 +73,6 @@ class ConnectorRouter: if hasattr(active_service, name): return getattr(active_service, name) else: - raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'") + raise AttributeError( + f"'{type(active_service).__name__}' object has no attribute '{name}'" + ) diff --git a/src/main.py b/src/main.py index 69f2ad9f..4ed57b75 100644 --- a/src/main.py +++ b/src/main.py @@ -53,6 +53,7 @@ from auth_middleware import optional_auth, require_auth from config.settings import ( DISABLE_INGEST_WITH_LANGFLOW, EMBED_MODEL, + INDEX_BODY, INDEX_NAME, SESSION_SECRET, clients, @@ -82,6 +83,7 @@ logger.info( cuda_version=torch.version.cuda, ) + async def wait_for_opensearch(): """Wait for OpenSearch to be ready with retries""" max_retries = 30 @@ -128,6 +130,34 @@ async def configure_alerting_security(): # Don't fail startup if alerting config fails +async def _ensure_opensearch_index(): + """Ensure OpenSearch index exists when using traditional connector service.""" + try: + # Check if index already 
exists + if await clients.opensearch.indices.exists(index=INDEX_NAME): + logger.debug("OpenSearch index already exists", index_name=INDEX_NAME) + return + + # Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions) + await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY) + logger.info( + "Created OpenSearch index for traditional connector service", + index_name=INDEX_NAME, + vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"][ + "dimension" + ], + ) + + except Exception as e: + logger.error( + "Failed to initialize OpenSearch index for traditional connector service", + error=str(e), + index_name=INDEX_NAME, + ) + # Don't raise the exception to avoid breaking the initialization + # The service can still function, document operations might fail later + + async def init_index(): """Initialize OpenSearch index and security roles""" await wait_for_opensearch() @@ -141,10 +171,20 @@ async def init_index(): # Create documents index if not await clients.opensearch.indices.exists(index=INDEX_NAME): - await clients.opensearch.indices.create(index=INDEX_NAME, body=dynamic_index_body) - logger.info("Created OpenSearch index", index_name=INDEX_NAME, embedding_model=embedding_model) + await clients.opensearch.indices.create( + index=INDEX_NAME, body=dynamic_index_body + ) + logger.info( + "Created OpenSearch index", + index_name=INDEX_NAME, + embedding_model=embedding_model, + ) else: - logger.info("Index already exists, skipping creation", index_name=INDEX_NAME, embedding_model=embedding_model) + logger.info( + "Index already exists, skipping creation", + index_name=INDEX_NAME, + embedding_model=embedding_model, + ) # Create knowledge filters index knowledge_filter_index_name = "knowledge_filters" @@ -402,6 +442,9 @@ async def startup_tasks(services): # Index will be created after onboarding when we know the embedding model await wait_for_opensearch() + if DISABLE_INGEST_WITH_LANGFLOW: + await 
_ensure_opensearch_index() + # Configure alerting security await configure_alerting_security() @@ -1075,14 +1118,6 @@ async def create_app(): return app -async def startup(): - """Application startup tasks""" - await init_index() - # Get services from app state if needed for initialization - # services = app.state.services - # await services['connector_service'].initialize() - - def cleanup(): """Cleanup on application shutdown""" # Cleanup process pools only (webhooks handled by Starlette shutdown)