diff --git a/lightrag/kg/deprecated/chroma_impl.py b/lightrag/kg/deprecated/chroma_impl.py index 75a7d4bf..54bf9037 100644 --- a/lightrag/kg/deprecated/chroma_impl.py +++ b/lightrag/kg/deprecated/chroma_impl.py @@ -295,17 +295,23 @@ class ChromaVectorDBStorage(BaseVectorStorage): if not result or not result["ids"] or len(result["ids"]) == 0: return [] - # Format the results to match the expected structure - return [ - { - "id": result["ids"][i], + # Format the results to match the expected structure and preserve ordering + formatted_map: dict[str, dict[str, Any]] = {} + for i, result_id in enumerate(result["ids"]): + record = { + "id": result_id, "vector": result["embeddings"][i], "content": result["documents"][i], "created_at": result["metadatas"][i].get("created_at"), **result["metadatas"][i], } - for i in range(len(result["ids"])) - ] + formatted_map[str(result_id)] = record + + ordered_results: list[dict[str, Any] | None] = [] + for requested_id in ids: + ordered_results.append(formatted_map.get(str(requested_id))) + + return ordered_results except Exception as e: logger.error(f"Error retrieving vector data for IDs {ids}: {e}") return [] diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index fb506018..4630b439 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -78,15 +78,17 @@ class JsonDocStatusStorage(DocStatusStorage): return set(keys) - set(self._data.keys()) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: - result: list[dict[str, Any]] = [] + ordered_results: list[dict[str, Any] | None] = [] if self._storage_lock is None: raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: for id in ids: data = self._data.get(id, None) if data: - result.append(data) - return result + ordered_results.append(data.copy()) + else: + ordered_results.append(None) + return ordered_results async def get_status_counts(self) -> dict[str, int]: """Get counts of documents in each status""" diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 0ea65905..c0df3521 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -1258,7 +1258,22 @@ class MilvusVectorDBStorage(BaseVectorStorage): output_fields=output_fields, ) - return result or [] + if not result: + return [] + + result_map: dict[str, dict[str, Any]] = {} + for row in result: + if not row: + continue + row_id = row.get("id") + if row_id is not None: + result_map[str(row_id)] = row + + ordered_results: list[dict[str, Any] | None] = [] + for requested_id in ids: + ordered_results.append(result_map.get(str(requested_id))) + + return ordered_results except Exception as e: logger.error( f"[{self.workspace}] Error retrieving vector data for IDs {ids}: {e}" diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 5351b0a1..a62c3031 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -17,7 +17,6 @@ from ..base import ( DocStatusStorage, ) from ..utils import logger, compute_mdhash_id -from ..utils_context import get_current_tenant_id from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge from ..constants import GRAPH_FIELD_SEP from ..kg.shared_storage import get_data_init_lock, get_storage_lock, get_graph_db_lock @@ -113,21 +112,15 @@ class MongoKVStorage(BaseKVStorage): # Build final_namespace with workspace prefix for data isolation # Keep original namespace unchanged for type detection logic if effective_workspace: + 
self.final_namespace = f"{effective_workspace}_{self.namespace}" self.workspace = effective_workspace - else: - self.workspace = "_" - - # Get composite workspace (supports multi-tenant isolation) - composite_workspace = self._get_composite_workspace() - - if composite_workspace and composite_workspace != "_": - self.final_namespace = f"{composite_workspace}_{self.namespace}" logger.debug( - f"Final namespace with composite workspace: '{self.final_namespace}'" + f"Final namespace with workspace prefix: '{self.final_namespace}'" ) else: # When workspace is empty, final_namespace equals original namespace self.final_namespace = self.namespace + self.workspace = "_" logger.debug( f"[{self.workspace}] Final namespace (no workspace): '{self.namespace}'" ) @@ -153,12 +146,7 @@ class MongoKVStorage(BaseKVStorage): async def get_by_id(self, id: str) -> dict[str, Any] | None: # Unified handling for flattened keys - query = {"_id": id} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - doc = await self._data.find_one(query) + doc = await self._data.find_one({"_id": id}) if doc: # Ensure time fields are present, provide default values for old data doc.setdefault("create_time", 0) @@ -166,26 +154,24 @@ class MongoKVStorage(BaseKVStorage): return doc async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: - query = {"_id": {"$in": ids}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id + cursor = self._data.find({"_id": {"$in": ids}}) + docs = await cursor.to_list(length=None) - cursor = self._data.find(query) - docs = await cursor.to_list() - # Ensure time fields are present for all documents + doc_map: dict[str, dict[str, Any]] = {} for doc in docs: + if not doc: + continue doc.setdefault("create_time", 0) doc.setdefault("update_time", 0) - return docs + doc_map[str(doc.get("_id"))] = doc + + ordered_results: list[dict[str, Any] | None] = [] + for id_value in ids: + ordered_results.append(doc_map.get(str(id_value))) + return ordered_results async def filter_keys(self, keys: set[str]) -> set[str]: - query = {"_id": {"$in": list(keys)}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self._data.find(query, {"_id": 1}) + cursor = self._data.find({"_id": {"$in": list(keys)}}, {"_id": 1}) existing_ids = {str(x["_id"]) async for x in cursor} return keys - existing_ids @@ -195,12 +181,7 @@ class MongoKVStorage(BaseKVStorage): Returns: Dictionary containing all stored data """ - query = {} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self._data.find(query) + cursor = self._data.find({}) result = {} async for doc in cursor: doc_id = doc.pop("_id") @@ -220,7 +201,6 @@ class MongoKVStorage(BaseKVStorage): operations = [] current_time = int(time.time()) # Get current Unix timestamp - tenant_id = get_current_tenant_id() for k, v in data.items(): # For text_chunks namespace, ensure llm_cache_list field exists @@ -233,9 +213,6 @@ class MongoKVStorage(BaseKVStorage): v_for_set["_id"] = k # Use flattened key as _id v_for_set["update_time"] = current_time # Always update update_time - if tenant_id: - v_for_set["tenant_id"] = tenant_id - # Remove create_time from $set to avoid conflict with $setOnInsert v_for_set.pop("create_time", None) @@ -273,12 +250,7 @@ class MongoKVStorage(BaseKVStorage): ids = list(ids) try: - query = {"_id": {"$in": ids}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = 
tenant_id - - result = await self._data.delete_many(query) + result = await self._data.delete_many({"_id": {"$in": ids}}) logger.info( f"[{self.workspace}] Deleted {result.deleted_count} documents from {self.namespace}" ) @@ -372,21 +344,15 @@ class MongoDocStatusStorage(DocStatusStorage): # Build final_namespace with workspace prefix for data isolation # Keep original namespace unchanged for type detection logic if effective_workspace: + self.final_namespace = f"{effective_workspace}_{self.namespace}" self.workspace = effective_workspace - else: - self.workspace = "_" - - # Get composite workspace (supports multi-tenant isolation) - composite_workspace = self._get_composite_workspace() - - if composite_workspace and composite_workspace != "_": - self.final_namespace = f"{composite_workspace}_{self.namespace}" logger.debug( - f"Final namespace with composite workspace: '{self.final_namespace}'" + f"Final namespace with workspace prefix: '{self.final_namespace}'" ) else: # When workspace is empty, final_namespace equals original namespace self.final_namespace = self.namespace + self.workspace = "_" logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'") self._collection_name = self.final_namespace @@ -413,26 +379,25 @@ class MongoDocStatusStorage(DocStatusStorage): self._data = None async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: - query = {"_id": id} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - return await self._data.find_one(query) + return await self._data.find_one({"_id": id}) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: - query = {"_id": {"$in": ids}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - cursor = self._data.find(query) - return await cursor.to_list() + cursor = self._data.find({"_id": {"$in": ids}}) + docs = await cursor.to_list(length=None) + + doc_map: dict[str, dict[str, Any]] = {} + for doc in docs: + if not doc: + continue + doc_map[str(doc.get("_id"))] = doc + + ordered_results: list[dict[str, Any] | None] = [] + for id_value in ids: + ordered_results.append(doc_map.get(str(id_value))) + return ordered_results async def filter_keys(self, data: set[str]) -> set[str]: - query = {"_id": {"$in": list(data)}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - cursor = self._data.find(query, {"_id": 1}) + cursor = self._data.find({"_id": {"$in": list(data)}}, {"_id": 1}) existing_ids = {str(x["_id"]) async for x in cursor} return data - existing_ids @@ -441,14 +406,11 @@ class MongoDocStatusStorage(DocStatusStorage): if not data: return update_tasks: list[Any] = [] - tenant_id = get_current_tenant_id() for k, v in data.items(): # Ensure chunks_list field exists and is an array if "chunks_list" not in v: v["chunks_list"] = [] data[k]["_id"] = k - if tenant_id: - data[k]["tenant_id"] = tenant_id update_tasks.append( self._data.update_one({"_id": k}, {"$set": v}, upsert=True) ) @@ -456,16 +418,7 @@ class MongoDocStatusStorage(DocStatusStorage): async def get_status_counts(self) -> dict[str, int]: """Get counts of documents in each status""" - match_stage = {} - tenant_id = get_current_tenant_id() - if tenant_id: - match_stage["tenant_id"] = tenant_id - - pipeline = [] - if match_stage: - pipeline.append({"$match": match_stage}) - pipeline.append({"$group": {"_id": "$status", "count": {"$sum": 1}}}) - + pipeline = [{"$group": {"_id": "$status", "count": {"$sum": 1}}}] cursor = await 
self._data.aggregate(pipeline, allowDiskUse=True) result = await cursor.to_list() counts = {} @@ -477,12 +430,7 @@ class MongoDocStatusStorage(DocStatusStorage): self, status: DocStatus ) -> dict[str, DocProcessingStatus]: """Get all documents with a specific status""" - query = {"status": status.value} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self._data.find(query) + cursor = self._data.find({"status": status.value}) result = await cursor.to_list() processed_result = {} for doc in result: @@ -500,12 +448,7 @@ class MongoDocStatusStorage(DocStatusStorage): self, track_id: str ) -> dict[str, DocProcessingStatus]: """Get all documents with a specific track_id""" - query = {"track_id": track_id} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self._data.find(query) + cursor = self._data.find({"track_id": track_id}) result = await cursor.to_list() processed_result = {} for doc in result: @@ -548,11 +491,7 @@ class MongoDocStatusStorage(DocStatusStorage): return {"status": "error", "message": str(e)} async def delete(self, ids: list[str]) -> None: - query = {"_id": {"$in": ids}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - await self._data.delete_many(query) + await self._data.delete_many({"_id": {"$in": ids}}) async def create_and_migrate_indexes_if_not_exists(self): """Create indexes to optimize pagination queries and migrate file_path indexes for Chinese collation""" @@ -583,8 +522,6 @@ class MongoDocStatusStorage(DocStatusStorage): {"name": f"{workspace_prefix}created_at", "keys": [("created_at", -1)]}, {"name": f"{workspace_prefix}id", "keys": [("_id", 1)]}, {"name": f"{workspace_prefix}track_id", "keys": [("track_id", 1)]}, - # External ID index for idempotent ingestion - {"name": f"{workspace_prefix}external_id", "keys": [("external_id", 1)]}, # New file_path indexes with Chinese collation and workspace-specific names { "name": f"{workspace_prefix}file_path_zh_collation", @@ -696,10 +633,6 @@ class MongoDocStatusStorage(DocStatusStorage): if status_filter is not None: query_filter["status"] = status_filter.value - tenant_id = get_current_tenant_id() - if tenant_id: - query_filter["tenant_id"] = tenant_id - # Get total count total_count = await self._data.count_documents(query_filter) @@ -754,16 +687,7 @@ class MongoDocStatusStorage(DocStatusStorage): Returns: Dictionary mapping status names to counts, including 'all' field """ - match_stage = {} - tenant_id = get_current_tenant_id() - if tenant_id: - match_stage["tenant_id"] = tenant_id - - pipeline = [] - if match_stage: - pipeline.append({"$match": match_stage}) - pipeline.append({"$group": {"_id": "$status", "count": {"$sum": 1}}}) - + pipeline = [{"$group": {"_id": "$status", "count": {"$sum": 1}}}] cursor = await self._data.aggregate(pipeline, allowDiskUse=True) result = await cursor.to_list() @@ -788,29 +712,7 @@ class MongoDocStatusStorage(DocStatusStorage): Union[dict[str, Any], None]: Document data if found, None otherwise Returns the same format as get_by_id method """ - query = {"file_path": file_path} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - return await self._data.find_one(query) - - async def get_doc_by_external_id( - self, external_id: str - ) -> Union[dict[str, Any], None]: - """Get document by external ID for idempotency checks. 
- - Args: - external_id: The external ID to search for (client-provided unique identifier) - - Returns: - Union[dict[str, Any], None]: Document data if found, None otherwise - Returns the same format as get_by_id method - """ - query = {"external_id": external_id} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - return await self._data.find_one(query) + return await self._data.find_one({"file_path": file_path}) @final @@ -853,21 +755,15 @@ class MongoGraphStorage(BaseGraphStorage): # Build final_namespace with workspace prefix for data isolation # Keep original namespace unchanged for type detection logic if effective_workspace: + self.final_namespace = f"{effective_workspace}_{self.namespace}" self.workspace = effective_workspace - else: - self.workspace = "_" - - # Get composite workspace (supports multi-tenant isolation) - composite_workspace = self._get_composite_workspace() - - if composite_workspace and composite_workspace != "_": - self.final_namespace = f"{composite_workspace}_{self.namespace}" logger.debug( - f"Final namespace with composite workspace: '{self.final_namespace}'" + f"Final namespace with workspace prefix: '{self.final_namespace}'" ) else: # When workspace is empty, final_namespace equals original namespace self.final_namespace = self.namespace + self.workspace = "_" logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'") self._collection_name = self.final_namespace @@ -940,35 +836,26 @@ class MongoGraphStorage(BaseGraphStorage): Check if node_id is present in the collection by looking up its doc. No real need for $graphLookup here, but let's keep it direct. """ - query = {"_id": node_id} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - doc = await self.collection.find_one(query, {"_id": 1}) + doc = await self.collection.find_one({"_id": node_id}, {"_id": 1}) return doc is not None async def has_edge(self, source_node_id: str, target_node_id: str) -> bool: """ Check if there's a direct single-hop edge between source_node_id and target_node_id. """ - query = { - "$or": [ - { - "source_node_id": source_node_id, - "target_node_id": target_node_id, - }, - { - "source_node_id": target_node_id, - "target_node_id": source_node_id, - }, - ] - } - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - doc = await self.edge_collection.find_one( - query, + { + "$or": [ + { + "source_node_id": source_node_id, + "target_node_id": target_node_id, + }, + { + "source_node_id": target_node_id, + "target_node_id": source_node_id, + }, + ] + }, {"_id": 1}, ) return doc is not None @@ -983,12 +870,9 @@ class MongoGraphStorage(BaseGraphStorage): """ Returns the total number of edges connected to node_id (both inbound and outbound). """ - query = {"$or": [{"source_node_id": node_id}, {"target_node_id": node_id}]} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - return await self.edge_collection.count_documents(query) + return await self.edge_collection.count_documents( + {"$or": [{"source_node_id": node_id}, {"target_node_id": node_id}]} + ) async def edge_degree(self, src_id: str, tgt_id: str) -> int: """Get the total degree (sum of relationships) of two nodes. @@ -1015,31 +899,25 @@ class MongoGraphStorage(BaseGraphStorage): """ Return the full node document, or None if missing. 
""" - query = {"_id": node_id} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - return await self.collection.find_one(query) + return await self.collection.find_one({"_id": node_id}) async def get_edge( self, source_node_id: str, target_node_id: str ) -> dict[str, str] | None: - query = { - "$or": [ - { - "source_node_id": source_node_id, - "target_node_id": target_node_id, - }, - { - "source_node_id": target_node_id, - "target_node_id": source_node_id, - }, - ] - } - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - return await self.edge_collection.find_one(query) + return await self.edge_collection.find_one( + { + "$or": [ + { + "source_node_id": source_node_id, + "target_node_id": target_node_id, + }, + { + "source_node_id": target_node_id, + "target_node_id": source_node_id, + }, + ] + } + ) async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: """ @@ -1052,18 +930,13 @@ class MongoGraphStorage(BaseGraphStorage): list[tuple[str, str]]: List of (source_label, target_label) tuples representing edges None: If no edges found """ - query = { - "$or": [ - {"source_node_id": source_node_id}, - {"target_node_id": source_node_id}, - ] - } - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - cursor = self.edge_collection.find( - query, + { + "$or": [ + {"source_node_id": source_node_id}, + {"target_node_id": source_node_id}, + ] + }, {"source_node_id": 1, "target_node_id": 1}, ) @@ -1073,29 +946,18 @@ class MongoGraphStorage(BaseGraphStorage): async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]: result = {} - query = {"_id": {"$in": node_ids}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - async for doc in self.collection.find(query): + async for doc in self.collection.find({"_id": {"$in": node_ids}}): result[doc.get("_id")] = doc return result async def node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]: # merge the outbound and inbound results with the same "_id" and sum the "degree" merged_results = {} - tenant_id = get_current_tenant_id() - match_stage_out = {"source_node_id": {"$in": node_ids}} - match_stage_in = {"target_node_id": {"$in": node_ids}} - - if tenant_id: - match_stage_out["tenant_id"] = tenant_id - match_stage_in["tenant_id"] = tenant_id # Outbound degrees outbound_pipeline = [ - {"$match": match_stage_out}, + {"$match": {"source_node_id": {"$in": node_ids}}}, {"$group": {"_id": "$source_node_id", "degree": {"$sum": 1}}}, ] @@ -1107,7 +969,7 @@ class MongoGraphStorage(BaseGraphStorage): # Inbound degrees inbound_pipeline = [ - {"$match": match_stage_in}, + {"$match": {"target_node_id": {"$in": node_ids}}}, {"$group": {"_id": "$target_node_id", "degree": {"$sum": 1}}}, ] @@ -1139,18 +1001,10 @@ class MongoGraphStorage(BaseGraphStorage): - Incoming edges: (connected_node, queried_node) """ result = {node_id: [] for node_id in node_ids} - tenant_id = get_current_tenant_id() - - query_out = {"source_node_id": {"$in": node_ids}} - query_in = {"target_node_id": {"$in": node_ids}} - - if tenant_id: - query_out["tenant_id"] = tenant_id - query_in["tenant_id"] = tenant_id # Query outgoing edges (where node is the source) outgoing_cursor = self.edge_collection.find( - query_out, + {"source_node_id": {"$in": node_ids}}, {"source_node_id": 1, "target_node_id": 1}, ) async for edge in outgoing_cursor: @@ -1160,7 +1014,7 @@ class MongoGraphStorage(BaseGraphStorage): # Query 
incoming edges (where node is the target) incoming_cursor = self.edge_collection.find( - query_in, + {"target_node_id": {"$in": node_ids}}, {"source_node_id": 1, "target_node_id": 1}, ) async for edge in incoming_cursor: @@ -1183,12 +1037,7 @@ class MongoGraphStorage(BaseGraphStorage): if not chunk_ids: return [] - query = {"source_ids": {"$in": chunk_ids}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self.collection.find(query) + cursor = self.collection.find({"source_ids": {"$in": chunk_ids}}) return [doc async for doc in cursor] async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]: @@ -1204,12 +1053,7 @@ class MongoGraphStorage(BaseGraphStorage): if not chunk_ids: return [] - query = {"source_ids": {"$in": chunk_ids}} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self.edge_collection.find(query) + cursor = self.edge_collection.find({"source_ids": {"$in": chunk_ids}}) edges = [] async for edge in cursor: @@ -1234,10 +1078,6 @@ class MongoGraphStorage(BaseGraphStorage): update_doc["$set"]["source_ids"] = node_data["source_id"].split( GRAPH_FIELD_SEP ) - - tenant_id = get_current_tenant_id() - if tenant_id: - update_doc["$set"]["tenant_id"] = tenant_id await self.collection.update_one({"_id": node_id}, update_doc, upsert=True) @@ -1256,10 +1096,6 @@ class MongoGraphStorage(BaseGraphStorage): update_doc["$set"]["source_ids"] = edge_data["source_id"].split( GRAPH_FIELD_SEP ) - - tenant_id = get_current_tenant_id() - if tenant_id: - update_doc["$set"]["tenant_id"] = tenant_id edge_data["source_node_id"] = source_node_id edge_data["target_node_id"] = target_node_id @@ -1292,20 +1128,13 @@ class MongoGraphStorage(BaseGraphStorage): 1) Remove node's doc entirely. 2) Remove inbound & outbound edges from any doc that references node_id. """ - tenant_id = get_current_tenant_id() - - edge_query = {"$or": [{"source_node_id": node_id}, {"target_node_id": node_id}]} - node_query = {"_id": node_id} - - if tenant_id: - edge_query["tenant_id"] = tenant_id - node_query["tenant_id"] = tenant_id - # Remove all edges - await self.edge_collection.delete_many(edge_query) + await self.edge_collection.delete_many( + {"$or": [{"source_node_id": node_id}, {"target_node_id": node_id}]} + ) # Remove the node doc - await self.collection.delete_one(node_query) + await self.collection.delete_one({"_id": node_id}) # # ------------------------------------------------------------------------- @@ -1319,18 +1148,9 @@ class MongoGraphStorage(BaseGraphStorage): Returns: [id1, id2, ...] # Alphabetically sorted id list """ - match_stage = {} - tenant_id = get_current_tenant_id() - if tenant_id: - match_stage["tenant_id"] = tenant_id # Use aggregation with allowDiskUse for large datasets - pipeline = [] - if match_stage: - pipeline.append({"$match": match_stage}) - - pipeline.extend([{"$project": {"_id": 1}}, {"$sort": {"_id": 1}}]) - + pipeline = [{"$project": {"_id": 1}}, {"$sort": {"_id": 1}}] cursor = await self.collection.aggregate(pipeline, allowDiskUse=True) labels = [] async for doc in cursor: @@ -1383,23 +1203,15 @@ class MongoGraphStorage(BaseGraphStorage): It's possible that the node with one or multiple relationships is retrieved, while its neighbor is not. Then this node might seem like disconnected in UI. 
""" - tenant_id = get_current_tenant_id() - match_stage = {} - if tenant_id: - match_stage["tenant_id"] = tenant_id - total_node_count = await self.collection.count_documents(match_stage) + total_node_count = await self.collection.count_documents({}) result = KnowledgeGraph() seen_edges = set() result.is_truncated = total_node_count > max_nodes if result.is_truncated: # Get all node_ids ranked by degree if max_nodes exceeds total node count - pipeline = [] - if match_stage: - pipeline.append({"$match": match_stage}) - - pipeline.extend([ + pipeline = [ {"$project": {"source_node_id": 1, "_id": 0}}, {"$group": {"_id": "$source_node_id", "degree": {"$sum": 1}}}, { @@ -1419,16 +1231,7 @@ class MongoGraphStorage(BaseGraphStorage): {"$group": {"_id": "$_id", "degree": {"$sum": "$degree"}}}, {"$sort": {"degree": -1}}, {"$limit": max_nodes}, - ]) - - # If tenant_id is present, we need to inject it into the unionWith pipeline as well - if tenant_id: - # Find the unionWith stage - for stage in pipeline: - if "$unionWith" in stage: - # Add match stage to the beginning of the unionWith pipeline - stage["$unionWith"]["pipeline"].insert(0, {"$match": {"tenant_id": tenant_id}}) - + ] cursor = await self.edge_collection.aggregate(pipeline, allowDiskUse=True) node_ids = [] @@ -1436,40 +1239,28 @@ class MongoGraphStorage(BaseGraphStorage): node_id = str(doc["_id"]) node_ids.append(node_id) - node_query = {"_id": {"$in": node_ids}} - if tenant_id: - node_query["tenant_id"] = tenant_id - - cursor = self.collection.find(node_query, {"source_ids": 0}) + cursor = self.collection.find({"_id": {"$in": node_ids}}, {"source_ids": 0}) async for doc in cursor: result.nodes.append(self._construct_graph_node(doc["_id"], doc)) # As node count reaches the limit, only need to fetch the edges that directly connect to these nodes - edge_query = { - "$and": [ - {"source_node_id": {"$in": node_ids}}, - {"target_node_id": {"$in": node_ids}}, - ] - } - if tenant_id: - edge_query["tenant_id"] = tenant_id - - edge_cursor = self.edge_collection.find(edge_query) + edge_cursor = self.edge_collection.find( + { + "$and": [ + {"source_node_id": {"$in": node_ids}}, + {"target_node_id": {"$in": node_ids}}, + ] + } + ) else: # All nodes and edges are needed - node_query = {} - edge_query = {} - if tenant_id: - node_query["tenant_id"] = tenant_id - edge_query["tenant_id"] = tenant_id - - cursor = self.collection.find(node_query, {"source_ids": 0}) + cursor = self.collection.find({}, {"source_ids": 0}) async for doc in cursor: node_id = str(doc["_id"]) result.nodes.append(self._construct_graph_node(doc["_id"], doc)) - edge_cursor = self.edge_collection.find(edge_query) + edge_cursor = self.edge_collection.find({}) async for edge in edge_cursor: edge_id = f"{edge['source_node_id']}-{edge['target_node_id']}" @@ -1491,13 +1282,7 @@ class MongoGraphStorage(BaseGraphStorage): if depth > max_depth or len(result.nodes) > max_nodes: return result - tenant_id = get_current_tenant_id() - - node_query = {"_id": {"$in": node_labels}} - if tenant_id: - node_query["tenant_id"] = tenant_id - - cursor = self.collection.find(node_query) + cursor = self.collection.find({"_id": {"$in": node_labels}}) async for node in cursor: node_id = node["_id"] @@ -1509,16 +1294,14 @@ class MongoGraphStorage(BaseGraphStorage): # Collect neighbors # Get both inbound and outbound one hop nodes - edge_query = { - "$or": [ - {"source_node_id": {"$in": node_labels}}, - {"target_node_id": {"$in": node_labels}}, - ] - } - if tenant_id: - edge_query["tenant_id"] = tenant_id - - 
cursor = self.edge_collection.find(edge_query) + cursor = self.edge_collection.find( + { + "$or": [ + {"source_node_id": {"$in": node_labels}}, + {"target_node_id": {"$in": node_labels}}, + ] + } + ) neighbor_nodes = [] async for edge in cursor: @@ -1551,18 +1334,14 @@ class MongoGraphStorage(BaseGraphStorage): # Get all edges from seen_nodes all_node_ids = list(seen_nodes) - tenant_id = get_current_tenant_id() - - edge_query = { - "$and": [ - {"source_node_id": {"$in": all_node_ids}}, - {"target_node_id": {"$in": all_node_ids}}, - ] - } - if tenant_id: - edge_query["tenant_id"] = tenant_id - - cursor = self.edge_collection.find(edge_query) + cursor = self.edge_collection.find( + { + "$and": [ + {"source_node_id": {"$in": all_node_ids}}, + {"target_node_id": {"$in": all_node_ids}}, + ] + } + ) async for edge in cursor: edge_id = f"{edge['source_node_id']}-{edge['target_node_id']}" @@ -1584,14 +1363,9 @@ class MongoGraphStorage(BaseGraphStorage): "entity_type": 0, "file_path": 0, } - - tenant_id = get_current_tenant_id() - node_query = {"_id": node_label} - if tenant_id: - node_query["tenant_id"] = tenant_id # Verify if starting node exists - start_node = await self.collection.find_one(node_query) + start_node = await self.collection.find_one({"_id": node_label}) if not start_node: logger.warning( f"[{self.workspace}] Starting node with label {node_label} does not exist!" @@ -1606,14 +1380,9 @@ class MongoGraphStorage(BaseGraphStorage): # In MongoDB, depth = 0 means one-hop max_depth = max_depth - 1 - - # Prepare match stage for graphLookup - restrict_search_with_match = {} - if tenant_id: - restrict_search_with_match = {"tenant_id": tenant_id} pipeline = [ - {"$match": node_query}, + {"$match": {"_id": node_label}}, {"$project": project_doc}, { "$graphLookup": { @@ -1624,14 +1393,13 @@ class MongoGraphStorage(BaseGraphStorage): "maxDepth": max_depth, "depthField": "depth", "as": "connected_edges", - "restrictSearchWithMatch": restrict_search_with_match if restrict_search_with_match else {} }, }, { "$unionWith": { "coll": self._collection_name, "pipeline": [ - {"$match": node_query}, + {"$match": {"_id": node_label}}, {"$project": project_doc}, { "$graphLookup": { @@ -1642,7 +1410,6 @@ class MongoGraphStorage(BaseGraphStorage): "maxDepth": max_depth, "depthField": "depth", "as": "connected_edges", - "restrictSearchWithMatch": restrict_search_with_match if restrict_search_with_match else {} } }, ], @@ -1678,11 +1445,7 @@ class MongoGraphStorage(BaseGraphStorage): seen_nodes.add(edge["target_node_id"]) # Filter out all the node whose id is same as node_label so that we do not check existence next step - final_node_query = {"_id": {"$in": node_ids}} - if tenant_id: - final_node_query["tenant_id"] = tenant_id - - cursor = self.collection.find(final_node_query) + cursor = self.collection.find({"_id": {"$in": node_ids}}) async for doc in cursor: result.nodes.append(self._construct_graph_node(str(doc["_id"]), doc)) @@ -1808,25 +1571,18 @@ class MongoGraphStorage(BaseGraphStorage): if not nodes: return - tenant_id = get_current_tenant_id() - - edge_query = { - "$or": [ - {"source_node_id": {"$in": nodes}}, - {"target_node_id": {"$in": nodes}}, - ] - } - node_query = {"_id": {"$in": nodes}} - - if tenant_id: - edge_query["tenant_id"] = tenant_id - node_query["tenant_id"] = tenant_id - # 1. 
Remove all edges referencing these nodes - await self.edge_collection.delete_many(edge_query) + await self.edge_collection.delete_many( + { + "$or": [ + {"source_node_id": {"$in": nodes}}, + {"target_node_id": {"$in": nodes}}, + ] + } + ) # 2. Delete the node documents - await self.collection.delete_many(node_query) + await self.collection.delete_many({"_id": {"$in": nodes}}) logger.debug(f"[{self.workspace}] Successfully deleted nodes: {nodes}") @@ -1849,12 +1605,7 @@ class MongoGraphStorage(BaseGraphStorage): {"source_node_id": target_id, "target_node_id": source_id} ) - query = {"$or": all_edge_pairs} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - await self.edge_collection.delete_many(query) + await self.edge_collection.delete_many({"$or": all_edge_pairs}) logger.debug(f"[{self.workspace}] Successfully deleted edges: {edges}") @@ -1864,12 +1615,7 @@ class MongoGraphStorage(BaseGraphStorage): Returns: A list of all nodes, where each node is a dictionary of its properties """ - query = {} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self.collection.find(query) + cursor = self.collection.find({}) nodes = [] async for node in cursor: node_dict = dict(node) @@ -1884,12 +1630,7 @@ class MongoGraphStorage(BaseGraphStorage): Returns: A list of all edges, where each edge is a dictionary of its properties """ - query = {} - tenant_id = get_current_tenant_id() - if tenant_id: - query["tenant_id"] = tenant_id - - cursor = self.edge_collection.find(query) + cursor = self.edge_collection.find({}) edges = [] async for edge in cursor: edge_dict = dict(edge) @@ -1908,17 +1649,8 @@ class MongoGraphStorage(BaseGraphStorage): List of labels sorted by degree (highest first) """ try: - tenant_id = get_current_tenant_id() - match_stage = {} - if tenant_id: - match_stage["tenant_id"] = tenant_id - # Use aggregation pipeline to count edges per node and sort by degree - pipeline = [] - if match_stage: - pipeline.append({"$match": match_stage}) - - pipeline.extend([ + pipeline = [ # Count outbound edges {"$group": {"_id": "$source_node_id", "out_degree": {"$sum": 1}}}, # Union with inbound edges count @@ -1955,15 +1687,7 @@ class MongoGraphStorage(BaseGraphStorage): {"$limit": limit}, # Project only the label {"$project": {"_id": 1}}, - ]) - - # If tenant_id is present, we need to inject it into the unionWith pipeline as well - if tenant_id: - # Find the unionWith stage - for stage in pipeline: - if "$unionWith" in stage: - # Add match stage to the beginning of the unionWith pipeline - stage["$unionWith"]["pipeline"].insert(0, {"$match": {"tenant_id": tenant_id}}) + ] cursor = await self.edge_collection.aggregate(pipeline, allowDiskUse=True) labels = [] @@ -1982,29 +1706,16 @@ class MongoGraphStorage(BaseGraphStorage): async def _try_atlas_text_search(self, query_strip: str, limit: int) -> list[str]: """Try Atlas Search using simple text search.""" try: - tenant_id = get_current_tenant_id() - - # Note: Atlas Search $search stage must be the first stage in the pipeline - # We cannot filter by tenant_id inside $search unless we index it - # For now, we will filter after the search, which might be less efficient but correct - pipeline = [ { "$search": { "index": "entity_id_search_idx", "text": {"query": query_strip, "path": "_id"}, } - } - ] - - if tenant_id: - pipeline.append({"$match": {"tenant_id": tenant_id}}) - - pipeline.extend([ + }, {"$project": {"_id": 1, "score": {"$meta": "searchScore"}}}, {"$limit": 
limit}, - ]) - + ] cursor = await self.collection.aggregate(pipeline) labels = [doc["_id"] async for doc in cursor if doc.get("_id")] if labels: @@ -2022,8 +1733,6 @@ class MongoGraphStorage(BaseGraphStorage): ) -> list[str]: """Try Atlas Search using autocomplete for prefix matching.""" try: - tenant_id = get_current_tenant_id() - pipeline = [ { "$search": { @@ -2034,17 +1743,10 @@ class MongoGraphStorage(BaseGraphStorage): "fuzzy": {"maxEdits": 1, "prefixLength": 1}, }, } - } - ] - - if tenant_id: - pipeline.append({"$match": {"tenant_id": tenant_id}}) - - pipeline.extend([ + }, {"$project": {"_id": 1, "score": {"$meta": "searchScore"}}}, {"$limit": limit}, - ]) - + ] cursor = await self.collection.aggregate(pipeline) labels = [doc["_id"] async for doc in cursor if doc.get("_id")] if labels: @@ -2062,8 +1764,6 @@ class MongoGraphStorage(BaseGraphStorage): ) -> list[str]: """Try Atlas Search using compound query for comprehensive matching.""" try: - tenant_id = get_current_tenant_id() - pipeline = [ { "$search": { @@ -2096,18 +1796,11 @@ class MongoGraphStorage(BaseGraphStorage): "minimumShouldMatch": 1, }, } - } - ] - - if tenant_id: - pipeline.append({"$match": {"tenant_id": tenant_id}}) - - pipeline.extend([ + }, {"$project": {"_id": 1, "score": {"$meta": "searchScore"}}}, {"$sort": {"score": {"$meta": "searchScore"}}}, {"$limit": limit}, - ]) - + ] cursor = await self.collection.aggregate(pipeline) labels = [doc["_id"] async for doc in cursor if doc.get("_id")] if labels: @@ -2129,11 +1822,6 @@ class MongoGraphStorage(BaseGraphStorage): escaped_query = re.escape(query_strip) regex_condition = {"_id": {"$regex": escaped_query, "$options": "i"}} - - tenant_id = get_current_tenant_id() - if tenant_id: - regex_condition["tenant_id"] = tenant_id - cursor = self.collection.find(regex_condition, {"_id": 1}).limit(limit * 2) docs = await cursor.to_list(length=limit * 2) @@ -2185,12 +1873,7 @@ class MongoGraphStorage(BaseGraphStorage): # First check if we have any nodes at all try: - count_query = {} - tenant_id = get_current_tenant_id() - if tenant_id: - count_query["tenant_id"] = tenant_id - - node_count = await self.collection.count_documents(count_query) + node_count = await self.collection.count_documents({}) if node_count == 0: logger.debug( f"[{self.workspace}] No nodes found in collection {self._collection_name}" @@ -2739,15 +2422,20 @@ class MongoVectorDBStorage(BaseVectorStorage): cursor = self._data.find({"_id": {"$in": ids}}) results = await cursor.to_list(length=None) - # Format results to include id field expected by API - formatted_results = [] + # Format results to include id field expected by API and preserve ordering + formatted_map: dict[str, dict[str, Any]] = {} for result in results: result_dict = dict(result) if "_id" in result_dict and "id" not in result_dict: result_dict["id"] = result_dict["_id"] - formatted_results.append(result_dict) + key = str(result_dict.get("id", result_dict.get("_id"))) + formatted_map[key] = result_dict - return formatted_results + ordered_results: list[dict[str, Any] | None] = [] + for id_value in ids: + ordered_results.append(formatted_map.get(str(id_value))) + + return ordered_results except Exception as e: logger.error( f"[{self.workspace}] Error retrieving vector data for IDs {ids}: {e}" diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index d806710b..d54bb56f 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -332,14 +332,25 @@ class 
NanoVectorDBStorage(BaseVectorStorage): client = await self._get_client() results = client.get(ids) - return [ - { + result_map: dict[str, dict[str, Any]] = {} + + for dp in results: + if not dp: + continue + record = { **{k: v for k, v in dp.items() if k != "vector"}, "id": dp.get("__id__"), "created_at": dp.get("__created_at__"), } - for dp in results - ] + key = record.get("id") + if key is not None: + result_map[str(key)] = record + + ordered_results: list[dict[str, Any] | None] = [] + for requested_id in ids: + ordered_results.append(result_map.get(str(requested_id))) + + return ordered_results async def get_vectors_by_ids(self, ids: list[str]) -> dict[str, list[float]]: """Get vectors by their IDs, returning only ID and vector data for efficiency diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 703730ab..810f1aa8 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -1819,6 +1819,26 @@ class PGKVStorage(BaseKVStorage): params = {"workspace": self.workspace} results = await self.db.query(sql, list(params.values()), multirows=True) + def _order_results( + rows: list[dict[str, Any]] | None, + ) -> list[dict[str, Any] | None]: + """Preserve the caller requested ordering for bulk id lookups.""" + if not rows: + return [None for _ in ids] + + id_map: dict[str, dict[str, Any]] = {} + for row in rows: + if row is None: + continue + row_id = row.get("id") + if row_id is not None: + id_map[str(row_id)] = row + + ordered: list[dict[str, Any] | None] = [] + for requested_id in ids: + ordered.append(id_map.get(str(requested_id))) + return ordered + if results and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS): # Parse llm_cache_list JSON string back to list for each result for result in results: @@ -1861,7 +1881,7 @@ class PGKVStorage(BaseKVStorage): "update_time": create_time if update_time == 0 else update_time, } processed_results.append(processed_row) - return processed_results + return _order_results(processed_results) # Special handling for FULL_ENTITIES namespace if results and is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES): @@ -1895,7 +1915,7 @@ class PGKVStorage(BaseKVStorage): result["create_time"] = create_time result["update_time"] = create_time if update_time == 0 else update_time - return results if results else [] + return _order_results(results) async def filter_keys(self, keys: set[str]) -> set[str]: """Filter out duplicated content""" @@ -2353,7 +2373,23 @@ class PGVectorStorage(BaseVectorStorage): try: results = await self.db.query(query, list(params.values()), multirows=True) - return [dict(record) for record in results] + if not results: + return [] + + # Preserve caller requested ordering while normalizing asyncpg rows to dicts. 
+ id_map: dict[str, dict[str, Any]] = {} + for record in results: + if record is None: + continue + record_dict = dict(record) + row_id = record_dict.get("id") + if row_id is not None: + id_map[str(row_id)] = record_dict + + ordered_results: list[dict[str, Any] | None] = [] + for requested_id in ids: + ordered_results.append(id_map.get(str(requested_id))) + return ordered_results except Exception as e: logger.error( f"[{self.workspace}] Error retrieving vector data for IDs {ids}: {e}" @@ -2541,7 +2577,7 @@ class PGDocStatusStorage(DocStatusStorage): if not results: return [] - processed_results = [] + processed_map: dict[str, dict[str, Any]] = {} for row in results: # Parse chunks_list JSON string back to list chunks_list = row.get("chunks_list", []) @@ -2563,23 +2599,25 @@ class PGDocStatusStorage(DocStatusStorage): created_at = self._format_datetime_with_timezone(row["created_at"]) updated_at = self._format_datetime_with_timezone(row["updated_at"]) - processed_results.append( - { - "content_length": row["content_length"], - "content_summary": row["content_summary"], - "status": row["status"], - "chunks_count": row["chunks_count"], - "created_at": created_at, - "updated_at": updated_at, - "file_path": row["file_path"], - "chunks_list": chunks_list, - "metadata": metadata, - "error_msg": row.get("error_msg"), - "track_id": row.get("track_id"), - } - ) + processed_map[str(row.get("id"))] = { + "content_length": row["content_length"], + "content_summary": row["content_summary"], + "status": row["status"], + "chunks_count": row["chunks_count"], + "created_at": created_at, + "updated_at": updated_at, + "file_path": row["file_path"], + "chunks_list": chunks_list, + "metadata": metadata, + "error_msg": row.get("error_msg"), + "track_id": row.get("track_id"), + } - return processed_results + ordered_results: list[dict[str, Any] | None] = [] + for requested_id in ids: + ordered_results.append(processed_map.get(str(requested_id))) + + return ordered_results async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]: """Get document by file path diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py index 7d584665..cd66d88e 100644 --- a/lightrag/kg/qdrant_impl.py +++ b/lightrag/kg/qdrant_impl.py @@ -409,15 +409,31 @@ class QdrantVectorDBStorage(BaseVectorStorage): with_payload=True, ) - # Ensure each result contains created_at field - payloads = [] + # Ensure each result contains created_at field and preserve caller ordering + payload_by_original_id: dict[str, dict[str, Any]] = {} + payload_by_qdrant_id: dict[str, dict[str, Any]] = {} + for point in results: - payload = point.payload + payload = dict(point.payload or {}) if "created_at" not in payload: payload["created_at"] = None - payloads.append(payload) - return payloads + qdrant_point_id = str(point.id) if point.id is not None else "" + if qdrant_point_id: + payload_by_qdrant_id[qdrant_point_id] = payload + + original_id = payload.get("id") + if original_id is not None: + payload_by_original_id[str(original_id)] = payload + + ordered_payloads: list[dict[str, Any] | None] = [] + for requested_id, qdrant_id in zip(ids, qdrant_ids): + payload = payload_by_original_id.get(str(requested_id)) + if payload is None: + payload = payload_by_qdrant_id.get(str(qdrant_id)) + ordered_payloads.append(payload) + + return ordered_payloads except Exception as e: logger.error( f"[{self.workspace}] Error retrieving vector data for IDs {ids}: {e}" diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index 
549e8438..13768385 100644
--- a/lightrag/kg/redis_impl.py
+++ b/lightrag/kg/redis_impl.py
@@ -736,7 +736,7 @@ class RedisDocStatusStorage(DocStatusStorage):
         return set(keys) - existing_ids
 
     async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
-        result: list[dict[str, Any]] = []
+        ordered_results: list[dict[str, Any] | None] = []
         async with self._get_redis_connection() as redis:
             try:
                 pipe = redis.pipeline()
@@ -747,15 +747,17 @@
                 for result_data in results:
                     if result_data:
                         try:
-                            result.append(json.loads(result_data))
+                            ordered_results.append(json.loads(result_data))
                         except json.JSONDecodeError as e:
                             logger.error(
                                 f"[{self.workspace}] JSON decode error in get_by_ids: {e}"
                             )
-                            continue
+                            ordered_results.append(None)
+                    else:
+                        ordered_results.append(None)
             except Exception as e:
                 logger.error(f"[{self.workspace}] Error in get_by_ids: {e}")
-        return result
+        return ordered_results
 
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
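
Across the storage backends above, the `get_by_ids` changes share one contract: results come back in the caller's requested order, with `None` filling the slot of any id that was not found, and ids are stringified before map lookups so non-string keys (UUIDs, ObjectIds, integers) still match the requested strings. The sketch below distills that pattern into a standalone helper; `order_results` and its parameter names are illustrative only and not part of the patch (PGKVStorage implements the equivalent as a local `_order_results` helper, the other backends inline it).

```python
from typing import Any


def order_results(
    requested_ids: list[str],
    rows: list[dict[str, Any] | None],
    id_field: str = "id",
) -> list[dict[str, Any] | None]:
    """Re-emit backend rows in the caller's requested id order.

    Missing ids map to None, so results[i] always corresponds to
    requested_ids[i] regardless of how the backend ordered its response.
    """
    id_map: dict[str, dict[str, Any]] = {}
    for row in rows:
        if not row:
            continue
        row_id = row.get(id_field)
        if row_id is not None:
            # Stringify keys so UUID/ObjectId/int ids compare equal to the requested strings
            id_map[str(row_id)] = row
    return [id_map.get(str(requested_id)) for requested_id in requested_ids]


# Example: the backend answered out of order and dropped "c" entirely
rows = [{"id": "b", "content": "beta"}, {"id": "a", "content": "alpha"}]
assert order_results(["a", "b", "c"], rows) == [
    {"id": "a", "content": "alpha"},
    {"id": "b", "content": "beta"},
    None,
]
```

Callers that previously assumed a dense `list[dict]` must now tolerate `None` entries, which is why the return annotations throughout the diff change to `list[dict[str, Any] | None]`.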