fix: check delete permissions before deleting dataset data

This commit is contained in:
Boris Arzentar 2025-10-16 16:31:59 +02:00
parent 48991912a9
commit d9995a865c
No known key found for this signature in database
GPG key ID: D5CC274C784807B7
24 changed files with 176 additions and 83 deletions

View file

@ -3,12 +3,14 @@ from uuid import NAMESPACE_OID, uuid5
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.llm.prompts import render_prompt
from cognee.low_level import DataPoint
from cognee.infrastructure.llm import LLMGateway
from cognee.modules.graph.methods import upsert_edges
from cognee.shared.logging_utils import get_logger
from cognee.modules.engine.models import NodeSet
from cognee.tasks.storage import add_data_points, index_graph_edges
from typing import Optional, List, Any
from typing import Dict, Optional, List, Any
from pydantic import Field
logger = get_logger("coding_rule_association")
@ -90,16 +92,14 @@ async def get_origin_edges(data: str, rules: List[Rule]) -> list[Any]:
return relationships
async def add_rule_associations(data: str, rules_nodeset_name: str):
async def add_rule_associations(data: str, rules_nodeset_name: str, context: Dict):
graph_engine = await get_graph_engine()
existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name)
user_context = {"chat": data, "rules": existing_rules}
user_prompt = LLMGateway.render_prompt(
"coding_rule_association_agent_user.txt", context=user_context
)
system_prompt = LLMGateway.render_prompt("coding_rule_association_agent_system.txt", context={})
user_prompt = render_prompt("coding_rule_association_agent_user.txt", context=user_context)
system_prompt = render_prompt("coding_rule_association_agent_system.txt", context={})
rule_list = await LLMGateway.acreate_structured_output(
text_input=user_prompt, system_prompt=system_prompt, response_model=RuleSet
@ -117,4 +117,10 @@ async def add_rule_associations(data: str, rules_nodeset_name: str):
if len(edges_to_save) > 0:
await graph_engine.add_edges(edges_to_save)
await upsert_edges(
edges_to_save,
user_id=context["user"].id,
dataset_id=context["dataset"].id,
data_id=context["data"].id,
)
await index_graph_edges(edges_to_save)

View file

@ -228,7 +228,7 @@ class CogneeClient:
await self.cognee.datasets.delete_data(
dataset_id=dataset_id,
data_id=data_id,
user_id=user.id,
user=user,
)
async def prune_data(self) -> Dict[str, Any]:

View file

@ -6,7 +6,11 @@ import asyncio
import subprocess
from pathlib import Path
from typing import Optional
from cognee.modules.data.methods import (
get_datasets_by_name,
get_last_added_data,
)
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger, setup_logging, get_log_file_location
import importlib.util
from contextlib import redirect_stdout
@ -373,12 +377,26 @@ async def save_interaction(data: str) -> list:
try:
await cognee_client.cognify()
user = await get_default_user()
datasets = await get_datasets_by_name("main_dataset", user_id=user.id)
dataset = datasets[0]
added_data = await get_last_added_data(dataset.id)
logger.info("Save interaction process finished.")
# Rule associations only work in direct mode
if not cognee_client.use_api:
logger.info("Generating associated rules from interaction data.")
await add_rule_associations(data=data, rules_nodeset_name="coding_agent_rules")
await add_rule_associations(
data=data,
rules_nodeset_name="coding_agent_rules",
context={
"user": user,
"dataset": dataset,
"data": added_data,
},
)
logger.info("Associated rules generated from interaction data.")
else:
logger.warning("Rule associations are not available in API mode, skipping.")

View file

@ -14,11 +14,11 @@ from pydantic import BaseModel
from cognee import config, prune, search, SearchType, visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.modules.data.models import Dataset
from cognee.modules.data.methods import create_authorized_dataset
from cognee.modules.pipelines.operations import run_pipeline
from cognee.modules.users.models import User
from cognee.pipelines import run_tasks, Task
from cognee.pipelines import Task
from cognee.tasks.storage import add_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.modules.users.methods import get_default_user
@ -224,12 +224,13 @@ async def execute_pipeline() -> None:
# Get user and dataset
user: User = await get_default_user() # type: ignore
dataset = Dataset(id=uuid4(), name="demo_dataset")
dataset = await create_authorized_dataset("demo_dataset", user)
data = load_default_payload()
# Build and run pipeline
tasks = [Task(ingest_payloads), Task(add_data_points)]
pipeline = run_tasks(tasks, dataset, [data], user, "demo_pipeline")
pipeline = run_pipeline(tasks, [data], [dataset.id], user, "demo_pipeline")
async for status in pipeline:
logging.info("Pipeline status: %s", status)

View file

@ -209,9 +209,6 @@ async def cognify(
"ontology_config": {"ontology_resolver": get_default_ontology_resolver()}
}
if user is None:
user = await get_default_user()
if temporal_cognify:
tasks = await get_temporal_tasks(user, chunker, chunk_size)
else:

View file

@ -1,32 +1,35 @@
from typing import Optional
from uuid import UUID
from cognee.modules.data.exceptions.exceptions import UnauthorizedDataAccessError
from cognee.modules.data.methods import get_datasets
from cognee.modules.data.methods import get_authorized_dataset, get_authorized_existing_datasets
from cognee.modules.graph.methods import delete_data_nodes_and_edges, delete_dataset_nodes_and_edges
from cognee.modules.users.exceptions import PermissionDeniedError
from cognee.modules.users.methods import get_default_user
from cognee.modules.ingestion import discover_directory_datasets
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
from cognee.modules.users.models import User
class datasets:
@staticmethod
async def list_datasets():
from cognee.modules.data.methods import get_datasets
async def list_datasets(user: Optional[User] = None):
if user == None:
user = await get_default_user()
user = await get_default_user()
return await get_datasets(user.id)
return await get_authorized_existing_datasets([], "read", user)
@staticmethod
def discover_datasets(directory_path: str):
return list(discover_directory_datasets(directory_path).keys())
@staticmethod
async def list_data(dataset_id: str):
from cognee.modules.data.methods import get_dataset, get_dataset_data
async def list_data(dataset_id: UUID, user: Optional[User] = None):
from cognee.modules.data.methods import get_dataset_data
user = await get_default_user()
if not user:
user = await get_default_user()
dataset = await get_dataset(user.id, dataset_id)
dataset = await get_authorized_dataset(user, dataset_id)
return await get_dataset_data(dataset.id)
@ -35,14 +38,13 @@ class datasets:
return await get_pipeline_status(dataset_ids, pipeline_name="cognify_pipeline")
@staticmethod
async def delete_dataset(dataset_id: UUID, user_id: Optional[UUID] = None):
from cognee.modules.data.methods import get_dataset, delete_dataset
async def delete_dataset(dataset_id: UUID, user: Optional[User] = None):
from cognee.modules.data.methods import delete_dataset
if not user_id:
if not user:
user = await get_default_user()
user_id = user.id
dataset = await get_dataset(user.id, dataset_id)
dataset = await get_authorized_dataset(user, dataset_id, "delete")
if not dataset:
raise UnauthorizedDataAccessError(f"Dataset {dataset_id} not accessible.")
@ -52,14 +54,18 @@ class datasets:
return await delete_dataset(dataset)
@staticmethod
async def delete_data(dataset_id: UUID, data_id: UUID, user_id: Optional[UUID] = None):
async def delete_data(dataset_id: UUID, data_id: UUID, user: Optional[User] = None):
from cognee.modules.data.methods import delete_data, get_data
if not user_id:
if not user:
user = await get_default_user()
user_id = user.id
data = await get_data(user_id, data_id)
try:
await get_authorized_dataset(user, dataset_id, "delete")
except PermissionDeniedError:
raise UnauthorizedDataAccessError(f"Dataset {dataset_id} not accessible.")
data = await get_data(user.id, data_id)
if not data:
# If data is not found in the system, user is using a custom graph model.
@ -76,12 +82,11 @@ class datasets:
await delete_data(data)
@staticmethod
async def delete_all(user_id: Optional[UUID] = None):
if not user_id:
async def delete_all(user: Optional[User] = None):
if not user:
user = await get_default_user()
user_id = user.id
user_datasets = await get_datasets(user_id)
user_datasets = await get_authorized_existing_datasets([], "read", user)
for dataset in user_datasets:
await datasets.delete_dataset(dataset.id, user_id)
await datasets.delete_dataset(dataset.id, user)

View file

@ -188,7 +188,7 @@ def get_datasets_router() -> APIRouter:
No content returned on successful deletion.
If no datasets exist for the user, nothing happens.
"""
await datasets.delete_all(user.id)
await datasets.delete_all(user)
@router.delete(
"/{dataset_id}", response_model=None, responses={404: {"model": ErrorResponseDTO}}
@ -219,7 +219,7 @@ def get_datasets_router() -> APIRouter:
},
)
await datasets.delete_dataset(dataset_id, user.id)
await datasets.delete_dataset(dataset_id, user)
@router.delete(
"/{dataset_id}/data/{data_id}",
@ -257,7 +257,7 @@ def get_datasets_router() -> APIRouter:
},
)
await datasets.delete_data(dataset_id, data_id, user.id)
await datasets.delete_data(dataset_id, data_id, user)
@router.get("/{dataset_id}/graph", response_model=GraphDTO)
async def get_dataset_graph(dataset_id: UUID, user: User = Depends(get_authenticated_user)):

View file

@ -79,7 +79,7 @@ async def update(
await datasets.delete_data(
dataset_id=dataset_id,
data_id=data_id,
user_id=user.id,
user=user,
)
await add(

View file

@ -127,9 +127,7 @@ Be careful with deletion operations as they are irreversible.
"No user ID provided for deletion. Please specify using --user-id param."
)
await cognee_datasets.delete_dataset(
dataset_id=dataset_id, user_id=args.user_id
)
await cognee_datasets.delete_dataset(dataset_id=dataset_id, user=args.user)
elif hasattr(args, "dataset_id") and hasattr(args, "data_id"):
await cognee_datasets.delete_data(args.dataset_id, args.data_id)
except Exception as e:

View file

@ -9,6 +9,7 @@ from .get_dataset_data import get_dataset_data
from .get_authorized_dataset import get_authorized_dataset
from .get_authorized_dataset_by_name import get_authorized_dataset_by_name
from .get_data import get_data
from .get_last_added_data import get_last_added_data
from .get_unique_dataset_id import get_unique_dataset_id
from .get_authorized_existing_datasets import get_authorized_existing_datasets
from .get_dataset_ids import get_dataset_ids

View file

@ -0,0 +1,22 @@
from uuid import UUID
from typing import Optional
from sqlalchemy import select
from cognee.modules.data.models import Data, Dataset
from cognee.infrastructure.databases.relational import get_relational_engine
async def get_last_added_data(dataset_id: UUID) -> Optional[Data]:
    """Return the most recently added ``Data`` row for the given dataset.

    Args:
        dataset_id: Identifier of the dataset whose latest data item is wanted.

    Returns:
        The newest ``Data`` record (by ``created_at``) linked to the dataset,
        or ``None`` when the dataset holds no data.
    """
    db_engine = get_relational_engine()

    # Newest-first by creation time, capped at a single row.
    query = (
        select(Data)
        .join(Data.datasets)
        .where(Dataset.id == dataset_id)
        .order_by(Data.created_at.desc())
        .limit(1)
    )

    async with db_engine.get_async_session() as session:
        result = await session.execute(query)
        return result.scalar_one_or_none()

View file

@ -15,6 +15,9 @@ from cognee.modules.graph.methods import (
async def delete_data_nodes_and_edges(dataset_id: UUID, data_id: UUID) -> None:
affected_nodes = await get_data_related_nodes(dataset_id, data_id)
if len(affected_nodes) == 0:
return
graph_engine = await get_graph_engine()
await graph_engine.delete_nodes([str(node.slug) for node in affected_nodes])

View file

@ -8,6 +8,6 @@ from cognee.modules.graph.models import Edge
@with_async_session
async def delete_data_related_edges(data_id: UUID, session: AsyncSession):
nodes = (await session.scalars(select(Edge).where(Edge.data_id == data_id))).all()
edges = (await session.scalars(select(Edge).where(Edge.data_id == data_id))).all()
await session.execute(delete(Edge).where(Edge.id.in_([node.id for node in nodes])))
await session.execute(delete(Edge).where(Edge.id.in_([edge.id for edge in edges])))

View file

@ -15,6 +15,9 @@ from cognee.modules.graph.methods import (
async def delete_dataset_nodes_and_edges(dataset_id: UUID) -> None:
affected_nodes = await get_dataset_related_nodes(dataset_id)
if len(affected_nodes) == 0:
return
graph_engine = await get_graph_engine()
await graph_engine.delete_nodes([str(node.slug) for node in affected_nodes])

View file

@ -2,3 +2,4 @@ from .log_pipeline_run_initiated import log_pipeline_run_initiated
from .log_pipeline_run_start import log_pipeline_run_start
from .log_pipeline_run_complete import log_pipeline_run_complete
from .log_pipeline_run_error import log_pipeline_run_error
from .pipeline import run_pipeline

View file

@ -2,17 +2,20 @@
import asyncio
from uuid import uuid5, NAMESPACE_OID
from typing import Type
from typing import Dict, Type
from pydantic import BaseModel
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.engine.models import DataPoint
from cognee.infrastructure.engine.models.DataPoint import DataPoint
from cognee.infrastructure.llm.extraction import extract_categories
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.graph.methods import upsert_edges, upsert_nodes
async def chunk_naive_llm_classifier(
data_chunks: list[DocumentChunk], classification_model: Type[BaseModel]
data_chunks: list[DocumentChunk],
classification_model: Type[BaseModel],
context: Dict,
) -> list[DocumentChunk]:
"""
Classifies a list of document chunks using a specified classification model and updates vector and graph databases with the classification results.
@ -97,8 +100,8 @@ async def chunk_naive_llm_classifier(
{
"uuid": str(classification_type_id),
"text": classification_type_label,
"chunk_id": str(data_chunk.chunk_id),
"document_id": str(data_chunk.document_id),
"chunk_id": str(data_chunk.id),
"document_id": str(data_chunk.is_part_of.id),
}
),
index_fields=["text"],
@ -119,12 +122,12 @@ async def chunk_naive_llm_classifier(
edges.append(
(
str(data_chunk.chunk_id),
str(data_chunk.id),
str(classification_type_id),
"is_media_type",
dict(
relationship_name="is_media_type",
source_node_id=str(data_chunk.chunk_id),
source_node_id=str(data_chunk.id),
target_node_id=str(classification_type_id),
),
)
@ -142,8 +145,8 @@ async def chunk_naive_llm_classifier(
{
"uuid": str(classification_subtype_id),
"text": classification_subtype_label,
"chunk_id": str(data_chunk.chunk_id),
"document_id": str(data_chunk.document_id),
"chunk_id": str(data_chunk.id),
"document_id": str(data_chunk.is_part_of.id),
}
),
index_fields=["text"],
@ -177,12 +180,12 @@ async def chunk_naive_llm_classifier(
edges.append(
(
str(data_chunk.chunk_id),
str(data_chunk.id),
str(classification_subtype_id),
"is_classified_as",
dict(
relationship_name="is_classified_as",
source_node_id=str(data_chunk.chunk_id),
source_node_id=str(data_chunk.id),
target_node_id=str(classification_subtype_id),
),
)
@ -194,6 +197,18 @@ async def chunk_naive_llm_classifier(
graph_engine = await get_graph_engine()
await graph_engine.add_nodes(nodes)
await upsert_nodes(
nodes,
user_id=context["user"].id,
dataset_id=context["dataset"].id,
data_id=context["data"].id,
)
await graph_engine.add_edges(edges)
await upsert_edges(
edges,
user_id=context["user"].id,
dataset_id=context["dataset"].id,
data_id=context["data"].id,
)
return data_chunks

View file

@ -8,8 +8,9 @@ from cognee.infrastructure.llm.prompts import render_prompt
from cognee.infrastructure.llm import LLMGateway
from cognee.shared.logging_utils import get_logger
from cognee.modules.engine.models import NodeSet
from cognee.modules.graph.methods import upsert_edges
from cognee.tasks.storage import add_data_points, index_graph_edges
from typing import Optional, List, Any
from typing import Dict, Optional, List, Any
from pydantic import Field
logger = get_logger("coding_rule_association")
@ -94,6 +95,7 @@ async def add_rule_associations(
rules_nodeset_name: str,
user_prompt_location: str = "coding_rule_association_agent_user.txt",
system_prompt_location: str = "coding_rule_association_agent_system.txt",
context: Optional[Dict] = None,
):
if isinstance(data, list):
# If data is a list of strings join all strings in list
@ -124,4 +126,11 @@ async def add_rule_associations(
if len(edges_to_save) > 0:
await graph_engine.add_edges(edges_to_save)
if context:
await upsert_edges(
edges_to_save,
user_id=context["user"].id,
dataset_id=context["dataset"].id,
data_id=context["data"].id,
)
await index_graph_edges(edges_to_save)

View file

@ -1,5 +1,5 @@
import asyncio
from typing import Type, List
from typing import Dict, Type, List
from pydantic import BaseModel
from cognee.infrastructure.llm.extraction import extract_content_graph
@ -8,7 +8,9 @@ from cognee.tasks.storage import add_data_points
async def extract_graph_from_code(
data_chunks: list[DocumentChunk], graph_model: Type[BaseModel]
data_chunks: list[DocumentChunk],
graph_model: Type[BaseModel],
context: Dict,
) -> List[DocumentChunk]:
"""
Extracts a knowledge graph from the text content of document chunks using a specified graph model.
@ -23,6 +25,6 @@ async def extract_graph_from_code(
for chunk_index, chunk in enumerate(data_chunks):
chunk_graph = chunk_graphs[chunk_index]
await add_data_points(chunk_graph.nodes)
await add_data_points(chunk_graph.nodes, context)
return data_chunks

View file

@ -3,6 +3,7 @@ from typing import Dict, Type, List, Optional
from pydantic import BaseModel
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.graph.methods import upsert_edges
from cognee.modules.ontology.ontology_env_config import get_ontology_env_config
from cognee.tasks.storage.add_data_points import add_data_points
from cognee.modules.ontology.ontology_config import Config
@ -89,6 +90,12 @@ async def integrate_chunk_graphs(
if len(graph_edges) > 0:
await graph_engine.add_edges(graph_edges)
await upsert_edges(
graph_edges,
user_id=context["user"].id,
dataset_id=context["dataset"].id,
data_id=context["data"].id,
)
return data_chunks

View file

@ -1,14 +1,14 @@
import os
import pathlib
from typing import List
from uuid import uuid4
from uuid import UUID, uuid4
from pydantic import BaseModel
import cognee
from cognee.api.v1.datasets import datasets
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.models import Data, Dataset
from cognee.modules.data.methods import create_authorized_dataset
from cognee.modules.engine.operations.setup import setup
from cognee.modules.graph.methods import delete_data_nodes_and_edges
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
@ -60,9 +60,12 @@ async def main():
user: User = await get_default_user() # type: ignore
dataset = Dataset(id=uuid4())
data1 = Data(id=uuid4())
data2 = Data(id=uuid4())
class CustomData(BaseModel):
id: UUID
dataset = await create_authorized_dataset(dataset_name="test_dataset", user=user)
data1 = CustomData(id=uuid4())
data2 = CustomData(id=uuid4())
await add_data_points(
[person1],
@ -91,12 +94,12 @@ async def main():
"Nodes and edges are not correctly added to the graph."
)
await datasets.delete_data(dataset.id, data1.id, user.id) # type: ignore
await datasets.delete_data(dataset.id, data1.id, user)
nodes, edges = await graph_engine.get_graph_data()
assert len(nodes) == 2 and len(edges) == 1, "Nodes and edges are not deleted properly."
await datasets.delete_data(dataset.id, data2.id, user.id) # type: ignore
await datasets.delete_data(dataset.id, data2.id, user)
nodes, edges = await graph_engine.get_graph_data()
assert len(nodes) == 0 and len(edges) == 0, "Nodes and edges are not deleted."

View file

@ -6,11 +6,8 @@ from cognee.api.v1.datasets import datasets
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.data.methods import delete_data, get_dataset_data
from cognee.modules.data.methods import get_dataset_data
from cognee.modules.engine.operations.setup import setup
from cognee.modules.graph.methods import (
delete_data_nodes_and_edges,
)
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
@ -61,7 +58,7 @@ async def main():
assert len(nodes) >= 12 and len(edges) >= 18, "Nodes and edges are not deleted."
user = await get_default_user()
await datasets.delete_data(dataset_id, added_data.id, user.id) # type: ignore
await datasets.delete_data(dataset_id, added_data.id, user) # type: ignore
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_delete.html"
@ -69,7 +66,9 @@ async def main():
await visualize_graph(file_path)
nodes, edges = await graph_engine.get_graph_data()
assert len(nodes) >= 8 and len(edges) >= 10, "Nodes and edges are not deleted."
assert len(nodes) >= 8 and len(nodes) < 12 and len(edges) >= 10 and len(edges) < 18, (
"Nodes and edges are not deleted."
)
if __name__ == "__main__":

View file

@ -192,7 +192,7 @@ async def main():
await cognee.datasets.delete_data(
dataset_id=test_user_dataset_id,
data_id=text_data_id,
user_id=default_user.id,
user=default_user,
)
except UnauthorizedDataAccessError:
delete_error = True
@ -207,7 +207,7 @@ async def main():
text_data_id = test_user_dataset_data[0].id
await cognee.datasets.delete_data(
dataset_id=test_user_dataset_id, data_id=text_data_id, user_id=test_user.id
dataset_id=test_user_dataset_id, data_id=text_data_id, user=test_user
)
# Actually give permission to default_user to delete data for test_users dataset
@ -226,7 +226,7 @@ async def main():
await cognee.datasets.delete_data(
dataset_id=test_user_dataset_id,
data_id=explanation_file_data_id,
user_id=default_user.id,
user=default_user,
)

View file

@ -95,6 +95,7 @@ async def graph_saving_worker():
async def save_graph_nodes(new_nodes):
try:
await graph_engine.add_nodes(new_nodes, distributed=False)
# await upsert_nodes(new_nodes, user_id=user.id, dataset_id=dataset.id, data_id=data.id)
except Exception as error:
if is_deadlock_error(error):
raise GraphDatabaseDeadlockError()
@ -107,6 +108,7 @@ async def graph_saving_worker():
async def save_graph_edges(new_edges):
try:
await graph_engine.add_edges(new_edges, distributed=False)
# await upsert_edges(new_edges, user_id=user.id, dataset_id=dataset.id, data_id=data.id)
except Exception as error:
if is_deadlock_error(error):
raise GraphDatabaseDeadlockError()

View file

@ -2,8 +2,9 @@ import asyncio
import cognee
from cognee.modules.data.methods import get_dataset_data, get_datasets
from cognee.modules.pipelines.operations import run_pipeline
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.modules.pipelines import Task, run_tasks
from cognee.modules.pipelines import Task
from cognee.tasks.temporal_awareness import build_graph_with_temporal_awareness
from cognee.infrastructure.databases.relational import (
create_db_and_tables as create_relational_db_and_tables,
@ -42,7 +43,7 @@ async def main():
datasets = await get_datasets(user.id)
dataset_data = await get_dataset_data(datasets[0].id) # type: ignore
pipeline = run_tasks(tasks, dataset=datasets[0], data=dataset_data, user=user)
pipeline = run_pipeline(tasks, data=dataset_data, datasets=[datasets[0].id], user=user)
async for result in pipeline:
print(result)