From 94cbef44ed9f17f203e4e7aeead7a177520aa03c Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 27 Aug 2025 16:42:53 +0200 Subject: [PATCH] feat: Return async gather for documents --- .../modules/pipelines/operations/run_tasks.py | 58 ++++++------------- 1 file changed, 17 insertions(+), 41 deletions(-) diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 369f3cfc2..62d4972ad 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -266,48 +266,24 @@ async def run_tasks( if incremental_loading: data = await resolve_data_directories(data) - # TODO: Return to using async.gather for data items after Cognee release - # # Create async tasks per data item that will run the pipeline for the data item - # data_item_tasks = [ - # asyncio.create_task( - # _run_tasks_data_item( - # data_item, - # dataset, - # tasks, - # pipeline_name, - # pipeline_id, - # pipeline_run_id, - # context, - # user, - # incremental_loading, - # ) - # ) - # for data_item in data - # ] - # results = await asyncio.gather(*data_item_tasks) - # # Remove skipped data items from results - # results = [result for result in results if result] - - ### TEMP sync data item handling - results = [] - # Run the pipeline for each data_item sequentially, one after the other - for data_item in data: - result = await _run_tasks_data_item( - data_item, - dataset, - tasks, - pipeline_name, - pipeline_id, - pipeline_run_id, - context, - user, - incremental_loading, + # Create async tasks per data item that will run the pipeline for the data item + data_item_tasks = [ + asyncio.create_task( + _run_tasks_data_item( + data_item, + dataset, + tasks, + pipeline_name, + pipeline_id, + pipeline_run_id, + context, + user, + incremental_loading, + ) ) - - # Skip items that returned a false-y value - if result: - results.append(result) - ### END + for data_item in data + ] + results = await asyncio.gather(*data_item_tasks) # Remove skipped data items from results results = [result for result in results if result]