Add pipeline cancellation feature for graceful processing termination

• Add cancel_pipeline API endpoint
• Implement PipelineCancelledException
• Add cancellation checks in main loop
• Handle task cancellation gracefully
• Mark cancelled docs as FAILED

(cherry picked from commit 743aefc655)
This commit is contained in:
yangdx 2025-10-24 14:08:12 +08:00 committed by Raphaël MANSUY
parent 37d48bafb6
commit a471f1ca0e
3 changed files with 186 additions and 443 deletions

View file

@ -68,7 +68,10 @@ class StorageNotInitializedError(RuntimeError):
f"{storage_type} not initialized. Please ensure proper initialization:\n" f"{storage_type} not initialized. Please ensure proper initialization:\n"
f"\n" f"\n"
f" rag = LightRAG(...)\n" f" rag = LightRAG(...)\n"
f" await rag.initialize_storages() # Required - auto-initializes pipeline_status\n" f" await rag.initialize_storages() # Required\n"
f" \n"
f" from lightrag.kg.shared_storage import initialize_pipeline_status\n"
f" await initialize_pipeline_status() # Required for pipeline operations\n"
f"\n" f"\n"
f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements" f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements"
) )
@ -79,21 +82,18 @@ class PipelineNotInitializedError(KeyError):
def __init__(self, namespace: str = ""): def __init__(self, namespace: str = ""):
msg = ( msg = (
f"Pipeline namespace '{namespace}' not found.\n" f"Pipeline namespace '{namespace}' not found. "
f"This usually means pipeline status was not initialized.\n"
f"\n" f"\n"
f"Pipeline status should be auto-initialized by initialize_storages().\n" f"Please call 'await initialize_pipeline_status()' after initializing storages:\n"
f"If you see this error, please ensure:\n"
f"\n" f"\n"
f" 1. You called await rag.initialize_storages()\n"
f" 2. For multi-workspace setups, each LightRAG instance was properly initialized\n"
f"\n"
f"Standard initialization:\n"
f" rag = LightRAG(workspace='your_workspace')\n"
f" await rag.initialize_storages() # Auto-initializes pipeline_status\n"
f"\n"
f"If you need manual control (advanced):\n"
f" from lightrag.kg.shared_storage import initialize_pipeline_status\n" f" from lightrag.kg.shared_storage import initialize_pipeline_status\n"
f" await initialize_pipeline_status(workspace='your_workspace')" f" await initialize_pipeline_status()\n"
f"\n"
f"Full initialization sequence:\n"
f" rag = LightRAG(...)\n"
f" await rag.initialize_storages()\n"
f" await initialize_pipeline_status()"
) )
super().__init__(msg) super().__init__(msg)
@ -104,33 +104,3 @@ class PipelineCancelledException(Exception):
def __init__(self, message: str = "User cancelled"): def __init__(self, message: str = "User cancelled"):
super().__init__(message) super().__init__(message)
self.message = message self.message = message
class ChunkTokenLimitExceededError(ValueError):
"""Raised when a chunk exceeds the configured token limit."""
def __init__(
self,
chunk_tokens: int,
chunk_token_limit: int,
chunk_preview: str | None = None,
) -> None:
preview = chunk_preview.strip() if chunk_preview else None
truncated_preview = preview[:80] if preview else None
preview_note = f" Preview: '{truncated_preview}'" if truncated_preview else ""
message = (
f"Chunk token length {chunk_tokens} exceeds chunk_token_size {chunk_token_limit}."
f"{preview_note}"
)
super().__init__(message)
self.chunk_tokens = chunk_tokens
self.chunk_token_limit = chunk_token_limit
self.chunk_preview = truncated_preview
class QdrantMigrationError(Exception):
"""Raised when Qdrant data migration from legacy collections fails."""
def __init__(self, message: str):
super().__init__(message)
self.message = message

View file

@ -3,7 +3,6 @@ from __future__ import annotations
import traceback import traceback
import asyncio import asyncio
import configparser import configparser
import inspect
import os import os
import time import time
import warnings import warnings
@ -13,7 +12,6 @@ from functools import partial
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Awaitable,
Callable, Callable,
Iterator, Iterator,
cast, cast,
@ -22,7 +20,6 @@ from typing import (
Optional, Optional,
List, List,
Dict, Dict,
Union,
) )
from lightrag.prompt import PROMPTS from lightrag.prompt import PROMPTS
from lightrag.exceptions import PipelineCancelledException from lightrag.exceptions import PipelineCancelledException
@ -64,10 +61,9 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import ( from lightrag.kg.shared_storage import (
get_namespace_data, get_namespace_data,
get_pipeline_status_lock,
get_graph_db_lock,
get_data_init_lock, get_data_init_lock,
get_default_workspace,
set_default_workspace,
get_namespace_lock,
) )
from lightrag.base import ( from lightrag.base import (
@ -91,7 +87,7 @@ from lightrag.operate import (
merge_nodes_and_edges, merge_nodes_and_edges,
kg_query, kg_query,
naive_query, naive_query,
rebuild_knowledge_from_chunks, _rebuild_knowledge_from_chunks,
) )
from lightrag.constants import GRAPH_FIELD_SEP from lightrag.constants import GRAPH_FIELD_SEP
from lightrag.utils import ( from lightrag.utils import (
@ -247,28 +243,23 @@ class LightRAG:
int, int,
int, int,
], ],
Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]], List[Dict[str, Any]],
] = field(default_factory=lambda: chunking_by_token_size) ] = field(default_factory=lambda: chunking_by_token_size)
""" """
Custom chunking function for splitting text into chunks before processing. Custom chunking function for splitting text into chunks before processing.
The function can be either synchronous or asynchronous.
The function should take the following parameters: The function should take the following parameters:
- `tokenizer`: A Tokenizer instance to use for tokenization. - `tokenizer`: A Tokenizer instance to use for tokenization.
- `content`: The text to be split into chunks. - `content`: The text to be split into chunks.
- `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens. - `split_by_character`: The character to split the text on. If None, the text is split into chunks of `chunk_token_size` tokens.
- `split_by_character_only`: If True, the text is split only on the specified character. - `split_by_character_only`: If True, the text is split only on the specified character.
- `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
- `chunk_token_size`: The maximum number of tokens per chunk. - `chunk_token_size`: The maximum number of tokens per chunk.
- `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
The function should return a list of dictionaries, where each dictionary contains the following keys:
The function should return a list of dictionaries (or an awaitable that resolves to a list), - `tokens`: The number of tokens in the chunk.
where each dictionary contains the following keys: - `content`: The text content of the chunk.
- `tokens` (int): The number of tokens in the chunk.
- `content` (str): The text content of the chunk.
- `chunk_order_index` (int): Zero-based index indicating the chunk's order in the document.
Defaults to `chunking_by_token_size` if not specified. Defaults to `chunking_by_token_size` if not specified.
""" """
@ -279,9 +270,6 @@ class LightRAG:
embedding_func: EmbeddingFunc | None = field(default=None) embedding_func: EmbeddingFunc | None = field(default=None)
"""Function for computing text embeddings. Must be set before use.""" """Function for computing text embeddings. Must be set before use."""
embedding_token_limit: int | None = field(default=None, init=False)
"""Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10))) embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
"""Batch size for embedding computations.""" """Batch size for embedding computations."""
@ -525,16 +513,6 @@ class LightRAG:
logger.debug(f"LightRAG init with param:\n {_print_config}\n") logger.debug(f"LightRAG init with param:\n {_print_config}\n")
# Init Embedding # Init Embedding
# Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
embedding_max_token_size = None
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
embedding_max_token_size = self.embedding_func.max_token_size
logger.debug(
f"Captured embedding max_token_size: {embedding_max_token_size}"
)
self.embedding_token_limit = embedding_max_token_size
# Step 2: Apply priority wrapper decorator
self.embedding_func = priority_limit_async_func_call( self.embedding_func = priority_limit_async_func_call(
self.embedding_func_max_async, self.embedding_func_max_async,
llm_timeout=self.default_embedding_timeout, llm_timeout=self.default_embedding_timeout,
@ -661,22 +639,6 @@ class LightRAG:
async def initialize_storages(self): async def initialize_storages(self):
"""Storage initialization must be called one by one to prevent deadlock""" """Storage initialization must be called one by one to prevent deadlock"""
if self._storages_status == StoragesStatus.CREATED: if self._storages_status == StoragesStatus.CREATED:
# Set the first initialized workspace will set the default workspace
# Allows namespace operation without specifying workspace for backward compatibility
default_workspace = get_default_workspace()
if default_workspace is None:
set_default_workspace(self.workspace)
elif default_workspace != self.workspace:
logger.info(
f"Creating LightRAG instance with workspace='{self.workspace}' "
f"while default workspace is set to '{default_workspace}'"
)
# Auto-initialize pipeline_status for this workspace
from lightrag.kg.shared_storage import initialize_pipeline_status
await initialize_pipeline_status(workspace=self.workspace)
for storage in ( for storage in (
self.full_docs, self.full_docs,
self.text_chunks, self.text_chunks,
@ -748,7 +710,7 @@ class LightRAG:
async def check_and_migrate_data(self): async def check_and_migrate_data(self):
"""Check if data migration is needed and perform migration if necessary""" """Check if data migration is needed and perform migration if necessary"""
async with get_data_init_lock(): async with get_data_init_lock(enable_logging=True):
try: try:
# Check if migration is needed: # Check if migration is needed:
# 1. chunk_entity_relation_graph has entities and relations (count > 0) # 1. chunk_entity_relation_graph has entities and relations (count > 0)
@ -1611,12 +1573,8 @@ class LightRAG:
""" """
# Get pipeline status shared data and lock # Get pipeline status shared data and lock
pipeline_status = await get_namespace_data( pipeline_status = await get_namespace_data("pipeline_status")
"pipeline_status", workspace=self.workspace pipeline_status_lock = get_pipeline_status_lock()
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Check if another process is already processing the queue # Check if another process is already processing the queue
async with pipeline_status_lock: async with pipeline_status_lock:
@ -1741,16 +1699,10 @@ class LightRAG:
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
) -> None: ) -> None:
"""Process single document""" """Process single document"""
# Initialize variables at the start to prevent UnboundLocalError in error handling
file_path = "unknown_source"
current_file_number = 0
file_extraction_stage_ok = False file_extraction_stage_ok = False
processing_start_time = int(time.time())
first_stage_tasks = []
entity_relation_task = None
async with semaphore: async with semaphore:
nonlocal processed_count nonlocal processed_count
current_file_number = 0
# Initialize to prevent UnboundLocalError in error handling # Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = [] first_stage_tasks = []
entity_relation_task = None entity_relation_task = None
@ -1798,28 +1750,7 @@ class LightRAG:
) )
content = content_data["content"] content = content_data["content"]
# Call chunking function, supporting both sync and async implementations # Generate chunks from document
chunking_result = self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
# If result is awaitable, await to get actual result
if inspect.isawaitable(chunking_result):
chunking_result = await chunking_result
# Validate return type
if not isinstance(chunking_result, (list, tuple)):
raise TypeError(
f"chunking_func must return a list or tuple of dicts, "
f"got {type(chunking_result)}"
)
# Build chunks dictionary
chunks: dict[str, Any] = { chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): { compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp, **dp,
@ -1827,7 +1758,14 @@ class LightRAG:
"file_path": file_path, # Add file path to each chunk "file_path": file_path, # Add file path to each chunk
"llm_cache_list": [], # Initialize empty LLM cache list for each chunk "llm_cache_list": [], # Initialize empty LLM cache list for each chunk
} }
for dp in chunking_result for dp in self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
} }
if not chunks: if not chunks:
@ -1891,33 +1829,20 @@ class LightRAG:
chunks, pipeline_status, pipeline_status_lock chunks, pipeline_status, pipeline_status_lock
) )
) )
chunk_results = await entity_relation_task await entity_relation_task
file_extraction_stage_ok = True file_extraction_stage_ok = True
except Exception as e: except Exception as e:
# Check if this is a user cancellation # Log error and update pipeline status
if isinstance(e, PipelineCancelledException): logger.error(traceback.format_exc())
# User cancellation - log brief message only, no traceback error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}" logger.error(error_msg)
logger.warning(error_msg) async with pipeline_status_lock:
async with pipeline_status_lock: pipeline_status["latest_message"] = error_msg
pipeline_status["latest_message"] = error_msg pipeline_status["history_messages"].append(
pipeline_status["history_messages"].append( traceback.format_exc()
error_msg )
) pipeline_status["history_messages"].append(error_msg)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Cancel tasks that are not yet completed # Cancel tasks that are not yet completed
all_tasks = first_stage_tasks + ( all_tasks = first_stage_tasks + (
@ -1927,14 +1852,9 @@ class LightRAG:
if task and not task.done(): if task and not task.done():
task.cancel() task.cancel()
# Persistent llm cache with error handling # Persistent llm cache
if self.llm_response_cache: if self.llm_response_cache:
try: await self.llm_response_cache.index_done_callback()
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case # Record processing end time for failed case
processing_end_time = int(time.time()) processing_end_time = int(time.time())
@ -1973,7 +1893,8 @@ class LightRAG:
"User cancelled" "User cancelled"
) )
# Use chunk_results from entity_relation_task # Get chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges( await merge_nodes_and_edges(
chunk_results=chunk_results, # result collected from entity_relation_task chunk_results=chunk_results, # result collected from entity_relation_task
knowledge_graph_inst=self.chunk_entity_relation_graph, knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -2030,38 +1951,22 @@ class LightRAG:
) )
except Exception as e: except Exception as e:
# Check if this is a user cancellation # Log error and update pipeline status
if isinstance(e, PipelineCancelledException): logger.error(traceback.format_exc())
# User cancellation - log brief message only, no traceback error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}" logger.error(error_msg)
logger.warning(error_msg) async with pipeline_status_lock:
async with pipeline_status_lock: pipeline_status["latest_message"] = error_msg
pipeline_status["latest_message"] = error_msg pipeline_status["history_messages"].append(
pipeline_status["history_messages"].append( traceback.format_exc()
error_msg )
) pipeline_status["history_messages"].append(
else: error_msg
# Other exceptions - log with traceback )
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persistent llm cache with error handling # Persistent llm cache
if self.llm_response_cache: if self.llm_response_cache:
try: await self.llm_response_cache.index_done_callback()
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case # Record processing end time for failed case
processing_end_time = int(time.time()) processing_end_time = int(time.time())
@ -2950,26 +2855,6 @@ class LightRAG:
data across different storage layers are removed or rebuiled. If entities or relationships data across different storage layers are removed or rebuiled. If entities or relationships
are partially affected, they will be rebuilded using LLM cached from remaining documents. are partially affected, they will be rebuilded using LLM cached from remaining documents.
**Concurrency Control Design:**
This function implements a pipeline-based concurrency control to prevent data corruption:
1. **Single Document Deletion** (when WE acquire pipeline):
- Sets job_name to "Single document deletion" (NOT starting with "deleting")
- Prevents other adelete_by_doc_id calls from running concurrently
- Ensures exclusive access to graph operations for this deletion
2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
- Sets job_name to "Deleting {N} Documents" (starts with "deleting")
- Allows multiple adelete_by_doc_id calls to join the deletion queue
- Each call validates the job name to ensure it's part of a deletion operation
The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
ensures that:
- adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
- Prevents concurrent single deletions that could cause race conditions
- Rejects operations when pipeline is busy with non-deletion tasks
Args: Args:
doc_id (str): The unique identifier of the document to be deleted. doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results delete_llm_cache (bool): Whether to delete cached LLM extraction results
@ -2977,62 +2862,20 @@ class LightRAG:
Returns: Returns:
DeletionResult: An object containing the outcome of the deletion process. DeletionResult: An object containing the outcome of the deletion process.
- `status` (str): "success", "not_found", "not_allowed", or "failure". - `status` (str): "success", "not_found", or "failure".
- `doc_id` (str): The ID of the document attempted to be deleted. - `doc_id` (str): The ID of the document attempted to be deleted.
- `message` (str): A summary of the operation's result. - `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500). - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
- `file_path` (str | None): The file path of the deleted document, if available. - `file_path` (str | None): The file path of the deleted document, if available.
""" """
# Get pipeline status shared data and lock for validation
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Track whether WE acquired the pipeline
we_acquired_pipeline = False
# Check and acquire pipeline if needed
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
# Pipeline is idle - WE acquire it for this deletion
we_acquired_pipeline = True
pipeline_status.update(
{
"busy": True,
"job_name": "Single document deletion",
"job_start": datetime.now(timezone.utc).isoformat(),
"docs": 1,
"batchs": 1,
"cur_batch": 0,
"request_pending": False,
"cancellation_requested": False,
"latest_message": f"Starting deletion for document: {doc_id}",
}
)
# Initialize history messages
pipeline_status["history_messages"][:] = [
f"Starting deletion for document: {doc_id}"
]
else:
# Pipeline already busy - verify it's a deletion job
job_name = pipeline_status.get("job_name", "").lower()
if not job_name.startswith("deleting") or "document" not in job_name:
return DeletionResult(
status="not_allowed",
doc_id=doc_id,
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
status_code=403,
file_path=None,
)
# Pipeline is busy with deletion - proceed without acquiring
deletion_operations_started = False deletion_operations_started = False
original_exception = None original_exception = None
doc_llm_cache_ids: list[str] = [] doc_llm_cache_ids: list[str] = []
# Get pipeline status shared data and lock for status updates
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = f"Starting deletion process for document {doc_id}" log_message = f"Starting deletion process for document {doc_id}"
logger.info(log_message) logger.info(log_message)
@ -3264,9 +3107,6 @@ class LightRAG:
] ]
if not existing_sources: if not existing_sources:
# No chunk references means this entity should be deleted
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
continue continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids) remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3288,7 +3128,6 @@ class LightRAG:
# Process relationships # Process relationships
for edge_data in affected_edges: for edge_data in affected_edges:
# source target is not in normalize order in graph db property
src = edge_data.get("source") src = edge_data.get("source")
tgt = edge_data.get("target") tgt = edge_data.get("target")
@ -3325,9 +3164,6 @@ class LightRAG:
] ]
if not existing_sources: if not existing_sources:
# No chunk references means this relationship should be deleted
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
continue continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids) remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3353,31 +3189,38 @@ class LightRAG:
if entity_chunk_updates and self.entity_chunks: if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {} entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items(): for entity_name, remaining in entity_chunk_updates.items():
if not remaining: if not remaining:
# Empty entities are deleted alongside graph nodes later entity_delete_ids.add(entity_name)
continue else:
entity_upsert_payload[entity_name] = { entity_upsert_payload[entity_name] = {
"chunk_ids": remaining, "chunk_ids": remaining,
"count": len(remaining), "count": len(remaining),
"updated_at": current_time, "updated_at": current_time,
} }
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload: if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload) await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks: if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {} relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items(): for edge_tuple, remaining in relation_chunk_updates.items():
if not remaining:
# Empty relations are deleted alongside graph edges later
continue
storage_key = make_relation_chunk_key(*edge_tuple) storage_key = make_relation_chunk_key(*edge_tuple)
relation_upsert_payload[storage_key] = { if not remaining:
"chunk_ids": remaining, relation_delete_ids.add(storage_key)
"count": len(remaining), else:
"updated_at": current_time, relation_upsert_payload[storage_key] = {
} "chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload: if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload) await self.relation_chunks.upsert(relation_upsert_payload)
@ -3385,111 +3228,56 @@ class LightRAG:
logger.error(f"Failed to process graph analysis results: {e}") logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e raise Exception(f"Failed to process graph dependencies: {e}") from e
# Data integrity is ensured by allowing only one process to hold pipeline at a timeno graph db lock is needed anymore) # Use graph database lock to prevent dirty read
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
# 5. Delete chunks from storage
if chunk_ids:
try:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
# 5. Delete chunks from storage async with pipeline_status_lock:
if chunk_ids: log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
try: logger.info(log_message)
await self.chunks_vdb.delete(chunk_ids) pipeline_status["latest_message"] = log_message
await self.text_chunks.delete(chunk_ids) pipeline_status["history_messages"].append(log_message)
async with pipeline_status_lock: except Exception as e:
log_message = ( logger.error(f"Failed to delete chunks: {e}")
f"Successfully deleted {len(chunk_ids)} chunks from storage" raise Exception(f"Failed to delete document chunks: {e}") from e
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e: # 6. Delete entities that have no remaining sources
logger.error(f"Failed to delete chunks: {e}") if entities_to_delete:
raise Exception(f"Failed to delete document chunks: {e}") from e try:
# Delete from vector database
# 6. Delete relationships that have no remaining sources entity_vdb_ids = [
if relationships_to_delete: compute_mdhash_id(entity, prefix="ent-")
try: for entity in entities_to_delete
# Delete from relation vdb
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
] ]
await self.relation_chunks.delete(relation_storage_keys) await self.entities_vdb.delete(entity_vdb_ids)
async with pipeline_status_lock: # Delete from graph
log_message = f"Successfully deleted {len(relationships_to_delete)} relations" await self.chunk_entity_relation_graph.remove_nodes(
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = (
await self.chunk_entity_relation_graph.get_nodes_edges_batch(
list(entities_to_delete) list(entities_to_delete)
) )
)
# Debug: Check and log all edges before deleting nodes async with pipeline_status_lock:
edges_to_delete = set() log_message = f"Successfully deleted {len(entities_to_delete)} entities"
edges_still_exist = 0 logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
for entity, edges in nodes_edges_dict.items(): except Exception as e:
if edges: logger.error(f"Failed to delete entities: {e}")
for src, tgt in edges: raise Exception(f"Failed to delete entities: {e}") from e
# Normalize edge representation (sorted for consistency)
edge_tuple = tuple(sorted((src, tgt)))
edges_to_delete.add(edge_tuple)
if ( # 7. Delete relationships that have no remaining sources
src in entities_to_delete if relationships_to_delete:
and tgt in entities_to_delete try:
): # Delete from vector database
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist:
logger.warning(
f"⚠️ {edges_still_exist} entities still has edges before deletion"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
rel_ids_to_delete = [] rel_ids_to_delete = []
for src, tgt in edges_to_delete: for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend( rel_ids_to_delete.extend(
[ [
compute_mdhash_id(src + tgt, prefix="rel-"), compute_mdhash_id(src + tgt, prefix="rel-"),
@ -3498,53 +3286,28 @@ class LightRAG:
) )
await self.relationships_vdb.delete(rel_ids_to_delete) await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from relation_chunks storage # Delete from graph
if self.relation_chunks: await self.chunk_entity_relation_graph.remove_edges(
relation_storage_keys = [ list(relationships_to_delete)
make_relation_chunk_key(src, tgt)
for src, tgt in edges_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
logger.info(
f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
) )
# Delete from graph (edges will be auto-deleted with nodes) async with pipeline_status_lock:
await self.chunk_entity_relation_graph.remove_nodes( log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
list(entities_to_delete) logger.info(log_message)
) pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Delete from vector vdb except Exception as e:
entity_vdb_ids = [ logger.error(f"Failed to delete relationships: {e}")
compute_mdhash_id(entity, prefix="ent-") raise Exception(f"Failed to delete relationships: {e}") from e
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from entity_chunks storage # Persist changes to graph database before releasing graph database lock
if self.entity_chunks: await self._insert_done()
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = (
f"Successfully deleted {len(entities_to_delete)} entities"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# Persist changes to graph database before entity and relationship rebuild
await self._insert_done()
# 8. Rebuild entities and relationships from remaining chunks # 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild: if entities_to_rebuild or relationships_to_rebuild:
try: try:
await rebuild_knowledge_from_chunks( await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild, entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild, relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph, knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -3645,18 +3408,6 @@ class LightRAG:
f"No deletion operations were started for document {doc_id}, skipping persistence" f"No deletion operations were started for document {doc_id}, skipping persistence"
) )
# Release pipeline only if WE acquired it
if we_acquired_pipeline:
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = False
completion_msg = (
f"Deletion process completed for document: {doc_id}"
)
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
logger.info(completion_msg)
async def adelete_by_entity(self, entity_name: str) -> DeletionResult: async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships. """Asynchronously delete an entity and all its relationships.
@ -3774,22 +3525,16 @@ class LightRAG:
) )
async def aedit_entity( async def aedit_entity(
self, self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Asynchronously edit entity information. """Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database. Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args: Args:
entity_name: Name of the entity to edit entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True allow_rename: Whether to allow entity renaming, defaults to True
allow_merge: Whether to merge into an existing entity when renaming to an existing name
Returns: Returns:
Dictionary containing updated entity information Dictionary containing updated entity information
@ -3803,21 +3548,14 @@ class LightRAG:
entity_name, entity_name,
updated_data, updated_data,
allow_rename, allow_rename,
allow_merge,
self.entity_chunks,
self.relation_chunks,
) )
def edit_entity( def edit_entity(
self, self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
loop = always_get_an_event_loop() loop = always_get_an_event_loop()
return loop.run_until_complete( return loop.run_until_complete(
self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge) self.aedit_entity(entity_name, updated_data, allow_rename)
) )
async def aedit_relation( async def aedit_relation(
@ -3826,7 +3564,6 @@ class LightRAG:
"""Asynchronously edit relation information. """Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args: Args:
source_entity: Name of the source entity source_entity: Name of the source entity
@ -3845,7 +3582,6 @@ class LightRAG:
source_entity, source_entity,
target_entity, target_entity,
updated_data, updated_data,
self.relation_chunks,
) )
def edit_relation( def edit_relation(
@ -3957,8 +3693,6 @@ class LightRAG:
target_entity, target_entity,
merge_strategy, merge_strategy,
target_entity_data, target_entity_data,
self.entity_chunks,
self.relation_chunks,
) )
def merge_entities( def merge_entities(

View file

@ -7,6 +7,7 @@ import json_repair
from typing import Any, AsyncIterator, overload, Literal from typing import Any, AsyncIterator, overload, Literal
from collections import Counter, defaultdict from collections import Counter, defaultdict
from lightrag.exceptions import PipelineCancelledException
from lightrag.utils import ( from lightrag.utils import (
logger, logger,
compute_mdhash_id, compute_mdhash_id,
@ -2204,6 +2205,12 @@ async def merge_nodes_and_edges(
file_path: File path for logging file_path: File path for logging
""" """
# Check for cancellation at the start of merge
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during merge phase")
# Collect all nodes and edges from all chunks # Collect all nodes and edges from all chunks
all_nodes = defaultdict(list) all_nodes = defaultdict(list)
all_edges = defaultdict(list) all_edges = defaultdict(list)
@ -2240,6 +2247,14 @@ async def merge_nodes_and_edges(
async def _locked_process_entity_name(entity_name, entities): async def _locked_process_entity_name(entity_name, entities):
async with semaphore: async with semaphore:
# Check for cancellation before processing entity
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity merge"
)
workspace = global_config.get("workspace", "") workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock( async with get_storage_keyed_lock(
@ -2339,6 +2354,14 @@ async def merge_nodes_and_edges(
async def _locked_process_edges(edge_key, edges): async def _locked_process_edges(edge_key, edges):
async with semaphore: async with semaphore:
# Check for cancellation before processing edges
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation merge"
)
workspace = global_config.get("workspace", "") workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
sorted_edge_key = sorted([edge_key[0], edge_key[1]]) sorted_edge_key = sorted([edge_key[0], edge_key[1]])
@ -2525,6 +2548,14 @@ async def extract_entities(
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
text_chunks_storage: BaseKVStorage | None = None, text_chunks_storage: BaseKVStorage | None = None,
) -> list: ) -> list:
# Check for cancellation at the start of entity extraction
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity extraction"
)
use_llm_func: callable = global_config["llm_model_func"] use_llm_func: callable = global_config["llm_model_func"]
entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"] entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
@ -2692,6 +2723,14 @@ async def extract_entities(
async def _process_with_semaphore(chunk): async def _process_with_semaphore(chunk):
async with semaphore: async with semaphore:
# Check for cancellation before processing chunk
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during chunk processing"
)
try: try:
return await _process_single_content(chunk) return await _process_single_content(chunk)
except Exception as e: except Exception as e: