refactor: integrate document consistency validation into pipeline processing

This ensures data consistency validation is part of the main processing pipeline and provides better monitoring of inconsistent document cleanup operations.
This commit is contained in:
yangdx 2025-08-14 11:38:36 +08:00
parent b5ae84fac6
commit 17faeb2fb8

View file

@ -1111,45 +1111,71 @@ class LightRAG:
return track_id
async def _validate_and_fix_document_consistency(
    self,
    to_process_docs: dict[str, DocProcessingStatus],
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
) -> dict[str, DocProcessingStatus]:
    """Validate and fix document data consistency by deleting inconsistent entries.

    A document is considered inconsistent when it has a status record in
    ``doc_status`` but no corresponding content in ``full_docs``. Each such
    entry is deleted from ``doc_status`` and removed from the processing
    list; every step is reported through ``pipeline_status`` so callers can
    monitor the cleanup.

    Args:
        to_process_docs: Mapping of doc_id to its processing status record.
            Mutated in place: inconsistent entries are removed.
        pipeline_status: Shared pipeline-status dict; ``latest_message`` and
            ``history_messages`` are updated under the lock.
        pipeline_status_lock: Lock guarding all ``pipeline_status`` mutations.

    Returns:
        The (possibly reduced) ``to_process_docs`` mapping containing only
        documents whose content exists in ``full_docs``.
    """
    inconsistent_docs: list[str] = []

    # Check each document's data consistency
    for doc_id, status_doc in to_process_docs.items():
        # Check if corresponding content exists in full_docs
        content_data = await self.full_docs.get_by_id(doc_id)
        if not content_data:
            inconsistent_docs.append(doc_id)
            async with pipeline_status_lock:
                log_message = f"Data inconsistency detected: Document {doc_id} ({status_doc.file_path}) missing content data"
                logger.warning(log_message)
                pipeline_status["latest_message"] = log_message
                pipeline_status["history_messages"].append(log_message)

    # Delete inconsistent document entries one by one so a single failure
    # does not abort the cleanup of the remaining entries.
    if inconsistent_docs:
        async with pipeline_status_lock:
            summary_message = f"Starting cleanup of {len(inconsistent_docs)} inconsistent document entries"
            logger.info(summary_message)
            pipeline_status["latest_message"] = summary_message
            pipeline_status["history_messages"].append(summary_message)

        successful_deletions = 0

        for doc_id in inconsistent_docs:
            try:
                status_doc = to_process_docs[doc_id]
                # file_path is used only for logging; fall back gracefully
                # if the status record lacks it.
                file_path = getattr(status_doc, "file_path", "unknown_source")

                # Delete doc_status entry
                await self.doc_status.delete([doc_id])
                successful_deletions += 1

                # Log successful deletion
                async with pipeline_status_lock:
                    log_message = f"Deleted inconsistent document entry: {doc_id} ({file_path})"
                    logger.info(log_message)
                    pipeline_status["latest_message"] = log_message
                    pipeline_status["history_messages"].append(log_message)

                # Remove from processing list
                to_process_docs.pop(doc_id, None)
            except Exception as e:
                # Best-effort cleanup: record the failure and continue with
                # the next inconsistent entry.
                async with pipeline_status_lock:
                    error_message = (
                        f"Failed to delete document entry: {doc_id} - {str(e)}"
                    )
                    logger.error(error_message)
                    pipeline_status["latest_message"] = error_message
                    pipeline_status["history_messages"].append(error_message)

        # Final summary log
        async with pipeline_status_lock:
            final_message = f"Data consistency cleanup completed: successfully deleted {successful_deletions} entries"
            logger.info(final_message)
            pipeline_status["latest_message"] = final_message
            pipeline_status["history_messages"].append(final_message)

    return to_process_docs
async def apipeline_process_enqueue_documents(
@ -1192,15 +1218,6 @@ class LightRAG:
logger.info("No documents to process")
return
# Validate document data consistency and fix any issues
to_process_docs = await self._validate_and_fix_document_consistency(
to_process_docs
)
if not to_process_docs:
logger.info("No valid documents to process after consistency check")
return
pipeline_status.update(
{
"busy": True,
@ -1233,6 +1250,20 @@ class LightRAG:
pipeline_status["history_messages"].append(log_message)
break
# Validate document data consistency and fix any issues as part of the pipeline
to_process_docs = await self._validate_and_fix_document_consistency(
to_process_docs, pipeline_status, pipeline_status_lock
)
if not to_process_docs:
log_message = (
"No valid documents to process after consistency check"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
break
log_message = f"Processing {len(to_process_docs)} document(s)"
logger.info(log_message)