fix: Add async lock for dynamic vector table creation (#1175)
<!-- .github/pull_request_template.md --> ## Description Add async lock for dynamic table creation ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
parent
cd930ed646
commit
5b6e946c43
1 changed files with 10 additions and 3 deletions
|
|
@ -1,3 +1,5 @@
|
|||
import asyncio
|
||||
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
|
||||
from cognee.infrastructure.databases.exceptions.EmbeddingException import EmbeddingException
|
||||
|
|
@ -6,6 +8,9 @@ from cognee.infrastructure.engine import DataPoint
|
|||
|
||||
logger = get_logger("index_data_points")
|
||||
|
||||
# A single lock shared by all coroutines
|
||||
vector_index_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def index_data_points(data_points: list[DataPoint]):
|
||||
created_indexes = {}
|
||||
|
|
@ -22,9 +27,11 @@ async def index_data_points(data_points: list[DataPoint]):
|
|||
|
||||
index_name = f"{data_point_type.__name__}_{field_name}"
|
||||
|
||||
if index_name not in created_indexes:
|
||||
await vector_engine.create_vector_index(data_point_type.__name__, field_name)
|
||||
created_indexes[index_name] = True
|
||||
# Add async lock to make sure two different coroutines won't create a table at the same time
|
||||
async with vector_index_lock:
|
||||
if index_name not in created_indexes:
|
||||
await vector_engine.create_vector_index(data_point_type.__name__, field_name)
|
||||
created_indexes[index_name] = True
|
||||
|
||||
if index_name not in index_points:
|
||||
index_points[index_name] = []
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue