From 743aefc6557ee9a67a7c5e9275b76b866fa17dc7 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 24 Oct 2025 14:08:12 +0800 Subject: [PATCH 1/9] Add pipeline cancellation feature for graceful processing termination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add cancel_pipeline API endpoint • Implement PipelineCancelledException • Add cancellation checks in main loop • Handle task cancellation gracefully • Mark cancelled docs as FAILED --- lightrag/api/routers/document_routes.py | 81 +++++++++++++++++++++++++ lightrag/exceptions.py | 8 +++ lightrag/lightrag.py | 58 +++++++++++++++++- lightrag/operate.py | 39 ++++++++++++ 4 files changed, 183 insertions(+), 3 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 9ea831d2..7f6164ad 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -161,6 +161,28 @@ class ReprocessResponse(BaseModel): } +class CancelPipelineResponse(BaseModel): + """Response model for pipeline cancellation operation + + Attributes: + status: Status of the cancellation request + message: Message describing the operation result + """ + + status: Literal["cancellation_requested", "not_busy"] = Field( + description="Status of the cancellation request" + ) + message: str = Field(description="Human-readable message describing the operation") + + class Config: + json_schema_extra = { + "example": { + "status": "cancellation_requested", + "message": "Pipeline cancellation has been requested. Documents will be marked as FAILED.", + } + } + + class InsertTextRequest(BaseModel): """Request model for inserting a single text document @@ -2754,4 +2776,63 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + @router.post( + "/cancel_pipeline", + response_model=CancelPipelineResponse, + dependencies=[Depends(combined_auth)], + ) + async def cancel_pipeline(): + """ + Request cancellation of the currently running pipeline. + + This endpoint sets a cancellation flag in the pipeline status. The pipeline will: + 1. Check this flag at key processing points + 2. Stop processing new documents + 3. Cancel all running document processing tasks + 4. Mark all PROCESSING documents as FAILED with reason "User cancelled" + + The cancellation is graceful and ensures data consistency. Documents that have + completed processing will remain in PROCESSED status. + + Returns: + CancelPipelineResponse: Response with status and message + - status="cancellation_requested": Cancellation flag has been set + - status="not_busy": Pipeline is not currently running + + Raises: + HTTPException: If an error occurs while setting cancellation flag (500). + """ + try: + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_pipeline_status_lock, + ) + + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() + + async with pipeline_status_lock: + if not pipeline_status.get("busy", False): + return CancelPipelineResponse( + status="not_busy", + message="Pipeline is not currently running. No cancellation needed.", + ) + + # Set cancellation flag + pipeline_status["cancellation_requested"] = True + cancel_msg = "Pipeline cancellation requested by user" + logger.info(cancel_msg) + pipeline_status["latest_message"] = cancel_msg + pipeline_status["history_messages"].append(cancel_msg) + + return CancelPipelineResponse( + status="cancellation_requested", + message="Pipeline cancellation has been requested. Documents will be marked as FAILED.", + ) + + except Exception as e: + logger.error(f"Error requesting pipeline cancellation: {str(e)}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + return router diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py index d57df1ac..09e1d0e7 100644 --- a/lightrag/exceptions.py +++ b/lightrag/exceptions.py @@ -96,3 +96,11 @@ class PipelineNotInitializedError(KeyError): f" await initialize_pipeline_status()" ) super().__init__(msg) + + +class PipelineCancelledException(Exception): + """Raised when pipeline processing is cancelled by user request.""" + + def __init__(self, message: str = "User cancelled"): + super().__init__(message) + self.message = message diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ff9ce8b0..191a5acd 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -22,6 +22,7 @@ from typing import ( Dict, ) from lightrag.prompt import PROMPTS +from lightrag.exceptions import PipelineCancelledException from lightrag.constants import ( DEFAULT_MAX_GLEANING, DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE, @@ -1603,6 +1604,7 @@ class LightRAG: "batchs": 0, # Total number of files to be processed "cur_batch": 0, # Number of files already processed "request_pending": False, # Clear any previous request + "cancellation_requested": False, # Initialize cancellation flag "latest_message": "", } ) @@ -1619,6 +1621,22 @@ class LightRAG: try: # Process documents until no more documents or requests while True: + # Check for cancellation request at the start of main loop + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + # Clear pending request + pipeline_status["request_pending"] = False + # Celar cancellation flag + pipeline_status["cancellation_requested"] = False + + log_message = "Pipeline cancelled by user" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + # Exit directly, skipping request_pending check + return + if not to_process_docs: log_message = "All enqueued documents have been processed" logger.info(log_message) @@ -1689,6 +1707,11 @@ class LightRAG: first_stage_tasks = [] entity_relation_task = None try: + # Check for cancellation before starting document processing + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled") + # Get file path from status document file_path = getattr( status_doc, "file_path", "unknown_source" @@ -1751,6 +1774,11 @@ class LightRAG: # Record processing start time processing_start_time = int(time.time()) + # Check for cancellation before entity extraction + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled") + # Process document in two stages # Stage 1: Process text chunks and docs (parallel execution) doc_status_task = asyncio.create_task( @@ -1856,6 +1884,15 @@ class LightRAG: # Concurrency is controlled by keyed lock for individual entities and relationships if file_extraction_stage_ok: try: + # Check for cancellation before merge + async with pipeline_status_lock: + if pipeline_status.get( + "cancellation_requested", False + ): + raise PipelineCancelledException( + "User cancelled" + ) + # Get chunk_results from entity_relation_task chunk_results = await entity_relation_task await merge_nodes_and_edges( @@ -1970,7 +2007,19 @@ class LightRAG: ) # Wait for all document processing to complete - await asyncio.gather(*doc_tasks) + try: + await asyncio.gather(*doc_tasks) + except PipelineCancelledException: + # Cancel all remaining tasks + for task in doc_tasks: + if not task.done(): + task.cancel() + + # Wait for all tasks to complete cancellation + await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED) + + # Exit directly (document statuses already updated in process_document) + return # Check if there's a pending request to process more documents (with lock) has_pending_request = False @@ -2001,11 +2050,14 @@ class LightRAG: to_process_docs.update(pending_docs) finally: - log_message = "Enqueued document processing pipeline stoped" + log_message = "Enqueued document processing pipeline stopped" logger.info(log_message) - # Always reset busy status when done or if an exception occurs (with lock) + # Always reset busy status and cancellation flag when done or if an exception occurs (with lock) async with pipeline_status_lock: pipeline_status["busy"] = False + pipeline_status["cancellation_requested"] = ( + False # Always reset cancellation flag + ) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) diff --git a/lightrag/operate.py b/lightrag/operate.py index 8ecec587..adb4730e 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -7,6 +7,7 @@ import json_repair from typing import Any, AsyncIterator, overload, Literal from collections import Counter, defaultdict +from lightrag.exceptions import PipelineCancelledException from lightrag.utils import ( logger, compute_mdhash_id, @@ -2214,6 +2215,12 @@ async def merge_nodes_and_edges( file_path: File path for logging """ + # Check for cancellation at the start of merge + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled during merge phase") + # Collect all nodes and edges from all chunks all_nodes = defaultdict(list) all_edges = defaultdict(list) @@ -2250,6 +2257,14 @@ async def merge_nodes_and_edges( async def _locked_process_entity_name(entity_name, entities): async with semaphore: + # Check for cancellation before processing entity + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during entity merge" + ) + workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with get_storage_keyed_lock( @@ -2349,6 +2364,14 @@ async def merge_nodes_and_edges( async def _locked_process_edges(edge_key, edges): async with semaphore: + # Check for cancellation before processing edges + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during relation merge" + ) + workspace = global_config.get("workspace", "") namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" sorted_edge_key = sorted([edge_key[0], edge_key[1]]) @@ -2535,6 +2558,14 @@ async def extract_entities( llm_response_cache: BaseKVStorage | None = None, text_chunks_storage: BaseKVStorage | None = None, ) -> list: + # Check for cancellation at the start of entity extraction + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during entity extraction" + ) + use_llm_func: callable = global_config["llm_model_func"] entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"] @@ -2702,6 +2733,14 @@ async def extract_entities( async def _process_with_semaphore(chunk): async with semaphore: + # Check for cancellation before processing chunk + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during chunk processing" + ) + try: return await _process_single_content(chunk) except Exception as e: From 78ad8873b888a9ea1b1581c089f2c8ec6d718ed0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 24 Oct 2025 14:47:20 +0800 Subject: [PATCH 2/9] Add cancellation check in delete loop --- lightrag/api/routers/document_routes.py | 18 +++++++++++++++++- lightrag/operate.py | 3 ++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 7f6164ad..54e6477d 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1556,7 +1556,19 @@ async def background_delete_documents( try: # Loop through each document ID and delete them one by one for i, doc_id in enumerate(doc_ids, 1): + # Check for cancellation at the start of each document deletion async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + cancel_msg = f"Deletion cancelled by user at document {i}/{total_docs}. {len(successful_deletions)} deleted, {total_docs - i + 1} remaining." + logger.info(cancel_msg) + pipeline_status["latest_message"] = cancel_msg + pipeline_status["history_messages"].append(cancel_msg) + # Add remaining documents to failed list with cancellation reason + failed_deletions.extend( + doc_ids[i - 1 :] + ) # i-1 because enumerate starts at 1 + break # Exit the loop, remaining documents unchanged + start_msg = f"Deleting document {i}/{total_docs}: {doc_id}" logger.info(start_msg) pipeline_status["cur_batch"] = i @@ -1719,6 +1731,10 @@ async def background_delete_documents( # Final summary and check for pending requests async with pipeline_status_lock: pipeline_status["busy"] = False + pipeline_status["pending_requests"] = False # Reset pending requests flag + pipeline_status["cancellation_requested"] = ( + False # Always reset cancellation flag + ) completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed" pipeline_status["latest_message"] = completion_msg pipeline_status["history_messages"].append(completion_msg) @@ -2252,7 +2268,7 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - # TODO: Deprecated + # TODO: Deprecated, use /documents/paginated instead @router.get( "", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)] ) diff --git a/lightrag/operate.py b/lightrag/operate.py index adb4730e..4bbf47fd 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1,5 +1,6 @@ from __future__ import annotations from functools import partial +from pathlib import Path import asyncio import json @@ -68,7 +69,7 @@ from dotenv import load_dotenv # use the .env that is inside the current folder # allows to use different .env file for each lightrag instance # the OS environment variables take precedence over the .env file -load_dotenv(dotenv_path=".env", override=False) +load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False) def _truncate_entity_identifier( From f89b5ab101fa5013fb9c14fdfaf83dc83a2a5607 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 24 Oct 2025 15:30:27 +0800 Subject: [PATCH 3/9] Add pipeline cancellation feature with UI and i18n support - Add cancelPipeline API endpoint - Add cancel button to status dialog - Update status response type - Add cancellation UI translations - Handle cancellation request states --- lightrag_webui/src/api/lightrag.ts | 9 +++ .../documents/PipelineStatusDialog.tsx | 64 ++++++++++++++++--- lightrag_webui/src/locales/ar.json | 18 ++++-- lightrag_webui/src/locales/en.json | 12 +++- lightrag_webui/src/locales/fr.json | 24 ++++--- lightrag_webui/src/locales/zh.json | 8 +++ lightrag_webui/src/locales/zh_TW.json | 22 +++++-- 7 files changed, 125 insertions(+), 32 deletions(-) diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index eac6989e..7a268642 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -242,6 +242,7 @@ export type PipelineStatusResponse = { batchs: number cur_batch: number request_pending: boolean + cancellation_requested?: boolean latest_message: string history_messages?: string[] update_status?: Record @@ -691,6 +692,14 @@ export const getPipelineStatus = async (): Promise => { return response.data } +export const cancelPipeline = async (): Promise<{ + status: 'cancellation_requested' | 'not_busy' + message: string +}> => { + const response = await axiosInstance.post('/documents/cancel_pipeline') + return response.data +} + export const loginToServer = async (username: string, password: string): Promise => { const formData = new FormData(); formData.append('username', username); diff --git a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx index 2a2c5d93..0aab7dfe 100644 --- a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx +++ b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx @@ -11,7 +11,7 @@ import { DialogDescription } from '@/components/ui/Dialog' import Button from '@/components/ui/Button' -import { getPipelineStatus, PipelineStatusResponse } from '@/api/lightrag' +import { getPipelineStatus, cancelPipeline, PipelineStatusResponse } from '@/api/lightrag' import { errorMessage } from '@/lib/utils' import { cn } from '@/lib/utils' @@ -81,6 +81,23 @@ export default function PipelineStatusDialog({ return () => clearInterval(interval) }, [open, t]) + // Handle cancel pipeline + const handleCancelPipeline = async () => { + try { + const result = await cancelPipeline() + if (result.status === 'cancellation_requested') { + toast.success(t('documentPanel.pipelineStatus.cancelSuccess')) + } else if (result.status === 'not_busy') { + toast.info(t('documentPanel.pipelineStatus.cancelNotBusy')) + } + } catch (err) { + toast.error(t('documentPanel.pipelineStatus.cancelFailed', { error: errorMessage(err) })) + } + } + + // Determine if cancel button should be enabled + const canCancel = status?.busy === true && !status?.cancellation_requested + return ( - {/* Pipeline Status */} -
-
-
{t('documentPanel.pipelineStatus.busy')}:
-
-
-
-
{t('documentPanel.pipelineStatus.requestPending')}:
-
+ {/* Pipeline Status - with cancel button */} +
+ {/* Left side: Status indicators */} +
+
+
{t('documentPanel.pipelineStatus.busy')}:
+
+
+
+
{t('documentPanel.pipelineStatus.requestPending')}:
+
+
+ {/* Only show cancellation status when it's requested */} + {status?.cancellation_requested && ( +
+
{t('documentPanel.pipelineStatus.cancellationRequested')}:
+
+
+ )}
+ + {/* Right side: Cancel button - only show when pipeline is busy */} + {status?.busy && ( + + )}
{/* Job Information */} diff --git a/lightrag_webui/src/locales/ar.json b/lightrag_webui/src/locales/ar.json index be0c82cb..67b90629 100644 --- a/lightrag_webui/src/locales/ar.json +++ b/lightrag_webui/src/locales/ar.json @@ -157,17 +157,25 @@ "hideFileNameTooltip": "إخفاء اسم الملف" }, "pipelineStatus": { - "title": "حالة خط المعالجة", - "busy": "خط المعالجة مشغول", - "requestPending": "الطلب معلق", + "title": "حالة خط الأنابيب", + "busy": "خط الأنابيب مشغول", + "requestPending": "طلب معلق", + "cancellationRequested": "طلب الإلغاء", "jobName": "اسم المهمة", "startTime": "وقت البدء", "progress": "التقدم", "unit": "دفعة", "latestMessage": "آخر رسالة", - "historyMessages": "سجل الرسائل", + "historyMessages": "رسائل السجل", + "cancelButton": "إلغاء", + "cancelTooltip": "إلغاء معالجة خط الأنابيب", + "cancelInProgress": "الإلغاء قيد التقدم...", + "pipelineNotRunning": "خط الأنابيب غير قيد التشغيل", + "cancelSuccess": "تم طلب إلغاء خط الأنابيب", + "cancelFailed": "فشل إلغاء خط الأنابيب\n{{error}}", + "cancelNotBusy": "خط الأنابيب غير قيد التشغيل، لا حاجة للإلغاء", "errors": { - "fetchFailed": "فشل في جلب حالة خط المعالجة\n{{error}}" + "fetchFailed": "فشل في جلب حالة خط الأنابيب\n{{error}}" } } }, diff --git a/lightrag_webui/src/locales/en.json b/lightrag_webui/src/locales/en.json index 5ce4b3df..e48c0207 100644 --- a/lightrag_webui/src/locales/en.json +++ b/lightrag_webui/src/locales/en.json @@ -160,14 +160,22 @@ "title": "Pipeline Status", "busy": "Pipeline Busy", "requestPending": "Request Pending", + "cancellationRequested": "Cancellation Requested", "jobName": "Job Name", "startTime": "Start Time", "progress": "Progress", - "unit": "batch", + "unit": "Batch", "latestMessage": "Latest Message", "historyMessages": "History Messages", + "cancelButton": "Cancel", + "cancelTooltip": "Cancel pipeline processing", + "cancelInProgress": "Cancellation in progress...", + "pipelineNotRunning": "Pipeline not running", + "cancelSuccess": "Pipeline cancellation requested", + "cancelFailed": "Failed to cancel pipeline\n{{error}}", + "cancelNotBusy": "Pipeline is not running, no need to cancel", "errors": { - "fetchFailed": "Failed to get pipeline status\n{{error}}" + "fetchFailed": "Failed to fetch pipeline status\n{{error}}" } } }, diff --git a/lightrag_webui/src/locales/fr.json b/lightrag_webui/src/locales/fr.json index 941b55de..b75e3e63 100644 --- a/lightrag_webui/src/locales/fr.json +++ b/lightrag_webui/src/locales/fr.json @@ -158,14 +158,22 @@ }, "pipelineStatus": { "title": "État du Pipeline", - "busy": "Pipeline occupé", - "requestPending": "Requête en attente", - "jobName": "Nom du travail", - "startTime": "Heure de début", - "progress": "Progression", - "unit": "lot", - "latestMessage": "Dernier message", - "historyMessages": "Historique des messages", + "busy": "Pipeline Occupé", + "requestPending": "Demande en Attente", + "cancellationRequested": "Annulation Demandée", + "jobName": "Nom du Travail", + "startTime": "Heure de Début", + "progress": "Progrès", + "unit": "Lot", + "latestMessage": "Dernier Message", + "historyMessages": "Messages d'Historique", + "cancelButton": "Annuler", + "cancelTooltip": "Annuler le traitement du pipeline", + "cancelInProgress": "Annulation en cours...", + "pipelineNotRunning": "Le pipeline n'est pas en cours d'exécution", + "cancelSuccess": "Annulation du pipeline demandée", + "cancelFailed": "Échec de l'annulation du pipeline\n{{error}}", + "cancelNotBusy": "Le pipeline n'est pas en cours d'exécution, pas besoin d'annuler", "errors": { "fetchFailed": "Échec de la récupération de l'état du pipeline\n{{error}}" } diff --git a/lightrag_webui/src/locales/zh.json b/lightrag_webui/src/locales/zh.json index 3bbb31aa..2712a6d2 100644 --- a/lightrag_webui/src/locales/zh.json +++ b/lightrag_webui/src/locales/zh.json @@ -160,12 +160,20 @@ "title": "流水线状态", "busy": "流水线忙碌", "requestPending": "待处理请求", + "cancellationRequested": "取消请求", "jobName": "作业名称", "startTime": "开始时间", "progress": "进度", "unit": "批", "latestMessage": "最新消息", "historyMessages": "历史消息", + "cancelButton": "中断", + "cancelTooltip": "中断流水线处理", + "cancelInProgress": "取消请求进行中...", + "pipelineNotRunning": "流水线未运行", + "cancelSuccess": "流水线中断请求已发送", + "cancelFailed": "中断流水线失败\n{{error}}", + "cancelNotBusy": "流水线未运行,无需中断", "errors": { "fetchFailed": "获取流水线状态失败\n{{error}}" } diff --git a/lightrag_webui/src/locales/zh_TW.json b/lightrag_webui/src/locales/zh_TW.json index e4387e98..8fb06bf8 100644 --- a/lightrag_webui/src/locales/zh_TW.json +++ b/lightrag_webui/src/locales/zh_TW.json @@ -157,17 +157,25 @@ "hideFileNameTooltip": "隱藏檔案名稱" }, "pipelineStatus": { - "title": "pipeline 狀態", - "busy": "pipeline 忙碌中", + "title": "流水線狀態", + "busy": "流水線忙碌", "requestPending": "待處理請求", - "jobName": "工作名稱", + "cancellationRequested": "取消請求", + "jobName": "作業名稱", "startTime": "開始時間", "progress": "進度", - "unit": "梯次", - "latestMessage": "最新訊息", - "historyMessages": "歷史訊息", + "unit": "批", + "latestMessage": "最新消息", + "historyMessages": "歷史消息", + "cancelButton": "中斷", + "cancelTooltip": "中斷流水線處理", + "cancelInProgress": "取消請求進行中...", + "pipelineNotRunning": "流水線未運行", + "cancelSuccess": "流水線中斷請求已發送", + "cancelFailed": "中斷流水線失敗\n{{error}}", + "cancelNotBusy": "流水線未運行,無需中斷", "errors": { - "fetchFailed": "取得pipeline 狀態失敗\n{{error}}" + "fetchFailed": "獲取流水線狀態失敗\n{{error}}" } } }, From 77336e50b6d5d0089a05488dc7ed02d59364e0a1 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 24 Oct 2025 17:54:17 +0800 Subject: [PATCH 4/9] Improve error handling and add cancellation checks in pipeline --- lightrag/lightrag.py | 76 ++++++++++++++++++++++++++++++-------------- lightrag/operate.py | 14 ++++++++ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 191a5acd..dff637f6 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1699,10 +1699,16 @@ class LightRAG: semaphore: asyncio.Semaphore, ) -> None: """Process single document""" + # Initialize variables at the start to prevent UnboundLocalError in error handling + file_path = "unknown_source" + current_file_number = 0 file_extraction_stage_ok = False + processing_start_time = int(time.time()) + first_stage_tasks = [] + entity_relation_task = None + async with semaphore: nonlocal processed_count - current_file_number = 0 # Initialize to prevent UnboundLocalError in error handling first_stage_tasks = [] entity_relation_task = None @@ -1833,16 +1839,29 @@ class LightRAG: file_extraction_stage_ok = True except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append(error_msg) + # Check if this is a user cancellation + if isinstance(e, PipelineCancelledException): + # User cancellation - log brief message only, no traceback + error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}" + logger.warning(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + error_msg + ) + else: + # Other exceptions - log with traceback + logger.error(traceback.format_exc()) + error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) # Cancel tasks that are not yet completed all_tasks = first_stage_tasks + ( @@ -1951,18 +1970,29 @@ class LightRAG: ) except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append( - error_msg - ) + # Check if this is a user cancellation + if isinstance(e, PipelineCancelledException): + # User cancellation - log brief message only, no traceback + error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}" + logger.warning(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + error_msg + ) + else: + # Other exceptions - log with traceback + logger.error(traceback.format_exc()) + error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) # Persistent llm cache if self.llm_response_cache: diff --git a/lightrag/operate.py b/lightrag/operate.py index 4bbf47fd..496c000c 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1639,6 +1639,12 @@ async def _merge_nodes_then_upsert( logger.error(f"Entity {entity_name} has no description") raise ValueError(f"Entity {entity_name} has no description") + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled during entity summary") + # 8. Get summary description an LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Entity", @@ -1959,6 +1965,14 @@ async def _merge_edges_then_upsert( logger.error(f"Relation {src_id}~{tgt_id} has no description") raise ValueError(f"Relation {src_id}~{tgt_id} has no description") + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during relation summary" + ) + # 8. Get summary description an LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Relation", From a9ec15e669f45d2f9bb5c438a9a0ad96d64570b0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 03:06:45 +0800 Subject: [PATCH 5/9] Resolve lock leakage issue during user cancellation handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Change default log level to INFO • Force enable error logging output • Add lock cleanup rollback protection • Handle LLM cache persistence errors • Fix async task exception handling --- lightrag/kg/shared_storage.py | 260 ++++++++++++++++++++++++++++++---- lightrag/lightrag.py | 18 ++- lightrag/operate.py | 86 +++++------ 3 files changed, 285 insertions(+), 79 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index e20dce52..26fc3832 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -12,7 +12,7 @@ from lightrag.exceptions import PipelineNotInitializedError # Define a direct print function for critical logs that must be visible in all processes -def direct_log(message, enable_output: bool = False, level: str = "DEBUG"): +def direct_log(message, enable_output: bool = False, level: str = "INFO"): """ Log a message directly to stderr to ensure visibility in all processes, including the Gunicorn master process. @@ -44,7 +44,6 @@ def direct_log(message, enable_output: bool = False, level: str = "DEBUG"): } message_level = level_mapping.get(level.upper(), logging.DEBUG) - # print(f"Diret_log: {level.upper()} {message_level} ? {current_level}", file=sys.stderr, flush=True) if message_level >= current_level: print(f"{level}: {message}", file=sys.stderr, flush=True) @@ -168,7 +167,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -199,7 +198,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) # If main lock release failed but async lock hasn't been released, try to release it @@ -223,7 +222,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -247,7 +246,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -269,7 +268,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -401,7 +400,7 @@ def _perform_lock_cleanup( direct_log( f"== {lock_type} Lock == Cleanup failed: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) return 0, earliest_cleanup_time, last_cleanup_time @@ -689,7 +688,7 @@ class KeyedUnifiedLock: direct_log( f"Error during multiprocess lock cleanup: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) # 2. Cleanup async locks using generic function @@ -718,7 +717,7 @@ class KeyedUnifiedLock: direct_log( f"Error during async lock cleanup: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) # 3. Get current status after cleanup @@ -772,7 +771,7 @@ class KeyedUnifiedLock: direct_log( f"Error getting keyed lock status: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) return status @@ -797,32 +796,239 @@ class _KeyedLockContext: if enable_logging is not None else parent._default_enable_logging ) - self._ul: Optional[List["UnifiedLock"]] = None # set in __aenter__ + self._ul: Optional[List[Dict[str, Any]]] = None # set in __aenter__ # ----- enter ----- async def __aenter__(self): if self._ul is not None: raise RuntimeError("KeyedUnifiedLock already acquired in current context") - # acquire locks for all keys in the namespace self._ul = [] - for key in self._keys: - lock = self._parent._get_lock_for_key( - self._namespace, key, enable_logging=self._enable_logging - ) - await lock.__aenter__() - inc_debug_n_locks_acquired() - self._ul.append(lock) - return self + + try: + # Acquire locks for all keys in the namespace + for key in self._keys: + lock = None + entry = None + + try: + # 1. Get lock object (reference count is incremented here) + lock = self._parent._get_lock_for_key( + self._namespace, key, enable_logging=self._enable_logging + ) + + # 2. Immediately create and add entry to list (critical for rollback to work) + entry = { + "key": key, + "lock": lock, + "entered": False, + "debug_inc": False, + "ref_incremented": True, # Mark that reference count has been incremented + } + self._ul.append( + entry + ) # Add immediately after _get_lock_for_key for rollback to work + + # 3. Try to acquire the lock + # Use try-finally to ensure state is updated atomically + lock_acquired = False + try: + await lock.__aenter__() + lock_acquired = True # Lock successfully acquired + finally: + if lock_acquired: + entry["entered"] = True + inc_debug_n_locks_acquired() + entry["debug_inc"] = True + + except asyncio.CancelledError: + # Lock acquisition was cancelled + # The finally block above ensures entry["entered"] is correct + direct_log( + f"Lock acquisition cancelled for key {key}", + level="WARNING", + enable_output=self._enable_logging, + ) + raise + except Exception as e: + # Other exceptions, log and re-raise + direct_log( + f"Lock acquisition failed for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + raise + + return self + + except BaseException: + # Critical: if any exception occurs (including CancelledError) during lock acquisition, + # we must rollback all already acquired locks to prevent lock leaks + # Use shield to ensure rollback completes + await asyncio.shield(self._rollback_acquired_locks()) + raise + + async def _rollback_acquired_locks(self): + """Rollback all acquired locks in case of exception during __aenter__""" + if not self._ul: + return + + async def rollback_single_entry(entry): + """Rollback a single lock acquisition""" + key = entry["key"] + lock = entry["lock"] + debug_inc = entry["debug_inc"] + entered = entry["entered"] + ref_incremented = entry.get( + "ref_incremented", True + ) # Default to True for safety + + errors = [] + + # 1. If lock was acquired, release it + if entered: + try: + await lock.__aexit__(None, None, None) + except Exception as e: + errors.append(("lock_exit", e)) + direct_log( + f"Lock rollback error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 2. Release reference count (if it was incremented) + if ref_incremented: + try: + self._parent._release_lock_for_key(self._namespace, key) + except Exception as e: + errors.append(("ref_release", e)) + direct_log( + f"Lock rollback reference release error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 3. Decrement debug counter + if debug_inc: + try: + dec_debug_n_locks_acquired() + except Exception as e: + errors.append(("debug_dec", e)) + direct_log( + f"Lock rollback counter decrementing error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + return errors + + # Release already acquired locks in reverse order + for entry in reversed(self._ul): + # Use shield to protect each lock's rollback + try: + await asyncio.shield(rollback_single_entry(entry)) + except Exception as e: + # Log but continue rolling back other locks + direct_log( + f"Lock rollback unexpected error for {entry['key']}: {e}", + level="ERROR", + enable_output=True, + ) + + self._ul = None # ----- exit ----- async def __aexit__(self, exc_type, exc, tb): - # The UnifiedLock takes care of proper release order - for ul, key in zip(reversed(self._ul), reversed(self._keys)): - await ul.__aexit__(exc_type, exc, tb) - self._parent._release_lock_for_key(self._namespace, key) - dec_debug_n_locks_acquired() - self._ul = None + if self._ul is None: + return + + async def release_all_locks(): + """Release all locks with comprehensive error handling, protected from cancellation""" + + async def release_single_entry(entry, exc_type, exc, tb): + """Release a single lock with full protection""" + key = entry["key"] + lock = entry["lock"] + debug_inc = entry["debug_inc"] + entered = entry["entered"] + + errors = [] + + # 1. Release the lock + if entered: + try: + await lock.__aexit__(exc_type, exc, tb) + except Exception as e: + errors.append(("lock_exit", e)) + direct_log( + f"Lock release error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 2. Release reference count + try: + self._parent._release_lock_for_key(self._namespace, key) + except Exception as e: + errors.append(("ref_release", e)) + direct_log( + f"Lock release reference error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 3. Decrement debug counter + if debug_inc: + try: + dec_debug_n_locks_acquired() + except Exception as e: + errors.append(("debug_dec", e)) + direct_log( + f"Lock release counter decrementing error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + return errors + + all_errors = [] + + # Release locks in reverse order + # This entire loop is protected by the outer shield + for entry in reversed(self._ul): + try: + errors = await release_single_entry(entry, exc_type, exc, tb) + for error_type, error in errors: + all_errors.append((entry["key"], error_type, error)) + except Exception as e: + all_errors.append((entry["key"], "unexpected", e)) + direct_log( + f"Lock release unexpected error for {entry['key']}: {e}", + level="ERROR", + enable_output=True, + ) + + return all_errors + + # CRITICAL: Protect the entire release process with shield + # This ensures that even if cancellation occurs, all locks are released + try: + all_errors = await asyncio.shield(release_all_locks()) + except Exception as e: + direct_log( + f"Critical error during __aexit__ cleanup: {e}", + level="ERROR", + enable_output=True, + ) + all_errors = [] + finally: + # Always clear the lock list, even if shield was cancelled + self._ul = None + + # If there were release errors and no other exception, raise the first release error + if all_errors and exc_type is None: + raise all_errors[0][2] # (key, error_type, error) def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index dff637f6..24ea0209 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1871,9 +1871,14 @@ class LightRAG: if task and not task.done(): task.cancel() - # Persistent llm cache + # Persistent llm cache with error handling if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() + try: + await self.llm_response_cache.index_done_callback() + except Exception as persist_error: + logger.error( + f"Failed to persist LLM cache: {persist_error}" + ) # Record processing end time for failed case processing_end_time = int(time.time()) @@ -1994,9 +1999,14 @@ class LightRAG: error_msg ) - # Persistent llm cache + # Persistent llm cache with error handling if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() + try: + await self.llm_response_cache.index_done_callback() + except Exception as persist_error: + logger.error( + f"Failed to persist LLM cache: {persist_error}" + ) # Record processing end time for failed case processing_end_time = int(time.time()) diff --git a/lightrag/operate.py b/lightrag/operate.py index 496c000c..36c8251d 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -2302,9 +2302,7 @@ async def merge_nodes_and_edges( return entity_data except Exception as e: - error_msg = ( - f"Critical error in entity processing for `{entity_name}`: {e}" - ) + error_msg = f"Error processing entity `{entity_name}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2340,36 +2338,32 @@ async def merge_nodes_and_edges( entity_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] + processed_entities = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - successful_results.append(task.result()) - except Exception as e: + result = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + processed_entities.append(result) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + processed_entities.append(result) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - processed_entities = [task.result() for task in entity_tasks] - # ===== Phase 2: Process all relationships concurrently ===== log_message = f"Phase 2: Processing {total_relations_count} relations from {doc_id} (async: {graph_max_async})" logger.info(log_message) @@ -2421,7 +2415,7 @@ async def merge_nodes_and_edges( return edge_data, added_entities except Exception as e: - error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" + error_msg = f"Error processing relation `{sorted_edge_key}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2459,40 +2453,36 @@ async def merge_nodes_and_edges( edge_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - successful_results.append(task.result()) - except Exception as e: + edge_data, added_entities = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + edge_data, added_entities = result + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - for task in edge_tasks: - edge_data, added_entities = task.result() - if edge_data is not None: - processed_edges.append(edge_data) - all_added_entities.extend(added_entities) - # ===== Phase 3: Update full_entities and full_relations storage ===== if full_entities_storage and full_relations_storage and doc_id: try: From 2476d6b7f8a4d83731d8234c818ccda27d55bf56 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 03:34:54 +0800 Subject: [PATCH 6/9] Simplify pipeline status dialog by consolidating message sections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Remove separate latest message section • Combine into single pipeline messages area • Add overflow-x-hidden for better display • Change break-words to break-all • Update translations across all locales --- .../components/documents/PipelineStatusDialog.tsx | 14 +++----------- lightrag_webui/src/locales/ar.json | 3 +-- lightrag_webui/src/locales/en.json | 3 +-- lightrag_webui/src/locales/fr.json | 3 +-- lightrag_webui/src/locales/zh.json | 3 +-- lightrag_webui/src/locales/zh_TW.json | 3 +-- 6 files changed, 8 insertions(+), 21 deletions(-) diff --git a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx index 0aab7dfe..956538fd 100644 --- a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx +++ b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx @@ -216,25 +216,17 @@ export default function PipelineStatusDialog({
- {/* Latest Message */} -
-
{t('documentPanel.pipelineStatus.latestMessage')}:
-
- {status?.latest_message || '-'} -
-
- {/* History Messages */}
-
{t('documentPanel.pipelineStatus.historyMessages')}:
+
{t('documentPanel.pipelineStatus.pipelineMessages')}:
{status?.history_messages?.length ? ( status.history_messages.map((msg, idx) => ( -
{msg}
+
{msg}
)) ) : '-'}
diff --git a/lightrag_webui/src/locales/ar.json b/lightrag_webui/src/locales/ar.json index 67b90629..9d78da19 100644 --- a/lightrag_webui/src/locales/ar.json +++ b/lightrag_webui/src/locales/ar.json @@ -165,8 +165,7 @@ "startTime": "وقت البدء", "progress": "التقدم", "unit": "دفعة", - "latestMessage": "آخر رسالة", - "historyMessages": "رسائل السجل", + "pipelineMessages": "رسائل خط الأنابيب", "cancelButton": "إلغاء", "cancelTooltip": "إلغاء معالجة خط الأنابيب", "cancelInProgress": "الإلغاء قيد التقدم...", diff --git a/lightrag_webui/src/locales/en.json b/lightrag_webui/src/locales/en.json index e48c0207..205a7e66 100644 --- a/lightrag_webui/src/locales/en.json +++ b/lightrag_webui/src/locales/en.json @@ -165,8 +165,7 @@ "startTime": "Start Time", "progress": "Progress", "unit": "Batch", - "latestMessage": "Latest Message", - "historyMessages": "History Messages", + "pipelineMessages": "Pipeline Messages", "cancelButton": "Cancel", "cancelTooltip": "Cancel pipeline processing", "cancelInProgress": "Cancellation in progress...", diff --git a/lightrag_webui/src/locales/fr.json b/lightrag_webui/src/locales/fr.json index b75e3e63..48dbe03d 100644 --- a/lightrag_webui/src/locales/fr.json +++ b/lightrag_webui/src/locales/fr.json @@ -165,8 +165,7 @@ "startTime": "Heure de Début", "progress": "Progrès", "unit": "Lot", - "latestMessage": "Dernier Message", - "historyMessages": "Messages d'Historique", + "pipelineMessages": "Messages de Pipeline", "cancelButton": "Annuler", "cancelTooltip": "Annuler le traitement du pipeline", "cancelInProgress": "Annulation en cours...", diff --git a/lightrag_webui/src/locales/zh.json b/lightrag_webui/src/locales/zh.json index 2712a6d2..b361e1ff 100644 --- a/lightrag_webui/src/locales/zh.json +++ b/lightrag_webui/src/locales/zh.json @@ -165,8 +165,7 @@ "startTime": "开始时间", "progress": "进度", "unit": "批", - "latestMessage": "最新消息", - "historyMessages": "历史消息", + "pipelineMessages": "流水线消息", "cancelButton": "中断", "cancelTooltip": "中断流水线处理", "cancelInProgress": "取消请求进行中...", diff --git a/lightrag_webui/src/locales/zh_TW.json b/lightrag_webui/src/locales/zh_TW.json index 8fb06bf8..e822aa77 100644 --- a/lightrag_webui/src/locales/zh_TW.json +++ b/lightrag_webui/src/locales/zh_TW.json @@ -165,8 +165,7 @@ "startTime": "開始時間", "progress": "進度", "unit": "批", - "latestMessage": "最新消息", - "historyMessages": "歷史消息", + "pipelineMessages": "流水線消息", "cancelButton": "中斷", "cancelTooltip": "中斷流水線處理", "cancelInProgress": "取消請求進行中...", From 81e3496aa42103a7d2287e17ff440ecd6cad2933 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 03:55:00 +0800 Subject: [PATCH 7/9] Add confirmation dialog for pipeline cancellation --- .../documents/PipelineStatusDialog.tsx | 37 +++++++++++++++++-- lightrag_webui/src/locales/ar.json | 3 ++ lightrag_webui/src/locales/en.json | 3 ++ lightrag_webui/src/locales/fr.json | 3 ++ lightrag_webui/src/locales/zh.json | 3 ++ lightrag_webui/src/locales/zh_TW.json | 3 ++ 6 files changed, 49 insertions(+), 3 deletions(-) diff --git a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx index 956538fd..c368d69c 100644 --- a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx +++ b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx @@ -30,6 +30,7 @@ export default function PipelineStatusDialog({ const [status, setStatus] = useState(null) const [position, setPosition] = useState('center') const [isUserScrolled, setIsUserScrolled] = useState(false) + const [showCancelConfirm, setShowCancelConfirm] = useState(false) const historyRef = useRef(null) // Reset position when dialog opens @@ -37,6 +38,9 @@ export default function PipelineStatusDialog({ if (open) { setPosition('center') setIsUserScrolled(false) + } else { + // Reset confirmation dialog state when main dialog closes + setShowCancelConfirm(false) } }, [open]) @@ -81,8 +85,9 @@ export default function PipelineStatusDialog({ return () => clearInterval(interval) }, [open, t]) - // Handle cancel pipeline - const handleCancelPipeline = async () => { + // Handle cancel pipeline confirmation + const handleConfirmCancel = async () => { + setShowCancelConfirm(false) try { const result = await cancelPipeline() if (result.status === 'cancellation_requested') { @@ -186,7 +191,7 @@ export default function PipelineStatusDialog({ variant="destructive" size="sm" disabled={!canCancel} - onClick={handleCancelPipeline} + onClick={() => setShowCancelConfirm(true)} title={ status?.cancellation_requested ? t('documentPanel.pipelineStatus.cancelInProgress') @@ -233,6 +238,32 @@ export default function PipelineStatusDialog({
+ + {/* Cancel Confirmation Dialog */} + + + + {t('documentPanel.pipelineStatus.cancelConfirmTitle')} + + {t('documentPanel.pipelineStatus.cancelConfirmDescription')} + + +
+ + +
+
+
) } diff --git a/lightrag_webui/src/locales/ar.json b/lightrag_webui/src/locales/ar.json index 9d78da19..d81b5480 100644 --- a/lightrag_webui/src/locales/ar.json +++ b/lightrag_webui/src/locales/ar.json @@ -168,6 +168,9 @@ "pipelineMessages": "رسائل خط الأنابيب", "cancelButton": "إلغاء", "cancelTooltip": "إلغاء معالجة خط الأنابيب", + "cancelConfirmTitle": "تأكيد إلغاء خط الأنابيب", + "cancelConfirmDescription": "سيؤدي هذا الإجراء إلى إيقاف معالجة خط الأنابيب الجارية. هل أنت متأكد من أنك تريد المتابعة؟", + "cancelConfirmButton": "تأكيد الإلغاء", "cancelInProgress": "الإلغاء قيد التقدم...", "pipelineNotRunning": "خط الأنابيب غير قيد التشغيل", "cancelSuccess": "تم طلب إلغاء خط الأنابيب", diff --git a/lightrag_webui/src/locales/en.json b/lightrag_webui/src/locales/en.json index 205a7e66..9498a8e6 100644 --- a/lightrag_webui/src/locales/en.json +++ b/lightrag_webui/src/locales/en.json @@ -168,6 +168,9 @@ "pipelineMessages": "Pipeline Messages", "cancelButton": "Cancel", "cancelTooltip": "Cancel pipeline processing", + "cancelConfirmTitle": "Confirm Pipeline Cancellation", + "cancelConfirmDescription": "This will interrupt the ongoing pipeline processing. Are you sure you want to continue?", + "cancelConfirmButton": "Confirm Cancellation", "cancelInProgress": "Cancellation in progress...", "pipelineNotRunning": "Pipeline not running", "cancelSuccess": "Pipeline cancellation requested", diff --git a/lightrag_webui/src/locales/fr.json b/lightrag_webui/src/locales/fr.json index 48dbe03d..2a5448d0 100644 --- a/lightrag_webui/src/locales/fr.json +++ b/lightrag_webui/src/locales/fr.json @@ -168,6 +168,9 @@ "pipelineMessages": "Messages de Pipeline", "cancelButton": "Annuler", "cancelTooltip": "Annuler le traitement du pipeline", + "cancelConfirmTitle": "Confirmer l'Annulation du Pipeline", + "cancelConfirmDescription": "Cette action interrompra le traitement du pipeline en cours. Êtes-vous sûr de vouloir continuer ?", + "cancelConfirmButton": "Confirmer l'Annulation", "cancelInProgress": "Annulation en cours...", "pipelineNotRunning": "Le pipeline n'est pas en cours d'exécution", "cancelSuccess": "Annulation du pipeline demandée", diff --git a/lightrag_webui/src/locales/zh.json b/lightrag_webui/src/locales/zh.json index b361e1ff..9791de7a 100644 --- a/lightrag_webui/src/locales/zh.json +++ b/lightrag_webui/src/locales/zh.json @@ -168,6 +168,9 @@ "pipelineMessages": "流水线消息", "cancelButton": "中断", "cancelTooltip": "中断流水线处理", + "cancelConfirmTitle": "确认中断流水线", + "cancelConfirmDescription": "此操作将中断正在进行的流水线处理。确定要继续吗?", + "cancelConfirmButton": "确认中断", "cancelInProgress": "取消请求进行中...", "pipelineNotRunning": "流水线未运行", "cancelSuccess": "流水线中断请求已发送", diff --git a/lightrag_webui/src/locales/zh_TW.json b/lightrag_webui/src/locales/zh_TW.json index e822aa77..6c5c1a34 100644 --- a/lightrag_webui/src/locales/zh_TW.json +++ b/lightrag_webui/src/locales/zh_TW.json @@ -168,6 +168,9 @@ "pipelineMessages": "流水線消息", "cancelButton": "中斷", "cancelTooltip": "中斷流水線處理", + "cancelConfirmTitle": "確認中斷流水線", + "cancelConfirmDescription": "此操作將中斷正在進行的流水線處理。確定要繼續嗎?", + "cancelConfirmButton": "確認中斷", "cancelInProgress": "取消請求進行中...", "pipelineNotRunning": "流水線未運行", "cancelSuccess": "流水線中斷請求已發送", From 9ed19695bb157cf15f378479f456e0a7d2828cd4 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 04:12:44 +0800 Subject: [PATCH 8/9] Remove separate retry button and merge functionality into scan button --- .../src/features/DocumentManager.tsx | 47 ------------------- lightrag_webui/src/locales/ar.json | 4 +- lightrag_webui/src/locales/en.json | 4 +- lightrag_webui/src/locales/fr.json | 4 +- lightrag_webui/src/locales/zh.json | 4 +- lightrag_webui/src/locales/zh_TW.json | 4 +- 6 files changed, 5 insertions(+), 62 deletions(-) diff --git a/lightrag_webui/src/features/DocumentManager.tsx b/lightrag_webui/src/features/DocumentManager.tsx index 204c7daf..406faf2b 100644 --- a/lightrag_webui/src/features/DocumentManager.tsx +++ b/lightrag_webui/src/features/DocumentManager.tsx @@ -21,7 +21,6 @@ import PaginationControls from '@/components/ui/PaginationControls' import { scanNewDocuments, - reprocessFailedDocuments, getDocumentsPaginated, DocsStatusesResponse, DocStatus, @@ -868,42 +867,6 @@ export default function DocumentManager() { } }, [t, startPollingInterval, currentTab, health, statusCounts]) - const retryFailedDocuments = useCallback(async () => { - try { - // Check if component is still mounted before starting the request - if (!isMountedRef.current) return; - - const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars - - // Check again if component is still mounted after the request completes - if (!isMountedRef.current) return; - - // Note: _track_id is available for future use (e.g., progress tracking) - toast.message(message || status); - - // Reset health check timer with 1 second delay to avoid race condition - useBackendState.getState().resetHealthCheckTimerDelayed(1000); - - // Start fast refresh with 2-second interval immediately after retry - startPollingInterval(2000); - - // Set recovery timer to restore normal polling interval after 15 seconds - setTimeout(() => { - if (isMountedRef.current && currentTab === 'documents' && health) { - // Restore intelligent polling interval based on document status - const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts); - const normalInterval = hasActiveDocuments ? 5000 : 30000; - startPollingInterval(normalInterval); - } - }, 15000); // Restore after 15 seconds - } catch (err) { - // Only show error if component is still mounted - if (isMountedRef.current) { - toast.error(errorMessage(err)); - } - } - }, [startPollingInterval, currentTab, health, statusCounts]) - // Handle page size change - update state and save to store const handlePageSizeChange = useCallback((newPageSize: number) => { if (newPageSize === pagination.page_size) return; @@ -1166,16 +1129,6 @@ export default function DocumentManager() { > {t('documentPanel.documentManager.scanButton')} -