Feat: Add document deletion by ID API endpoint

- New DELETE endpoint for document removal
- Implements doc_id-based deletion
- Handles pipeline status during operation
- Includes proper error handling
- Updates pipeline status messages
This commit is contained in:
yangdx 2025-06-23 18:10:40 +08:00
parent a0be65d5d9
commit dffe659388

View file

@ -12,7 +12,14 @@ import pipmaster as pm
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
File,
HTTPException,
UploadFile,
)
from pydantic import BaseModel, Field, field_validator
from lightrag import LightRAG
@ -252,6 +259,10 @@ Attributes:
"""
class DeleteDocRequest(BaseModel):
doc_id: str = Field(..., description="The ID of the document to delete.")
class DocStatusResponse(BaseModel):
id: str = Field(description="Document identifier")
content_summary: str = Field(description="Summary of document content")
@ -1318,6 +1329,100 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
class DeleteDocByIdResponse(BaseModel):
"""Response model for single document deletion operation."""
status: Literal["success", "fail", "not_found", "busy"] = Field(
description="Status of the deletion operation"
)
message: str = Field(description="Message describing the operation result")
doc_id: Optional[str] = Field(default=None, description="The ID of the document.")
@router.delete(
"/delete_by_doc_id",
response_model=DeleteDocByIdResponse,
dependencies=[Depends(combined_auth)],
summary="Delete a document and all its associated data by its ID.",
)
async def delete_by_doc_id(
delete_request: DeleteDocRequest,
) -> DeleteDocByIdResponse:
"""
Deletes a specific document and all its associated data, including its status,
text chunks, vector embeddings, and any related graph data.
This operation is irreversible and will interact with the pipeline status.
Args:
delete_request (DeleteDocRequest): The request containing the document ID.
Returns:
DeleteDocByIdResponse: The result of the deletion operation.
- status="success": The document was successfully deleted.
- status="not_found": The document with the specified ID was not found.
- status="fail": The deletion operation failed.
- status="busy": The pipeline is busy with another operation.
Raises:
HTTPException:
- 500: If an unexpected internal error occurs.
"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
doc_id = delete_request.doc_id
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock:
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete document while pipeline is busy",
doc_id=doc_id,
)
pipeline_status.update(
{
"busy": True,
"job_name": f"Deleting Document: {doc_id}",
"job_start": datetime.now().isoformat(),
"latest_message": "Starting document deletion process",
}
)
# Use slice assignment to clear the list in place
pipeline_status["history_messages"][:] = [
f"Starting deletion for doc_id: {doc_id}"
]
try:
result = await rag.adelete_by_doc_id(doc_id)
response_data = {
"doc_id": result.doc_id,
"message": result.message,
"status": result.status,
}
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(result.message)
return DeleteDocByIdResponse(**response_data)
except Exception as e:
error_msg = f"Error deleting document {doc_id}: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(error_msg)
# Re-raise as HTTPException for consistent error handling by FastAPI
raise HTTPException(status_code=500, detail=error_msg)
finally:
async with pipeline_status_lock:
pipeline_status["busy"] = False
completion_msg = f"Document deletion process for {doc_id} completed."
pipeline_status["latest_message"] = completion_msg
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(completion_msg)
@router.post(
"/clear_cache",
response_model=ClearCacheResponse,
@ -1372,3 +1477,4 @@ def create_document_routes(
raise HTTPException(status_code=500, detail=str(e))
return router