Add workspace-aware MongoDB indexing and Atlas Search support

• Add workspace attribute to storage classes
• Use workspace-specific index names
• Implement Atlas Search with fallbacks
• Add entity search and popular labels
• Improve index migration strategy
yangdx 2025-09-20 12:38:41 +08:00
parent 223397a247
commit 6f85bd6b19
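
A minimal sketch of the workspace-scoped index naming this commit introduces, shown in isolation for readers who want to try the prefixing rule outside the storage classes. It is illustrative only and not part of the diff below; the workspace "space1", database "lightrag", collection "space1_doc_status", and the local MongoDB URI are assumed.

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

def workspace_index_name(workspace: str, base_name: str) -> str:
    # Mirrors the rule in the diff: the default workspace "_" gets no
    # prefix; any other workspace name is prepended to the index name,
    # which the commit uses to avoid cross-workspace conflicts.
    prefix = f"{workspace}_" if workspace != "_" else ""
    return f"{prefix}{base_name}"

async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    coll = client["lightrag"]["space1_doc_status"]
    name = workspace_index_name("space1", "status_updated_at")
    # Creates an index named space1_status_updated_at on (status, updated_at)
    await coll.create_index([("status", 1), ("updated_at", -1)], name=name)
    print(name)  # -> space1_status_updated_at

asyncio.run(main())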

@@ -1,4 +1,5 @@
import os
import re
import time
from dataclasses import dataclass, field
import numpy as np
@@ -112,6 +113,7 @@ class MongoKVStorage(BaseKVStorage):
# Keep original namespace unchanged for type detection logic
if effective_workspace:
self.final_namespace = f"{effective_workspace}_{self.namespace}"
self.workspace = effective_workspace
logger.debug(
f"Final namespace with workspace prefix: '{self.final_namespace}'"
)
@@ -335,6 +337,7 @@ class MongoDocStatusStorage(DocStatusStorage):
# Keep original namespace unchanged for type detection logic
if effective_workspace:
self.final_namespace = f"{effective_workspace}_{self.namespace}"
self.workspace = effective_workspace
logger.debug(
f"Final namespace with workspace prefix: '{self.final_namespace}'"
)
@@ -474,6 +477,7 @@ class MongoDocStatusStorage(DocStatusStorage):
async def create_and_migrate_indexes_if_not_exists(self):
"""Create indexes to optimize pagination queries and migrate file_path indexes for Chinese collation"""
try:
# Get indexes for the current collection only
indexes_cursor = await self._data.list_indexes()
existing_indexes = await indexes_cursor.to_list(length=None)
existing_index_names = {idx.get("name", "") for idx in existing_indexes}
@@ -481,80 +485,67 @@ class MongoDocStatusStorage(DocStatusStorage):
# Define collation configuration for Chinese pinyin sorting
collation_config = {"locale": "zh", "numericOrdering": True}
# 1. Define all indexes needed (including original pagination indexes and new collation indexes)
# Use workspace-specific index names to avoid cross-workspace conflicts
workspace_prefix = f"{self.workspace}_" if self.workspace != "_" else ""
# 1. Define all indexes needed with workspace-specific names
all_indexes = [
# Original pagination indexes
{
"name": "status_updated_at",
"name": f"{workspace_prefix}status_updated_at",
"keys": [("status", 1), ("updated_at", -1)],
},
{
"name": "status_created_at",
"name": f"{workspace_prefix}status_created_at",
"keys": [("status", 1), ("created_at", -1)],
},
{"name": "updated_at", "keys": [("updated_at", -1)]},
{"name": "created_at", "keys": [("created_at", -1)]},
{"name": "id", "keys": [("_id", 1)]},
{"name": "track_id", "keys": [("track_id", 1)]},
# New file_path indexes with Chinese collation
{"name": f"{workspace_prefix}updated_at", "keys": [("updated_at", -1)]},
{"name": f"{workspace_prefix}created_at", "keys": [("created_at", -1)]},
{"name": f"{workspace_prefix}id", "keys": [("_id", 1)]},
{"name": f"{workspace_prefix}track_id", "keys": [("track_id", 1)]},
# New file_path indexes with Chinese collation and workspace-specific names
{
"name": "file_path_zh_collation",
"name": f"{workspace_prefix}file_path_zh_collation",
"keys": [("file_path", 1)],
"collation": collation_config,
},
{
"name": "status_file_path_zh_collation",
"name": f"{workspace_prefix}status_file_path_zh_collation",
"keys": [("status", 1), ("file_path", 1)],
"collation": collation_config,
},
]
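# --- Editor's illustration (not part of this commit) ---------------
# How the zh-collation indexes above are meant to be consumed: a query
# that sorts file_path with the same collation lets MongoDB use the
# index and order Chinese file names by pinyin. The filter value and
# page size here are assumed for the example.
#
#   cursor = self._data.find({"status": "processed"}, {"file_path": 1})
#   cursor = cursor.sort("file_path", 1).collation(
#       {"locale": "zh", "numericOrdering": True}
#   )
#   page = await cursor.to_list(length=100)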
# 2. Handle index migration: drop conflicting indexes with different names but same key patterns
for index_info in all_indexes:
target_keys = index_info["keys"]
target_name = index_info["name"]
target_collation = index_info.get("collation")
# 2. Handle legacy index cleanup: only drop old indexes that exist in THIS collection
legacy_index_names = [
"file_path_zh_collation",
"status_file_path_zh_collation",
"status_updated_at",
"status_created_at",
"updated_at",
"created_at",
"id",
"track_id",
]
# Find existing indexes with the same key pattern but different names or collation
conflicting_indexes = []
for idx in existing_indexes:
idx_name = idx.get("name", "")
idx_keys = list(idx.get("key", {}).items())
idx_collation = idx.get("collation")
# Skip the _id_ index (MongoDB default)
if idx_name == "_id_":
continue
# Check if keys match but name or collation differs
if idx_keys == target_keys:
if (
idx_name != target_name
or (target_collation and not idx_collation)
or (not target_collation and idx_collation)
or (
target_collation
and idx_collation
and target_collation != idx_collation
)
):
conflicting_indexes.append(idx_name)
# Drop conflicting indexes
for conflicting_name in conflicting_indexes:
for legacy_name in legacy_index_names:
if (
legacy_name in existing_index_names
and legacy_name
!= f"{workspace_prefix}{legacy_name.replace(workspace_prefix, '')}"
):
try:
await self._data.drop_index(conflicting_name)
logger.info(
f"[{self.workspace}] Migrated: dropped conflicting index '{conflicting_name}' for collection {self._collection_name}"
await self._data.drop_index(legacy_name)
logger.debug(
f"[{self.workspace}] Migrated: dropped legacy index '{legacy_name}' from collection {self._collection_name}"
)
# Remove from existing_index_names to allow recreation
existing_index_names.discard(conflicting_name)
existing_index_names.discard(legacy_name)
except PyMongoError as drop_error:
logger.warning(
f"[{self.workspace}] Failed to drop conflicting index '{conflicting_name}': {drop_error}"
f"[{self.workspace}] Failed to drop legacy index '{legacy_name}' from collection {self._collection_name}: {drop_error}"
)
# 3. Create all needed indexes
# 3. Create all needed indexes with workspace-specific names
for index_info in all_indexes:
index_name = index_info["name"]
if index_name not in existing_index_names:
@@ -566,7 +557,7 @@ class MongoDocStatusStorage(DocStatusStorage):
await self._data.create_index(
index_info["keys"], **create_kwargs
)
logger.info(
logger.debug(
f"[{self.workspace}] Created index '{index_name}' for collection {self._collection_name}"
)
except PyMongoError as create_error:
@@ -734,6 +725,7 @@ class MongoGraphStorage(BaseGraphStorage):
# Keep original namespace unchanged for type detection logic
if effective_workspace:
self.final_namespace = f"{effective_workspace}_{self.namespace}"
self.workspace = effective_workspace
logger.debug(
f"Final namespace with workspace prefix: '{self.final_namespace}'"
)
@@ -757,6 +749,10 @@ class MongoGraphStorage(BaseGraphStorage):
self.edge_collection = await get_or_create_collection(
self.db, self._edge_collection_name
)
# Create Atlas Search index for better search performance if possible
await self.create_search_index_if_not_exists()
logger.debug(
f"[{self.workspace}] Use MongoDB as KG {self._collection_name}"
)
@@ -1612,6 +1608,403 @@ class MongoGraphStorage(BaseGraphStorage):
edges.append(edge_dict)
return edges
async def get_popular_labels(self, limit: int = 300) -> list[str]:
"""Get popular labels by node degree (most connected entities)
Args:
limit: Maximum number of labels to return
Returns:
List of labels sorted by degree (highest first)
"""
try:
# Use aggregation pipeline to count edges per node and sort by degree
pipeline = [
# Count outbound edges
{"$group": {"_id": "$source_node_id", "out_degree": {"$sum": 1}}},
# Union with inbound edges count
{
"$unionWith": {
"coll": self._edge_collection_name,
"pipeline": [
{
"$group": {
"_id": "$target_node_id",
"in_degree": {"$sum": 1},
}
}
],
}
},
# Group by node_id and sum degrees
{
"$group": {
"_id": "$_id",
"total_degree": {
"$sum": {
"$add": [
{"$ifNull": ["$out_degree", 0]},
{"$ifNull": ["$in_degree", 0]},
]
}
},
}
},
# Sort by degree descending, then by label ascending
{"$sort": {"total_degree": -1, "_id": 1}},
# Limit results
{"$limit": limit},
# Project only the label
{"$project": {"_id": 1}},
]
cursor = await self.edge_collection.aggregate(pipeline, allowDiskUse=True)
labels = []
async for doc in cursor:
if doc.get("_id"):
labels.append(doc["_id"])
logger.debug(
f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})"
)
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error getting popular labels: {str(e)}")
return []
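# --- Editor's illustration (not part of this commit) ---------------
# What the aggregation above computes, on a toy edge set A->B, A->C, B->C:
# the first $group yields out-degrees {A: 2, B: 1}; the $unionWith stage
# yields in-degrees {B: 1, C: 2}; the second $group sums them per node id,
# giving total_degree A=2, B=2, C=2, with ties ordered by _id ascending.
# A caller typically only needs:
#   labels = await graph_storage.get_popular_labels(limit=50)
# where graph_storage is an initialized MongoGraphStorage instance.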
async def _try_atlas_text_search(self, query_strip: str, limit: int) -> list[str]:
"""Try Atlas Search using simple text search."""
try:
pipeline = [
{
"$search": {
"index": "entity_id_search_idx",
"text": {"query": query_strip, "path": "_id"},
}
},
{"$project": {"_id": 1, "score": {"$meta": "searchScore"}}},
{"$limit": limit},
]
cursor = await self.collection.aggregate(pipeline)
labels = [doc["_id"] async for doc in cursor if doc.get("_id")]
if labels:
logger.debug(
f"[{self.workspace}] Atlas text search returned {len(labels)} results"
)
return labels
return []
except PyMongoError as e:
logger.debug(f"[{self.workspace}] Atlas text search failed: {e}")
return []
async def _try_atlas_autocomplete_search(
self, query_strip: str, limit: int
) -> list[str]:
"""Try Atlas Search using autocomplete for prefix matching."""
try:
pipeline = [
{
"$search": {
"index": "entity_id_search_idx",
"autocomplete": {
"query": query_strip,
"path": "_id",
"fuzzy": {"maxEdits": 1, "prefixLength": 1},
},
}
},
{"$project": {"_id": 1, "score": {"$meta": "searchScore"}}},
{"$limit": limit},
]
cursor = await self.collection.aggregate(pipeline)
labels = [doc["_id"] async for doc in cursor if doc.get("_id")]
if labels:
logger.debug(
f"[{self.workspace}] Atlas autocomplete search returned {len(labels)} results"
)
return labels
return []
except PyMongoError as e:
logger.debug(f"[{self.workspace}] Atlas autocomplete search failed: {e}")
return []
async def _try_atlas_compound_search(
self, query_strip: str, limit: int
) -> list[str]:
"""Try Atlas Search using compound query for comprehensive matching."""
try:
pipeline = [
{
"$search": {
"index": "entity_id_search_idx",
"compound": {
"should": [
{
"text": {
"query": query_strip,
"path": "_id",
"score": {"boost": {"value": 10}},
}
},
{
"autocomplete": {
"query": query_strip,
"path": "_id",
"score": {"boost": {"value": 5}},
"fuzzy": {"maxEdits": 1, "prefixLength": 1},
}
},
{
"wildcard": {
"query": f"*{query_strip}*",
"path": "_id",
"score": {"boost": {"value": 2}},
}
},
],
"minimumShouldMatch": 1,
},
}
},
{"$project": {"_id": 1, "score": {"$meta": "searchScore"}}},
{"$sort": {"score": {"$meta": "searchScore"}}},
{"$limit": limit},
]
cursor = await self.collection.aggregate(pipeline)
labels = [doc["_id"] async for doc in cursor if doc.get("_id")]
if labels:
logger.debug(
f"[{self.workspace}] Atlas compound search returned {len(labels)} results"
)
return labels
return []
except PyMongoError as e:
logger.debug(f"[{self.workspace}] Atlas compound search failed: {e}")
return []
async def _fallback_regex_search(self, query_strip: str, limit: int) -> list[str]:
"""Fallback to regex-based search when Atlas Search fails."""
try:
logger.debug(
f"[{self.workspace}] Using regex fallback search for: '{query_strip}'"
)
escaped_query = re.escape(query_strip)
regex_condition = {"_id": {"$regex": escaped_query, "$options": "i"}}
cursor = self.collection.find(regex_condition, {"_id": 1}).limit(limit * 2)
docs = await cursor.to_list(length=limit * 2)
# Extract labels
labels = []
for doc in docs:
doc_id = doc.get("_id")
if doc_id:
labels.append(doc_id)
# Sort results to prioritize exact matches and starts-with matches
def sort_key(label):
label_lower = label.lower()
query_lower_strip = query_strip.lower()
if label_lower == query_lower_strip:
return (0, label_lower) # Exact match - highest priority
elif label_lower.startswith(query_lower_strip):
return (1, label_lower) # Starts with - medium priority
else:
return (2, label_lower) # Contains - lowest priority
labels.sort(key=sort_key)
labels = labels[:limit] # Apply final limit after sorting
logger.debug(
f"[{self.workspace}] Regex fallback search returned {len(labels)} results (limit: {limit})"
)
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Regex fallback search failed: {e}")
import traceback
logger.error(f"[{self.workspace}] Traceback: {traceback.format_exc()}")
return []
async def search_labels(self, query: str, limit: int = 50) -> list[str]:
"""
Search labels with progressive fallback strategy:
1. Atlas text search (simple and fast)
2. Atlas autocomplete search (prefix matching with fuzzy)
3. Atlas compound search (comprehensive matching)
4. Regex fallback (when Atlas Search is unavailable)
"""
query_strip = query.strip()
if not query_strip:
return []
# First check if we have any nodes at all
try:
node_count = await self.collection.count_documents({})
if node_count == 0:
logger.debug(
f"[{self.workspace}] No nodes found in collection {self._collection_name}"
)
return []
except PyMongoError as e:
logger.error(f"[{self.workspace}] Error counting nodes: {e}")
return []
# Progressive search strategy
search_methods = [
("text", self._try_atlas_text_search),
("autocomplete", self._try_atlas_autocomplete_search),
("compound", self._try_atlas_compound_search),
]
# Try Atlas Search methods in order
for method_name, search_method in search_methods:
try:
labels = await search_method(query_strip, limit)
if labels:
logger.debug(
f"[{self.workspace}] Search successful using {method_name} method: {len(labels)} results"
)
return labels
else:
logger.debug(
f"[{self.workspace}] {method_name} search returned no results, trying next method"
)
except Exception as e:
logger.debug(
f"[{self.workspace}] {method_name} search failed: {e}, trying next method"
)
continue
# If all Atlas Search methods fail, use regex fallback
logger.info(
f"[{self.workspace}] All Atlas Search methods failed, using regex fallback search for: '{query_strip}'"
)
return await self._fallback_regex_search(query_strip, limit)
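# --- Editor's illustration (not part of this commit) ---------------
# Typical use of the progressive search: callers do not need to know
# whether Atlas Search is available. Each Atlas strategy that raises or
# returns nothing falls through to the next, ending at the regex scan.
#   matches = await graph_storage.search_labels("apple", limit=20)
# On Atlas this is served by entity_id_search_idx (text, then autocomplete,
# then compound); on a plain MongoDB deployment it is served by the
# case-insensitive regex fallback, which ranks exact and prefix matches first.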
async def _check_if_index_needs_rebuild(
self, indexes: list, index_name: str
) -> bool:
"""Check if the existing index needs to be rebuilt due to configuration issues."""
for index in indexes:
if index["name"] == index_name:
# Check if the index has the old problematic configuration
definition = index.get("latestDefinition", {})
mappings = definition.get("mappings", {})
fields = mappings.get("fields", {})
id_field = fields.get("_id", {})
# If it's the old single-type autocomplete configuration, rebuild
if (
isinstance(id_field, dict)
and id_field.get("type") == "autocomplete"
):
logger.info(
f"[{self.workspace}] Found old index configuration for '{index_name}', will rebuild"
)
return True
# If it's not a list (multi-type configuration), rebuild
if not isinstance(id_field, list):
logger.info(
f"[{self.workspace}] Index '{index_name}' needs upgrade to multi-type configuration"
)
return True
logger.info(
f"[{self.workspace}] Index '{index_name}' has correct configuration"
)
return False
return True # Index doesn't exist, needs creation
async def _safely_drop_old_index(self, index_name: str):
"""Safely drop the old search index."""
try:
await self.collection.drop_search_index(index_name)
logger.info(
f"[{self.workspace}] Successfully dropped old search index '{index_name}'"
)
except PyMongoError as e:
logger.warning(
f"[{self.workspace}] Could not drop old index '{index_name}': {e}"
)
async def _create_improved_search_index(self, index_name: str):
"""Create an improved search index with multiple field types."""
search_index_model = SearchIndexModel(
definition={
"mappings": {
"dynamic": False,
"fields": {
"_id": [
{
"type": "string",
},
{
"type": "token",
},
{
"type": "autocomplete",
"maxGrams": 15,
"minGrams": 2,
},
]
},
},
"analyzer": "lucene.standard", # Index-level analyzer for text processing
},
name=index_name,
type="search",
)
await self.collection.create_search_index(search_index_model)
logger.info(
f"[{self.workspace}] Created improved Atlas Search index '{index_name}' for collection {self._collection_name}. "
)
logger.info(
f"[{self.workspace}] Index will be built asynchronously, using regex fallback until ready."
)
async def create_search_index_if_not_exists(self):
"""Creates an improved Atlas Search index for entity search, rebuilding if necessary."""
index_name = "entity_id_search_idx"
try:
# Check if we're using MongoDB Atlas (has search index capabilities)
indexes_cursor = await self.collection.list_search_indexes()
indexes = await indexes_cursor.to_list(length=None)
# Check if we need to rebuild the index
needs_rebuild = await self._check_if_index_needs_rebuild(
indexes, index_name
)
if needs_rebuild:
# Check if index exists and drop it
index_exists = any(idx["name"] == index_name for idx in indexes)
if index_exists:
await self._safely_drop_old_index(index_name)
# Create the improved search index (async, no waiting)
await self._create_improved_search_index(index_name)
else:
logger.info(
f"[{self.workspace}] Atlas Search index '{index_name}' already exists with correct configuration"
)
except PyMongoError as e:
# This is expected if not using MongoDB Atlas or if search indexes are not supported
logger.info(
f"[{self.workspace}] Could not create Atlas Search index for {self._collection_name}: {e}. "
"This is normal if not using MongoDB Atlas - search will use regex fallback."
)
except Exception as e:
logger.warning(
f"[{self.workspace}] Unexpected error creating Atlas Search index for {self._collection_name}: {e}"
)
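# --- Editor's illustration (not part of this commit) ---------------
# Atlas builds search indexes asynchronously, so a deployment script may
# want to poll until the new index is queryable before relying on $search.
# The "queryable"/"status" fields follow the Atlas $listSearchIndexes
# output; verify them against your Atlas and driver versions.
#
#   import asyncio
#
#   async def wait_until_queryable(collection, name="entity_id_search_idx"):
#       while True:
#           cursor = await collection.list_search_indexes()
#           indexes = await cursor.to_list(length=None)
#           idx = next((i for i in indexes if i["name"] == name), None)
#           if idx and idx.get("queryable"):
#               return idx.get("status")  # e.g. "READY"
#           await asyncio.sleep(5)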
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection.
@@ -1685,6 +2078,7 @@ class MongoVectorDBStorage(BaseVectorStorage):
# Keep original namespace unchanged for type detection logic
if effective_workspace:
self.final_namespace = f"{effective_workspace}_{self.namespace}"
self.workspace = effective_workspace
logger.debug(
f"Final namespace with workspace prefix: '{self.final_namespace}'"
)