From 83b54975a258265f8694d9fe64dcfb845e183482 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 4 Sep 2025 12:40:41 +0800 Subject: [PATCH] fix: resolve "Task exception was never retrieved" warnings in async task handling - Handle multiple simultaneous exceptions correctly - Maintain fast-fail behavior while ensuring proper exception cleanup to prevent asyncio warnings --- lightrag/operate.py | 144 +++++++++++++++++++++++++++++++------------- 1 file changed, 101 insertions(+), 43 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 22ed9117..1ce4630f 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -679,19 +679,34 @@ async def _rebuild_knowledge_from_chunks( # Execute all tasks in parallel with semaphore control and early failure detection done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) - # Check if any task raised an exception + # Check if any task raised an exception and ensure all exceptions are retrieved + first_exception = None + for task in done: - if task.exception(): - # If a task failed, cancel all pending tasks - for pending_task in pending: - pending_task.cancel() + try: + exception = task.exception() + if exception is not None: + if first_exception is None: + first_exception = exception + else: + # Task completed successfully, retrieve result to mark as processed + task.result() + except Exception as e: + if first_exception is None: + first_exception = e - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) + # If any task failed, cancel all pending tasks and raise the first exception + if first_exception is not None: + # Cancel all pending tasks + for pending_task in pending: + pending_task.cancel() - # Re-raise the exception to notify the caller - raise task.exception() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + + # Re-raise the first exception to notify the caller + raise first_exception # Final status report status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully." @@ -1619,17 +1634,32 @@ async def merge_nodes_and_edges( entity_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception + # Check if any task raised an exception and ensure all exceptions are retrieved + first_exception = None + successful_results = [] + for task in done: - if task.exception(): - # If a task failed, cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the exception to notify the caller - raise task.exception() + try: + exception = task.exception() + if exception is not None: + if first_exception is None: + first_exception = exception + else: + successful_results.append(task.result()) + except Exception as e: + if first_exception is None: + first_exception = e + + # If any task failed, cancel all pending tasks and raise the first exception + if first_exception is not None: + # Cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + # Re-raise the first exception to notify the caller + raise first_exception # If all tasks completed successfully, collect results processed_entities = [task.result() for task in entity_tasks] @@ -1737,17 +1767,32 @@ async def merge_nodes_and_edges( edge_tasks, return_when=asyncio.FIRST_EXCEPTION ) - # Check if any task raised an exception + # Check if any task raised an exception and ensure all exceptions are retrieved + first_exception = None + successful_results = [] + for task in done: - if task.exception(): - # If a task failed, cancel all pending tasks - for pending_task in pending: - pending_task.cancel() - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) - # Re-raise the exception to notify the caller - raise task.exception() + try: + exception = task.exception() + if exception is not None: + if first_exception is None: + first_exception = exception + else: + successful_results.append(task.result()) + except Exception as e: + if first_exception is None: + first_exception = e + + # If any task failed, cancel all pending tasks and raise the first exception + if first_exception is not None: + # Cancel all pending tasks + for pending_task in pending: + pending_task.cancel() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + # Re-raise the first exception to notify the caller + raise first_exception # If all tasks completed successfully, collect results for task in edge_tasks: @@ -1995,23 +2040,36 @@ async def extract_entities( # This allows us to cancel remaining tasks if any task fails done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) - # Check if any task raised an exception + # Check if any task raised an exception and ensure all exceptions are retrieved + first_exception = None + chunk_results = [] + for task in done: - if task.exception(): - # If a task failed, cancel all pending tasks - # This prevents unnecessary processing since the parent function will abort anyway - for pending_task in pending: - pending_task.cancel() + try: + exception = task.exception() + if exception is not None: + if first_exception is None: + first_exception = exception + else: + chunk_results.append(task.result()) + except Exception as e: + if first_exception is None: + first_exception = e - # Wait for cancellation to complete - if pending: - await asyncio.wait(pending) + # If any task failed, cancel all pending tasks and raise the first exception + if first_exception is not None: + # Cancel all pending tasks + for pending_task in pending: + pending_task.cancel() - # Re-raise the exception to notify the caller - raise task.exception() + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) - # If all tasks completed successfully, collect results - chunk_results = [task.result() for task in tasks] + # Re-raise the first exception to notify the caller + raise first_exception + + # If all tasks completed successfully, chunk_results already contains the results # Return the chunk_results for later processing in merge_nodes_and_edges return chunk_results