From 5b6e946c436c12e010f37f8f65ba795535dadf31 Mon Sep 17 00:00:00 2001 From: Igor Ilic <30923996+dexters1@users.noreply.github.com> Date: Fri, 1 Aug 2025 15:12:04 +0200 Subject: [PATCH] fix: Add async lock for dynamic vector table creation (#1175) ## Description Add async lock for dynamic table creation ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin. --- cognee/tasks/storage/index_data_points.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/cognee/tasks/storage/index_data_points.py b/cognee/tasks/storage/index_data_points.py index 2813d9c54..452e7f2ac 100644 --- a/cognee/tasks/storage/index_data_points.py +++ b/cognee/tasks/storage/index_data_points.py @@ -1,3 +1,5 @@ +import asyncio + from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.exceptions.EmbeddingException import EmbeddingException @@ -6,6 +8,9 @@ from cognee.infrastructure.engine import DataPoint logger = get_logger("index_data_points") +# A single lock shared by all coroutines +vector_index_lock = asyncio.Lock() + async def index_data_points(data_points: list[DataPoint]): created_indexes = {} @@ -22,9 +27,11 @@ async def index_data_points(data_points: list[DataPoint]): index_name = f"{data_point_type.__name__}_{field_name}" - if index_name not in created_indexes: - await vector_engine.create_vector_index(data_point_type.__name__, field_name) - created_indexes[index_name] = True + # Add async lock to make sure two different coroutines won't create a table at the same time + async with vector_index_lock: + if index_name not in created_indexes: + await vector_engine.create_vector_index(data_point_type.__name__, field_name) + created_indexes[index_name] = True if index_name not in index_points: index_points[index_name] = []