Add allowDiskUse flag to MongoDB aggregations

- Enable disk use for large aggregations
- Fix cursor handling for list_search_indexes
- Improve query performance for big datasets
- Update vector search index check
- Set proper length for to_list results
This commit is contained in:
Author: yangdx — 2025-06-26 13:51:53 +08:00
Parent: 71565f4794
Commit: d60db573dc

View file

@@ -276,7 +276,7 @@ class MongoDocStatusStorage(DocStatusStorage):
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
         pipeline = [{"$group": {"_id": "$status", "count": {"$sum": 1}}}]
-        cursor = self._data.aggregate(pipeline)
+        cursor = self._data.aggregate(pipeline, allowDiskUse=True)
         result = await cursor.to_list()
         counts = {}
         for doc in result:
@@ -527,7 +527,7 @@ class MongoGraphStorage(BaseGraphStorage):
             {"$group": {"_id": "$source_node_id", "degree": {"$sum": 1}}},
         ]
-        cursor = await self.edge_collection.aggregate(outbound_pipeline)
+        cursor = await self.edge_collection.aggregate(outbound_pipeline, allowDiskUse=True)
         async for doc in cursor:
             merged_results[doc.get("_id")] = doc.get("degree")
@@ -537,7 +537,7 @@ class MongoGraphStorage(BaseGraphStorage):
             {"$group": {"_id": "$target_node_id", "degree": {"$sum": 1}}},
         ]
-        cursor = await self.edge_collection.aggregate(inbound_pipeline)
+        cursor = await self.edge_collection.aggregate(inbound_pipeline, allowDiskUse=True)
         async for doc in cursor:
             merged_results[doc.get("_id")] = merged_results.get(
                 doc.get("_id"), 0
@@ -756,7 +756,7 @@ class MongoGraphStorage(BaseGraphStorage):
         # Add starting node to pipeline
         pipeline.insert(0, {"$match": {"_id": label}})
-        cursor = await self.collection.aggregate(pipeline)
+        cursor = await self.collection.aggregate(pipeline, allowDiskUse=True)
         async for doc in cursor:
             # Add the start node
             node_id = str(doc["_id"])
@@ -938,7 +938,8 @@ class MongoVectorDBStorage(BaseVectorStorage):
         try:
             index_name = "vector_knn_index"
-            indexes = await self._data.list_search_indexes().to_list(length=None)
+            indexes_cursor = await self._data.list_search_indexes()
+            indexes = await indexes_cursor.to_list(length=None)
             for index in indexes:
                 if index["name"] == index_name:
                     logger.debug("vector index already exist")
@@ -1033,8 +1034,8 @@ class MongoVectorDBStorage(BaseVectorStorage):
         ]
         # Execute the aggregation pipeline
-        cursor = self._data.aggregate(pipeline)
-        results = await cursor.to_list()
+        cursor = await self._data.aggregate(pipeline, allowDiskUse=True)
+        results = await cursor.to_list(length=None)
         # Format and return the results with created_at field
         return [