From 6f85bd6b1995f91021700090feb5e63b71e6fb1b Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 20 Sep 2025 12:38:41 +0800 Subject: [PATCH] Add workspace-aware MongoDB indexing and Atlas Search support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add workspace attribute to storage classes • Use workspace-specific index names • Implement Atlas Search with fallbacks • Add entity search and popular labels • Improve index migration strategy --- lightrag/kg/mongo_impl.py | 494 ++++++++++++++++++++++++++++++++++---- 1 file changed, 444 insertions(+), 50 deletions(-) diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 9e4d7e67..ce8e10fd 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -1,4 +1,5 @@ import os +import re import time from dataclasses import dataclass, field import numpy as np @@ -112,6 +113,7 @@ class MongoKVStorage(BaseKVStorage): # Keep original namespace unchanged for type detection logic if effective_workspace: self.final_namespace = f"{effective_workspace}_{self.namespace}" + self.workspace = effective_workspace logger.debug( f"Final namespace with workspace prefix: '{self.final_namespace}'" ) @@ -335,6 +337,7 @@ class MongoDocStatusStorage(DocStatusStorage): # Keep original namespace unchanged for type detection logic if effective_workspace: self.final_namespace = f"{effective_workspace}_{self.namespace}" + self.workspace = effective_workspace logger.debug( f"Final namespace with workspace prefix: '{self.final_namespace}'" ) @@ -474,6 +477,7 @@ class MongoDocStatusStorage(DocStatusStorage): async def create_and_migrate_indexes_if_not_exists(self): """Create indexes to optimize pagination queries and migrate file_path indexes for Chinese collation""" try: + # Get indexes for the current collection only indexes_cursor = await self._data.list_indexes() existing_indexes = await indexes_cursor.to_list(length=None) existing_index_names = {idx.get("name", "") for idx in existing_indexes} @@ -481,80 +485,67 @@ class MongoDocStatusStorage(DocStatusStorage): # Define collation configuration for Chinese pinyin sorting collation_config = {"locale": "zh", "numericOrdering": True} - # 1. Define all indexes needed (including original pagination indexes and new collation indexes) + # Use workspace-specific index names to avoid cross-workspace conflicts + workspace_prefix = f"{self.workspace}_" if self.workspace != "_" else "" + + # 1. 
Define all indexes needed with workspace-specific names
         all_indexes = [
             # Original pagination indexes
             {
-                "name": "status_updated_at",
+                "name": f"{workspace_prefix}status_updated_at",
                 "keys": [("status", 1), ("updated_at", -1)],
             },
             {
-                "name": "status_created_at",
+                "name": f"{workspace_prefix}status_created_at",
                 "keys": [("status", 1), ("created_at", -1)],
             },
-            {"name": "updated_at", "keys": [("updated_at", -1)]},
-            {"name": "created_at", "keys": [("created_at", -1)]},
-            {"name": "id", "keys": [("_id", 1)]},
-            {"name": "track_id", "keys": [("track_id", 1)]},
-            # New file_path indexes with Chinese collation
+            {"name": f"{workspace_prefix}updated_at", "keys": [("updated_at", -1)]},
+            {"name": f"{workspace_prefix}created_at", "keys": [("created_at", -1)]},
+            {"name": f"{workspace_prefix}id", "keys": [("_id", 1)]},
+            {"name": f"{workspace_prefix}track_id", "keys": [("track_id", 1)]},
+            # New file_path indexes with Chinese collation and workspace-specific names
             {
-                "name": "file_path_zh_collation",
+                "name": f"{workspace_prefix}file_path_zh_collation",
                 "keys": [("file_path", 1)],
                 "collation": collation_config,
             },
             {
-                "name": "status_file_path_zh_collation",
+                "name": f"{workspace_prefix}status_file_path_zh_collation",
                 "keys": [("status", 1), ("file_path", 1)],
                 "collation": collation_config,
             },
         ]
 
-        # 2. Handle index migration: drop conflicting indexes with different names but same key patterns
-        for index_info in all_indexes:
-            target_keys = index_info["keys"]
-            target_name = index_info["name"]
-            target_collation = index_info.get("collation")
+        # 2. Handle legacy index cleanup: only drop old indexes that exist in THIS collection
+        legacy_index_names = [
+            "file_path_zh_collation",
+            "status_file_path_zh_collation",
+            "status_updated_at",
+            "status_created_at",
+            "updated_at",
+            "created_at",
+            "id",
+            "track_id",
+        ]
 
-            # Find existing indexes with the same key pattern but different names or collation
-            conflicting_indexes = []
-            for idx in existing_indexes:
-                idx_name = idx.get("name", "")
-                idx_keys = list(idx.get("key", {}).items())
-                idx_collation = idx.get("collation")
-
-                # Skip the _id_ index (MongoDB default)
-                if idx_name == "_id_":
-                    continue
-
-                # Check if keys match but name or collation differs
-                if idx_keys == target_keys:
-                    if (
-                        idx_name != target_name
-                        or (target_collation and not idx_collation)
-                        or (not target_collation and idx_collation)
-                        or (
-                            target_collation
-                            and idx_collation
-                            and target_collation != idx_collation
-                        )
-                    ):
-                        conflicting_indexes.append(idx_name)
-
-            # Drop conflicting indexes
-            for conflicting_name in conflicting_indexes:
+        # Legacy (unprefixed) indexes are only dropped when a workspace prefix
+        # is in use: with an empty prefix the legacy names are identical to
+        # the workspace-specific names defined above, so dropping them here
+        # would discard indexes that are about to be needed again.
+        for legacy_name in legacy_index_names:
+            if workspace_prefix and legacy_name in existing_index_names:
                 try:
-                    await self._data.drop_index(conflicting_name)
-                    logger.info(
-                        f"[{self.workspace}] Migrated: dropped conflicting index '{conflicting_name}' for collection {self._collection_name}"
+                    await self._data.drop_index(legacy_name)
+                    logger.debug(
+                        f"[{self.workspace}] Migrated: dropped legacy index '{legacy_name}' from collection {self._collection_name}"
                     )
-                    # Remove from existing_index_names to allow recreation
-                    existing_index_names.discard(conflicting_name)
+                    existing_index_names.discard(legacy_name)
                 except PyMongoError as drop_error:
                     logger.warning(
-                        f"[{self.workspace}] Failed to drop conflicting index '{conflicting_name}': {drop_error}"
+                        f"[{self.workspace}] Failed to drop legacy index '{legacy_name}' from collection {self._collection_name}: 
{drop_error}" ) - # 3. Create all needed indexes + # 3. Create all needed indexes with workspace-specific names for index_info in all_indexes: index_name = index_info["name"] if index_name not in existing_index_names: @@ -566,7 +557,7 @@ class MongoDocStatusStorage(DocStatusStorage): await self._data.create_index( index_info["keys"], **create_kwargs ) - logger.info( + logger.debug( f"[{self.workspace}] Created index '{index_name}' for collection {self._collection_name}" ) except PyMongoError as create_error: @@ -734,6 +725,7 @@ class MongoGraphStorage(BaseGraphStorage): # Keep original namespace unchanged for type detection logic if effective_workspace: self.final_namespace = f"{effective_workspace}_{self.namespace}" + self.workspace = effective_workspace logger.debug( f"Final namespace with workspace prefix: '{self.final_namespace}'" ) @@ -757,6 +749,10 @@ class MongoGraphStorage(BaseGraphStorage): self.edge_collection = await get_or_create_collection( self.db, self._edge_collection_name ) + + # Create Atlas Search index for better search performance if possible + await self.create_search_index_if_not_exists() + logger.debug( f"[{self.workspace}] Use MongoDB as KG {self._collection_name}" ) @@ -1612,6 +1608,403 @@ class MongoGraphStorage(BaseGraphStorage): edges.append(edge_dict) return edges + async def get_popular_labels(self, limit: int = 300) -> list[str]: + """Get popular labels by node degree (most connected entities) + + Args: + limit: Maximum number of labels to return + + Returns: + List of labels sorted by degree (highest first) + """ + try: + # Use aggregation pipeline to count edges per node and sort by degree + pipeline = [ + # Count outbound edges + {"$group": {"_id": "$source_node_id", "out_degree": {"$sum": 1}}}, + # Union with inbound edges count + { + "$unionWith": { + "coll": self._edge_collection_name, + "pipeline": [ + { + "$group": { + "_id": "$target_node_id", + "in_degree": {"$sum": 1}, + } + } + ], + } + }, + # Group by node_id and sum degrees + { + "$group": { + "_id": "$_id", + "total_degree": { + "$sum": { + "$add": [ + {"$ifNull": ["$out_degree", 0]}, + {"$ifNull": ["$in_degree", 0]}, + ] + } + }, + } + }, + # Sort by degree descending, then by label ascending + {"$sort": {"total_degree": -1, "_id": 1}}, + # Limit results + {"$limit": limit}, + # Project only the label + {"$project": {"_id": 1}}, + ] + + cursor = await self.edge_collection.aggregate(pipeline, allowDiskUse=True) + labels = [] + async for doc in cursor: + if doc.get("_id"): + labels.append(doc["_id"]) + + logger.debug( + f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})" + ) + return labels + except Exception as e: + logger.error(f"[{self.workspace}] Error getting popular labels: {str(e)}") + return [] + + async def _try_atlas_text_search(self, query_strip: str, limit: int) -> list[str]: + """Try Atlas Search using simple text search.""" + try: + pipeline = [ + { + "$search": { + "index": "entity_id_search_idx", + "text": {"query": query_strip, "path": "_id"}, + } + }, + {"$project": {"_id": 1, "score": {"$meta": "searchScore"}}}, + {"$limit": limit}, + ] + cursor = await self.collection.aggregate(pipeline) + labels = [doc["_id"] async for doc in cursor if doc.get("_id")] + if labels: + logger.debug( + f"[{self.workspace}] Atlas text search returned {len(labels)} results" + ) + return labels + return [] + except PyMongoError as e: + logger.debug(f"[{self.workspace}] Atlas text search failed: {e}") + return [] + + async def _try_atlas_autocomplete_search( + self, 
query_strip: str, limit: int + ) -> list[str]: + """Try Atlas Search using autocomplete for prefix matching.""" + try: + pipeline = [ + { + "$search": { + "index": "entity_id_search_idx", + "autocomplete": { + "query": query_strip, + "path": "_id", + "fuzzy": {"maxEdits": 1, "prefixLength": 1}, + }, + } + }, + {"$project": {"_id": 1, "score": {"$meta": "searchScore"}}}, + {"$limit": limit}, + ] + cursor = await self.collection.aggregate(pipeline) + labels = [doc["_id"] async for doc in cursor if doc.get("_id")] + if labels: + logger.debug( + f"[{self.workspace}] Atlas autocomplete search returned {len(labels)} results" + ) + return labels + return [] + except PyMongoError as e: + logger.debug(f"[{self.workspace}] Atlas autocomplete search failed: {e}") + return [] + + async def _try_atlas_compound_search( + self, query_strip: str, limit: int + ) -> list[str]: + """Try Atlas Search using compound query for comprehensive matching.""" + try: + pipeline = [ + { + "$search": { + "index": "entity_id_search_idx", + "compound": { + "should": [ + { + "text": { + "query": query_strip, + "path": "_id", + "score": {"boost": {"value": 10}}, + } + }, + { + "autocomplete": { + "query": query_strip, + "path": "_id", + "score": {"boost": {"value": 5}}, + "fuzzy": {"maxEdits": 1, "prefixLength": 1}, + } + }, + { + "wildcard": { + "query": f"*{query_strip}*", + "path": "_id", + "score": {"boost": {"value": 2}}, + } + }, + ], + "minimumShouldMatch": 1, + }, + } + }, + {"$project": {"_id": 1, "score": {"$meta": "searchScore"}}}, + {"$sort": {"score": {"$meta": "searchScore"}}}, + {"$limit": limit}, + ] + cursor = await self.collection.aggregate(pipeline) + labels = [doc["_id"] async for doc in cursor if doc.get("_id")] + if labels: + logger.debug( + f"[{self.workspace}] Atlas compound search returned {len(labels)} results" + ) + return labels + return [] + except PyMongoError as e: + logger.debug(f"[{self.workspace}] Atlas compound search failed: {e}") + return [] + + async def _fallback_regex_search(self, query_strip: str, limit: int) -> list[str]: + """Fallback to regex-based search when Atlas Search fails.""" + try: + logger.debug( + f"[{self.workspace}] Using regex fallback search for: '{query_strip}'" + ) + + escaped_query = re.escape(query_strip) + regex_condition = {"_id": {"$regex": escaped_query, "$options": "i"}} + cursor = self.collection.find(regex_condition, {"_id": 1}).limit(limit * 2) + docs = await cursor.to_list(length=limit * 2) + + # Extract labels + labels = [] + for doc in docs: + doc_id = doc.get("_id") + if doc_id: + labels.append(doc_id) + + # Sort results to prioritize exact matches and starts-with matches + def sort_key(label): + label_lower = label.lower() + query_lower_strip = query_strip.lower() + + if label_lower == query_lower_strip: + return (0, label_lower) # Exact match - highest priority + elif label_lower.startswith(query_lower_strip): + return (1, label_lower) # Starts with - medium priority + else: + return (2, label_lower) # Contains - lowest priority + + labels.sort(key=sort_key) + labels = labels[:limit] # Apply final limit after sorting + + logger.debug( + f"[{self.workspace}] Regex fallback search returned {len(labels)} results (limit: {limit})" + ) + return labels + + except Exception as e: + logger.error(f"[{self.workspace}] Regex fallback search failed: {e}") + import traceback + + logger.error(f"[{self.workspace}] Traceback: {traceback.format_exc()}") + return [] + + async def search_labels(self, query: str, limit: int = 50) -> list[str]: + """ + Search labels 
with progressive fallback strategy: + 1. Atlas text search (simple and fast) + 2. Atlas autocomplete search (prefix matching with fuzzy) + 3. Atlas compound search (comprehensive matching) + 4. Regex fallback (when Atlas Search is unavailable) + """ + query_strip = query.strip() + if not query_strip: + return [] + + # First check if we have any nodes at all + try: + node_count = await self.collection.count_documents({}) + if node_count == 0: + logger.debug( + f"[{self.workspace}] No nodes found in collection {self._collection_name}" + ) + return [] + except PyMongoError as e: + logger.error(f"[{self.workspace}] Error counting nodes: {e}") + return [] + + # Progressive search strategy + search_methods = [ + ("text", self._try_atlas_text_search), + ("autocomplete", self._try_atlas_autocomplete_search), + ("compound", self._try_atlas_compound_search), + ] + + # Try Atlas Search methods in order + for method_name, search_method in search_methods: + try: + labels = await search_method(query_strip, limit) + if labels: + logger.debug( + f"[{self.workspace}] Search successful using {method_name} method: {len(labels)} results" + ) + return labels + else: + logger.debug( + f"[{self.workspace}] {method_name} search returned no results, trying next method" + ) + except Exception as e: + logger.debug( + f"[{self.workspace}] {method_name} search failed: {e}, trying next method" + ) + continue + + # If all Atlas Search methods fail, use regex fallback + logger.info( + f"[{self.workspace}] All Atlas Search methods failed, using regex fallback search for: '{query_strip}'" + ) + return await self._fallback_regex_search(query_strip, limit) + + async def _check_if_index_needs_rebuild( + self, indexes: list, index_name: str + ) -> bool: + """Check if the existing index needs to be rebuilt due to configuration issues.""" + for index in indexes: + if index["name"] == index_name: + # Check if the index has the old problematic configuration + definition = index.get("latestDefinition", {}) + mappings = definition.get("mappings", {}) + fields = mappings.get("fields", {}) + id_field = fields.get("_id", {}) + + # If it's the old single-type autocomplete configuration, rebuild + if ( + isinstance(id_field, dict) + and id_field.get("type") == "autocomplete" + ): + logger.info( + f"[{self.workspace}] Found old index configuration for '{index_name}', will rebuild" + ) + return True + + # If it's not a list (multi-type configuration), rebuild + if not isinstance(id_field, list): + logger.info( + f"[{self.workspace}] Index '{index_name}' needs upgrade to multi-type configuration" + ) + return True + + logger.info( + f"[{self.workspace}] Index '{index_name}' has correct configuration" + ) + return False + return True # Index doesn't exist, needs creation + + async def _safely_drop_old_index(self, index_name: str): + """Safely drop the old search index.""" + try: + await self.collection.drop_search_index(index_name) + logger.info( + f"[{self.workspace}] Successfully dropped old search index '{index_name}'" + ) + except PyMongoError as e: + logger.warning( + f"[{self.workspace}] Could not drop old index '{index_name}': {e}" + ) + + async def _create_improved_search_index(self, index_name: str): + """Create an improved search index with multiple field types.""" + search_index_model = SearchIndexModel( + definition={ + "mappings": { + "dynamic": False, + "fields": { + "_id": [ + { + "type": "string", + }, + { + "type": "token", + }, + { + "type": "autocomplete", + "maxGrams": 15, + "minGrams": 2, + }, + ] + }, + }, + "analyzer": 
"lucene.standard", # Index-level analyzer for text processing + }, + name=index_name, + type="search", + ) + + await self.collection.create_search_index(search_index_model) + logger.info( + f"[{self.workspace}] Created improved Atlas Search index '{index_name}' for collection {self._collection_name}. " + ) + logger.info( + f"[{self.workspace}] Index will be built asynchronously, using regex fallback until ready." + ) + + async def create_search_index_if_not_exists(self): + """Creates an improved Atlas Search index for entity search, rebuilding if necessary.""" + index_name = "entity_id_search_idx" + + try: + # Check if we're using MongoDB Atlas (has search index capabilities) + indexes_cursor = await self.collection.list_search_indexes() + indexes = await indexes_cursor.to_list(length=None) + + # Check if we need to rebuild the index + needs_rebuild = await self._check_if_index_needs_rebuild( + indexes, index_name + ) + + if needs_rebuild: + # Check if index exists and drop it + index_exists = any(idx["name"] == index_name for idx in indexes) + if index_exists: + await self._safely_drop_old_index(index_name) + + # Create the improved search index (async, no waiting) + await self._create_improved_search_index(index_name) + else: + logger.info( + f"[{self.workspace}] Atlas Search index '{index_name}' already exists with correct configuration" + ) + + except PyMongoError as e: + # This is expected if not using MongoDB Atlas or if search indexes are not supported + logger.info( + f"[{self.workspace}] Could not create Atlas Search index for {self._collection_name}: {e}. " + "This is normal if not using MongoDB Atlas - search will use regex fallback." + ) + except Exception as e: + logger.warning( + f"[{self.workspace}] Unexpected error creating Atlas Search index for {self._collection_name}: {e}" + ) + async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection. @@ -1685,6 +2078,7 @@ class MongoVectorDBStorage(BaseVectorStorage): # Keep original namespace unchanged for type detection logic if effective_workspace: self.final_namespace = f"{effective_workspace}_{self.namespace}" + self.workspace = effective_workspace logger.debug( f"Final namespace with workspace prefix: '{self.final_namespace}'" )