Fix document deletion concurrency control and validation logic

• Clarify job naming for single vs batch deletion
• Update job name validation in busy pipeline check
This commit is contained in:
yangdx 2025-11-18 13:59:24 +08:00
parent 656025b75e
commit 702cfd2981
2 changed files with 25 additions and 4 deletions

View file

@ -1665,6 +1665,7 @@ async def background_delete_documents(
pipeline_status.update(
{
"busy": True,
# Job name can not be changed, it's verified in adelete_by_doc_id()
"job_name": f"Deleting {total_docs} Documents",
"job_start": datetime.now().isoformat(),
"docs": total_docs,

View file

@ -2948,6 +2948,26 @@ class LightRAG:
data across different storage layers are removed or rebuiled. If entities or relationships
are partially affected, they will be rebuilded using LLM cached from remaining documents.
**Concurrency Control Design:**
This function implements a pipeline-based concurrency control to prevent data corruption:
1. **Single Document Deletion** (when WE acquire pipeline):
- Sets job_name to "Single document deletion" (NOT starting with "deleting")
- Prevents other adelete_by_doc_id calls from running concurrently
- Ensures exclusive access to graph operations for this deletion
2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
- Sets job_name to "Deleting {N} Documents" (starts with "deleting")
- Allows multiple adelete_by_doc_id calls to join the deletion queue
- Each call validates the job name to ensure it's part of a deletion operation
The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
ensures that:
- adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
- Prevents concurrent single deletions that could cause race conditions
- Rejects operations when pipeline is busy with non-deletion tasks
Args:
doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results
@ -2955,10 +2975,10 @@ class LightRAG:
Returns:
DeletionResult: An object containing the outcome of the deletion process.
- `status` (str): "success", "not_found", or "failure".
- `status` (str): "success", "not_found", "not_allowed", or "failure".
- `doc_id` (str): The ID of the document attempted to be deleted.
- `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 500).
- `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
- `file_path` (str | None): The file path of the deleted document, if available.
"""
# Get pipeline status shared data and lock for validation
@ -2980,7 +3000,7 @@ class LightRAG:
pipeline_status.update(
{
"busy": True,
"job_name": "Deleting 1 document",
"job_name": "Single document deletion",
"job_start": datetime.now(timezone.utc).isoformat(),
"docs": 1,
"batchs": 1,
@ -2997,7 +3017,7 @@ class LightRAG:
else:
# Pipeline already busy - verify it's a deletion job
job_name = pipeline_status.get("job_name", "").lower()
if "deleting" not in job_name or "document" not in job_name:
if not job_name.startswith("deleting") or "document" not in job_name:
return DeletionResult(
status="not_allowed",
doc_id=doc_id,