Feat: Check pending equest_pending after document deletion
- Add double-check for pipeline status to prevent race conditions - Implement automatic processing of pending indexing requests after deletion
This commit is contained in:
parent
6c2ae40d7d
commit
05231233f1
1 changed files with 18 additions and 2 deletions
|
|
@ -861,8 +861,13 @@ async def background_delete_documents(
|
||||||
successful_deletions = []
|
successful_deletions = []
|
||||||
failed_deletions = []
|
failed_deletions = []
|
||||||
|
|
||||||
# Set pipeline status to busy for deletion
|
# Double-check pipeline status before proceeding
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
|
if pipeline_status.get("busy", False):
|
||||||
|
logger.warning("Error: Unexpected pipeline busy state, aborting deletion.")
|
||||||
|
return # Abort deletion operation
|
||||||
|
|
||||||
|
# Set pipeline status to busy for deletion
|
||||||
pipeline_status.update(
|
pipeline_status.update(
|
||||||
{
|
{
|
||||||
"busy": True,
|
"busy": True,
|
||||||
|
|
@ -971,12 +976,23 @@ async def background_delete_documents(
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
pipeline_status["history_messages"].append(error_msg)
|
pipeline_status["history_messages"].append(error_msg)
|
||||||
finally:
|
finally:
|
||||||
# Final summary
|
# Final summary and check for pending requests
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
pipeline_status["busy"] = False
|
pipeline_status["busy"] = False
|
||||||
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
|
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
|
||||||
pipeline_status["latest_message"] = completion_msg
|
pipeline_status["latest_message"] = completion_msg
|
||||||
pipeline_status["history_messages"].append(completion_msg)
|
pipeline_status["history_messages"].append(completion_msg)
|
||||||
|
|
||||||
|
# Check if there are pending document indexing requests
|
||||||
|
has_pending_request = pipeline_status.get("request_pending", False)
|
||||||
|
|
||||||
|
# If there are pending requests, start document processing pipeline
|
||||||
|
if has_pending_request:
|
||||||
|
try:
|
||||||
|
logger.info("Processing pending document indexing requests after deletion")
|
||||||
|
await rag.apipeline_process_enqueue_documents()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error processing pending documents after deletion: {e}")
|
||||||
|
|
||||||
|
|
||||||
def create_document_routes(
|
def create_document_routes(
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue