Change the API for deleting documents to support deleting multiple documents at once.

This commit is contained in:
yangdx 2025-06-25 16:19:49 +08:00
parent 495d6c8cce
commit 51bb0471cd
2 changed files with 97 additions and 47 deletions

View file

@ -260,14 +260,25 @@ Attributes:
class DeleteDocRequest(BaseModel):
doc_id: str = Field(..., description="The ID of the document to delete.")
doc_ids: List[str] = Field(..., description="The IDs of the documents to delete.")
@field_validator("doc_id", mode="after")
@field_validator("doc_ids", mode="after")
@classmethod
def validate_doc_id(cls, doc_id: str) -> str:
if not doc_id or not doc_id.strip():
raise ValueError("Document ID cannot be empty")
return doc_id.strip()
def validate_doc_ids(cls, doc_ids: List[str]) -> List[str]:
if not doc_ids:
raise ValueError("Document IDs list cannot be empty")
validated_ids = []
for doc_id in doc_ids:
if not doc_id or not doc_id.strip():
raise ValueError("Document ID cannot be empty")
validated_ids.append(doc_id.strip())
# Check for duplicates
if len(validated_ids) != len(set(validated_ids)):
raise ValueError("Document IDs must be unique")
return validated_ids
class DeleteEntityRequest(BaseModel):
@ -782,8 +793,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
logger.error(traceback.format_exc())
async def background_delete_document(rag: LightRAG, doc_id: str):
"""Background task to delete a document"""
async def background_delete_documents(rag: LightRAG, doc_ids: List[str]):
"""Background task to delete multiple documents"""
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
@ -792,13 +803,20 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
total_docs = len(doc_ids)
successful_deletions = []
failed_deletions = []
# Set pipeline status to busy for deletion
async with pipeline_status_lock:
pipeline_status.update(
{
"busy": True,
"job_name": f"Deleting Document: {doc_id}",
"job_name": f"Deleting {total_docs} Documents",
"job_start": datetime.now().isoformat(),
"docs": total_docs,
"batchs": total_docs,
"cur_batch": 0,
"latest_message": "Starting document deletion process",
}
)
@ -806,40 +824,81 @@ async def background_delete_document(rag: LightRAG, doc_id: str):
pipeline_status["history_messages"][:] = ["Starting document deletion process"]
try:
result = await rag.adelete_by_doc_id(doc_id)
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(result.message)
# Loop through each document ID and delete them one by one
for i, doc_id in enumerate(doc_ids, 1):
async with pipeline_status_lock:
pipeline_status["cur_batch"] = i
pipeline_status["latest_message"] = (
f"Deleting document {i}/{total_docs}: {doc_id}"
)
pipeline_status["history_messages"].append(
f"Processing document {i}/{total_docs}: {doc_id}"
)
logger.info(f"Document deletion completed for {doc_id}: {result.status}")
try:
result = await rag.adelete_by_doc_id(doc_id)
if result.status == "success":
successful_deletions.append(doc_id)
success_msg = (
f"Successfully deleted document {i}/{total_docs}: {doc_id}"
)
logger.info(success_msg)
async with pipeline_status_lock:
pipeline_status["history_messages"].append(success_msg)
else:
failed_deletions.append(doc_id)
error_msg = f"Failed to delete document {i}/{total_docs}: {doc_id} - {result.message}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["history_messages"].append(error_msg)
except Exception as e:
failed_deletions.append(doc_id)
error_msg = (
f"Error deleting document {i}/{total_docs}: {doc_id} - {str(e)}"
)
logger.error(error_msg)
logger.error(traceback.format_exc())
async with pipeline_status_lock:
pipeline_status["history_messages"].append(error_msg)
# Final summary
summary_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
logger.info(summary_msg)
async with pipeline_status_lock:
pipeline_status["history_messages"].append(summary_msg)
if successful_deletions:
pipeline_status["history_messages"].append(
f"Successfully deleted: {', '.join(successful_deletions)}"
)
if failed_deletions:
pipeline_status["history_messages"].append(
f"Failed to delete: {', '.join(failed_deletions)}"
)
except Exception as e:
error_msg = f"Error deleting document {doc_id}: {str(e)}"
error_msg = f"Critical error during batch deletion: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
if "history_messages" in pipeline_status:
async with pipeline_status_lock:
pipeline_status["history_messages"].append(error_msg)
finally:
async with pipeline_status_lock:
pipeline_status["busy"] = False
completion_msg = "Document deletion process completed."
pipeline_status["latest_message"] = completion_msg
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(completion_msg)
pipeline_status["history_messages"].append(completion_msg)
def create_document_routes(
rag: LightRAG, doc_manager: DocumentManager, api_key: Optional[str] = None
):
# Check if doc_id exists in the system - add this check at the beginning
async def check_doc_id_exists(doc_id: str) -> bool:
"""Check if document ID exists in the system"""
try:
doc_status = await rag.doc_status.get_by_id(doc_id)
return doc_status is not None
except Exception as e:
logger.error(f"Error checking doc_id existence: {str(e)}")
return False
# Create combined auth dependency for document routes
combined_auth = get_combined_auth_dependency(api_key)
@ -1326,9 +1385,9 @@ def create_document_routes(
background_tasks: BackgroundTasks,
) -> DeleteDocByIdResponse:
"""
Delete a document and all its associated data by its ID using background processing.
Delete documents and all their associated data by their IDs using background processing.
Deletes a specific document and all its associated data, including its status,
Deletes specific documents and all their associated data, including their status,
text chunks, vector embeddings, and any related graph data.
The deletion process runs in the background to avoid blocking the client connection.
It is disabled when llm cache for entity extraction is disabled.
@ -1336,7 +1395,7 @@ def create_document_routes(
This operation is irreversible and will interact with the pipeline status.
Args:
delete_request (DeleteDocRequest): The request containing the document ID.
delete_request (DeleteDocRequest): The request containing the document IDs.
background_tasks: FastAPI BackgroundTasks for async processing
Returns:
@ -1349,10 +1408,7 @@ def create_document_routes(
HTTPException:
- 500: If an unexpected internal error occurs during initialization.
"""
# Check if doc_id exists first - return error immediately if not found
doc_id = delete_request.doc_id
if not await check_doc_id_exists(doc_id):
raise HTTPException(status_code=404, detail=f"Document {doc_id} not found.")
doc_ids = delete_request.doc_ids
# The rag object is initialized from the server startup args,
# so we can access its properties here.
@ -1360,7 +1416,7 @@ def create_document_routes(
return DeleteDocByIdResponse(
status="not_allowed",
message="Operation not allowed when LLM cache for entity extraction is disabled.",
doc_id=delete_request.doc_id,
doc_id=", ".join(delete_request.doc_ids),
)
try:
@ -1372,21 +1428,21 @@ def create_document_routes(
if pipeline_status.get("busy", False):
return DeleteDocByIdResponse(
status="busy",
message="Cannot delete document while pipeline is busy",
doc_id=doc_id,
message="Cannot delete documents while pipeline is busy",
doc_id=", ".join(doc_ids),
)
# Add deletion task to background tasks
background_tasks.add_task(background_delete_document, rag, doc_id)
background_tasks.add_task(background_delete_documents, rag, doc_ids)
return DeleteDocByIdResponse(
status="deletion_started",
message=f"Document deletion for '{doc_id}' has been initiated. Processing will continue in background.",
doc_id=doc_id,
message=f"Document deletion for {len(doc_ids)} documents has been initiated. Processing will continue in background.",
doc_id=", ".join(doc_ids),
)
except Exception as e:
error_msg = f"Error initiating document deletion for {delete_request.doc_id}: {str(e)}"
error_msg = f"Error initiating document deletion for {delete_request.doc_ids}: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=error_msg)

View file

@ -1961,12 +1961,6 @@ class LightRAG:
logger.error(f"Failed to delete document and status: {e}")
raise Exception(f"Failed to delete document and status: {e}") from e
async with pipeline_status_lock:
log_message = f"Successfully deleted document {doc_id}"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
return DeletionResult(
status="success",
doc_id=doc_id,