From a471f1ca0e22f72b00d8e23bf8b78dd8bd64439d Mon Sep 17 00:00:00 2001
From: yangdx
Date: Fri, 24 Oct 2025 14:08:12 +0800
Subject: [PATCH] Add pipeline cancellation feature for graceful processing
 termination
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Add cancel_pipeline API endpoint
• Implement PipelineCancelledException
• Add cancellation checks in main loop
• Handle task cancellation gracefully
• Mark cancelled docs as FAILED

(cherry picked from commit 743aefc6557ee9a67a7c5e9275b76b866fa17dc7)
---
 lightrag/exceptions.py |  56 +----
 lightrag/lightrag.py   | 534 +++++++++++------------------------------
 lightrag/operate.py    |  39 +++
 3 files changed, 186 insertions(+), 443 deletions(-)

diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py
index 709f294d..09e1d0e7 100644
--- a/lightrag/exceptions.py
+++ b/lightrag/exceptions.py
@@ -68,7 +68,10 @@ class StorageNotInitializedError(RuntimeError):
             f"{storage_type} not initialized. Please ensure proper initialization:\n"
             f"\n"
             f"    rag = LightRAG(...)\n"
-            f"    await rag.initialize_storages()  # Required - auto-initializes pipeline_status\n"
+            f"    await rag.initialize_storages()  # Required\n"
+            f"    \n"
+            f"    from lightrag.kg.shared_storage import initialize_pipeline_status\n"
+            f"    await initialize_pipeline_status()  # Required for pipeline operations\n"
             f"\n"
             f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements"
         )
@@ -79,21 +82,18 @@ class PipelineNotInitializedError(KeyError):
 
     def __init__(self, namespace: str = ""):
         msg = (
-            f"Pipeline namespace '{namespace}' not found.\n"
+            f"Pipeline namespace '{namespace}' not found. "
+            f"This usually means pipeline status was not initialized.\n"
             f"\n"
-            f"Pipeline status should be auto-initialized by initialize_storages().\n"
-            f"If you see this error, please ensure:\n"
+            f"Please call 'await initialize_pipeline_status()' after initializing storages:\n"
             f"\n"
-            f"  1. You called await rag.initialize_storages()\n"
-            f"  2. For multi-workspace setups, each LightRAG instance was properly initialized\n"
-            f"\n"
-            f"Standard initialization:\n"
-            f"    rag = LightRAG(workspace='your_workspace')\n"
-            f"    await rag.initialize_storages()  # Auto-initializes pipeline_status\n"
-            f"\n"
-            f"If you need manual control (advanced):\n"
             f"    from lightrag.kg.shared_storage import initialize_pipeline_status\n"
-            f"    await initialize_pipeline_status(workspace='your_workspace')"
+            f"    await initialize_pipeline_status()\n"
+            f"\n"
+            f"Full initialization sequence:\n"
+            f"    rag = LightRAG(...)\n"
+            f"    await rag.initialize_storages()\n"
+            f"    await initialize_pipeline_status()"
         )
         super().__init__(msg)
 
@@ -104,33 +104,3 @@ class PipelineCancelledException(Exception):
     def __init__(self, message: str = "User cancelled"):
         super().__init__(message)
         self.message = message
-
-
-class ChunkTokenLimitExceededError(ValueError):
-    """Raised when a chunk exceeds the configured token limit."""
-
-    def __init__(
-        self,
-        chunk_tokens: int,
-        chunk_token_limit: int,
-        chunk_preview: str | None = None,
-    ) -> None:
-        preview = chunk_preview.strip() if chunk_preview else None
-        truncated_preview = preview[:80] if preview else None
-        preview_note = f" Preview: '{truncated_preview}'" if truncated_preview else ""
-        message = (
-            f"Chunk token length {chunk_tokens} exceeds chunk_token_size {chunk_token_limit}."
- f"{preview_note}" - ) - super().__init__(message) - self.chunk_tokens = chunk_tokens - self.chunk_token_limit = chunk_token_limit - self.chunk_preview = truncated_preview - - -class QdrantMigrationError(Exception): - """Raised when Qdrant data migration from legacy collections fails.""" - - def __init__(self, message: str): - super().__init__(message) - self.message = message diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 8a638759..191a5acd 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3,7 +3,6 @@ from __future__ import annotations import traceback import asyncio import configparser -import inspect import os import time import warnings @@ -13,7 +12,6 @@ from functools import partial from typing import ( Any, AsyncIterator, - Awaitable, Callable, Iterator, cast, @@ -22,7 +20,6 @@ from typing import ( Optional, List, Dict, - Union, ) from lightrag.prompt import PROMPTS from lightrag.exceptions import PipelineCancelledException @@ -64,10 +61,9 @@ from lightrag.kg import ( from lightrag.kg.shared_storage import ( get_namespace_data, + get_pipeline_status_lock, + get_graph_db_lock, get_data_init_lock, - get_default_workspace, - set_default_workspace, - get_namespace_lock, ) from lightrag.base import ( @@ -91,7 +87,7 @@ from lightrag.operate import ( merge_nodes_and_edges, kg_query, naive_query, - rebuild_knowledge_from_chunks, + _rebuild_knowledge_from_chunks, ) from lightrag.constants import GRAPH_FIELD_SEP from lightrag.utils import ( @@ -247,28 +243,23 @@ class LightRAG: int, int, ], - Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]], + List[Dict[str, Any]], ] = field(default_factory=lambda: chunking_by_token_size) """ Custom chunking function for splitting text into chunks before processing. - The function can be either synchronous or asynchronous. - The function should take the following parameters: - `tokenizer`: A Tokenizer instance to use for tokenization. - `content`: The text to be split into chunks. - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens. - `split_by_character_only`: If True, the text is split only on the specified character. - - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks. - `chunk_token_size`: The maximum number of tokens per chunk. + - `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks. - - The function should return a list of dictionaries (or an awaitable that resolves to a list), - where each dictionary contains the following keys: - - `tokens` (int): The number of tokens in the chunk. - - `content` (str): The text content of the chunk. - - `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document. + The function should return a list of dictionaries, where each dictionary contains the following keys: + - `tokens`: The number of tokens in the chunk. + - `content`: The text content of the chunk. Defaults to `chunking_by_token_size` if not specified. """ @@ -279,9 +270,6 @@ class LightRAG: embedding_func: EmbeddingFunc | None = field(default=None) """Function for computing text embeddings. Must be set before use.""" - embedding_token_limit: int | None = field(default=None, init=False) - """Token limit for embedding model. 
-    Set automatically from embedding_func.max_token_size in __post_init__."""
-
     embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
     """Batch size for embedding computations."""
@@ -525,16 +513,6 @@ class LightRAG:
         logger.debug(f"LightRAG init with param:\n  {_print_config}\n")
 
         # Init Embedding
-        # Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
-        embedding_max_token_size = None
-        if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
-            embedding_max_token_size = self.embedding_func.max_token_size
-            logger.debug(
-                f"Captured embedding max_token_size: {embedding_max_token_size}"
-            )
-        self.embedding_token_limit = embedding_max_token_size
-
-        # Step 2: Apply priority wrapper decorator
         self.embedding_func = priority_limit_async_func_call(
             self.embedding_func_max_async,
             llm_timeout=self.default_embedding_timeout,
@@ -661,22 +639,6 @@ class LightRAG:
     async def initialize_storages(self):
         """Storage initialization must be called one by one to prevent deadlock"""
         if self._storages_status == StoragesStatus.CREATED:
-            # Set the first initialized workspace will set the default workspace
-            # Allows namespace operation without specifying workspace for backward compatibility
-            default_workspace = get_default_workspace()
-            if default_workspace is None:
-                set_default_workspace(self.workspace)
-            elif default_workspace != self.workspace:
-                logger.info(
-                    f"Creating LightRAG instance with workspace='{self.workspace}' "
-                    f"while default workspace is set to '{default_workspace}'"
-                )
-
-            # Auto-initialize pipeline_status for this workspace
-            from lightrag.kg.shared_storage import initialize_pipeline_status
-
-            await initialize_pipeline_status(workspace=self.workspace)
-
             for storage in (
                 self.full_docs,
                 self.text_chunks,
@@ -748,7 +710,7 @@ class LightRAG:
 
     async def check_and_migrate_data(self):
         """Check if data migration is needed and perform migration if necessary"""
-        async with get_data_init_lock():
+        async with get_data_init_lock(enable_logging=True):
             try:
                 # Check if migration is needed:
                 # 1. chunk_entity_relation_graph has entities and relations (count > 0)
@@ -1611,12 +1573,8 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data(
-            "pipeline_status", workspace=self.workspace
-        )
-        pipeline_status_lock = get_namespace_lock(
-            "pipeline_status", workspace=self.workspace
-        )
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()
 
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -1741,16 +1699,10 @@ class LightRAG:
             semaphore: asyncio.Semaphore,
         ) -> None:
             """Process single document"""
-            # Initialize variables at the start to prevent UnboundLocalError in error handling
-            file_path = "unknown_source"
-            current_file_number = 0
             file_extraction_stage_ok = False
-            processing_start_time = int(time.time())
-            first_stage_tasks = []
-            entity_relation_task = None
-
             async with semaphore:
                 nonlocal processed_count
+                current_file_number = 0  # Initialize to prevent UnboundLocalError in error handling
                 first_stage_tasks = []
                 entity_relation_task = None
@@ -1798,28 +1750,7 @@ class LightRAG:
                     )
                     content = content_data["content"]
 
-                    # Call chunking function, supporting both sync and async implementations
-                    chunking_result = self.chunking_func(
-                        self.tokenizer,
-                        content,
-                        split_by_character,
-                        split_by_character_only,
-                        self.chunk_overlap_token_size,
-                        self.chunk_token_size,
-                    )
-
-                    # If result is awaitable, await to get actual result
-                    if inspect.isawaitable(chunking_result):
-                        chunking_result = await chunking_result
-
-                    # Validate return type
-                    if not isinstance(chunking_result, (list, tuple)):
-                        raise TypeError(
-                            f"chunking_func must return a list or tuple of dicts, "
-                            f"got {type(chunking_result)}"
-                        )
-
-                    # Build chunks dictionary
+                    # Generate chunks from document
                     chunks: dict[str, Any] = {
                         compute_mdhash_id(dp["content"], prefix="chunk-"): {
                             **dp,
@@ -1827,7 +1758,14 @@ class LightRAG:
                             "file_path": file_path,  # Add file path to each chunk
                             "llm_cache_list": [],  # Initialize empty LLM cache list for each chunk
                         }
-                        for dp in chunking_result
+                        for dp in self.chunking_func(
+                            self.tokenizer,
+                            content,
+                            split_by_character,
+                            split_by_character_only,
+                            self.chunk_overlap_token_size,
+                            self.chunk_token_size,
+                        )
                     }
 
                     if not chunks:
@@ -1891,33 +1829,20 @@ class LightRAG:
                                 chunks, pipeline_status, pipeline_status_lock
                             )
                         )
-                        chunk_results = await entity_relation_task
+                        await entity_relation_task
                         file_extraction_stage_ok = True
 
                     except Exception as e:
-                        # Check if this is a user cancellation
-                        if isinstance(e, PipelineCancelledException):
-                            # User cancellation - log brief message only, no traceback
-                            error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
-                            logger.warning(error_msg)
-                            async with pipeline_status_lock:
-                                pipeline_status["latest_message"] = error_msg
-                                pipeline_status["history_messages"].append(
-                                    error_msg
-                                )
-                        else:
-                            # Other exceptions - log with traceback
-                            logger.error(traceback.format_exc())
-                            error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
-                            logger.error(error_msg)
-                            async with pipeline_status_lock:
-                                pipeline_status["latest_message"] = error_msg
-                                pipeline_status["history_messages"].append(
-                                    traceback.format_exc()
-                                )
-                                pipeline_status["history_messages"].append(
-                                    error_msg
-                                )
+                        # Log error and update pipeline status
+                        logger.error(traceback.format_exc())
+                        error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
+                        logger.error(error_msg)
+                        async with pipeline_status_lock:
+                            pipeline_status["latest_message"] = error_msg
+                            pipeline_status["history_messages"].append(
+                                traceback.format_exc()
+                            )
+                            pipeline_status["history_messages"].append(error_msg)
 
                         # Cancel tasks that are not yet completed
                         all_tasks = first_stage_tasks + (
                             [entity_relation_task] if entity_relation_task else []
                         )
                         for task in all_tasks:
                             if task and not task.done():
                                 task.cancel()
 
-                        # Persistent llm cache with error handling
+                        # Persist LLM cache
                         if self.llm_response_cache:
-                            try:
-                                await self.llm_response_cache.index_done_callback()
-                            except Exception as persist_error:
-                                logger.error(
-                                    f"Failed to persist LLM cache: {persist_error}"
-                                )
+                            await self.llm_response_cache.index_done_callback()
 
                         # Record processing end time for failed case
                         processing_end_time = int(time.time())
@@ -1973,7 +1893,8 @@ class LightRAG:
                                     "User cancelled"
                                 )
 
-                        # Use chunk_results from entity_relation_task
+                        # Get chunk_results from entity_relation_task
+                        chunk_results = await entity_relation_task
                         await merge_nodes_and_edges(
                             chunk_results=chunk_results,  # result collected from entity_relation_task
                             knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -2030,38 +1951,22 @@ class LightRAG:
                         )
 
                     except Exception as e:
-                        # Check if this is a user cancellation
-                        if isinstance(e, PipelineCancelledException):
-                            # User cancellation - log brief message only, no traceback
-                            error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
-                            logger.warning(error_msg)
-                            async with pipeline_status_lock:
-                                pipeline_status["latest_message"] = error_msg
-                                pipeline_status["history_messages"].append(
-                                    error_msg
-                                )
-                        else:
-                            # Other exceptions - log with traceback
-                            logger.error(traceback.format_exc())
-                            error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
-                            logger.error(error_msg)
-                            async with pipeline_status_lock:
-                                pipeline_status["latest_message"] = error_msg
-                                pipeline_status["history_messages"].append(
-                                    traceback.format_exc()
-                                )
-                                pipeline_status["history_messages"].append(
-                                    error_msg
-                                )
+                        # Log error and update pipeline status
+                        logger.error(traceback.format_exc())
+                        error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
+                        logger.error(error_msg)
+                        async with pipeline_status_lock:
+                            pipeline_status["latest_message"] = error_msg
+                            pipeline_status["history_messages"].append(
+                                traceback.format_exc()
+                            )
+                            pipeline_status["history_messages"].append(
+                                error_msg
+                            )
 
-                        # Persistent llm cache with error handling
+                        # Persist LLM cache
                         if self.llm_response_cache:
-                            try:
-                                await self.llm_response_cache.index_done_callback()
-                            except Exception as persist_error:
-                                logger.error(
-                                    f"Failed to persist LLM cache: {persist_error}"
-                                )
+                            await self.llm_response_cache.index_done_callback()
 
                         # Record processing end time for failed case
                         processing_end_time = int(time.time())
@@ -2950,26 +2855,6 @@ class LightRAG:
         data across different storage layers are removed or rebuilt. If entities or
         relationships are partially affected, they will be rebuilt using cached LLM
        results from remaining documents.
 
-        **Concurrency Control Design:**
-
-        This function implements a pipeline-based concurrency control to prevent data corruption:
-
-        1. **Single Document Deletion** (when WE acquire pipeline):
-           - Sets job_name to "Single document deletion" (NOT starting with "deleting")
-           - Prevents other adelete_by_doc_id calls from running concurrently
-           - Ensures exclusive access to graph operations for this deletion
-
-        2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
-           - Sets job_name to "Deleting {N} Documents" (starts with "deleting")
-           - Allows multiple adelete_by_doc_id calls to join the deletion queue
-           - Each call validates the job name to ensure it's part of a deletion operation
-
-        The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
-        ensures that:
-        - adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
-        - Prevents concurrent single deletions that could cause race conditions
-        - Rejects operations when pipeline is busy with non-deletion tasks
-
         Args:
             doc_id (str): The unique identifier of the document to be deleted.
             delete_llm_cache (bool): Whether to delete cached LLM extraction results
@@ -2977,62 +2862,20 @@ class LightRAG:
 
         Returns:
             DeletionResult: An object containing the outcome of the deletion process.
-                - `status` (str): "success", "not_found", "not_allowed", or "failure".
+                - `status` (str): "success", "not_found", or "failure".
                 - `doc_id` (str): The ID of the document attempted to be deleted.
                 - `message` (str): A summary of the operation's result.
-                - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
+                - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
                 - `file_path` (str | None): The file path of the deleted document, if available.
         """
-        # Get pipeline status shared data and lock for validation
-        pipeline_status = await get_namespace_data(
-            "pipeline_status", workspace=self.workspace
-        )
-        pipeline_status_lock = get_namespace_lock(
-            "pipeline_status", workspace=self.workspace
-        )
-
-        # Track whether WE acquired the pipeline
-        we_acquired_pipeline = False
-
-        # Check and acquire pipeline if needed
-        async with pipeline_status_lock:
-            if not pipeline_status.get("busy", False):
-                # Pipeline is idle - WE acquire it for this deletion
-                we_acquired_pipeline = True
-                pipeline_status.update(
-                    {
-                        "busy": True,
-                        "job_name": "Single document deletion",
-                        "job_start": datetime.now(timezone.utc).isoformat(),
-                        "docs": 1,
-                        "batchs": 1,
-                        "cur_batch": 0,
-                        "request_pending": False,
-                        "cancellation_requested": False,
-                        "latest_message": f"Starting deletion for document: {doc_id}",
-                    }
-                )
-                # Initialize history messages
-                pipeline_status["history_messages"][:] = [
-                    f"Starting deletion for document: {doc_id}"
-                ]
-            else:
-                # Pipeline already busy - verify it's a deletion job
-                job_name = pipeline_status.get("job_name", "").lower()
-                if not job_name.startswith("deleting") or "document" not in job_name:
-                    return DeletionResult(
-                        status="not_allowed",
-                        doc_id=doc_id,
-                        message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
-                        status_code=403,
-                        file_path=None,
-                    )
-                # Pipeline is busy with deletion - proceed without acquiring
-
         deletion_operations_started = False
         original_exception = None
         doc_llm_cache_ids: list[str] = []
 
+        # Get pipeline status shared data and lock for status updates
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()
+
         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
             logger.info(log_message)
@@ -3264,9 +3107,6 @@ class LightRAG:
                 ]
 
                 if not existing_sources:
-                    # No chunk references means this entity should be deleted
-                    entities_to_delete.add(node_label)
-                    entity_chunk_updates[node_label] = []
                     continue
 
                 remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@@ -3288,7 +3128,6 @@ class LightRAG:
 
             # Process relationships
             for edge_data in affected_edges:
-                # source target is not in normalize order in graph db property
                 src = edge_data.get("source")
                 tgt = edge_data.get("target")
@@ -3325,9 +3164,6 @@ class LightRAG:
                 ]
 
                 if not existing_sources:
-                    # No chunk references means this relationship should be deleted
-                    relationships_to_delete.add(edge_tuple)
-                    relation_chunk_updates[edge_tuple] = []
                     continue
 
                 remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@@ -3353,31 +3189,38 @@ class LightRAG:
 
             if entity_chunk_updates and self.entity_chunks:
                 entity_upsert_payload = {}
+                entity_delete_ids: set[str] = set()
                 for entity_name, remaining in entity_chunk_updates.items():
                     if not remaining:
-                        # Empty entities are deleted alongside graph nodes later
-                        continue
-                    entity_upsert_payload[entity_name] = {
-                        "chunk_ids": remaining,
-                        "count": len(remaining),
-                        "updated_at": current_time,
-                    }
+                        entity_delete_ids.add(entity_name)
+                    else:
+                        entity_upsert_payload[entity_name] = {
+                            "chunk_ids": remaining,
+                            "count": len(remaining),
+                            "updated_at": current_time,
+                        }
+
+                if entity_delete_ids:
+                    await self.entity_chunks.delete(list(entity_delete_ids))
 
                 if entity_upsert_payload:
                     await self.entity_chunks.upsert(entity_upsert_payload)
 
             if relation_chunk_updates and self.relation_chunks:
                 relation_upsert_payload = {}
+                relation_delete_ids: set[str] = set()
                 for edge_tuple, remaining in relation_chunk_updates.items():
-                    if not remaining:
-                        # Empty relations are deleted alongside graph edges later
-                        continue
                     storage_key = make_relation_chunk_key(*edge_tuple)
-                    relation_upsert_payload[storage_key] = {
-                        "chunk_ids": remaining,
-                        "count": len(remaining),
-                        "updated_at": current_time,
-                    }
+                    if not remaining:
+                        relation_delete_ids.add(storage_key)
+                    else:
+                        relation_upsert_payload[storage_key] = {
+                            "chunk_ids": remaining,
+                            "count": len(remaining),
+                            "updated_at": current_time,
+                        }
+
+                if relation_delete_ids:
+                    await self.relation_chunks.delete(list(relation_delete_ids))
 
                 if relation_upsert_payload:
                     await self.relation_chunks.upsert(relation_upsert_payload)
@@ -3385,111 +3228,56 @@ class LightRAG:
                 logger.error(f"Failed to process graph analysis results: {e}")
                 raise Exception(f"Failed to process graph dependencies: {e}") from e
 
-            # Data integrity is ensured by allowing only one process to hold pipeline at a time(no graph db lock is needed anymore)
+            # Use graph database lock to prevent dirty read
+            graph_db_lock = get_graph_db_lock(enable_logging=False)
+            async with graph_db_lock:
+                # 5. Delete chunks from storage
+                if chunk_ids:
+                    try:
+                        await self.chunks_vdb.delete(chunk_ids)
+                        await self.text_chunks.delete(chunk_ids)
 
-            # 5. Delete chunks from storage
-            if chunk_ids:
-                try:
-                    await self.chunks_vdb.delete(chunk_ids)
-                    await self.text_chunks.delete(chunk_ids)
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)
 
-                    async with pipeline_status_lock:
-                        log_message = (
-                            f"Successfully deleted {len(chunk_ids)} chunks from storage"
-                        )
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
+                    except Exception as e:
+                        logger.error(f"Failed to delete chunks: {e}")
+                        raise Exception(f"Failed to delete document chunks: {e}") from e
 
-                except Exception as e:
-                    logger.error(f"Failed to delete chunks: {e}")
-                    raise Exception(f"Failed to delete document chunks: {e}") from e
-
-            # 6. Delete relationships that have no remaining sources
-            if relationships_to_delete:
-                try:
-                    # Delete from relation vdb
-                    rel_ids_to_delete = []
-                    for src, tgt in relationships_to_delete:
-                        rel_ids_to_delete.extend(
-                            [
-                                compute_mdhash_id(src + tgt, prefix="rel-"),
-                                compute_mdhash_id(tgt + src, prefix="rel-"),
-                            ]
-                        )
-                    await self.relationships_vdb.delete(rel_ids_to_delete)
-
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_edges(
-                        list(relationships_to_delete)
-                    )
-
-                    # Delete from relation_chunks storage
-                    if self.relation_chunks:
-                        relation_storage_keys = [
-                            make_relation_chunk_key(src, tgt)
-                            for src, tgt in relationships_to_delete
+                # 6. Delete entities that have no remaining sources
+                if entities_to_delete:
+                    try:
+                        # Delete from vector database
+                        entity_vdb_ids = [
+                            compute_mdhash_id(entity, prefix="ent-")
+                            for entity in entities_to_delete
                         ]
-                        await self.relation_chunks.delete(relation_storage_keys)
+                        await self.entities_vdb.delete(entity_vdb_ids)
 
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete relationships: {e}")
-                    raise Exception(f"Failed to delete relationships: {e}") from e
-
-            # 7. Delete entities that have no remaining sources
-            if entities_to_delete:
-                try:
-                    # Batch get all edges for entities to avoid N+1 query problem
-                    nodes_edges_dict = (
-                        await self.chunk_entity_relation_graph.get_nodes_edges_batch(
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_nodes(
                             list(entities_to_delete)
                         )
-                    )
-                    # Debug: Check and log all edges before deleting nodes
-                    edges_to_delete = set()
-                    edges_still_exist = 0
 
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(entities_to_delete)} entities"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)
 
-                    for entity, edges in nodes_edges_dict.items():
-                        if edges:
-                            for src, tgt in edges:
-                                # Normalize edge representation (sorted for consistency)
-                                edge_tuple = tuple(sorted((src, tgt)))
-                                edges_to_delete.add(edge_tuple)
+                    except Exception as e:
+                        logger.error(f"Failed to delete entities: {e}")
+                        raise Exception(f"Failed to delete entities: {e}") from e
 
-                                if (
-                                    src in entities_to_delete
-                                    and tgt in entities_to_delete
-                                ):
-                                    logger.warning(
-                                        f"Edge still exists: {src} <-> {tgt}"
-                                    )
-                                elif src in entities_to_delete:
-                                    logger.warning(
-                                        f"Edge still exists: {src} --> {tgt}"
-                                    )
-                                else:
-                                    logger.warning(
-                                        f"Edge still exists: {src} <-- {tgt}"
-                                    )
-                                edges_still_exist += 1
-
-                    if edges_still_exist:
-                        logger.warning(
-                            f"⚠️ {edges_still_exist} entities still has edges before deletion"
-                        )
-
-                    # Clean residual edges from VDB and storage before deleting nodes
-                    if edges_to_delete:
-                        # Delete from relationships_vdb
+                # 7. Delete relationships that have no remaining sources
+                if relationships_to_delete:
+                    try:
+                        # Delete from vector database
                         rel_ids_to_delete = []
-                        for src, tgt in edges_to_delete:
+                        for src, tgt in relationships_to_delete:
                             rel_ids_to_delete.extend(
                                 [
                                     compute_mdhash_id(src + tgt, prefix="rel-"),
@@ -3498,53 +3286,28 @@ class LightRAG:
                                     compute_mdhash_id(tgt + src, prefix="rel-"),
                                 ]
                             )
                         await self.relationships_vdb.delete(rel_ids_to_delete)
 
-                        # Delete from relation_chunks storage
-                        if self.relation_chunks:
-                            relation_storage_keys = [
-                                make_relation_chunk_key(src, tgt)
-                                for src, tgt in edges_to_delete
-                            ]
-                            await self.relation_chunks.delete(relation_storage_keys)
-
-                        logger.info(
-                            f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
+                        # Delete from graph
+                        await self.chunk_entity_relation_graph.remove_edges(
+                            list(relationships_to_delete)
                         )
 
-                    # Delete from graph (edges will be auto-deleted with nodes)
-                    await self.chunk_entity_relation_graph.remove_nodes(
-                        list(entities_to_delete)
-                    )
+                        async with pipeline_status_lock:
+                            log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
+                            logger.info(log_message)
+                            pipeline_status["latest_message"] = log_message
+                            pipeline_status["history_messages"].append(log_message)
 
-                    # Delete from vector vdb
-                    entity_vdb_ids = [
-                        compute_mdhash_id(entity, prefix="ent-")
-                        for entity in entities_to_delete
-                    ]
-                    await self.entities_vdb.delete(entity_vdb_ids)
+                    except Exception as e:
+                        logger.error(f"Failed to delete relationships: {e}")
+                        raise Exception(f"Failed to delete relationships: {e}") from e
 
-                    # Delete from entity_chunks storage
-                    if self.entity_chunks:
-                        await self.entity_chunks.delete(list(entities_to_delete))
-
-                    async with pipeline_status_lock:
-                        log_message = (
-                            f"Successfully deleted {len(entities_to_delete)} entities"
-                        )
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete entities: {e}")
-                    raise Exception(f"Failed to delete entities: {e}") from e
-
-            # Persist changes to graph database before entity and relationship rebuild
-            await self._insert_done()
+                # Persist changes to graph database before releasing graph database lock
+                await self._insert_done()
 
         # 8. Rebuild entities and relationships from remaining chunks
         if entities_to_rebuild or relationships_to_rebuild:
             try:
-                await rebuild_knowledge_from_chunks(
+                await _rebuild_knowledge_from_chunks(
                     entities_to_rebuild=entities_to_rebuild,
                     relationships_to_rebuild=relationships_to_rebuild,
                     knowledge_graph_inst=self.chunk_entity_relation_graph,
@@ -3645,18 +3408,6 @@ class LightRAG:
                     f"No deletion operations were started for document {doc_id}, skipping persistence"
                 )
 
-        # Release pipeline only if WE acquired it
-        if we_acquired_pipeline:
-            async with pipeline_status_lock:
-                pipeline_status["busy"] = False
-                pipeline_status["cancellation_requested"] = False
-                completion_msg = (
-                    f"Deletion process completed for document: {doc_id}"
-                )
-                pipeline_status["latest_message"] = completion_msg
-                pipeline_status["history_messages"].append(completion_msg)
-                logger.info(completion_msg)
-
     async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
         """Asynchronously delete an entity and all its relationships.
@@ -3774,22 +3525,16 @@ class LightRAG:
         )
 
     async def aedit_entity(
-        self,
-        entity_name: str,
-        updated_data: dict[str, str],
-        allow_rename: bool = True,
-        allow_merge: bool = False,
+        self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
     ) -> dict[str, Any]:
         """Asynchronously edit entity information.
 
         Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
-        Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
 
         Args:
             entity_name: Name of the entity to edit
             updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
             allow_rename: Whether to allow entity renaming, defaults to True
-            allow_merge: Whether to merge into an existing entity when renaming to an existing name
 
         Returns:
             Dictionary containing updated entity information
@@ -3803,21 +3548,14 @@ class LightRAG:
             entity_name,
             updated_data,
             allow_rename,
-            allow_merge,
-            self.entity_chunks,
-            self.relation_chunks,
         )
 
     def edit_entity(
-        self,
-        entity_name: str,
-        updated_data: dict[str, str],
-        allow_rename: bool = True,
-        allow_merge: bool = False,
+        self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
    ) -> dict[str, Any]:
         loop = always_get_an_event_loop()
         return loop.run_until_complete(
-            self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge)
+            self.aedit_entity(entity_name, updated_data, allow_rename)
         )
 
     async def aedit_relation(
@@ -3826,7 +3564,6 @@ class LightRAG:
         """Asynchronously edit relation information.
 
         Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
-        Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
 
         Args:
             source_entity: Name of the source entity
@@ -3845,7 +3582,6 @@ class LightRAG:
             source_entity,
             target_entity,
             updated_data,
-            self.relation_chunks,
         )
 
     def edit_relation(
@@ -3957,8 +3693,6 @@ class LightRAG:
             target_entity,
             merge_strategy,
             target_entity_data,
-            self.entity_chunks,
-            self.relation_chunks,
         )
 
     def merge_entities(
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 4b34f474..eac58349 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -7,6 +7,7 @@ import json_repair
 from typing import Any, AsyncIterator, overload, Literal
 from collections import Counter, defaultdict
 
+from lightrag.exceptions import PipelineCancelledException
 from lightrag.utils import (
     logger,
     compute_mdhash_id,
@@ -2204,6 +2205,12 @@ async def merge_nodes_and_edges(
         file_path: File path for logging
     """
 
+    # Check for cancellation at the start of merge
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            if pipeline_status.get("cancellation_requested", False):
+                raise PipelineCancelledException("User cancelled during merge phase")
+
     # Collect all nodes and edges from all chunks
     all_nodes = defaultdict(list)
     all_edges = defaultdict(list)
@@ -2240,6 +2247,14 @@ async def merge_nodes_and_edges(
 
     async def _locked_process_entity_name(entity_name, entities):
         async with semaphore:
+            # Check for cancellation before processing entity
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    if pipeline_status.get("cancellation_requested", False):
+                        raise PipelineCancelledException(
+                            "User cancelled during entity merge"
+                        )
+
             workspace = global_config.get("workspace", "")
             namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
             async with get_storage_keyed_lock(
@@ -2339,6 +2354,14 @@ async def merge_nodes_and_edges(
 
     async def _locked_process_edges(edge_key, edges):
         async with semaphore:
+            # Check for cancellation before processing edges
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    if pipeline_status.get("cancellation_requested", False):
+                        raise PipelineCancelledException(
+                            "User cancelled during relation merge"
+                        )
+
             workspace = global_config.get("workspace", "")
             namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
             sorted_edge_key = sorted([edge_key[0], edge_key[1]])
@@ -2525,6 +2548,14 @@ async def extract_entities(
     llm_response_cache: BaseKVStorage | None = None,
     text_chunks_storage: BaseKVStorage | None = None,
 ) -> list:
+    # Check for cancellation at the start of entity extraction
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            if pipeline_status.get("cancellation_requested", False):
+                raise PipelineCancelledException(
+                    "User cancelled during entity extraction"
+                )
+
     use_llm_func: callable = global_config["llm_model_func"]
     entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
 
@@ -2692,6 +2723,14 @@ async def extract_entities(
 
     async def _process_with_semaphore(chunk):
         async with semaphore:
+            # Check for cancellation before processing chunk
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    if pipeline_status.get("cancellation_requested", False):
+                        raise PipelineCancelledException(
+                            "User cancelled during chunk processing"
+                        )
+
             try:
                 return await _process_single_content(chunk)
             except Exception as e:
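
Usage sketch: every cancellation check added above follows one pattern -- read the shared
"cancellation_requested" flag from the "pipeline_status" namespace under its lock, and raise
PipelineCancelledException if it is set. The requesting side might look like the following
minimal sketch, assuming storages and pipeline status are already initialized; the
"request_cancellation" helper is a hypothetical stand-in for the cancel_pipeline API endpoint
named in the commit message, which is not part of this diff:

    from lightrag.exceptions import PipelineCancelledException
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_pipeline_status_lock,
    )


    async def request_cancellation() -> None:
        # Hypothetical helper: set the shared flag that the pipeline
        # polls at each of the checkpoints added in operate.py.
        pipeline_status = await get_namespace_data("pipeline_status")
        async with get_pipeline_status_lock():
            if pipeline_status.get("busy", False):
                pipeline_status["cancellation_requested"] = True


    async def checkpoint(pipeline_status, pipeline_status_lock) -> None:
        # The polling pattern this patch inserts before each processing stage.
        if pipeline_status is not None and pipeline_status_lock is not None:
            async with pipeline_status_lock:
                if pipeline_status.get("cancellation_requested", False):
                    raise PipelineCancelledException("User cancelled")

Because the flag is only polled at stage boundaries (start of extraction, per chunk, start of
merge, per entity, per edge), cancellation is cooperative: in-flight LLM calls finish, no new
stage begins, and, per the commit message, documents interrupted this way are marked FAILED.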