diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 721181d5..fa529784 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -2272,117 +2272,111 @@ class LightRAG:
         relationships_to_delete = set()
         relationships_to_rebuild = {}  # (src, tgt) -> remaining_chunk_ids
 
-        # Use graph database lock to ensure atomic merges and updates
+        try:
+            # Get affected entities and relations from full_entities and full_relations storage
+            doc_entities_data = await self.full_entities.get_by_id(doc_id)
+            doc_relations_data = await self.full_relations.get_by_id(doc_id)
+
+            affected_nodes = []
+            affected_edges = []
+
+            # Get entity data from graph storage using entity names from full_entities
+            if doc_entities_data and "entity_names" in doc_entities_data:
+                entity_names = doc_entities_data["entity_names"]
+                # get_nodes_batch returns dict[str, dict], need to convert to list[dict]
+                nodes_dict = await self.chunk_entity_relation_graph.get_nodes_batch(
+                    entity_names
+                )
+                for entity_name in entity_names:
+                    node_data = nodes_dict.get(entity_name)
+                    if node_data:
+                        # Ensure compatibility with existing logic that expects "id" field
+                        if "id" not in node_data:
+                            node_data["id"] = entity_name
+                        affected_nodes.append(node_data)
+
+            # Get relation data from graph storage using relation pairs from full_relations
+            if doc_relations_data and "relation_pairs" in doc_relations_data:
+                relation_pairs = doc_relations_data["relation_pairs"]
+                edge_pairs_dicts = [
+                    {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
+                ]
+                # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
+                edges_dict = await self.chunk_entity_relation_graph.get_edges_batch(
+                    edge_pairs_dicts
+                )
+
+                for pair in relation_pairs:
+                    src, tgt = pair[0], pair[1]
+                    edge_key = (src, tgt)
+                    edge_data = edges_dict.get(edge_key)
+                    if edge_data:
+                        # Ensure compatibility with existing logic that expects "source" and "target" fields
+                        if "source" not in edge_data:
+                            edge_data["source"] = src
+                        if "target" not in edge_data:
+                            edge_data["target"] = tgt
+                        affected_edges.append(edge_data)
+
+        except Exception as e:
+            logger.error(f"Failed to analyze affected graph elements: {e}")
+            raise Exception(f"Failed to analyze graph dependencies: {e}") from e
+
+        try:
+            # Process entities
+            for node_data in affected_nodes:
+                node_label = node_data.get("entity_id")
+                if node_label and "source_id" in node_data:
+                    sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+                    remaining_sources = sources - chunk_ids
+
+                    if not remaining_sources:
+                        entities_to_delete.add(node_label)
+                    elif remaining_sources != sources:
+                        entities_to_rebuild[node_label] = remaining_sources
+
+            async with pipeline_status_lock:
+                log_message = f"Found {len(entities_to_rebuild)} affected entities"
+                logger.info(log_message)
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
+
+            # Process relationships
+            for edge_data in affected_edges:
+                src = edge_data.get("source")
+                tgt = edge_data.get("target")
+
+                if src and tgt and "source_id" in edge_data:
+                    edge_tuple = tuple(sorted((src, tgt)))
+                    if (
+                        edge_tuple in relationships_to_delete
+                        or edge_tuple in relationships_to_rebuild
+                    ):
+                        continue
+
+                    sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                    remaining_sources = sources - chunk_ids
+
+                    if not remaining_sources:
+                        relationships_to_delete.add(edge_tuple)
+                    elif remaining_sources != sources:
+                        relationships_to_rebuild[edge_tuple] = remaining_sources
+
+            async with pipeline_status_lock:
+                log_message = (
+                    f"Found {len(relationships_to_rebuild)} affected relations"
+                )
+                logger.info(log_message)
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
+
+        except Exception as e:
+            logger.error(f"Failed to process graph analysis results: {e}")
+            raise Exception(f"Failed to process graph dependencies: {e}") from e
+
+        # Use graph database lock to prevent dirty read
         graph_db_lock = get_graph_db_lock(enable_logging=False)
         async with graph_db_lock:
-            try:
-                # Get affected entities and relations from full_entities and full_relations storage
-                doc_entities_data = await self.full_entities.get_by_id(doc_id)
-                doc_relations_data = await self.full_relations.get_by_id(doc_id)
-
-                affected_nodes = []
-                affected_edges = []
-
-                # Get entity data from graph storage using entity names from full_entities
-                if doc_entities_data and "entity_names" in doc_entities_data:
-                    entity_names = doc_entities_data["entity_names"]
-                    # get_nodes_batch returns dict[str, dict], need to convert to list[dict]
-                    nodes_dict = (
-                        await self.chunk_entity_relation_graph.get_nodes_batch(
-                            entity_names
-                        )
-                    )
-                    for entity_name in entity_names:
-                        node_data = nodes_dict.get(entity_name)
-                        if node_data:
-                            # Ensure compatibility with existing logic that expects "id" field
-                            if "id" not in node_data:
-                                node_data["id"] = entity_name
-                            affected_nodes.append(node_data)
-
-                # Get relation data from graph storage using relation pairs from full_relations
-                if doc_relations_data and "relation_pairs" in doc_relations_data:
-                    relation_pairs = doc_relations_data["relation_pairs"]
-                    edge_pairs_dicts = [
-                        {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
-                    ]
-                    # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
-                    edges_dict = (
-                        await self.chunk_entity_relation_graph.get_edges_batch(
-                            edge_pairs_dicts
-                        )
-                    )
-
-                    for pair in relation_pairs:
-                        src, tgt = pair[0], pair[1]
-                        edge_key = (src, tgt)
-                        edge_data = edges_dict.get(edge_key)
-                        if edge_data:
-                            # Ensure compatibility with existing logic that expects "source" and "target" fields
-                            if "source" not in edge_data:
-                                edge_data["source"] = src
-                            if "target" not in edge_data:
-                                edge_data["target"] = tgt
-                            affected_edges.append(edge_data)
-
-            except Exception as e:
-                logger.error(f"Failed to analyze affected graph elements: {e}")
-                raise Exception(f"Failed to analyze graph dependencies: {e}") from e
-
-            try:
-                # Process entities
-                for node_data in affected_nodes:
-                    node_label = node_data.get("entity_id")
-                    if node_label and "source_id" in node_data:
-                        sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            entities_to_delete.add(node_label)
-                        elif remaining_sources != sources:
-                            entities_to_rebuild[node_label] = remaining_sources
-
-                async with pipeline_status_lock:
-                    log_message = (
-                        f"Found {len(entities_to_rebuild)} affected entities"
-                    )
-                    logger.info(log_message)
-                    pipeline_status["latest_message"] = log_message
-                    pipeline_status["history_messages"].append(log_message)
-
-                # Process relationships
-                for edge_data in affected_edges:
-                    src = edge_data.get("source")
-                    tgt = edge_data.get("target")
-
-                    if src and tgt and "source_id" in edge_data:
-                        edge_tuple = tuple(sorted((src, tgt)))
-                        if (
-                            edge_tuple in relationships_to_delete
-                            or edge_tuple in relationships_to_rebuild
-                        ):
-                            continue
-
-                        sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
-                        remaining_sources = sources - chunk_ids
-
-                        if not remaining_sources:
-                            relationships_to_delete.add(edge_tuple)
-                        elif remaining_sources != sources:
-                            relationships_to_rebuild[edge_tuple] = remaining_sources
-
-                async with pipeline_status_lock:
-                    log_message = (
-                        f"Found {len(relationships_to_rebuild)} affected relations"
-                    )
-                    logger.info(log_message)
-                    pipeline_status["latest_message"] = log_message
-                    pipeline_status["history_messages"].append(log_message)
-
-            except Exception as e:
-                logger.error(f"Failed to process graph analysis results: {e}")
-                raise Exception(f"Failed to process graph dependencies: {e}") from e
-
             # 5. Delete chunks from storage
             if chunk_ids:
                 try:
@@ -2453,27 +2447,28 @@ class LightRAG:
                     logger.error(f"Failed to delete relationships: {e}")
                     raise Exception(f"Failed to delete relationships: {e}") from e
 
-            # 8. Rebuild entities and relationships from remaining chunks
-            if entities_to_rebuild or relationships_to_rebuild:
-                try:
-                    await _rebuild_knowledge_from_chunks(
-                        entities_to_rebuild=entities_to_rebuild,
-                        relationships_to_rebuild=relationships_to_rebuild,
-                        knowledge_graph_inst=self.chunk_entity_relation_graph,
-                        entities_vdb=self.entities_vdb,
-                        relationships_vdb=self.relationships_vdb,
-                        text_chunks_storage=self.text_chunks,
-                        llm_response_cache=self.llm_response_cache,
-                        global_config=asdict(self),
-                        pipeline_status=pipeline_status,
-                        pipeline_status_lock=pipeline_status_lock,
-                    )
+            # Persist changes to graph database before releasing graph database lock
+            await self._insert_done()
 
-                except Exception as e:
-                    logger.error(f"Failed to rebuild knowledge from chunks: {e}")
-                    raise Exception(
-                        f"Failed to rebuild knowledge graph: {e}"
-                    ) from e
+        # 8. Rebuild entities and relationships from remaining chunks
+        if entities_to_rebuild or relationships_to_rebuild:
+            try:
+                await _rebuild_knowledge_from_chunks(
+                    entities_to_rebuild=entities_to_rebuild,
+                    relationships_to_rebuild=relationships_to_rebuild,
+                    knowledge_graph_inst=self.chunk_entity_relation_graph,
+                    entities_vdb=self.entities_vdb,
+                    relationships_vdb=self.relationships_vdb,
+                    text_chunks_storage=self.text_chunks,
+                    llm_response_cache=self.llm_response_cache,
+                    global_config=asdict(self),
+                    pipeline_status=pipeline_status,
+                    pipeline_status_lock=pipeline_status_lock,
+                )
+
+            except Exception as e:
+                logger.error(f"Failed to rebuild knowledge from chunks: {e}")
+                raise Exception(f"Failed to rebuild knowledge graph: {e}") from e
 
         # 9. Delete from full_entities and full_relations storage
         try:
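Reviewer note (not part of the patch): both hunks hinge on the same read-only classification, a set difference over each graph element's `source_id` chunk list. Below is a minimal standalone sketch of that logic for entities; `GRAPH_FIELD_SEP = "<SEP>"` is assumed here for illustration (the real constant is imported in `lightrag.py`), and the helper name `classify_nodes` plus the sample data are hypothetical:

```python
# Sketch of the delete-vs-rebuild decision made in the patch above.
GRAPH_FIELD_SEP = "<SEP>"  # assumed value; lightrag imports the real constant


def classify_nodes(
    affected_nodes: list[dict], chunk_ids: set[str]
) -> tuple[set[str], dict[str, set[str]]]:
    """Split affected entities into fully orphaned ones (delete) and
    partially referenced ones (rebuild from the chunks that remain)."""
    entities_to_delete: set[str] = set()
    entities_to_rebuild: dict[str, set[str]] = {}
    for node_data in affected_nodes:
        node_label = node_data.get("entity_id")
        if not (node_label and "source_id" in node_data):
            continue
        sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
        remaining_sources = sources - chunk_ids
        if not remaining_sources:
            # Every supporting chunk is being deleted: drop the entity.
            entities_to_delete.add(node_label)
        elif remaining_sources != sources:
            # Some chunks survive: re-extract the entity from those only.
            entities_to_rebuild[node_label] = remaining_sources
        # Otherwise none of this document's chunks back the entity; skip it.
    return entities_to_delete, entities_to_rebuild


# "Alice" is backed by c1 and c2; deleting {c1} leaves {c2}, so she is
# rebuilt. "Bob" is backed only by c1, so he is deleted outright.
nodes = [
    {"entity_id": "Alice", "source_id": f"c1{GRAPH_FIELD_SEP}c2"},
    {"entity_id": "Bob", "source_id": "c1"},
]
to_delete, to_rebuild = classify_nodes(nodes, {"c1"})
assert to_delete == {"Bob"}
assert to_rebuild == {"Alice": {"c2"}}
```

Edges get the same treatment, keyed by `tuple(sorted((src, tgt)))` so each undirected pair is classified once. Because this analysis only reads, the patch can hoist it out of `graph_db_lock`; the lock then covers just the destructive delete steps, `_insert_done()` persists those changes before the lock is released, and the potentially slow, LLM-driven `_rebuild_knowledge_from_chunks` no longer has to sit inside the critical section.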