From 8dc23eeff298ce755c1c7fc15aa048d5728b50cf Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 22 Oct 2025 20:15:29 +0800 Subject: [PATCH 01/20] Fix RayAnything compatible problem MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Use "preprocessed" to indicate multimodal processing is required • Update DocProcessingStatus to process status convertion automatically • Remove multimodal_processed from DocStatus enum value • Update UI filter logic --- lightrag/api/routers/document_routes.py | 2 +- lightrag/base.py | 21 +++++++++++++- lightrag_webui/src/api/lightrag.ts | 2 +- .../src/features/DocumentManager.tsx | 28 +++++++++---------- 4 files changed, 36 insertions(+), 17 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 848d5eb8..9ea831d2 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -458,7 +458,7 @@ class DocsStatusesResponse(BaseModel): "id": "doc_789", "content_summary": "Document pending final indexing", "content_length": 7200, - "status": "multimodal_processed", + "status": "preprocessed", "created_at": "2025-03-31T09:30:00", "updated_at": "2025-03-31T09:35:00", "track_id": "upload_20250331_093000_xyz789", diff --git a/lightrag/base.py b/lightrag/base.py index e569de2a..3cf40136 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -720,7 +720,7 @@ class DocStatus(str, Enum): PENDING = "pending" PROCESSING = "processing" - PREPROCESSED = "multimodal_processed" + PREPROCESSED = "preprocessed" PROCESSED = "processed" FAILED = "failed" @@ -751,6 +751,25 @@ class DocProcessingStatus: """Error message if failed""" metadata: dict[str, Any] = field(default_factory=dict) """Additional metadata""" + multimodal_processed: bool | None = field(default=None, repr=False) + """Internal field: indicates if multimodal processing is complete. Not shown in repr() but accessible for debugging.""" + + def __post_init__(self): + """ + Handle status conversion based on multimodal_processed field. + + Business rules: + - If multimodal_processed is False and status is PROCESSED, + then change status to PREPROCESSED + - The multimodal_processed field is kept (with repr=False) for internal use and debugging + """ + # Apply status conversion logic + if self.multimodal_processed is not None: + if ( + self.multimodal_processed is False + and self.status == DocStatus.PROCESSED + ): + self.status = DocStatus.PREPROCESSED @dataclass diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index cf9a7e7a..eac6989e 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -167,7 +167,7 @@ export type DeleteDocResponse = { doc_id: string } -export type DocStatus = 'pending' | 'processing' | 'multimodal_processed' | 'processed' | 'failed' +export type DocStatus = 'pending' | 'processing' | 'preprocessed' | 'processed' | 'failed' export type DocStatusResponse = { id: string diff --git a/lightrag_webui/src/features/DocumentManager.tsx b/lightrag_webui/src/features/DocumentManager.tsx index 530e98c7..204c7daf 100644 --- a/lightrag_webui/src/features/DocumentManager.tsx +++ b/lightrag_webui/src/features/DocumentManager.tsx @@ -52,7 +52,7 @@ const getCountValue = (counts: Record, ...keys: string[]): numbe const hasActiveDocumentsStatus = (counts: Record): boolean => getCountValue(counts, 'PROCESSING', 'processing') > 0 || getCountValue(counts, 'PENDING', 'pending') > 0 || - getCountValue(counts, 'PREPROCESSED', 'preprocessed', 'multimodal_processed') > 0 + getCountValue(counts, 'PREPROCESSED', 'preprocessed') > 0 const getDisplayFileName = (doc: DocStatusResponse, maxLength: number = 20): string => { // Check if file_path exists and is a non-empty string @@ -257,7 +257,7 @@ export default function DocumentManager() { const [pageByStatus, setPageByStatus] = useState>({ all: 1, processed: 1, - multimodal_processed: 1, + preprocessed: 1, processing: 1, pending: 1, failed: 1, @@ -324,7 +324,7 @@ export default function DocumentManager() { setPageByStatus({ all: 1, processed: 1, - 'multimodal_processed': 1, + preprocessed: 1, processing: 1, pending: 1, failed: 1, @@ -471,8 +471,8 @@ export default function DocumentManager() { const processedCount = getCountValue(statusCounts, 'PROCESSED', 'processed') || documentCounts.processed || 0; const preprocessedCount = - getCountValue(statusCounts, 'PREPROCESSED', 'preprocessed', 'multimodal_processed') || - documentCounts.multimodal_processed || + getCountValue(statusCounts, 'PREPROCESSED', 'preprocessed') || + documentCounts.preprocessed || 0; const processingCount = getCountValue(statusCounts, 'PROCESSING', 'processing') || documentCounts.processing || 0; const pendingCount = getCountValue(statusCounts, 'PENDING', 'pending') || documentCounts.pending || 0; @@ -481,7 +481,7 @@ export default function DocumentManager() { // Store previous status counts const prevStatusCounts = useRef({ processed: 0, - multimodal_processed: 0, + preprocessed: 0, processing: 0, pending: 0, failed: 0 @@ -572,7 +572,7 @@ export default function DocumentManager() { const legacyDocs: DocsStatusesResponse = { statuses: { processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processed'), - multimodal_processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'multimodal_processed'), + preprocessed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'preprocessed'), processing: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processing'), pending: response.documents.filter((doc: DocStatusResponse) => doc.status === 'pending'), failed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'failed') @@ -915,7 +915,7 @@ export default function DocumentManager() { setPageByStatus({ all: 1, processed: 1, - multimodal_processed: 1, + preprocessed: 1, processing: 1, pending: 1, failed: 1, @@ -956,7 +956,7 @@ export default function DocumentManager() { const legacyDocs: DocsStatusesResponse = { statuses: { processed: response.documents.filter(doc => doc.status === 'processed'), - multimodal_processed: response.documents.filter(doc => doc.status === 'multimodal_processed'), + preprocessed: response.documents.filter(doc => doc.status === 'preprocessed'), processing: response.documents.filter(doc => doc.status === 'processing'), pending: response.documents.filter(doc => doc.status === 'pending'), failed: response.documents.filter(doc => doc.status === 'failed') @@ -1032,7 +1032,7 @@ export default function DocumentManager() { // Get new status counts const newStatusCounts = { processed: docs?.statuses?.processed?.length || 0, - multimodal_processed: docs?.statuses?.multimodal_processed?.length || 0, + preprocessed: docs?.statuses?.preprocessed?.length || 0, processing: docs?.statuses?.processing?.length || 0, pending: docs?.statuses?.pending?.length || 0, failed: docs?.statuses?.failed?.length || 0 @@ -1270,12 +1270,12 @@ export default function DocumentManager() { + )} {/* Job Information */} diff --git a/lightrag_webui/src/locales/ar.json b/lightrag_webui/src/locales/ar.json index be0c82cb..67b90629 100644 --- a/lightrag_webui/src/locales/ar.json +++ b/lightrag_webui/src/locales/ar.json @@ -157,17 +157,25 @@ "hideFileNameTooltip": "إخفاء اسم الملف" }, "pipelineStatus": { - "title": "حالة خط المعالجة", - "busy": "خط المعالجة مشغول", - "requestPending": "الطلب معلق", + "title": "حالة خط الأنابيب", + "busy": "خط الأنابيب مشغول", + "requestPending": "طلب معلق", + "cancellationRequested": "طلب الإلغاء", "jobName": "اسم المهمة", "startTime": "وقت البدء", "progress": "التقدم", "unit": "دفعة", "latestMessage": "آخر رسالة", - "historyMessages": "سجل الرسائل", + "historyMessages": "رسائل السجل", + "cancelButton": "إلغاء", + "cancelTooltip": "إلغاء معالجة خط الأنابيب", + "cancelInProgress": "الإلغاء قيد التقدم...", + "pipelineNotRunning": "خط الأنابيب غير قيد التشغيل", + "cancelSuccess": "تم طلب إلغاء خط الأنابيب", + "cancelFailed": "فشل إلغاء خط الأنابيب\n{{error}}", + "cancelNotBusy": "خط الأنابيب غير قيد التشغيل، لا حاجة للإلغاء", "errors": { - "fetchFailed": "فشل في جلب حالة خط المعالجة\n{{error}}" + "fetchFailed": "فشل في جلب حالة خط الأنابيب\n{{error}}" } } }, diff --git a/lightrag_webui/src/locales/en.json b/lightrag_webui/src/locales/en.json index 5ce4b3df..e48c0207 100644 --- a/lightrag_webui/src/locales/en.json +++ b/lightrag_webui/src/locales/en.json @@ -160,14 +160,22 @@ "title": "Pipeline Status", "busy": "Pipeline Busy", "requestPending": "Request Pending", + "cancellationRequested": "Cancellation Requested", "jobName": "Job Name", "startTime": "Start Time", "progress": "Progress", - "unit": "batch", + "unit": "Batch", "latestMessage": "Latest Message", "historyMessages": "History Messages", + "cancelButton": "Cancel", + "cancelTooltip": "Cancel pipeline processing", + "cancelInProgress": "Cancellation in progress...", + "pipelineNotRunning": "Pipeline not running", + "cancelSuccess": "Pipeline cancellation requested", + "cancelFailed": "Failed to cancel pipeline\n{{error}}", + "cancelNotBusy": "Pipeline is not running, no need to cancel", "errors": { - "fetchFailed": "Failed to get pipeline status\n{{error}}" + "fetchFailed": "Failed to fetch pipeline status\n{{error}}" } } }, diff --git a/lightrag_webui/src/locales/fr.json b/lightrag_webui/src/locales/fr.json index 941b55de..b75e3e63 100644 --- a/lightrag_webui/src/locales/fr.json +++ b/lightrag_webui/src/locales/fr.json @@ -158,14 +158,22 @@ }, "pipelineStatus": { "title": "État du Pipeline", - "busy": "Pipeline occupé", - "requestPending": "Requête en attente", - "jobName": "Nom du travail", - "startTime": "Heure de début", - "progress": "Progression", - "unit": "lot", - "latestMessage": "Dernier message", - "historyMessages": "Historique des messages", + "busy": "Pipeline Occupé", + "requestPending": "Demande en Attente", + "cancellationRequested": "Annulation Demandée", + "jobName": "Nom du Travail", + "startTime": "Heure de Début", + "progress": "Progrès", + "unit": "Lot", + "latestMessage": "Dernier Message", + "historyMessages": "Messages d'Historique", + "cancelButton": "Annuler", + "cancelTooltip": "Annuler le traitement du pipeline", + "cancelInProgress": "Annulation en cours...", + "pipelineNotRunning": "Le pipeline n'est pas en cours d'exécution", + "cancelSuccess": "Annulation du pipeline demandée", + "cancelFailed": "Échec de l'annulation du pipeline\n{{error}}", + "cancelNotBusy": "Le pipeline n'est pas en cours d'exécution, pas besoin d'annuler", "errors": { "fetchFailed": "Échec de la récupération de l'état du pipeline\n{{error}}" } diff --git a/lightrag_webui/src/locales/zh.json b/lightrag_webui/src/locales/zh.json index 3bbb31aa..2712a6d2 100644 --- a/lightrag_webui/src/locales/zh.json +++ b/lightrag_webui/src/locales/zh.json @@ -160,12 +160,20 @@ "title": "流水线状态", "busy": "流水线忙碌", "requestPending": "待处理请求", + "cancellationRequested": "取消请求", "jobName": "作业名称", "startTime": "开始时间", "progress": "进度", "unit": "批", "latestMessage": "最新消息", "historyMessages": "历史消息", + "cancelButton": "中断", + "cancelTooltip": "中断流水线处理", + "cancelInProgress": "取消请求进行中...", + "pipelineNotRunning": "流水线未运行", + "cancelSuccess": "流水线中断请求已发送", + "cancelFailed": "中断流水线失败\n{{error}}", + "cancelNotBusy": "流水线未运行,无需中断", "errors": { "fetchFailed": "获取流水线状态失败\n{{error}}" } diff --git a/lightrag_webui/src/locales/zh_TW.json b/lightrag_webui/src/locales/zh_TW.json index e4387e98..8fb06bf8 100644 --- a/lightrag_webui/src/locales/zh_TW.json +++ b/lightrag_webui/src/locales/zh_TW.json @@ -157,17 +157,25 @@ "hideFileNameTooltip": "隱藏檔案名稱" }, "pipelineStatus": { - "title": "pipeline 狀態", - "busy": "pipeline 忙碌中", + "title": "流水線狀態", + "busy": "流水線忙碌", "requestPending": "待處理請求", - "jobName": "工作名稱", + "cancellationRequested": "取消請求", + "jobName": "作業名稱", "startTime": "開始時間", "progress": "進度", - "unit": "梯次", - "latestMessage": "最新訊息", - "historyMessages": "歷史訊息", + "unit": "批", + "latestMessage": "最新消息", + "historyMessages": "歷史消息", + "cancelButton": "中斷", + "cancelTooltip": "中斷流水線處理", + "cancelInProgress": "取消請求進行中...", + "pipelineNotRunning": "流水線未運行", + "cancelSuccess": "流水線中斷請求已發送", + "cancelFailed": "中斷流水線失敗\n{{error}}", + "cancelNotBusy": "流水線未運行,無需中斷", "errors": { - "fetchFailed": "取得pipeline 狀態失敗\n{{error}}" + "fetchFailed": "獲取流水線狀態失敗\n{{error}}" } } }, From 77336e50b6d5d0089a05488dc7ed02d59364e0a1 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 24 Oct 2025 17:54:17 +0800 Subject: [PATCH 09/20] Improve error handling and add cancellation checks in pipeline --- lightrag/lightrag.py | 76 ++++++++++++++++++++++++++++++-------------- lightrag/operate.py | 14 ++++++++ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 191a5acd..dff637f6 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1699,10 +1699,16 @@ class LightRAG: semaphore: asyncio.Semaphore, ) -> None: """Process single document""" + # Initialize variables at the start to prevent UnboundLocalError in error handling + file_path = "unknown_source" + current_file_number = 0 file_extraction_stage_ok = False + processing_start_time = int(time.time()) + first_stage_tasks = [] + entity_relation_task = None + async with semaphore: nonlocal processed_count - current_file_number = 0 # Initialize to prevent UnboundLocalError in error handling first_stage_tasks = [] entity_relation_task = None @@ -1833,16 +1839,29 @@ class LightRAG: file_extraction_stage_ok = True except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append(error_msg) + # Check if this is a user cancellation + if isinstance(e, PipelineCancelledException): + # User cancellation - log brief message only, no traceback + error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}" + logger.warning(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + error_msg + ) + else: + # Other exceptions - log with traceback + logger.error(traceback.format_exc()) + error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) # Cancel tasks that are not yet completed all_tasks = first_stage_tasks + ( @@ -1951,18 +1970,29 @@ class LightRAG: ) except Exception as e: - # Log error and update pipeline status - logger.error(traceback.format_exc()) - error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append( - traceback.format_exc() - ) - pipeline_status["history_messages"].append( - error_msg - ) + # Check if this is a user cancellation + if isinstance(e, PipelineCancelledException): + # User cancellation - log brief message only, no traceback + error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}" + logger.warning(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + error_msg + ) + else: + # Other exceptions - log with traceback + logger.error(traceback.format_exc()) + error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) + pipeline_status["history_messages"].append( + error_msg + ) # Persistent llm cache if self.llm_response_cache: diff --git a/lightrag/operate.py b/lightrag/operate.py index 4bbf47fd..496c000c 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1639,6 +1639,12 @@ async def _merge_nodes_then_upsert( logger.error(f"Entity {entity_name} has no description") raise ValueError(f"Entity {entity_name} has no description") + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException("User cancelled during entity summary") + # 8. Get summary description an LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Entity", @@ -1959,6 +1965,14 @@ async def _merge_edges_then_upsert( logger.error(f"Relation {src_id}~{tgt_id} has no description") raise ValueError(f"Relation {src_id}~{tgt_id} has no description") + # Check for cancellation before LLM summary + if pipeline_status is not None and pipeline_status_lock is not None: + async with pipeline_status_lock: + if pipeline_status.get("cancellation_requested", False): + raise PipelineCancelledException( + "User cancelled during relation summary" + ) + # 8. Get summary description an LLM usage status description, llm_was_used = await _handle_entity_relation_summary( "Relation", From a9ec15e669f45d2f9bb5c438a9a0ad96d64570b0 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 03:06:45 +0800 Subject: [PATCH 10/20] Resolve lock leakage issue during user cancellation handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Change default log level to INFO • Force enable error logging output • Add lock cleanup rollback protection • Handle LLM cache persistence errors • Fix async task exception handling --- lightrag/kg/shared_storage.py | 260 ++++++++++++++++++++++++++++++---- lightrag/lightrag.py | 18 ++- lightrag/operate.py | 86 +++++------ 3 files changed, 285 insertions(+), 79 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index e20dce52..26fc3832 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -12,7 +12,7 @@ from lightrag.exceptions import PipelineNotInitializedError # Define a direct print function for critical logs that must be visible in all processes -def direct_log(message, enable_output: bool = False, level: str = "DEBUG"): +def direct_log(message, enable_output: bool = False, level: str = "INFO"): """ Log a message directly to stderr to ensure visibility in all processes, including the Gunicorn master process. @@ -44,7 +44,6 @@ def direct_log(message, enable_output: bool = False, level: str = "DEBUG"): } message_level = level_mapping.get(level.upper(), logging.DEBUG) - # print(f"Diret_log: {level.upper()} {message_level} ? {current_level}", file=sys.stderr, flush=True) if message_level >= current_level: print(f"{level}: {message}", file=sys.stderr, flush=True) @@ -168,7 +167,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -199,7 +198,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) # If main lock release failed but async lock hasn't been released, try to release it @@ -223,7 +222,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -247,7 +246,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -269,7 +268,7 @@ class UnifiedLock(Generic[T]): direct_log( f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}", level="ERROR", - enable_output=self._enable_logging, + enable_output=True, ) raise @@ -401,7 +400,7 @@ def _perform_lock_cleanup( direct_log( f"== {lock_type} Lock == Cleanup failed: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) return 0, earliest_cleanup_time, last_cleanup_time @@ -689,7 +688,7 @@ class KeyedUnifiedLock: direct_log( f"Error during multiprocess lock cleanup: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) # 2. Cleanup async locks using generic function @@ -718,7 +717,7 @@ class KeyedUnifiedLock: direct_log( f"Error during async lock cleanup: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) # 3. Get current status after cleanup @@ -772,7 +771,7 @@ class KeyedUnifiedLock: direct_log( f"Error getting keyed lock status: {e}", level="ERROR", - enable_output=False, + enable_output=True, ) return status @@ -797,32 +796,239 @@ class _KeyedLockContext: if enable_logging is not None else parent._default_enable_logging ) - self._ul: Optional[List["UnifiedLock"]] = None # set in __aenter__ + self._ul: Optional[List[Dict[str, Any]]] = None # set in __aenter__ # ----- enter ----- async def __aenter__(self): if self._ul is not None: raise RuntimeError("KeyedUnifiedLock already acquired in current context") - # acquire locks for all keys in the namespace self._ul = [] - for key in self._keys: - lock = self._parent._get_lock_for_key( - self._namespace, key, enable_logging=self._enable_logging - ) - await lock.__aenter__() - inc_debug_n_locks_acquired() - self._ul.append(lock) - return self + + try: + # Acquire locks for all keys in the namespace + for key in self._keys: + lock = None + entry = None + + try: + # 1. Get lock object (reference count is incremented here) + lock = self._parent._get_lock_for_key( + self._namespace, key, enable_logging=self._enable_logging + ) + + # 2. Immediately create and add entry to list (critical for rollback to work) + entry = { + "key": key, + "lock": lock, + "entered": False, + "debug_inc": False, + "ref_incremented": True, # Mark that reference count has been incremented + } + self._ul.append( + entry + ) # Add immediately after _get_lock_for_key for rollback to work + + # 3. Try to acquire the lock + # Use try-finally to ensure state is updated atomically + lock_acquired = False + try: + await lock.__aenter__() + lock_acquired = True # Lock successfully acquired + finally: + if lock_acquired: + entry["entered"] = True + inc_debug_n_locks_acquired() + entry["debug_inc"] = True + + except asyncio.CancelledError: + # Lock acquisition was cancelled + # The finally block above ensures entry["entered"] is correct + direct_log( + f"Lock acquisition cancelled for key {key}", + level="WARNING", + enable_output=self._enable_logging, + ) + raise + except Exception as e: + # Other exceptions, log and re-raise + direct_log( + f"Lock acquisition failed for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + raise + + return self + + except BaseException: + # Critical: if any exception occurs (including CancelledError) during lock acquisition, + # we must rollback all already acquired locks to prevent lock leaks + # Use shield to ensure rollback completes + await asyncio.shield(self._rollback_acquired_locks()) + raise + + async def _rollback_acquired_locks(self): + """Rollback all acquired locks in case of exception during __aenter__""" + if not self._ul: + return + + async def rollback_single_entry(entry): + """Rollback a single lock acquisition""" + key = entry["key"] + lock = entry["lock"] + debug_inc = entry["debug_inc"] + entered = entry["entered"] + ref_incremented = entry.get( + "ref_incremented", True + ) # Default to True for safety + + errors = [] + + # 1. If lock was acquired, release it + if entered: + try: + await lock.__aexit__(None, None, None) + except Exception as e: + errors.append(("lock_exit", e)) + direct_log( + f"Lock rollback error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 2. Release reference count (if it was incremented) + if ref_incremented: + try: + self._parent._release_lock_for_key(self._namespace, key) + except Exception as e: + errors.append(("ref_release", e)) + direct_log( + f"Lock rollback reference release error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 3. Decrement debug counter + if debug_inc: + try: + dec_debug_n_locks_acquired() + except Exception as e: + errors.append(("debug_dec", e)) + direct_log( + f"Lock rollback counter decrementing error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + return errors + + # Release already acquired locks in reverse order + for entry in reversed(self._ul): + # Use shield to protect each lock's rollback + try: + await asyncio.shield(rollback_single_entry(entry)) + except Exception as e: + # Log but continue rolling back other locks + direct_log( + f"Lock rollback unexpected error for {entry['key']}: {e}", + level="ERROR", + enable_output=True, + ) + + self._ul = None # ----- exit ----- async def __aexit__(self, exc_type, exc, tb): - # The UnifiedLock takes care of proper release order - for ul, key in zip(reversed(self._ul), reversed(self._keys)): - await ul.__aexit__(exc_type, exc, tb) - self._parent._release_lock_for_key(self._namespace, key) - dec_debug_n_locks_acquired() - self._ul = None + if self._ul is None: + return + + async def release_all_locks(): + """Release all locks with comprehensive error handling, protected from cancellation""" + + async def release_single_entry(entry, exc_type, exc, tb): + """Release a single lock with full protection""" + key = entry["key"] + lock = entry["lock"] + debug_inc = entry["debug_inc"] + entered = entry["entered"] + + errors = [] + + # 1. Release the lock + if entered: + try: + await lock.__aexit__(exc_type, exc, tb) + except Exception as e: + errors.append(("lock_exit", e)) + direct_log( + f"Lock release error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 2. Release reference count + try: + self._parent._release_lock_for_key(self._namespace, key) + except Exception as e: + errors.append(("ref_release", e)) + direct_log( + f"Lock release reference error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + # 3. Decrement debug counter + if debug_inc: + try: + dec_debug_n_locks_acquired() + except Exception as e: + errors.append(("debug_dec", e)) + direct_log( + f"Lock release counter decrementing error for key {key}: {e}", + level="ERROR", + enable_output=True, + ) + + return errors + + all_errors = [] + + # Release locks in reverse order + # This entire loop is protected by the outer shield + for entry in reversed(self._ul): + try: + errors = await release_single_entry(entry, exc_type, exc, tb) + for error_type, error in errors: + all_errors.append((entry["key"], error_type, error)) + except Exception as e: + all_errors.append((entry["key"], "unexpected", e)) + direct_log( + f"Lock release unexpected error for {entry['key']}: {e}", + level="ERROR", + enable_output=True, + ) + + return all_errors + + # CRITICAL: Protect the entire release process with shield + # This ensures that even if cancellation occurs, all locks are released + try: + all_errors = await asyncio.shield(release_all_locks()) + except Exception as e: + direct_log( + f"Critical error during __aexit__ cleanup: {e}", + level="ERROR", + enable_output=True, + ) + all_errors = [] + finally: + # Always clear the lock list, even if shield was cancelled + self._ul = None + + # If there were release errors and no other exception, raise the first release error + if all_errors and exc_type is None: + raise all_errors[0][2] # (key, error_type, error) def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index dff637f6..24ea0209 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1871,9 +1871,14 @@ class LightRAG: if task and not task.done(): task.cancel() - # Persistent llm cache + # Persistent llm cache with error handling if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() + try: + await self.llm_response_cache.index_done_callback() + except Exception as persist_error: + logger.error( + f"Failed to persist LLM cache: {persist_error}" + ) # Record processing end time for failed case processing_end_time = int(time.time()) @@ -1994,9 +1999,14 @@ class LightRAG: error_msg ) - # Persistent llm cache + # Persistent llm cache with error handling if self.llm_response_cache: - await self.llm_response_cache.index_done_callback() + try: + await self.llm_response_cache.index_done_callback() + except Exception as persist_error: + logger.error( + f"Failed to persist LLM cache: {persist_error}" + ) # Record processing end time for failed case processing_end_time = int(time.time()) diff --git a/lightrag/operate.py b/lightrag/operate.py index 496c000c..36c8251d 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -2302,9 +2302,7 @@ async def merge_nodes_and_edges( return entity_data except Exception as e: - error_msg = ( - f"Critical error in entity processing for `{entity_name}`: {e}" - ) + error_msg = f"Error processing entity `{entity_name}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2340,36 +2338,32 @@ async def merge_nodes_and_edges( entity_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] + processed_entities = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - successful_results.append(task.result()) - except Exception as e: + result = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + processed_entities.append(result) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + processed_entities.append(result) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - processed_entities = [task.result() for task in entity_tasks] - # ===== Phase 2: Process all relationships concurrently ===== log_message = f"Phase 2: Processing {total_relations_count} relations from {doc_id} (async: {graph_max_async})" logger.info(log_message) @@ -2421,7 +2415,7 @@ async def merge_nodes_and_edges( return edge_data, added_entities except Exception as e: - error_msg = f"Critical error in relationship processing for `{sorted_edge_key}`: {e}" + error_msg = f"Error processing relation `{sorted_edge_key}`: {e}" logger.error(error_msg) # Try to update pipeline status, but don't let status update failure affect main exception @@ -2459,40 +2453,36 @@ async def merge_nodes_and_edges( edge_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception and ensure all exceptions are retrieved first_exception = None - successful_results = [] for task in done: try: - exception = task.exception() - if exception is not None: - if first_exception is None: - first_exception = exception - else: - successful_results.append(task.result()) - except Exception as e: + edge_data, added_entities = task.result() + except BaseException as e: if first_exception is None: first_exception = e + else: + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) + + if pending: + for task in pending: + task.cancel() + pending_results = await asyncio.gather(*pending, return_exceptions=True) + for result in pending_results: + if isinstance(result, BaseException): + if first_exception is None: + first_exception = result + else: + edge_data, added_entities = result + if edge_data is not None: + processed_edges.append(edge_data) + all_added_entities.extend(added_entities) - # If any task failed, cancel all pending tasks and raise the first exception if first_exception is not None: - # Cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the first exception to notify the caller raise first_exception - # If all tasks completed successfully, collect results - for task in edge_tasks: - edge_data, added_entities = task.result() - if edge_data is not None: - processed_edges.append(edge_data) - all_added_entities.extend(added_entities) - # ===== Phase 3: Update full_entities and full_relations storage ===== if full_entities_storage and full_relations_storage and doc_id: try: From 2476d6b7f8a4d83731d8234c818ccda27d55bf56 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 03:34:54 +0800 Subject: [PATCH 11/20] Simplify pipeline status dialog by consolidating message sections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Remove separate latest message section • Combine into single pipeline messages area • Add overflow-x-hidden for better display • Change break-words to break-all • Update translations across all locales --- .../components/documents/PipelineStatusDialog.tsx | 14 +++----------- lightrag_webui/src/locales/ar.json | 3 +-- lightrag_webui/src/locales/en.json | 3 +-- lightrag_webui/src/locales/fr.json | 3 +-- lightrag_webui/src/locales/zh.json | 3 +-- lightrag_webui/src/locales/zh_TW.json | 3 +-- 6 files changed, 8 insertions(+), 21 deletions(-) diff --git a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx index 0aab7dfe..956538fd 100644 --- a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx +++ b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx @@ -216,25 +216,17 @@ export default function PipelineStatusDialog({ - {/* Latest Message */} -
-
{t('documentPanel.pipelineStatus.latestMessage')}:
-
- {status?.latest_message || '-'} -
-
- {/* History Messages */}
-
{t('documentPanel.pipelineStatus.historyMessages')}:
+
{t('documentPanel.pipelineStatus.pipelineMessages')}:
{status?.history_messages?.length ? ( status.history_messages.map((msg, idx) => ( -
{msg}
+
{msg}
)) ) : '-'}
diff --git a/lightrag_webui/src/locales/ar.json b/lightrag_webui/src/locales/ar.json index 67b90629..9d78da19 100644 --- a/lightrag_webui/src/locales/ar.json +++ b/lightrag_webui/src/locales/ar.json @@ -165,8 +165,7 @@ "startTime": "وقت البدء", "progress": "التقدم", "unit": "دفعة", - "latestMessage": "آخر رسالة", - "historyMessages": "رسائل السجل", + "pipelineMessages": "رسائل خط الأنابيب", "cancelButton": "إلغاء", "cancelTooltip": "إلغاء معالجة خط الأنابيب", "cancelInProgress": "الإلغاء قيد التقدم...", diff --git a/lightrag_webui/src/locales/en.json b/lightrag_webui/src/locales/en.json index e48c0207..205a7e66 100644 --- a/lightrag_webui/src/locales/en.json +++ b/lightrag_webui/src/locales/en.json @@ -165,8 +165,7 @@ "startTime": "Start Time", "progress": "Progress", "unit": "Batch", - "latestMessage": "Latest Message", - "historyMessages": "History Messages", + "pipelineMessages": "Pipeline Messages", "cancelButton": "Cancel", "cancelTooltip": "Cancel pipeline processing", "cancelInProgress": "Cancellation in progress...", diff --git a/lightrag_webui/src/locales/fr.json b/lightrag_webui/src/locales/fr.json index b75e3e63..48dbe03d 100644 --- a/lightrag_webui/src/locales/fr.json +++ b/lightrag_webui/src/locales/fr.json @@ -165,8 +165,7 @@ "startTime": "Heure de Début", "progress": "Progrès", "unit": "Lot", - "latestMessage": "Dernier Message", - "historyMessages": "Messages d'Historique", + "pipelineMessages": "Messages de Pipeline", "cancelButton": "Annuler", "cancelTooltip": "Annuler le traitement du pipeline", "cancelInProgress": "Annulation en cours...", diff --git a/lightrag_webui/src/locales/zh.json b/lightrag_webui/src/locales/zh.json index 2712a6d2..b361e1ff 100644 --- a/lightrag_webui/src/locales/zh.json +++ b/lightrag_webui/src/locales/zh.json @@ -165,8 +165,7 @@ "startTime": "开始时间", "progress": "进度", "unit": "批", - "latestMessage": "最新消息", - "historyMessages": "历史消息", + "pipelineMessages": "流水线消息", "cancelButton": "中断", "cancelTooltip": "中断流水线处理", "cancelInProgress": "取消请求进行中...", diff --git a/lightrag_webui/src/locales/zh_TW.json b/lightrag_webui/src/locales/zh_TW.json index 8fb06bf8..e822aa77 100644 --- a/lightrag_webui/src/locales/zh_TW.json +++ b/lightrag_webui/src/locales/zh_TW.json @@ -165,8 +165,7 @@ "startTime": "開始時間", "progress": "進度", "unit": "批", - "latestMessage": "最新消息", - "historyMessages": "歷史消息", + "pipelineMessages": "流水線消息", "cancelButton": "中斷", "cancelTooltip": "中斷流水線處理", "cancelInProgress": "取消請求進行中...", From 81e3496aa42103a7d2287e17ff440ecd6cad2933 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 03:55:00 +0800 Subject: [PATCH 12/20] Add confirmation dialog for pipeline cancellation --- .../documents/PipelineStatusDialog.tsx | 37 +++++++++++++++++-- lightrag_webui/src/locales/ar.json | 3 ++ lightrag_webui/src/locales/en.json | 3 ++ lightrag_webui/src/locales/fr.json | 3 ++ lightrag_webui/src/locales/zh.json | 3 ++ lightrag_webui/src/locales/zh_TW.json | 3 ++ 6 files changed, 49 insertions(+), 3 deletions(-) diff --git a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx index 956538fd..c368d69c 100644 --- a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx +++ b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx @@ -30,6 +30,7 @@ export default function PipelineStatusDialog({ const [status, setStatus] = useState(null) const [position, setPosition] = useState('center') const [isUserScrolled, setIsUserScrolled] = useState(false) + const [showCancelConfirm, setShowCancelConfirm] = useState(false) const historyRef = useRef(null) // Reset position when dialog opens @@ -37,6 +38,9 @@ export default function PipelineStatusDialog({ if (open) { setPosition('center') setIsUserScrolled(false) + } else { + // Reset confirmation dialog state when main dialog closes + setShowCancelConfirm(false) } }, [open]) @@ -81,8 +85,9 @@ export default function PipelineStatusDialog({ return () => clearInterval(interval) }, [open, t]) - // Handle cancel pipeline - const handleCancelPipeline = async () => { + // Handle cancel pipeline confirmation + const handleConfirmCancel = async () => { + setShowCancelConfirm(false) try { const result = await cancelPipeline() if (result.status === 'cancellation_requested') { @@ -186,7 +191,7 @@ export default function PipelineStatusDialog({ variant="destructive" size="sm" disabled={!canCancel} - onClick={handleCancelPipeline} + onClick={() => setShowCancelConfirm(true)} title={ status?.cancellation_requested ? t('documentPanel.pipelineStatus.cancelInProgress') @@ -233,6 +238,32 @@ export default function PipelineStatusDialog({
+ + {/* Cancel Confirmation Dialog */} + + + + {t('documentPanel.pipelineStatus.cancelConfirmTitle')} + + {t('documentPanel.pipelineStatus.cancelConfirmDescription')} + + +
+ + +
+
+
) } diff --git a/lightrag_webui/src/locales/ar.json b/lightrag_webui/src/locales/ar.json index 9d78da19..d81b5480 100644 --- a/lightrag_webui/src/locales/ar.json +++ b/lightrag_webui/src/locales/ar.json @@ -168,6 +168,9 @@ "pipelineMessages": "رسائل خط الأنابيب", "cancelButton": "إلغاء", "cancelTooltip": "إلغاء معالجة خط الأنابيب", + "cancelConfirmTitle": "تأكيد إلغاء خط الأنابيب", + "cancelConfirmDescription": "سيؤدي هذا الإجراء إلى إيقاف معالجة خط الأنابيب الجارية. هل أنت متأكد من أنك تريد المتابعة؟", + "cancelConfirmButton": "تأكيد الإلغاء", "cancelInProgress": "الإلغاء قيد التقدم...", "pipelineNotRunning": "خط الأنابيب غير قيد التشغيل", "cancelSuccess": "تم طلب إلغاء خط الأنابيب", diff --git a/lightrag_webui/src/locales/en.json b/lightrag_webui/src/locales/en.json index 205a7e66..9498a8e6 100644 --- a/lightrag_webui/src/locales/en.json +++ b/lightrag_webui/src/locales/en.json @@ -168,6 +168,9 @@ "pipelineMessages": "Pipeline Messages", "cancelButton": "Cancel", "cancelTooltip": "Cancel pipeline processing", + "cancelConfirmTitle": "Confirm Pipeline Cancellation", + "cancelConfirmDescription": "This will interrupt the ongoing pipeline processing. Are you sure you want to continue?", + "cancelConfirmButton": "Confirm Cancellation", "cancelInProgress": "Cancellation in progress...", "pipelineNotRunning": "Pipeline not running", "cancelSuccess": "Pipeline cancellation requested", diff --git a/lightrag_webui/src/locales/fr.json b/lightrag_webui/src/locales/fr.json index 48dbe03d..2a5448d0 100644 --- a/lightrag_webui/src/locales/fr.json +++ b/lightrag_webui/src/locales/fr.json @@ -168,6 +168,9 @@ "pipelineMessages": "Messages de Pipeline", "cancelButton": "Annuler", "cancelTooltip": "Annuler le traitement du pipeline", + "cancelConfirmTitle": "Confirmer l'Annulation du Pipeline", + "cancelConfirmDescription": "Cette action interrompra le traitement du pipeline en cours. Êtes-vous sûr de vouloir continuer ?", + "cancelConfirmButton": "Confirmer l'Annulation", "cancelInProgress": "Annulation en cours...", "pipelineNotRunning": "Le pipeline n'est pas en cours d'exécution", "cancelSuccess": "Annulation du pipeline demandée", diff --git a/lightrag_webui/src/locales/zh.json b/lightrag_webui/src/locales/zh.json index b361e1ff..9791de7a 100644 --- a/lightrag_webui/src/locales/zh.json +++ b/lightrag_webui/src/locales/zh.json @@ -168,6 +168,9 @@ "pipelineMessages": "流水线消息", "cancelButton": "中断", "cancelTooltip": "中断流水线处理", + "cancelConfirmTitle": "确认中断流水线", + "cancelConfirmDescription": "此操作将中断正在进行的流水线处理。确定要继续吗?", + "cancelConfirmButton": "确认中断", "cancelInProgress": "取消请求进行中...", "pipelineNotRunning": "流水线未运行", "cancelSuccess": "流水线中断请求已发送", diff --git a/lightrag_webui/src/locales/zh_TW.json b/lightrag_webui/src/locales/zh_TW.json index e822aa77..6c5c1a34 100644 --- a/lightrag_webui/src/locales/zh_TW.json +++ b/lightrag_webui/src/locales/zh_TW.json @@ -168,6 +168,9 @@ "pipelineMessages": "流水線消息", "cancelButton": "中斷", "cancelTooltip": "中斷流水線處理", + "cancelConfirmTitle": "確認中斷流水線", + "cancelConfirmDescription": "此操作將中斷正在進行的流水線處理。確定要繼續嗎?", + "cancelConfirmButton": "確認中斷", "cancelInProgress": "取消請求進行中...", "pipelineNotRunning": "流水線未運行", "cancelSuccess": "流水線中斷請求已發送", From 9ed19695bb157cf15f378479f456e0a7d2828cd4 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 25 Oct 2025 04:12:44 +0800 Subject: [PATCH 13/20] Remove separate retry button and merge functionality into scan button --- .../src/features/DocumentManager.tsx | 47 ------------------- lightrag_webui/src/locales/ar.json | 4 +- lightrag_webui/src/locales/en.json | 4 +- lightrag_webui/src/locales/fr.json | 4 +- lightrag_webui/src/locales/zh.json | 4 +- lightrag_webui/src/locales/zh_TW.json | 4 +- 6 files changed, 5 insertions(+), 62 deletions(-) diff --git a/lightrag_webui/src/features/DocumentManager.tsx b/lightrag_webui/src/features/DocumentManager.tsx index 204c7daf..406faf2b 100644 --- a/lightrag_webui/src/features/DocumentManager.tsx +++ b/lightrag_webui/src/features/DocumentManager.tsx @@ -21,7 +21,6 @@ import PaginationControls from '@/components/ui/PaginationControls' import { scanNewDocuments, - reprocessFailedDocuments, getDocumentsPaginated, DocsStatusesResponse, DocStatus, @@ -868,42 +867,6 @@ export default function DocumentManager() { } }, [t, startPollingInterval, currentTab, health, statusCounts]) - const retryFailedDocuments = useCallback(async () => { - try { - // Check if component is still mounted before starting the request - if (!isMountedRef.current) return; - - const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars - - // Check again if component is still mounted after the request completes - if (!isMountedRef.current) return; - - // Note: _track_id is available for future use (e.g., progress tracking) - toast.message(message || status); - - // Reset health check timer with 1 second delay to avoid race condition - useBackendState.getState().resetHealthCheckTimerDelayed(1000); - - // Start fast refresh with 2-second interval immediately after retry - startPollingInterval(2000); - - // Set recovery timer to restore normal polling interval after 15 seconds - setTimeout(() => { - if (isMountedRef.current && currentTab === 'documents' && health) { - // Restore intelligent polling interval based on document status - const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts); - const normalInterval = hasActiveDocuments ? 5000 : 30000; - startPollingInterval(normalInterval); - } - }, 15000); // Restore after 15 seconds - } catch (err) { - // Only show error if component is still mounted - if (isMountedRef.current) { - toast.error(errorMessage(err)); - } - } - }, [startPollingInterval, currentTab, health, statusCounts]) - // Handle page size change - update state and save to store const handlePageSizeChange = useCallback((newPageSize: number) => { if (newPageSize === pagination.page_size) return; @@ -1166,16 +1129,6 @@ export default function DocumentManager() { > {t('documentPanel.documentManager.scanButton')} -