ensure index if disable ingest with langflow is active
This commit is contained in:
parent
de447a6ae5
commit
c96d4943d5
1 changed files with 30 additions and 1 deletions
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
from starlette.requests import Request
|
from starlette.requests import Request
|
||||||
|
|
||||||
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
|
from config.settings import DISABLE_INGEST_WITH_LANGFLOW, clients, INDEX_NAME, INDEX_BODY
|
||||||
from utils.logging_config import get_logger
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
@ -38,7 +38,36 @@ class ConnectorRouter:
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
"""Initialize the active connector service."""
|
"""Initialize the active connector service."""
|
||||||
|
# Initialize OpenSearch index if using traditional OpenRAG connector service
|
||||||
|
if DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
|
await self._ensure_opensearch_index()
|
||||||
|
|
||||||
return await self.get_active_service().initialize()
|
return await self.get_active_service().initialize()
|
||||||
|
|
||||||
|
async def _ensure_opensearch_index(self):
|
||||||
|
"""Ensure OpenSearch index exists when using traditional connector service."""
|
||||||
|
try:
|
||||||
|
# Check if index already exists
|
||||||
|
if await clients.opensearch.indices.exists(index=INDEX_NAME):
|
||||||
|
logger.debug("OpenSearch index already exists", index_name=INDEX_NAME)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Create the index with hard-coded INDEX_BODY (uses OpenAI embedding dimensions)
|
||||||
|
await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
|
||||||
|
logger.info(
|
||||||
|
"Created OpenSearch index for traditional connector service",
|
||||||
|
index_name=INDEX_NAME,
|
||||||
|
vector_dimensions=INDEX_BODY["mappings"]["properties"]["chunk_embedding"]["dimension"]
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
"Failed to initialize OpenSearch index for traditional connector service",
|
||||||
|
error=str(e),
|
||||||
|
index_name=INDEX_NAME
|
||||||
|
)
|
||||||
|
# Don't raise the exception to avoid breaking the initialization
|
||||||
|
# The service can still function, document operations might fail later
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def connection_manager(self):
|
def connection_manager(self):
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue