diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 1292d243a..6a9f68443 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's graph_model=graph_model, config=config, custom_prompt=custom_prompt, - task_config={"batch_size": 10}, + task_config={"batch_size": 30}, ), # Generate knowledge graphs from the document chunks. Task( summarize_text, - task_config={"batch_size": 10}, + task_config={"batch_size": 30}, ), - Task(add_data_points, task_config={"batch_size": 10}), + Task(add_data_points, task_config={"batch_size": 100}), ] return default_tasks diff --git a/cognee/tasks/storage/index_data_points.py b/cognee/tasks/storage/index_data_points.py index 362412657..ebc4640d6 100644 --- a/cognee/tasks/storage/index_data_points.py +++ b/cognee/tasks/storage/index_data_points.py @@ -1,6 +1,6 @@ -from cognee.shared.logging_utils import get_logger +import asyncio -from cognee.infrastructure.databases.exceptions import EmbeddingException +from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.engine import DataPoint @@ -33,18 +33,23 @@ async def index_data_points(data_points: list[DataPoint]): indexed_data_point.metadata["index_fields"] = [field_name] index_points[index_name].append(indexed_data_point) - for index_name_and_field, indexable_points in index_points.items(): - first_occurence = index_name_and_field.index("_") - index_name = index_name_and_field[:first_occurence] - field_name = index_name_and_field[first_occurence + 1 :] - try: - # In case the amount of indexable points is too large we need to send them in batches - batch_size = vector_engine.embedding_engine.get_batch_size() - for i in range(0, len(indexable_points), batch_size): - batch = indexable_points[i : i + batch_size] - await vector_engine.index_data_points(index_name, field_name, batch) - except EmbeddingException as e: - logger.warning(f"Failed to index data points for {index_name}.{field_name}: {e}") + tasks: list[asyncio.Task] = [] + batch_size = vector_engine.embedding_engine.get_batch_size() + + for index_name_and_field, points in index_points.items(): + first = index_name_and_field.index("_") + index_name = index_name_and_field[:first] + field_name = index_name_and_field[first + 1 :] + + # Split in the usual “range step batch_size” manner + for i in range(0, len(points), batch_size): + batch = points[i : i + batch_size] + tasks.append( + asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch)) + ) + + # Fire them all and wait until every task is done. + await asyncio.gather(*tasks) return data_points