diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index b5a8a230f..0f14683f9 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -41,7 +41,7 @@ async def add(
     extraction_rules: Optional[Dict[str, Any]] = None,
     tavily_config: Optional[BaseModel] = None,
     soup_crawler_config: Optional[BaseModel] = None,
-    data_batch_size: Optional[int] = 20,
+    data_per_batch: Optional[int] = 20,
 ):
     """
     Add data to Cognee for knowledge graph processing.
@@ -236,7 +236,7 @@ async def add(
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
         incremental_loading=incremental_loading,
-        data_batch_size=data_batch_size,
+        data_per_batch=data_per_batch,
     ):
         pipeline_run_info = run_info
diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index ab5e4a023..1eb266765 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -51,7 +51,7 @@ async def cognify(
     incremental_loading: bool = True,
     custom_prompt: Optional[str] = None,
     temporal_cognify: bool = False,
-    data_batch_size: int = 20,
+    data_per_batch: int = 20,
 ):
     """
     Transform ingested data into a structured knowledge graph.
@@ -229,7 +229,7 @@ async def cognify(
         graph_db_config=graph_db_config,
         incremental_loading=incremental_loading,
         pipeline_name="cognify_pipeline",
-        data_batch_size=data_batch_size,
+        data_per_batch=data_per_batch,
     )
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index 9d61235c1..e15e9e505 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -35,7 +35,7 @@ async def run_pipeline(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     incremental_loading: bool = False,
-    data_batch_size: int = 20,
+    data_per_batch: int = 20,
 ):
     validate_pipeline_tasks(tasks)
     await setup_and_check_environment(vector_db_config, graph_db_config)
@@ -51,7 +51,7 @@ async def run_pipeline(
         pipeline_name=pipeline_name,
         context={"dataset": dataset},
         incremental_loading=incremental_loading,
-        data_batch_size=data_batch_size,
+        data_per_batch=data_per_batch,
     ):
         yield run_info
@@ -64,7 +64,7 @@ async def run_pipeline_per_dataset(
     pipeline_name: str = "custom_pipeline",
     context: dict = None,
     incremental_loading=False,
-    data_batch_size: int = 20,
+    data_per_batch: int = 20,
 ):
     # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
     await set_database_global_context_variables(dataset.id, dataset.owner_id)
@@ -80,7 +80,7 @@ async def run_pipeline_per_dataset(
         return
 
     pipeline_run = run_tasks(
-        tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_batch_size
+        tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_per_batch
     )
 
     async for pipeline_run_info in pipeline_run:
diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index 18eaf8011..ecc2f647b 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -59,7 +59,7 @@ async def run_tasks(
     pipeline_name: str = "unknown_pipeline",
     context: dict = None,
     incremental_loading: bool = False,
-    data_batch_size: int = 20,
+    data_per_batch: int = 20,
 ):
     if not user:
         user = await get_default_user()
@@ -91,8 +91,8 @@ async def run_tasks(
     # Create and gather batches of async tasks of data items that will run the pipeline for the data item
     results = []
 
-    for start in range(0, len(data), data_batch_size):
-        data_batch = data[start : start + data_batch_size]
+    for start in range(0, len(data), data_per_batch):
+        data_batch = data[start : start + data_per_batch]
 
         data_item_tasks = [
            asyncio.create_task(
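
For reference, a minimal caller-side sketch of the renamed parameter. This assumes the public cognee.add and cognee.cognify entry points touched above; the dataset name and batch value are illustrative, not taken from the patch. As run_tasks.py shows, data items are sliced into groups of data_per_batch and each group is run concurrently, so a smaller value lowers peak concurrency at the cost of throughput.

import asyncio
import cognee

async def main():
    # Illustrative only: pass the renamed keyword (formerly data_batch_size).
    await cognee.add(
        ["first document text", "second document text"],
        dataset_name="demo",
        data_per_batch=10,
    )
    # The same keyword is forwarded through the cognify pipeline.
    await cognee.cognify(datasets=["demo"], data_per_batch=10)

asyncio.run(main())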