Fix document deletion concurrency control and validation logic

• Clarify job naming for single vs batch deletion
• Update job name validation in busy pipeline check
This commit is contained in:
yangdx 2025-11-18 13:59:24 +08:00
parent 656025b75e
commit 702cfd2981
2 changed files with 25 additions and 4 deletions

View file

@ -1665,6 +1665,7 @@ async def background_delete_documents(
pipeline_status.update( pipeline_status.update(
{ {
"busy": True, "busy": True,
# The job name must not be changed: it is verified in adelete_by_doc_id()
"job_name": f"Deleting {total_docs} Documents", "job_name": f"Deleting {total_docs} Documents",
"job_start": datetime.now().isoformat(), "job_start": datetime.now().isoformat(),
"docs": total_docs, "docs": total_docs,

View file

@ -2948,6 +2948,26 @@ class LightRAG:
data across different storage layers are removed or rebuilt. If entities or relationships data across different storage layers are removed or rebuilt. If entities or relationships
are partially affected, they will be rebuilt using LLM cached from remaining documents. are partially affected, they will be rebuilt using LLM cached from remaining documents.
**Concurrency Control Design:**
This function implements a pipeline-based concurrency control to prevent data corruption:
1. **Single Document Deletion** (when this call acquires the pipeline itself):
- Sets job_name to "Single document deletion" (NOT starting with "deleting")
- Prevents other adelete_by_doc_id calls from running concurrently
- Ensures exclusive access to graph operations for this deletion
2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
- Sets job_name to "Deleting {N} Documents" (starts with "deleting" once lower-cased)
- Allows multiple adelete_by_doc_id calls to join the deletion queue
- Each call validates the job name to ensure it's part of a deletion operation
The rejection check `if not job_name.startswith("deleting") or "document" not in job_name`
(applied to the lower-cased job name) ensures that:
- adelete_by_doc_id can only run when the pipeline is idle OR during batch deletion
- concurrent single deletions, which could cause race conditions, are prevented
- operations are rejected when the pipeline is busy with non-deletion tasks
Args: Args:
doc_id (str): The unique identifier of the document to be deleted. doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results delete_llm_cache (bool): Whether to delete cached LLM extraction results
@ -2955,10 +2975,10 @@ class LightRAG:
Returns: Returns:
DeletionResult: An object containing the outcome of the deletion process. DeletionResult: An object containing the outcome of the deletion process.
- `status` (str): "success", "not_found", or "failure". - `status` (str): "success", "not_found", "not_allowed", or "failure".
- `doc_id` (str): The ID of the document attempted to be deleted. - `doc_id` (str): The ID of the document attempted to be deleted.
- `message` (str): A summary of the operation's result. - `message` (str): A summary of the operation's result.
- `status_code` (int): HTTP status code (e.g., 200, 404, 500). - `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
- `file_path` (str | None): The file path of the deleted document, if available. - `file_path` (str | None): The file path of the deleted document, if available.
""" """
# Get pipeline status shared data and lock for validation # Get pipeline status shared data and lock for validation
@ -2980,7 +3000,7 @@ class LightRAG:
pipeline_status.update( pipeline_status.update(
{ {
"busy": True, "busy": True,
"job_name": "Deleting 1 document", "job_name": "Single document deletion",
"job_start": datetime.now(timezone.utc).isoformat(), "job_start": datetime.now(timezone.utc).isoformat(),
"docs": 1, "docs": 1,
"batchs": 1, "batchs": 1,
@ -2997,7 +3017,7 @@ class LightRAG:
else: else:
# Pipeline already busy - verify it's a deletion job # Pipeline already busy - verify it's a deletion job
job_name = pipeline_status.get("job_name", "").lower() job_name = pipeline_status.get("job_name", "").lower()
if "deleting" not in job_name or "document" not in job_name: if not job_name.startswith("deleting") or "document" not in job_name:
return DeletionResult( return DeletionResult(
status="not_allowed", status="not_allowed",
doc_id=doc_id, doc_id=doc_id,