Preserve failed documents during data consistency validation for manual review
This commit is contained in:
parent
e1310c5262
commit
ca4c18baaa
1 changed files with 27 additions and 5 deletions
|
|
@ -1114,17 +1114,37 @@ class LightRAG:
|
||||||
pipeline_status: dict,
|
pipeline_status: dict,
|
||||||
pipeline_status_lock: asyncio.Lock,
|
pipeline_status_lock: asyncio.Lock,
|
||||||
) -> dict[str, DocProcessingStatus]:
|
) -> dict[str, DocProcessingStatus]:
|
||||||
"""Validate and fix document data consistency by deleting inconsistent entries"""
|
"""Validate and fix document data consistency by deleting inconsistent entries, but preserve failed documents"""
|
||||||
inconsistent_docs = []
|
inconsistent_docs = []
|
||||||
|
failed_docs_to_preserve = []
|
||||||
|
|
||||||
# Check each document's data consistency
|
# Check each document's data consistency
|
||||||
for doc_id, status_doc in to_process_docs.items():
|
for doc_id, status_doc in to_process_docs.items():
|
||||||
# Check if corresponding content exists in full_docs
|
# Check if corresponding content exists in full_docs
|
||||||
content_data = await self.full_docs.get_by_id(doc_id)
|
content_data = await self.full_docs.get_by_id(doc_id)
|
||||||
if not content_data:
|
if not content_data:
|
||||||
inconsistent_docs.append(doc_id)
|
# Check if this is a failed document that should be preserved
|
||||||
|
if (
|
||||||
|
hasattr(status_doc, "status")
|
||||||
|
and status_doc.status == DocStatus.FAILED
|
||||||
|
):
|
||||||
|
failed_docs_to_preserve.append(doc_id)
|
||||||
|
else:
|
||||||
|
inconsistent_docs.append(doc_id)
|
||||||
|
|
||||||
# Delete inconsistent document entries one by one
|
# Log information about failed documents that will be preserved
|
||||||
|
if failed_docs_to_preserve:
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
preserve_message = f"Preserving {len(failed_docs_to_preserve)} failed document entries for manual review"
|
||||||
|
logger.info(preserve_message)
|
||||||
|
pipeline_status["latest_message"] = preserve_message
|
||||||
|
pipeline_status["history_messages"].append(preserve_message)
|
||||||
|
|
||||||
|
# Remove failed documents from processing list but keep them in doc_status
|
||||||
|
for doc_id in failed_docs_to_preserve:
|
||||||
|
to_process_docs.pop(doc_id, None)
|
||||||
|
|
||||||
|
# Delete inconsistent document entries(excluding failed documents)
|
||||||
if inconsistent_docs:
|
if inconsistent_docs:
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
summary_message = (
|
summary_message = (
|
||||||
|
|
@ -1146,7 +1166,9 @@ class LightRAG:
|
||||||
|
|
||||||
# Log successful deletion
|
# Log successful deletion
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
log_message = f"Deleted entry: {doc_id} ({file_path})"
|
log_message = (
|
||||||
|
f"Deleted inconsistent entry: {doc_id} ({file_path})"
|
||||||
|
)
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
pipeline_status["latest_message"] = log_message
|
pipeline_status["latest_message"] = log_message
|
||||||
pipeline_status["history_messages"].append(log_message)
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
@ -1164,7 +1186,7 @@ class LightRAG:
|
||||||
|
|
||||||
# Final summary log
|
# Final summary log
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
final_message = f"Data consistency cleanup completed: successfully deleted {successful_deletions} entries"
|
final_message = f"Data consistency cleanup completed: successfully deleted {successful_deletions} inconsistent entries, preserved {len(failed_docs_to_preserve)} failed documents"
|
||||||
logger.info(final_message)
|
logger.info(final_message)
|
||||||
pipeline_status["latest_message"] = final_message
|
pipeline_status["latest_message"] = final_message
|
||||||
pipeline_status["history_messages"].append(final_message)
|
pipeline_status["history_messages"].append(final_message)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue