From 4048fc4b89f7c6059c27d46e7dd75e2a13b51712 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 18 Nov 2025 13:25:13 +0800 Subject: [PATCH] Fix: auto-acquire pipeline when idle in document deletion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Track if we acquired the pipeline lock • Auto-acquire pipeline when idle • Only release if we acquired it • Prevent concurrent deletion conflicts • Improve deletion job validation --- lightrag/lightrag.py | 63 ++++++++++++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 16becbe7..a575554e 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -2969,26 +2969,43 @@ class LightRAG: "pipeline_status", workspace=self.workspace ) - # Validate pipeline status before proceeding with deletion + # Track whether WE acquired the pipeline + we_acquired_pipeline = False + + # Check and acquire pipeline if needed async with pipeline_status_lock: if not pipeline_status.get("busy", False): - return DeletionResult( - status="not_allowed", - doc_id=doc_id, - message="Deletion not allowed: pipeline is not busy", - status_code=403, - file_path=None, - ) - - job_name = pipeline_status.get("job_name", "").lower() - if "deleting" not in job_name or "document" not in job_name: - return DeletionResult( - status="not_allowed", - doc_id=doc_id, - message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job", - status_code=403, - file_path=None, + # Pipeline is idle - WE acquire it for this deletion + we_acquired_pipeline = True + pipeline_status.update( + { + "busy": True, + "job_name": "Deleting 1 document", + "job_start": datetime.now(timezone.utc).isoformat(), + "docs": 1, + "batchs": 1, + "cur_batch": 0, + "request_pending": False, + "cancellation_requested": False, + "latest_message": f"Starting deletion for document: {doc_id}", + } ) + # Initialize history messages + pipeline_status["history_messages"][:] = [ + f"Starting deletion for document: {doc_id}" + ] + else: + # Pipeline already busy - verify it's a deletion job + job_name = pipeline_status.get("job_name", "").lower() + if "deleting" not in job_name or "document" not in job_name: + return DeletionResult( + status="not_allowed", + doc_id=doc_id, + message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job", + status_code=403, + file_path=None, + ) + # Pipeline is busy with deletion - proceed without acquiring deletion_operations_started = False original_exception = None @@ -3606,6 +3623,18 @@ class LightRAG: f"No deletion operations were started for document {doc_id}, skipping persistence" ) + # Release pipeline only if WE acquired it + if we_acquired_pipeline: + async with pipeline_status_lock: + pipeline_status["busy"] = False + pipeline_status["cancellation_requested"] = False + completion_msg = ( + f"Deletion process completed for document: {doc_id}" + ) + pipeline_status["latest_message"] = completion_msg + pipeline_status["history_messages"].append(completion_msg) + logger.info(completion_msg) + async def adelete_by_entity(self, entity_name: str) -> DeletionResult: """Asynchronously delete an entity and all its relationships.