feat: add write permission enforcement to Cognee

This commit is contained in:
Igor Ilic 2025-05-27 16:02:17 +02:00
parent 5582a4cd69
commit 7053ce7c84
2 changed files with 71 additions and 15 deletions

View file

@ -16,6 +16,7 @@ from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.modules.pipelines.operations import log_pipeline_run_initiated
from cognee.modules.users.permissions.methods import get_all_user_permission_datasets
from cognee.context_global_variables import set_database_global_context_variables
from cognee.infrastructure.databases.relational import (
@ -74,21 +75,9 @@ async def cognee_pipeline(
if isinstance(datasets, str):
datasets = [datasets]
# Convert list of dataset names to dataset UUID
# TODO: ADD FUNCTION CAN"T WORK LIKE THIS JESUS
if all(isinstance(dataset, str) for dataset in datasets):
# Get all user owned dataset objects
user_datasets = await get_datasets(user.id)
# Filter out non name mentioned datasets
dataset_ids = [dataset.id for dataset in user_datasets if dataset in datasets]
# Return list of dataset UUIDs
elif all(isinstance(dataset, UUID) for dataset in datasets):
dataset_ids = datasets
else:
raise InvalidValueError(f"Provided datasets value is not supported: f{datasets}")
# If no datasets are provided, work with all existing datasets user has permission for.
existing_datasets = await get_specific_user_permission_datasets(user.id, "write", dataset_ids)
# Get datasets user wants write permissions for (verify user has permissions if datasets are provided as well)
# NOTE: If a user wants to write to a dataset he does not own it must be provided through UUID
existing_datasets = await get_existing_datasets(datasets, user)
if not datasets:
# Get datasets from database if none sent.
@ -117,6 +106,10 @@ async def cognee_pipeline(
owner_id=user.id,
)
)
else:
raise InvalidValueError(
f"Provided dataset is not handled properly: f{dataset_name}"
)
datasets = dataset_instances
@ -210,3 +203,65 @@ async def run_pipeline(
def check_dataset_name(dataset_name: str) -> str:
if "." in dataset_name or " " in dataset_name:
raise ValueError("Dataset name cannot contain spaces or underscores")
async def get_dataset_ids(datasets: Union[list[str], list[UUID]], user):
"""
Function returns dataset IDs necessary based on provided input.
It transforms raw strings into real dataset_ids with keeping write permissions in mind.
If a user wants to write to a dataset he is not the owner of it must be provided through UUID.
Args:
datasets:
pipeline_name:
user:
Returns: a list of write access dataset_ids if they exist
"""
if all(isinstance(dataset, UUID) for dataset in datasets):
# Return list of dataset UUIDs
dataset_ids = datasets
else:
# Convert list of dataset names to dataset UUID
if all(isinstance(dataset, str) for dataset in datasets):
# Get all user owned dataset objects (If a user wants to write to a dataset he is not the owner of it must be provided through UUID.)
user_datasets = await get_datasets(user.id)
# Filter out non name mentioned datasets
dataset_ids = [dataset.id for dataset in user_datasets if dataset.name in datasets]
else:
raise InvalidValueError(f"Provided datasets value is not handled: f{datasets}")
return dataset_ids
async def get_existing_datasets(
datasets: Union[list[str], list[UUID]], user: User
) -> list[Dataset]:
"""
Function returns a list of existing dataset objects user has access for based on datasets input.
Args:
datasets:
user:
Returns:
list of Dataset objects
"""
# TODO: Test 1. add pipeline with: datasetName, datasetName and datasetID
# Test 2. Cognify without dataset info, cognify with datasetIDs user has write and no write access for
if datasets:
# Function handles transforming dataset input to dataset IDs (if possible)
dataset_ids = await get_dataset_ids(datasets, user)
# If dataset_ids are provided filter these datasets based on what user has permission for.
if dataset_ids:
existing_datasets = await get_specific_user_permission_datasets(
user.id, "write", dataset_ids
)
else:
existing_datasets = []
else:
# If no datasets are provided, work with all existing datasets user has permission for.
existing_datasets = await get_all_user_permission_datasets(user, "write")
return existing_datasets

View file

@ -172,6 +172,7 @@ async def ingest_data(
await give_permission_on_dataset(user, dataset.id, "read")
await give_permission_on_dataset(user, dataset.id, "write")
await give_permission_on_dataset(user, dataset.id, "delete")
await give_permission_on_dataset(user, dataset.id, "share")
return file_paths