fix: properly delete vector collection items

This commit is contained in:
Boris Arzentar 2025-11-03 13:32:59 +01:00
parent b0d1b92cce
commit 42eb898f3a
No known key found for this signature in database
GPG key ID: D5CC274C784807B7
9 changed files with 43 additions and 25 deletions

View file

@@ -50,6 +50,7 @@ def upgrade() -> None:
op.create_table(
"edges",
sa.Column("id", sa.UUID, primary_key=True),
sa.Column("slug", sa.UUID, nullable=False),
sa.Column("user_id", sa.UUID, nullable=False),
sa.Column("data_id", sa.UUID, index=True),
sa.Column("dataset_id", sa.UUID, index=True),

View file

@@ -1,5 +1,5 @@
from uuid import NAMESPACE_OID, uuid5
from uuid import NAMESPACE_OID, UUID, uuid5
def generate_edge_id(edge_id: str) -> str:
def generate_edge_id(edge_id: str) -> UUID:
return uuid5(NAMESPACE_OID, edge_id.lower().replace(" ", "_").replace("'", ""))

View file

@@ -3,7 +3,6 @@ from typing import Dict, List
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector.get_vector_engine import get_vector_engine
from cognee.modules.engine.utils import generate_edge_id
from cognee.modules.graph.methods import (
delete_data_related_edges,
delete_data_related_nodes,
@@ -32,14 +31,14 @@ async def delete_data_nodes_and_edges(dataset_id: UUID, data_id: UUID) -> None:
vector_engine = get_vector_engine()
for affected_collection, affected_nodes in affected_vector_collections.items():
await vector_engine.delete_data_points(
affected_collection, [node.id for node in affected_nodes]
affected_collection, [node.slug for node in affected_nodes]
)
affected_relationships = await get_data_related_edges(dataset_id, data_id)
await vector_engine.delete_data_points(
"EdgeType_relationship_name",
[generate_edge_id(edge.relationship_name) for edge in affected_relationships],
[edge.slug for edge in affected_relationships],
)
await delete_data_related_nodes(data_id)

View file

@@ -1,5 +1,6 @@
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.orm import aliased
from sqlalchemy import and_, exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from cognee.infrastructure.databases.relational import with_async_session
@@ -8,10 +9,23 @@ from cognee.modules.graph.models import Edge
@with_async_session
async def get_data_related_edges(dataset_id: UUID, data_id: UUID, session: AsyncSession):
return (
await session.scalars(
select(Edge)
.where(Edge.data_id == data_id, Edge.dataset_id == dataset_id)
.distinct(Edge.relationship_name)
EdgeAlias = aliased(Edge)
subq = select(EdgeAlias.id).where(
and_(
EdgeAlias.slug == Edge.slug,
EdgeAlias.dataset_id == dataset_id,
EdgeAlias.data_id != data_id,
)
).all()
)
query_statement = select(Edge).where(
and_(
Edge.data_id == data_id,
Edge.dataset_id == dataset_id,
~exists(subq),
)
)
data_related_edges = await session.scalars(query_statement)
return data_related_edges.all()

View file

@@ -1,4 +1,5 @@
from uuid import UUID
from sqlalchemy.orm import aliased
from sqlalchemy import and_, exists, select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -8,13 +9,13 @@ from cognee.modules.graph.models import Node
@with_async_session
async def get_data_related_nodes(dataset_id: UUID, data_id: UUID, session: AsyncSession):
NodeAlias = Node.__table__.alias("n2")
NodeAlias = aliased(Node)
subq = select(NodeAlias.c.id).where(
subq = select(NodeAlias.id).where(
and_(
NodeAlias.c.slug == Node.slug,
NodeAlias.c.dataset_id == dataset_id,
NodeAlias.c.data_id != data_id,
NodeAlias.slug == Node.slug,
NodeAlias.dataset_id == dataset_id,
NodeAlias.data_id != data_id,
)
)

View file

@@ -3,6 +3,7 @@ from typing import Any, Dict, List, Tuple
from fastapi.encoders import jsonable_encoder
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import insert
from cognee.modules.engine.utils import generate_edge_id
from cognee.infrastructure.databases.relational import with_async_session
from cognee.modules.graph.models.Edge import Edge
@@ -37,6 +38,7 @@ async def upsert_edges(
NAMESPACE_OID,
str(user_id) + str(dataset_id) + str(edge[0]) + str(edge[2]) + str(edge[1]),
),
"slug": generate_edge_id(edge[2]),
"user_id": user_id,
"data_id": data_id,
"dataset_id": dataset_id,

View file

@@ -18,6 +18,8 @@ class Edge(Base):
id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
slug: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False)
user_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False)
data_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), index=True, nullable=False)

View file

@@ -5,6 +5,7 @@ from pydantic import BaseModel
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.graph.methods import upsert_edges
from cognee.modules.ontology.ontology_env_config import get_ontology_env_config
from cognee.tasks.storage import index_graph_edges
from cognee.tasks.storage.add_data_points import add_data_points
from cognee.modules.ontology.ontology_config import Config
from cognee.modules.ontology.get_default_ontology_resolver import (
@@ -90,6 +91,7 @@ async def integrate_chunk_graphs(
if len(graph_edges) > 0:
await graph_engine.add_edges(graph_edges)
await index_graph_edges(graph_edges)
await upsert_edges(
graph_edges,
user_id=context["user"].id,

View file

@@ -20,16 +20,13 @@ logger = get_logger()
@pytest.mark.asyncio
@patch.object(LLMGateway, "acreate_structured_output", new_callable=AsyncMock)
async def main(mock_create_structured_output: AsyncMock):
data_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_delete_default_graph")
).resolve()
data_directory_path = os.path.join(
pathlib.Path(__file__).parent, ".data_storage/test_delete_default_graph"
)
cognee.config.data_root_directory(data_directory_path)
cognee_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_delete_default_graph")
).resolve()
cognee_directory_path = os.path.join(
pathlib.Path(__file__).parent, ".cognee_system/test_delete_default_graph"
)
cognee.config.system_root_directory(cognee_directory_path)