From dffe659388ba2fecaa0de43c1e8c9507921b4dc9 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 23 Jun 2025 18:10:40 +0800 Subject: [PATCH] Feat: Add document deletion by ID API endpoint - New DELETE endpoint for document removal - Implements doc_id-based deletion - Handles pipeline status during operation - Includes proper error handling - Updates pipeline status messages --- lightrag/api/routers/document_routes.py | 108 +++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index b2e9baf8..16dd4b0e 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -12,7 +12,14 @@ import pipmaster as pm from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Any, Literal -from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile +from fastapi import ( + APIRouter, + BackgroundTasks, + Depends, + File, + HTTPException, + UploadFile, +) from pydantic import BaseModel, Field, field_validator from lightrag import LightRAG @@ -252,6 +259,10 @@ Attributes: """ +class DeleteDocRequest(BaseModel): + doc_id: str = Field(..., description="The ID of the document to delete.") + + class DocStatusResponse(BaseModel): id: str = Field(description="Document identifier") content_summary: str = Field(description="Summary of document content") @@ -1318,6 +1329,100 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + class DeleteDocByIdResponse(BaseModel): + """Response model for single document deletion operation.""" + + status: Literal["success", "fail", "not_found", "busy"] = Field( + description="Status of the deletion operation" + ) + message: str = Field(description="Message describing the operation result") + doc_id: Optional[str] = Field(default=None, description="The ID of the document.") + + @router.delete( + "/delete_by_doc_id", + response_model=DeleteDocByIdResponse, + dependencies=[Depends(combined_auth)], + summary="Delete a document and all its associated data by its ID.", + ) + async def delete_by_doc_id( + delete_request: DeleteDocRequest, + ) -> DeleteDocByIdResponse: + """ + Deletes a specific document and all its associated data, including its status, + text chunks, vector embeddings, and any related graph data. + + This operation is irreversible and will interact with the pipeline status. + + Args: + delete_request (DeleteDocRequest): The request containing the document ID. + + Returns: + DeleteDocByIdResponse: The result of the deletion operation. + - status="success": The document was successfully deleted. + - status="not_found": The document with the specified ID was not found. + - status="fail": The deletion operation failed. + - status="busy": The pipeline is busy with another operation. + + Raises: + HTTPException: + - 500: If an unexpected internal error occurs. + """ + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_pipeline_status_lock, + ) + + doc_id = delete_request.doc_id + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_pipeline_status_lock() + + async with pipeline_status_lock: + if pipeline_status.get("busy", False): + return DeleteDocByIdResponse( + status="busy", + message="Cannot delete document while pipeline is busy", + doc_id=doc_id, + ) + pipeline_status.update( + { + "busy": True, + "job_name": f"Deleting Document: {doc_id}", + "job_start": datetime.now().isoformat(), + "latest_message": "Starting document deletion process", + } + ) + # Use slice assignment to clear the list in place + pipeline_status["history_messages"][:] = [ + f"Starting deletion for doc_id: {doc_id}" + ] + + try: + result = await rag.adelete_by_doc_id(doc_id) + response_data = { + "doc_id": result.doc_id, + "message": result.message, + "status": result.status, + } + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(result.message) + return DeleteDocByIdResponse(**response_data) + + except Exception as e: + error_msg = f"Error deleting document {doc_id}: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(error_msg) + # Re-raise as HTTPException for consistent error handling by FastAPI + raise HTTPException(status_code=500, detail=error_msg) + finally: + async with pipeline_status_lock: + pipeline_status["busy"] = False + completion_msg = f"Document deletion process for {doc_id} completed." + pipeline_status["latest_message"] = completion_msg + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].append(completion_msg) + @router.post( "/clear_cache", response_model=ClearCacheResponse, @@ -1372,3 +1477,4 @@ def create_document_routes( raise HTTPException(status_code=500, detail=str(e)) return router +