Fix document deletion concurrency control and validation logic
• Clarify job naming for single vs batch deletion • Update job name validation in busy pipeline check
This commit is contained in:
parent
656025b75e
commit
702cfd2981
2 changed files with 25 additions and 4 deletions
|
|
@ -1665,6 +1665,7 @@ async def background_delete_documents(
|
|||
pipeline_status.update(
|
||||
{
|
||||
"busy": True,
|
||||
# Job name can not be changed, it's verified in adelete_by_doc_id()
|
||||
"job_name": f"Deleting {total_docs} Documents",
|
||||
"job_start": datetime.now().isoformat(),
|
||||
"docs": total_docs,
|
||||
|
|
|
|||
|
|
@ -2948,6 +2948,26 @@ class LightRAG:
|
|||
data across different storage layers are removed or rebuiled. If entities or relationships
|
||||
are partially affected, they will be rebuilded using LLM cached from remaining documents.
|
||||
|
||||
**Concurrency Control Design:**
|
||||
|
||||
This function implements a pipeline-based concurrency control to prevent data corruption:
|
||||
|
||||
1. **Single Document Deletion** (when WE acquire pipeline):
|
||||
- Sets job_name to "Single document deletion" (NOT starting with "deleting")
|
||||
- Prevents other adelete_by_doc_id calls from running concurrently
|
||||
- Ensures exclusive access to graph operations for this deletion
|
||||
|
||||
2. **Batch Document Deletion** (when background_delete_documents acquires pipeline):
|
||||
- Sets job_name to "Deleting {N} Documents" (starts with "deleting")
|
||||
- Allows multiple adelete_by_doc_id calls to join the deletion queue
|
||||
- Each call validates the job name to ensure it's part of a deletion operation
|
||||
|
||||
The validation logic `if not job_name.startswith("deleting") or "document" not in job_name`
|
||||
ensures that:
|
||||
- adelete_by_doc_id can only run when pipeline is idle OR during batch deletion
|
||||
- Prevents concurrent single deletions that could cause race conditions
|
||||
- Rejects operations when pipeline is busy with non-deletion tasks
|
||||
|
||||
Args:
|
||||
doc_id (str): The unique identifier of the document to be deleted.
|
||||
delete_llm_cache (bool): Whether to delete cached LLM extraction results
|
||||
|
|
@ -2955,10 +2975,10 @@ class LightRAG:
|
|||
|
||||
Returns:
|
||||
DeletionResult: An object containing the outcome of the deletion process.
|
||||
- `status` (str): "success", "not_found", or "failure".
|
||||
- `status` (str): "success", "not_found", "not_allowed", or "failure".
|
||||
- `doc_id` (str): The ID of the document attempted to be deleted.
|
||||
- `message` (str): A summary of the operation's result.
|
||||
- `status_code` (int): HTTP status code (e.g., 200, 404, 500).
|
||||
- `status_code` (int): HTTP status code (e.g., 200, 404, 403, 500).
|
||||
- `file_path` (str | None): The file path of the deleted document, if available.
|
||||
"""
|
||||
# Get pipeline status shared data and lock for validation
|
||||
|
|
@ -2980,7 +3000,7 @@ class LightRAG:
|
|||
pipeline_status.update(
|
||||
{
|
||||
"busy": True,
|
||||
"job_name": "Deleting 1 document",
|
||||
"job_name": "Single document deletion",
|
||||
"job_start": datetime.now(timezone.utc).isoformat(),
|
||||
"docs": 1,
|
||||
"batchs": 1,
|
||||
|
|
@ -2997,7 +3017,7 @@ class LightRAG:
|
|||
else:
|
||||
# Pipeline already busy - verify it's a deletion job
|
||||
job_name = pipeline_status.get("job_name", "").lower()
|
||||
if "deleting" not in job_name or "document" not in job_name:
|
||||
if not job_name.startswith("deleting") or "document" not in job_name:
|
||||
return DeletionResult(
|
||||
status="not_allowed",
|
||||
doc_id=doc_id,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue