initialize index on startup when feature flag is enabled

This commit is contained in:
Lucas Oliveira 2025-09-26 17:13:53 -03:00
parent 44e4f3d0d6
commit 343dc442dd
2 changed files with 67 additions and 50 deletions

View file

@@ -2,7 +2,12 @@
from starlette.requests import Request
from config.settings import DISABLE_INGEST_WITH_LANGFLOW, clients, INDEX_NAME, INDEX_BODY
from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
clients,
INDEX_NAME,
INDEX_BODY,
)
from utils.logging_config import get_logger
logger = get_logger(__name__)
@@ -12,19 +17,19 @@ class ConnectorRouter:
"""
Router that automatically chooses between LangflowConnectorService and ConnectorService
based on the DISABLE_INGEST_WITH_LANGFLOW configuration.
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService
"""
def __init__(self, langflow_connector_service, openrag_connector_service):
self.langflow_connector_service = langflow_connector_service
self.openrag_connector_service = openrag_connector_service
logger.debug(
"ConnectorRouter initialized",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
)
def get_active_service(self):
"""Get the currently active connector service based on configuration."""
if DISABLE_INGEST_WITH_LANGFLOW:
@@ -33,57 +38,32 @@ class ConnectorRouter:
else:
logger.debug("Using Langflow connector service")
return self.langflow_connector_service
# Proxy all connector service methods to the active service
async def initialize(self):
"""Initialize the active connector service."""
# Initialize OpenSearch index if using traditional OpenRAG connector service
if DISABLE_INGEST_WITH_LANGFLOW:
await self._ensure_opensearch_index()
return await self.get_active_service().initialize()
async def _ensure_opensearch_index(self):
"""Ensure OpenSearch index exists when using traditional connector service."""
try:
# Check if index already exists
if await clients.opensearch.indices.exists(index=INDEX_NAME):
logger.debug("OpenSearch index already exists", index_name=INDEX_NAME)
return
# Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions)
await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
logger.info(
"Created OpenSearch index for traditional connector service",
index_name=INDEX_NAME,
vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"]["dimension"]
)
except Exception as e:
logger.error(
"Failed to initialize OpenSearch index for traditional connector service",
error=str(e),
index_name=INDEX_NAME
)
# Don't raise the exception to avoid breaking the initialization
# The service can still function, document operations might fail later
@property
def connection_manager(self):
"""Get the connection manager from the active service."""
return self.get_active_service().connection_manager
async def get_connector(self, connection_id: str):
"""Get a connector instance from the active service."""
return await self.get_active_service().get_connector(connection_id)
async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
async def sync_specific_files(
self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None
):
"""Sync specific files using the active service."""
return await self.get_active_service().sync_specific_files(
connection_id, user_id, file_list, jwt_token
)
def __getattr__(self, name):
"""
Proxy any other method calls to the active service.
@@ -93,4 +73,6 @@ class ConnectorRouter:
if hasattr(active_service, name):
return getattr(active_service, name)
else:
raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'")
raise AttributeError(
f"'{type(active_service).__name__}' object has no attribute '{name}'"
)

View file

@@ -53,6 +53,7 @@ from auth_middleware import optional_auth, require_auth
from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
EMBED_MODEL,
INDEX_BODY,
INDEX_NAME,
SESSION_SECRET,
clients,
@@ -82,6 +83,7 @@ logger.info(
cuda_version=torch.version.cuda,
)
async def wait_for_opensearch():
"""Wait for OpenSearch to be ready with retries"""
max_retries = 30
@@ -128,6 +130,34 @@ async def configure_alerting_security():
# Don't fail startup if alerting config fails
async def _ensure_opensearch_index(self):
"""Ensure OpenSearch index exists when using traditional connector service."""
try:
# Check if index already exists
if await clients.opensearch.indices.exists(index=INDEX_NAME):
logger.debug("OpenSearch index already exists", index_name=INDEX_NAME)
return
# Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions)
await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
logger.info(
"Created OpenSearch index for traditional connector service",
index_name=INDEX_NAME,
vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"][
"dimension"
],
)
except Exception as e:
logger.error(
"Failed to initialize OpenSearch index for traditional connector service",
error=str(e),
index_name=INDEX_NAME,
)
# Don't raise the exception to avoid breaking the initialization
# The service can still function, document operations might fail later
async def init_index():
"""Initialize OpenSearch index and security roles"""
await wait_for_opensearch()
@@ -141,10 +171,20 @@ async def init_index():
# Create documents index
if not await clients.opensearch.indices.exists(index=INDEX_NAME):
await clients.opensearch.indices.create(index=INDEX_NAME, body=dynamic_index_body)
logger.info("Created OpenSearch index", index_name=INDEX_NAME, embedding_model=embedding_model)
await clients.opensearch.indices.create(
index=INDEX_NAME, body=dynamic_index_body
)
logger.info(
"Created OpenSearch index",
index_name=INDEX_NAME,
embedding_model=embedding_model,
)
else:
logger.info("Index already exists, skipping creation", index_name=INDEX_NAME, embedding_model=embedding_model)
logger.info(
"Index already exists, skipping creation",
index_name=INDEX_NAME,
embedding_model=embedding_model,
)
# Create knowledge filters index
knowledge_filter_index_name = "knowledge_filters"
@@ -402,6 +442,9 @@ async def startup_tasks(services):
# Index will be created after onboarding when we know the embedding model
await wait_for_opensearch()
if DISABLE_INGEST_WITH_LANGFLOW:
await _ensure_opensearch_index()
# Configure alerting security
await configure_alerting_security()
@@ -1075,14 +1118,6 @@ async def create_app():
return app
async def startup():
"""Application startup tasks"""
await init_index()
# Get services from app state if needed for initialization
# services = app.state.services
# await services['connector_service'].initialize()
def cleanup():
"""Cleanup on application shutdown"""
# Cleanup process pools only (webhooks handled by Starlette shutdown)