From 9fae0eadff774980d7a820325b03aff7c3771156 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 09:57:56 +0800 Subject: [PATCH] feat: Ensure thread safety for graph write operations Add a lock to delete, adelete_by_entity, and adelete_by_relation methods to prevent race conditions and ensure data consistency during concurrent modifications to the knowledge graph. --- lightrag/lightrag.py | 229 +++++++++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 109 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index d299080a..c16b65f3 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -35,6 +35,7 @@ from lightrag.kg import ( from lightrag.kg.shared_storage import ( get_namespace_data, get_pipeline_status_lock, + get_graph_db_lock, ) from .base import ( @@ -1728,109 +1729,113 @@ class LightRAG: relationships_to_delete = set() relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids - # Process entities - all_labels = await self.chunk_entity_relation_graph.get_all_labels() - for node_label in all_labels: - node_data = await self.chunk_entity_relation_graph.get_node(node_label) - if node_data and "source_id" in node_data: - # Split source_id using GRAPH_FIELD_SEP - sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids + # Use graph database lock to ensure atomic merges and updates + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: - if not remaining_sources: - entities_to_delete.add(node_label) - logger.debug( - f"Entity {node_label} marked for deletion - no remaining sources" - ) - elif remaining_sources != sources: - # Entity needs to be rebuilt from remaining chunks - entities_to_rebuild[node_label] = remaining_sources - logger.debug( - f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks" - ) + # Process entities + all_labels = await self.chunk_entity_relation_graph.get_all_labels() + for node_label in all_labels: + node_data = await self.chunk_entity_relation_graph.get_node(node_label) + if node_data and "source_id" in node_data: + # Split source_id using GRAPH_FIELD_SEP + sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids - # Process relationships - for node_label in all_labels: - node_edges = await self.chunk_entity_relation_graph.get_node_edges( - node_label - ) - if node_edges: - for src, tgt in node_edges: - edge_data = await self.chunk_entity_relation_graph.get_edge( - src, tgt - ) - if edge_data and "source_id" in edge_data: - # Split source_id using GRAPH_FIELD_SEP - sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) - remaining_sources = sources - chunk_ids + if not remaining_sources: + entities_to_delete.add(node_label) + logger.debug( + f"Entity {node_label} marked for deletion - no remaining sources" + ) + elif remaining_sources != sources: + # Entity needs to be rebuilt from remaining chunks + entities_to_rebuild[node_label] = remaining_sources + logger.debug( + f"Entity {node_label} will be rebuilt from {len(remaining_sources)} remaining chunks" + ) - if not remaining_sources: - relationships_to_delete.add((src, tgt)) - logger.debug( - f"Relationship {src}-{tgt} marked for deletion - no remaining sources" - ) - elif remaining_sources != sources: - # Relationship needs to be rebuilt from remaining chunks - relationships_to_rebuild[(src, tgt)] = remaining_sources - logger.debug( - f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" - ) - - # 5. Delete chunks from storage - if chunk_ids: - await self.chunks_vdb.delete(chunk_ids) - await self.text_chunks.delete(chunk_ids) - logger.info(f"Deleted {len(chunk_ids)} chunks from storage") - - # 6. Delete entities that have no remaining sources - if entities_to_delete: - # Delete from vector database - entity_vdb_ids = [ - compute_mdhash_id(entity, prefix="ent-") - for entity in entities_to_delete - ] - await self.entities_vdb.delete(entity_vdb_ids) - - # Delete from graph - await self.chunk_entity_relation_graph.remove_nodes( - list(entities_to_delete) - ) - logger.info(f"Deleted {len(entities_to_delete)} entities") - - # 7. Delete relationships that have no remaining sources - if relationships_to_delete: - # Delete from vector database - rel_ids_to_delete = [] - for src, tgt in relationships_to_delete: - rel_ids_to_delete.extend( - [ - compute_mdhash_id(src + tgt, prefix="rel-"), - compute_mdhash_id(tgt + src, prefix="rel-"), - ] + # Process relationships + for node_label in all_labels: + node_edges = await self.chunk_entity_relation_graph.get_node_edges( + node_label ) - await self.relationships_vdb.delete(rel_ids_to_delete) + if node_edges: + for src, tgt in node_edges: + edge_data = await self.chunk_entity_relation_graph.get_edge( + src, tgt + ) + if edge_data and "source_id" in edge_data: + # Split source_id using GRAPH_FIELD_SEP + sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) + remaining_sources = sources - chunk_ids - # Delete from graph - await self.chunk_entity_relation_graph.remove_edges( - list(relationships_to_delete) - ) - logger.info(f"Deleted {len(relationships_to_delete)} relationships") + if not remaining_sources: + relationships_to_delete.add((src, tgt)) + logger.debug( + f"Relationship {src}-{tgt} marked for deletion - no remaining sources" + ) + elif remaining_sources != sources: + # Relationship needs to be rebuilt from remaining chunks + relationships_to_rebuild[(src, tgt)] = remaining_sources + logger.debug( + f"Relationship {src}-{tgt} will be rebuilt from {len(remaining_sources)} remaining chunks" + ) - # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks - if entities_to_rebuild or relationships_to_rebuild: - logger.info( - f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..." - ) - await _rebuild_knowledge_from_chunks( - entities_to_rebuild=entities_to_rebuild, - relationships_to_rebuild=relationships_to_rebuild, - knowledge_graph_inst=self.chunk_entity_relation_graph, - entities_vdb=self.entities_vdb, - relationships_vdb=self.relationships_vdb, - text_chunks=self.text_chunks, - llm_response_cache=self.llm_response_cache, - global_config=asdict(self), - ) + # 5. Delete chunks from storage + if chunk_ids: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) + logger.info(f"Deleted {len(chunk_ids)} chunks from storage") + + # 6. Delete entities that have no remaining sources + if entities_to_delete: + # Delete from vector database + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) + ) + logger.info(f"Deleted {len(entities_to_delete)} entities") + + # 7. Delete relationships that have no remaining sources + if relationships_to_delete: + # Delete from vector database + rel_ids_to_delete = [] + for src, tgt in relationships_to_delete: + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + + # Delete from graph + await self.chunk_entity_relation_graph.remove_edges( + list(relationships_to_delete) + ) + logger.info(f"Deleted {len(relationships_to_delete)} relationships") + + # 8. **OPTIMIZATION 2**: Rebuild entities and relationships from remaining chunks + if entities_to_rebuild or relationships_to_rebuild: + logger.info( + f"Rebuilding {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships..." + ) + await _rebuild_knowledge_from_chunks( + entities_to_rebuild=entities_to_rebuild, + relationships_to_rebuild=relationships_to_rebuild, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entities_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + text_chunks=self.text_chunks, + llm_response_cache=self.llm_response_cache, + global_config=asdict(self), + ) # 9. Delete original document and status await self.full_docs.delete([doc_id]) @@ -1857,12 +1862,15 @@ class LightRAG: """ from .utils_graph import adelete_by_entity - return await adelete_by_entity( - self.chunk_entity_relation_graph, - self.entities_vdb, - self.relationships_vdb, - entity_name, - ) + # Use graph database lock to ensure atomic merges and updates + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: + return await adelete_by_entity( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + entity_name, + ) def delete_by_entity(self, entity_name: str) -> None: loop = always_get_an_event_loop() @@ -1877,12 +1885,15 @@ class LightRAG: """ from .utils_graph import adelete_by_relation - return await adelete_by_relation( - self.chunk_entity_relation_graph, - self.relationships_vdb, - source_entity, - target_entity, - ) + # Use graph database lock to ensure atomic merges and updates + graph_db_lock = get_graph_db_lock(enable_logging=False) + async with graph_db_lock: + return await adelete_by_relation( + self.chunk_entity_relation_graph, + self.relationships_vdb, + source_entity, + target_entity, + ) def delete_by_relation(self, source_entity: str, target_entity: str) -> None: loop = always_get_an_event_loop()