diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 361bba66..fe72b57c 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -260,14 +260,25 @@ Attributes:
 
 
 class DeleteDocRequest(BaseModel):
-    doc_id: str = Field(..., description="The ID of the document to delete.")
+    doc_ids: List[str] = Field(..., description="The IDs of the documents to delete.")
 
-    @field_validator("doc_id", mode="after")
+    @field_validator("doc_ids", mode="after")
     @classmethod
-    def validate_doc_id(cls, doc_id: str) -> str:
-        if not doc_id or not doc_id.strip():
-            raise ValueError("Document ID cannot be empty")
-        return doc_id.strip()
+    def validate_doc_ids(cls, doc_ids: List[str]) -> List[str]:
+        if not doc_ids:
+            raise ValueError("Document IDs list cannot be empty")
+
+        validated_ids = []
+        for doc_id in doc_ids:
+            if not doc_id or not doc_id.strip():
+                raise ValueError("Document ID cannot be empty")
+            validated_ids.append(doc_id.strip())
+
+        # Check for duplicates
+        if len(validated_ids) != len(set(validated_ids)):
+            raise ValueError("Document IDs must be unique")
+
+        return validated_ids
 
 
 class DeleteEntityRequest(BaseModel):
@@ -782,8 +793,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
         logger.error(traceback.format_exc())
 
 
-async def background_delete_document(rag: LightRAG, doc_id: str):
-    """Background task to delete a document"""
+async def background_delete_documents(rag: LightRAG, doc_ids: List[str]):
+    """Background task to delete multiple documents"""
     from lightrag.kg.shared_storage import (
         get_namespace_data,
         get_pipeline_status_lock,
@@ -792,13 +803,20 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
     pipeline_status = await get_namespace_data("pipeline_status")
     pipeline_status_lock = get_pipeline_status_lock()
 
+    total_docs = len(doc_ids)
+    successful_deletions = []
+    failed_deletions = []
+
     # Set pipeline status to busy for deletion
     async with pipeline_status_lock:
         pipeline_status.update(
             {
                 "busy": True,
-                "job_name": f"Deleting Document: {doc_id}",
+                "job_name": f"Deleting {total_docs} Documents",
                 "job_start": datetime.now().isoformat(),
+                "docs": total_docs,
+                "batchs": total_docs,
+                "cur_batch": 0,
                 "latest_message": "Starting document deletion process",
             }
         )
@@ -806,40 +824,81 @@
         pipeline_status["history_messages"][:] = ["Starting document deletion process"]
 
     try:
-        result = await rag.adelete_by_doc_id(doc_id)
-        if "history_messages" in pipeline_status:
-            pipeline_status["history_messages"].append(result.message)
+        # Loop through each document ID and delete them one by one
+        for i, doc_id in enumerate(doc_ids, 1):
+            async with pipeline_status_lock:
+                pipeline_status["cur_batch"] = i
+                pipeline_status["latest_message"] = (
+                    f"Deleting document {i}/{total_docs}: {doc_id}"
+                )
+                pipeline_status["history_messages"].append(
+                    f"Processing document {i}/{total_docs}: {doc_id}"
+                )
 
-        logger.info(f"Document deletion completed for {doc_id}: {result.status}")
+            try:
+                result = await rag.adelete_by_doc_id(doc_id)
+
+                if result.status == "success":
+                    successful_deletions.append(doc_id)
+                    success_msg = (
+                        f"Successfully deleted document {i}/{total_docs}: {doc_id}"
+                    )
+                    logger.info(success_msg)
+
+                    async with pipeline_status_lock:
+                        pipeline_status["history_messages"].append(success_msg)
+                else:
+                    failed_deletions.append(doc_id)
+                    error_msg = f"Failed to delete document {i}/{total_docs}: {doc_id} - {result.message}"
+                    logger.error(error_msg)
+
+                    async with pipeline_status_lock:
+                        pipeline_status["history_messages"].append(error_msg)
+
+            except Exception as e:
+                failed_deletions.append(doc_id)
+                error_msg = (
+                    f"Error deleting document {i}/{total_docs}: {doc_id} - {str(e)}"
+                )
+                logger.error(error_msg)
+                logger.error(traceback.format_exc())
+
+                async with pipeline_status_lock:
+                    pipeline_status["history_messages"].append(error_msg)
+
+        # Final summary
+        summary_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
+        logger.info(summary_msg)
+
+        async with pipeline_status_lock:
+            pipeline_status["history_messages"].append(summary_msg)
+            if successful_deletions:
+                pipeline_status["history_messages"].append(
+                    f"Successfully deleted: {', '.join(successful_deletions)}"
+                )
+            if failed_deletions:
+                pipeline_status["history_messages"].append(
+                    f"Failed to delete: {', '.join(failed_deletions)}"
+                )
     except Exception as e:
-        error_msg = f"Error deleting document {doc_id}: {str(e)}"
+        error_msg = f"Critical error during batch deletion: {str(e)}"
         logger.error(error_msg)
         logger.error(traceback.format_exc())
-        if "history_messages" in pipeline_status:
+
+        async with pipeline_status_lock:
             pipeline_status["history_messages"].append(error_msg)
     finally:
         async with pipeline_status_lock:
             pipeline_status["busy"] = False
             completion_msg = "Document deletion process completed."
             pipeline_status["latest_message"] = completion_msg
-            if "history_messages" in pipeline_status:
-                pipeline_status["history_messages"].append(completion_msg)
+            pipeline_status["history_messages"].append(completion_msg)
 
 
 def create_document_routes(
     rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
 ):
-    # Check if doc_id exists in the system - add this check at the beginning
-    async def check_doc_id_exists(doc_id: str) -> bool:
-        """Check if document ID exists in the system"""
-        try:
-            doc_status = await rag.doc_status.get_by_id(doc_id)
-            return doc_status is not None
-        except Exception as e:
-            logger.error(f"Error checking doc_id existence: {str(e)}")
-            return False
-
     # Create combined auth dependency for document routes
     combined_auth = get_combined_auth_dependency(api_key)
@@ -1326,9 +1385,9 @@ def create_document_routes(
         background_tasks: BackgroundTasks,
     ) -> DeleteDocByIdResponse:
         """
-        Delete a document and all its associated data by its ID using background processing.
+        Delete documents and all their associated data by their IDs using background processing.
 
-        Deletes a specific document and all its associated data, including its status,
+        Deletes specific documents and all their associated data, including their status,
         text chunks, vector embeddings, and any related graph data. The deletion process
         runs in the background to avoid blocking the client connection. It is disabled
         when llm cache for entity extraction is disabled.
@@ -1336,7 +1395,7 @@
         This operation is irreversible and will interact with the pipeline status.
 
         Args:
-            delete_request (DeleteDocRequest): The request containing the document ID.
+            delete_request (DeleteDocRequest): The request containing the document IDs.
             background_tasks: FastAPI BackgroundTasks for async processing
 
         Returns:
@@ -1349,10 +1408,7 @@
            HTTPException:
              - 500: If an unexpected internal error occurs during initialization.
         """
-        # Check if doc_id exists first - return error immediately if not found
-        doc_id = delete_request.doc_id
-        if not await check_doc_id_exists(doc_id):
-            raise HTTPException(status_code=404, detail=f"Document {doc_id} not found.")
+        doc_ids = delete_request.doc_ids
 
         # The rag object is initialized from the server startup args,
         # so we can access its properties here.
@@ -1360,7 +1416,7 @@
             return DeleteDocByIdResponse(
                 status="not_allowed",
                 message="Operation not allowed when LLM cache for entity extraction is disabled.",
-                doc_id=delete_request.doc_id,
+                doc_id=", ".join(delete_request.doc_ids),
             )
 
         try:
@@ -1372,21 +1428,21 @@
             if pipeline_status.get("busy", False):
                 return DeleteDocByIdResponse(
                     status="busy",
-                    message="Cannot delete document while pipeline is busy",
-                    doc_id=doc_id,
+                    message="Cannot delete documents while pipeline is busy",
+                    doc_id=", ".join(doc_ids),
                 )
 
             # Add deletion task to background tasks
-            background_tasks.add_task(background_delete_document, rag, doc_id)
+            background_tasks.add_task(background_delete_documents, rag, doc_ids)
 
             return DeleteDocByIdResponse(
                 status="deletion_started",
-                message=f"Document deletion for '{doc_id}' has been initiated. Processing will continue in background.",
-                doc_id=doc_id,
+                message=f"Document deletion for {len(doc_ids)} documents has been initiated. Processing will continue in background.",
+                doc_id=", ".join(doc_ids),
             )
         except Exception as e:
-            error_msg = f"Error initiating document deletion for {delete_request.doc_id}: {str(e)}"
+            error_msg = f"Error initiating document deletion for {delete_request.doc_ids}: {str(e)}"
             logger.error(error_msg)
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=error_msg)
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index ed4cc0d6..83ca2882 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1961,12 +1961,6 @@ class LightRAG:
             logger.error(f"Failed to delete document and status: {e}")
             raise Exception(f"Failed to delete document and status: {e}") from e
 
-        async with pipeline_status_lock:
-            log_message = f"Successfully deleted document {doc_id}"
-            logger.info(log_message)
-            pipeline_status["latest_message"] = log_message
-            pipeline_status["history_messages"].append(log_message)
-
         return DeletionResult(
             status="success",
             doc_id=doc_id,