From 70ba7cd787ef417359a7e5fabdf0ca9b9f21b766 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Fri, 7 Nov 2025 02:56:16 +0800
Subject: [PATCH] Fix: Remove redundant entity/relation chunk deletions

(cherry picked from commit ea141e2779e0f60be0d69486c425eaae4ed80ea4)
---
 lightrag/lightrag.py | 430 +++++++++++++++++++++++++++++++++----------
 1 file changed, 335 insertions(+), 95 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 985cc5f0..acf157da 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -22,6 +22,7 @@ from typing import (
     Dict,
 )
 from lightrag.prompt import PROMPTS
+from lightrag.exceptions import PipelineCancelledException
 from lightrag.constants import (
     DEFAULT_MAX_GLEANING,
     DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
@@ -47,6 +48,8 @@ from lightrag.constants import (
     DEFAULT_LLM_TIMEOUT,
     DEFAULT_EMBEDDING_TIMEOUT,
     DEFAULT_SOURCE_IDS_LIMIT_METHOD,
+    DEFAULT_MAX_FILE_PATHS,
+    DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
 )
 from lightrag.utils import get_env_value
 
@@ -84,7 +87,7 @@ from lightrag.operate import (
     merge_nodes_and_edges,
     kg_query,
     naive_query,
-    _rebuild_knowledge_from_chunks,
+    rebuild_knowledge_from_chunks,
 )
 from lightrag.constants import GRAPH_FIELD_SEP
 from lightrag.utils import (
@@ -393,6 +396,14 @@ class LightRAG:
     )
     """Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
 
+    max_file_paths: int = field(
+        default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
+    )
+    """Maximum number of file paths to store in entity/relation file_path field."""
+
+    file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER)
+    """Placeholder text when file paths exceed max_file_paths limit."""
+
     addon_params: dict[str, Any] = field(
         default_factory=lambda: {
             "language": get_env_value(
@@ -699,7 +710,7 @@ class LightRAG:
 
     async def check_and_migrate_data(self):
         """Check if data migration is needed and perform migration if necessary"""
-        async with get_data_init_lock(enable_logging=True):
+        async with get_data_init_lock():
             try:
                 # Check if migration is needed:
                # 1. chunk_entity_relation_graph has entities and relations (count > 0)
@@ -877,13 +888,13 @@ class LightRAG:
             need_entity_migration = await self.entity_chunks.is_empty()
         except Exception as exc:  # pragma: no cover - defensive logging
             logger.error(f"Failed to check entity chunks storage: {exc}")
-            need_entity_migration = True
+            raise exc

         try:
             need_relation_migration = await self.relation_chunks.is_empty()
         except Exception as exc:  # pragma: no cover - defensive logging
             logger.error(f"Failed to check relation chunks storage: {exc}")
-            need_relation_migration = True
+            raise exc

         if not need_entity_migration and not need_relation_migration:
             return
@@ -1593,6 +1604,7 @@ class LightRAG:
                         "batchs": 0,  # Total number of files to be processed
                         "cur_batch": 0,  # Number of files already processed
                         "request_pending": False,  # Clear any previous request
+                        "cancellation_requested": False,  # Initialize cancellation flag
                         "latest_message": "",
                     }
                 )
@@ -1609,6 +1621,22 @@ class LightRAG:
        try:
            # Process documents until no more documents or requests
            while True:
+                # Check for cancellation request at the start of main loop
+                async with pipeline_status_lock:
+                    if pipeline_status.get("cancellation_requested", False):
+                        # Clear pending request
+                        pipeline_status["request_pending"] = False
+                        # Clear cancellation flag
+                        pipeline_status["cancellation_requested"] = False
+
+                        log_message = "Pipeline cancelled by user"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                        # Exit directly, skipping request_pending check
+                        return
+
                if not to_process_docs:
                    log_message = "All enqueued documents have been processed"
                    logger.info(log_message)
@@ -1671,14 +1699,25 @@ class LightRAG:
                 semaphore: asyncio.Semaphore,
             ) -> None:
                 """Process single document"""
+                # Initialize variables at the start to prevent UnboundLocalError in error handling
+                file_path = "unknown_source"
+                current_file_number = 0
                 file_extraction_stage_ok = False
+                processing_start_time = int(time.time())
+                first_stage_tasks = []
+                entity_relation_task = None
+
                 async with semaphore:
                     nonlocal processed_count
-                    current_file_number = 0  # Initialize to prevent UnboundLocalError in error handling
                     first_stage_tasks = []
                     entity_relation_task = None
                     try:
+                        # Check for cancellation before starting document processing
+                        async with pipeline_status_lock:
+                            if pipeline_status.get("cancellation_requested", False):
+                                raise PipelineCancelledException("User cancelled")
+
                         # Get file path from status document
                         file_path = getattr(
                             status_doc, "file_path", "unknown_source"
                         )
@@ -1741,6 +1780,11 @@ class LightRAG:
                         # Record processing start time
                         processing_start_time = int(time.time())

+                        # Check for cancellation before entity extraction
+                        async with pipeline_status_lock:
+                            if pipeline_status.get("cancellation_requested", False):
+                                raise PipelineCancelledException("User cancelled")
+
                         # Process document in two stages
                         # Stage 1: Process text chunks and docs (parallel execution)
                         doc_status_task = asyncio.create_task(
@@ -1791,20 +1835,33 @@ class LightRAG:
                                 chunks, pipeline_status, pipeline_status_lock
                             )
                         )
-                        await entity_relation_task
+                        chunk_results = await entity_relation_task
                         file_extraction_stage_ok = True
                     except Exception as e:
-                        # Log error and update pipeline status
-                        logger.error(traceback.format_exc())
-                        error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
-                        logger.error(error_msg)
-                        async with pipeline_status_lock:
-                            pipeline_status["latest_message"] = error_msg
-                            pipeline_status["history_messages"].append(
-                                traceback.format_exc()
-                            )
-                            pipeline_status["history_messages"].append(error_msg)
+                        # Check if this is a user cancellation
+                        if isinstance(e, PipelineCancelledException):
+                            # User cancellation - log brief message only, no traceback
+                            error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
+                            logger.warning(error_msg)
+                            async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = error_msg
+                                pipeline_status["history_messages"].append(
+                                    error_msg
+                                )
+                        else:
+                            # Other exceptions - log with traceback
+                            logger.error(traceback.format_exc())
+                            error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
+                            logger.error(error_msg)
+                            async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = error_msg
+                                pipeline_status["history_messages"].append(
+                                    traceback.format_exc()
+                                )
+                                pipeline_status["history_messages"].append(
+                                    error_msg
+                                )

                         # Cancel tasks that are not yet completed
                         all_tasks = first_stage_tasks + (
@@ -1814,9 +1871,14 @@ class LightRAG:
                             [entity_relation_task] if entity_relation_task else []
                         )
                         for task in all_tasks:
                             if task and not task.done():
                                 task.cancel()

-                        # Persistent llm cache
+                        # Persistent llm cache with error handling
                         if self.llm_response_cache:
-                            await self.llm_response_cache.index_done_callback()
+                            try:
+                                await self.llm_response_cache.index_done_callback()
+                            except Exception as persist_error:
+                                logger.error(
+                                    f"Failed to persist LLM cache: {persist_error}"
+                                )

                         # Record processing end time for failed case
                         processing_end_time = int(time.time())
@@ -1846,8 +1908,16 @@ class LightRAG:
                 # Concurrency is controlled by keyed lock for individual entities and relationships
                 if file_extraction_stage_ok:
                     try:
-                        # Get chunk_results from entity_relation_task
-                        chunk_results = await entity_relation_task
+                        # Check for cancellation before merge
+                        async with pipeline_status_lock:
+                            if pipeline_status.get(
+                                "cancellation_requested", False
+                            ):
+                                raise PipelineCancelledException(
+                                    "User cancelled"
+                                )
+
+                        # Use chunk_results from entity_relation_task
                         await merge_nodes_and_edges(
                             chunk_results=chunk_results,  # result collected from entity_relation_task
                             knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -1904,22 +1974,38 @@ class LightRAG:
                         )

                     except Exception as e:
-                        # Log error and update pipeline status
-                        logger.error(traceback.format_exc())
-                        error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
-                        logger.error(error_msg)
-                        async with pipeline_status_lock:
-                            pipeline_status["latest_message"] = error_msg
-                            pipeline_status["history_messages"].append(
-                                traceback.format_exc()
-                            )
-                            pipeline_status["history_messages"].append(
-                                error_msg
-                            )
+                        # Check if this is a user cancellation
+                        if isinstance(e, PipelineCancelledException):
+                            # User cancellation - log brief message only, no traceback
+                            error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
+                            logger.warning(error_msg)
+                            async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = error_msg
+                                pipeline_status["history_messages"].append(
+                                    error_msg
+                                )
+                        else:
+                            # Other exceptions - log with traceback
+                            logger.error(traceback.format_exc())
+                            error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
+                            logger.error(error_msg)
+                            async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = error_msg
+                                pipeline_status["history_messages"].append(
+                                    traceback.format_exc()
+                                )
+                                pipeline_status["history_messages"].append(
+                                    error_msg
+                                )

-                        # Persistent llm cache
+                        # Persistent llm cache with error handling
                         if self.llm_response_cache:
-                            await self.llm_response_cache.index_done_callback()
+                            try:
+                                await self.llm_response_cache.index_done_callback()
+                            except Exception as persist_error:
+                                logger.error(
+                                    f"Failed to persist LLM cache: {persist_error}"
+                                )

                         # Record processing end time for failed case
                         processing_end_time = int(time.time())
@@ -1960,7 +2046,19 @@ class LightRAG:
                )

            # Wait for all document processing to complete
-            await asyncio.gather(*doc_tasks)
+            try:
+                await asyncio.gather(*doc_tasks)
+            except PipelineCancelledException:
+                # Cancel all remaining tasks
+                for task in doc_tasks:
+                    if not task.done():
+                        task.cancel()
+
+                # Wait for all tasks to complete cancellation
+                await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
+
+                # Exit directly (document statuses already updated in process_document)
+                return

            # Check if there's a pending request to process more documents (with lock)
            has_pending_request = False
@@ -1991,11 +2089,14 @@ class LightRAG:
                    to_process_docs.update(pending_docs)

        finally:
-            log_message = "Enqueued document processing pipeline stoped"
+            log_message = "Enqueued document processing pipeline stopped"
            logger.info(log_message)
-            # Always reset busy status when done or if an exception occurs (with lock)
+            # Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
            async with pipeline_status_lock:
                pipeline_status["busy"] = False
+                pipeline_status["cancellation_requested"] = (
+                    False  # Always reset cancellation flag
+                )
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)

@@ -2783,7 +2884,9 @@ class LightRAG:
         # Return the dictionary containing statuses only for the found document IDs
         return found_statuses

-    async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult:
+    async def adelete_by_doc_id(
+        self, doc_id: str, delete_llm_cache: bool = False
+    ) -> DeletionResult:
         """Delete a document and all its related data, including chunks, graph elements.

         This method orchestrates a comprehensive deletion process for a given document ID.
@@ -2793,6 +2896,8 @@ class LightRAG:

         Args:
             doc_id (str): The unique identifier of the document to be deleted.
+            delete_llm_cache (bool): Whether to delete cached LLM extraction results
+                associated with the document. Defaults to False.

         Returns:
             DeletionResult: An object containing the outcome of the deletion process.
@@ -2804,6 +2909,7 @@ class LightRAG:
         """
         deletion_operations_started = False
         original_exception = None
+        doc_llm_cache_ids: list[str] = []

         # Get pipeline status shared data and lock for status updates
         pipeline_status = await get_namespace_data("pipeline_status")
@@ -2904,6 +3010,57 @@ class LightRAG:
             # Mark that deletion operations have started
             deletion_operations_started = True

+            if delete_llm_cache and chunk_ids:
+                if not self.llm_response_cache:
+                    logger.info(
+                        "Skipping LLM cache collection for document %s because cache storage is unavailable",
+                        doc_id,
+                    )
+                elif not self.text_chunks:
+                    logger.info(
+                        "Skipping LLM cache collection for document %s because text chunk storage is unavailable",
+                        doc_id,
+                    )
+                else:
+                    try:
+                        chunk_data_list = await self.text_chunks.get_by_ids(
+                            list(chunk_ids)
+                        )
+                        seen_cache_ids: set[str] = set()
+                        for chunk_data in chunk_data_list:
+                            if not chunk_data or not isinstance(chunk_data, dict):
+                                continue
+                            cache_ids = chunk_data.get("llm_cache_list", [])
+                            if not isinstance(cache_ids, list):
+                                continue
+                            for cache_id in cache_ids:
+                                if (
+                                    isinstance(cache_id, str)
+                                    and cache_id
+                                    and cache_id not in seen_cache_ids
+                                ):
+                                    doc_llm_cache_ids.append(cache_id)
+                                    seen_cache_ids.add(cache_id)
+                        if doc_llm_cache_ids:
+                            logger.info(
+                                "Collected %d LLM cache entries for document %s",
+                                len(doc_llm_cache_ids),
+                                doc_id,
+                            )
+                        else:
+                            logger.info(
+                                "No LLM cache entries found for document %s", doc_id
+                            )
+                    except Exception as cache_collect_error:
+                        logger.error(
+                            "Failed to collect LLM cache ids for document %s: %s",
+                            doc_id,
+                            cache_collect_error,
+                        )
+                        raise Exception(
+                            f"Failed to collect LLM cache ids for document {doc_id}: {cache_collect_error}"
+                        ) from cache_collect_error
+
             # 4. Analyze entities and relationships that will be affected
             entities_to_delete = set()
             entities_to_rebuild = {}  # entity_name -> remaining chunk id list
@@ -3078,38 +3235,31 @@ class LightRAG:

             if entity_chunk_updates and self.entity_chunks:
                 entity_upsert_payload = {}
-                entity_delete_ids: set[str] = set()
                 for entity_name, remaining in entity_chunk_updates.items():
                     if not remaining:
-                        entity_delete_ids.add(entity_name)
-                    else:
-                        entity_upsert_payload[entity_name] = {
-                            "chunk_ids": remaining,
-                            "count": len(remaining),
-                            "updated_at": current_time,
-                        }
-
-                if entity_delete_ids:
-                    await self.entity_chunks.delete(list(entity_delete_ids))
+                        # Empty entities are deleted alongside graph nodes later
+                        continue
+                    entity_upsert_payload[entity_name] = {
+                        "chunk_ids": remaining,
+                        "count": len(remaining),
+                        "updated_at": current_time,
+                    }

                 if entity_upsert_payload:
                     await self.entity_chunks.upsert(entity_upsert_payload)

             if relation_chunk_updates and self.relation_chunks:
                 relation_upsert_payload = {}
-                relation_delete_ids: set[str] = set()
                 for edge_tuple, remaining in relation_chunk_updates.items():
-                    storage_key = make_relation_chunk_key(*edge_tuple)
                     if not remaining:
-                        relation_delete_ids.add(storage_key)
-                    else:
-                        relation_upsert_payload[storage_key] = {
-                            "chunk_ids": remaining,
-                            "count": len(remaining),
-                            "updated_at": current_time,
-                        }
+                        # Empty relations are deleted alongside graph edges later
+                        continue
+                    storage_key = make_relation_chunk_key(*edge_tuple)
+                    relation_upsert_payload[storage_key] = {
+                        "chunk_ids": remaining,
+                        "count": len(remaining),
+                        "updated_at": current_time,
+                    }

-                if relation_delete_ids:
-                    await self.relation_chunks.delete(list(relation_delete_ids))
                 if relation_upsert_payload:
                     await self.relation_chunks.upsert(relation_upsert_payload)

@@ -3136,39 +3286,10 @@ class LightRAG:
                 logger.error(f"Failed to delete chunks: {e}")
                 raise Exception(f"Failed to delete document chunks: {e}") from e

-            # 6. Delete entities that have no remaining sources
-            if entities_to_delete:
-                try:
-                    # Delete from vector database
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
-
-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
-
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete entities: {e}")
-                    raise Exception(f"Failed to delete entities: {e}") from e
-
-            # 7. Delete relationships that have no remaining sources
+            # 6. Delete relationships that have no remaining sources
             if relationships_to_delete:
                 try:
-                    # Delete from vector database
+                    # Delete from relation vdb
                     rel_ids_to_delete = []
                     for src, tgt in relationships_to_delete:
                         rel_ids_to_delete.extend(
@@ -3202,13 +3323,105 @@ class LightRAG:
                     logger.error(f"Failed to delete relationships: {e}")
                     raise Exception(f"Failed to delete relationships: {e}") from e

+            # 7. Delete entities that have no remaining sources
+            if entities_to_delete:
+                try:
+                    # Batch get all edges for entities to avoid N+1 query problem
+                    nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
+                        list(entities_to_delete)
+                    )
+
+                    # Debug: Check and log all edges before deleting nodes
+                    edges_to_delete = set()
+                    edges_still_exist = 0
+
+                    for entity, edges in nodes_edges_dict.items():
+                        if edges:
+                            for src, tgt in edges:
+                                # Normalize edge representation (sorted for consistency)
+                                edge_tuple = tuple(sorted((src, tgt)))
+                                edges_to_delete.add(edge_tuple)
+
+                                if (
+                                    src in entities_to_delete
+                                    and tgt in entities_to_delete
+                                ):
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-> {tgt}"
+                                    )
+                                elif src in entities_to_delete:
+                                    logger.warning(
+                                        f"Edge still exists: {src} --> {tgt}"
+                                    )
+                                else:
+                                    logger.warning(
+                                        f"Edge still exists: {src} <-- {tgt}"
+                                    )
+                                edges_still_exist += 1
+
+                    if edges_still_exist:
+                        logger.warning(
+                            f"⚠️ {edges_still_exist} edges still exist before node deletion"
+                        )
+
+                    # Clean residual edges from VDB and storage before deleting nodes
+                    if edges_to_delete:
+                        # Delete from relationships_vdb
+                        rel_ids_to_delete = []
+                        for src, tgt in edges_to_delete:
+                            rel_ids_to_delete.extend(
+                                [
+                                    compute_mdhash_id(src + tgt, prefix="rel-"),
+                                    compute_mdhash_id(tgt + src, prefix="rel-"),
+                                ]
+                            )
+                        await self.relationships_vdb.delete(rel_ids_to_delete)
+
+                        # Delete from relation_chunks storage
+                        if self.relation_chunks:
+                            relation_storage_keys = [
+                                make_relation_chunk_key(src, tgt)
+                                for src, tgt in edges_to_delete
+                            ]
+                            await self.relation_chunks.delete(relation_storage_keys)
+
+                        logger.info(
+                            f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
+                        )
+
+                    # Delete from graph (edges will be auto-deleted with nodes)
+                    await self.chunk_entity_relation_graph.remove_nodes(
+                        list(entities_to_delete)
+                    )
+
+                    # Delete from entity vdb
+                    entity_vdb_ids = [
+                        compute_mdhash_id(entity, prefix="ent-")
+                        for entity in entities_to_delete
+                    ]
+                    await self.entities_vdb.delete(entity_vdb_ids)
+
+                    # Delete from entity_chunks storage
+                    if self.entity_chunks:
+                        await self.entity_chunks.delete(list(entities_to_delete))
+
+                    async with pipeline_status_lock:
+                        log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                        logger.info(log_message)
+                        pipeline_status["latest_message"] = log_message
+                        pipeline_status["history_messages"].append(log_message)
+
+                except Exception as e:
+                    logger.error(f"Failed to delete entities: {e}")
+                    raise Exception(f"Failed to delete entities: {e}") from e
+
             # Persist changes to graph database before releasing graph database lock
             await self._insert_done()

             # 8. Rebuild entities and relationships from remaining chunks
             if entities_to_rebuild or relationships_to_rebuild:
                 try:
-                    await _rebuild_knowledge_from_chunks(
+                    await rebuild_knowledge_from_chunks(
                         entities_to_rebuild=entities_to_rebuild,
                         relationships_to_rebuild=relationships_to_rebuild,
                         knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -3245,6 +3458,23 @@ class LightRAG:
             logger.error(f"Failed to delete document and status: {e}")
             raise Exception(f"Failed to delete document and status: {e}") from e

+        if delete_llm_cache and doc_llm_cache_ids and self.llm_response_cache:
+            try:
+                await self.llm_response_cache.delete(doc_llm_cache_ids)
+                cache_log_message = f"Successfully deleted {len(doc_llm_cache_ids)} LLM cache entries for document {doc_id}"
+                logger.info(cache_log_message)
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = cache_log_message
+                    pipeline_status["history_messages"].append(cache_log_message)
+                log_message = cache_log_message
+            except Exception as cache_delete_error:
+                log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
+                logger.error(log_message)
+                logger.error(traceback.format_exc())
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
+
         return DeletionResult(
             status="success",
             doc_id=doc_id,
@@ -3409,7 +3639,11 @@ class LightRAG:
         )

     async def aedit_entity(
-        self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
+        self,
+        entity_name: str,
+        updated_data: dict[str, str],
+        allow_rename: bool = True,
+        allow_merge: bool = False,
     ) -> dict[str, Any]:
         """Asynchronously edit entity information.

@@ -3420,6 +3654,7 @@ class LightRAG:
             entity_name: Name of the entity to edit
             updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
             allow_rename: Whether to allow entity renaming, defaults to True
+            allow_merge: Whether to merge into an existing entity when renaming to an existing name

         Returns:
             Dictionary containing updated entity information
@@ -3433,16 +3668,21 @@ class LightRAG:
             entity_name,
             updated_data,
             allow_rename,
+            allow_merge,
             self.entity_chunks,
             self.relation_chunks,
         )

     def edit_entity(
-        self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
+        self,
+        entity_name: str,
+        updated_data: dict[str, str],
+        allow_rename: bool = True,
+        allow_merge: bool = False,
     ) -> dict[str, Any]:
         loop = always_get_an_event_loop()
         return loop.run_until_complete(
-            self.aedit_entity(entity_name, updated_data, allow_rename)
+            self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge)
         )

     async def aedit_relation(
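
Editor's usage sketch (not part of the patch): the two user-facing behaviors
this commit introduces are cooperative cancellation of the enqueue pipeline
via the shared "cancellation_requested" flag, and opt-in LLM-cache cleanup
through adelete_by_doc_id(..., delete_llm_cache=True). The sketch below is a
minimal driver, assuming the shared-storage helpers get_namespace_data,
get_pipeline_status_lock, and initialize_pipeline_status from
lightrag.kg.shared_storage as in current releases, a default LightRAG
construction, and a hypothetical document id "doc-123"; adapt names to your
deployment.

    import asyncio

    from lightrag import LightRAG
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_pipeline_status_lock,
        initialize_pipeline_status,
    )


    async def main() -> None:
        # Assumption: LLM/embedding configuration is supplied elsewhere, and
        # documents were already enqueued with apipeline_enqueue_documents().
        rag = LightRAG(working_dir="./rag_storage")
        await rag.initialize_storages()
        await initialize_pipeline_status()

        # Run the processing pipeline in the background.
        pipeline = asyncio.create_task(rag.apipeline_process_enqueue_documents())

        # Cooperative cancellation: the pipeline polls this flag at the top of
        # its main loop, before extraction, and before the merge stage, raising
        # PipelineCancelledException internally; its finally block resets the
        # flag and the busy status.
        pipeline_status = await get_namespace_data("pipeline_status")
        async with get_pipeline_status_lock():
            pipeline_status["cancellation_requested"] = True
        await pipeline  # returns normally after cancellation cleanup

        # Opt-in cache cleanup: the default delete_llm_cache=False keeps the
        # pre-patch behavior; True also removes the cache entries listed in
        # the deleted chunks' llm_cache_list.
        result = await rag.adelete_by_doc_id("doc-123", delete_llm_cache=True)
        print(result.status, result.doc_id)

        # allow_merge (new in this patch): renaming onto an existing entity
        # merges the two instead of failing. Entity names are hypothetical,
        # and the rename is assumed to be requested via the "entity_name" key
        # of updated_data, as in existing releases.
        await rag.aedit_entity(
            "Acme Corp.",
            {"entity_name": "Acme Corporation"},
            allow_rename=True,
            allow_merge=True,
        )


    asyncio.run(main())

Design note: polling a shared flag at stage boundaries (rather than calling
task.cancel() from outside) lets each document finish or fail atomically, so
document statuses and the LLM cache are persisted consistently before the
pipeline exits.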