feat: Return async gather for documents

This commit is contained in:
Igor Ilic 2025-08-27 16:42:53 +02:00
parent cc5e68472b
commit 94cbef44ed

View file

@ -266,48 +266,24 @@ async def run_tasks(
if incremental_loading: if incremental_loading:
data = await resolve_data_directories(data) data = await resolve_data_directories(data)
# TODO: Return to using async.gather for data items after Cognee release # Create async tasks per data item that will run the pipeline for the data item
# # Create async tasks per data item that will run the pipeline for the data item data_item_tasks = [
# data_item_tasks = [ asyncio.create_task(
# asyncio.create_task( _run_tasks_data_item(
# _run_tasks_data_item( data_item,
# data_item, dataset,
# dataset, tasks,
# tasks, pipeline_name,
# pipeline_name, pipeline_id,
# pipeline_id, pipeline_run_id,
# pipeline_run_id, context,
# context, user,
# user, incremental_loading,
# incremental_loading, )
# )
# )
# for data_item in data
# ]
# results = await asyncio.gather(*data_item_tasks)
# # Remove skipped data items from results
# results = [result for result in results if result]
### TEMP sync data item handling
results = []
# Run the pipeline for each data_item sequentially, one after the other
for data_item in data:
result = await _run_tasks_data_item(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
incremental_loading,
) )
for data_item in data
# Skip items that returned a false-y value ]
if result: results = await asyncio.gather(*data_item_tasks)
results.append(result)
### END
# Remove skipped data items from results # Remove skipped data items from results
results = [result for result in results if result] results = [result for result in results if result]