From 102c17d7ec01433a116c4ef4f05da8f257486051 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 10 Oct 2025 09:43:37 -0400 Subject: [PATCH 01/52] WIP, non-langflow mode only --- src/config/settings.py | 4 + src/models/processors.py | 27 ++++- src/services/search_service.py | 152 ++++++++++++++++++++++--- src/utils/add_embedding_model_field.py | 104 +++++++++++++++++ src/utils/embedding_fields.py | 150 ++++++++++++++++++++++++ src/utils/embeddings.py | 4 + 6 files changed, 424 insertions(+), 17 deletions(-) create mode 100644 src/utils/add_embedding_model_field.py create mode 100644 src/utils/embedding_fields.py diff --git a/src/config/settings.py b/src/config/settings.py index 0672ad68..10e875ea 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -113,6 +113,8 @@ INDEX_BODY = { "mimetype": {"type": "keyword"}, "page": {"type": "integer"}, "text": {"type": "text"}, + # Legacy field - kept for backward compatibility + # New documents will use chunk_embedding_{model_name} fields "chunk_embedding": { "type": "knn_vector", "dimension": VECTOR_DIM, @@ -123,6 +125,8 @@ INDEX_BODY = { "parameters": {"ef_construction": 100, "m": 16}, }, }, + # Track which embedding model was used for this chunk + "embedding_model": {"type": "keyword"}, "source_url": {"type": "keyword"}, "connector_type": {"type": "keyword"}, "owner": {"type": "keyword"}, diff --git a/src/models/processors.py b/src/models/processors.py index 4a5d96b5..2360ddd3 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -156,15 +156,23 @@ class TaskProcessor: owner_email: str = None, file_size: int = None, connector_type: str = "local", + embedding_model: str = None, ): """ Standard processing pipeline for non-Langflow processors: docling conversion + embeddings + OpenSearch indexing. 
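+
+        Each chunk's embedding is written to a model-specific field,
+        chunk_embedding_{normalized_model_name} (for example
+        chunk_embedding_text_embedding_3_small), and the model name is
+        recorded in the embedding_model keyword field, so chunks embedded
+        with different models can coexist in the same index.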
+ + Args: + embedding_model: Embedding model to use (defaults to EMBED_MODEL from settings) """ import datetime from config.settings import INDEX_NAME, EMBED_MODEL, clients from services.document_service import chunk_texts_for_embeddings from utils.document_processing import extract_relevant + from utils.embedding_fields import get_embedding_field_name, ensure_embedding_field_exists + + # Use provided embedding model or fall back to default + embedding_model = embedding_model or EMBED_MODEL # Get user's OpenSearch client with JWT for OIDC auth opensearch_client = self.document_service.session_manager.get_user_opensearch_client( @@ -175,6 +183,18 @@ class TaskProcessor: if await self.check_document_exists(file_hash, opensearch_client): return {"status": "unchanged", "id": file_hash} + # Ensure the embedding field exists for this model + embedding_field_name = await ensure_embedding_field_exists( + opensearch_client, embedding_model, INDEX_NAME + ) + + logger.info( + "Processing document with embedding model", + embedding_model=embedding_model, + embedding_field=embedding_field_name, + file_hash=file_hash, + ) + # Convert and extract result = clients.converter.convert(file_path) full_doc = result.document.export_to_dict() @@ -188,7 +208,7 @@ class TaskProcessor: for batch in text_batches: resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=batch + model=embedding_model, input=batch ) embeddings.extend([d.embedding for d in resp.data]) @@ -202,7 +222,10 @@ class TaskProcessor: "mimetype": slim_doc["mimetype"], "page": chunk["page"], "text": chunk["text"], - "chunk_embedding": vect, + # Store embedding in model-specific field + embedding_field_name: vect, + # Track which model was used + "embedding_model": embedding_model, "file_size": file_size, "connector_type": connector_type, "indexed_time": datetime.datetime.now().isoformat(), diff --git a/src/services/search_service.py b/src/services/search_service.py index 230c052f..6243c6fa 100644 --- a/src/services/search_service.py +++ b/src/services/search_service.py @@ -12,16 +12,33 @@ class SearchService: self.session_manager = session_manager @tool - async def search_tool(self, query: str) -> Dict[str, Any]: + async def search_tool(self, query: str, embedding_model: str = None) -> Dict[str, Any]: """ Use this tool to search for documents relevant to the query. Args: query (str): query string to search the corpus + embedding_model (str): Optional override for embedding model. + If not provided, uses EMBED_MODEL from config. 
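+
+        Note: model detection is per-call. The corpus is aggregated on the
+        embedding_model field and the query is embedded once for each model
+        found, so corpora with mixed embedding models are searched
+        transparently; the override mainly serves as a fallback when no
+        models are detected.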
Returns: dict (str, Any): {"results": [chunks]} on success """ + from utils.embedding_fields import get_embedding_field_name + + # Strategy: Use provided model, or default to EMBED_MODEL + # This assumes documents are embedded with EMBED_MODEL by default + # Future enhancement: Could auto-detect available models in corpus + embedding_model = embedding_model or EMBED_MODEL + embedding_field_name = get_embedding_field_name(embedding_model) + + logger.info( + "Search with embedding model", + embedding_model=embedding_model, + embedding_field=embedding_field_name, + query_preview=query[:50] if query else None, + ) + # Get authentication context from the current async context user_id, jwt_token = get_auth_context() # Get search filters, limit, and score threshold from context @@ -37,12 +54,75 @@ class SearchService: # Detect wildcard request ("*") to return global facets/stats without semantic search is_wildcard_match_all = isinstance(query, str) and query.strip() == "*" - # Only embed when not doing match_all + # Get available embedding models from corpus + query_embeddings = {} + available_models = [] + if not is_wildcard_match_all: - resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=[query] + # First, detect which embedding models exist in the corpus + opensearch_client = self.session_manager.get_user_opensearch_client( + user_id, jwt_token + ) + + try: + agg_query = { + "size": 0, + "aggs": { + "embedding_models": { + "terms": { + "field": "embedding_model", + "size": 10 + } + } + } + } + agg_result = await opensearch_client.search(index=INDEX_NAME, body=agg_query) + buckets = agg_result.get("aggregations", {}).get("embedding_models", {}).get("buckets", []) + available_models = [b["key"] for b in buckets if b["key"]] + + if not available_models: + # Fallback to configured model if no documents indexed yet + available_models = [embedding_model] + + logger.info( + "Detected embedding models in corpus", + available_models=available_models, + model_counts={b["key"]: b["doc_count"] for b in buckets} + ) + except Exception as e: + logger.warning("Failed to detect embedding models, using configured model", error=str(e)) + available_models = [embedding_model] + + # Parallelize embedding generation for all models + import asyncio + + async def embed_with_model(model_name): + try: + resp = await clients.patched_async_client.embeddings.create( + model=model_name, input=[query] + ) + return model_name, resp.data[0].embedding + except Exception as e: + logger.error(f"Failed to embed with model {model_name}", error=str(e)) + return model_name, None + + # Run all embeddings in parallel + embedding_results = await asyncio.gather( + *[embed_with_model(model) for model in available_models], + return_exceptions=True + ) + + # Collect successful embeddings + for result in embedding_results: + if isinstance(result, tuple) and result[1] is not None: + model_name, embedding = result + query_embeddings[model_name] = embedding + + logger.info( + "Generated query embeddings", + models=list(query_embeddings.keys()), + query_preview=query[:50] ) - query_embedding = resp.data[0].embedding # Build filter clauses filter_clauses = [] @@ -80,17 +160,51 @@ class SearchService: else: query_block = {"match_all": {}} else: + # Build multi-model KNN queries + knn_queries = [] + embedding_fields_to_check = [] + + for model_name, embedding_vector in query_embeddings.items(): + field_name = get_embedding_field_name(model_name) + embedding_fields_to_check.append(field_name) + 
knn_queries.append({ + "knn": { + field_name: { + "vector": embedding_vector, + "k": 50, + "num_candidates": 1000, + } + } + }) + + # Build exists filter - doc must have at least one embedding field + exists_any_embedding = { + "bool": { + "should": [{"exists": {"field": f}} for f in embedding_fields_to_check], + "minimum_should_match": 1 + } + } + + # Add exists filter to existing filters + all_filters = [*filter_clauses, exists_any_embedding] + + logger.debug( + "Building hybrid query with filters", + user_filters_count=len(filter_clauses), + total_filters_count=len(all_filters), + filter_types=[type(f).__name__ for f in all_filters] + ) + # Hybrid search query structure (semantic + keyword) + # Use dis_max to pick best score across multiple embedding fields query_block = { "bool": { "should": [ { - "knn": { - "chunk_embedding": { - "vector": query_embedding, - "k": 10, - "boost": 0.7, - } + "dis_max": { + "tie_breaker": 0.0, # Take only the best match, no blending + "boost": 0.7, # 70% weight for semantic search + "queries": knn_queries } }, { @@ -99,12 +213,12 @@ class SearchService: "fields": ["text^2", "filename^1.5"], "type": "best_fields", "fuzziness": "AUTO", - "boost": 0.3, + "boost": 0.3, # 30% weight for keyword search } }, ], "minimum_should_match": 1, - **({"filter": filter_clauses} if filter_clauses else {}), + "filter": all_filters, } } @@ -115,6 +229,7 @@ class SearchService: "document_types": {"terms": {"field": "mimetype", "size": 10}}, "owners": {"terms": {"field": "owner_name.keyword", "size": 10}}, "connector_types": {"terms": {"field": "connector_type", "size": 10}}, + "embedding_models": {"terms": {"field": "embedding_model", "size": 10}}, }, "_source": [ "filename", @@ -127,6 +242,7 @@ class SearchService: "owner_email", "file_size", "connector_type", + "embedding_model", # Include embedding model in results "allowed_users", "allowed_groups", ], @@ -177,6 +293,7 @@ class SearchService: "owner_email": hit["_source"].get("owner_email"), "file_size": hit["_source"].get("file_size"), "connector_type": hit["_source"].get("connector_type"), + "embedding_model": hit["_source"].get("embedding_model"), # Include in results } ) @@ -199,8 +316,13 @@ class SearchService: filters: Dict[str, Any] = None, limit: int = 10, score_threshold: float = 0, + embedding_model: str = None, ) -> Dict[str, Any]: - """Public search method for API endpoints""" + """Public search method for API endpoints + + Args: + embedding_model: Embedding model to use for search (defaults to EMBED_MODEL) + """ # Set auth context if provided (for direct API calls) from config.settings import is_no_auth_mode @@ -220,4 +342,4 @@ class SearchService: set_search_limit(limit) set_score_threshold(score_threshold) - return await self.search_tool(query) + return await self.search_tool(query, embedding_model=embedding_model) diff --git a/src/utils/add_embedding_model_field.py b/src/utils/add_embedding_model_field.py new file mode 100644 index 00000000..e482c6ca --- /dev/null +++ b/src/utils/add_embedding_model_field.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +Migration script to add embedding_model field to existing OpenSearch index. +Run this once to fix the field type from text to keyword. 
+""" +import asyncio +import sys +from opensearchpy import AsyncOpenSearch +from opensearchpy._async.http_aiohttp import AIOHttpConnection + +# Add parent directory to path to import config +sys.path.insert(0, '/home/tato/Desktop/openrag/src') + +from config.settings import ( + OPENSEARCH_HOST, + OPENSEARCH_PORT, + OPENSEARCH_USERNAME, + OPENSEARCH_PASSWORD, + INDEX_NAME, +) +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def add_embedding_model_field(): + """Add embedding_model as keyword field to existing index""" + + # Create admin OpenSearch client + client = AsyncOpenSearch( + hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], + connection_class=AIOHttpConnection, + scheme="https", + use_ssl=True, + verify_certs=False, + ssl_assert_fingerprint=None, + http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD), + http_compress=True, + ) + + try: + # Check if index exists + exists = await client.indices.exists(index=INDEX_NAME) + if not exists: + logger.error(f"Index {INDEX_NAME} does not exist") + return False + + # Get current mapping + mapping = await client.indices.get_mapping(index=INDEX_NAME) + current_props = mapping[INDEX_NAME]["mappings"].get("properties", {}) + + # Check if embedding_model field exists + if "embedding_model" in current_props: + current_type = current_props["embedding_model"].get("type") + logger.info(f"embedding_model field exists with type: {current_type}") + + if current_type == "keyword": + logger.info("Field is already correct type (keyword)") + return True + else: + logger.warning( + f"Field exists with wrong type: {current_type}. " + "Cannot change field type on existing field. " + "You need to reindex or use a different field name." + ) + return False + + # Add the field as keyword + logger.info("Adding embedding_model field as keyword type") + new_mapping = { + "properties": { + "embedding_model": {"type": "keyword"} + } + } + + response = await client.indices.put_mapping( + index=INDEX_NAME, + body=new_mapping + ) + + logger.info(f"Successfully added embedding_model field: {response}") + + # Verify the change + updated_mapping = await client.indices.get_mapping(index=INDEX_NAME) + updated_props = updated_mapping[INDEX_NAME]["mappings"]["properties"] + + if "embedding_model" in updated_props: + field_type = updated_props["embedding_model"].get("type") + logger.info(f"Verified: embedding_model field type is now: {field_type}") + return field_type == "keyword" + else: + logger.error("Field was not added successfully") + return False + + except Exception as e: + logger.error(f"Error adding embedding_model field: {e}") + return False + finally: + await client.close() + + +if __name__ == "__main__": + success = asyncio.run(add_embedding_model_field()) + sys.exit(0 if success else 1) diff --git a/src/utils/embedding_fields.py b/src/utils/embedding_fields.py new file mode 100644 index 00000000..66663750 --- /dev/null +++ b/src/utils/embedding_fields.py @@ -0,0 +1,150 @@ +""" +Utility functions for managing dynamic embedding field names in OpenSearch. + +This module provides helpers for: +- Normalizing embedding model names to valid OpenSearch field names +- Generating dynamic field names based on embedding models +- Ensuring embedding fields exist in the OpenSearch index +""" + +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +def normalize_model_name(model_name: str) -> str: + """ + Convert an embedding model name to a valid OpenSearch field suffix. 
+ + Examples: + - "text-embedding-3-small" -> "text_embedding_3_small" + - "nomic-embed-text:latest" -> "nomic_embed_text_latest" + - "ibm/slate-125m-english-rtrvr" -> "ibm_slate_125m_english_rtrvr" + + Args: + model_name: The embedding model name (e.g., from OpenAI, Ollama, Watsonx) + + Returns: + Normalized string safe for use as OpenSearch field name suffix + """ + normalized = model_name.lower() + # Replace common separators with underscores + normalized = normalized.replace("-", "_") + normalized = normalized.replace(":", "_") + normalized = normalized.replace("/", "_") + normalized = normalized.replace(".", "_") + # Remove any other non-alphanumeric characters + normalized = "".join(c if c.isalnum() or c == "_" else "_" for c in normalized) + # Remove duplicate underscores + while "__" in normalized: + normalized = normalized.replace("__", "_") + # Remove leading/trailing underscores + normalized = normalized.strip("_") + + return normalized + + +def get_embedding_field_name(model_name: str) -> str: + """ + Get the OpenSearch field name for storing embeddings from a specific model. + + Args: + model_name: The embedding model name + + Returns: + Field name in format: chunk_embedding_{normalized_model_name} + + Examples: + >>> get_embedding_field_name("text-embedding-3-small") + 'chunk_embedding_text_embedding_3_small' + >>> get_embedding_field_name("nomic-embed-text") + 'chunk_embedding_nomic_embed_text' + """ + normalized = normalize_model_name(model_name) + return f"chunk_embedding_{normalized}" + + +async def ensure_embedding_field_exists( + opensearch_client, + model_name: str, + index_name: str = None, +) -> str: + """ + Ensure that an embedding field for the specified model exists in the OpenSearch index. + If the field doesn't exist, it will be added dynamically using PUT mapping API. 
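+    Mapping updates are additive: repeating the call for an existing field
+    is harmless, but the type or dimension of an existing field cannot be
+    changed, which is why mapping conflicts are logged and tolerated rather
+    than raised.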
+ + Args: + opensearch_client: OpenSearch client instance + model_name: The embedding model name + index_name: OpenSearch index name (defaults to INDEX_NAME from settings) + + Returns: + The field name that was ensured to exist + + Raises: + Exception: If unable to add the field mapping + """ + from config.settings import INDEX_NAME + from utils.embeddings import get_embedding_dimensions + + if index_name is None: + index_name = INDEX_NAME + + field_name = get_embedding_field_name(model_name) + dimensions = get_embedding_dimensions(model_name) + + logger.info( + "Ensuring embedding field exists", + field_name=field_name, + model_name=model_name, + dimensions=dimensions, + ) + + # Define the field mapping + mapping = { + "properties": { + field_name: { + "type": "knn_vector", + "dimension": dimensions, + "method": { + "name": "disk_ann", + "engine": "jvector", + "space_type": "l2", + "parameters": {"ef_construction": 100, "m": 16}, + }, + } + } + } + + try: + # Try to add the mapping + # OpenSearch will ignore if field already exists + await opensearch_client.indices.put_mapping( + index=index_name, + body=mapping + ) + logger.info( + "Successfully ensured embedding field exists", + field_name=field_name, + model_name=model_name, + ) + except Exception as e: + error_msg = str(e).lower() + # These are expected/safe errors when field already exists + if "already" in error_msg or "exists" in error_msg or "mapper_parsing_exception" in error_msg: + logger.debug( + "Embedding field already exists (expected)", + field_name=field_name, + model_name=model_name, + ) + else: + logger.error( + "Failed to ensure embedding field exists", + field_name=field_name, + model_name=model_name, + error=str(e), + ) + # Don't raise - field might already exist with different params + # Better to proceed and let indexing fail if there's a real issue + + return field_name diff --git a/src/utils/embeddings.py b/src/utils/embeddings.py index b0ec035f..c91f7bb2 100644 --- a/src/utils/embeddings.py +++ b/src/utils/embeddings.py @@ -40,6 +40,8 @@ def create_dynamic_index_body(embedding_model: str) -> dict: "mimetype": {"type": "keyword"}, "page": {"type": "integer"}, "text": {"type": "text"}, + # Legacy field - kept for backward compatibility + # New documents will use chunk_embedding_{model_name} fields "chunk_embedding": { "type": "knn_vector", "dimension": dimensions, @@ -50,6 +52,8 @@ def create_dynamic_index_body(embedding_model: str) -> dict: "parameters": {"ef_construction": 100, "m": 16}, }, }, + # Track which embedding model was used for this chunk + "embedding_model": {"type": "keyword"}, "source_url": {"type": "keyword"}, "connector_type": {"type": "keyword"}, "owner": {"type": "keyword"}, From 4b29ad73af1bd6b2d013bf4f362114ea8da799a8 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 10 Oct 2025 09:57:45 -0400 Subject: [PATCH 02/52] WIP, non-langflow mode only --- src/utils/add_embedding_model_field.py | 104 ------------------------- src/utils/embedding_fields.py | 6 +- 2 files changed, 5 insertions(+), 105 deletions(-) delete mode 100644 src/utils/add_embedding_model_field.py diff --git a/src/utils/add_embedding_model_field.py b/src/utils/add_embedding_model_field.py deleted file mode 100644 index e482c6ca..00000000 --- a/src/utils/add_embedding_model_field.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/env python3 -""" -Migration script to add embedding_model field to existing OpenSearch index. -Run this once to fix the field type from text to keyword. 
-""" -import asyncio -import sys -from opensearchpy import AsyncOpenSearch -from opensearchpy._async.http_aiohttp import AIOHttpConnection - -# Add parent directory to path to import config -sys.path.insert(0, '/home/tato/Desktop/openrag/src') - -from config.settings import ( - OPENSEARCH_HOST, - OPENSEARCH_PORT, - OPENSEARCH_USERNAME, - OPENSEARCH_PASSWORD, - INDEX_NAME, -) -from utils.logging_config import get_logger - -logger = get_logger(__name__) - - -async def add_embedding_model_field(): - """Add embedding_model as keyword field to existing index""" - - # Create admin OpenSearch client - client = AsyncOpenSearch( - hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], - connection_class=AIOHttpConnection, - scheme="https", - use_ssl=True, - verify_certs=False, - ssl_assert_fingerprint=None, - http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD), - http_compress=True, - ) - - try: - # Check if index exists - exists = await client.indices.exists(index=INDEX_NAME) - if not exists: - logger.error(f"Index {INDEX_NAME} does not exist") - return False - - # Get current mapping - mapping = await client.indices.get_mapping(index=INDEX_NAME) - current_props = mapping[INDEX_NAME]["mappings"].get("properties", {}) - - # Check if embedding_model field exists - if "embedding_model" in current_props: - current_type = current_props["embedding_model"].get("type") - logger.info(f"embedding_model field exists with type: {current_type}") - - if current_type == "keyword": - logger.info("Field is already correct type (keyword)") - return True - else: - logger.warning( - f"Field exists with wrong type: {current_type}. " - "Cannot change field type on existing field. " - "You need to reindex or use a different field name." - ) - return False - - # Add the field as keyword - logger.info("Adding embedding_model field as keyword type") - new_mapping = { - "properties": { - "embedding_model": {"type": "keyword"} - } - } - - response = await client.indices.put_mapping( - index=INDEX_NAME, - body=new_mapping - ) - - logger.info(f"Successfully added embedding_model field: {response}") - - # Verify the change - updated_mapping = await client.indices.get_mapping(index=INDEX_NAME) - updated_props = updated_mapping[INDEX_NAME]["mappings"]["properties"] - - if "embedding_model" in updated_props: - field_type = updated_props["embedding_model"].get("type") - logger.info(f"Verified: embedding_model field type is now: {field_type}") - return field_type == "keyword" - else: - logger.error("Field was not added successfully") - return False - - except Exception as e: - logger.error(f"Error adding embedding_model field: {e}") - return False - finally: - await client.close() - - -if __name__ == "__main__": - success = asyncio.run(add_embedding_model_field()) - sys.exit(0 if success else 1) diff --git a/src/utils/embedding_fields.py b/src/utils/embedding_fields.py index 66663750..0ba4f449 100644 --- a/src/utils/embedding_fields.py +++ b/src/utils/embedding_fields.py @@ -100,7 +100,7 @@ async def ensure_embedding_field_exists( dimensions=dimensions, ) - # Define the field mapping + # Define the field mapping for both the vector field and the tracking field mapping = { "properties": { field_name: { @@ -112,6 +112,10 @@ async def ensure_embedding_field_exists( "space_type": "l2", "parameters": {"ef_construction": 100, "m": 16}, }, + }, + # Also ensure the embedding_model tracking field exists as keyword + "embedding_model": { + "type": "keyword" } } } From 23a0efbbda82742753c0ea8a5d6ba9a549928e93 Mon Sep 17 00:00:00 2001 
From: phact Date: Fri, 10 Oct 2025 09:58:21 -0400 Subject: [PATCH 03/52] one time migration script for existing indexes --- scripts/migrate_embedding_model_field.py | 104 +++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 scripts/migrate_embedding_model_field.py diff --git a/scripts/migrate_embedding_model_field.py b/scripts/migrate_embedding_model_field.py new file mode 100644 index 00000000..e482c6ca --- /dev/null +++ b/scripts/migrate_embedding_model_field.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +Migration script to add embedding_model field to existing OpenSearch index. +Run this once to fix the field type from text to keyword. +""" +import asyncio +import sys +from opensearchpy import AsyncOpenSearch +from opensearchpy._async.http_aiohttp import AIOHttpConnection + +# Add parent directory to path to import config +sys.path.insert(0, '/home/tato/Desktop/openrag/src') + +from config.settings import ( + OPENSEARCH_HOST, + OPENSEARCH_PORT, + OPENSEARCH_USERNAME, + OPENSEARCH_PASSWORD, + INDEX_NAME, +) +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def add_embedding_model_field(): + """Add embedding_model as keyword field to existing index""" + + # Create admin OpenSearch client + client = AsyncOpenSearch( + hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], + connection_class=AIOHttpConnection, + scheme="https", + use_ssl=True, + verify_certs=False, + ssl_assert_fingerprint=None, + http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD), + http_compress=True, + ) + + try: + # Check if index exists + exists = await client.indices.exists(index=INDEX_NAME) + if not exists: + logger.error(f"Index {INDEX_NAME} does not exist") + return False + + # Get current mapping + mapping = await client.indices.get_mapping(index=INDEX_NAME) + current_props = mapping[INDEX_NAME]["mappings"].get("properties", {}) + + # Check if embedding_model field exists + if "embedding_model" in current_props: + current_type = current_props["embedding_model"].get("type") + logger.info(f"embedding_model field exists with type: {current_type}") + + if current_type == "keyword": + logger.info("Field is already correct type (keyword)") + return True + else: + logger.warning( + f"Field exists with wrong type: {current_type}. " + "Cannot change field type on existing field. " + "You need to reindex or use a different field name." 
+ ) + return False + + # Add the field as keyword + logger.info("Adding embedding_model field as keyword type") + new_mapping = { + "properties": { + "embedding_model": {"type": "keyword"} + } + } + + response = await client.indices.put_mapping( + index=INDEX_NAME, + body=new_mapping + ) + + logger.info(f"Successfully added embedding_model field: {response}") + + # Verify the change + updated_mapping = await client.indices.get_mapping(index=INDEX_NAME) + updated_props = updated_mapping[INDEX_NAME]["mappings"]["properties"] + + if "embedding_model" in updated_props: + field_type = updated_props["embedding_model"].get("type") + logger.info(f"Verified: embedding_model field type is now: {field_type}") + return field_type == "keyword" + else: + logger.error("Field was not added successfully") + return False + + except Exception as e: + logger.error(f"Error adding embedding_model field: {e}") + return False + finally: + await client.close() + + +if __name__ == "__main__": + success = asyncio.run(add_embedding_model_field()) + sys.exit(0 if success else 1) From b799d91c257998f6d70a0879577f32a3de5e5813 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 10 Oct 2025 10:05:27 -0400 Subject: [PATCH 04/52] fix migration script --- scripts/migrate_embedding_model_field.py | 375 ++++++++++++++++++++--- 1 file changed, 325 insertions(+), 50 deletions(-) diff --git a/scripts/migrate_embedding_model_field.py b/scripts/migrate_embedding_model_field.py index e482c6ca..d90f270e 100644 --- a/scripts/migrate_embedding_model_field.py +++ b/scripts/migrate_embedding_model_field.py @@ -1,15 +1,58 @@ #!/usr/bin/env python3 """ -Migration script to add embedding_model field to existing OpenSearch index. -Run this once to fix the field type from text to keyword. +Migration script to migrate legacy embeddings to multi-model setup. + +This script migrates documents from the legacy single-field embedding system +to the new multi-model system with dynamic field names. + +Legacy format: + { + "chunk_embedding": [0.1, 0.2, ...], + // no embedding_model field + } + +New format: + { + "chunk_embedding_text_embedding_3_small": [0.1, 0.2, ...], + "embedding_model": "text-embedding-3-small" + } + +Usage: + uv run python scripts/migrate_embedding_model_field.py --model + +Example: + uv run python scripts/migrate_embedding_model_field.py --model text-embedding-3-small + +Options: + --model MODEL The embedding model name to assign to legacy embeddings + (e.g., "text-embedding-3-small", "nomic-embed-text") + --batch-size SIZE Number of documents to process per batch (default: 100) + --dry-run Show what would be migrated without making changes + --index INDEX Index name (default: documents) + +What it does: + 1. Finds all documents with legacy "chunk_embedding" field but no "embedding_model" field + 2. For each document: + - Copies the vector from "chunk_embedding" to "chunk_embedding_{model_name}" + - Adds "embedding_model" field with the specified model name + - Optionally removes the legacy "chunk_embedding" field + 3. Uses bulk updates for efficiency + +Note: This script does NOT re-embed documents. It simply tags existing embeddings +with the model name you specify. Make sure to specify the correct model that was +actually used to create those embeddings. 
""" import asyncio import sys -from opensearchpy import AsyncOpenSearch +import os +import argparse +from typing import List, Dict, Any + +from opensearchpy import AsyncOpenSearch, helpers from opensearchpy._async.http_aiohttp import AIOHttpConnection -# Add parent directory to path to import config -sys.path.insert(0, '/home/tato/Desktop/openrag/src') +# Add src directory to path to import config +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) from config.settings import ( OPENSEARCH_HOST, @@ -19,12 +62,188 @@ from config.settings import ( INDEX_NAME, ) from utils.logging_config import get_logger +from utils.embedding_fields import get_embedding_field_name logger = get_logger(__name__) -async def add_embedding_model_field(): - """Add embedding_model as keyword field to existing index""" +async def ensure_new_field_exists( + client: AsyncOpenSearch, + index_name: str, + field_name: str, + dimensions: int +) -> None: + """Ensure the new embedding field exists in the index.""" + mapping = { + "properties": { + field_name: { + "type": "knn_vector", + "dimension": dimensions, + "method": { + "name": "disk_ann", + "engine": "jvector", + "space_type": "l2", + "parameters": {"ef_construction": 100, "m": 16}, + }, + }, + "embedding_model": { + "type": "keyword" + } + } + } + + try: + await client.indices.put_mapping(index=index_name, body=mapping) + logger.info(f"Ensured field exists: {field_name}") + except Exception as e: + error_msg = str(e).lower() + if "already" in error_msg or "exists" in error_msg: + logger.debug(f"Field already exists: {field_name}") + else: + logger.error(f"Failed to add field mapping: {e}") + raise + + +async def find_legacy_documents( + client: AsyncOpenSearch, + index_name: str, + batch_size: int = 100 +) -> List[Dict[str, Any]]: + """Find all documents with legacy chunk_embedding but no embedding_model field.""" + query = { + "query": { + "bool": { + "must": [ + {"exists": {"field": "chunk_embedding"}} + ], + "must_not": [ + {"exists": {"field": "embedding_model"}} + ] + } + }, + "size": batch_size, + "_source": True + } + + try: + response = await client.search(index=index_name, body=query, scroll='5m') + scroll_id = response['_scroll_id'] + hits = response['hits']['hits'] + + all_docs = hits + + # Continue scrolling until no more results + while len(hits) > 0: + response = await client.scroll(scroll_id=scroll_id, scroll='5m') + scroll_id = response['_scroll_id'] + hits = response['hits']['hits'] + all_docs.extend(hits) + + # Clean up scroll + await client.clear_scroll(scroll_id=scroll_id) + + return all_docs + except Exception as e: + logger.error(f"Error finding legacy documents: {e}") + raise + + +async def migrate_documents( + client: AsyncOpenSearch, + index_name: str, + documents: List[Dict[str, Any]], + model_name: str, + new_field_name: str, + dry_run: bool = False +) -> Dict[str, int]: + """Migrate legacy documents to new format.""" + if not documents: + return {"migrated": 0, "errors": 0} + + if dry_run: + logger.info(f"DRY RUN: Would migrate {len(documents)} documents") + for doc in documents[:5]: # Show first 5 as sample + doc_id = doc['_id'] + has_legacy = 'chunk_embedding' in doc['_source'] + logger.info(f" Document {doc_id}: has_legacy={has_legacy}") + if len(documents) > 5: + logger.info(f" ... 
and {len(documents) - 5} more documents") + return {"migrated": len(documents), "errors": 0} + + # Prepare bulk update actions + actions = [] + for doc in documents: + doc_id = doc['_id'] + source = doc['_source'] + + # Copy the legacy embedding to the new field + legacy_embedding = source.get('chunk_embedding') + if not legacy_embedding: + logger.warning(f"Document {doc_id} missing chunk_embedding, skipping") + continue + + # Build update document + update_doc = { + new_field_name: legacy_embedding, + "embedding_model": model_name + } + + action = { + "_op_type": "update", + "_index": index_name, + "_id": doc_id, + "doc": update_doc + } + actions.append(action) + + # Execute bulk update + migrated = 0 + errors = 0 + + try: + success, failed = await helpers.async_bulk( + client, + actions, + raise_on_error=False, + raise_on_exception=False + ) + migrated = success + errors = len(failed) if isinstance(failed, list) else 0 + + if errors > 0: + logger.error(f"Failed to migrate {errors} documents") + for failure in (failed if isinstance(failed, list) else [])[:5]: + logger.error(f" Error: {failure}") + + logger.info(f"Successfully migrated {migrated} documents") + except Exception as e: + logger.error(f"Bulk migration failed: {e}") + raise + + return {"migrated": migrated, "errors": errors} + + +async def migrate_legacy_embeddings( + model_name: str, + batch_size: int = 100, + dry_run: bool = False, + index_name: str = None +) -> bool: + """Main migration function.""" + if index_name is None: + index_name = INDEX_NAME + + new_field_name = get_embedding_field_name(model_name) + + logger.info("=" * 60) + logger.info("Legacy Embedding Migration") + logger.info("=" * 60) + logger.info(f"Index: {index_name}") + logger.info(f"Model: {model_name}") + logger.info(f"New field: {new_field_name}") + logger.info(f"Batch size: {batch_size}") + logger.info(f"Dry run: {dry_run}") + logger.info("=" * 60) # Create admin OpenSearch client client = AsyncOpenSearch( @@ -40,65 +259,121 @@ async def add_embedding_model_field(): try: # Check if index exists - exists = await client.indices.exists(index=INDEX_NAME) + exists = await client.indices.exists(index=index_name) if not exists: - logger.error(f"Index {INDEX_NAME} does not exist") + logger.error(f"Index '{index_name}' does not exist") return False - # Get current mapping - mapping = await client.indices.get_mapping(index=INDEX_NAME) - current_props = mapping[INDEX_NAME]["mappings"].get("properties", {}) + # Find legacy documents + logger.info("Searching for legacy documents...") + legacy_docs = await find_legacy_documents(client, index_name, batch_size) - # Check if embedding_model field exists - if "embedding_model" in current_props: - current_type = current_props["embedding_model"].get("type") - logger.info(f"embedding_model field exists with type: {current_type}") + if not legacy_docs: + logger.info("No legacy documents found. Migration not needed.") + return True - if current_type == "keyword": - logger.info("Field is already correct type (keyword)") - return True - else: - logger.warning( - f"Field exists with wrong type: {current_type}. " - "Cannot change field type on existing field. " - "You need to reindex or use a different field name." 
- ) - return False + logger.info(f"Found {len(legacy_docs)} legacy documents to migrate") - # Add the field as keyword - logger.info("Adding embedding_model field as keyword type") - new_mapping = { - "properties": { - "embedding_model": {"type": "keyword"} - } - } + # Get vector dimension from first document + first_doc = legacy_docs[0] + legacy_embedding = first_doc['_source'].get('chunk_embedding', []) + dimensions = len(legacy_embedding) + logger.info(f"Detected vector dimensions: {dimensions}") - response = await client.indices.put_mapping( - index=INDEX_NAME, - body=new_mapping + # Ensure new field exists + if not dry_run: + logger.info(f"Ensuring new field exists: {new_field_name}") + await ensure_new_field_exists(client, index_name, new_field_name, dimensions) + + # Migrate documents + logger.info("Starting migration...") + result = await migrate_documents( + client, + index_name, + legacy_docs, + model_name, + new_field_name, + dry_run ) - logger.info(f"Successfully added embedding_model field: {response}") + logger.info("=" * 60) + logger.info("Migration Summary") + logger.info("=" * 60) + logger.info(f"Total documents: {len(legacy_docs)}") + logger.info(f"Successfully migrated: {result['migrated']}") + logger.info(f"Errors: {result['errors']}") + logger.info("=" * 60) - # Verify the change - updated_mapping = await client.indices.get_mapping(index=INDEX_NAME) - updated_props = updated_mapping[INDEX_NAME]["mappings"]["properties"] - - if "embedding_model" in updated_props: - field_type = updated_props["embedding_model"].get("type") - logger.info(f"Verified: embedding_model field type is now: {field_type}") - return field_type == "keyword" - else: - logger.error("Field was not added successfully") + if result['errors'] > 0: + logger.warning("Migration completed with errors") return False + if dry_run: + logger.info("DRY RUN completed. 
No changes were made.") + logger.info(f"Run without --dry-run to perform the migration") + else: + logger.info("Migration completed successfully!") + + return True + except Exception as e: - logger.error(f"Error adding embedding_model field: {e}") + logger.error(f"Migration failed: {e}") return False finally: await client.close() -if __name__ == "__main__": - success = asyncio.run(add_embedding_model_field()) +def main(): + parser = argparse.ArgumentParser( + description="Migrate legacy embeddings to multi-model setup", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Dry run to see what would be migrated + uv run python scripts/migrate_embedding_model_field.py --model text-embedding-3-small --dry-run + + # Perform actual migration + uv run python scripts/migrate_embedding_model_field.py --model text-embedding-3-small + + # Migrate with custom batch size + uv run python scripts/migrate_embedding_model_field.py --model nomic-embed-text --batch-size 500 + """ + ) + + parser.add_argument( + '--model', + required=True, + help='Embedding model name to assign to legacy embeddings (e.g., "text-embedding-3-small")' + ) + parser.add_argument( + '--batch-size', + type=int, + default=100, + help='Number of documents to process per batch (default: 100)' + ) + parser.add_argument( + '--dry-run', + action='store_true', + help='Show what would be migrated without making changes' + ) + parser.add_argument( + '--index', + default=None, + help=f'Index name (default: {INDEX_NAME})' + ) + + args = parser.parse_args() + + # Run migration + success = asyncio.run(migrate_legacy_embeddings( + model_name=args.model, + batch_size=args.batch_size, + dry_run=args.dry_run, + index_name=args.index + )) + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() From c8c719c7ebed82d815079f5820c005026c546906 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 10 Oct 2025 10:28:11 -0400 Subject: [PATCH 05/52] filter query optimization --- src/services/search_service.py | 93 ++++++++++++++++++++++++---------- 1 file changed, 66 insertions(+), 27 deletions(-) diff --git a/src/services/search_service.py b/src/services/search_service.py index 6243c6fa..d8bd0e27 100644 --- a/src/services/search_service.py +++ b/src/services/search_service.py @@ -64,7 +64,36 @@ class SearchService: user_id, jwt_token ) + # Build filter clauses first so we can use them in model detection + filter_clauses = [] + if filters: + # Map frontend filter names to backend field names + field_mapping = { + "data_sources": "filename", + "document_types": "mimetype", + "owners": "owner_name.keyword", + "connector_types": "connector_type", + } + + for filter_key, values in filters.items(): + if values is not None and isinstance(values, list): + # Map frontend key to backend field name + field_name = field_mapping.get(filter_key, filter_key) + + if len(values) == 0: + # Empty array means "match nothing" - use impossible filter + filter_clauses.append( + {"term": {field_name: "__IMPOSSIBLE_VALUE__"}} + ) + elif len(values) == 1: + # Single value filter + filter_clauses.append({"term": {field_name: values[0]}}) + else: + # Multiple values filter + filter_clauses.append({"terms": {field_name: values}}) + try: + # Build aggregation query with filters applied agg_query = { "size": 0, "aggs": { @@ -76,6 +105,15 @@ class SearchService: } } } + + # Apply filters to model detection if any exist + if filter_clauses: + agg_query["query"] = { + "bool": { + "filter": filter_clauses + } + } + agg_result = await 
opensearch_client.search(index=INDEX_NAME, body=agg_query) buckets = agg_result.get("aggregations", {}).get("embedding_models", {}).get("buckets", []) available_models = [b["key"] for b in buckets if b["key"]] @@ -87,7 +125,8 @@ class SearchService: logger.info( "Detected embedding models in corpus", available_models=available_models, - model_counts={b["key"]: b["doc_count"] for b in buckets} + model_counts={b["key"]: b["doc_count"] for b in buckets}, + with_filters=len(filter_clauses) > 0 ) except Exception as e: logger.warning("Failed to detect embedding models, using configured model", error=str(e)) @@ -123,34 +162,34 @@ class SearchService: models=list(query_embeddings.keys()), query_preview=query[:50] ) + else: + # Wildcard query - no embedding needed + filter_clauses = [] + if filters: + # Map frontend filter names to backend field names + field_mapping = { + "data_sources": "filename", + "document_types": "mimetype", + "owners": "owner_name.keyword", + "connector_types": "connector_type", + } - # Build filter clauses - filter_clauses = [] - if filters: - # Map frontend filter names to backend field names - field_mapping = { - "data_sources": "filename", - "document_types": "mimetype", - "owners": "owner_name.keyword", - "connector_types": "connector_type", - } + for filter_key, values in filters.items(): + if values is not None and isinstance(values, list): + # Map frontend key to backend field name + field_name = field_mapping.get(filter_key, filter_key) - for filter_key, values in filters.items(): - if values is not None and isinstance(values, list): - # Map frontend key to backend field name - field_name = field_mapping.get(filter_key, filter_key) - - if len(values) == 0: - # Empty array means "match nothing" - use impossible filter - filter_clauses.append( - {"term": {field_name: "__IMPOSSIBLE_VALUE__"}} - ) - elif len(values) == 1: - # Single value filter - filter_clauses.append({"term": {field_name: values[0]}}) - else: - # Multiple values filter - filter_clauses.append({"terms": {field_name: values}}) + if len(values) == 0: + # Empty array means "match nothing" - use impossible filter + filter_clauses.append( + {"term": {field_name: "__IMPOSSIBLE_VALUE__"}} + ) + elif len(values) == 1: + # Single value filter + filter_clauses.append({"term": {field_name: values[0]}}) + else: + # Multiple values filter + filter_clauses.append({"terms": {field_name: values}}) # Build query body if is_wildcard_match_all: From 77f558e6904194db2083026e069a0f77e3869397 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 10 Oct 2025 12:52:37 -0400 Subject: [PATCH 06/52] show file processing errors in UI --- .../src/app/api/queries/useGetSearchQuery.ts | 1 + .../src/app/api/queries/useGetTasksQuery.ts | 20 +- frontend/src/app/knowledge/page.tsx | 42 +++- .../src/components/task-notification-menu.tsx | 238 +++++++++--------- frontend/src/contexts/task-context.tsx | 34 ++- 5 files changed, 204 insertions(+), 131 deletions(-) diff --git a/frontend/src/app/api/queries/useGetSearchQuery.ts b/frontend/src/app/api/queries/useGetSearchQuery.ts index 50905fcc..3db14cb2 100644 --- a/frontend/src/app/api/queries/useGetSearchQuery.ts +++ b/frontend/src/app/api/queries/useGetSearchQuery.ts @@ -50,6 +50,7 @@ export interface File { | "failed" | "hidden" | "sync"; + error?: string; chunks?: ChunkResult[]; } diff --git a/frontend/src/app/api/queries/useGetTasksQuery.ts b/frontend/src/app/api/queries/useGetTasksQuery.ts index 1ea59d26..f7420dd7 100644 --- a/frontend/src/app/api/queries/useGetTasksQuery.ts +++ 
b/frontend/src/app/api/queries/useGetTasksQuery.ts @@ -4,6 +4,24 @@ import { useQueryClient, } from "@tanstack/react-query"; +export interface TaskFileEntry { + status?: + | "pending" + | "running" + | "processing" + | "completed" + | "failed" + | "error"; + result?: unknown; + error?: string; + retry_count?: number; + created_at?: string; + updated_at?: string; + duration_seconds?: number; + filename?: string; + [key: string]: unknown; +} + export interface Task { task_id: string; status: @@ -24,7 +42,7 @@ export interface Task { duration_seconds?: number; result?: Record; error?: string; - files?: Record>; + files?: Record; } export interface TasksResponse { diff --git a/frontend/src/app/knowledge/page.tsx b/frontend/src/app/knowledge/page.tsx index 334f8e6f..385fa9be 100644 --- a/frontend/src/app/knowledge/page.tsx +++ b/frontend/src/app/knowledge/page.tsx @@ -26,6 +26,14 @@ import GoogleDriveIcon from "../settings/icons/google-drive-icon"; import OneDriveIcon from "../settings/icons/one-drive-icon"; import SharePointIcon from "../settings/icons/share-point-icon"; import { KnowledgeSearchInput } from "@/components/knowledge-search-input"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, + DialogTrigger, +} from "@/components/ui/dialog"; // Function to get the appropriate icon for a connector type function getSourceIcon(connectorType?: string) { @@ -77,6 +85,7 @@ function SearchPage() { size: taskFile.size, connector_type: taskFile.connector_type, status: taskFile.status, + error: taskFile.error, }; }); @@ -128,7 +137,6 @@ function SearchPage() { // Read status directly from data on each render const status = data?.status || "active"; const isActive = status === "active"; - console.log(data?.filename, status, "a"); return (
) => { - console.log(data?.filename, data?.status, "b"); - // Default to 'active' status if no status is provided const status = data?.status || "active"; + const error = + typeof data?.error === "string" && data.error.trim().length > 0 + ? data.error.trim() + : undefined; + if (status === "failed" && error) { + return ( + + + + + + + Ingestion failed + + {data?.filename || "Unknown file"} + + +
+ {error} +
+
+
+ ); + } return ; }, }, diff --git a/frontend/src/components/task-notification-menu.tsx b/frontend/src/components/task-notification-menu.tsx index 6cb968e8..7e6dcb0a 100644 --- a/frontend/src/components/task-notification-menu.tsx +++ b/frontend/src/components/task-notification-menu.tsx @@ -169,95 +169,87 @@ export function TaskNotificationMenu() { {activeTasks.length > 0 && (

Active Tasks

- {activeTasks.map((task) => ( - - -
- - {getTaskIcon(task.status)} - Task {task.task_id.substring(0, 8)}... - -
- - Started {formatRelativeTime(task.created_at)} - {formatDuration(task.duration_seconds) && ( - - • {formatDuration(task.duration_seconds)} - - )} - -
- {formatTaskProgress(task) && ( - -
-
- Progress: {formatTaskProgress(task)?.basic} -
- {formatTaskProgress(task)?.detailed && ( -
-
-
- - {formatTaskProgress(task)?.detailed.successful} success - -
-
-
- - {formatTaskProgress(task)?.detailed.failed} failed - -
-
-
- - {formatTaskProgress(task)?.detailed.running} running - -
-
-
- - {formatTaskProgress(task)?.detailed.pending} pending - + {activeTasks.map((task) => { + const progress = formatTaskProgress(task) + const showCancel = + task.status === 'pending' || + task.status === 'running' || + task.status === 'processing' + + return ( + + +
+ + {getTaskIcon(task.status)} + Task {task.task_id.substring(0, 8)}... + +
+ + Started {formatRelativeTime(task.created_at)} + {formatDuration(task.duration_seconds) && ( + + • {formatDuration(task.duration_seconds)} + + )} + +
+ {(progress || showCancel) && ( + + {progress && ( +
+
+ Progress: {progress.basic}
+ {progress.detailed && ( +
+
+
+ + {progress.detailed.successful} success + +
+
+
+ + {progress.detailed.failed} failed + +
+
+
+ + {progress.detailed.running} running + +
+
+
+ + {progress.detailed.pending} pending + +
+
+ )}
)} -
- {/* Cancel button in bottom right */} - {(task.status === 'pending' || task.status === 'running' || task.status === 'processing') && ( -
- -
- )} - - )} - {/* Cancel button for tasks without progress */} - {!formatTaskProgress(task) && (task.status === 'pending' || task.status === 'running' || task.status === 'processing') && ( - -
- -
-
- )} - - ))} + {showCancel && ( +
+ +
+ )} + + )} + + ) + })}
)} @@ -282,43 +274,47 @@ export function TaskNotificationMenu() { {isExpanded && (
- {recentTasks.map((task) => ( -
- {getTaskIcon(task.status)} -
-
- Task {task.task_id.substring(0, 8)}... -
-
- {formatRelativeTime(task.updated_at)} - {formatDuration(task.duration_seconds) && ( - - • {formatDuration(task.duration_seconds)} - - )} -
- {/* Show final results for completed tasks */} - {task.status === 'completed' && formatTaskProgress(task)?.detailed && ( -
- {formatTaskProgress(task)?.detailed.successful} success, {' '} - {formatTaskProgress(task)?.detailed.failed} failed - {(formatTaskProgress(task)?.detailed.running || 0) > 0 && ( - , {formatTaskProgress(task)?.detailed.running} running + {recentTasks.map((task) => { + const progress = formatTaskProgress(task) + + return ( +
+ {getTaskIcon(task.status)} +
+
+ Task {task.task_id.substring(0, 8)}... +
+
+ {formatRelativeTime(task.updated_at)} + {formatDuration(task.duration_seconds) && ( + + • {formatDuration(task.duration_seconds)} + )}
- )} - {task.status === 'failed' && task.error && ( -
- {task.error} -
- )} + {/* Show final results for completed tasks */} + {task.status === 'completed' && progress?.detailed && ( +
+ {progress.detailed.successful} success,{' '} + {progress.detailed.failed} failed + {(progress.detailed.running || 0) > 0 && ( + , {progress.detailed.running} running + )} +
+ )} + {task.status === 'failed' && task.error && ( +
+ {task.error} +
+ )} +
+ {getStatusBadge(task.status)}
- {getStatusBadge(task.status)} -
- ))} + ) + })}
)}
@@ -338,4 +334,4 @@ export function TaskNotificationMenu() {
) -} \ No newline at end of file +} diff --git a/frontend/src/contexts/task-context.tsx b/frontend/src/contexts/task-context.tsx index 9b3d9908..475c36f6 100644 --- a/frontend/src/contexts/task-context.tsx +++ b/frontend/src/contexts/task-context.tsx @@ -14,6 +14,7 @@ import { toast } from "sonner"; import { useCancelTaskMutation } from "@/app/api/mutations/useCancelTaskMutation"; import { type Task, + type TaskFileEntry, useGetTasksQuery, } from "@/app/api/queries/useGetTasksQuery"; import { useAuth } from "@/contexts/auth-context"; @@ -31,6 +32,7 @@ export interface TaskFile { task_id: string; created_at: string; updated_at: string; + error?: string; } interface TaskContextType { tasks: Task[]; @@ -105,6 +107,7 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { task_id: taskId, created_at: now, updated_at: now, + error: file.error, })); setFiles((prevFiles) => [...prevFiles, ...filesToAdd]); @@ -138,12 +141,13 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { taskFileEntries.forEach(([filePath, fileInfo]) => { if (typeof fileInfo === "object" && fileInfo) { + const fileInfoEntry = fileInfo as TaskFileEntry; // Use the filename from backend if available, otherwise extract from path const fileName = - (fileInfo as any).filename || + fileInfoEntry.filename || filePath.split("/").pop() || filePath; - const fileStatus = fileInfo.status as string; + const fileStatus = fileInfoEntry.status ?? "processing"; // Map backend file status to our TaskFile status let mappedStatus: TaskFile["status"]; @@ -162,6 +166,23 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { mappedStatus = "processing"; } + const fileError = (() => { + if ( + typeof fileInfoEntry.error === "string" && + fileInfoEntry.error.trim().length > 0 + ) { + return fileInfoEntry.error.trim(); + } + if ( + mappedStatus === "failed" && + typeof currentTask.error === "string" && + currentTask.error.trim().length > 0 + ) { + return currentTask.error.trim(); + } + return undefined; + })(); + setFiles((prevFiles) => { const existingFileIndex = prevFiles.findIndex( (f) => @@ -185,13 +206,14 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { status: mappedStatus, task_id: currentTask.task_id, created_at: - typeof fileInfo.created_at === "string" - ? fileInfo.created_at + typeof fileInfoEntry.created_at === "string" + ? fileInfoEntry.created_at : now, updated_at: - typeof fileInfo.updated_at === "string" - ? fileInfo.updated_at + typeof fileInfoEntry.updated_at === "string" + ? 
fileInfoEntry.updated_at : now, + error: fileError, }; if (existingFileIndex >= 0) { From 6266e5c18d7b1de35835c670600557e917e26d16 Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 10 Oct 2025 14:16:58 -0400 Subject: [PATCH 07/52] persist dimensions --- .../src/app/api/queries/useGetSearchQuery.ts | 19 ++++++++++++++ .../src/app/api/queries/useGetTasksQuery.ts | 2 ++ frontend/src/app/globals.css | 2 +- frontend/src/app/knowledge/page.tsx | 26 ++++++++++++++++++- frontend/src/components/layout-wrapper.tsx | 2 +- frontend/src/contexts/task-context.tsx | 12 +++++++++ src/models/processors.py | 3 ++- src/services/search_service.py | 2 ++ src/utils/embedding_fields.py | 5 +++- src/utils/embeddings.py | 3 ++- 10 files changed, 70 insertions(+), 6 deletions(-) diff --git a/frontend/src/app/api/queries/useGetSearchQuery.ts b/frontend/src/app/api/queries/useGetSearchQuery.ts index 3db14cb2..f686cd32 100644 --- a/frontend/src/app/api/queries/useGetSearchQuery.ts +++ b/frontend/src/app/api/queries/useGetSearchQuery.ts @@ -29,6 +29,8 @@ export interface ChunkResult { owner_email?: string; file_size?: number; connector_type?: string; + embedding_model?: string; + embedding_dimensions?: number; index?: number; } @@ -43,6 +45,8 @@ export interface File { owner_email?: string; size: number; connector_type: string; + embedding_model?: string; + embedding_dimensions?: number; status?: | "processing" | "active" @@ -134,6 +138,8 @@ export const useGetSearchQuery = ( owner_email?: string; file_size?: number; connector_type?: string; + embedding_model?: string; + embedding_dimensions?: number; } >(); @@ -142,6 +148,15 @@ export const useGetSearchQuery = ( if (existing) { existing.chunks.push(chunk); existing.totalScore += chunk.score; + if (!existing.embedding_model && chunk.embedding_model) { + existing.embedding_model = chunk.embedding_model; + } + if ( + existing.embedding_dimensions == null && + typeof chunk.embedding_dimensions === "number" + ) { + existing.embedding_dimensions = chunk.embedding_dimensions; + } } else { fileMap.set(chunk.filename, { filename: chunk.filename, @@ -154,6 +169,8 @@ export const useGetSearchQuery = ( owner_email: chunk.owner_email, file_size: chunk.file_size, connector_type: chunk.connector_type, + embedding_model: chunk.embedding_model, + embedding_dimensions: chunk.embedding_dimensions, }); } }); @@ -169,6 +186,8 @@ export const useGetSearchQuery = ( owner_email: file.owner_email || "", size: file.file_size || 0, connector_type: file.connector_type || "local", + embedding_model: file.embedding_model, + embedding_dimensions: file.embedding_dimensions, chunks: file.chunks, })); diff --git a/frontend/src/app/api/queries/useGetTasksQuery.ts b/frontend/src/app/api/queries/useGetTasksQuery.ts index f7420dd7..b8cdba01 100644 --- a/frontend/src/app/api/queries/useGetTasksQuery.ts +++ b/frontend/src/app/api/queries/useGetTasksQuery.ts @@ -19,6 +19,8 @@ export interface TaskFileEntry { updated_at?: string; duration_seconds?: number; filename?: string; + embedding_model?: string; + embedding_dimensions?: number; [key: string]: unknown; } diff --git a/frontend/src/app/globals.css b/frontend/src/app/globals.css index 5b786b7b..d8460fd6 100644 --- a/frontend/src/app/globals.css +++ b/frontend/src/app/globals.css @@ -168,7 +168,7 @@ } .header-notifications { - @apply absolute right-[0px] top-[-4px] h-1 w-1 rounded-full bg-destructive; + @apply absolute right-1 top-1 h-2 w-2 rounded-full bg-destructive; } .header-menu-bar { diff --git a/frontend/src/app/knowledge/page.tsx 
b/frontend/src/app/knowledge/page.tsx index 385fa9be..9cd28433 100644 --- a/frontend/src/app/knowledge/page.tsx +++ b/frontend/src/app/knowledge/page.tsx @@ -86,6 +86,8 @@ function SearchPage() { connector_type: taskFile.connector_type, status: taskFile.status, error: taskFile.error, + embedding_model: taskFile.embedding_model, + embedding_dimensions: taskFile.embedding_dimensions, }; }); @@ -124,7 +126,7 @@ function SearchPage() { const gridRef = useRef(null); - const columnDefs = [ + const columnDefs: ColDef[] = [ { field: "filename", headerName: "Source", @@ -200,6 +202,28 @@ function SearchPage() { ); }, }, + { + field: "embedding_model", + headerName: "Embedding model", + minWidth: 200, + cellRenderer: ({ data }: CustomCellRendererProps) => ( + + {data?.embedding_model || "—"} + + ), + }, + { + field: "embedding_dimensions", + headerName: "Dimensions", + width: 110, + cellRenderer: ({ data }: CustomCellRendererProps) => ( + + {typeof data?.embedding_dimensions === "number" + ? data.embedding_dimensions.toString() + : "—"} + + ), + }, { field: "status", headerName: "Status", diff --git a/frontend/src/components/layout-wrapper.tsx b/frontend/src/components/layout-wrapper.tsx index fda7c181..d6061384 100644 --- a/frontend/src/components/layout-wrapper.tsx +++ b/frontend/src/components/layout-wrapper.tsx @@ -129,7 +129,7 @@ export function LayoutWrapper({ children }: { children: React.ReactNode }) { {/* Task Notification Bell */}