fix: reset PROCESSING/FAILED docs to PENDING at the beginging of document processing pipeline

- Reset documents with PROCESSING/FAILED status to PENDING when they pass consistency checks
- Update doc_status storage and clear error messages/metadata on reset
This commit is contained in:
yangdx 2025-08-18 00:49:52 +08:00
parent add8b07a21
commit 377f1a022e
2 changed files with 51 additions and 7 deletions

View file

@ -1154,7 +1154,9 @@ async def pipeline_enqueue_file(
content, file_paths=file_path.name, track_id=track_id
)
logger.info(f"Successfully extracted and enqueued file: {file_path.name}")
logger.info(
f"Successfully extracted and enqueued file: {file_path.name}"
)
# Move file to __enqueued__ directory after enqueuing
try:

View file

@ -1188,6 +1188,7 @@ class LightRAG:
"""Validate and fix document data consistency by deleting inconsistent entries, but preserve failed documents"""
inconsistent_docs = []
failed_docs_to_preserve = []
successful_deletions = 0
# Check each document's data consistency
for doc_id, status_doc in to_process_docs.items():
@ -1255,12 +1256,53 @@ class LightRAG:
pipeline_status["latest_message"] = error_message
pipeline_status["history_messages"].append(error_message)
# Final summary log
# Final summary log
# async with pipeline_status_lock:
# final_message = f"Successfully deleted {successful_deletions} inconsistent entries, preserved {len(failed_docs_to_preserve)} failed documents"
# logger.info(final_message)
# pipeline_status["latest_message"] = final_message
# pipeline_status["history_messages"].append(final_message)
# Reset PROCESSING and FAILED documents that pass consistency checks to PENDING status
docs_to_reset = {}
reset_count = 0
for doc_id, status_doc in to_process_docs.items():
# Check if document has corresponding content in full_docs (consistency check)
content_data = await self.full_docs.get_by_id(doc_id)
if content_data: # Document passes consistency check
# Check if document is in PROCESSING or FAILED status
if hasattr(status_doc, "status") and status_doc.status in [
DocStatus.PROCESSING,
DocStatus.FAILED,
]:
# Prepare document for status reset to PENDING
docs_to_reset[doc_id] = {
"status": DocStatus.PENDING,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now(timezone.utc).isoformat(),
"file_path": getattr(status_doc, "file_path", "unknown_source"),
"track_id": getattr(status_doc, "track_id", ""),
# Clear any error messages and processing metadata
"error_msg": "",
"metadata": {},
}
# Update the status in to_process_docs as well
status_doc.status = DocStatus.PENDING
reset_count += 1
# Update doc_status storage if there are documents to reset
if docs_to_reset:
await self.doc_status.upsert(docs_to_reset)
async with pipeline_status_lock:
final_message = f"Data consistency cleanup completed: successfully deleted {successful_deletions} inconsistent entries, preserved {len(failed_docs_to_preserve)} failed documents"
logger.info(final_message)
pipeline_status["latest_message"] = final_message
pipeline_status["history_messages"].append(final_message)
reset_message = f"Reset {reset_count} documents from PROCESSING/FAILED to PENDING status"
logger.info(reset_message)
pipeline_status["latest_message"] = reset_message
pipeline_status["history_messages"].append(reset_message)
return to_process_docs
@ -1702,7 +1744,7 @@ class LightRAG:
to_process_docs.update(pending_docs)
finally:
log_message = "Document processing pipeline completed"
log_message = "Enqueued document processing pipeline stoped"
logger.info(log_message)
# Always reset busy status when done or if an exception occurs (with lock)
async with pipeline_status_lock: