From d60db573dc1db438239bb619436af343eb0ca7e3 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Thu, 26 Jun 2025 13:51:53 +0800
Subject: [PATCH] Add allowDiskUse flag to MongoDB aggregations

- Enable disk use for large aggregations
- Fix cursor handling for list_search_indexes
- Improve query performance for big datasets
- Update vector search index check
- Set proper length for to_list results
---
 lightrag/kg/mongo_impl.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py
index 6be02d1d..f5a87cbe 100644
--- a/lightrag/kg/mongo_impl.py
+++ b/lightrag/kg/mongo_impl.py
@@ -276,7 +276,7 @@ class MongoDocStatusStorage(DocStatusStorage):
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
         pipeline = [{"$group": {"_id": "$status", "count": {"$sum": 1}}}]
-        cursor = self._data.aggregate(pipeline)
+        cursor = self._data.aggregate(pipeline, allowDiskUse=True)
         result = await cursor.to_list()
         counts = {}
         for doc in result:
@@ -527,7 +527,7 @@ class MongoGraphStorage(BaseGraphStorage):
             {"$group": {"_id": "$source_node_id", "degree": {"$sum": 1}}},
         ]
 
-        cursor = await self.edge_collection.aggregate(outbound_pipeline)
+        cursor = await self.edge_collection.aggregate(outbound_pipeline, allowDiskUse=True)
         async for doc in cursor:
             merged_results[doc.get("_id")] = doc.get("degree")
 
@@ -537,7 +537,7 @@ class MongoGraphStorage(BaseGraphStorage):
             {"$group": {"_id": "$target_node_id", "degree": {"$sum": 1}}},
         ]
 
-        cursor = await self.edge_collection.aggregate(inbound_pipeline)
+        cursor = await self.edge_collection.aggregate(inbound_pipeline, allowDiskUse=True)
         async for doc in cursor:
             merged_results[doc.get("_id")] = merged_results.get(
                 doc.get("_id"), 0
@@ -756,7 +756,7 @@ class MongoGraphStorage(BaseGraphStorage):
         # Add starting node to pipeline
         pipeline.insert(0, {"$match": {"_id": label}})
 
-        cursor = await self.collection.aggregate(pipeline)
+        cursor = await self.collection.aggregate(pipeline, allowDiskUse=True)
         async for doc in cursor:
             # Add the start node
             node_id = str(doc["_id"])
@@ -938,7 +938,8 @@ class MongoVectorDBStorage(BaseVectorStorage):
         try:
             index_name = "vector_knn_index"
 
-            indexes = await self._data.list_search_indexes().to_list(length=None)
+            indexes_cursor = await self._data.list_search_indexes()
+            indexes = await indexes_cursor.to_list(length=None)
             for index in indexes:
                 if index["name"] == index_name:
                     logger.debug("vector index already exist")
@@ -1033,8 +1034,8 @@ class MongoVectorDBStorage(BaseVectorStorage):
         ]
 
         # Execute the aggregation pipeline
-        cursor = self._data.aggregate(pipeline)
-        results = await cursor.to_list(length=None)
+        cursor = await self._data.aggregate(pipeline, allowDiskUse=True)
+        results = await cursor.to_list(length=None)
 
         # Format and return the results with created_at field
        return [
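
Each hunk applies the same fix: pass allowDiskUse=True to aggregate(), which lets the
server spill pipeline stages that exceed MongoDB's 100 MB per-stage in-memory limit to
temporary disk files instead of aborting the aggregation, and drain the resulting cursor
with to_list(length=None). The list_search_indexes() hunk additionally splits the call in
two, since the coroutine must first be awaited to obtain a cursor before to_list() can be
called on it. Below is a minimal standalone sketch of the pattern, assuming PyMongo's
async driver (pymongo.AsyncMongoClient, PyMongo >= 4.9); the connection URI, database,
and collection names are placeholders, not taken from the patch:

    import asyncio

    from pymongo import AsyncMongoClient


    async def status_counts() -> dict[str, int]:
        # Placeholder URI and names; the patch itself works through LightRAG's
        # storage classes rather than a raw collection like this.
        client: AsyncMongoClient = AsyncMongoClient("mongodb://localhost:27017")
        collection = client["lightrag"]["doc_status"]

        # Same $group pipeline as get_status_counts() in the first hunk.
        # allowDiskUse=True lets stages that outgrow the server's 100 MB
        # in-memory limit spill to temporary files instead of failing.
        pipeline = [{"$group": {"_id": "$status", "count": {"$sum": 1}}}]

        # With the async driver, aggregate() is a coroutine: await it to get
        # the command cursor, then await to_list() to drain it fully.
        cursor = await collection.aggregate(pipeline, allowDiskUse=True)
        results = await cursor.to_list(length=None)
        return {doc["_id"]: doc["count"] for doc in results}


    if __name__ == "__main__":
        print(asyncio.run(status_counts()))

Note that to_list(length=None) fetches the entire result set, so it trades memory on the
client for simplicity; the allowDiskUse flag only relaxes the memory limit on the server
side of the aggregation.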