From d9995a865cc071610494c1533149979a330f96b3 Mon Sep 17 00:00:00 2001 From: Boris Arzentar Date: Thu, 16 Oct 2025 16:31:59 +0200 Subject: [PATCH] fix: check delete permissions before deleting dataset data --- .../codingagents/coding_rule_associations.py | 18 ++++--- cognee-mcp/src/cognee_client.py | 2 +- cognee-mcp/src/server.py | 22 +++++++- cognee-starter-kit/src/pipelines/low_level.py | 11 ++-- cognee/api/v1/cognify/cognify.py | 3 -- cognee/api/v1/datasets/datasets.py | 51 ++++++++++--------- .../datasets/routers/get_datasets_router.py | 6 +-- cognee/api/v1/update/update.py | 2 +- cognee/cli/commands/delete_command.py | 4 +- cognee/modules/data/methods/__init__.py | 1 + .../data/methods/get_last_added_data.py | 22 ++++++++ .../methods/delete_data_nodes_and_edges.py | 3 ++ .../methods/delete_data_related_edges.py | 4 +- .../methods/delete_dataset_nodes_and_edges.py | 3 ++ .../modules/pipelines/operations/__init__.py | 1 + .../chunk_naive_llm_classifier.py | 37 ++++++++++---- .../codingagents/coding_rule_associations.py | 11 +++- cognee/tasks/graph/extract_graph_from_code.py | 8 +-- cognee/tasks/graph/extract_graph_from_data.py | 7 +++ cognee/tests/test_delete_custom_graph.py | 19 ++++--- cognee/tests/test_delete_default_graph.py | 11 ++-- cognee/tests/test_permissions.py | 6 +-- distributed/workers/graph_saving_worker.py | 2 + examples/python/graphiti_example.py | 5 +- 24 files changed, 176 insertions(+), 83 deletions(-) create mode 100644 cognee/modules/data/methods/get_last_added_data.py diff --git a/cognee-mcp/src/codingagents/coding_rule_associations.py b/cognee-mcp/src/codingagents/coding_rule_associations.py index c860cb9f1..f0fac6a90 100644 --- a/cognee-mcp/src/codingagents/coding_rule_associations.py +++ b/cognee-mcp/src/codingagents/coding_rule_associations.py @@ -3,12 +3,14 @@ from uuid import NAMESPACE_OID, uuid5 from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.vector import get_vector_engine +from cognee.infrastructure.llm.prompts import render_prompt from cognee.low_level import DataPoint from cognee.infrastructure.llm import LLMGateway +from cognee.modules.graph.methods import upsert_edges from cognee.shared.logging_utils import get_logger from cognee.modules.engine.models import NodeSet from cognee.tasks.storage import add_data_points, index_graph_edges -from typing import Optional, List, Any +from typing import Dict, Optional, List, Any from pydantic import Field logger = get_logger("coding_rule_association") @@ -90,16 +92,14 @@ async def get_origin_edges(data: str, rules: List[Rule]) -> list[Any]: return relationships -async def add_rule_associations(data: str, rules_nodeset_name: str): +async def add_rule_associations(data: str, rules_nodeset_name: str, context: Dict): graph_engine = await get_graph_engine() existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name) user_context = {"chat": data, "rules": existing_rules} - user_prompt = LLMGateway.render_prompt( - "coding_rule_association_agent_user.txt", context=user_context - ) - system_prompt = LLMGateway.render_prompt("coding_rule_association_agent_system.txt", context={}) + user_prompt = render_prompt("coding_rule_association_agent_user.txt", context=user_context) + system_prompt = render_prompt("coding_rule_association_agent_system.txt", context={}) rule_list = await LLMGateway.acreate_structured_output( text_input=user_prompt, system_prompt=system_prompt, response_model=RuleSet @@ -117,4 +117,10 @@ async def add_rule_associations(data: str, rules_nodeset_name: str): if len(edges_to_save) > 0: await graph_engine.add_edges(edges_to_save) + await upsert_edges( + edges_to_save, + user_id=context["user"].id, + dataset_id=context["dataset"].id, + data_id=context["data"].id, + ) await index_graph_edges(edges_to_save) diff --git a/cognee-mcp/src/cognee_client.py b/cognee-mcp/src/cognee_client.py index dfcfad66c..442a97c5c 100644 --- a/cognee-mcp/src/cognee_client.py +++ b/cognee-mcp/src/cognee_client.py @@ -228,7 +228,7 @@ class CogneeClient: await self.cognee.datasets.delete_data( dataset_id=dataset_id, data_id=data_id, - user_id=user.id, + user=user, ) async def prune_data(self) -> Dict[str, Any]: diff --git a/cognee-mcp/src/server.py b/cognee-mcp/src/server.py index ce6dad88a..7fca31e08 100755 --- a/cognee-mcp/src/server.py +++ b/cognee-mcp/src/server.py @@ -6,7 +6,11 @@ import asyncio import subprocess from pathlib import Path from typing import Optional - +from cognee.modules.data.methods import ( + get_datasets_by_name, + get_last_added_data, +) +from cognee.modules.users.methods import get_default_user from cognee.shared.logging_utils import get_logger, setup_logging, get_log_file_location import importlib.util from contextlib import redirect_stdout @@ -373,12 +377,26 @@ async def save_interaction(data: str) -> list: try: await cognee_client.cognify() + + user = await get_default_user() + datasets = await get_datasets_by_name("main_dataset", user_id=user.id) + dataset = datasets[0] + added_data = await get_last_added_data(dataset.id) + logger.info("Save interaction process finished.") # Rule associations only work in direct mode if not cognee_client.use_api: logger.info("Generating associated rules from interaction data.") - await add_rule_associations(data=data, rules_nodeset_name="coding_agent_rules") + await add_rule_associations( + data=data, + rules_nodeset_name="coding_agent_rules", + context={ + "user": user, + "dataset": dataset, + "data": added_data, + }, + ) logger.info("Associated rules generated from interaction data.") else: logger.warning("Rule associations are not available in API mode, skipping.") diff --git a/cognee-starter-kit/src/pipelines/low_level.py b/cognee-starter-kit/src/pipelines/low_level.py index 07bcb1687..02d55c185 100644 --- a/cognee-starter-kit/src/pipelines/low_level.py +++ b/cognee-starter-kit/src/pipelines/low_level.py @@ -14,11 +14,11 @@ from pydantic import BaseModel from cognee import config, prune, search, SearchType, visualize_graph from cognee.low_level import setup, DataPoint -from cognee.modules.data.models import Dataset +from cognee.modules.data.methods import create_authorized_dataset +from cognee.modules.pipelines.operations import run_pipeline from cognee.modules.users.models import User -from cognee.pipelines import run_tasks, Task +from cognee.pipelines import Task from cognee.tasks.storage import add_data_points -from cognee.tasks.storage.index_graph_edges import index_graph_edges from cognee.modules.users.methods import get_default_user @@ -224,12 +224,13 @@ async def execute_pipeline() -> None: # Get user and dataset user: User = await get_default_user() # type: ignore - dataset = Dataset(id=uuid4(), name="demo_dataset") + dataset = await create_authorized_dataset("demo_dataset", user) data = load_default_payload() # Build and run pipeline tasks = [Task(ingest_payloads), Task(add_data_points)] - pipeline = run_tasks(tasks, dataset, [data], user, "demo_pipeline") + pipeline = run_pipeline(tasks, [data], [dataset.id], user, "demo_pipeline") + async for status in pipeline: logging.info("Pipeline status: %s", status) diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 26fcd929c..ac33f7ffe 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -209,9 +209,6 @@ async def cognify( "ontology_config": {"ontology_resolver": get_default_ontology_resolver()} } - if user is None: - user = await get_default_user() - if temporal_cognify: tasks = await get_temporal_tasks(user, chunker, chunk_size) else: diff --git a/cognee/api/v1/datasets/datasets.py b/cognee/api/v1/datasets/datasets.py index b12c45620..14979df87 100644 --- a/cognee/api/v1/datasets/datasets.py +++ b/cognee/api/v1/datasets/datasets.py @@ -1,32 +1,35 @@ from typing import Optional from uuid import UUID from cognee.modules.data.exceptions.exceptions import UnauthorizedDataAccessError -from cognee.modules.data.methods import get_datasets +from cognee.modules.data.methods import get_authorized_dataset, get_authorized_existing_datasets from cognee.modules.graph.methods import delete_data_nodes_and_edges, delete_dataset_nodes_and_edges +from cognee.modules.users.exceptions import PermissionDeniedError from cognee.modules.users.methods import get_default_user from cognee.modules.ingestion import discover_directory_datasets from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status +from cognee.modules.users.models import User class datasets: @staticmethod - async def list_datasets(): - from cognee.modules.data.methods import get_datasets + async def list_datasets(user: Optional[User] = None): + if user == None: + user = await get_default_user() - user = await get_default_user() - return await get_datasets(user.id) + return await get_authorized_existing_datasets([], "read", user) @staticmethod def discover_datasets(directory_path: str): return list(discover_directory_datasets(directory_path).keys()) @staticmethod - async def list_data(dataset_id: str): - from cognee.modules.data.methods import get_dataset, get_dataset_data + async def list_data(dataset_id: UUID, user: Optional[User] = None): + from cognee.modules.data.methods import get_dataset_data - user = await get_default_user() + if not user: + user = await get_default_user() - dataset = await get_dataset(user.id, dataset_id) + dataset = await get_authorized_dataset(user, dataset_id) return await get_dataset_data(dataset.id) @@ -35,14 +38,13 @@ class datasets: return await get_pipeline_status(dataset_ids, pipeline_name="cognify_pipeline") @staticmethod - async def delete_dataset(dataset_id: UUID, user_id: Optional[UUID] = None): - from cognee.modules.data.methods import get_dataset, delete_dataset + async def delete_dataset(dataset_id: UUID, user: Optional[User] = None): + from cognee.modules.data.methods import delete_dataset - if not user_id: + if not user: user = await get_default_user() - user_id = user.id - dataset = await get_dataset(user.id, dataset_id) + dataset = await get_authorized_dataset(user, dataset_id, "delete") if not dataset: raise UnauthorizedDataAccessError(f"Dataset {dataset_id} not accessible.") @@ -52,14 +54,18 @@ class datasets: return await delete_dataset(dataset) @staticmethod - async def delete_data(dataset_id: UUID, data_id: UUID, user_id: Optional[UUID] = None): + async def delete_data(dataset_id: UUID, data_id: UUID, user: Optional[User] = None): from cognee.modules.data.methods import delete_data, get_data - if not user_id: + if not user: user = await get_default_user() - user_id = user.id - data = await get_data(user_id, data_id) + try: + await get_authorized_dataset(user, dataset_id, "delete") + except PermissionDeniedError: + raise UnauthorizedDataAccessError(f"Dataset {dataset_id} not accessible.") + + data = await get_data(user.id, data_id) if not data: # If data is not found in the system, user is using a custom graph model. @@ -76,12 +82,11 @@ class datasets: await delete_data(data) @staticmethod - async def delete_all(user_id: Optional[UUID] = None): - if not user_id: + async def delete_all(user: Optional[User] = None): + if not user: user = await get_default_user() - user_id = user.id - user_datasets = await get_datasets(user_id) + user_datasets = await get_authorized_existing_datasets([], "read", user) for dataset in user_datasets: - await datasets.delete_dataset(dataset.id, user_id) + await datasets.delete_dataset(dataset.id, user) diff --git a/cognee/api/v1/datasets/routers/get_datasets_router.py b/cognee/api/v1/datasets/routers/get_datasets_router.py index 596c81d3f..572be81f9 100644 --- a/cognee/api/v1/datasets/routers/get_datasets_router.py +++ b/cognee/api/v1/datasets/routers/get_datasets_router.py @@ -188,7 +188,7 @@ def get_datasets_router() -> APIRouter: No content returned on successful deletion. If no datasets exist for the users, nothing happens. """ - await datasets.delete_all(user.id) + await datasets.delete_all(user) @router.delete( "/{dataset_id}", response_model=None, responses={404: {"model": ErrorResponseDTO}} @@ -219,7 +219,7 @@ def get_datasets_router() -> APIRouter: }, ) - await datasets.delete_dataset(dataset_id, user.id) + await datasets.delete_dataset(dataset_id, user) @router.delete( "/{dataset_id}/data/{data_id}", @@ -257,7 +257,7 @@ def get_datasets_router() -> APIRouter: }, ) - await datasets.delete_data(dataset_id, data_id, user.id) + await datasets.delete_data(dataset_id, data_id, user) @router.get("/{dataset_id}/graph", response_model=GraphDTO) async def get_dataset_graph(dataset_id: UUID, user: User = Depends(get_authenticated_user)): diff --git a/cognee/api/v1/update/update.py b/cognee/api/v1/update/update.py index f890b27ca..f1ae14218 100644 --- a/cognee/api/v1/update/update.py +++ b/cognee/api/v1/update/update.py @@ -79,7 +79,7 @@ async def update( await datasets.delete_data( dataset_id=dataset_id, data_id=data_id, - user_id=user.id, + user=user, ) await add( diff --git a/cognee/cli/commands/delete_command.py b/cognee/cli/commands/delete_command.py index 1b6b653bf..37fd5736d 100644 --- a/cognee/cli/commands/delete_command.py +++ b/cognee/cli/commands/delete_command.py @@ -127,9 +127,7 @@ Be careful with deletion operations as they are irreversible. "No user ID provided for deletion. Please specify using --user-id param." ) - await cognee_datasets.delete_dataset( - dataset_id=dataset_id, user_id=args.user_id - ) + await cognee_datasets.delete_dataset(dataset_id=dataset_id, user=args.user) elif hasattr(args, "dataset_id") and hasattr(args, "data_id"): await cognee_datasets.delete_data(args.dataset_id, args.data_id) except Exception as e: diff --git a/cognee/modules/data/methods/__init__.py b/cognee/modules/data/methods/__init__.py index d9716de95..487fb398f 100644 --- a/cognee/modules/data/methods/__init__.py +++ b/cognee/modules/data/methods/__init__.py @@ -9,6 +9,7 @@ from .get_dataset_data import get_dataset_data from .get_authorized_dataset import get_authorized_dataset from .get_authorized_dataset_by_name import get_authorized_dataset_by_name from .get_data import get_data +from .get_last_added_data import get_last_added_data from .get_unique_dataset_id import get_unique_dataset_id from .get_authorized_existing_datasets import get_authorized_existing_datasets from .get_dataset_ids import get_dataset_ids diff --git a/cognee/modules/data/methods/get_last_added_data.py b/cognee/modules/data/methods/get_last_added_data.py new file mode 100644 index 000000000..23d8c76cb --- /dev/null +++ b/cognee/modules/data/methods/get_last_added_data.py @@ -0,0 +1,22 @@ +from uuid import UUID +from typing import Optional +from sqlalchemy import select +from cognee.modules.data.models import Data, Dataset +from cognee.infrastructure.databases.relational import get_relational_engine + + +async def get_last_added_data(dataset_id: UUID) -> Optional[Data]: + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + result = await session.execute( + select(Data) + .join(Data.datasets) + .where((Dataset.id == dataset_id)) + .order_by(Data.created_at.desc()) + .limit(1) + ) + + data = result.scalar_one_or_none() + + return data diff --git a/cognee/modules/graph/methods/delete_data_nodes_and_edges.py b/cognee/modules/graph/methods/delete_data_nodes_and_edges.py index dd9619cbe..f937c2c63 100644 --- a/cognee/modules/graph/methods/delete_data_nodes_and_edges.py +++ b/cognee/modules/graph/methods/delete_data_nodes_and_edges.py @@ -15,6 +15,9 @@ from cognee.modules.graph.methods import ( async def delete_data_nodes_and_edges(dataset_id: UUID, data_id: UUID) -> None: affected_nodes = await get_data_related_nodes(dataset_id, data_id) + if len(affected_nodes) == 0: + return + graph_engine = await get_graph_engine() await graph_engine.delete_nodes([str(node.slug) for node in affected_nodes]) diff --git a/cognee/modules/graph/methods/delete_data_related_edges.py b/cognee/modules/graph/methods/delete_data_related_edges.py index c8d6bc563..7e037ab7d 100644 --- a/cognee/modules/graph/methods/delete_data_related_edges.py +++ b/cognee/modules/graph/methods/delete_data_related_edges.py @@ -8,6 +8,6 @@ from cognee.modules.graph.models import Edge @with_async_session async def delete_data_related_edges(data_id: UUID, session: AsyncSession): - nodes = (await session.scalars(select(Edge).where(Edge.data_id == data_id))).all() + edges = (await session.scalars(select(Edge).where(Edge.data_id == data_id))).all() - await session.execute(delete(Edge).where(Edge.id.in_([node.id for node in nodes]))) + await session.execute(delete(Edge).where(Edge.id.in_([edge.id for edge in edges]))) diff --git a/cognee/modules/graph/methods/delete_dataset_nodes_and_edges.py b/cognee/modules/graph/methods/delete_dataset_nodes_and_edges.py index 90617e956..b11db3d56 100644 --- a/cognee/modules/graph/methods/delete_dataset_nodes_and_edges.py +++ b/cognee/modules/graph/methods/delete_dataset_nodes_and_edges.py @@ -15,6 +15,9 @@ from cognee.modules.graph.methods import ( async def delete_dataset_nodes_and_edges(dataset_id: UUID) -> None: affected_nodes = await get_dataset_related_nodes(dataset_id) + if len(affected_nodes) == 0: + return + graph_engine = await get_graph_engine() await graph_engine.delete_nodes([str(node.slug) for node in affected_nodes]) diff --git a/cognee/modules/pipelines/operations/__init__.py b/cognee/modules/pipelines/operations/__init__.py index 68d7582c3..360d2a90e 100644 --- a/cognee/modules/pipelines/operations/__init__.py +++ b/cognee/modules/pipelines/operations/__init__.py @@ -2,3 +2,4 @@ from .log_pipeline_run_initiated import log_pipeline_run_initiated from .log_pipeline_run_start import log_pipeline_run_start from .log_pipeline_run_complete import log_pipeline_run_complete from .log_pipeline_run_error import log_pipeline_run_error +from .pipeline import run_pipeline diff --git a/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py b/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py index f2c9f11ef..722b8de36 100644 --- a/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py +++ b/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py @@ -2,17 +2,20 @@ import asyncio from uuid import uuid5, NAMESPACE_OID -from typing import Type +from typing import Dict, Type from pydantic import BaseModel from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.vector import get_vector_engine -from cognee.infrastructure.engine.models import DataPoint +from cognee.infrastructure.engine.models.DataPoint import DataPoint from cognee.infrastructure.llm.extraction import extract_categories from cognee.modules.chunking.models.DocumentChunk import DocumentChunk +from cognee.modules.graph.methods import upsert_edges, upsert_nodes async def chunk_naive_llm_classifier( - data_chunks: list[DocumentChunk], classification_model: Type[BaseModel] + data_chunks: list[DocumentChunk], + classification_model: Type[BaseModel], + context: Dict, ) -> list[DocumentChunk]: """ Classifies a list of document chunks using a specified classification model and updates vector and graph databases with the classification results. @@ -97,8 +100,8 @@ async def chunk_naive_llm_classifier( { "uuid": str(classification_type_id), "text": classification_type_label, - "chunk_id": str(data_chunk.chunk_id), - "document_id": str(data_chunk.document_id), + "chunk_id": str(data_chunk.id), + "document_id": str(data_chunk.is_part_of.id), } ), index_fields=["text"], @@ -119,12 +122,12 @@ async def chunk_naive_llm_classifier( edges.append( ( - str(data_chunk.chunk_id), + str(data_chunk.id), str(classification_type_id), "is_media_type", dict( relationship_name="is_media_type", - source_node_id=str(data_chunk.chunk_id), + source_node_id=str(data_chunk.id), target_node_id=str(classification_type_id), ), ) @@ -142,8 +145,8 @@ async def chunk_naive_llm_classifier( { "uuid": str(classification_subtype_id), "text": classification_subtype_label, - "chunk_id": str(data_chunk.chunk_id), - "document_id": str(data_chunk.document_id), + "chunk_id": str(data_chunk.id), + "document_id": str(data_chunk.is_part_of.id), } ), index_fields=["text"], @@ -177,12 +180,12 @@ async def chunk_naive_llm_classifier( edges.append( ( - str(data_chunk.chunk_id), + str(data_chunk.id), str(classification_subtype_id), "is_classified_as", dict( relationship_name="is_classified_as", - source_node_id=str(data_chunk.chunk_id), + source_node_id=str(data_chunk.id), target_node_id=str(classification_subtype_id), ), ) @@ -194,6 +197,18 @@ async def chunk_naive_llm_classifier( graph_engine = await get_graph_engine() await graph_engine.add_nodes(nodes) + await upsert_nodes( + nodes, + user_id=context["user"].id, + dataset_id=context["dataset"].id, + data_id=context["data"].id, + ) await graph_engine.add_edges(edges) + await upsert_edges( + edges, + user_id=context["user"].id, + dataset_id=context["dataset"].id, + data_id=context["data"].id, + ) return data_chunks diff --git a/cognee/tasks/codingagents/coding_rule_associations.py b/cognee/tasks/codingagents/coding_rule_associations.py index d8d29ab91..edb5503ca 100644 --- a/cognee/tasks/codingagents/coding_rule_associations.py +++ b/cognee/tasks/codingagents/coding_rule_associations.py @@ -8,8 +8,9 @@ from cognee.infrastructure.llm.prompts import render_prompt from cognee.infrastructure.llm import LLMGateway from cognee.shared.logging_utils import get_logger from cognee.modules.engine.models import NodeSet +from cognee.modules.graph.methods import upsert_edges from cognee.tasks.storage import add_data_points, index_graph_edges -from typing import Optional, List, Any +from typing import Dict, Optional, List, Any from pydantic import Field logger = get_logger("coding_rule_association") @@ -94,6 +95,7 @@ async def add_rule_associations( rules_nodeset_name: str, user_prompt_location: str = "coding_rule_association_agent_user.txt", system_prompt_location: str = "coding_rule_association_agent_system.txt", + context: Optional[Dict] = None, ): if isinstance(data, list): # If data is a list of strings join all strings in list @@ -124,4 +126,11 @@ async def add_rule_associations( if len(edges_to_save) > 0: await graph_engine.add_edges(edges_to_save) + if context: + await upsert_edges( + edges_to_save, + user_id=context["user"].id, + dataset_id=context["dataset"].id, + data_id=context["data"].id, + ) await index_graph_edges(edges_to_save) diff --git a/cognee/tasks/graph/extract_graph_from_code.py b/cognee/tasks/graph/extract_graph_from_code.py index 555cb59a3..46064ace9 100644 --- a/cognee/tasks/graph/extract_graph_from_code.py +++ b/cognee/tasks/graph/extract_graph_from_code.py @@ -1,5 +1,5 @@ import asyncio -from typing import Type, List +from typing import Dict, Type, List from pydantic import BaseModel from cognee.infrastructure.llm.extraction import extract_content_graph @@ -8,7 +8,9 @@ from cognee.tasks.storage import add_data_points async def extract_graph_from_code( - data_chunks: list[DocumentChunk], graph_model: Type[BaseModel] + data_chunks: list[DocumentChunk], + graph_model: Type[BaseModel], + context: Dict, ) -> List[DocumentChunk]: """ Extracts a knowledge graph from the text content of document chunks using a specified graph model. @@ -23,6 +25,6 @@ async def extract_graph_from_code( for chunk_index, chunk in enumerate(data_chunks): chunk_graph = chunk_graphs[chunk_index] - await add_data_points(chunk_graph.nodes) + await add_data_points(chunk_graph.nodes, context) return data_chunks diff --git a/cognee/tasks/graph/extract_graph_from_data.py b/cognee/tasks/graph/extract_graph_from_data.py index 4aec04526..f52e2ec2b 100644 --- a/cognee/tasks/graph/extract_graph_from_data.py +++ b/cognee/tasks/graph/extract_graph_from_data.py @@ -3,6 +3,7 @@ from typing import Dict, Type, List, Optional from pydantic import BaseModel from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.modules.graph.methods import upsert_edges from cognee.modules.ontology.ontology_env_config import get_ontology_env_config from cognee.tasks.storage.add_data_points import add_data_points from cognee.modules.ontology.ontology_config import Config @@ -89,6 +90,12 @@ async def integrate_chunk_graphs( if len(graph_edges) > 0: await graph_engine.add_edges(graph_edges) + await upsert_edges( + graph_edges, + user_id=context["user"].id, + dataset_id=context["dataset"].id, + data_id=context["data"].id, + ) return data_chunks diff --git a/cognee/tests/test_delete_custom_graph.py b/cognee/tests/test_delete_custom_graph.py index 9bbb78c76..42d7d0d63 100644 --- a/cognee/tests/test_delete_custom_graph.py +++ b/cognee/tests/test_delete_custom_graph.py @@ -1,14 +1,14 @@ import os import pathlib from typing import List -from uuid import uuid4 +from uuid import UUID, uuid4 +from pydantic import BaseModel import cognee from cognee.api.v1.datasets import datasets from cognee.infrastructure.engine import DataPoint -from cognee.modules.data.models import Data, Dataset +from cognee.modules.data.methods import create_authorized_dataset from cognee.modules.engine.operations.setup import setup -from cognee.modules.graph.methods import delete_data_nodes_and_edges from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user from cognee.shared.logging_utils import get_logger @@ -60,9 +60,12 @@ async def main(): user: User = await get_default_user() # type: ignore - dataset = Dataset(id=uuid4()) - data1 = Data(id=uuid4()) - data2 = Data(id=uuid4()) + class CustomData(BaseModel): + id: UUID + + dataset = await create_authorized_dataset(dataset_name="test_dataset", user=user) + data1 = CustomData(id=uuid4()) + data2 = CustomData(id=uuid4()) await add_data_points( [person1], @@ -91,12 +94,12 @@ async def main(): "Nodes and edges are not correctly added to the graph." ) - await datasets.delete_data(dataset.id, data1.id, user.id) # type: ignore + await datasets.delete_data(dataset.id, data1.id, user) nodes, edges = await graph_engine.get_graph_data() assert len(nodes) == 2 and len(edges) == 1, "Nodes and edges are not deleted properly." - await datasets.delete_data(dataset.id, data2.id, user.id) # type: ignore + await datasets.delete_data(dataset.id, data2.id, user) nodes, edges = await graph_engine.get_graph_data() assert len(nodes) == 0 and len(edges) == 0, "Nodes and edges are not deleted." diff --git a/cognee/tests/test_delete_default_graph.py b/cognee/tests/test_delete_default_graph.py index ca0152970..0a7d54e40 100644 --- a/cognee/tests/test_delete_default_graph.py +++ b/cognee/tests/test_delete_default_graph.py @@ -6,11 +6,8 @@ from cognee.api.v1.datasets import datasets from cognee.api.v1.visualize.visualize import visualize_graph from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.graph import get_graph_engine -from cognee.modules.data.methods import delete_data, get_dataset_data +from cognee.modules.data.methods import get_dataset_data from cognee.modules.engine.operations.setup import setup -from cognee.modules.graph.methods import ( - delete_data_nodes_and_edges, -) from cognee.modules.users.methods import get_default_user from cognee.shared.logging_utils import get_logger @@ -61,7 +58,7 @@ async def main(): assert len(nodes) >= 12 and len(edges) >= 18, "Nodes and edges are not deleted." user = await get_default_user() - await datasets.delete_data(dataset_id, added_data.id, user.id) # type: ignore + await datasets.delete_data(dataset_id, added_data.id, user) # type: ignore file_path = os.path.join( pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_delete.html" @@ -69,7 +66,9 @@ async def main(): await visualize_graph(file_path) nodes, edges = await graph_engine.get_graph_data() - assert len(nodes) >= 8 and len(edges) >= 10, "Nodes and edges are not deleted." + assert len(nodes) >= 8 and len(nodes) < 12 and len(edges) >= 10 and len(edges) < 18, ( + "Nodes and edges are not deleted." + ) if __name__ == "__main__": diff --git a/cognee/tests/test_permissions.py b/cognee/tests/test_permissions.py index 16dbdeac0..ae4092178 100644 --- a/cognee/tests/test_permissions.py +++ b/cognee/tests/test_permissions.py @@ -192,7 +192,7 @@ async def main(): await cognee.datasets.delete_data( dataset_id=test_user_dataset_id, data_id=text_data_id, - user_id=default_user.id, + user=default_user, ) except UnauthorizedDataAccessError: delete_error = True @@ -207,7 +207,7 @@ async def main(): text_data_id = test_user_dataset_data[0].id await cognee.datasets.delete_data( - dataset_id=test_user_dataset_id, data_id=text_data_id, user_id=test_user.id + dataset_id=test_user_dataset_id, data_id=text_data_id, user=test_user ) # Actually give permission to default_user to delete data for test_users dataset @@ -226,7 +226,7 @@ async def main(): await cognee.datasets.delete_data( dataset_id=test_user_dataset_id, data_id=explanation_file_data_id, - user_id=default_user.id, + user=default_user, ) diff --git a/distributed/workers/graph_saving_worker.py b/distributed/workers/graph_saving_worker.py index 67c034225..b982ebe2f 100644 --- a/distributed/workers/graph_saving_worker.py +++ b/distributed/workers/graph_saving_worker.py @@ -95,6 +95,7 @@ async def graph_saving_worker(): async def save_graph_nodes(new_nodes): try: await graph_engine.add_nodes(new_nodes, distributed=False) + # await upsert_nodes(new_nodes, user_id=user.id, dataset_id=dataset.id, data_id=data.id) except Exception as error: if is_deadlock_error(error): raise GraphDatabaseDeadlockError() @@ -107,6 +108,7 @@ async def graph_saving_worker(): async def save_graph_edges(new_edges): try: await graph_engine.add_edges(new_edges, distributed=False) + # await upsert_edges(new_edges, user_id=user.id, dataset_id=dataset.id, data_id=data.id) except Exception as error: if is_deadlock_error(error): raise GraphDatabaseDeadlockError() diff --git a/examples/python/graphiti_example.py b/examples/python/graphiti_example.py index ecc616b9b..91aac8286 100644 --- a/examples/python/graphiti_example.py +++ b/examples/python/graphiti_example.py @@ -2,8 +2,9 @@ import asyncio import cognee from cognee.modules.data.methods import get_dataset_data, get_datasets +from cognee.modules.pipelines.operations import run_pipeline from cognee.shared.logging_utils import setup_logging, ERROR -from cognee.modules.pipelines import Task, run_tasks +from cognee.modules.pipelines import Task from cognee.tasks.temporal_awareness import build_graph_with_temporal_awareness from cognee.infrastructure.databases.relational import ( create_db_and_tables as create_relational_db_and_tables, @@ -42,7 +43,7 @@ async def main(): datasets = await get_datasets(user.id) dataset_data = await get_dataset_data(datasets[0].id) # type: ignore - pipeline = run_tasks(tasks, dataset=datasets[0], data=dataset_data, user=user) + pipeline = run_pipeline(tasks, data=dataset_data, datasets=[datasets[0].id], user=user) async for result in pipeline: print(result)