Merge pull request #2258 from danielaskdd/pipeline-cancelllation

Feat: Add Pipeline Cancellation Feature with Enhanced Reliability
Daniel.y 2025-10-25 04:26:39 +08:00 committed by GitHub
commit e2ec1cdcd4
15 changed files with 735 additions and 219 deletions

View file

@ -1,5 +1,5 @@
from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
__version__ = "1.4.9.4"
__version__ = "1.4.9.5"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/LightRAG"

View file

@ -1 +1 @@
__api_version__ = "0244"
__api_version__ = "0245"

View file

@ -161,6 +161,28 @@ class ReprocessResponse(BaseModel):
}
class CancelPipelineResponse(BaseModel):
"""Response model for pipeline cancellation operation
Attributes:
status: Status of the cancellation request
message: Message describing the operation result
"""
status: Literal["cancellation_requested", "not_busy"] = Field(
description="Status of the cancellation request"
)
message: str = Field(description="Human-readable message describing the operation")
class Config:
json_schema_extra = {
"example": {
"status": "cancellation_requested",
"message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.",
}
}
class InsertTextRequest(BaseModel):
"""Request model for inserting a single text document
@ -1534,7 +1556,19 @@ async def background_delete_documents(
try:
# Loop through the document IDs and delete them one by one
for i, doc_id in enumerate(doc_ids, 1):
# Check for cancellation at the start of each document deletion
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining."
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
# Add remaining documents to failed list with cancellation reason
failed_deletions.extend(
doc_ids[i - 1 :]
) # i-1 because enumerate starts at 1
break # Exit the loop, remaining documents unchanged
start_msg = f"Deleting document {i}/{total_docs}: {doc_id}"
logger.info(start_msg)
pipeline_status["cur_batch"] = i
@ -1697,6 +1731,10 @@ async def background_delete_documents(
# Final summary and check for pending requests
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["pending_requests"] = False # Reset pending requests flag
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
@ -2230,7 +2268,7 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
# TODO: Deprecated
# TODO: Deprecated, use /documents/paginated instead
@router.get(
"", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
)
@ -2754,4 +2792,63 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/cancel_pipeline",
response_model=CancelPipelineResponse,
dependencies=[Depends(combined_auth)],
)
async def cancel_pipeline():
"""
Request cancellation of the currently running pipeline.
This endpoint sets a cancellation flag in the pipeline status. The pipeline will:
1. Check this flag at key processing points
2. Stop processing new documents
3. Cancel all running document processing tasks
4. Mark all PROCESSING documents as FAILED with reason "User cancelled"
The cancellation is graceful and ensures data consistency. Documents that have
completed processing will remain in PROCESSED status.
Returns:
CancelPipelineResponse: Response with status and message
- status="cancellation_requested": Cancellation flag has been set
- status="not_busy": Pipeline is not currently running
Raises:
HTTPException: If an error occurs while setting the cancellation flag (500).
"""
try:
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
return CancelPipelineResponse(
status="not_busy",
message="Pipeline is not currently running. No cancellation needed.",
)
# Set cancellation flag
pipeline_status["cancellation_requested"] = True
cancel_msg = "Pipeline cancellation requested by user"
logger.info(cancel_msg)
pipeline_status["latest_message"] = cancel_msg
pipeline_status["history_messages"].append(cancel_msg)
return CancelPipelineResponse(
status="cancellation_requested",
message="Pipeline cancellation has been requested. Documents will be marked as FAILED.",
)
except Exception as e:
logger.error(f"Error requesting pipeline cancellation: {str(e)}")
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
return router
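
For reference, a client can trigger cancellation with a single POST to the new endpoint. Below is a minimal sketch using httpx, assuming the server listens on http://localhost:9621 (adjust for your deployment) and that any auth header required by combined_auth is added by the caller:

import asyncio
import httpx  # assumed HTTP client; any equivalent works

async def request_cancel(base_url: str = "http://localhost:9621") -> None:
    # POST to the endpoint added above; the body is a CancelPipelineResponse
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{base_url}/documents/cancel_pipeline")
        resp.raise_for_status()
        body = resp.json()
        if body["status"] == "cancellation_requested":
            print(body["message"])  # flag set; pipeline stops at its next checkpoint
        else:  # "not_busy"
            print(f"Nothing to cancel: {body['message']}")

if __name__ == "__main__":
    asyncio.run(request_cancel())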

View file

@ -96,3 +96,11 @@ class PipelineNotInitializedError(KeyError):
f" await initialize_pipeline_status()"
)
super().__init__(msg)
class PipelineCancelledException(Exception):
"""Raised when pipeline processing is cancelled by user request."""
def __init__(self, message: str = "User cancelled"):
super().__init__(message)
self.message = message
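
Call sites throughout this PR repeat the same cooperative checkpoint: take the pipeline status lock, read the flag, and raise this exception so callers unwind cleanly. A sketch of that pattern as a hypothetical helper (raise_if_cancelled is illustrative, not part of this PR):

async def raise_if_cancelled(pipeline_status, pipeline_status_lock, where: str):
    # Take the shared status lock, read the flag, and raise so the caller
    # unwinds through its normal exception handling.
    async with pipeline_status_lock:
        if pipeline_status.get("cancellation_requested", False):
            raise PipelineCancelledException(f"User cancelled during {where}")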

View file

@ -12,7 +12,7 @@ from lightrag.exceptions import PipelineNotInitializedError
# Define a direct print function for critical logs that must be visible in all processes
def direct_log(message, enable_output: bool = False, level: str = "DEBUG"):
def direct_log(message, enable_output: bool = False, level: str = "INFO"):
"""
Log a message directly to stderr to ensure visibility in all processes,
including the Gunicorn master process.
@ -44,7 +44,6 @@ def direct_log(message, enable_output: bool = False, level: str = "DEBUG"):
}
message_level = level_mapping.get(level.upper(), logging.DEBUG)
# print(f"Diret_log: {level.upper()} {message_level} ? {current_level}", file=sys.stderr, flush=True)
if message_level >= current_level:
print(f"{level}: {message}", file=sys.stderr, flush=True)
@ -168,7 +167,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@ -199,7 +198,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
# If main lock release failed but async lock hasn't been released, try to release it
@ -223,7 +222,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@ -247,7 +246,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@ -269,7 +268,7 @@ class UnifiedLock(Generic[T]):
direct_log(
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}",
level="ERROR",
enable_output=self._enable_logging,
enable_output=True,
)
raise
@ -401,7 +400,7 @@ def _perform_lock_cleanup(
direct_log(
f"== {lock_type} Lock == Cleanup failed: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
return 0, earliest_cleanup_time, last_cleanup_time
@ -689,7 +688,7 @@ class KeyedUnifiedLock:
direct_log(
f"Error during multiprocess lock cleanup: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
# 2. Cleanup async locks using generic function
@ -718,7 +717,7 @@ class KeyedUnifiedLock:
direct_log(
f"Error during async lock cleanup: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
# 3. Get current status after cleanup
@ -772,7 +771,7 @@ class KeyedUnifiedLock:
direct_log(
f"Error getting keyed lock status: {e}",
level="ERROR",
enable_output=False,
enable_output=True,
)
return status
@ -797,32 +796,239 @@ class _KeyedLockContext:
if enable_logging is not None
else parent._default_enable_logging
)
self._ul: Optional[List["UnifiedLock"]] = None # set in __aenter__
self._ul: Optional[List[Dict[str, Any]]] = None # set in __aenter__
# ----- enter -----
async def __aenter__(self):
if self._ul is not None:
raise RuntimeError("KeyedUnifiedLock already acquired in current context")
# acquire locks for all keys in the namespace
self._ul = []
for key in self._keys:
lock = self._parent._get_lock_for_key(
self._namespace, key, enable_logging=self._enable_logging
)
await lock.__aenter__()
inc_debug_n_locks_acquired()
self._ul.append(lock)
return self
try:
# Acquire locks for all keys in the namespace
for key in self._keys:
lock = None
entry = None
try:
# 1. Get lock object (reference count is incremented here)
lock = self._parent._get_lock_for_key(
self._namespace, key, enable_logging=self._enable_logging
)
# 2. Immediately create and add entry to list (critical for rollback to work)
entry = {
"key": key,
"lock": lock,
"entered": False,
"debug_inc": False,
"ref_incremented": True, # Mark that reference count has been incremented
}
self._ul.append(
entry
) # Add immediately after _get_lock_for_key for rollback to work
# 3. Try to acquire the lock
# Use try-finally to ensure state is updated atomically
lock_acquired = False
try:
await lock.__aenter__()
lock_acquired = True # Lock successfully acquired
finally:
if lock_acquired:
entry["entered"] = True
inc_debug_n_locks_acquired()
entry["debug_inc"] = True
except asyncio.CancelledError:
# Lock acquisition was cancelled
# The finally block above ensures entry["entered"] is correct
direct_log(
f"Lock acquisition cancelled for key {key}",
level="WARNING",
enable_output=self._enable_logging,
)
raise
except Exception as e:
# Other exceptions, log and re-raise
direct_log(
f"Lock acquisition failed for key {key}: {e}",
level="ERROR",
enable_output=True,
)
raise
return self
except BaseException:
# Critical: if any exception occurs (including CancelledError) during lock acquisition,
# we must rollback all already acquired locks to prevent lock leaks
# Use shield to ensure rollback completes
await asyncio.shield(self._rollback_acquired_locks())
raise
async def _rollback_acquired_locks(self):
"""Rollback all acquired locks in case of exception during __aenter__"""
if not self._ul:
return
async def rollback_single_entry(entry):
"""Rollback a single lock acquisition"""
key = entry["key"]
lock = entry["lock"]
debug_inc = entry["debug_inc"]
entered = entry["entered"]
ref_incremented = entry.get(
"ref_incremented", True
) # Default to True for safety
errors = []
# 1. If lock was acquired, release it
if entered:
try:
await lock.__aexit__(None, None, None)
except Exception as e:
errors.append(("lock_exit", e))
direct_log(
f"Lock rollback error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 2. Release reference count (if it was incremented)
if ref_incremented:
try:
self._parent._release_lock_for_key(self._namespace, key)
except Exception as e:
errors.append(("ref_release", e))
direct_log(
f"Lock rollback reference release error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 3. Decrement debug counter
if debug_inc:
try:
dec_debug_n_locks_acquired()
except Exception as e:
errors.append(("debug_dec", e))
direct_log(
f"Lock rollback counter decrementing error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
return errors
# Release already acquired locks in reverse order
for entry in reversed(self._ul):
# Use shield to protect each lock's rollback
try:
await asyncio.shield(rollback_single_entry(entry))
except Exception as e:
# Log but continue rolling back other locks
direct_log(
f"Lock rollback unexpected error for {entry['key']}: {e}",
level="ERROR",
enable_output=True,
)
self._ul = None
# ----- exit -----
async def __aexit__(self, exc_type, exc, tb):
# The UnifiedLock takes care of proper release order
for ul, key in zip(reversed(self._ul), reversed(self._keys)):
await ul.__aexit__(exc_type, exc, tb)
self._parent._release_lock_for_key(self._namespace, key)
dec_debug_n_locks_acquired()
self._ul = None
if self._ul is None:
return
async def release_all_locks():
"""Release all locks with comprehensive error handling, protected from cancellation"""
async def release_single_entry(entry, exc_type, exc, tb):
"""Release a single lock with full protection"""
key = entry["key"]
lock = entry["lock"]
debug_inc = entry["debug_inc"]
entered = entry["entered"]
errors = []
# 1. Release the lock
if entered:
try:
await lock.__aexit__(exc_type, exc, tb)
except Exception as e:
errors.append(("lock_exit", e))
direct_log(
f"Lock release error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 2. Release reference count
try:
self._parent._release_lock_for_key(self._namespace, key)
except Exception as e:
errors.append(("ref_release", e))
direct_log(
f"Lock release reference error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
# 3. Decrement debug counter
if debug_inc:
try:
dec_debug_n_locks_acquired()
except Exception as e:
errors.append(("debug_dec", e))
direct_log(
f"Lock release counter decrementing error for key {key}: {e}",
level="ERROR",
enable_output=True,
)
return errors
all_errors = []
# Release locks in reverse order
# This entire loop is protected by the outer shield
for entry in reversed(self._ul):
try:
errors = await release_single_entry(entry, exc_type, exc, tb)
for error_type, error in errors:
all_errors.append((entry["key"], error_type, error))
except Exception as e:
all_errors.append((entry["key"], "unexpected", e))
direct_log(
f"Lock release unexpected error for {entry['key']}: {e}",
level="ERROR",
enable_output=True,
)
return all_errors
# CRITICAL: Protect the entire release process with shield
# This ensures that even if cancellation occurs, all locks are released
try:
all_errors = await asyncio.shield(release_all_locks())
except Exception as e:
direct_log(
f"Critical error during __aexit__ cleanup: {e}",
level="ERROR",
enable_output=True,
)
all_errors = []
finally:
# Always clear the lock list, even if shield was cancelled
self._ul = None
# If there were release errors and no other exception, raise the first release error
if all_errors and exc_type is None:
raise all_errors[0][2] # (key, error_type, error)
def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
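
The shield-protected cleanup above exists because a cancelled task can be cancelled again inside __aexit__, leaking locks. A standalone sketch of the idea (GuardedLock is illustrative, not the LightRAG class):

import asyncio

class GuardedLock:
    """Illustration only: cleanup runs under asyncio.shield so task
    cancellation cannot interrupt it midway."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> "GuardedLock":
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        async def _release() -> None:
            # In the real class this releases multiple locks and reference
            # counts in reverse order; a single release stands in here.
            self._lock.release()
        # If the surrounding task is cancelled at this await, CancelledError
        # still propagates to the caller, but the shielded coroutine runs to
        # completion, so the lock is never leaked.
        await asyncio.shield(_release())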

View file

@ -22,6 +22,7 @@ from typing import (
Dict,
)
from lightrag.prompt import PROMPTS
from lightrag.exceptions import PipelineCancelledException
from lightrag.constants import (
DEFAULT_MAX_GLEANING,
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
@ -1603,6 +1604,7 @@ class LightRAG:
"batchs": 0, # Total number of files to be processed
"cur_batch": 0, # Number of files already processed
"request_pending": False, # Clear any previous request
"cancellation_requested": False, # Initialize cancellation flag
"latest_message": "",
}
)
@ -1619,6 +1621,22 @@ class LightRAG:
try:
# Process documents until no more documents or requests
while True:
# Check for cancellation request at the start of main loop
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
# Clear pending request
pipeline_status["request_pending"] = False
# Clear cancellation flag
pipeline_status["cancellation_requested"] = False
log_message = "Pipeline cancelled by user"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Exit directly, skipping request_pending check
return
if not to_process_docs:
log_message = "All enqueued documents have been processed"
logger.info(log_message)
@ -1681,14 +1699,25 @@ class LightRAG:
semaphore: asyncio.Semaphore,
) -> None:
"""Process single document"""
# Initialize variables at the start to prevent UnboundLocalError in error handling
file_path = "unknown_source"
current_file_number = 0
file_extraction_stage_ok = False
processing_start_time = int(time.time())
first_stage_tasks = []
entity_relation_task = None
async with semaphore:
nonlocal processed_count
current_file_number = 0
# Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = []
entity_relation_task = None
try:
# Check for cancellation before starting document processing
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Get file path from status document
file_path = getattr(
status_doc, "file_path", "unknown_source"
@ -1751,6 +1780,11 @@ class LightRAG:
# Record processing start time
processing_start_time = int(time.time())
# Check for cancellation before entity extraction
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution)
doc_status_task = asyncio.create_task(
@ -1805,16 +1839,29 @@ class LightRAG:
file_extraction_stage_ok = True
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(error_msg)
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Cancel tasks that are not yet completed
all_tasks = first_stage_tasks + (
@ -1824,9 +1871,14 @@ class LightRAG:
if task and not task.done():
task.cancel()
# Persistent llm cache
# Persist LLM cache with error handling
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case
processing_end_time = int(time.time())
@ -1856,6 +1908,15 @@ class LightRAG:
# Concurrency is controlled by keyed lock for individual entities and relationships
if file_extraction_stage_ok:
try:
# Check for cancellation before merge
async with pipeline_status_lock:
if pipeline_status.get(
"cancellation_requested", False
):
raise PipelineCancelledException(
"User cancelled"
)
# Get chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges(
@ -1914,22 +1975,38 @@ class LightRAG:
)
except Exception as e:
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persistent llm cache
# Persist LLM cache with error handling
if self.llm_response_cache:
await self.llm_response_cache.index_done_callback()
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
# Record processing end time for failed case
processing_end_time = int(time.time())
@ -1970,7 +2047,19 @@ class LightRAG:
)
# Wait for all document processing to complete
await asyncio.gather(*doc_tasks)
try:
await asyncio.gather(*doc_tasks)
except PipelineCancelledException:
# Cancel all remaining tasks
for task in doc_tasks:
if not task.done():
task.cancel()
# Wait for all tasks to complete cancellation
await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
# Exit directly (document statuses already updated in process_document)
return
# Check if there's a pending request to process more documents (with lock)
has_pending_request = False
@ -2001,11 +2090,14 @@ class LightRAG:
to_process_docs.update(pending_docs)
finally:
log_message = "Enqueued document processing pipeline stoped"
log_message = "Enqueued document processing pipeline stopped"
logger.info(log_message)
# Always reset busy status when done or if an exception occurs (with lock)
# Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)

View file

@ -1,5 +1,6 @@
from __future__ import annotations
from functools import partial
from pathlib import Path
import asyncio
import json
@ -7,6 +8,7 @@ import json_repair
from typing import Any, AsyncIterator, overload, Literal
from collections import Counter, defaultdict
from lightrag.exceptions import PipelineCancelledException
from lightrag.utils import (
logger,
compute_mdhash_id,
@ -67,7 +69,7 @@ from dotenv import load_dotenv
# use the .env that is inside the current folder
# allows to use different .env file for each lightrag instance
# the OS environment variables take precedence over the .env file
load_dotenv(dotenv_path=".env", override=False)
load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False)
def _truncate_entity_identifier(
@ -1637,6 +1639,12 @@ async def _merge_nodes_then_upsert(
logger.error(f"Entity {entity_name} has no description")
raise ValueError(f"Entity {entity_name} has no description")
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during entity summary")
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Entity",
@ -1957,6 +1965,14 @@ async def _merge_edges_then_upsert(
logger.error(f"Relation {src_id}~{tgt_id} has no description")
raise ValueError(f"Relation {src_id}~{tgt_id} has no description")
# Check for cancellation before LLM summary
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation summary"
)
# 8. Get summary description and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Relation",
@ -2214,6 +2230,12 @@ async def merge_nodes_and_edges(
file_path: File path for logging
"""
# Check for cancellation at the start of merge
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled during merge phase")
# Collect all nodes and edges from all chunks
all_nodes = defaultdict(list)
all_edges = defaultdict(list)
@ -2250,6 +2272,14 @@ async def merge_nodes_and_edges(
async def _locked_process_entity_name(entity_name, entities):
async with semaphore:
# Check for cancellation before processing entity
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity merge"
)
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
@ -2272,9 +2302,7 @@ async def merge_nodes_and_edges(
return entity_data
except Exception as e:
error_msg = (
f"Critical error in entity processing for `{entity_name}`: {e}"
)
error_msg = f"Error processing entity `{entity_name}`: {e}"
logger.error(error_msg)
# Try to update pipeline status, but don't let status update failure affect main exception
@ -2310,36 +2338,32 @@ async def merge_nodes_and_edges(
entity_tasks, return_when=asyncio.FIRST_EXCEPTION
)
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
successful_results = []
processed_entities = []
for task in done:
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
successful_results.append(task.result())
except Exception as e:
result = task.result()
except BaseException as e:
if first_exception is None:
first_exception = e
else:
processed_entities.append(result)
if pending:
for task in pending:
task.cancel()
pending_results = await asyncio.gather(*pending, return_exceptions=True)
for result in pending_results:
if isinstance(result, BaseException):
if first_exception is None:
first_exception = result
else:
processed_entities.append(result)
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the first exception to notify the caller
raise first_exception
# If all tasks completed successfully, collect results
processed_entities = [task.result() for task in entity_tasks]
# ===== Phase 2: Process all relationships concurrently =====
log_message = f"Phase 2: Processing {total_relations_count} relations from {doc_id} (async: {graph_max_async})"
logger.info(log_message)
@ -2349,6 +2373,14 @@ async def merge_nodes_and_edges(
async def _locked_process_edges(edge_key, edges):
async with semaphore:
# Check for cancellation before processing edges
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during relation merge"
)
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
sorted_edge_key = sorted([edge_key[0], edge_key[1]])
@ -2383,7 +2415,7 @@ async def merge_nodes_and_edges(
return edge_data, added_entities
except Exception as e:
error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}"
error_msg = f"Error processing relation `{sorted_edge_key}`: {e}"
logger.error(error_msg)
# Try to update pipeline status, but don't let status update failure affect main exception
@ -2421,40 +2453,36 @@ async def merge_nodes_and_edges(
edge_tasks, return_when=asyncio.FIRST_EXCEPTION
)
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
successful_results = []
for task in done:
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
successful_results.append(task.result())
except Exception as e:
edge_data, added_entities = task.result()
except BaseException as e:
if first_exception is None:
first_exception = e
else:
if edge_data is not None:
processed_edges.append(edge_data)
all_added_entities.extend(added_entities)
if pending:
for task in pending:
task.cancel()
pending_results = await asyncio.gather(*pending, return_exceptions=True)
for result in pending_results:
if isinstance(result, BaseException):
if first_exception is None:
first_exception = result
else:
edge_data, added_entities = result
if edge_data is not None:
processed_edges.append(edge_data)
all_added_entities.extend(added_entities)
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the first exception to notify the caller
raise first_exception
# If all tasks completed successfully, collect results
for task in edge_tasks:
edge_data, added_entities = task.result()
if edge_data is not None:
processed_edges.append(edge_data)
all_added_entities.extend(added_entities)
# ===== Phase 3: Update full_entities and full_relations storage =====
if full_entities_storage and full_relations_storage and doc_id:
try:
@ -2535,6 +2563,14 @@ async def extract_entities(
llm_response_cache: BaseKVStorage | None = None,
text_chunks_storage: BaseKVStorage | None = None,
) -> list:
# Check for cancellation at the start of entity extraction
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during entity extraction"
)
use_llm_func: callable = global_config["llm_model_func"]
entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
@ -2702,6 +2738,14 @@ async def extract_entities(
async def _process_with_semaphore(chunk):
async with semaphore:
# Check for cancellation before processing chunk
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException(
"User cancelled during chunk processing"
)
try:
return await _process_single_content(chunk)
except Exception as e:
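
Both merge phases use asyncio.wait with FIRST_EXCEPTION and must retrieve every exception, including those from cancelled stragglers, to avoid "exception was never retrieved" warnings. A generic sketch of that fail-fast pattern, with hypothetical names:

import asyncio

async def run_until_first_error(coros):
    # Stop on the first failure, cancel stragglers, and consume every
    # result so no exception goes unretrieved.
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    first_exc, results = None, []
    for t in done:
        try:
            results.append(t.result())
        except BaseException as e:  # includes CancelledError and user cancellation
            first_exc = first_exc or e
    if pending:
        for t in pending:
            t.cancel()
        for r in await asyncio.gather(*pending, return_exceptions=True):
            if isinstance(r, BaseException):
                first_exc = first_exc or r
            else:
                results.append(r)
    if first_exc is not None:
        raise first_exc  # re-raise the first failure to the caller
    return results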

View file

@ -242,6 +242,7 @@ export type PipelineStatusResponse = {
batchs: number
cur_batch: number
request_pending: boolean
cancellation_requested?: boolean
latest_message: string
history_messages?: string[]
update_status?: Record<string, any>
@ -691,6 +692,14 @@ export const getPipelineStatus = async (): Promise<PipelineStatusResponse> => {
return response.data
}
export const cancelPipeline = async (): Promise<{
status: 'cancellation_requested' | 'not_busy'
message: string
}> => {
const response = await axiosInstance.post('/documents/cancel_pipeline')
return response.data
}
export const loginToServer = async (username: string, password: string): Promise<LoginResponse> => {
const formData = new FormData();
formData.append('username', username);

View file

@ -11,7 +11,7 @@ import {
DialogDescription
} from '@/components/ui/Dialog'
import Button from '@/components/ui/Button'
import { getPipelineStatus, PipelineStatusResponse } from '@/api/lightrag'
import { getPipelineStatus, cancelPipeline, PipelineStatusResponse } from '@/api/lightrag'
import { errorMessage } from '@/lib/utils'
import { cn } from '@/lib/utils'
@ -30,6 +30,7 @@ export default function PipelineStatusDialog({
const [status, setStatus] = useState<PipelineStatusResponse | null>(null)
const [position, setPosition] = useState<DialogPosition>('center')
const [isUserScrolled, setIsUserScrolled] = useState(false)
const [showCancelConfirm, setShowCancelConfirm] = useState(false)
const historyRef = useRef<HTMLDivElement>(null)
// Reset position when dialog opens
@ -37,6 +38,9 @@ export default function PipelineStatusDialog({
if (open) {
setPosition('center')
setIsUserScrolled(false)
} else {
// Reset confirmation dialog state when main dialog closes
setShowCancelConfirm(false)
}
}, [open])
@ -81,6 +85,24 @@ export default function PipelineStatusDialog({
return () => clearInterval(interval)
}, [open, t])
// Handle cancel pipeline confirmation
const handleConfirmCancel = async () => {
setShowCancelConfirm(false)
try {
const result = await cancelPipeline()
if (result.status === 'cancellation_requested') {
toast.success(t('documentPanel.pipelineStatus.cancelSuccess'))
} else if (result.status === 'not_busy') {
toast.info(t('documentPanel.pipelineStatus.cancelNotBusy'))
}
} catch (err) {
toast.error(t('documentPanel.pipelineStatus.cancelFailed', { error: errorMessage(err) }))
}
}
// Determine if cancel button should be enabled
const canCancel = status?.busy === true && !status?.cancellation_requested
return (
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent
@ -142,16 +164,43 @@ export default function PipelineStatusDialog({
{/* Status Content */}
<div className="space-y-4 pt-4">
{/* Pipeline Status */}
<div className="flex items-center gap-4">
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.busy')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.busy ? 'bg-green-500' : 'bg-gray-300'}`} />
</div>
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.requestPending')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.request_pending ? 'bg-green-500' : 'bg-gray-300'}`} />
{/* Pipeline Status - with cancel button */}
<div className="flex flex-wrap items-center justify-between gap-4">
{/* Left side: Status indicators */}
<div className="flex items-center gap-4">
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.busy')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.busy ? 'bg-green-500' : 'bg-gray-300'}`} />
</div>
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.requestPending')}:</div>
<div className={`h-2 w-2 rounded-full ${status?.request_pending ? 'bg-green-500' : 'bg-gray-300'}`} />
</div>
{/* Only show cancellation status when it's requested */}
{status?.cancellation_requested && (
<div className="flex items-center gap-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.cancellationRequested')}:</div>
<div className="h-2 w-2 rounded-full bg-red-500" />
</div>
)}
</div>
{/* Right side: Cancel button - only show when pipeline is busy */}
{status?.busy && (
<Button
variant="destructive"
size="sm"
disabled={!canCancel}
onClick={() => setShowCancelConfirm(true)}
title={
status?.cancellation_requested
? t('documentPanel.pipelineStatus.cancelInProgress')
: t('documentPanel.pipelineStatus.cancelTooltip')
}
>
{t('documentPanel.pipelineStatus.cancelButton')}
</Button>
)}
</div>
{/* Job Information */}
@ -172,31 +221,49 @@ export default function PipelineStatusDialog({
</div>
</div>
{/* Latest Message */}
<div className="space-y-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.latestMessage')}:</div>
<div className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3 whitespace-pre-wrap break-words">
{status?.latest_message || '-'}
</div>
</div>
{/* History Messages */}
<div className="space-y-2">
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.historyMessages')}:</div>
<div className="text-sm font-medium">{t('documentPanel.pipelineStatus.pipelineMessages')}:</div>
<div
ref={historyRef}
onScroll={handleScroll}
className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3 overflow-y-auto min-h-[7.5em] max-h-[40vh]"
className="font-mono text-xs rounded-md bg-zinc-800 text-zinc-100 p-3 overflow-y-auto overflow-x-hidden min-h-[7.5em] max-h-[40vh]"
>
{status?.history_messages?.length ? (
status.history_messages.map((msg, idx) => (
<div key={idx} className="whitespace-pre-wrap break-words">{msg}</div>
<div key={idx} className="whitespace-pre-wrap break-all">{msg}</div>
))
) : '-'}
</div>
</div>
</div>
</DialogContent>
{/* Cancel Confirmation Dialog */}
<Dialog open={showCancelConfirm} onOpenChange={setShowCancelConfirm}>
<DialogContent className="sm:max-w-[425px]">
<DialogHeader>
<DialogTitle>{t('documentPanel.pipelineStatus.cancelConfirmTitle')}</DialogTitle>
<DialogDescription>
{t('documentPanel.pipelineStatus.cancelConfirmDescription')}
</DialogDescription>
</DialogHeader>
<div className="flex justify-end gap-3 mt-4">
<Button
variant="outline"
onClick={() => setShowCancelConfirm(false)}
>
{t('common.cancel')}
</Button>
<Button
variant="destructive"
onClick={handleConfirmCancel}
>
{t('documentPanel.pipelineStatus.cancelConfirmButton')}
</Button>
</div>
</DialogContent>
</Dialog>
</Dialog>
)
}

View file

@ -21,7 +21,6 @@ import PaginationControls from '@/components/ui/PaginationControls'
import {
scanNewDocuments,
reprocessFailedDocuments,
getDocumentsPaginated,
DocsStatusesResponse,
DocStatus,
@ -868,42 +867,6 @@ export default function DocumentManager() {
}
}, [t, startPollingInterval, currentTab, health, statusCounts])
const retryFailedDocuments = useCallback(async () => {
try {
// Check if component is still mounted before starting the request
if (!isMountedRef.current) return;
const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars
// Check again if component is still mounted after the request completes
if (!isMountedRef.current) return;
// Note: _track_id is available for future use (e.g., progress tracking)
toast.message(message || status);
// Reset health check timer with 1 second delay to avoid race condition
useBackendState.getState().resetHealthCheckTimerDelayed(1000);
// Start fast refresh with 2-second interval immediately after retry
startPollingInterval(2000);
// Set recovery timer to restore normal polling interval after 15 seconds
setTimeout(() => {
if (isMountedRef.current && currentTab === 'documents' && health) {
// Restore intelligent polling interval based on document status
const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts);
const normalInterval = hasActiveDocuments ? 5000 : 30000;
startPollingInterval(normalInterval);
}
}, 15000); // Restore after 15 seconds
} catch (err) {
// Only show error if component is still mounted
if (isMountedRef.current) {
toast.error(errorMessage(err));
}
}
}, [startPollingInterval, currentTab, health, statusCounts])
// Handle page size change - update state and save to store
const handlePageSizeChange = useCallback((newPageSize: number) => {
if (newPageSize === pagination.page_size) return;
@ -1166,16 +1129,6 @@ export default function DocumentManager() {
>
<RefreshCwIcon /> {t('documentPanel.documentManager.scanButton')}
</Button>
<Button
variant="outline"
onClick={retryFailedDocuments}
side="bottom"
tooltip={t('documentPanel.documentManager.retryFailedTooltip')}
size="sm"
disabled={pipelineBusy}
>
<RotateCcwIcon /> {t('documentPanel.documentManager.retryFailedButton')}
</Button>
<Button
variant="outline"
onClick={() => setShowPipelineStatus(true)}

View file

@ -114,10 +114,8 @@
},
"documentManager": {
"title": "إدارة المستندات",
"scanButton": "مسح ضوئي",
"scanButton": "مسح/إعادة محاولة",
"scanTooltip": "مسح ومعالجة المستندات في مجلد الإدخال، وإعادة معالجة جميع المستندات الفاشلة أيضًا",
"retryFailedButton": "إعادة المحاولة",
"retryFailedTooltip": "إعادة معالجة جميع المستندات الفاشلة",
"refreshTooltip": "إعادة تعيين قائمة المستندات",
"pipelineStatusButton": "خط المعالجة",
"pipelineStatusTooltip": "عرض حالة خط معالجة المستندات",
@ -157,17 +155,27 @@
"hideFileNameTooltip": "إخفاء اسم الملف"
},
"pipelineStatus": {
"title": "حالة خط المعالجة",
"busy": "خط المعالجة مشغول",
"requestPending": "الطلب معلق",
"title": "حالة خط الأنابيب",
"busy": "خط الأنابيب مشغول",
"requestPending": "طلب معلق",
"cancellationRequested": "طلب الإلغاء",
"jobName": "اسم المهمة",
"startTime": "وقت البدء",
"progress": "التقدم",
"unit": "دفعة",
"latestMessage": "آخر رسالة",
"historyMessages": "سجل الرسائل",
"pipelineMessages": "رسائل خط الأنابيب",
"cancelButton": "إلغاء",
"cancelTooltip": "إلغاء معالجة خط الأنابيب",
"cancelConfirmTitle": "تأكيد إلغاء خط الأنابيب",
"cancelConfirmDescription": "سيؤدي هذا الإجراء إلى إيقاف معالجة خط الأنابيب الجارية. هل أنت متأكد من أنك تريد المتابعة؟",
"cancelConfirmButton": "تأكيد الإلغاء",
"cancelInProgress": "الإلغاء قيد التقدم...",
"pipelineNotRunning": "خط الأنابيب غير قيد التشغيل",
"cancelSuccess": "تم طلب إلغاء خط الأنابيب",
"cancelFailed": "فشل إلغاء خط الأنابيب\n{{error}}",
"cancelNotBusy": "خط الأنابيب غير قيد التشغيل، لا حاجة للإلغاء",
"errors": {
"fetchFailed": "فشل في جلب حالة خط المعالجة\n{{error}}"
"fetchFailed": "فشل في جلب حالة خط الأنابيب\n{{error}}"
}
}
},

View file

@ -114,10 +114,8 @@
},
"documentManager": {
"title": "Document Management",
"scanButton": "Scan",
"scanButton": "Scan/Retry",
"scanTooltip": "Scan and process documents in input folder, and also reprocess all failed documents",
"retryFailedButton": "Retry",
"retryFailedTooltip": "Retry processing all failed documents",
"refreshTooltip": "Reset document list",
"pipelineStatusButton": "Pipeline",
"pipelineStatusTooltip": "View document processing pipeline status",
@ -160,14 +158,24 @@
"title": "Pipeline Status",
"busy": "Pipeline Busy",
"requestPending": "Request Pending",
"cancellationRequested": "Cancellation Requested",
"jobName": "Job Name",
"startTime": "Start Time",
"progress": "Progress",
"unit": "batch",
"latestMessage": "Latest Message",
"historyMessages": "History Messages",
"unit": "Batch",
"pipelineMessages": "Pipeline Messages",
"cancelButton": "Cancel",
"cancelTooltip": "Cancel pipeline processing",
"cancelConfirmTitle": "Confirm Pipeline Cancellation",
"cancelConfirmDescription": "This will interrupt the ongoing pipeline processing. Are you sure you want to continue?",
"cancelConfirmButton": "Confirm Cancellation",
"cancelInProgress": "Cancellation in progress...",
"pipelineNotRunning": "Pipeline not running",
"cancelSuccess": "Pipeline cancellation requested",
"cancelFailed": "Failed to cancel pipeline\n{{error}}",
"cancelNotBusy": "Pipeline is not running, no need to cancel",
"errors": {
"fetchFailed": "Failed to get pipeline status\n{{error}}"
"fetchFailed": "Failed to fetch pipeline status\n{{error}}"
}
}
},

View file

@ -114,10 +114,8 @@
},
"documentManager": {
"title": "Gestion des documents",
"scanButton": "Scanner",
"scanButton": "Scanner/Retraiter",
"scanTooltip": "Scanner et traiter les documents dans le dossier d'entrée, et retraiter également tous les documents échoués",
"retryFailedButton": "Réessayer",
"retryFailedTooltip": "Réessayer le traitement de tous les documents échoués",
"refreshTooltip": "Réinitialiser la liste des documents",
"pipelineStatusButton": "Pipeline",
"pipelineStatusTooltip": "Voir l'état du pipeline de traitement des documents",
@ -158,14 +156,24 @@
},
"pipelineStatus": {
"title": "État du Pipeline",
"busy": "Pipeline occupé",
"requestPending": "Requête en attente",
"jobName": "Nom du travail",
"startTime": "Heure de début",
"progress": "Progression",
"unit": "lot",
"latestMessage": "Dernier message",
"historyMessages": "Historique des messages",
"busy": "Pipeline Occupé",
"requestPending": "Demande en Attente",
"cancellationRequested": "Annulation Demandée",
"jobName": "Nom du Travail",
"startTime": "Heure de Début",
"progress": "Progrès",
"unit": "Lot",
"pipelineMessages": "Messages de Pipeline",
"cancelButton": "Annuler",
"cancelTooltip": "Annuler le traitement du pipeline",
"cancelConfirmTitle": "Confirmer l'Annulation du Pipeline",
"cancelConfirmDescription": "Cette action interrompra le traitement du pipeline en cours. Êtes-vous sûr de vouloir continuer ?",
"cancelConfirmButton": "Confirmer l'Annulation",
"cancelInProgress": "Annulation en cours...",
"pipelineNotRunning": "Le pipeline n'est pas en cours d'exécution",
"cancelSuccess": "Annulation du pipeline demandée",
"cancelFailed": "Échec de l'annulation du pipeline\n{{error}}",
"cancelNotBusy": "Le pipeline n'est pas en cours d'exécution, pas besoin d'annuler",
"errors": {
"fetchFailed": "Échec de la récupération de l'état du pipeline\n{{error}}"
}

View file

@ -114,10 +114,8 @@
},
"documentManager": {
"title": "文档管理",
"scanButton": "扫描",
"scanButton": "扫描/重试",
"scanTooltip": "扫描处理输入目录中的文档,同时重新处理所有失败的文档",
"retryFailedButton": "重试",
"retryFailedTooltip": "重新处理所有失败的文档",
"refreshTooltip": "复位文档清单",
"pipelineStatusButton": "流水线",
"pipelineStatusTooltip": "查看文档处理流水线状态",
@ -160,12 +158,22 @@
"title": "流水线状态",
"busy": "流水线忙碌",
"requestPending": "待处理请求",
"cancellationRequested": "取消请求",
"jobName": "作业名称",
"startTime": "开始时间",
"progress": "进度",
"unit": "批",
"latestMessage": "最新消息",
"historyMessages": "历史消息",
"pipelineMessages": "流水线消息",
"cancelButton": "中断",
"cancelTooltip": "中断流水线处理",
"cancelConfirmTitle": "确认中断流水线",
"cancelConfirmDescription": "此操作将中断正在进行的流水线处理。确定要继续吗?",
"cancelConfirmButton": "确认中断",
"cancelInProgress": "取消请求进行中...",
"pipelineNotRunning": "流水线未运行",
"cancelSuccess": "流水线中断请求已发送",
"cancelFailed": "中断流水线失败\n{{error}}",
"cancelNotBusy": "流水线未运行,无需中断",
"errors": {
"fetchFailed": "获取流水线状态失败\n{{error}}"
}

View file

@ -114,10 +114,8 @@
},
"documentManager": {
"title": "文件管理",
"scanButton": "掃描",
"scanButton": "掃描/重試",
"scanTooltip": "掃描處理輸入目錄中的文件,同時重新處理所有失敗的文件",
"retryFailedButton": "重試",
"retryFailedTooltip": "重新處理所有失敗的文件",
"refreshTooltip": "重設文件清單",
"pipelineStatusButton": "管線狀態",
"pipelineStatusTooltip": "查看文件處理管線狀態",
@ -157,17 +155,27 @@
"hideFileNameTooltip": "隱藏檔案名稱"
},
"pipelineStatus": {
"title": "pipeline 狀態",
"busy": "pipeline 忙碌中",
"title": "流水線狀態",
"busy": "流水線忙碌",
"requestPending": "待處理請求",
"jobName": "工作名稱",
"cancellationRequested": "取消請求",
"jobName": "作業名稱",
"startTime": "開始時間",
"progress": "進度",
"unit": "梯次",
"latestMessage": "最新訊息",
"historyMessages": "歷史訊息",
"unit": "批",
"pipelineMessages": "流水線消息",
"cancelButton": "中斷",
"cancelTooltip": "中斷流水線處理",
"cancelConfirmTitle": "確認中斷流水線",
"cancelConfirmDescription": "此操作將中斷正在進行的流水線處理。確定要繼續嗎?",
"cancelConfirmButton": "確認中斷",
"cancelInProgress": "取消請求進行中...",
"pipelineNotRunning": "流水線未運行",
"cancelSuccess": "流水線中斷請求已發送",
"cancelFailed": "中斷流水線失敗\n{{error}}",
"cancelNotBusy": "流水線未運行,無需中斷",
"errors": {
"fetchFailed": "取得pipeline 狀態失敗\n{{error}}"
"fetchFailed": "獲取流水線狀態失敗\n{{error}}"
}
}
},