diff --git a/lightrag/operate.py b/lightrag/operate.py index f5eac4c3..c73139b0 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -31,6 +31,7 @@ from .utils import ( process_chunks_unified, build_file_path, safe_vdb_operation_with_exception, + create_prefixed_exception, ) from .base import ( BaseGraphStorage, @@ -1622,8 +1623,11 @@ async def merge_nodes_and_edges( f"Failed to update pipeline status: {status_error}" ) - # Re-raise the original exception - raise + # Re-raise the original exception with a prefix + prefixed_exception = create_prefixed_exception( + e, f"`{entity_name}`" + ) + raise prefixed_exception from e # Create entity processing tasks entity_tasks = [] @@ -1753,8 +1757,11 @@ async def merge_nodes_and_edges( f"Failed to update pipeline status: {status_error}" ) - # Re-raise the original exception - raise + # Re-raise the original exception with a prefix + prefixed_exception = create_prefixed_exception( + e, f"`{sorted_edge_key}`" + ) + raise prefixed_exception from e # Create relationship processing tasks edge_tasks = [] @@ -2070,11 +2077,14 @@ async def extract_entities( if pending: await asyncio.wait(pending) - # Re-raise the first exception to notify the caller - raise first_exception + # Add progress prefix to the exception message + progress_prefix = f"Chunks[{processed_chunks+1}/{total_chunks}]" + + # Re-raise the original exception with a prefix + prefixed_exception = create_prefixed_exception(first_exception, progress_prefix) + raise prefixed_exception from first_exception # If all tasks completed successfully, chunk_results already contains the results - # Return the chunk_results for later processing in merge_nodes_and_edges return chunk_results diff --git a/lightrag/utils.py b/lightrag/utils.py index e697de8a..f3b3ae44 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -2588,3 +2588,45 @@ def get_pinyin_sort_key(text: str) -> str: else: # pypinyin not available, use simple string sorting return text.lower() + + +def create_prefixed_exception(original_exception: Exception, prefix: str) -> Exception: + """ + Safely create a prefixed exception that adapts to all error types. + + Args: + original_exception: The original exception. + prefix: The prefix to add. + + Returns: + A new exception with the prefix, maintaining the original exception type if possible. + """ + try: + # Method 1: Try to reconstruct using original arguments. + if hasattr(original_exception, "args") and original_exception.args: + args = list(original_exception.args) + # Find the first string argument and prefix it. This is safer for + # exceptions like OSError where the first arg is an integer (errno). + found_str = False + for i, arg in enumerate(args): + if isinstance(arg, str): + args[i] = f"{prefix}: {arg}" + found_str = True + break + + # If no string argument is found, prefix the first argument's string representation. + if not found_str: + args[0] = f"{prefix}: {args[0]}" + + return type(original_exception)(*args) + else: + # Method 2: If no args, try single parameter construction. + return type(original_exception)(f"{prefix}: {str(original_exception)}") + except (TypeError, ValueError, AttributeError) as construct_error: + # Method 3: If reconstruction fails, wrap it in a RuntimeError. + # This is the safest fallback, as attempting to create the same type + # with a single string can fail if the constructor requires multiple arguments. + return RuntimeError( + f"{prefix}: {type(original_exception).__name__}: {str(original_exception)} " + f"(Original exception could not be reconstructed: {construct_error})" + )