Fix: auto-acquire pipeline when idle in document deletion
• Track if we acquired the pipeline lock • Auto-acquire pipeline when idle • Only release if we acquired it • Prevent concurrent deletion conflicts • Improve deletion job validation
This commit is contained in:
parent
1745b30a5f
commit
4048fc4b89
1 changed files with 46 additions and 17 deletions
|
|
@ -2969,26 +2969,43 @@ class LightRAG:
|
||||||
"pipeline_status", workspace=self.workspace
|
"pipeline_status", workspace=self.workspace
|
||||||
)
|
)
|
||||||
|
|
||||||
# Validate pipeline status before proceeding with deletion
|
# Track whether WE acquired the pipeline
|
||||||
|
we_acquired_pipeline = False
|
||||||
|
|
||||||
|
# Check and acquire pipeline if needed
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
if not pipeline_status.get("busy", False):
|
if not pipeline_status.get("busy", False):
|
||||||
return DeletionResult(
|
# Pipeline is idle - WE acquire it for this deletion
|
||||||
status="not_allowed",
|
we_acquired_pipeline = True
|
||||||
doc_id=doc_id,
|
pipeline_status.update(
|
||||||
message="Deletion not allowed: pipeline is not busy",
|
{
|
||||||
status_code=403,
|
"busy": True,
|
||||||
file_path=None,
|
"job_name": "Deleting 1 document",
|
||||||
)
|
"job_start": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"docs": 1,
|
||||||
job_name = pipeline_status.get("job_name", "").lower()
|
"batchs": 1,
|
||||||
if "deleting" not in job_name or "document" not in job_name:
|
"cur_batch": 0,
|
||||||
return DeletionResult(
|
"request_pending": False,
|
||||||
status="not_allowed",
|
"cancellation_requested": False,
|
||||||
doc_id=doc_id,
|
"latest_message": f"Starting deletion for document: {doc_id}",
|
||||||
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
|
}
|
||||||
status_code=403,
|
|
||||||
file_path=None,
|
|
||||||
)
|
)
|
||||||
|
# Initialize history messages
|
||||||
|
pipeline_status["history_messages"][:] = [
|
||||||
|
f"Starting deletion for document: {doc_id}"
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
# Pipeline already busy - verify it's a deletion job
|
||||||
|
job_name = pipeline_status.get("job_name", "").lower()
|
||||||
|
if "deleting" not in job_name or "document" not in job_name:
|
||||||
|
return DeletionResult(
|
||||||
|
status="not_allowed",
|
||||||
|
doc_id=doc_id,
|
||||||
|
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
|
||||||
|
status_code=403,
|
||||||
|
file_path=None,
|
||||||
|
)
|
||||||
|
# Pipeline is busy with deletion - proceed without acquiring
|
||||||
|
|
||||||
deletion_operations_started = False
|
deletion_operations_started = False
|
||||||
original_exception = None
|
original_exception = None
|
||||||
|
|
@ -3606,6 +3623,18 @@ class LightRAG:
|
||||||
f"No deletion operations were started for document {doc_id}, skipping persistence"
|
f"No deletion operations were started for document {doc_id}, skipping persistence"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Release pipeline only if WE acquired it
|
||||||
|
if we_acquired_pipeline:
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
pipeline_status["busy"] = False
|
||||||
|
pipeline_status["cancellation_requested"] = False
|
||||||
|
completion_msg = (
|
||||||
|
f"Deletion process completed for document: {doc_id}"
|
||||||
|
)
|
||||||
|
pipeline_status["latest_message"] = completion_msg
|
||||||
|
pipeline_status["history_messages"].append(completion_msg)
|
||||||
|
logger.info(completion_msg)
|
||||||
|
|
||||||
async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
|
async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
|
||||||
"""Asynchronously delete an entity and all its relationships.
|
"""Asynchronously delete an entity and all its relationships.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue