fix: resolve "Task exception was never retrieved" warnings in async task handling

- Handle multiple simultaneous exceptions correctly
- Maintain fast-fail behavior while ensuring proper exception cleanup to
  prevent asyncio warnings
This commit is contained in:
yangdx 2025-09-04 12:40:41 +08:00
parent c903b14849
commit 83b54975a2

View file

@ -679,19 +679,34 @@ async def _rebuild_knowledge_from_chunks(
# Execute all tasks in parallel with semaphore control and early failure detection
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
# Check if any task raised an exception
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
for task in done:
if task.exception():
# If a task failed, cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
# Task completed successfully, retrieve result to mark as processed
task.result()
except Exception as e:
if first_exception is None:
first_exception = e
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Re-raise the exception to notify the caller
raise task.exception()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the first exception to notify the caller
raise first_exception
# Final status report
status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully."
@ -1619,17 +1634,32 @@ async def merge_nodes_and_edges(
entity_tasks, return_when=asyncio.FIRST_EXCEPTION
)
# Check if any task raised an exception
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
successful_results = []
for task in done:
if task.exception():
# If a task failed, cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the exception to notify the caller
raise task.exception()
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
successful_results.append(task.result())
except Exception as e:
if first_exception is None:
first_exception = e
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the first exception to notify the caller
raise first_exception
# If all tasks completed successfully, collect results
processed_entities = [task.result() for task in entity_tasks]
@ -1737,17 +1767,32 @@ async def merge_nodes_and_edges(
edge_tasks, return_when=asyncio.FIRST_EXCEPTION
)
# Check if any task raised an exception
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
successful_results = []
for task in done:
if task.exception():
# If a task failed, cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the exception to notify the caller
raise task.exception()
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
successful_results.append(task.result())
except Exception as e:
if first_exception is None:
first_exception = e
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# Re-raise the first exception to notify the caller
raise first_exception
# If all tasks completed successfully, collect results
for task in edge_tasks:
@ -1995,23 +2040,36 @@ async def extract_entities(
# This allows us to cancel remaining tasks if any task fails
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
# Check if any task raised an exception
# Check if any task raised an exception and ensure all exceptions are retrieved
first_exception = None
chunk_results = []
for task in done:
if task.exception():
# If a task failed, cancel all pending tasks
# This prevents unnecessary processing since the parent function will abort anyway
for pending_task in pending:
pending_task.cancel()
try:
exception = task.exception()
if exception is not None:
if first_exception is None:
first_exception = exception
else:
chunk_results.append(task.result())
except Exception as e:
if first_exception is None:
first_exception = e
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# If any task failed, cancel all pending tasks and raise the first exception
if first_exception is not None:
# Cancel all pending tasks
for pending_task in pending:
pending_task.cancel()
# Re-raise the exception to notify the caller
raise task.exception()
# Wait for cancellation to complete
if pending:
await asyncio.wait(pending)
# If all tasks completed successfully, collect results
chunk_results = [task.result() for task in tasks]
# Re-raise the first exception to notify the caller
raise first_exception
# If all tasks completed successfully, chunk_results already contains the results
# Return the chunk_results for later processing in merge_nodes_and_edges
return chunk_results