diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index 62d4972ad..4a86a5807 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -37,6 +37,8 @@ from ..tasks.task import Task
 
 logger = get_logger("run_tasks(tasks: [Task], data)")
 
+# TODO: See if this parameter should be configurable as input for run_tasks itself
+DOCUMENT_BATCH_SIZE = 10
 
 
 def override_run_tasks(new_gen):
@@ -266,24 +268,29 @@ async def run_tasks(
     if incremental_loading:
         data = await resolve_data_directories(data)
 
-        # Create async tasks per data item that will run the pipeline for the data item
-        data_item_tasks = [
-            asyncio.create_task(
-                _run_tasks_data_item(
-                    data_item,
-                    dataset,
-                    tasks,
-                    pipeline_name,
-                    pipeline_id,
-                    pipeline_run_id,
-                    context,
-                    user,
-                    incremental_loading,
+        # Create and gather batches of async tasks that run the pipeline for each data item
+        results = []
+        for start in range(0, len(data), DOCUMENT_BATCH_SIZE):
+            document_batch = data[start : start + DOCUMENT_BATCH_SIZE]
+
+            data_item_tasks = [
+                asyncio.create_task(
+                    _run_tasks_data_item(
+                        data_item,
+                        dataset,
+                        tasks,
+                        pipeline_name,
+                        pipeline_id,
+                        pipeline_run_id,
+                        context,
+                        user,
+                        incremental_loading,
+                    )
                 )
-            )
-            for data_item in data
-        ]
-        results = await asyncio.gather(*data_item_tasks)
+                for data_item in document_batch
+            ]
+
+            results.extend(await asyncio.gather(*data_item_tasks))
 
         # Remove skipped data items from results
         results = [result for result in results if result]
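
For reviewers who want the pattern in isolation: instead of creating one task per data item and gathering them all at once, the change awaits the pipeline tasks in fixed-size batches, bounding how many run concurrently. Below is a minimal, self-contained sketch of the same batching pattern; `process_item` is a hypothetical stand-in for `_run_tasks_data_item`, and the batch size mirrors the `DOCUMENT_BATCH_SIZE` constant introduced above.

```python
import asyncio

DOCUMENT_BATCH_SIZE = 10  # mirrors the module-level constant in the diff


async def process_item(item: int) -> int | None:
    # Hypothetical stand-in for _run_tasks_data_item; returns None for "skipped" items.
    await asyncio.sleep(0)
    return item * 2 if item % 3 else None


async def run_batched(data: list[int]) -> list[int]:
    results = []
    # At most DOCUMENT_BATCH_SIZE tasks are in flight at once: each batch is
    # awaited in full before the next batch of tasks is created.
    for start in range(0, len(data), DOCUMENT_BATCH_SIZE):
        batch = data[start : start + DOCUMENT_BATCH_SIZE]
        tasks = [asyncio.create_task(process_item(item)) for item in batch]
        results.extend(await asyncio.gather(*tasks))
    # Drop skipped (falsy) results, as the pipeline code does.
    return [result for result in results if result]


if __name__ == "__main__":
    print(asyncio.run(run_batched(list(range(25)))))
```

One trade-off of this approach: `asyncio.gather` waits for the slowest task in a batch before the next batch starts, so a single slow data item stalls its whole batch. If that matters in practice, bounding per-item tasks with an `asyncio.Semaphore` is a common alternative.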