From 8fb1c09b08028839441db6065e0e6ddef4c4b95b Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 26 Jun 2025 01:00:54 +0800 Subject: [PATCH] Refac: pipeline message --- lightrag/api/routers/document_routes.py | 53 ++++++++++--------------- lightrag/lightrag.py | 8 ++-- 2 files changed, 26 insertions(+), 35 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 3c518dca..fba5c3e8 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -836,24 +836,24 @@ async def background_delete_documents( # Loop through each document ID and delete them one by one for i, doc_id in enumerate(doc_ids, 1): async with pipeline_status_lock: + start_msg = f"Deleting document {i}/{total_docs}: {doc_id}" + logger.info(start_msg) pipeline_status["cur_batch"] = i - pipeline_status["latest_message"] = ( - f"Deleting document {i}/{total_docs}: {doc_id}" - ) - pipeline_status["history_messages"].append( - f"Processing document {i}/{total_docs}: {doc_id}" - ) + pipeline_status["latest_message"] = start_msg + pipeline_status["history_messages"].append(start_msg) + file_path = "#" try: result = await rag.adelete_by_doc_id(doc_id) - + file_path = ( + getattr(result, "file_path", "-") if "result" in locals() else "-" + ) if result.status == "success": successful_deletions.append(doc_id) success_msg = ( - f"Successfully deleted document {i}/{total_docs}: {doc_id}" + f"Deleted document {i}/{total_docs}: {doc_id}[{file_path}]" ) logger.info(success_msg) - async with pipeline_status_lock: pipeline_status["history_messages"].append(success_msg) @@ -872,6 +872,7 @@ async def background_delete_documents( ) logger.info(file_delete_msg) async with pipeline_status_lock: + pipeline_status["latest_message"] = file_delete_msg pipeline_status["history_messages"].append( file_delete_msg ) @@ -881,6 +882,9 @@ async def background_delete_documents( ) logger.warning(file_not_found_msg) async with 
pipeline_status_lock: + pipeline_status["latest_message"] = ( + file_not_found_msg + ) pipeline_status["history_messages"].append( file_not_found_msg ) @@ -888,6 +892,7 @@ async def background_delete_documents( file_error_msg = f"Failed to delete file {result.file_path}: {str(file_error)}" logger.error(file_error_msg) async with pipeline_status_lock: + pipeline_status["latest_message"] = file_error_msg pipeline_status["history_messages"].append( file_error_msg ) @@ -895,52 +900,36 @@ async def background_delete_documents( no_file_msg = f"No valid file path found for document {doc_id}" logger.warning(no_file_msg) async with pipeline_status_lock: + pipeline_status["latest_message"] = no_file_msg pipeline_status["history_messages"].append(no_file_msg) else: failed_deletions.append(doc_id) - error_msg = f"Failed to delete document {i}/{total_docs}: {doc_id} - {result.message}" + error_msg = f"Failed to delete {i}/{total_docs}: {doc_id}[{file_path}] - {result.message}" logger.error(error_msg) - async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg pipeline_status["history_messages"].append(error_msg) except Exception as e: failed_deletions.append(doc_id) - error_msg = ( - f"Error deleting document {i}/{total_docs}: {doc_id} - {str(e)}" - ) + error_msg = f"Error deleting document {i}/{total_docs}: {doc_id}[{file_path}] - {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) - async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg pipeline_status["history_messages"].append(error_msg) - # Final summary - summary_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed" - logger.info(summary_msg) - - async with pipeline_status_lock: - pipeline_status["history_messages"].append(summary_msg) - if successful_deletions: - pipeline_status["history_messages"].append( - f"Successfully deleted: {', '.join(successful_deletions)}" - ) - if failed_deletions: - 
pipeline_status["history_messages"].append( - f"Failed to delete: {', '.join(failed_deletions)}" - ) - except Exception as e: error_msg = f"Critical error during batch deletion: {str(e)}" logger.error(error_msg) logger.error(traceback.format_exc()) - async with pipeline_status_lock: pipeline_status["history_messages"].append(error_msg) finally: + # Final summary async with pipeline_status_lock: pipeline_status["busy"] = False - completion_msg = "Document deletion process completed." + completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed" pipeline_status["latest_message"] = completion_msg pipeline_status["history_messages"].append(completion_msg) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 5e7d4153..748b9ef8 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1712,6 +1712,7 @@ class LightRAG: try: # 1. Get the document status and related data doc_status_data = await self.doc_status.get_by_id(doc_id) + file_path = doc_status_data.get("file_path") if doc_status_data else None if not doc_status_data: logger.warning(f"Document {doc_id} not found") return DeletionResult( @@ -1719,6 +1720,7 @@ class LightRAG: doc_id=doc_id, message=f"Document {doc_id} not found.", status_code=404, + file_path="", ) # 2. 
Get all chunks related to this document @@ -1770,6 +1772,7 @@ class LightRAG: doc_id=doc_id, message=log_message, status_code=200, + file_path=file_path, ) chunk_ids = set(related_chunks.keys()) @@ -1962,9 +1965,6 @@ class LightRAG: logger.error(f"Failed to delete document and status: {e}") raise Exception(f"Failed to delete document and status: {e}") from e - # Get file path from document status for return value - file_path = doc_status_data.get("file_path") if doc_status_data else None - return DeletionResult( status="success", doc_id=doc_id, @@ -1983,6 +1983,7 @@ class LightRAG: doc_id=doc_id, message=error_message, status_code=500, + file_path=file_path, ) finally: @@ -2002,6 +2003,7 @@ class LightRAG: doc_id=doc_id, message=f"Deletion completed but failed to persist changes: {persistence_error}", status_code=500, + file_path=file_path, ) # If there was an original exception, log the persistence error but don't override the original error # The original error result was already returned in the except block