Add pipeline status validation before document deletion
(cherry picked from commit 9d7b7981ce)
This commit is contained in:
parent
94ae13a037
commit
f6a45245bd
1 changed file with 45 additions and 6 deletions
|
|
@@ -664,6 +664,16 @@ class LightRAG:
|
||||||
default_workspace = get_default_workspace()
|
default_workspace = get_default_workspace()
|
||||||
if default_workspace is None:
|
if default_workspace is None:
|
||||||
set_default_workspace(self.workspace)
|
set_default_workspace(self.workspace)
|
||||||
|
elif default_workspace != self.workspace:
|
||||||
|
logger.warning(
|
||||||
|
f"Creating LightRAG instance with workspace='{self.workspace}' "
|
||||||
|
f"but default workspace is already set to '{default_workspace}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Auto-initialize pipeline_status for this workspace
|
||||||
|
from lightrag.kg.shared_storage import initialize_pipeline_status
|
||||||
|
|
||||||
|
await initialize_pipeline_status(workspace=self.workspace)
|
||||||
|
|
||||||
for storage in (
|
for storage in (
|
||||||
self.full_docs,
|
self.full_docs,
|
||||||
|
|
@@ -1599,8 +1609,12 @@ class LightRAG:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Get pipeline status shared data and lock
|
# Get pipeline status shared data and lock
|
||||||
pipeline_status = await get_namespace_data("pipeline_status")
|
pipeline_status = await get_namespace_data(
|
||||||
pipeline_status_lock = get_namespace_lock("pipeline_status")
|
"pipeline_status", workspace=self.workspace
|
||||||
|
)
|
||||||
|
pipeline_status_lock = get_namespace_lock(
|
||||||
|
"pipeline_status", workspace=self.workspace
|
||||||
|
)
|
||||||
|
|
||||||
# Check if another process is already processing the queue
|
# Check if another process is already processing the queue
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
|
|
@@ -2947,14 +2961,39 @@ class LightRAG:
|
||||||
- `status_code` (int): HTTP status code (e.g., 200, 404, 500).
|
- `status_code` (int): HTTP status code (e.g., 200, 404, 500).
|
||||||
- `file_path` (str | None): The file path of the deleted document, if available.
|
- `file_path` (str | None): The file path of the deleted document, if available.
|
||||||
"""
|
"""
|
||||||
|
# Get pipeline status shared data and lock for validation
|
||||||
|
pipeline_status = await get_namespace_data(
|
||||||
|
"pipeline_status", workspace=self.workspace
|
||||||
|
)
|
||||||
|
pipeline_status_lock = get_namespace_lock(
|
||||||
|
"pipeline_status", workspace=self.workspace
|
||||||
|
)
|
||||||
|
|
||||||
|
# Validate pipeline status before proceeding with deletion
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
if not pipeline_status.get("busy", False):
|
||||||
|
return DeletionResult(
|
||||||
|
status="not_allowed",
|
||||||
|
doc_id=doc_id,
|
||||||
|
message="Deletion not allowed: pipeline is not busy",
|
||||||
|
status_code=403,
|
||||||
|
file_path=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
job_name = pipeline_status.get("job_name", "").lower()
|
||||||
|
if "deleting" not in job_name or "document" not in job_name:
|
||||||
|
return DeletionResult(
|
||||||
|
status="not_allowed",
|
||||||
|
doc_id=doc_id,
|
||||||
|
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
|
||||||
|
status_code=403,
|
||||||
|
file_path=None,
|
||||||
|
)
|
||||||
|
|
||||||
deletion_operations_started = False
|
deletion_operations_started = False
|
||||||
original_exception = None
|
original_exception = None
|
||||||
doc_llm_cache_ids: list[str] = []
|
doc_llm_cache_ids: list[str] = []
|
||||||
|
|
||||||
# Get pipeline status shared data and lock for status updates
|
|
||||||
pipeline_status = await get_namespace_data("pipeline_status")
|
|
||||||
pipeline_status_lock = get_namespace_lock("pipeline_status")
|
|
||||||
|
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
log_message = f"Starting deletion process for document {doc_id}"
|
log_message = f"Starting deletion process for document {doc_id}"
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue