NOTE(review): line breaks and indentation were reconstructed from a flattened
copy of this patch; the indentation of every line is inferred from Python block
structure, so verify against the target files before applying. The only content
change is the log-message spelling "stoped" -> "stopped" in the last hunk; the
post-image blob id on the lightrag.py index line predates that spelling fix.

diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index c1752e06..fe2bd9a5 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -1154,7 +1154,9 @@ async def pipeline_enqueue_file(
                     content, file_paths=file_path.name, track_id=track_id
                 )
 
-                logger.info(f"Successfully extracted and enqueued file: {file_path.name}")
+                logger.info(
+                    f"Successfully extracted and enqueued file: {file_path.name}"
+                )
 
                 # Move file to __enqueued__ directory after enqueuing
                 try:
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 55067b59..87e67d2d 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1188,6 +1188,7 @@ class LightRAG:
         """Validate and fix document data consistency by deleting inconsistent entries, but preserve failed documents"""
         inconsistent_docs = []
         failed_docs_to_preserve = []
+        successful_deletions = 0
 
         # Check each document's data consistency
         for doc_id, status_doc in to_process_docs.items():
@@ -1255,12 +1256,53 @@ class LightRAG:
                         pipeline_status["latest_message"] = error_message
                         pipeline_status["history_messages"].append(error_message)
 
-            # Final summary log
+        # Final summary log
+        # async with pipeline_status_lock:
+        #     final_message = f"Successfully deleted {successful_deletions} inconsistent entries, preserved {len(failed_docs_to_preserve)} failed documents"
+        #     logger.info(final_message)
+        #     pipeline_status["latest_message"] = final_message
+        #     pipeline_status["history_messages"].append(final_message)
+
+        # Reset PROCESSING and FAILED documents that pass consistency checks to PENDING status
+        docs_to_reset = {}
+        reset_count = 0
+
+        for doc_id, status_doc in to_process_docs.items():
+            # Check if document has corresponding content in full_docs (consistency check)
+            content_data = await self.full_docs.get_by_id(doc_id)
+            if content_data:  # Document passes consistency check
+                # Check if document is in PROCESSING or FAILED status
+                if hasattr(status_doc, "status") and status_doc.status in [
+                    DocStatus.PROCESSING,
+                    DocStatus.FAILED,
+                ]:
+                    # Prepare document for status reset to PENDING
+                    docs_to_reset[doc_id] = {
+                        "status": DocStatus.PENDING,
+                        "content_summary": status_doc.content_summary,
+                        "content_length": status_doc.content_length,
+                        "created_at": status_doc.created_at,
+                        "updated_at": datetime.now(timezone.utc).isoformat(),
+                        "file_path": getattr(status_doc, "file_path", "unknown_source"),
+                        "track_id": getattr(status_doc, "track_id", ""),
+                        # Clear any error messages and processing metadata
+                        "error_msg": "",
+                        "metadata": {},
+                    }
+
+                    # Update the status in to_process_docs as well
+                    status_doc.status = DocStatus.PENDING
+                    reset_count += 1
+
+        # Update doc_status storage if there are documents to reset
+        if docs_to_reset:
+            await self.doc_status.upsert(docs_to_reset)
+
             async with pipeline_status_lock:
-                final_message = f"Data consistency cleanup completed: successfully deleted {successful_deletions} inconsistent entries, preserved {len(failed_docs_to_preserve)} failed documents"
-                logger.info(final_message)
-                pipeline_status["latest_message"] = final_message
-                pipeline_status["history_messages"].append(final_message)
+                reset_message = f"Reset {reset_count} documents from PROCESSING/FAILED to PENDING status"
+                logger.info(reset_message)
+                pipeline_status["latest_message"] = reset_message
+                pipeline_status["history_messages"].append(reset_message)
 
         return to_process_docs
 
@@ -1702,7 +1744,7 @@ class LightRAG:
                 to_process_docs.update(pending_docs)
 
         finally:
-            log_message = "Document processing pipeline completed"
+            log_message = "Enqueued document processing pipeline stopped"
             logger.info(log_message)
             # Always reset busy status when done or if an exception occurs (with lock)
             async with pipeline_status_lock: