refactor: Add maximum document batch size for document processing

This commit is contained in:
Igor Ilic 2025-09-25 18:03:17 +02:00
parent 997b85e1ce
commit 6636fe8afd

View file

@@ -37,6 +37,8 @@ from ..tasks.task import Task

 logger = get_logger("run_tasks(tasks: [Task], data)")

+# TODO: See if this parameter should be configurable as input for run_tasks itself
+DOCUMENT_BATCH_SIZE = 10

 def override_run_tasks(new_gen):
@@ -266,24 +268,29 @@ async def run_tasks(
     if incremental_loading:
         data = await resolve_data_directories(data)

-        # Create async tasks per data item that will run the pipeline for the data item
-        data_item_tasks = [
-            asyncio.create_task(
-                _run_tasks_data_item(
-                    data_item,
-                    dataset,
-                    tasks,
-                    pipeline_name,
-                    pipeline_id,
-                    pipeline_run_id,
-                    context,
-                    user,
-                    incremental_loading,
-                )
-            )
-            for data_item in data
-        ]
-        results = await asyncio.gather(*data_item_tasks)
+        # Create and gather batches of async tasks of data items that will run the pipeline for the data item
+        results = []
+        for start in range(0, len(data), DOCUMENT_BATCH_SIZE):
+            document_batch = data[start : start + DOCUMENT_BATCH_SIZE]
+
+            data_item_tasks = [
+                asyncio.create_task(
+                    _run_tasks_data_item(
+                        data_item,
+                        dataset,
+                        tasks,
+                        pipeline_name,
+                        pipeline_id,
+                        pipeline_run_id,
+                        context,
+                        user,
+                        incremental_loading,
+                    )
+                )
+                for data_item in document_batch
+            ]
+
+            results.extend(await asyncio.gather(*data_item_tasks))

         # Remove skipped data items from results
         results = [result for result in results if result]