From c4a6c946758045d7cdf638d67e9423973d6d7eb2 Mon Sep 17 00:00:00 2001
From: Igor Ilic <30923996+dexters1@users.noreply.github.com>
Date: Mon, 7 Apr 2025 18:03:36 +0200
Subject: [PATCH] fix: Resolve duplicate chunk issue for PGVector [COG-895] (#705)

## Description

Resolves duplicate chunks in PGVector: inserting a data point whose ID already existed in a collection created a second row instead of updating the existing one. `PGVectorAdapter` now looks up each data point by ID inside the session and updates the existing row in place, creating a new `PGVectorDataPoint` only when none is found.

## DCO Affirmation

I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
---
 .../vector/pgvector/PGVectorAdapter.py        | 34 ++++++++++++++-----
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py
index 789ed2e84..4badb0a97 100644
--- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py
+++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py
@@ -124,16 +124,32 @@ class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface):
                 self.payload = payload
                 self.vector = vector
 
-        pgvector_data_points = [
-            PGVectorDataPoint(
-                id=data_point.id,
-                vector=data_vectors[data_index],
-                payload=serialize_data(data_point.model_dump()),
-            )
-            for (data_index, data_point) in enumerate(data_points)
-        ]
-
         async with self.get_async_session() as session:
+            pgvector_data_points = []
+
+            for data_index, data_point in enumerate(data_points):
+                # Check whether the data should be updated or a new data point created
+                data_point_db = (
+                    await session.execute(
+                        select(PGVectorDataPoint).filter(PGVectorDataPoint.id == data_point.id)
+                    )
+                ).scalar_one_or_none()
+
+                # If the data point exists, update it; if not, create a new one
+                if data_point_db:
+                    data_point_db.id = data_point.id
+                    data_point_db.vector = data_vectors[data_index]
+                    data_point_db.payload = serialize_data(data_point.model_dump())
+                    pgvector_data_points.append(data_point_db)
+                else:
+                    pgvector_data_points.append(
+                        PGVectorDataPoint(
+                            id=data_point.id,
+                            vector=data_vectors[data_index],
+                            payload=serialize_data(data_point.model_dump()),
+                        )
+                    )
+
             session.add_all(pgvector_data_points)
             await session.commit()
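
For reference, the same read-modify-write could be collapsed into a single statement with PostgreSQL's native upsert (`INSERT ... ON CONFLICT`). The sketch below is illustrative rather than part of this patch: `upsert_data_points` is a hypothetical method name, and it assumes `PGVectorDataPoint` is mapped with `id` as its primary key, reusing the `serialize_data` and `get_async_session` helpers seen above.

```python
# Hypothetical alternative sketch (not part of this patch): collapse the
# per-row SELECT-then-update into PostgreSQL's native upsert.
# Assumes PGVectorDataPoint maps a table whose primary key is "id";
# serialize_data and get_async_session are the adapter helpers used above.
from sqlalchemy.dialects.postgresql import insert


async def upsert_data_points(self, data_points, data_vectors):
    async with self.get_async_session() as session:
        stmt = insert(PGVectorDataPoint).values(
            [
                {
                    "id": data_point.id,
                    "vector": data_vectors[data_index],
                    "payload": serialize_data(data_point.model_dump()),
                }
                for data_index, data_point in enumerate(data_points)
            ]
        )
        # ON CONFLICT (id) DO UPDATE: overwrite vector and payload with the
        # newly supplied values instead of inserting a duplicate row.
        stmt = stmt.on_conflict_do_update(
            index_elements=[PGVectorDataPoint.id],
            set_={"vector": stmt.excluded.vector, "payload": stmt.excluded.payload},
        )
        await session.execute(stmt)
        await session.commit()
```

The per-row SELECT used in the patch keeps the logic portable across SQLAlchemy backends and keeps ORM instances in the session, whereas `ON CONFLICT` is PostgreSQL-specific; since this adapter already targets pgvector, either approach would resolve the duplicate-chunk issue.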