diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index e10680f19..20e2624cb 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -1,8 +1,14 @@ import asyncio +import hashlib import logging +import uuid from typing import Union +from sqlalchemy.ext.asyncio import AsyncSession + from cognee.infrastructure.databases.graph import get_graph_config +from cognee.infrastructure.databases.relational.user_authentication.authentication_db import async_session_maker +from cognee.infrastructure.databases.relational.user_authentication.users import get_user_permissions, fastapi_users from cognee.modules.cognify.config import get_cognify_config from cognee.infrastructure.databases.relational.config import get_relationaldb_config from cognee.modules.data.processing.document_types.AudioDocument import AudioDocument @@ -25,7 +31,21 @@ logger = logging.getLogger("cognify.v2") update_status_lock = asyncio.Lock() -async def cognify(datasets: Union[str, list[str]] = None, root_node_id: str = None): +class PermissionDeniedException(Exception): + def __init__(self, message: str): + self.message = message + super().__init__(self.message) + +async def cognify(datasets: Union[str, list[str]] = None, root_node_id: str = None, user_id:str="default_user"): + session: AsyncSession = async_session_maker() + user = await fastapi_users.get_user_manager.get(user_id) + user_permissions = await get_user_permissions(user, session) + hash_object = hashlib.sha256(user.encode()) + hashed_user_id = hash_object.hexdigest() + required_permission = "write" + if required_permission not in user_permissions: + raise PermissionDeniedException("Not enough permissions") + relational_config = get_relationaldb_config() db_engine = relational_config.database_engine create_task_status_table() @@ -60,7 +80,7 @@ async def cognify(datasets: Union[str, list[str]] = None, root_node_id: str = No root_node_id = "ROOT" tasks = [ - Task(process_documents, parent_node_id = root_node_id, task_config = { "batch_size": 10 }), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type + Task(process_documents, parent_node_id = root_node_id, task_config = { "batch_size": 10 }, user_id = hashed_user_id, user_permissions=user_permissions), # Classify documents and save them as a nodes in graph db, extract text chunks based on the document type Task(establish_graph_topology, topology_model = KnowledgeGraph), # Set the graph topology for the document chunk data Task(expand_knowledge_graph, graph_model = KnowledgeGraph), # Generate knowledge graphs from the document chunks and attach it to chunk nodes Task(filter_affected_chunks, collection_name = "chunks"), # Find all affected chunks, so we don't process unchanged chunks diff --git a/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py b/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py index d517453c6..ae4405a9b 100644 --- a/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py +++ b/cognee/infrastructure/databases/relational/user_authentication/authentication_db.py @@ -31,7 +31,7 @@ class Group(Base): __tablename__ = 'groups' id = Column(UUID, primary_key=True, index=True) name = Column(String, unique=True, index=True) - users = relationship('User', secondary=user_group, back_populates='groups') + users = relationship('users', secondary=user_group, back_populates='groups') permissions = relationship('Permission', secondary=group_permission, back_populates='groups') class Permission(Base): diff --git a/cognee/modules/data/processing/process_documents.py b/cognee/modules/data/processing/process_documents.py index 1eb3bbd3e..8df8067b6 100644 --- a/cognee/modules/data/processing/process_documents.py +++ b/cognee/modules/data/processing/process_documents.py @@ -1,7 +1,7 @@ from cognee.infrastructure.databases.graph import get_graph_engine from .document_types import Document -async def process_documents(documents: list[Document], parent_node_id: str = None): +async def process_documents(documents: list[Document], parent_node_id: str = None, user:str=None, user_permissions:str=None): graph_engine = await get_graph_engine() nodes = [] @@ -16,6 +16,9 @@ async def process_documents(documents: list[Document], parent_node_id: str = Non document_node = document_nodes[document_index] if document_index in document_nodes else None if document_node is None: + document_dict = document.to_dict() + document_dict["user"] = user + document_dict["user_permissions"] = user_permissions nodes.append((str(document.id), document.to_dict())) if parent_node_id: