Compare commits


44 commits

Author SHA1 Message Date
Igor Ilic  b82822ceb9  Merge branch 'dev' into dataset-permissions  2025-05-22 14:34:04 +02:00
Igor Ilic  1d2fd2f2ec  refactor: Change return message  2025-05-22 14:31:06 +02:00
Igor Ilic  34ee9f577d  refactor: Move user dataset databases to cognee system  2025-05-22 14:23:32 +02:00
Igor Ilic  a89d5570ff  refactor: add better docstring explanation to variable  2025-05-22 02:13:18 +02:00
Igor Ilic  f8110c4548  Merge branch 'dataset-permissions' of github.com:topoteretes/cognee into dataset-permissions  2025-05-22 02:08:15 +02:00
Igor Ilic  450320ba2c  fix: Resolve searching of dataset when you have permission but are not the owner  2025-05-22 02:07:45 +02:00
Igor Ilic  c68175d3f5  Merge branch 'dev' into dataset-permissions  2025-05-22 01:43:17 +02:00
Igor Ilic  a1bf8416bd  feat: Added permission endpoints for managing database access, tenant and role creation and management  2025-05-22 01:42:27 +02:00
Igor Ilic  c803042280  fix: Allow swagger UI to authorize once to test all endpoints  2025-05-21 23:43:44 +02:00
Igor Ilic  c14f1a5fb0  fix: Resolve issue with Swagger authenthication for Cognee  2025-05-21 23:25:00 +02:00
Igor Ilic  d47d410499  refactor: refactor permissions search  2025-05-21 15:08:12 +02:00
Igor Ilic  9e09d26501  refactor: simplify retrieval of datasets user has access to  2025-05-21 14:42:03 +02:00
Igor Ilic  f8f400dbeb  refactor: Return list of dictionaries for permission search  2025-05-21 12:28:20 +02:00
Igor Ilic  b11236f592  feat: Add ability to filter search of datasets by dataset permissions  2025-05-21 02:02:34 +02:00
Igor Ilic  c383253195  feat: Add permission filtering of cognee search  2025-05-21 00:33:52 +02:00
Igor Ilic  9aa8e543cb  refactor: Remove global context handling from api code  2025-05-20 21:39:55 +02:00
Igor Ilic  6466f66a76  Merge branch 'dev' into dataset-permissions  2025-05-20 21:22:49 +02:00
Igor Ilic  2383843ec7  refactor: Remove unnecessary coments, rename dataset database table  2025-05-20 20:15:48 +02:00
Igor Ilic  c908aefd80  fix: Resolve issue with sqlite UUID conversion  2025-05-20 17:42:48 +02:00
Igor Ilic  4ca3baa383  Merge branch 'dev' into dataset-permissions  2025-05-20 17:27:16 +02:00
Igor Ilic  70d49745d9  fix: Resolve issue with gettings documents ids for user  2025-05-20 17:25:11 +02:00
Igor Ilic  d7a8b29147  Merge branch 'dev' into dataset-permissions  2025-05-20 16:02:31 +02:00
Igor Ilic  893fdd1588  feat: Make access control optional  2025-05-20 15:44:15 +02:00
Igor Ilic  232ac4e271  Merge branch 'dev' into dataset-permissions  2025-05-20 15:12:56 +02:00
Igor Ilic  00948ec8db  refactor: Make context variable use more modular and reusable  2025-05-20 15:06:17 +02:00
Igor Ilic  1361203ead  refactor: Add TODOs to handle refactoring  2025-05-20 13:54:02 +02:00
Igor Ilic  472143df03  feat: Add database per user + dataset for Cognee SaaS  2025-05-19 18:31:05 +02:00
Igor Ilic  cb7a8951ff  feat: Rework cognee to have permissions work with datasets  2025-05-19 17:11:25 +02:00
Igor Ilic  7865b4ce3e  feat: Add middleware for user specific settings for llms and databases  2025-05-16 17:02:53 +02:00
Igor Ilic  2871d68673  Merge branch 'dev' into async-multiple-db-solution  2025-05-16 15:31:07 +02:00
Igor Ilic  bc3d35d51e  test: Add simple test to see if multiple databases will work in parallel in Cognee  2025-05-15 18:21:40 +02:00
Igor Ilic  5cb1b53ddd  Merge branch 'async-multiple-db-solution' of github.com:topoteretes/cognee into async-multiple-db-solution  2025-05-15 18:13:28 +02:00
Igor Ilic  2d0d7fa71c  refactor: Move testing of parallel databases to its own test  2025-05-15 18:13:07 +02:00
Igor Ilic  4a58913e55  Merge branch 'dev' into async-multiple-db-solution  2025-05-15 18:03:26 +02:00
Igor Ilic  42be438ab6  refactor: Moved database context variables to cognee pipeline  2025-05-15 18:00:44 +02:00
Igor Ilic  70e307a905  feat: first version of async multiple db support  2025-05-15 17:05:11 +02:00
Igor Ilic  ddfa506cf8  chore: Update tests to reflect new node value  2025-05-15 12:25:22 +02:00
Igor Ilic  be5e5078b3  feat: Add table_name as part of column naming  2025-05-15 12:21:51 +02:00
Igor Ilic  7e3d593684  Merge branch 'add-column-value-db-migration' of github.com:topoteretes/cognee into add-column-value-db-migration  2025-05-15 11:59:49 +02:00
Igor Ilic  e3dbc186fd  chore: Updated tests to reflect new mapping of rel db to graph  2025-05-15 11:58:19 +02:00
Igor Ilic  6f78462f3c  Merge branch 'dev' into add-column-value-db-migration  2025-05-15 05:41:33 -04:00
Igor Ilic  4ddfdc13c8  refactor: Change color of ColumnValue node  2025-05-15 11:33:52 +02:00
Igor Ilic  cdaf4afba8  feat: Add filtering of foreign key column values from column migration  2025-05-15 11:24:25 +02:00
Igor Ilic  7e8f5473a7  feat: Add migration of relational database column values to rel db migration  2025-05-14 16:56:15 +02:00
54 changed files with 4582 additions and 3884 deletions

View file

@@ -69,3 +69,11 @@ LITELLM_LOG="ERROR"
# Set this environment variable to disable sending telemetry data
# TELEMETRY_DISABLED=1
# Set this variable to True to enforce usage of backend access control for Cognee
# Note: This is only currently supported by the following databases:
# Relational: SQLite, Postgres
# Vector: LanceDB
# Graph: KuzuDB
#
# It enforces use of LanceDB and KuzuDB and creates a separate database per Cognee user + dataset
ENABLE_BACKEND_ACCESS_CONTROL=False
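The flag is read from the environment as a case-insensitive string, as the later diffs show (`os.getenv(...).lower() == "true"`). A minimal sketch of that check (the helper name is illustrative, not from the codebase):

```python
import os

def backend_access_control_enabled() -> bool:
    # Any casing of "true" enables the feature; unset or "False" disables it.
    return os.getenv("ENABLE_BACKEND_ACCESS_CONTROL", "false").lower() == "true"
```

Note that the string `"False"` from the example `.env` above lowercases to `"false"`, so the feature stays off by default.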

View file

@@ -215,3 +215,34 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: poetry run python ./cognee/tests/test_s3.py
test-parallel-databases:
name: Test using different async databases in parallel in Cognee
runs-on: ubuntu-22.04
steps:
- name: Check out repository
uses: actions/checkout@v4
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: '3.11.x'
- name: Install specific graph db dependency
run: |
poetry install -E kuzu
- name: Run parallel databases test
env:
ENV: 'dev'
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: poetry run python ./cognee/tests/test_parallel_databases.py

View file

@@ -1,6 +1,8 @@
"""FastAPI server for the Cognee API."""
import os
import fastapi.exceptions
import uvicorn
from cognee.shared.logging_utils import get_logger
import sentry_sdk
@@ -63,6 +65,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(debug=app_environment != "prod", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],

View file

@@ -11,9 +11,17 @@ async def add(
dataset_name: str = "main_dataset",
user: User = None,
node_set: Optional[List[str]] = None,
vector_db_config: dict = None,
graph_db_config: dict = None,
):
tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user, node_set)]
await cognee_pipeline(
tasks=tasks, datasets=dataset_name, data=data, user=user, pipeline_name="add_pipeline"
tasks=tasks,
datasets=dataset_name,
data=data,
user=user,
pipeline_name="add_pipeline",
vector_db_config=vector_db_config,
graph_db_config=graph_db_config,
)
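The shape of this change — two new optional dicts threaded from the public entry point into the pipeline call — can be sketched with the pipeline body stubbed out (only the parameter threading mirrors the diff; the stub return value is invented for illustration):

```python
import asyncio

async def cognee_pipeline(tasks, datasets, data=None, user=None,
                          pipeline_name="custom_pipeline",
                          vector_db_config: dict = None,
                          graph_db_config: dict = None):
    # Stub: the real pipeline stores these configs in context variables
    # before running its tasks; here we just echo what was received.
    return {"pipeline": pipeline_name,
            "vector_db_config": vector_db_config,
            "graph_db_config": graph_db_config}

async def add(data, dataset_name="main_dataset", user=None,
              vector_db_config: dict = None, graph_db_config: dict = None):
    return await cognee_pipeline(
        tasks=[], datasets=dataset_name, data=data, user=user,
        pipeline_name="add_pipeline",
        vector_db_config=vector_db_config,
        graph_db_config=graph_db_config,
    )

result = asyncio.run(add("hello", vector_db_config={"vector_db_provider": "lancedb"}))
```

Callers that pass nothing get the pre-existing behavior, since both parameters default to `None`.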

View file

@@ -1,4 +1,5 @@
from uuid import UUID
from fastapi import Form, UploadFile, Depends
from fastapi.responses import JSONResponse
from fastapi import APIRouter
@@ -11,6 +12,8 @@ import requests
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.context_global_variables import set_database_global_context_variables
logger = get_logger()

View file

@@ -9,7 +9,7 @@ from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.models import User
from cognee.shared.data_models import KnowledgeGraph
from cognee.tasks.documents import (
check_permissions_on_documents,
check_permissions_on_dataset,
classify_documents,
extract_chunks_from_documents,
)
@@ -31,11 +31,18 @@ async def cognify(
chunker=TextChunker,
chunk_size: int = None,
ontology_file_path: Optional[str] = None,
vector_db_config: dict = None,
graph_db_config: dict = None,
):
tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)
return await cognee_pipeline(
tasks=tasks, datasets=datasets, user=user, pipeline_name="cognify_pipeline"
tasks=tasks,
datasets=datasets,
user=user,
pipeline_name="cognify_pipeline",
vector_db_config=vector_db_config,
graph_db_config=graph_db_config,
)
@@ -48,7 +55,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
) -> list[Task]:
default_tasks = [
Task(classify_documents),
Task(check_permissions_on_documents, user=user, permissions=["write"]),
Task(check_permissions_on_dataset, user=user, permissions=["write"]),
Task(
extract_chunks_from_documents,
max_chunk_size=chunk_size or get_max_chunk_tokens(),

View file

@@ -1,14 +1,27 @@
from uuid import UUID
from typing import List
from fastapi import APIRouter
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.api.DTO import InDTO
class DatasetPermissionsPayloadDTO(InDTO):
principal_id: str
permission_name: str
dataset_ids: List[str]
def get_permissions_router() -> APIRouter:
permissions_router = APIRouter()
@permissions_router.post("/roles/{role_id}/permissions")
async def give_default_permission_to_role(role_id: UUID, permission_name: str):
@permissions_router.post("/tenants/{role_id}/default_permissions")
async def give_default_permission_to_role(
role_id: UUID, permission_name: str, user: User = Depends(get_authenticated_user)
):
from cognee.modules.users.permissions.methods import (
give_default_permission_to_role as set_default_permission_to_role,
)
@@ -17,8 +30,10 @@ def get_permissions_router() -> APIRouter:
return JSONResponse(status_code=200, content={"message": "Permission assigned to role"})
@permissions_router.post("/tenants/{tenant_id}/permissions")
async def give_default_permission_to_tenant(tenant_id: UUID, permission_name: str):
@permissions_router.post("/tenants/{tenant_id}/default_permissions")
async def give_default_permission_to_tenant(
tenant_id: UUID, permission_name: str, user: User = Depends(get_authenticated_user)
):
from cognee.modules.users.permissions.methods import (
give_default_permission_to_tenant as set_tenant_default_permissions,
)
@@ -27,8 +42,10 @@ def get_permissions_router() -> APIRouter:
return JSONResponse(status_code=200, content={"message": "Permission assigned to tenant"})
@permissions_router.post("/users/{user_id}/permissions")
async def give_default_permission_to_user(user_id: UUID, permission_name: str):
@permissions_router.post("/tenants/{user_id}/default_permissions")
async def give_default_permission_to_user(
user_id: UUID, permission_name: str, user: User = Depends(get_authenticated_user)
):
from cognee.modules.users.permissions.methods import (
give_default_permission_to_user as set_default_permission_to_user,
)
@@ -37,30 +54,54 @@ def get_permissions_router() -> APIRouter:
return JSONResponse(status_code=200, content={"message": "Permission assigned to user"})
@permissions_router.post("/roles")
async def create_role(
role_name: str,
tenant_id: UUID,
@permissions_router.post("/datasets/{principal_id}/")
async def give_datasets_permission_to_principal(
payload: DatasetPermissionsPayloadDTO, user: User = Depends(get_authenticated_user)
):
from cognee.modules.users.permissions.methods import authorized_give_permission_on_datasets
await authorized_give_permission_on_datasets(
UUID(payload.principal_id),
[UUID(dataset_id) for dataset_id in payload.dataset_ids],
payload.permission_name,
user.id,
)
return JSONResponse(
status_code=200, content={"message": "Permission assigned to principal"}
)
@permissions_router.post("/roles")
async def create_role(role_name: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.users.roles.methods import create_role as create_role_method
await create_role_method(role_name=role_name, tenant_id=tenant_id)
await create_role_method(role_name=role_name, tenant_id=user.tenant_id)
return JSONResponse(status_code=200, content={"message": "Role created for tenant"})
@permissions_router.post("/users/{user_id}/roles")
async def add_user_to_role(user_id: UUID, role_id: UUID):
async def add_user_to_role(
user_id: str, role_id: str, user: User = Depends(get_authenticated_user)
):
from cognee.modules.users.roles.methods import add_user_to_role as add_user_to_role_method
await add_user_to_role_method(user_id=user_id, role_id=role_id)
await add_user_to_role_method(user_id=UUID(user_id), role_id=UUID(role_id))
return JSONResponse(status_code=200, content={"message": "User added to role"})
@permissions_router.post("/users/{user_id}/tenants")
async def add_user_to_tenant(user_id: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.users.tenants.methods import add_user_to_tenant
await add_user_to_tenant(user_id=UUID(user_id), tenant_id=user.tenant_id, owner_id=user.id)
return JSONResponse(status_code=200, content={"message": "User added to tenant"})
@permissions_router.post("/tenants")
async def create_tenant(tenant_name: str):
async def create_tenant(tenant_name: str, user: User = Depends(get_authenticated_user)):
from cognee.modules.users.tenants.methods import create_tenant as create_tenant_method
await create_tenant_method(tenant_name=tenant_name)
await create_tenant_method(tenant_name=tenant_name, user_id=user.id)
return JSONResponse(status_code=200, content={"message": "Tenant created."})

View file

@@ -1,4 +1,5 @@
from uuid import UUID
from typing import Optional
from datetime import datetime
from fastapi import Depends, APIRouter
from fastapi.responses import JSONResponse
@@ -11,6 +12,7 @@ from cognee.modules.users.methods import get_authenticated_user
class SearchPayloadDTO(InDTO):
search_type: SearchType
datasets: Optional[list[str]] = None
query: str
@@ -39,7 +41,10 @@ def get_search_router() -> APIRouter:
try:
results = await cognee_search(
query_text=payload.query, query_type=payload.search_type, user=user
query_text=payload.query,
query_type=payload.search_type,
user=user,
datasets=payload.datasets,
)
return results

View file

@@ -0,0 +1,67 @@
import os
import pathlib
from contextvars import ContextVar
from typing import Union
from uuid import UUID
from cognee.infrastructure.databases.utils import get_or_create_dataset_database
from cognee.modules.users.methods import get_user
# Note: ContextVar allows us to use different graph db configurations in Cognee
# for different async tasks, threads and processes
vector_db_config = ContextVar("vector_db_config", default=None)
graph_db_config = ContextVar("graph_db_config", default=None)
async def set_database_global_context_variables(dataset: Union[str, UUID], user_id: UUID):
"""
If backend access control is enabled, this function ensures every dataset has its own databases,
access to which is enforced through the granted permissions.
The database name is derived from the dataset_id, and use of LanceDB and KuzuDB is enforced.
Note: This is only currently supported by the following databases:
Relational: SQLite, Postgres
Vector: LanceDB
Graph: KuzuDB
Args:
dataset: Cognee dataset name or id
user_id: UUID of the owner of the dataset
Returns:
"""
if not os.getenv("ENABLE_BACKEND_ACCESS_CONTROL", "false").lower() == "true":
return
user = await get_user(user_id)
# To ensure permissions are enforced properly, every dataset gets its own databases
dataset_database = await get_or_create_dataset_database(dataset, user)
# TODO: Find better location for database files
cognee_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, f".cognee_system/databases/{user.id}")
).resolve()
)
# Set vector and graph database configuration based on dataset database information
vector_config = {
"vector_db_url": os.path.join(cognee_directory_path, dataset_database.vector_database_name),
"vector_db_key": "",
"vector_db_provider": "lancedb",
}
graph_config = {
"graph_database_provider": "kuzu",
"graph_file_path": os.path.join(
cognee_directory_path, dataset_database.graph_database_name
),
}
# Use ContextVar to ensure these graph and vector configurations are used
# in the current async context across Cognee
graph_db_config.set(graph_config)
vector_db_config.set(vector_config)
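The isolation this ContextVar pattern buys can be seen in a self-contained sketch: two concurrent asyncio tasks each set the variable, and because every task runs in its own copy of the context, neither sees the other's value (the file paths are illustrative, not real Cognee paths):

```python
import asyncio
from contextvars import ContextVar

graph_db_config: ContextVar = ContextVar("graph_db_config", default=None)

async def use_dataset(name: str) -> str:
    # Each task runs in a copy of the context, so this set() is task-local.
    graph_db_config.set({"graph_file_path": f"/tmp/{name}.kuzu"})
    await asyncio.sleep(0)  # yield so the two tasks interleave
    return graph_db_config.get()["graph_file_path"]

async def main():
    # gather() wraps each coroutine in a Task, and each Task copies the context
    return await asyncio.gather(use_dataset("a"), use_dataset("b"))

paths = asyncio.run(main())
```

The outer context is untouched after the tasks finish, which is exactly why a thread-global variable would not work here.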

View file

@@ -8,7 +8,7 @@ from cognee.modules.users.models import User
from cognee.shared.data_models import KnowledgeGraph
from cognee.shared.utils import send_telemetry
from cognee.tasks.documents import (
check_permissions_on_documents,
check_permissions_on_dataset,
classify_documents,
extract_chunks_from_documents,
)
@@ -31,7 +31,7 @@ async def get_cascade_graph_tasks(
cognee_config = get_cognify_config()
default_tasks = [
Task(classify_documents),
Task(check_permissions_on_documents, user=user, permissions=["write"]),
Task(check_permissions_on_dataset, user=user, permissions=["write"]),
Task(
extract_chunks_from_documents, max_chunk_tokens=get_max_chunk_tokens()
), # Extract text chunks based on the document type.

View file

@ -49,3 +49,14 @@ class GraphConfig(BaseSettings):
@lru_cache
def get_graph_config():
return GraphConfig()
def get_graph_context_config():
"""Get the appropriate graph db config based on the current async context.
This allows different threads, async tasks and processes to use different graph databases.
"""
from cognee.context_global_variables import graph_db_config
if graph_db_config.get():
return graph_db_config.get()
return get_graph_config().to_hashable_dict()
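The fallback logic reduces to a small "context override or global default" pattern; a sketch with a stand-in dict in place of the real `GraphConfig` settings object:

```python
from contextvars import ContextVar

# Per-context override; None means "use the global settings".
graph_db_config: ContextVar = ContextVar("graph_db_config", default=None)

GLOBAL_CONFIG = {"graph_database_provider": "networkx"}  # stand-in for GraphConfig

def get_graph_context_config() -> dict:
    # Prefer the async-context override, otherwise fall back to the global config.
    override = graph_db_config.get()
    return override if override else GLOBAL_CONFIG
```

Callers never need to know whether access control rewrote the config for their dataset; they just ask for "the" config.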

View file

@@ -2,20 +2,21 @@
from functools import lru_cache
from .config import get_graph_config
from .config import get_graph_context_config
from .graph_db_interface import GraphDBInterface
async def get_graph_engine() -> GraphDBInterface:
"""Factory function to get the appropriate graph client based on the graph type."""
config = get_graph_config()
# Get appropriate graph configuration based on current async context
config = get_graph_context_config()
graph_client = create_graph_engine(**get_graph_config().to_hashable_dict())
graph_client = create_graph_engine(**config)
# Async functions can't be cached. After creating and caching the graph engine
# handle all necessary async operations for different graph types below.
# Handle loading of graph for NetworkX
if config.graph_database_provider.lower() == "networkx" and graph_client.graph is None:
if config["graph_database_provider"].lower() == "networkx" and graph_client.graph is None:
await graph_client.load_graph_from_file()
return graph_client
@@ -24,11 +25,11 @@ async def get_graph_engine() -> GraphDBInterface:
@lru_cache
def create_graph_engine(
graph_database_provider,
graph_database_url,
graph_database_username,
graph_database_password,
graph_database_port,
graph_file_path,
graph_database_url="",
graph_database_username="",
graph_database_password="",
graph_database_port="",
):
"""Factory function to create the appropriate graph client based on the graph type."""

View file

@@ -0,0 +1 @@
from .get_or_create_dataset_database import get_or_create_dataset_database

View file

@@ -0,0 +1,68 @@
from uuid import UUID
from typing import Union
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from cognee.modules.data.methods import create_dataset
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import get_unique_dataset_id
from cognee.modules.users.models import DatasetDatabase
from cognee.modules.users.models import User
async def get_or_create_dataset_database(
dataset: Union[str, UUID],
user: User,
) -> DatasetDatabase:
"""
Return the `DatasetDatabase` row for the given owner + dataset.
If the row already exists, it is fetched and returned.
Otherwise a new one is created atomically and returned.
Parameters
----------
user : User
Principal that owns this dataset.
dataset : Union[str, UUID]
Dataset being linked.
"""
db_engine = get_relational_engine()
dataset_id = await get_unique_dataset_id(dataset, user)
vector_db_name = f"{dataset_id}.lance.db"
graph_db_name = f"{dataset_id}.pkl"
async with db_engine.get_async_session() as session:
# Create dataset if it doesn't exist
if isinstance(dataset, str):
dataset = await create_dataset(dataset, user, session)
# Try to fetch an existing row first
stmt = select(DatasetDatabase).where(
DatasetDatabase.owner_id == user.id,
DatasetDatabase.dataset_id == dataset_id,
)
existing: DatasetDatabase = await session.scalar(stmt)
if existing:
return existing
# If there are no existing rows build a new row
record = DatasetDatabase(
owner_id=user.id,
dataset_id=dataset_id,
vector_database_name=vector_db_name,
graph_database_name=graph_db_name,
)
try:
session.add(record)
await session.commit()
await session.refresh(record)
return record
except IntegrityError:
await session.rollback()
raise
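The same fetch-then-insert shape, with a UNIQUE constraint as the arbiter of races, can be shown against plain `sqlite3` (table and column names mirror the patch, but this is a sketch, not the SQLAlchemy code above):

```python
import sqlite3

def get_or_create_dataset_database(conn, owner_id: str, dataset_id: str) -> str:
    """Fetch the row if it exists, otherwise insert it; the UNIQUE constraint
    guarantees at most one row per (owner_id, dataset_id)."""
    row = conn.execute(
        "SELECT vector_database_name FROM dataset_database "
        "WHERE owner_id = ? AND dataset_id = ?",
        (owner_id, dataset_id),
    ).fetchone()
    if row:
        return row[0]
    vector_db_name = f"{dataset_id}.lance.db"
    try:
        conn.execute(
            "INSERT INTO dataset_database VALUES (?, ?, ?)",
            (owner_id, dataset_id, vector_db_name),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # Lost a race to a concurrent writer: roll back and surface the error,
        # mirroring the IntegrityError handling in the SQLAlchemy version.
        conn.rollback()
        raise
    return vector_db_name

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dataset_database ("
    "owner_id TEXT, dataset_id TEXT, vector_database_name TEXT, "
    "UNIQUE(owner_id, dataset_id))"
)
first = get_or_create_dataset_database(conn, "user-1", "ds-1")
second = get_or_create_dataset_database(conn, "user-1", "ds-1")
```

Calling it twice with the same pair returns the same name and leaves exactly one row.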

View file

@@ -26,3 +26,12 @@ class VectorConfig(BaseSettings):
@lru_cache
def get_vectordb_config():
return VectorConfig()
def get_vectordb_context_config():
"""This function will get the appropriate vector db config based on async context."""
from cognee.context_global_variables import vector_db_config
if vector_db_config.get():
return vector_db_config.get()
return get_vectordb_config().to_dict()

View file

@@ -5,10 +5,10 @@ from functools import lru_cache
@lru_cache
def create_vector_engine(
vector_db_url: str,
vector_db_port: str,
vector_db_key: str,
vector_db_provider: str,
vector_db_url: str,
vector_db_port: str = "",
vector_db_key: str = "",
):
embedding_engine = get_embedding_engine()

View file

@@ -1,6 +1,7 @@
from .config import get_vectordb_config
from .config import get_vectordb_context_config
from .create_vector_engine import create_vector_engine
def get_vector_engine():
return create_vector_engine(**get_vectordb_config().to_dict())
# Get appropriate vector db configuration based on current async context
return create_vector_engine(**get_vectordb_context_config())

View file

@@ -1,11 +1,13 @@
from ..get_vector_engine import get_vector_engine, get_vectordb_config
from ..get_vector_engine import get_vector_engine, get_vectordb_context_config
from sqlalchemy import text
from cognee.context_global_variables import vector_db_config as context_vector_db_config
async def create_db_and_tables():
vector_config = get_vectordb_config()
# Get appropriate vector db configuration based on current async context
vector_config = get_vectordb_context_config()
vector_engine = get_vector_engine()
if vector_config.vector_db_provider == "pgvector":
if vector_config["vector_db_provider"] == "pgvector":
async with vector_engine.engine.begin() as connection:
await connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))

View file

@@ -1,4 +1,3 @@
from uuid import UUID, uuid5, NAMESPACE_OID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import joinedload

View file

@@ -1,6 +1,9 @@
from uuid import UUID, uuid5, NAMESPACE_OID
from cognee.modules.users.models import User
from typing import Union
async def get_unique_dataset_id(dataset_name: str, user: User) -> UUID:
async def get_unique_dataset_id(dataset_name: Union[str, UUID], user: User) -> UUID:
if isinstance(dataset_name, UUID):
return dataset_name
return uuid5(NAMESPACE_OID, f"{dataset_name}{str(user.id)}")
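The `uuid5` scheme makes dataset ids deterministic per (name, owner) pair: the same name and user always map to the same id, while an already-resolved UUID passes through unchanged. A runnable sketch, simplified to take a bare user id rather than a `User` object:

```python
from uuid import UUID, uuid5, NAMESPACE_OID

def get_unique_dataset_id(dataset_name, user_id: UUID) -> UUID:
    # An existing UUID passes through; a name hashes deterministically
    # together with the owner's id, so names are scoped per user.
    if isinstance(dataset_name, UUID):
        return dataset_name
    return uuid5(NAMESPACE_OID, f"{dataset_name}{user_id}")

owner = UUID("00000000-0000-0000-0000-000000000001")
other = UUID("00000000-0000-0000-0000-000000000002")
```

Because the owner id is part of the hash input, two users can each have a "main_dataset" without colliding.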

View file

@@ -33,9 +33,6 @@ class Data(Base):
cascade="all, delete",
)
# New relationship for ACLs with cascade deletion
acls = relationship("ACL", back_populates="data", cascade="all, delete-orphan")
def to_json(self) -> dict:
return {
"id": str(self.id),

View file

@@ -19,6 +19,8 @@ class Dataset(Base):
owner_id = Column(UUID, index=True)
acls = relationship("ACL", back_populates="dataset", cascade="all, delete-orphan")
data: Mapped[List["Data"]] = relationship(
"Data",
secondary=DatasetData.__tablename__,

View file

@@ -14,6 +14,7 @@ from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.modules.pipelines.operations import log_pipeline_run_initiated
from cognee.context_global_variables import set_database_global_context_variables
from cognee.infrastructure.databases.relational import (
create_db_and_tables as create_relational_db_and_tables,
@@ -21,6 +22,10 @@ from cognee.infrastructure.databases.relational import (
from cognee.infrastructure.databases.vector.pgvector import (
create_db_and_tables as create_pgvector_db_and_tables,
)
from cognee.context_global_variables import (
graph_db_config as context_graph_db_config,
vector_db_config as context_vector_db_config,
)
logger = get_logger("cognee.pipeline")
@@ -33,7 +38,16 @@ async def cognee_pipeline(
datasets: Union[str, list[str]] = None,
user: User = None,
pipeline_name: str = "custom_pipeline",
vector_db_config: dict = None,
graph_db_config: dict = None,
):
# Note: These context variables allow different value assignment for databases in Cognee
# per async task, thread, process, etc.
if vector_db_config:
context_vector_db_config.set(vector_db_config)
if graph_db_config:
context_graph_db_config.set(graph_db_config)
# Create tables for databases
await create_relational_db_and_tables()
await create_pgvector_db_and_tables()
@@ -96,7 +110,12 @@ async def cognee_pipeline(
for dataset in datasets:
awaitables.append(
run_pipeline(
dataset=dataset, user=user, tasks=tasks, data=data, pipeline_name=pipeline_name
dataset=dataset,
user=user,
tasks=tasks,
data=data,
pipeline_name=pipeline_name,
context={"dataset": dataset},
)
)
@@ -109,9 +128,13 @@ async def run_pipeline(
tasks: list[Task],
data=None,
pipeline_name: str = "custom_pipeline",
context: dict = None,
):
check_dataset_name(dataset.name)
# Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
await set_database_global_context_variables(dataset.name, user.id)
# Ugly hack, but no easier way to do this.
if pipeline_name == "add_pipeline":
# Refresh the add pipeline status so data is added to a dataset.
@@ -160,7 +183,7 @@ async def run_pipeline(
if not isinstance(task, Task):
raise ValueError(f"Task {task} is not an instance of Task")
pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name)
pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context=context)
pipeline_run_status = None
async for run_status in pipeline_run:

View file

@@ -1,6 +1,9 @@
import asyncio
import os
import json
from typing import Callable
from typing import Callable, Optional
from cognee.context_global_variables import set_database_global_context_variables
from cognee.exceptions import InvalidValueError
from cognee.infrastructure.engine.utils import parse_id
from cognee.modules.retrieval.chunks_retriever import ChunksRetriever
@@ -17,8 +20,10 @@ from cognee.modules.retrieval.natural_language_retriever import NaturalLanguageR
from cognee.modules.search.types import SearchType
from cognee.modules.storage.utils import JSONEncoder
from cognee.modules.users.models import User
from cognee.modules.data.models import Dataset
from cognee.modules.users.permissions.methods import get_document_ids_for_user
from cognee.shared.utils import send_telemetry
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
from ..operations import log_query, log_result
from ...retrieval.graph_completion_cot_retriever import GraphCompletionCotRetriever
@@ -31,6 +36,12 @@ async def search(
system_prompt_path="answer_simple_question.txt",
top_k: int = 10,
):
# Use search function filtered by permissions if access control is enabled
if os.getenv("ENABLE_BACKEND_ACCESS_CONTROL", "false").lower() == "true":
return await permissions_search(
query_text, query_type, user, datasets, system_prompt_path, top_k
)
query = await log_query(query_text, query_type.value, user.id)
own_document_ids = await get_document_ids_for_user(user.id, datasets)
@@ -40,6 +51,7 @@ async def search(
filtered_search_results = []
# TODO: Is document_id ever not None? Should we remove document handling from here if it's not?
for search_result in search_results:
document_id = search_result["document_id"] if "document_id" in search_result else None
document_id = parse_id(document_id)
@@ -95,3 +107,66 @@ async def specific_search(
send_telemetry("cognee.search EXECUTION COMPLETED", user.id)
return results
async def permissions_search(
query_text: str,
query_type: SearchType,
user: User = None,
datasets: Optional[list[str]] = None,
system_prompt_path: str = "answer_simple_question.txt",
top_k: int = 10,
) -> list:
"""
Verifies access to the provided datasets (or falls back to all datasets the user can read) and performs the search per dataset.
Not to be used outside of active access control mode.
"""
query = await log_query(query_text, query_type.value, user.id)
# Find datasets the user has read access to (if datasets are provided, return only those the user can read)
search_datasets = await get_specific_user_permission_datasets(user, "read", datasets)
# TODO: If there are no datasets the user has access to do we raise an error? How do we handle informing him?
if not search_datasets:
pass
# Searches all provided datasets and handles setting up of appropriate database context based on permissions
search_results = await specific_search_by_context(
search_datasets, query_text, query_type, user, system_prompt_path, top_k
)
await log_result(query.id, json.dumps(search_results, cls=JSONEncoder), user.id)
return search_results
async def specific_search_by_context(
search_datasets: list[Dataset],
query_text: str,
query_type: SearchType,
user: User,
system_prompt_path: str,
top_k: int,
):
"""
Searches all provided datasets and handles setting up of appropriate database context based on permissions.
Not to be used outside of active access control mode.
"""
async def _search_by_context(dataset, user, query_type, query_text, system_prompt_path, top_k):
# Set database configuration in async context for each dataset user has access for
await set_database_global_context_variables(dataset.id, dataset.owner_id)
search_results = await specific_search(
query_type, query_text, user, system_prompt_path=system_prompt_path, top_k=top_k
)
return {dataset.name: search_results}
# Search every dataset concurrently, each with its appropriate database configuration
tasks = []
for dataset in search_datasets:
tasks.append(
_search_by_context(dataset, user, query_type, query_text, system_prompt_path, top_k)
)
return await asyncio.gather(*tasks)
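The fan-out shape here — one task per permitted dataset, each returning results keyed by dataset name, gathered in order — can be reduced to a stub in which the actual search and context setup are mocked out:

```python
import asyncio
from types import SimpleNamespace

async def _search_by_context(dataset, query_text):
    # Stand-in for set_database_global_context_variables + specific_search;
    # the real version switches db config before searching.
    await asyncio.sleep(0)
    return {dataset.name: [f"result for '{query_text}'"]}

async def permissions_search_sketch(datasets, query_text):
    tasks = [_search_by_context(d, query_text) for d in datasets]
    # gather preserves input order, so results line up with the dataset list
    return await asyncio.gather(*tasks)

datasets = [SimpleNamespace(name="sales"), SimpleNamespace(name="support")]
results = asyncio.run(permissions_search_sketch(datasets, "churn"))
```

This is why the ContextVar plumbing earlier in the PR matters: each of these concurrent tasks can point at a different dataset's databases without interfering with its siblings.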

View file

@@ -20,16 +20,24 @@ class CustomJWTStrategy(JWTStrategy):
user = await get_user(user.id)
if user.tenant:
data = {"user_id": str(user.id), "tenant_id": str(user.tenant.id), "roles": user.roles}
data = {
"user_id": str(user.id),
"tenant_id": str(user.tenant.id),
"roles": [role.name for role in user.roles],
}
else:
# The default tenant is None
data = {"user_id": str(user.id), "tenant_id": None, "roles": user.roles}
data = {
"user_id": str(user.id),
"tenant_id": None,
"roles": [role.name for role in user.roles],
}
return generate_jwt(data, self.encode_key, self.lifetime_seconds, algorithm=self.algorithm)
@lru_cache
def get_auth_backend():
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
bearer_transport = BearerTransport(tokenUrl="api/v1/auth/login")
def get_jwt_strategy() -> JWTStrategy[models.UP, models.ID]:
secret = os.getenv("FASTAPI_USERS_JWT_SECRET", "super_secret")

View file

@@ -1,7 +1,8 @@
from types import SimpleNamespace
from ..get_fastapi_users import get_fastapi_users
from fastapi import HTTPException, Header
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
import jwt
@@ -9,30 +10,33 @@ from uuid import UUID
fastapi_users = get_fastapi_users()
# Lets Swagger understand the authorization type so the docs can authorize once and test all endpoints
bearer_scheme = HTTPBearer(scheme_name="BearerAuth", description="Paste **Bearer <JWT>**")
async def get_authenticated_user(authorization: str = Header(...)) -> SimpleNamespace:
"""Extract and validate JWT from Authorization header."""
async def get_authenticated_user(
creds: HTTPAuthorizationCredentials = Security(bearer_scheme),
) -> SimpleNamespace:
"""
Extract and validate the JWT presented in the Authorization header.
"""
if creds is None: # header missing
raise HTTPException(status_code=401, detail="Not authenticated")
if creds.scheme.lower() != "bearer": # shouldn't happen; extra guard
raise HTTPException(status_code=401, detail="Invalid authentication scheme")
token = creds.credentials
try:
scheme, token = authorization.split()
if scheme.lower() != "bearer":
raise HTTPException(status_code=401, detail="Invalid authentication scheme")
payload = jwt.decode(
token, os.getenv("FASTAPI_USERS_JWT_SECRET", "super_secret"), algorithms=["HS256"]
)
if payload["tenant_id"]:
# SimpleNamespace lets us access dictionary elements like attributes
auth_data = SimpleNamespace(
id=UUID(payload["user_id"]),
tenant_id=UUID(payload["tenant_id"]),
roles=payload["roles"],
)
else:
auth_data = SimpleNamespace(
id=UUID(payload["user_id"]), tenant_id=None, roles=payload["roles"]
)
auth_data = SimpleNamespace(
id=UUID(payload["user_id"]),
tenant_id=UUID(payload["tenant_id"]) if payload.get("tenant_id") else None,
roles=payload["roles"],
)
return auth_data
except jwt.ExpiredSignatureError:

View file
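The dependency above validates the bearer token with PyJWT. A minimal round trip (made-up user id, using the code's dev fallback secret) showing the payload shape the strategy emits and the null-tenant guard:

```python
import jwt  # PyJWT
from uuid import UUID

secret = "super_secret"  # dev fallback; FASTAPI_USERS_JWT_SECRET in production
payload = {
    "user_id": "11111111-1111-4111-8111-111111111111",  # hypothetical id
    "tenant_id": None,  # the default tenant is None
    "roles": ["admin"],
}
token = jwt.encode(payload, secret, algorithm="HS256")
decoded = jwt.decode(token, secret, algorithms=["HS256"])

# tenant_id may be None, so guard before constructing a UUID, as the handler does
tenant_id = UUID(decoded["tenant_id"]) if decoded.get("tenant_id") else None
user_id = UUID(decoded["user_id"])
```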

@@ -1,5 +1,6 @@
from types import SimpleNamespace
from sqlalchemy.orm import selectinload
from sqlalchemy.exc import NoResultFound
from sqlalchemy.future import select
from cognee.modules.users.models import User
from cognee.base_config import get_base_config
@@ -33,5 +34,6 @@ async def get_default_user() -> SimpleNamespace:
except Exception as error:
if "principals" in str(error.args):
raise DatabaseNotCreatedError() from error
raise UserNotFoundError(f"Failed to retrieve default user: {default_email}") from error
if isinstance(error, NoResultFound):
raise UserNotFoundError(f"Failed to retrieve default user: {default_email}") from error
raise

View file

@@ -15,8 +15,8 @@ class ACL(Base):
principal_id = Column(UUID, ForeignKey("principals.id"))
permission_id = Column(UUID, ForeignKey("permissions.id"))
data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"))
dataset_id = Column(UUID, ForeignKey("datasets.id", ondelete="CASCADE"))
principal = relationship("Principal")
permission = relationship("Permission")
data = relationship("Data", back_populates="acls")
dataset = relationship("Dataset", back_populates="acls")

View file

@@ -0,0 +1,19 @@
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, String, UUID, ForeignKey
from cognee.infrastructure.databases.relational import Base
class DatasetDatabase(Base):
__tablename__ = "dataset_database"
owner_id = Column(UUID, ForeignKey("principals.id", ondelete="CASCADE"), index=True)
dataset_id = Column(
UUID, ForeignKey("datasets.id", ondelete="CASCADE"), primary_key=True, index=True
)
vector_database_name = Column(String, unique=True, nullable=False)
graph_database_name = Column(String, unique=True, nullable=False)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))

View file

@@ -11,6 +11,8 @@ class Tenant(Principal):
id = Column(UUID, ForeignKey("principals.id"), primary_key=True)
name = Column(String, unique=True, nullable=False, index=True)
owner_id = Column(UUID, index=True)
# One-to-Many relationship with User; specify the join via User.tenant_id
users = relationship(
"User",

View file

@@ -1,6 +1,7 @@
from .User import User
from .Role import Role
from .UserRole import UserRole
from .DatasetDatabase import DatasetDatabase
from .RoleDefaultPermissions import RoleDefaultPermissions
from .UserDefaultPermissions import UserDefaultPermissions
from .TenantDefaultPermissions import TenantDefaultPermissions

View file

@@ -1,6 +1,13 @@
from .check_permission_on_documents import check_permission_on_documents
from .give_permission_on_document import give_permission_on_document
from .get_role import get_role
from .get_tenant import get_tenant
from .get_principal import get_principal
from .check_permission_on_dataset import check_permission_on_dataset
from .give_permission_on_dataset import give_permission_on_dataset
from .get_document_ids_for_user import get_document_ids_for_user
from .get_principal_datasets import get_principal_datasets
from .get_all_user_permission_datasets import get_all_user_permission_datasets
from .give_default_permission_to_tenant import give_default_permission_to_tenant
from .give_default_permission_to_role import give_default_permission_to_role
from .give_default_permission_to_user import give_default_permission_to_user
from .get_specific_user_permission_datasets import get_specific_user_permission_datasets
from .authorized_give_permission_on_datasets import authorized_give_permission_on_datasets

View file

@@ -0,0 +1,18 @@
from typing import Union, List
from cognee.modules.users.permissions.methods import get_principal
from cognee.modules.users.permissions.methods import give_permission_on_dataset
from uuid import UUID
async def authorized_give_permission_on_datasets(
principal_id: UUID, dataset_ids: Union[List[UUID], UUID], permission_name: str, user_id: UUID
):
# TODO: Validate user can give permission to other users for given datasets
# If only a single dataset UUID is provided, wrap it in a list
if not isinstance(dataset_ids, list):
dataset_ids = [dataset_ids]
principal = await get_principal(principal_id)
for dataset_id in dataset_ids:
await give_permission_on_dataset(principal, dataset_id, permission_name)

View file

@@ -13,29 +13,29 @@ from ...models.ACL import ACL
logger = get_logger()
async def check_permission_on_documents(user: User, permission_type: str, document_ids: list[UUID]):
async def check_permission_on_dataset(user: User, permission_type: str, dataset_id: UUID):
if user is None:
user = await get_default_user()
# TODO: Enable user role permissions again. Temporarily disabled during rework.
# # TODO: Enable user role permissions again. Temporarily disabled during rework.
# user_roles_ids = [role.id for role in user.roles]
user_roles_ids = []
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# If dataset id was returned it means the user has permission to access it
result = await session.execute(
select(ACL)
.join(ACL.permission)
.options(joinedload(ACL.data))
.options(joinedload(ACL.dataset))
.where(ACL.principal_id.in_([user.id, *user_roles_ids]))
.where(ACL.permission.has(name=permission_type))
)
acls = result.unique().scalars().all()
data_ids = [acl.data.id for acl in acls]
has_permissions = all(document_id in data_ids for document_id in document_ids)
has_permission = dataset_id in [acl.dataset.id for acl in acls]
if not has_permissions:
if not has_permission:
raise PermissionDeniedError(
message=f"User {user.id} does not have {permission_type} permission on dataset {dataset_id}"
)

View file

@@ -0,0 +1,33 @@
from cognee.shared.logging_utils import get_logger
from ...models.User import User
from cognee.modules.data.models.Dataset import Dataset
from cognee.modules.users.permissions.methods import get_principal_datasets
from cognee.modules.users.permissions.methods import get_role, get_tenant
logger = get_logger()
async def get_all_user_permission_datasets(user: User, permission_type: str) -> list[Dataset]:
datasets = list()
# Get all datasets User has explicit access to
datasets.extend(await get_principal_datasets(user, permission_type))
if user.tenant_id:
# Get all datasets the user's tenant has access to
tenant = await get_tenant(user.tenant_id)
datasets.extend(await get_principal_datasets(tenant, permission_type))
# Get all datasets the user's roles have access to
for role_name in user.roles:
# TODO: user.roles in pydantic is mapped to Role objects, but in our backend it's used by role name only
# Make user.roles uniform in usage across cognee lib + backend
role = await get_role(user.tenant_id, role_name)
datasets.extend(await get_principal_datasets(role, permission_type))
# Deduplicate datasets with same ID
unique = {}
for dataset in datasets:
# If the dataset id key already exists, leave the dictionary unchanged.
unique.setdefault(dataset.id, dataset)
return list(unique.values())
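The `setdefault` pass above deduplicates by dataset id while keeping the first occurrence, so explicit user grants win over tenant- and role-derived ones in the order built above. In isolation:

```python
from types import SimpleNamespace

# SimpleNamespace objects stand in for Dataset rows
datasets = [
    SimpleNamespace(id=1, source="user"),
    SimpleNamespace(id=2, source="tenant"),
    SimpleNamespace(id=1, source="role"),  # same id as the first entry
]
unique = {}
for dataset in datasets:
    # if the id is already a key, the dictionary is left unchanged
    unique.setdefault(dataset.id, dataset)
deduped = list(unique.values())
```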

View file

@@ -1,7 +1,9 @@
from uuid import UUID
from cognee.modules.data.methods import get_dataset_data
from sqlalchemy import select
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Dataset, DatasetData, Data
from cognee.modules.data.models import Dataset, DatasetData
from ...models import ACL, Permission
@@ -10,10 +12,10 @@ async def get_document_ids_for_user(user_id: UUID, datasets: list[str] = None) -
async with db_engine.get_async_session() as session:
async with session.begin():
document_ids = (
dataset_ids = (
await session.scalars(
select(Data.id)
.join(ACL.data)
select(Dataset.id)
.join(ACL.dataset)
.join(ACL.permission)
.where(
ACL.principal_id == user_id,
@@ -22,9 +24,15 @@ async def get_document_ids_for_user(user_id: UUID, datasets: list[str] = None) -
)
).all()
# Get documents from datasets the user has read access to
document_ids = []
for dataset_id in dataset_ids:
data_list = await get_dataset_data(dataset_id)
document_ids.extend([data.id for data in data_list])
if datasets:
documents_ids_in_dataset = set()
# If datasets are specified, filter out documents that aren't part of the specified datasets
documents_ids_in_dataset = set()
for dataset in datasets:
# Find dataset id for dataset element
dataset_id = (

View file

@@ -0,0 +1,14 @@
from sqlalchemy import select
from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from ...models.Principal import Principal
async def get_principal(principal_id: UUID):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
result = await session.execute(select(Principal).where(Principal.id == principal_id))
principal = result.unique().scalar_one()
return principal

View file

@@ -0,0 +1,24 @@
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from cognee.infrastructure.databases.relational import get_relational_engine
from ...models.Principal import Principal
from cognee.modules.data.models.Dataset import Dataset
from ...models.ACL import ACL
async def get_principal_datasets(principal: Principal, permission_type: str) -> list[Dataset]:
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# If dataset id was returned it means the principal has permission to access it
result = await session.execute(
select(ACL)
.join(ACL.permission)
.options(joinedload(ACL.dataset))
.where(ACL.principal_id == principal.id)
.where(ACL.permission.has(name=permission_type))
)
acls = result.unique().scalars().all()
return [acl.dataset for acl in acls]

View file

@@ -0,0 +1,17 @@
from sqlalchemy import select
from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from ...models.Role import Role
async def get_role(tenant_id: UUID, role_name: str):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
result = await session.execute(
select(Role).where(Role.name == role_name).where(Role.tenant_id == tenant_id)
)
role = result.unique().scalar_one()
return role

View file

@@ -0,0 +1,20 @@
from ...models.User import User
from cognee.modules.data.models.Dataset import Dataset
from cognee.modules.users.permissions.methods import get_all_user_permission_datasets
async def get_specific_user_permission_datasets(
user: User, permission_type: str, datasets: list[str] = None
) -> list[Dataset]:
# Find all datasets user has permission for
user_read_access_datasets = await get_all_user_permission_datasets(user, permission_type)
# If specific datasets are provided, keep only those
if datasets:
search_datasets = [
dataset for dataset in user_read_access_datasets if dataset.name in datasets
]
else:
search_datasets = user_read_access_datasets
return search_datasets

View file

@@ -0,0 +1,14 @@
from sqlalchemy import select
from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from ...models.Tenant import Tenant
async def get_tenant(tenant_id: UUID):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
result = await session.execute(select(Tenant).where(Tenant.id == tenant_id))
tenant = result.unique().scalar_one()
return tenant

View file

@@ -0,0 +1,39 @@
from sqlalchemy.future import select
from cognee.infrastructure.databases.relational import get_relational_engine
from ...models import Principal, ACL, Permission
from uuid import UUID
async def give_permission_on_dataset(
principal: Principal,
dataset_id: UUID,
permission_name: str,
):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
permission = (
(await session.execute(select(Permission).filter(Permission.name == permission_name)))
.scalars()
.first()
)
if permission is None:
permission = Permission(name=permission_name)
existing_acl = None
else:
# Check if the ACL entry already exists to avoid duplicates
existing_acl = await session.execute(
select(ACL).filter(
ACL.principal_id == principal.id,
ACL.dataset_id == dataset_id,
ACL.permission_id == permission.id,
)
)
existing_acl = existing_acl.scalars().first()
# If no existing ACL entry is found, proceed to add a new one
if existing_acl is None:
acl = ACL(principal_id=principal.id, dataset_id=dataset_id, permission=permission)
session.add(acl)
await session.commit()
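The grant above is idempotent: it creates the permission row on first use and skips duplicate ACL rows. A minimal in-memory sketch of the same get-or-create-then-check pattern, with plain sets standing in for the Permission and ACL tables:

```python
def grant(permissions: set, acls: set, principal_id, dataset_id, permission_name):
    # get-or-create the permission (first use creates it)
    permissions.add(permission_name)
    # adding the same (principal, dataset, permission) triple twice is a no-op
    acls.add((principal_id, dataset_id, permission_name))

permissions, acls = set(), set()
grant(permissions, acls, "user-1", "ds-1", "read")
grant(permissions, acls, "user-1", "ds-1", "read")  # duplicate grant, no new row
grant(permissions, acls, "user-1", "ds-1", "write")
```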

View file

@@ -1,27 +0,0 @@
from sqlalchemy.future import select
from cognee.infrastructure.databases.relational import get_relational_engine
from ...models import User, ACL, Permission
async def give_permission_on_document(
user: User,
document_id: str,
permission_name: str,
):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
permission = (
(await session.execute(select(Permission).filter(Permission.name == permission_name)))
.scalars()
.first()
)
if permission is None:
permission = Permission(name=permission_name)
acl = ACL(principal_id=user.id, data_id=document_id, permission=permission)
session.add(acl)
await session.commit()

View file

@@ -9,6 +9,7 @@ from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.exceptions import (
UserNotFoundError,
RoleNotFoundError,
TenantNotFoundError,
)
from cognee.modules.users.models import (
User,
@@ -20,6 +21,7 @@ from cognee.modules.users.models import (
async def add_user_to_role(user_id: UUID, role_id: UUID):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# TODO: Allow only tenant admin/owner to add users to roles
user = (await session.execute(select(User).where(User.id == user_id))).scalars().first()
role = (await session.execute(select(Role).where(Role.id == role_id))).scalars().first()
@@ -27,6 +29,10 @@ async def add_user_to_role(user_id: UUID, role_id: UUID):
raise UserNotFoundError
elif not role:
raise RoleNotFoundError
elif user.tenant_id != role.tenant_id:
raise TenantNotFoundError(
message="User tenant does not match role tenant. User cannot be added to role."
)
try:
# Add association directly to the association table

View file

@@ -1 +1,2 @@
from .create_tenant import create_tenant
from .add_user_to_tenant import add_user_to_tenant

View file

@@ -0,0 +1,44 @@
from uuid import UUID
from sqlalchemy.exc import IntegrityError
from cognee.infrastructure.databases.exceptions import EntityAlreadyExistsError
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.methods import get_user
from cognee.modules.users.permissions.methods import get_tenant
from cognee.modules.users.exceptions import (
UserNotFoundError,
TenantNotFoundError,
PermissionDeniedError,
)
async def add_user_to_tenant(user_id: UUID, tenant_id: UUID, owner_id: UUID):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
user = await get_user(user_id)
tenant = await get_tenant(tenant_id)
if not user:
raise UserNotFoundError
elif not tenant:
raise TenantNotFoundError
if tenant.owner_id != owner_id:
raise PermissionDeniedError(
message="Only the tenant owner can add other users to the organization."
)
try:
if user.tenant_id is None:
user.tenant_id = tenant_id
elif user.tenant_id == tenant_id:
return
else:
# raising the IntegrityError class bare fails at runtime (its constructor needs args); raise the domain error directly
raise EntityAlreadyExistsError(
message="User is already part of a tenant. Only one tenant can be assigned to user."
)
await session.merge(user)
await session.commit()
except IntegrityError:
raise EntityAlreadyExistsError(
message="User is already part of a tenant. Only one tenant can be assigned to user."
)

View file

@@ -1,19 +1,28 @@
from uuid import UUID
from sqlalchemy.exc import IntegrityError
from cognee.infrastructure.databases.exceptions import EntityAlreadyExistsError
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.models import Tenant
from cognee.modules.users.methods import get_user
async def create_tenant(tenant_name: str):
async def create_tenant(tenant_name: str, user_id: UUID):
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
try:
# Add association directly to the association table
tenant = Tenant(name=tenant_name)
user = await get_user(user_id)
if user.tenant_id:
raise EntityAlreadyExistsError(
message="User already has a tenant. New tenant cannot be created."
)
tenant = Tenant(name=tenant_name, owner_id=user_id)
session.add(tenant)
await session.flush()
user.tenant_id = tenant.id
await session.merge(user)
await session.commit()
except IntegrityError:
raise EntityAlreadyExistsError(message="Tenant already exists.")
await session.commit()
await session.refresh(tenant)

View file

@@ -2,4 +2,4 @@ from .translate_text import translate_text
from .detect_language import detect_language
from .classify_documents import classify_documents
from .extract_chunks_from_documents import extract_chunks_from_documents
from .check_permissions_on_documents import check_permissions_on_documents
from .check_permissions_on_dataset import check_permissions_on_dataset

View file

@@ -1,10 +1,10 @@
from cognee.modules.data.processing.document_types import Document
from cognee.modules.users.permissions.methods import check_permission_on_documents
from cognee.modules.users.permissions.methods import check_permission_on_dataset
from typing import List
async def check_permissions_on_documents(
documents: list[Document], user, permissions
async def check_permissions_on_dataset(
documents: List[Document], context: dict, user, permissions
) -> List[Document]:
"""
Validates a user's permissions on a list of documents.
@@ -14,13 +14,12 @@ async def check_permissions_on_documents(
- It is designed to validate multiple permissions in a sequential manner for the same set of documents.
- Ensure that the `Document` and `user` objects conform to the expected structure and interfaces.
"""
document_ids = [document.id for document in documents]
for permission in permissions:
await check_permission_on_documents(
await check_permission_on_dataset(
user,
permission,
document_ids,
context["dataset"].id,
)
return documents

View file

@@ -9,7 +9,8 @@ from cognee.modules.data.methods import create_dataset, get_dataset_data, get_da
from cognee.modules.users.methods import get_default_user
from cognee.modules.data.models.DatasetData import DatasetData
from cognee.modules.users.models import User
from cognee.modules.users.permissions.methods import give_permission_on_document
from cognee.modules.users.permissions.methods import give_permission_on_dataset
from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
from .get_dlt_destination import get_dlt_destination
from .save_data_item_to_storage import save_data_item_to_storage
@@ -153,8 +154,9 @@ async def ingest_data(
await session.commit()
await give_permission_on_document(user, data_id, "read")
await give_permission_on_document(user, data_id, "write")
dataset_id = await get_unique_dataset_id(dataset_name=dataset_name, user=user)
await give_permission_on_dataset(user, dataset_id, "read")
await give_permission_on_dataset(user, dataset_id, "write")
return file_paths

View file

@@ -0,0 +1,71 @@
import asyncio
import os
import pathlib
import cognee
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType
logger = get_logger()
async def main():
data_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_library")
).resolve()
)
cognee.config.data_root_directory(data_directory_path)
cognee_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_library")
).resolve()
)
cognee.config.system_root_directory(cognee_directory_path)
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await cognee.add(["TEST1"], "test1")
await cognee.add(["TEST2"], "test2")
task_1_config = {
"vector_db_url": "cognee1.test",
"vector_db_key": "",
"vector_db_provider": "lancedb",
}
task_2_config = {
"vector_db_url": "cognee2.test",
"vector_db_key": "",
"vector_db_provider": "lancedb",
}
task_1_graph_config = {
"graph_database_provider": "kuzu",
"graph_file_path": "kuzu1.db",
}
task_2_graph_config = {
"graph_database_provider": "kuzu",
"graph_file_path": "kuzu2.db",
}
# schedule both cognify calls concurrently
task1 = asyncio.create_task(
cognee.cognify(
["test1"], vector_db_config=task_1_config, graph_db_config=task_1_graph_config
)
)
task2 = asyncio.create_task(
cognee.cognify(
["test2"], vector_db_config=task_2_config, graph_db_config=task_2_graph_config
)
)
# wait until both are done (raises first error if any)
await asyncio.gather(task1, task2)
if __name__ == "__main__":
import asyncio
asyncio.run(main(), debug=True)

poetry.lock generated
View file

@@ -4457,50 +4457,50 @@ files = [
[[package]]
name = "kuzu"
version = "0.8.2"
version = "0.9.0"
description = "Highly scalable, extremely fast, easy-to-use embeddable graph database"
optional = true
python-versions = "*"
groups = ["main"]
markers = "extra == \"kuzu\""
files = [
{file = "kuzu-0.8.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:78bcdf6cc7b130bce8b307709e8d7bddd2e9104b2b696a9dc52574556e754570"},
{file = "kuzu-0.8.2-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:b42e3e9b1eacf830700287b05e96f9455b89dd4140085053e6c86b32c61e8d5c"},
{file = "kuzu-0.8.2-cp310-cp310-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:cf06c602dc0231268d9cfa56a62afef15f8fca3be1ccd2cad22047a14bff4ae0"},
{file = "kuzu-0.8.2-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:50a873e7cd0c2e8e3093e9af14cffb14e49f1f67eceb32df3d0454ce101402d3"},
{file = "kuzu-0.8.2-cp310-cp310-win_amd64.whl", hash = "sha256:4d36261444d31432606f3f3ed00624f1a3a8edcf7d830564c72b76ffbdf4d318"},
{file = "kuzu-0.8.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6c1694c6d1b19c46ad5d416cac429ccf1fe91aca4d367664e3aa0afa59800f93"},
{file = "kuzu-0.8.2-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:00156c64523a1377ffced998bdb031709336f90543da69544c0ab4b40d533692"},
{file = "kuzu-0.8.2-cp311-cp311-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:bc75f26afe8815b046cfb0d931303da6c36ce3afb49d4ae18a3899f23e62020f"},
{file = "kuzu-0.8.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5f0de6910724a74cc492354e903cf76db78b6353eef1e2edfa0b79d600c3c572"},
{file = "kuzu-0.8.2-cp311-cp311-win_amd64.whl", hash = "sha256:56e99c39a725943aa7ad96ada8f29706da3d53cc98385f2c663b8ea026f0dce3"},
{file = "kuzu-0.8.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:adcc250b34963a6eea62b59d47a091018d83e61fb2e95552795ab61f103052be"},
{file = "kuzu-0.8.2-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:f72036924466143675980baed02a26c0fca15b6254c11de9a9c18d28fe66247e"},
{file = "kuzu-0.8.2-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a2fd7895fdfd9df880091d32bfb79c148f849659c67e2b9e185f952a6bde9139"},
{file = "kuzu-0.8.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:68486e291aa8a61264be7e31233ec34eeb6da2402f4b980c3f2b67f9ccbbea3a"},
{file = "kuzu-0.8.2-cp312-cp312-win_amd64.whl", hash = "sha256:7cce7d06e6f09cd488c62be7cafe78752b037ed9e6585ed3da9df029104b1987"},
{file = "kuzu-0.8.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:aa0495f856f2e5f5067e281dab3fbc170aba0721d1f56156a8cd9fa50e706f91"},
{file = "kuzu-0.8.2-cp313-cp313-macosx_11_0_x86_64.whl", hash = "sha256:823577b472ba63c3b36e5ff81e2b744736f9eaf0b71585c247f3defc9d268f53"},
{file = "kuzu-0.8.2-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:bde76f38d293f49ad283a4831bd32d41f185b93a75d388d67f9b8996678203e9"},
{file = "kuzu-0.8.2-cp313-cp313-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:cdb189012613ecd26630096796e3817c260deea85782e764309cd36b2c39dac5"},
{file = "kuzu-0.8.2-cp313-cp313-win_amd64.whl", hash = "sha256:71fb98721f9c46f960a5c3baea6b083026485c4b9a3e74ab01418243e29e3753"},
{file = "kuzu-0.8.2-cp313-cp313t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8e12726af2cb552ab7b60e2b4312469359bb3b4b45ddbcfb75220def4be6f566"},
{file = "kuzu-0.8.2-cp313-cp313t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:055f2cd9741bf39161f9ccff80428f8fb80b1910b2450b05bbe848487ba694f5"},
{file = "kuzu-0.8.2-cp37-cp37m-macosx_11_0_x86_64.whl", hash = "sha256:18cb3da3a650f8dfde3639fbd6319a5ad6f98f60689c5dd96d20d8d1fc184d4c"},
{file = "kuzu-0.8.2-cp37-cp37m-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e55a8fddc21ac3e27b3cf2815d93264dd3c89e9ad8c7f3960d51bdfe48a02709"},
{file = "kuzu-0.8.2-cp37-cp37m-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4d93600aceacdd7903aa39f016cb641811f96e4825b027a135aaaa1d82e23d24"},
{file = "kuzu-0.8.2-cp37-cp37m-win_amd64.whl", hash = "sha256:68601d9e741c7815c3d3f46a9c6884853388bcc6920945f069d5dc4f9492c9c5"},
{file = "kuzu-0.8.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:32d7ff56d793df27f76129b8b15bd85c940e59bcb67acd189b6a5ed1af5e8b44"},
{file = "kuzu-0.8.2-cp38-cp38-macosx_11_0_x86_64.whl", hash = "sha256:5e639f24be2fca78bf3890774f273aa1a6b149bfdbeb5c7e966e03b8f610be98"},
{file = "kuzu-0.8.2-cp38-cp38-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1caf46e2721dabed94b65cdcf3990551af2f3913c3f2dcd39f3e5397f0134243"},
{file = "kuzu-0.8.2-cp38-cp38-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a5333c9e4557ccbfef7b822793ec382848411c8d11fdee063064b41bd1828404"},
{file = "kuzu-0.8.2-cp38-cp38-win_amd64.whl", hash = "sha256:765a8bd4c5b9d24583eb8aaa20ecd753d78220138a82bf643ec592ffb8128298"},
{file = "kuzu-0.8.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3a215ff235d17a41c50d1cf2bd8e67a196eff32f23e59d989b1a40e6192f2008"},
{file = "kuzu-0.8.2-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:074b5440186e4214b653d46f8d5a15d4b4cae1185d4656eaf598fe9b840fcdca"},
{file = "kuzu-0.8.2-cp39-cp39-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:32303a9533674a35e52d429f1446a82e2fc97c423618bc86aaafef1d4d2621e4"},
{file = "kuzu-0.8.2-cp39-cp39-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0baea115bc55c8ed710f2beae8f02e46cf2bac42326b4e2c3acd25a76031f59d"},
{file = "kuzu-0.8.2-cp39-cp39-win_amd64.whl", hash = "sha256:70e031131c5b8e327edd63993b05fb04196b74d0ade1baf0f4005968610310ed"},
{file = "kuzu-0.8.2.tar.gz", hash = "sha256:68ad72b3ef6a32a41ecfa955fa4ca9ca0c8a36d3a1bc13e34cc70c971b2b8ca7"},
{file = "kuzu-0.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ec9f216d67c092ea52086c99cf4b1deabe0f8daaf47c80cf1892b3b41c57d58a"},
{file = "kuzu-0.9.0-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:bda6d845bf1c7da204ffa7730573118f2d43fe6b14b1a5d0d2845ec3d3481362"},
{file = "kuzu-0.9.0-cp310-cp310-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ab5b28f101c93899fc15668b6cb25f6db3d4a9844fcc4affed293caaaafaa4b7"},
{file = "kuzu-0.9.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:183bb1de19ffec1c3b07c0b4d5eecf02eb4eeafc1d50aea409bc91e1fad4d6d2"},
{file = "kuzu-0.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:2e36ce7da1bbebb538082656de18a717895d9352a33c8bcac170ef2fc22a4902"},
{file = "kuzu-0.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:82dd690d823df816e7826945e5243a4ae65e3e948ef512709a59205b84b9f6dd"},
{file = "kuzu-0.9.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:c394e019a14e9c5636228cf1acd333997c31e5da3d9a60a1df2c03b828438432"},
{file = "kuzu-0.9.0-cp311-cp311-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f7d493f88ed31eada4b88a92b115bc6085c60498c47336ab06a489e75a727bab"},
{file = "kuzu-0.9.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:171b47cf2b3923c813f1ed88fb9d3964a9355129b5d3ebca54eba3450bfc1f97"},
{file = "kuzu-0.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:3c8a8a611f599801c8db6aeffb978cd1badcfa3ec8f79c15b701810fee71765f"},
{file = "kuzu-0.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:509af4029f9dcb9c3e843a825df44ec30009a70fad891cbcfb611c3b8cdfefd6"},
{file = "kuzu-0.9.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:885f17f6e46c15ecef121fc57a941f8b60f0a5c1d3995813bb7a4c7437fb2259"},
{file = "kuzu-0.9.0-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:94f2e35aa345b543a4a21de0e82b70eac4c753987cfa4ded75ae7f9f23edbf11"},
{file = "kuzu-0.9.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:67430c9813607a3b901c4a1e6bfb3b93538af230bc821e675c552a162818f589"},
{file = "kuzu-0.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:549f4a72f815554fb998582876c5875cb0917a192e6a58d196e8247fd8902701"},
{file = "kuzu-0.9.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:ec2e709599b4015d0a179a191dd7850e7bf076f83b37b70d0dc2e4ee59ce7725"},
{file = "kuzu-0.9.0-cp313-cp313-macosx_11_0_x86_64.whl", hash = "sha256:8aad4fbd74b283ffb0b115138dfc62d9775c8f19ba62ab243e55e3cd648652b6"},
{file = "kuzu-0.9.0-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ba9dd4f412e31d34345b6461fc9489955ae9566abf426e56af478b6e791b735a"},
{file = "kuzu-0.9.0-cp313-cp313-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:340502cbce54f21a5b2440a75c28d61ddfd26d6d6848e9daa6140798bdd5b367"},
{file = "kuzu-0.9.0-cp313-cp313-win_amd64.whl", hash = "sha256:e1ddb189dfa2aee0123dcd1a5ccc5b831a7f297233a09fccfd76294fc2f9e6bd"},
{file = "kuzu-0.9.0-cp313-cp313t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1fae68db87ba48268228c89e70ed1fde2f43843d8ed6b2debaafd314c45e8542"},
{file = "kuzu-0.9.0-cp313-cp313t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a0279ba37c639d96f303eb6ad4481e634495be31210991d8008c385ee50b4e0a"},
{file = "kuzu-0.9.0-cp37-cp37m-macosx_11_0_x86_64.whl", hash = "sha256:3ca7424fe3831df687552b89903aa57fb88efff9c25df15c5d678fae7c933199"},
{file = "kuzu-0.9.0-cp37-cp37m-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:bce9284913434661f47cecfc763f8997a61ebd2bb7bfe993970c1403924708fa"},
{file = "kuzu-0.9.0-cp37-cp37m-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:66040cdf9a59a5423b49c3d2bc01a089114b573ee1345d5a7c912276fbca0135"},
{file = "kuzu-0.9.0-cp37-cp37m-win_amd64.whl", hash = "sha256:8e195774364123845df071eddb18873ce8c78244dd6f854badfe65053b058088"},
{file = "kuzu-0.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:2906f29ee36f9f642bdb8f5222c94f667092e38bde7dc53ebb252f9eb524ab6a"},
{file = "kuzu-0.9.0-cp38-cp38-macosx_11_0_x86_64.whl", hash = "sha256:4c3218e266766080fe1b31325d0156d1b334f62ae23dac854c3e4919115ef8c6"},
{file = "kuzu-0.9.0-cp38-cp38-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a26214c1600c21f5e4aa96585706953a8792ad77e14788710d78f8af0d6b74ec"},
{file = "kuzu-0.9.0-cp38-cp38-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1b153fb28db9336757346eabb24b8c179b4ed48578a0ef158210fbc935df2184"},
{file = "kuzu-0.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:b6ee075e2571b11a434efb004cb0b3a2fbd7aa416ae680816869f1388e5fc734"},
{file = "kuzu-0.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:56874ae750ff99b15c959d884b175adf24ac912ab08e084c42784902b2bce2fb"},
{file = "kuzu-0.9.0-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:6e0265b1ad445500397dc0df3cc4e7faddfd67fcd3d0952d9a4cdab6b77b47e9"},
{file = "kuzu-0.9.0-cp39-cp39-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d66e69a3e135ea123cc7c9c2e507bbb614ffdbfe7be835782c6a588ae63ff900"},
{file = "kuzu-0.9.0-cp39-cp39-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2e11c8b7186798ad95563e1d7ebf84495d817c406bd28c21af7170467e37e35e"},
{file = "kuzu-0.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:4fb80eb6c71b02c4e57e3570b079c494082f7ff819d4c06ac482914f29211294"},
{file = "kuzu-0.9.0.tar.gz", hash = "sha256:2e59f3d4d1fc385e9e90d7ae09f072ec2f4cfeff508582523a0034ceb076f6eb"},
]
[[package]]
@@ -11988,4 +11988,4 @@ weaviate = ["weaviate-client"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<=3.13"
-content-hash = "15b319ff8dbe5bd88e41ead93f4e9140b2b7d86d57a707682dd3a308e78ef245"
+content-hash = "7a2b3d1aabca5f1994c435a162f7ab44b3528cd7b9ec366809d9c014adf7e335"
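
The `content-hash` under `[metadata]` is how Poetry detects that the lock file no longer matches `pyproject.toml`; any dependency change (such as the kuzu bump in this PR) produces a new hash, so both the pin and the hash must change together. A minimal sketch of the idea, using a simplified digest over dependency data rather than Poetry's exact canonicalization:

```python
import hashlib
import json

def content_hash(relevant_data: dict) -> str:
    """Deterministic digest of dependency metadata.

    Illustrative only: Poetry hashes a canonicalized subset of
    pyproject.toml data, but the principle is the same -- serialize
    deterministically, then hash.
    """
    blob = json.dumps(relevant_data, sort_keys=True).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()

old = content_hash({"extras": {"kuzu": ["kuzu==0.8.2"]}})
new = content_hash({"extras": {"kuzu": ["kuzu==0.9.0"]}})
assert old != new  # any dependency edit invalidates the recorded hash
```

In practice `poetry check --lock` performs this comparison and fails when the stored hash is stale.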

@@ -87,7 +87,7 @@ anthropic = ["anthropic>=0.26.1,<0.27"]
deepeval = ["deepeval>=2.0.1,<3"]
posthog = ["posthog>=3.5.0,<4"]
falkordb = ["falkordb==1.0.9"]
-kuzu = ["kuzu==0.8.2"]
+kuzu = ["kuzu==0.9.0"]
groq = ["groq==0.8.0"]
milvus = ["pymilvus>=2.5.0,<3"]
chromadb = [

uv.lock (generated, 7428 lines): diff suppressed because it is too large.