From 6636fe8afdb39b39f1981fe7f1c65c4e087203a1 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 25 Sep 2025 18:03:17 +0200 Subject: [PATCH 1/4] refactor: Add maximum document batch size for document processing --- .../modules/pipelines/operations/run_tasks.py | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 62d4972ad..4a86a5807 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -37,6 +37,8 @@ from ..tasks.task import Task logger = get_logger("run_tasks(tasks: [Task], data)") +# TODO: See if this parameter should be configurable as input for run_tasks itself +DOCUMENT_BATCH_SIZE = 10 def override_run_tasks(new_gen): @@ -266,24 +268,29 @@ async def run_tasks( if incremental_loading: data = await resolve_data_directories(data) - # Create async tasks per data item that will run the pipeline for the data item - data_item_tasks = [ - asyncio.create_task( - _run_tasks_data_item( - data_item, - dataset, - tasks, - pipeline_name, - pipeline_id, - pipeline_run_id, - context, - user, - incremental_loading, + # Create and gather batches of async tasks of data items that will run the pipeline for the data item + results = [] + for start in range(0, len(data), DOCUMENT_BATCH_SIZE): + document_batch = data[start : start + DOCUMENT_BATCH_SIZE] + + data_item_tasks = [ + asyncio.create_task( + _run_tasks_data_item( + data_item, + dataset, + tasks, + pipeline_name, + pipeline_id, + pipeline_run_id, + context, + user, + incremental_loading, + ) ) - ) - for data_item in data - ] - results = await asyncio.gather(*data_item_tasks) + for data_item in document_batch + ] + + results.extend(await asyncio.gather(*data_item_tasks)) # Remove skipped data items from results results = [result for result in results if result] From ecb285e36613a22d1ad7338b5aa13ade9ff21a9b Mon Sep 17 00:00:00 2001 From: vasilije Date: Sun, 12 Oct 2025 13:46:12 +0200 Subject: [PATCH 2/4] added formatting --- cognee/modules/pipelines/operations/run_tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 2a5bf81a8..2e0055384 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -91,7 +91,6 @@ async def run_tasks( if incremental_loading: data = await resolve_data_directories(data) - # Create and gather batches of async tasks of data items that will run the pipeline for the data item results = [] for start in range(0, len(data), DOCUMENT_BATCH_SIZE): From 2fb06e07299a53c2e5412cbe30e851b26e97b783 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:18:48 +0200 Subject: [PATCH 3/4] refactor: forwarding of data batch size rework --- cognee/api/v1/add/add.py | 2 ++ cognee/api/v1/cognify/cognify.py | 2 ++ cognee/modules/pipelines/operations/pipeline.py | 5 ++++- cognee/modules/pipelines/operations/run_tasks.py | 12 +++++------- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 65394f1ec..b5a8a230f 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -41,6 +41,7 @@ async def add( extraction_rules: Optional[Dict[str, Any]] = None, tavily_config: Optional[BaseModel] = None, soup_crawler_config: Optional[BaseModel] = None, + data_batch_size: Optional[int] = 20, ): """ 
Add data to Cognee for knowledge graph processing. @@ -235,6 +236,7 @@ async def add( vector_db_config=vector_db_config, graph_db_config=graph_db_config, incremental_loading=incremental_loading, + data_batch_size=data_batch_size, ): pipeline_run_info = run_info diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index c3045f00a..ab5e4a023 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -51,6 +51,7 @@ async def cognify( incremental_loading: bool = True, custom_prompt: Optional[str] = None, temporal_cognify: bool = False, + data_batch_size: int = 20, ): """ Transform ingested data into a structured knowledge graph. @@ -228,6 +229,7 @@ async def cognify( graph_db_config=graph_db_config, incremental_loading=incremental_loading, pipeline_name="cognify_pipeline", + data_batch_size=data_batch_size, ) diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index b59a171f7..9d61235c1 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -35,6 +35,7 @@ async def run_pipeline( vector_db_config: dict = None, graph_db_config: dict = None, incremental_loading: bool = False, + data_batch_size: int = 20, ): validate_pipeline_tasks(tasks) await setup_and_check_environment(vector_db_config, graph_db_config) @@ -50,6 +51,7 @@ async def run_pipeline( pipeline_name=pipeline_name, context={"dataset": dataset}, incremental_loading=incremental_loading, + data_batch_size=data_batch_size, ): yield run_info @@ -62,6 +64,7 @@ async def run_pipeline_per_dataset( pipeline_name: str = "custom_pipeline", context: dict = None, incremental_loading=False, + data_batch_size: int = 20, ): # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True await set_database_global_context_variables(dataset.id, dataset.owner_id) @@ -77,7 +80,7 @@ async def run_pipeline_per_dataset( return pipeline_run = run_tasks( - tasks, dataset.id, data, user, pipeline_name, context, incremental_loading + tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_batch_size ) async for pipeline_run_info in pipeline_run: diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 2e0055384..18eaf8011 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -24,14 +24,11 @@ from cognee.modules.pipelines.operations import ( log_pipeline_run_complete, log_pipeline_run_error, ) -from .run_tasks_with_telemetry import run_tasks_with_telemetry from .run_tasks_data_item import run_tasks_data_item from ..tasks.task import Task logger = get_logger("run_tasks(tasks: [Task], data)") -# TODO: See if this parameter should be configurable as input for run_tasks itself -DOCUMENT_BATCH_SIZE = 10 def override_run_tasks(new_gen): @@ -62,6 +59,7 @@ async def run_tasks( pipeline_name: str = "unknown_pipeline", context: dict = None, incremental_loading: bool = False, + data_batch_size: int = 20, ): if not user: user = await get_default_user() @@ -93,12 +91,12 @@ async def run_tasks( # Create and gather batches of async tasks of data items that will run the pipeline for the data item results = [] - for start in range(0, len(data), DOCUMENT_BATCH_SIZE): - document_batch = data[start : start + DOCUMENT_BATCH_SIZE] + for start in range(0, len(data), data_batch_size): + data_batch = data[start : start + data_batch_size] data_item_tasks = [ 
asyncio.create_task( - _run_tasks_data_item( + run_tasks_data_item( data_item, dataset, tasks, @@ -110,7 +108,7 @@ async def run_tasks( incremental_loading, ) ) - for data_item in document_batch + for data_item in data_batch ] results.extend(await asyncio.gather(*data_item_tasks)) From 2e1bfe78b1d63b2b089235d2cc7a7742a208d3f5 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 15 Oct 2025 20:26:59 +0200 Subject: [PATCH 4/4] refactor: rename variable to be more understandable --- cognee/api/v1/add/add.py | 4 ++-- cognee/api/v1/cognify/cognify.py | 4 ++-- cognee/modules/pipelines/operations/pipeline.py | 8 ++++---- cognee/modules/pipelines/operations/run_tasks.py | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index b5a8a230f..0f14683f9 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -41,7 +41,7 @@ async def add( extraction_rules: Optional[Dict[str, Any]] = None, tavily_config: Optional[BaseModel] = None, soup_crawler_config: Optional[BaseModel] = None, - data_batch_size: Optional[int] = 20, + data_per_batch: Optional[int] = 20, ): """ Add data to Cognee for knowledge graph processing. @@ -236,7 +236,7 @@ async def add( vector_db_config=vector_db_config, graph_db_config=graph_db_config, incremental_loading=incremental_loading, - data_batch_size=data_batch_size, + data_per_batch=data_per_batch, ): pipeline_run_info = run_info diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index ab5e4a023..1eb266765 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -51,7 +51,7 @@ async def cognify( incremental_loading: bool = True, custom_prompt: Optional[str] = None, temporal_cognify: bool = False, - data_batch_size: int = 20, + data_per_batch: int = 20, ): """ Transform ingested data into a structured knowledge graph. 
@@ -229,7 +229,7 @@ async def cognify( graph_db_config=graph_db_config, incremental_loading=incremental_loading, pipeline_name="cognify_pipeline", - data_batch_size=data_batch_size, + data_per_batch=data_per_batch, ) diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index 9d61235c1..e15e9e505 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -35,7 +35,7 @@ async def run_pipeline( vector_db_config: dict = None, graph_db_config: dict = None, incremental_loading: bool = False, - data_batch_size: int = 20, + data_per_batch: int = 20, ): validate_pipeline_tasks(tasks) await setup_and_check_environment(vector_db_config, graph_db_config) @@ -51,7 +51,7 @@ async def run_pipeline( pipeline_name=pipeline_name, context={"dataset": dataset}, incremental_loading=incremental_loading, - data_batch_size=data_batch_size, + data_per_batch=data_per_batch, ): yield run_info @@ -64,7 +64,7 @@ async def run_pipeline_per_dataset( pipeline_name: str = "custom_pipeline", context: dict = None, incremental_loading=False, - data_batch_size: int = 20, + data_per_batch: int = 20, ): # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True await set_database_global_context_variables(dataset.id, dataset.owner_id) @@ -80,7 +80,7 @@ async def run_pipeline_per_dataset( return pipeline_run = run_tasks( - tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_batch_size + tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_per_batch ) async for pipeline_run_info in pipeline_run: diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 18eaf8011..ecc2f647b 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -59,7 +59,7 @@ async def run_tasks( pipeline_name: str = "unknown_pipeline", context: dict = None, incremental_loading: bool = False, - data_batch_size: int = 20, + data_per_batch: int = 20, ): if not user: user = await get_default_user() @@ -91,8 +91,8 @@ async def run_tasks( # Create and gather batches of async tasks of data items that will run the pipeline for the data item results = [] - for start in range(0, len(data), data_batch_size): - data_batch = data[start : start + data_batch_size] + for start in range(0, len(data), data_per_batch): + data_batch = data[start : start + data_per_batch] data_item_tasks = [ asyncio.create_task(
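
For reviewers, a minimal standalone sketch of the batching pattern this series introduces (not part of the patches themselves): process_item below is a hypothetical stand-in for run_tasks_data_item, and the default of 20 is assumed to mirror the data_per_batch parameter threaded through add, cognify, run_pipeline, and run_tasks.

import asyncio

DATA_PER_BATCH = 20  # assumed default, mirroring data_per_batch in the patches


async def process_item(item):
    # Hypothetical stand-in for run_tasks_data_item; returns None for skipped items.
    await asyncio.sleep(0)
    return item


async def run_in_batches(data, data_per_batch=DATA_PER_BATCH):
    results = []
    for start in range(0, len(data), data_per_batch):
        batch = data[start : start + data_per_batch]
        tasks = [asyncio.create_task(process_item(item)) for item in batch]
        # Each batch is awaited before the next one starts, so at most
        # data_per_batch items are in flight at any time.
        results.extend(await asyncio.gather(*tasks))
    # As in the patched run_tasks, drop skipped (falsy) results.
    return [result for result in results if result]


if __name__ == "__main__":
    print(asyncio.run(run_in_batches(list(range(1, 46)))))

Gathering per batch caps how many data items are processed concurrently while keeping the skipped-item filtering that follows in run_tasks unchanged.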