openrag/src/utils/embedding_fields.py
2025-10-11 02:53:16 -04:00

178 lines
5.7 KiB
Python

"""
Utility functions for managing dynamic embedding field names in OpenSearch.
This module provides helpers for:
- Normalizing embedding model names to valid OpenSearch field names
- Generating dynamic field names based on embedding models
- Ensuring embedding fields exist in the OpenSearch index
"""
from typing import Any, Dict, Optional

from utils.logging_config import get_logger
# Module-level logger; call sites in this module pass structured context as
# keyword arguments (e.g. logger.info("...", field_name=...)).
logger = get_logger(__name__)
def normalize_model_name(model_name: str) -> str:
    """
    Convert an embedding model name to a valid OpenSearch field suffix.

    The name is lower-cased. Every character that is not alphanumeric
    (including the common separators ``-``, ``:``, ``/`` and ``.``) becomes an
    underscore. Runs of underscores are collapsed to one, and leading/trailing
    underscores are removed.

    Examples:
        - "text-embedding-3-small" -> "text_embedding_3_small"
        - "nomic-embed-text:latest" -> "nomic_embed_text_latest"
        - "ibm/slate-125m-english-rtrvr" -> "ibm_slate_125m_english_rtrvr"

    Args:
        model_name: The embedding model name (e.g., from OpenAI, Ollama, Watsonx)

    Returns:
        Normalized string safe for use as an OpenSearch field name suffix.
        It is empty when ``model_name`` contains no alphanumeric characters.
    """
    lowered = model_name.lower()
    # Any non-alphanumeric character (separators included) maps to "_".
    # str.isalnum() is Unicode-aware, so non-ASCII letters/digits are kept.
    underscored = "".join(ch if ch.isalnum() else "_" for ch in lowered)
    # Splitting on "_" and discarding empty pieces collapses repeated
    # underscores and trims them from both ends in a single step.
    return "_".join(piece for piece in underscored.split("_") if piece)
def get_embedding_field_name(model_name: str) -> str:
    """
    Build the OpenSearch field name that stores embeddings for ``model_name``.

    Args:
        model_name: The embedding model name

    Returns:
        The prefix ``"chunk_embedding_"`` followed by
        ``normalize_model_name(model_name)``.

    Examples:
        >>> get_embedding_field_name("text-embedding-3-small")
        'chunk_embedding_text_embedding_3_small'
        >>> get_embedding_field_name("nomic-embed-text")
        'chunk_embedding_nomic_embed_text'
    """
    prefix = "chunk_embedding_"
    return prefix + normalize_model_name(model_name)
async def ensure_embedding_field_exists(
    opensearch_client,
    model_name: str,
    index_name: Optional[str] = None,
) -> str:
    """
    Ensure that an embedding field for the specified model exists in the OpenSearch index.

    If the field is missing, it is added with the PUT mapping API together with
    the ``embedding_model`` (keyword) and ``embedding_dimensions`` (integer)
    tracking fields. The mapping is then re-read to confirm that the field is a
    ``knn_vector``.

    Args:
        opensearch_client: OpenSearch client whose ``indices.get_mapping`` and
            ``indices.put_mapping`` methods are awaited
        model_name: The embedding model name
        index_name: OpenSearch index (or alias) name; defaults to INDEX_NAME
            from settings

    Returns:
        The field name that was ensured to exist

    Raises:
        RuntimeError: If the field already exists with a type other than
            ``knn_vector``, or is still not mapped as ``knn_vector`` after
            the update.
        Exception: Any error from ``put_mapping`` is logged and re-raised.
    """
    from config.settings import INDEX_NAME
    from utils.embeddings import get_embedding_dimensions

    if index_name is None:
        index_name = INDEX_NAME

    field_name = get_embedding_field_name(model_name)
    dimensions = await get_embedding_dimensions(model_name)

    logger.info(
        "Ensuring embedding field exists",
        field_name=field_name,
        model_name=model_name,
        dimensions=dimensions,
    )

    async def _get_field_definition() -> Dict[str, Any]:
        """Return the current mapping of ``field_name``, or ``{}`` if absent/unreadable."""
        try:
            response = await opensearch_client.indices.get_mapping(index=index_name)
        except Exception as e:
            # Best effort: an unreadable mapping is treated as "field absent".
            logger.debug(
                "Failed to fetch mapping before ensuring embedding field",
                index=index_name,
                error=str(e),
            )
            return {}
        if not isinstance(response, dict):
            return {}
        # The response is keyed by *concrete* index name. When index_name is an
        # alias it is not a key, so fall back to scanning every returned index.
        if index_name in response:
            candidates = [response[index_name]]
        else:
            candidates = list(response.values())
        for index_mapping in candidates:
            if not isinstance(index_mapping, dict):
                continue
            properties = index_mapping.get("mappings", {}).get("properties", {})
            if isinstance(properties, dict) and field_name in properties:
                return properties[field_name]
        return {}

    existing_definition = await _get_field_definition()
    if existing_definition:
        existing_type = existing_definition.get("type")
        if existing_type != "knn_vector":
            raise RuntimeError(
                f"Field '{field_name}' already exists with incompatible type '{existing_type}'"
            )
        existing_dimensions = existing_definition.get("dimension")
        if existing_dimensions is not None and existing_dimensions != dimensions:
            # Only warn, so callers that previously succeeded keep working.
            # Vectors whose size differs from the mapped dimension are
            # expected to be rejected at index time.
            logger.warning(
                "Existing embedding field has a different dimension",
                field_name=field_name,
                model_name=model_name,
                existing_dimensions=existing_dimensions,
                expected_dimensions=dimensions,
            )
        return field_name

    # Mapping for the vector field plus the per-document tracking fields.
    mapping_body = {
        "properties": {
            field_name: {
                "type": "knn_vector",
                "dimension": dimensions,
                "method": {
                    "name": "disk_ann",
                    "engine": "jvector",
                    "space_type": "l2",
                    "parameters": {"ef_construction": 100, "m": 16},
                },
            },
            # Also ensure the embedding_model tracking field exists as keyword
            "embedding_model": {"type": "keyword"},
            "embedding_dimensions": {"type": "integer"},
        }
    }

    try:
        # put_mapping is additive. Re-sending an identical definition for an
        # existing field is accepted, but a conflicting one (e.g. a tracking
        # field already mapped with another type) makes the call fail.
        await opensearch_client.indices.put_mapping(index=index_name, body=mapping_body)
    except Exception as e:
        logger.error(
            "Failed to add embedding field mapping",
            field_name=field_name,
            model_name=model_name,
            error=str(e),
        )
        raise
    else:
        logger.info(
            "Successfully ensured embedding field exists",
            field_name=field_name,
            model_name=model_name,
        )

    # Verify the mapping was actually applied.
    new_definition = await _get_field_definition()
    if new_definition.get("type") != "knn_vector":
        raise RuntimeError(
            f"Failed to ensure '{field_name}' is mapped as knn_vector. Current definition: {new_definition}"
        )

    return field_name