diff --git a/alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py b/alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py index b709cf53b..44ef29d59 100644 --- a/alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py +++ b/alembic/versions/84e5d08260d6_replace_graph_ledger_table_with_nodes_.py @@ -50,6 +50,7 @@ def upgrade() -> None: op.create_table( "edges", sa.Column("id", sa.UUID, primary_key=True), + sa.Column("slug", sa.UUID, nullable=False), sa.Column("user_id", sa.UUID, nullable=False), sa.Column("data_id", sa.UUID, index=True), sa.Column("dataset_id", sa.UUID, index=True), diff --git a/cognee/modules/engine/utils/generate_edge_id.py b/cognee/modules/engine/utils/generate_edge_id.py index 00645284b..445f2cbe7 100644 --- a/cognee/modules/engine/utils/generate_edge_id.py +++ b/cognee/modules/engine/utils/generate_edge_id.py @@ -1,5 +1,5 @@ -from uuid import NAMESPACE_OID, uuid5 +from uuid import NAMESPACE_OID, UUID, uuid5 -def generate_edge_id(edge_id: str) -> str: +def generate_edge_id(edge_id: str) -> UUID: return uuid5(NAMESPACE_OID, edge_id.lower().replace(" ", "_").replace("'", "")) diff --git a/cognee/modules/graph/methods/delete_data_nodes_and_edges.py b/cognee/modules/graph/methods/delete_data_nodes_and_edges.py index f937c2c63..799bd6b3f 100644 --- a/cognee/modules/graph/methods/delete_data_nodes_and_edges.py +++ b/cognee/modules/graph/methods/delete_data_nodes_and_edges.py @@ -3,7 +3,6 @@ from typing import Dict, List from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.vector.get_vector_engine import get_vector_engine -from cognee.modules.engine.utils import generate_edge_id from cognee.modules.graph.methods import ( delete_data_related_edges, delete_data_related_nodes, @@ -32,14 +31,14 @@ async def delete_data_nodes_and_edges(dataset_id: UUID, data_id: UUID) -> None: vector_engine = get_vector_engine() for affected_collection, affected_nodes in affected_vector_collections.items(): await vector_engine.delete_data_points( - affected_collection, [node.id for node in affected_nodes] + affected_collection, [node.slug for node in affected_nodes] ) affected_relationships = await get_data_related_edges(dataset_id, data_id) await vector_engine.delete_data_points( "EdgeType_relationship_name", - [generate_edge_id(edge.relationship_name) for edge in affected_relationships], + [edge.slug for edge in affected_relationships], ) await delete_data_related_nodes(data_id) diff --git a/cognee/modules/graph/methods/get_data_related_edges.py b/cognee/modules/graph/methods/get_data_related_edges.py index cbbe2f296..326543b84 100644 --- a/cognee/modules/graph/methods/get_data_related_edges.py +++ b/cognee/modules/graph/methods/get_data_related_edges.py @@ -1,5 +1,6 @@ from uuid import UUID -from sqlalchemy import select +from sqlalchemy.orm import aliased +from sqlalchemy import and_, exists, select from sqlalchemy.ext.asyncio import AsyncSession from cognee.infrastructure.databases.relational import with_async_session @@ -8,10 +9,23 @@ from cognee.modules.graph.models import Edge @with_async_session async def get_data_related_edges(dataset_id: UUID, data_id: UUID, session: AsyncSession): - return ( - await session.scalars( - select(Edge) - .where(Edge.data_id == data_id, Edge.dataset_id == dataset_id) - .distinct(Edge.relationship_name) + EdgeAlias = aliased(Edge) + + subq = select(EdgeAlias.id).where( + and_( + EdgeAlias.slug == Edge.slug, + EdgeAlias.dataset_id == dataset_id, + EdgeAlias.data_id != data_id, ) - ).all() + ) + + query_statement = select(Edge).where( + and_( + Edge.data_id == data_id, + Edge.dataset_id == dataset_id, + ~exists(subq), + ) + ) + + data_related_edges = await session.scalars(query_statement) + return data_related_edges.all() diff --git a/cognee/modules/graph/methods/get_data_related_nodes.py b/cognee/modules/graph/methods/get_data_related_nodes.py index f6bff754e..df4f3f09b 100644 --- a/cognee/modules/graph/methods/get_data_related_nodes.py +++ b/cognee/modules/graph/methods/get_data_related_nodes.py @@ -1,4 +1,5 @@ from uuid import UUID +from sqlalchemy.orm import aliased from sqlalchemy import and_, exists, select from sqlalchemy.ext.asyncio import AsyncSession @@ -8,13 +9,13 @@ from cognee.modules.graph.models import Node @with_async_session async def get_data_related_nodes(dataset_id: UUID, data_id: UUID, session: AsyncSession): - NodeAlias = Node.__table__.alias("n2") + NodeAlias = aliased(Node) - subq = select(NodeAlias.c.id).where( + subq = select(NodeAlias.id).where( and_( - NodeAlias.c.slug == Node.slug, - NodeAlias.c.dataset_id == dataset_id, - NodeAlias.c.data_id != data_id, + NodeAlias.slug == Node.slug, + NodeAlias.dataset_id == dataset_id, + NodeAlias.data_id != data_id, ) ) diff --git a/cognee/modules/graph/methods/upsert_edges.py b/cognee/modules/graph/methods/upsert_edges.py index 9bfa8876e..8e0bfa728 100644 --- a/cognee/modules/graph/methods/upsert_edges.py +++ b/cognee/modules/graph/methods/upsert_edges.py @@ -3,6 +3,7 @@ from typing import Any, Dict, List, Tuple from fastapi.encoders import jsonable_encoder from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.dialects.postgresql import insert +from cognee.modules.engine.utils import generate_edge_id from cognee.infrastructure.databases.relational import with_async_session from cognee.modules.graph.models.Edge import Edge @@ -37,6 +38,7 @@ async def upsert_edges( NAMESPACE_OID, str(user_id) + str(dataset_id) + str(edge[0]) + str(edge[2]) + str(edge[1]), ), + "slug": generate_edge_id(edge[2]), "user_id": user_id, "data_id": data_id, "dataset_id": dataset_id, diff --git a/cognee/modules/graph/models/Edge.py b/cognee/modules/graph/models/Edge.py index 550838a1f..0536601dd 100644 --- a/cognee/modules/graph/models/Edge.py +++ b/cognee/modules/graph/models/Edge.py @@ -18,6 +18,8 @@ class Edge(Base): id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), primary_key=True) + slug: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + user_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), nullable=False) data_id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), index=True, nullable=False) diff --git a/cognee/tasks/graph/extract_graph_from_data.py b/cognee/tasks/graph/extract_graph_from_data.py index f52e2ec2b..b152ea98f 100644 --- a/cognee/tasks/graph/extract_graph_from_data.py +++ b/cognee/tasks/graph/extract_graph_from_data.py @@ -5,6 +5,7 @@ from pydantic import BaseModel from cognee.infrastructure.databases.graph import get_graph_engine from cognee.modules.graph.methods import upsert_edges from cognee.modules.ontology.ontology_env_config import get_ontology_env_config +from cognee.tasks.storage import index_graph_edges from cognee.tasks.storage.add_data_points import add_data_points from cognee.modules.ontology.ontology_config import Config from cognee.modules.ontology.get_default_ontology_resolver import ( @@ -90,6 +91,7 @@ async def integrate_chunk_graphs( if len(graph_edges) > 0: await graph_engine.add_edges(graph_edges) + await index_graph_edges(graph_edges) await upsert_edges( graph_edges, user_id=context["user"].id, diff --git a/cognee/tests/test_delete_default_graph.py b/cognee/tests/test_delete_default_graph.py index ec0a1aa51..e629664b7 100644 --- a/cognee/tests/test_delete_default_graph.py +++ b/cognee/tests/test_delete_default_graph.py @@ -20,16 +20,13 @@ logger = get_logger() @pytest.mark.asyncio @patch.object(LLMGateway, "acreate_structured_output", new_callable=AsyncMock) async def main(mock_create_structured_output: AsyncMock): - data_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_delete_default_graph") - ).resolve() + data_directory_path = os.path.join( + pathlib.Path(__file__).parent, ".data_storage/test_delete_default_graph" ) cognee.config.data_root_directory(data_directory_path) - cognee_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_delete_default_graph") - ).resolve() + + cognee_directory_path = os.path.join( + pathlib.Path(__file__).parent, ".cognee_system/test_delete_default_graph" ) cognee.config.system_root_directory(cognee_directory_path)