From f6a45245bd42a02bda74ddb22877fa70e2e4f3fd Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 17 Nov 2025 14:58:10 +0800
Subject: [PATCH] Add pipeline status validation before document deletion

(cherry picked from commit 9d7b7981ce8170226cdfe00ea24e0e0feab76816)
---
 lightrag/lightrag.py | 51 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 45 insertions(+), 6 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index a9eb60d4..fc4908cc 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -664,6 +664,16 @@ class LightRAG:
         default_workspace = get_default_workspace()
         if default_workspace is None:
             set_default_workspace(self.workspace)
+        elif default_workspace != self.workspace:
+            logger.warning(
+                f"Creating LightRAG instance with workspace='{self.workspace}' "
+                f"but default workspace is already set to '{default_workspace}'."
+            )
+
+        # Auto-initialize pipeline_status for this workspace
+        from lightrag.kg.shared_storage import initialize_pipeline_status
+
+        await initialize_pipeline_status(workspace=self.workspace)
 
         for storage in (
             self.full_docs,
@@ -1599,8 +1609,12 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_namespace_lock("pipeline_status")
+        pipeline_status = await get_namespace_data(
+            "pipeline_status", workspace=self.workspace
+        )
+        pipeline_status_lock = get_namespace_lock(
+            "pipeline_status", workspace=self.workspace
+        )
 
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -2947,14 +2961,39 @@ class LightRAG:
                 - `status_code` (int): HTTP status code (e.g., 200, 404, 500).
                 - `file_path` (str | None): The file path of the deleted document, if available.
         """
+        # Get pipeline status shared data and lock for validation
+        pipeline_status = await get_namespace_data(
+            "pipeline_status", workspace=self.workspace
+        )
+        pipeline_status_lock = get_namespace_lock(
+            "pipeline_status", workspace=self.workspace
+        )
+
+        # Validate pipeline status before proceeding with deletion
+        async with pipeline_status_lock:
+            if not pipeline_status.get("busy", False):
+                return DeletionResult(
+                    status="not_allowed",
+                    doc_id=doc_id,
+                    message="Deletion not allowed: pipeline is not busy",
+                    status_code=403,
+                    file_path=None,
+                )
+
+            job_name = pipeline_status.get("job_name", "").lower()
+            if "deleting" not in job_name or "document" not in job_name:
+                return DeletionResult(
+                    status="not_allowed",
+                    doc_id=doc_id,
+                    message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
+                    status_code=403,
+                    file_path=None,
+                )
+
         deletion_operations_started = False
         original_exception = None
         doc_llm_cache_ids: list[str] = []
 
-        # Get pipeline status shared data and lock for status updates
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_namespace_lock("pipeline_status")
-
         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
             logger.info(log_message)
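
Notes for reviewers (not part of the commit):

The first hunk makes pipeline_status per-workspace: each instance now
auto-initializes the namespace for its own workspace, and a warning is
logged when a new instance's workspace differs from the already-recorded
default. A minimal sketch of the resulting behavior, assuming the hunk
sits in LightRAG's async storage-initialization path (the trailing
`for storage in (...)` context suggests initialize_storages()); every
constructor argument except workspace is elided here:

    import asyncio
    from lightrag import LightRAG

    async def main() -> None:
        rag_a = LightRAG(working_dir="./rag_a", workspace="tenant_a")
        # Records "tenant_a" as the default workspace, then auto-initializes
        # pipeline_status for "tenant_a".
        await rag_a.initialize_storages()

        rag_b = LightRAG(working_dir="./rag_b", workspace="tenant_b")
        # Default is already "tenant_a": logs the new mismatch warning, but
        # still initializes pipeline_status for "tenant_b".
        await rag_b.initialize_storages()

    asyncio.run(main())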
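
The second hunk threads workspace= through get_namespace_data() and
get_namespace_lock(), so two workspaces no longer share one pipeline
lock or status dict. A hedged sketch using the helpers exactly as the
diff calls them (the import path mirrors the one the first hunk uses
for initialize_pipeline_status and is assumed here):

    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    async def peek_pipeline(workspace: str) -> tuple[bool, str]:
        # Each workspace now gets its own status dict and its own async lock.
        status = await get_namespace_data("pipeline_status", workspace=workspace)
        lock = get_namespace_lock("pipeline_status", workspace=workspace)
        async with lock:
            return status.get("busy", False), status.get("job_name", "")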
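
The third hunk changes the calling contract of adelete_by_doc_id(): it
now refuses to run unless pipeline_status is already marked busy with a
job whose name identifies it as a document deletion. A sketch of a
conforming caller; the orchestration around the call (delete_document,
the RuntimeError, the busy reset) is illustrative, not part of this
patch:

    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    async def delete_document(rag, doc_id: str):
        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=rag.workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=rag.workspace
        )

        # Mark the pipeline busy under a job name that passes the new check
        # (must contain both "deleting" and "document", case-insensitive).
        async with pipeline_status_lock:
            if pipeline_status.get("busy", False):
                raise RuntimeError("pipeline already busy")
            pipeline_status["busy"] = True
            pipeline_status["job_name"] = "Deleting document"

        try:
            return await rag.adelete_by_doc_id(doc_id)
        finally:
            async with pipeline_status_lock:
                pipeline_status["busy"] = False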
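
The job-name guard itself is substring-based and case-insensitive, so it
accepts any job name containing both "deleting" and "document". Worked
examples of the exact condition from the diff:

    # allowed = "deleting" in job_name.lower() and "document" in job_name.lower()
    for name in ("Deleting 3 documents", "deleting document abc",
                 "Indexing documents", ""):
        lowered = name.lower()
        allowed = "deleting" in lowered and "document" in lowered
        print(f"{name!r}: {'allowed' if allowed else 'rejected'}")
    # 'Deleting 3 documents': allowed
    # 'deleting document abc': allowed
    # 'Indexing documents': rejected  (missing "deleting")
    # '': rejected                    (pipeline idle / no job name)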