diff --git a/src/config/settings.py b/src/config/settings.py
index 0672ad68..10e875ea 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -113,6 +113,8 @@ INDEX_BODY = {
         "mimetype": {"type": "keyword"},
         "page": {"type": "integer"},
         "text": {"type": "text"},
+        # Legacy field - kept for backward compatibility
+        # New documents will use chunk_embedding_{model_name} fields
         "chunk_embedding": {
             "type": "knn_vector",
             "dimension": VECTOR_DIM,
@@ -123,6 +125,8 @@ INDEX_BODY = {
                 "parameters": {"ef_construction": 100, "m": 16},
             },
         },
+        # Track which embedding model was used for this chunk
+        "embedding_model": {"type": "keyword"},
         "source_url": {"type": "keyword"},
         "connector_type": {"type": "keyword"},
         "owner": {"type": "keyword"},
diff --git a/src/models/processors.py b/src/models/processors.py
index 4a5d96b5..2360ddd3 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -156,15 +156,23 @@
         owner_email: str = None,
         file_size: int = None,
         connector_type: str = "local",
+        embedding_model: str = None,
     ):
         """
         Standard processing pipeline for non-Langflow processors:
         docling conversion + embeddings + OpenSearch indexing.
+
+        Args:
+            embedding_model: Embedding model to use (defaults to EMBED_MODEL from settings)
        """
         import datetime
         from config.settings import INDEX_NAME, EMBED_MODEL, clients
         from services.document_service import chunk_texts_for_embeddings
         from utils.document_processing import extract_relevant
+        from utils.embedding_fields import get_embedding_field_name, ensure_embedding_field_exists
+
+        # Use provided embedding model or fall back to default
+        embedding_model = embedding_model or EMBED_MODEL
 
         # Get user's OpenSearch client with JWT for OIDC auth
         opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
@@ -175,6 +183,18 @@
         if await self.check_document_exists(file_hash, opensearch_client):
             return {"status": "unchanged", "id": file_hash}
 
+        # Ensure the embedding field exists for this model
+        embedding_field_name = await ensure_embedding_field_exists(
+            opensearch_client, embedding_model, INDEX_NAME
+        )
+
+        logger.info(
+            "Processing document with embedding model",
+            embedding_model=embedding_model,
+            embedding_field=embedding_field_name,
+            file_hash=file_hash,
+        )
+
         # Convert and extract
         result = clients.converter.convert(file_path)
         full_doc = result.document.export_to_dict()
@@ -188,7 +208,7 @@
 
         for batch in text_batches:
             resp = await clients.patched_async_client.embeddings.create(
-                model=EMBED_MODEL, input=batch
+                model=embedding_model, input=batch
             )
             embeddings.extend([d.embedding for d in resp.data])
 
@@ -202,7 +222,10 @@
                 "mimetype": slim_doc["mimetype"],
                 "page": chunk["page"],
                 "text": chunk["text"],
-                "chunk_embedding": vect,
+                # Store embedding in model-specific field
+                embedding_field_name: vect,
+                # Track which model was used
+                "embedding_model": embedding_model,
                 "file_size": file_size,
                 "connector_type": connector_type,
                 "indexed_time": datetime.datetime.now().isoformat(),
diff --git a/src/services/search_service.py b/src/services/search_service.py
index 230c052f..6243c6fa 100644
--- a/src/services/search_service.py
+++ b/src/services/search_service.py
@@ -12,16 +12,33 @@ class SearchService:
         self.session_manager = session_manager
 
     @tool
-    async def search_tool(self, query: str) -> Dict[str, Any]:
+    async def search_tool(self, query: str, embedding_model: str = None) -> Dict[str, Any]:
         """
         Use this tool to search for documents relevant to the
         query.
         Args:
             query (str): query string to search the corpus
+            embedding_model (str): Optional override for embedding model.
+                If not provided, uses EMBED_MODEL from config.
 
         Returns:
             dict (str, Any): {"results": [chunks]} on success
         """
+        from utils.embedding_fields import get_embedding_field_name
+
+        # Use the provided model, or fall back to EMBED_MODEL. This is only a
+        # fallback: the query path below auto-detects which embedding models
+        # actually exist in the corpus.
+        embedding_model = embedding_model or EMBED_MODEL
+        embedding_field_name = get_embedding_field_name(embedding_model)
+
+        logger.info(
+            "Search with embedding model",
+            embedding_model=embedding_model,
+            embedding_field=embedding_field_name,
+            query_preview=query[:50] if query else None,
+        )
+
         # Get authentication context from the current async context
         user_id, jwt_token = get_auth_context()
         # Get search filters, limit, and score threshold from context
@@ -37,12 +54,75 @@
         # Detect wildcard request ("*") to return global facets/stats without semantic search
         is_wildcard_match_all = isinstance(query, str) and query.strip() == "*"
 
-        # Only embed when not doing match_all
+        # Get available embedding models from corpus
+        query_embeddings = {}
+        available_models = []
+
         if not is_wildcard_match_all:
-            resp = await clients.patched_async_client.embeddings.create(
-                model=EMBED_MODEL, input=[query]
+            # First, detect which embedding models exist in the corpus
+            opensearch_client = self.session_manager.get_user_opensearch_client(
+                user_id, jwt_token
+            )
+
+            try:
+                agg_query = {
+                    "size": 0,
+                    "aggs": {
+                        "embedding_models": {
+                            "terms": {
+                                "field": "embedding_model",
+                                "size": 10
+                            }
+                        }
+                    }
+                }
+                agg_result = await opensearch_client.search(index=INDEX_NAME, body=agg_query)
+                buckets = agg_result.get("aggregations", {}).get("embedding_models", {}).get("buckets", [])
+                available_models = [b["key"] for b in buckets if b["key"]]
+
+                if not available_models:
+                    # Fallback to configured model if no documents indexed yet
+                    available_models = [embedding_model]
+
+                logger.info(
+                    "Detected embedding models in corpus",
+                    available_models=available_models,
+                    model_counts={b["key"]: b["doc_count"] for b in buckets}
+                )
+            except Exception as e:
+                logger.warning("Failed to detect embedding models, using configured model", error=str(e))
+                available_models = [embedding_model]
+
+            # Parallelize embedding generation for all models
+            import asyncio
+
+            async def embed_with_model(model_name):
+                try:
+                    resp = await clients.patched_async_client.embeddings.create(
+                        model=model_name, input=[query]
+                    )
+                    return model_name, resp.data[0].embedding
+                except Exception as e:
+                    logger.error(f"Failed to embed with model {model_name}", error=str(e))
+                    return model_name, None
+
+            # Run all embeddings in parallel
+            embedding_results = await asyncio.gather(
+                *[embed_with_model(model) for model in available_models],
+                return_exceptions=True
+            )
+
+            # Collect successful embeddings
+            for result in embedding_results:
+                if isinstance(result, tuple) and result[1] is not None:
+                    model_name, embedding = result
+                    query_embeddings[model_name] = embedding
+
+            logger.info(
+                "Generated query embeddings",
+                models=list(query_embeddings.keys()),
+                query_preview=query[:50]
             )
-            query_embedding = resp.data[0].embedding
 
         # Build filter clauses
         filter_clauses = []
@@ -80,17 +160,51 @@
             else:
                 query_block = {"match_all": {}}
         else:
+            # Build multi-model KNN queries
+            knn_queries = []
+            embedding_fields_to_check = []
+
+            for model_name, embedding_vector in query_embeddings.items():
+                field_name = get_embedding_field_name(model_name)
+                embedding_fields_to_check.append(field_name)
+                knn_queries.append({
+                    "knn": {
+                        field_name: {
+                            "vector": embedding_vector,
+                            "k": 50,
+                            "num_candidates": 1000,
+                        }
+                    }
+                })
+
+            # Build exists filter - doc must have at least one embedding field
+            exists_any_embedding = {
+                "bool": {
+                    "should": [{"exists": {"field": f}} for f in embedding_fields_to_check],
+                    "minimum_should_match": 1
+                }
+            }
+
+            # Add exists filter to existing filters
+            all_filters = [*filter_clauses, exists_any_embedding]
+
+            logger.debug(
+                "Building hybrid query with filters",
+                user_filters_count=len(filter_clauses),
+                total_filters_count=len(all_filters),
+                filter_types=[type(f).__name__ for f in all_filters]
+            )
+
             # Hybrid search query structure (semantic + keyword)
+            # Use dis_max to pick best score across multiple embedding fields
             query_block = {
                 "bool": {
                     "should": [
                         {
-                            "knn": {
-                                "chunk_embedding": {
-                                    "vector": query_embedding,
-                                    "k": 10,
-                                    "boost": 0.7,
-                                }
+                            "dis_max": {
+                                "tie_breaker": 0.0,  # Take only the best match, no blending
+                                "boost": 0.7,  # 70% weight for semantic search
+                                "queries": knn_queries
                             }
                         },
                         {
@@ -99,12 +213,12 @@
                                 "fields": ["text^2", "filename^1.5"],
                                 "type": "best_fields",
                                 "fuzziness": "AUTO",
-                                "boost": 0.3,
+                                "boost": 0.3,  # 30% weight for keyword search
                             }
                         },
                     ],
                     "minimum_should_match": 1,
-                    **({"filter": filter_clauses} if filter_clauses else {}),
+                    "filter": all_filters,
                 }
             }
 
@@ -115,6 +229,7 @@
             "document_types": {"terms": {"field": "mimetype", "size": 10}},
             "owners": {"terms": {"field": "owner_name.keyword", "size": 10}},
             "connector_types": {"terms": {"field": "connector_type", "size": 10}},
+            "embedding_models": {"terms": {"field": "embedding_model", "size": 10}},
         },
         "_source": [
             "filename",
@@ -127,6 +242,7 @@
             "owner_email",
             "file_size",
             "connector_type",
+            "embedding_model",  # Include embedding model in results
             "allowed_users",
             "allowed_groups",
         ],
@@ -177,6 +293,7 @@
                     "owner_email": hit["_source"].get("owner_email"),
                     "file_size": hit["_source"].get("file_size"),
                     "connector_type": hit["_source"].get("connector_type"),
+                    "embedding_model": hit["_source"].get("embedding_model"),  # Include in results
                 }
             )
 
@@ -199,8 +316,13 @@
         filters: Dict[str, Any] = None,
         limit: int = 10,
         score_threshold: float = 0,
+        embedding_model: str = None,
     ) -> Dict[str, Any]:
-        """Public search method for API endpoints"""
+        """Public search method for API endpoints
+
+        Args:
+            embedding_model: Embedding model to use for search (defaults to EMBED_MODEL)
+        """
         # Set auth context if provided (for direct API calls)
         from config.settings import is_no_auth_mode
 
@@ -220,4 +342,4 @@
         set_search_limit(limit)
         set_score_threshold(score_threshold)
 
-        return await self.search_tool(query)
+        return await self.search_tool(query, embedding_model=embedding_model)
diff --git a/src/utils/add_embedding_model_field.py b/src/utils/add_embedding_model_field.py
new file mode 100644
index 00000000..e482c6ca
--- /dev/null
+++ b/src/utils/add_embedding_model_field.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+"""
+Migration script to add the embedding_model field to an existing OpenSearch index.
+Run this once to add the field as keyword; note that it cannot change the type of
+an existing field (that requires a reindex).
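+
+Usage (sketch; assumes the script is run with the project's Python environment so
+that config.settings resolves, and exits 0 on success, 1 on failure):
+
+    python src/utils/add_embedding_model_field.py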
+"""
+import asyncio
+import sys
+from pathlib import Path
+
+from opensearchpy import AsyncOpenSearch
+from opensearchpy._async.http_aiohttp import AIOHttpConnection
+
+# Make the src directory importable regardless of the working directory
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+from config.settings import (
+    OPENSEARCH_HOST,
+    OPENSEARCH_PORT,
+    OPENSEARCH_USERNAME,
+    OPENSEARCH_PASSWORD,
+    INDEX_NAME,
+)
+from utils.logging_config import get_logger
+
+logger = get_logger(__name__)
+
+
+async def add_embedding_model_field():
+    """Add embedding_model as keyword field to existing index"""
+
+    # Create admin OpenSearch client
+    client = AsyncOpenSearch(
+        hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}],
+        connection_class=AIOHttpConnection,
+        scheme="https",
+        use_ssl=True,
+        verify_certs=False,
+        ssl_assert_fingerprint=None,
+        http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD),
+        http_compress=True,
+    )
+
+    try:
+        # Check if index exists
+        exists = await client.indices.exists(index=INDEX_NAME)
+        if not exists:
+            logger.error(f"Index {INDEX_NAME} does not exist")
+            return False
+
+        # Get current mapping
+        mapping = await client.indices.get_mapping(index=INDEX_NAME)
+        current_props = mapping[INDEX_NAME]["mappings"].get("properties", {})
+
+        # Check if embedding_model field exists
+        if "embedding_model" in current_props:
+            current_type = current_props["embedding_model"].get("type")
+            logger.info(f"embedding_model field exists with type: {current_type}")
+
+            if current_type == "keyword":
+                logger.info("Field is already correct type (keyword)")
+                return True
+            else:
+                logger.warning(
+                    f"Field exists with wrong type: {current_type}. "
+                    "Cannot change field type on existing field. "
+                    "You need to reindex or use a different field name."
+                )
+                return False
+
+        # Add the field as keyword
+        logger.info("Adding embedding_model field as keyword type")
+        new_mapping = {
+            "properties": {
+                "embedding_model": {"type": "keyword"}
+            }
+        }
+
+        response = await client.indices.put_mapping(
+            index=INDEX_NAME,
+            body=new_mapping
+        )
+
+        logger.info(f"Successfully added embedding_model field: {response}")
+
+        # Verify the change
+        updated_mapping = await client.indices.get_mapping(index=INDEX_NAME)
+        updated_props = updated_mapping[INDEX_NAME]["mappings"]["properties"]
+
+        if "embedding_model" in updated_props:
+            field_type = updated_props["embedding_model"].get("type")
+            logger.info(f"Verified: embedding_model field type is now: {field_type}")
+            return field_type == "keyword"
+        else:
+            logger.error("Field was not added successfully")
+            return False
+
+    except Exception as e:
+        logger.error(f"Error adding embedding_model field: {e}")
+        return False
+    finally:
+        await client.close()
+
+
+if __name__ == "__main__":
+    success = asyncio.run(add_embedding_model_field())
+    sys.exit(0 if success else 1)
diff --git a/src/utils/embedding_fields.py b/src/utils/embedding_fields.py
new file mode 100644
index 00000000..66663750
--- /dev/null
+++ b/src/utils/embedding_fields.py
@@ -0,0 +1,150 @@
+"""
+Utility functions for managing dynamic embedding field names in OpenSearch.
+
+This module provides helpers for:
+- Normalizing embedding model names to valid OpenSearch field names
+- Generating dynamic field names based on embedding models
+- Ensuring embedding fields exist in the OpenSearch index
+"""
+
+from utils.logging_config import get_logger
+
+logger = get_logger(__name__)
+
+
+def normalize_model_name(model_name: str) -> str:
+    """
+    Convert an embedding model name to a valid OpenSearch field suffix.
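+
+    Note that normalization is lossy: model names that differ only in their
+    separators (e.g. "nomic-embed-text" and "nomic_embed_text") collapse to the
+    same suffix, and therefore to the same OpenSearch field name.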
+
+    Examples:
+        - "text-embedding-3-small" -> "text_embedding_3_small"
+        - "nomic-embed-text:latest" -> "nomic_embed_text_latest"
+        - "ibm/slate-125m-english-rtrvr" -> "ibm_slate_125m_english_rtrvr"
+
+    Args:
+        model_name: The embedding model name (e.g., from OpenAI, Ollama, Watsonx)
+
+    Returns:
+        Normalized string safe for use as OpenSearch field name suffix
+    """
+    normalized = model_name.lower()
+    # Replace common separators with underscores
+    normalized = normalized.replace("-", "_")
+    normalized = normalized.replace(":", "_")
+    normalized = normalized.replace("/", "_")
+    normalized = normalized.replace(".", "_")
+    # Remove any other non-alphanumeric characters
+    normalized = "".join(c if c.isalnum() or c == "_" else "_" for c in normalized)
+    # Remove duplicate underscores
+    while "__" in normalized:
+        normalized = normalized.replace("__", "_")
+    # Remove leading/trailing underscores
+    normalized = normalized.strip("_")
+
+    return normalized
+
+
+def get_embedding_field_name(model_name: str) -> str:
+    """
+    Get the OpenSearch field name for storing embeddings from a specific model.
+
+    Args:
+        model_name: The embedding model name
+
+    Returns:
+        Field name in format: chunk_embedding_{normalized_model_name}
+
+    Examples:
+        >>> get_embedding_field_name("text-embedding-3-small")
+        'chunk_embedding_text_embedding_3_small'
+        >>> get_embedding_field_name("nomic-embed-text")
+        'chunk_embedding_nomic_embed_text'
+    """
+    normalized = normalize_model_name(model_name)
+    return f"chunk_embedding_{normalized}"
+
+
+async def ensure_embedding_field_exists(
+    opensearch_client,
+    model_name: str,
+    index_name: str = None,
+) -> str:
+    """
+    Ensure that an embedding field for the specified model exists in the OpenSearch index.
+    If the field doesn't exist, it will be added dynamically using the PUT mapping API.
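+
+    Example (illustrative sketch; any AsyncOpenSearch client with permission to
+    update mappings will do):
+
+        field = await ensure_embedding_field_exists(client, "nomic-embed-text")
+        # field == "chunk_embedding_nomic_embed_text"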
+
+    Args:
+        opensearch_client: OpenSearch client instance
+        model_name: The embedding model name
+        index_name: OpenSearch index name (defaults to INDEX_NAME from settings)
+
+    Returns:
+        The field name that was ensured to exist
+
+    Note:
+        Mapping errors are logged rather than raised; a genuine conflict will
+        surface later at indexing time.
+    """
+    from config.settings import INDEX_NAME
+    from utils.embeddings import get_embedding_dimensions
+
+    if index_name is None:
+        index_name = INDEX_NAME
+
+    field_name = get_embedding_field_name(model_name)
+    dimensions = get_embedding_dimensions(model_name)
+
+    logger.info(
+        "Ensuring embedding field exists",
+        field_name=field_name,
+        model_name=model_name,
+        dimensions=dimensions,
+    )
+
+    # Define the field mapping
+    mapping = {
+        "properties": {
+            field_name: {
+                "type": "knn_vector",
+                "dimension": dimensions,
+                "method": {
+                    "name": "disk_ann",
+                    "engine": "jvector",
+                    "space_type": "l2",
+                    "parameters": {"ef_construction": 100, "m": 16},
+                },
+            }
+        }
+    }
+
+    try:
+        # Try to add the mapping
+        # OpenSearch will ignore this if the field already exists
+        await opensearch_client.indices.put_mapping(
+            index=index_name,
+            body=mapping
+        )
+        logger.info(
+            "Successfully ensured embedding field exists",
+            field_name=field_name,
+            model_name=model_name,
+        )
+    except Exception as e:
+        error_msg = str(e).lower()
+        # These are expected/safe errors when the field already exists
+        if "already" in error_msg or "exists" in error_msg or "mapper_parsing_exception" in error_msg:
+            logger.debug(
+                "Embedding field already exists (expected)",
+                field_name=field_name,
+                model_name=model_name,
+            )
+        else:
+            logger.error(
+                "Failed to ensure embedding field exists",
+                field_name=field_name,
+                model_name=model_name,
+                error=str(e),
+            )
+            # Don't raise - field might already exist with different params
+            # Better to proceed and let indexing fail if there's a real issue
+
+    return field_name
diff --git a/src/utils/embeddings.py b/src/utils/embeddings.py
index b0ec035f..c91f7bb2 100644
--- a/src/utils/embeddings.py
+++ b/src/utils/embeddings.py
@@ -40,6 +40,8 @@ def create_dynamic_index_body(embedding_model: str) -> dict:
         "mimetype": {"type": "keyword"},
         "page": {"type": "integer"},
         "text": {"type": "text"},
+        # Legacy field - kept for backward compatibility
+        # New documents will use chunk_embedding_{model_name} fields
         "chunk_embedding": {
            "type": "knn_vector",
            "dimension": dimensions,
@@ -50,6 +52,8 @@ def create_dynamic_index_body(embedding_model: str) -> dict:
                 "parameters": {"ef_construction": 100, "m": 16},
             },
         },
+        # Track which embedding model was used for this chunk
+        "embedding_model": {"type": "keyword"},
         "source_url": {"type": "keyword"},
         "connector_type": {"type": "keyword"},
         "owner": {"type": "keyword"},