From ca4c18baaa9093aab54714587630fefd7af1197c Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 16 Aug 2025 22:29:46 +0800 Subject: [PATCH] Preserve failed documents during data consistency validation for manual review --- lightrag/lightrag.py | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 8be61205..b659c70f 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1114,17 +1114,37 @@ class LightRAG: pipeline_status: dict, pipeline_status_lock: asyncio.Lock, ) -> dict[str, DocProcessingStatus]: - """Validate and fix document data consistency by deleting inconsistent entries""" + """Validate and fix document data consistency by deleting inconsistent entries, but preserve failed documents""" inconsistent_docs = [] + failed_docs_to_preserve = [] # Check each document's data consistency for doc_id, status_doc in to_process_docs.items(): # Check if corresponding content exists in full_docs content_data = await self.full_docs.get_by_id(doc_id) if not content_data: - inconsistent_docs.append(doc_id) + # Check if this is a failed document that should be preserved + if ( + hasattr(status_doc, "status") + and status_doc.status == DocStatus.FAILED + ): + failed_docs_to_preserve.append(doc_id) + else: + inconsistent_docs.append(doc_id) - # Delete inconsistent document entries one by one + # Log information about failed documents that will be preserved + if failed_docs_to_preserve: + async with pipeline_status_lock: + preserve_message = f"Preserving {len(failed_docs_to_preserve)} failed document entries for manual review" + logger.info(preserve_message) + pipeline_status["latest_message"] = preserve_message + pipeline_status["history_messages"].append(preserve_message) + + # Remove failed documents from processing list but keep them in doc_status + for doc_id in failed_docs_to_preserve: + to_process_docs.pop(doc_id, None) + + # Delete inconsistent document entries(excluding failed documents) if inconsistent_docs: async with pipeline_status_lock: summary_message = ( @@ -1146,7 +1166,9 @@ class LightRAG: # Log successful deletion async with pipeline_status_lock: - log_message = f"Deleted entry: {doc_id} ({file_path})" + log_message = ( + f"Deleted inconsistent entry: {doc_id} ({file_path})" + ) logger.info(log_message) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) @@ -1164,7 +1186,7 @@ class LightRAG: # Final summary log async with pipeline_status_lock: - final_message = f"Data consistency cleanup completed: successfully deleted {successful_deletions} entries" + final_message = f"Data consistency cleanup completed: successfully deleted {successful_deletions} inconsistent entries, preserved {len(failed_docs_to_preserve)} failed documents" logger.info(final_message) pipeline_status["latest_message"] = final_message pipeline_status["history_messages"].append(final_message)