refactor: Have embedding calls run in async gather

This commit is contained in:
Igor Ilic 2025-10-10 15:36:36 +02:00
parent a8dab3019e
commit abfcbc69d6
2 changed files with 22 additions and 17 deletions

View file

@ -269,13 +269,13 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
graph_model=graph_model, graph_model=graph_model,
config=config, config=config,
custom_prompt=custom_prompt, custom_prompt=custom_prompt,
task_config={"batch_size": 10}, task_config={"batch_size": 30},
), # Generate knowledge graphs from the document chunks. ), # Generate knowledge graphs from the document chunks.
Task( Task(
summarize_text, summarize_text,
task_config={"batch_size": 10}, task_config={"batch_size": 30},
), ),
Task(add_data_points, task_config={"batch_size": 10}), Task(add_data_points, task_config={"batch_size": 100}),
] ]
return default_tasks return default_tasks

View file

@ -1,6 +1,6 @@
from cognee.shared.logging_utils import get_logger import asyncio
from cognee.infrastructure.databases.exceptions import EmbeddingException from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.engine import DataPoint from cognee.infrastructure.engine import DataPoint
@ -33,18 +33,23 @@ async def index_data_points(data_points: list[DataPoint]):
indexed_data_point.metadata["index_fields"] = [field_name] indexed_data_point.metadata["index_fields"] = [field_name]
index_points[index_name].append(indexed_data_point) index_points[index_name].append(indexed_data_point)
for index_name_and_field, indexable_points in index_points.items(): tasks: list[asyncio.Task] = []
first_occurence = index_name_and_field.index("_") batch_size = vector_engine.embedding_engine.get_batch_size()
index_name = index_name_and_field[:first_occurence]
field_name = index_name_and_field[first_occurence + 1 :] for index_name_and_field, points in index_points.items():
try: first = index_name_and_field.index("_")
# In case the amount of indexable points is too large we need to send them in batches index_name = index_name_and_field[:first]
batch_size = vector_engine.embedding_engine.get_batch_size() field_name = index_name_and_field[first + 1 :]
for i in range(0, len(indexable_points), batch_size):
batch = indexable_points[i : i + batch_size] # Split in the usual “range step batch_size” manner
await vector_engine.index_data_points(index_name, field_name, batch) for i in range(0, len(points), batch_size):
except EmbeddingException as e: batch = points[i : i + batch_size]
logger.warning(f"Failed to index data points for {index_name}.{field_name}: {e}") tasks.append(
asyncio.create_task(vector_engine.index_data_points(index_name, field_name, batch))
)
# Fire them all and wait until every task is done.
await asyncio.gather(*tasks)
return data_points return data_points