178 lines
5.7 KiB
Python
178 lines
5.7 KiB
Python
"""
|
|
Utility functions for managing dynamic embedding field names in OpenSearch.
|
|
|
|
This module provides helpers for:
|
|
- Normalizing embedding model names to valid OpenSearch field names
|
|
- Generating dynamic field names based on embedding models
|
|
- Ensuring embedding fields exist in the OpenSearch index
|
|
"""
|
|
|
|
from typing import Dict, Any
|
|
|
|
from utils.logging_config import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
def normalize_model_name(model_name: str) -> str:
    """
    Convert an embedding model name to a valid OpenSearch field suffix.

    The name is lower-cased, every character that is not alphanumeric
    (as judged by ``str.isalnum``, so non-ASCII letters and digits are kept)
    is turned into an underscore, runs of underscores are collapsed to one,
    and leading/trailing underscores are dropped.

    Examples:
        - "text-embedding-3-small" -> "text_embedding_3_small"
        - "nomic-embed-text:latest" -> "nomic_embed_text_latest"
        - "ibm/slate-125m-english-rtrvr" -> "ibm_slate_125m_english_rtrvr"

    Args:
        model_name: The embedding model name (e.g., from OpenAI, Ollama, Watsonx)

    Returns:
        Normalized string safe for use as OpenSearch field name suffix.
        The result is empty when the input contains no alphanumeric characters.
    """
    # One pass handles every separator ("-", ":", "/", ".", spaces, ...):
    # anything that is not alphanumeric becomes an underscore.
    underscored = "".join(
        ch if ch.isalnum() else "_" for ch in model_name.lower()
    )
    # Splitting on "_" and dropping the empty pieces collapses repeated
    # underscores and trims leading/trailing ones in a single step.
    return "_".join(piece for piece in underscored.split("_") if piece)
|
|
|
|
|
|
def get_embedding_field_name(model_name: str) -> str:
    """
    Build the OpenSearch field name under which a model's embeddings are stored.

    The model name is passed through ``normalize_model_name`` and the result
    is appended to the fixed ``chunk_embedding_`` prefix.

    Args:
        model_name: The embedding model name

    Returns:
        Field name in format: chunk_embedding_{normalized_model_name}

    Examples:
        >>> get_embedding_field_name("text-embedding-3-small")
        'chunk_embedding_text_embedding_3_small'
        >>> get_embedding_field_name("nomic-embed-text")
        'chunk_embedding_nomic_embed_text'
    """
    return "chunk_embedding_" + normalize_model_name(model_name)
|
|
|
|
|
|
async def ensure_embedding_field_exists(
    opensearch_client,
    model_name: str,
    index_name: str = None,
) -> str:
    """
    Ensure that an embedding field for the specified model exists in the OpenSearch index.

    The current mapping is inspected first. If the field is already mapped as
    ``knn_vector`` nothing is changed; otherwise the field (together with the
    ``embedding_model`` and ``embedding_dimensions`` tracking fields) is added
    through the PUT mapping API and the mapping is re-read to verify the result.

    Args:
        opensearch_client: OpenSearch client instance (async; its
            ``indices.get_mapping`` / ``indices.put_mapping`` are awaited)
        model_name: The embedding model name
        index_name: OpenSearch index name (defaults to INDEX_NAME from settings)

    Returns:
        The field name that was ensured to exist

    Raises:
        RuntimeError: If the field already exists with a type other than
            ``knn_vector``, or if the field is not mapped as ``knn_vector``
            after the mapping update.
        Exception: If unable to add the field mapping (the error raised by the
            client is logged and re-raised unchanged)
    """
    # Imported lazily inside the function, as in the original code.
    from config.settings import INDEX_NAME
    from utils.embeddings import get_embedding_dimensions

    if index_name is None:
        index_name = INDEX_NAME

    field_name = get_embedding_field_name(model_name)
    dimensions = await get_embedding_dimensions(model_name)

    logger.info(
        "Ensuring embedding field exists",
        field_name=field_name,
        model_name=model_name,
        dimensions=dimensions,
    )

    def _definition_in(index_entry: Any) -> Dict[str, Any]:
        # Pull this field's definition out of one index's mapping entry.
        if not isinstance(index_entry, dict):
            return {}
        properties = index_entry.get("mappings", {}).get("properties", {})
        return properties.get(field_name, {}) if isinstance(properties, dict) else {}

    async def _get_field_definition() -> Dict[str, Any]:
        # Best-effort lookup: a failed fetch is treated as "field not found".
        try:
            response = await opensearch_client.indices.get_mapping(index=index_name)
        except Exception as e:
            logger.debug(
                "Failed to fetch mapping before ensuring embedding field",
                index=index_name,
                error=str(e),
            )
            return {}

        if index_name in response:
            entries = [response[index_name]]
        else:
            # When index_name is an alias or a pattern, the response is keyed
            # by the concrete index names instead, so inspect every returned
            # index rather than reporting the field as missing.
            entries = list(response.values())

        definitions = [_definition_in(entry) for entry in entries]
        if not definitions or not all(definitions):
            # At least one target index lacks the field.
            return {}

        # Surface an incompatible definition first so the caller can reject it.
        for definition in definitions:
            if definition.get("type") != "knn_vector":
                return definition
        return definitions[0]

    existing_definition = await _get_field_definition()
    if existing_definition:
        if existing_definition.get("type") != "knn_vector":
            raise RuntimeError(
                f"Field '{field_name}' already exists with incompatible type '{existing_definition.get('type')}'"
            )
        return field_name

    # Define the field mapping for both the vector field and the tracking fields
    field_mapping = {
        "properties": {
            field_name: {
                "type": "knn_vector",
                "dimension": dimensions,
                "method": {
                    "name": "disk_ann",
                    "engine": "jvector",
                    "space_type": "l2",
                    "parameters": {"ef_construction": 100, "m": 16},
                },
            },
            # Also ensure the embedding_model tracking field exists as keyword
            "embedding_model": {
                "type": "keyword",
            },
            "embedding_dimensions": {
                "type": "integer",
            },
        }
    }

    try:
        # Re-sending a definition identical to an existing one is harmless;
        # a definition that conflicts with an existing field is rejected by
        # OpenSearch and surfaces through the except branch below.
        await opensearch_client.indices.put_mapping(
            index=index_name,
            body=field_mapping,
        )
        logger.info(
            "Successfully ensured embedding field exists",
            field_name=field_name,
            model_name=model_name,
        )
    except Exception as e:
        logger.error(
            "Failed to add embedding field mapping",
            field_name=field_name,
            model_name=model_name,
            error=str(e),
        )
        raise

    # Verify mapping was applied correctly
    new_definition = await _get_field_definition()
    if new_definition.get("type") != "knn_vector":
        raise RuntimeError(
            f"Failed to ensure '{field_name}' is mapped as knn_vector. Current definition: {new_definition}"
        )

    return field_name
|