Improve error handling and add cancellation checks in pipeline
This commit is contained in:
parent
f89b5ab101
commit
77336e50b6
2 changed files with 67 additions and 23 deletions
|
|
@ -1699,10 +1699,16 @@ class LightRAG:
|
||||||
semaphore: asyncio.Semaphore,
|
semaphore: asyncio.Semaphore,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process single document"""
|
"""Process single document"""
|
||||||
|
# Initialize variables at the start to prevent UnboundLocalError in error handling
|
||||||
|
file_path = "unknown_source"
|
||||||
|
current_file_number = 0
|
||||||
file_extraction_stage_ok = False
|
file_extraction_stage_ok = False
|
||||||
|
processing_start_time = int(time.time())
|
||||||
|
first_stage_tasks = []
|
||||||
|
entity_relation_task = None
|
||||||
|
|
||||||
async with semaphore:
|
async with semaphore:
|
||||||
nonlocal processed_count
|
nonlocal processed_count
|
||||||
current_file_number = 0
|
|
||||||
# Initialize to prevent UnboundLocalError in error handling
|
# Initialize to prevent UnboundLocalError in error handling
|
||||||
first_stage_tasks = []
|
first_stage_tasks = []
|
||||||
entity_relation_task = None
|
entity_relation_task = None
|
||||||
|
|
@ -1833,16 +1839,29 @@ class LightRAG:
|
||||||
file_extraction_stage_ok = True
|
file_extraction_stage_ok = True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Log error and update pipeline status
|
# Check if this is a user cancellation
|
||||||
logger.error(traceback.format_exc())
|
if isinstance(e, PipelineCancelledException):
|
||||||
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
|
# User cancellation - log brief message only, no traceback
|
||||||
logger.error(error_msg)
|
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
|
||||||
async with pipeline_status_lock:
|
logger.warning(error_msg)
|
||||||
pipeline_status["latest_message"] = error_msg
|
async with pipeline_status_lock:
|
||||||
pipeline_status["history_messages"].append(
|
pipeline_status["latest_message"] = error_msg
|
||||||
traceback.format_exc()
|
pipeline_status["history_messages"].append(
|
||||||
)
|
error_msg
|
||||||
pipeline_status["history_messages"].append(error_msg)
|
)
|
||||||
|
else:
|
||||||
|
# Other exceptions - log with traceback
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
pipeline_status["latest_message"] = error_msg
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
traceback.format_exc()
|
||||||
|
)
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
error_msg
|
||||||
|
)
|
||||||
|
|
||||||
# Cancel tasks that are not yet completed
|
# Cancel tasks that are not yet completed
|
||||||
all_tasks = first_stage_tasks + (
|
all_tasks = first_stage_tasks + (
|
||||||
|
|
@ -1951,18 +1970,29 @@ class LightRAG:
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Log error and update pipeline status
|
# Check if this is a user cancellation
|
||||||
logger.error(traceback.format_exc())
|
if isinstance(e, PipelineCancelledException):
|
||||||
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
|
# User cancellation - log brief message only, no traceback
|
||||||
logger.error(error_msg)
|
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
|
||||||
async with pipeline_status_lock:
|
logger.warning(error_msg)
|
||||||
pipeline_status["latest_message"] = error_msg
|
async with pipeline_status_lock:
|
||||||
pipeline_status["history_messages"].append(
|
pipeline_status["latest_message"] = error_msg
|
||||||
traceback.format_exc()
|
pipeline_status["history_messages"].append(
|
||||||
)
|
error_msg
|
||||||
pipeline_status["history_messages"].append(
|
)
|
||||||
error_msg
|
else:
|
||||||
)
|
# Other exceptions - log with traceback
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
|
||||||
|
logger.error(error_msg)
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
pipeline_status["latest_message"] = error_msg
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
traceback.format_exc()
|
||||||
|
)
|
||||||
|
pipeline_status["history_messages"].append(
|
||||||
|
error_msg
|
||||||
|
)
|
||||||
|
|
||||||
# Persistent llm cache
|
# Persistent llm cache
|
||||||
if self.llm_response_cache:
|
if self.llm_response_cache:
|
||||||
|
|
|
||||||
|
|
@ -1639,6 +1639,12 @@ async def _merge_nodes_then_upsert(
|
||||||
logger.error(f"Entity {entity_name} has no description")
|
logger.error(f"Entity {entity_name} has no description")
|
||||||
raise ValueError(f"Entity {entity_name} has no description")
|
raise ValueError(f"Entity {entity_name} has no description")
|
||||||
|
|
||||||
|
# Check for cancellation before LLM summary
|
||||||
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
if pipeline_status.get("cancellation_requested", False):
|
||||||
|
raise PipelineCancelledException("User cancelled during entity summary")
|
||||||
|
|
||||||
# 8. Get summary description an LLM usage status
|
# 8. Get summary description an LLM usage status
|
||||||
description, llm_was_used = await _handle_entity_relation_summary(
|
description, llm_was_used = await _handle_entity_relation_summary(
|
||||||
"Entity",
|
"Entity",
|
||||||
|
|
@ -1959,6 +1965,14 @@ async def _merge_edges_then_upsert(
|
||||||
logger.error(f"Relation {src_id}~{tgt_id} has no description")
|
logger.error(f"Relation {src_id}~{tgt_id} has no description")
|
||||||
raise ValueError(f"Relation {src_id}~{tgt_id} has no description")
|
raise ValueError(f"Relation {src_id}~{tgt_id} has no description")
|
||||||
|
|
||||||
|
# Check for cancellation before LLM summary
|
||||||
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
if pipeline_status.get("cancellation_requested", False):
|
||||||
|
raise PipelineCancelledException(
|
||||||
|
"User cancelled during relation summary"
|
||||||
|
)
|
||||||
|
|
||||||
# 8. Get summary description an LLM usage status
|
# 8. Get summary description an LLM usage status
|
||||||
description, llm_was_used = await _handle_entity_relation_summary(
|
description, llm_was_used = await _handle_entity_relation_summary(
|
||||||
"Relation",
|
"Relation",
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue