diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index 5f5828da8..badf88e71 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -288,7 +288,7 @@ jobs: EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} - run: uv run python ./cognee/tests/test_permissions.py + run: uv run pytest cognee/tests/test_permissions.py -v --log-level=INFO test-multi-tenancy: name: Test multi tenancy with different situations in Cognee diff --git a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py index f1a36ae59..80e072678 100644 --- a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py +++ b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py @@ -1,52 +1,51 @@ -"""add_last_accessed_to_data - -Revision ID: e1ec1dcb50b6 -Revises: 211ab850ef3d -Create Date: 2025-11-04 21:45:52.642322 - -""" -import os -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision: str = 'e1ec1dcb50b6' -down_revision: Union[str, None] = '211ab850ef3d' -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def _get_column(inspector, table, name, schema=None): - for col in inspector.get_columns(table, schema=schema): - if col["name"] == name: - return col - return None - - -def upgrade() -> None: - conn = op.get_bind() - insp = sa.inspect(conn) - - last_accessed_column = _get_column(insp, "data", "last_accessed") - if not last_accessed_column: - # Always create the column for schema consistency - op.add_column('data', - sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True) - ) - - # Only initialize existing records if feature is enabled - enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true" - if enable_last_accessed: - op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP") - - -def downgrade() -> None: - conn = op.get_bind() - insp = sa.inspect(conn) - - last_accessed_column = _get_column(insp, "data", "last_accessed") - if last_accessed_column: - op.drop_column('data', 'last_accessed') +"""add_last_accessed_to_data + +Revision ID: e1ec1dcb50b6 +Revises: 211ab850ef3d +Create Date: 2025-11-04 21:45:52.642322 + +""" + +import os +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "e1ec1dcb50b6" +down_revision: Union[str, None] = "a1b2c3d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def _get_column(inspector, table, name, schema=None): + for col in inspector.get_columns(table, schema=schema): + if col["name"] == name: + return col + return None + + +def upgrade() -> None: + conn = op.get_bind() + insp = sa.inspect(conn) + + last_accessed_column = _get_column(insp, "data", "last_accessed") + if not last_accessed_column: + # Always create the column for schema consistency + op.add_column("data", sa.Column("last_accessed", sa.DateTime(timezone=True), nullable=True)) + + # Only initialize existing records if feature is enabled + enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true" + if enable_last_accessed: + op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP") + + +def downgrade() -> None: + conn = op.get_bind() + insp = sa.inspect(conn) + + last_accessed_column = _get_column(insp, "data", "last_accessed") + if last_accessed_column: + op.drop_column("data", "last_accessed") diff --git a/cognee/modules/engine/models/Entity.py b/cognee/modules/engine/models/Entity.py index a34a6503c..a793ca73b 100644 --- a/cognee/modules/engine/models/Entity.py +++ b/cognee/modules/engine/models/Entity.py @@ -2,6 +2,7 @@ from cognee.infrastructure.engine import DataPoint from cognee.modules.engine.models.EntityType import EntityType from typing import Optional + class Entity(DataPoint): name: str is_a: Optional[EntityType] = None diff --git a/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py index 598a36854..6c661e53a 100644 --- a/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py +++ b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py @@ -1,13 +1,12 @@ - def get_entity_nodes_from_triplets(triplets): - entity_nodes = [] - seen_ids = set() - for triplet in triplets: - if hasattr(triplet, 'node1') and triplet.node1 and triplet.node1.id not in seen_ids: - entity_nodes.append({"id": str(triplet.node1.id)}) - seen_ids.add(triplet.node1.id) - if hasattr(triplet, 'node2') and triplet.node2 and triplet.node2.id not in seen_ids: - entity_nodes.append({"id": str(triplet.node2.id)}) - seen_ids.add(triplet.node2.id) + entity_nodes = [] + seen_ids = set() + for triplet in triplets: + if hasattr(triplet, "node1") and triplet.node1 and triplet.node1.id not in seen_ids: + entity_nodes.append({"id": str(triplet.node1.id)}) + seen_ids.add(triplet.node1.id) + if hasattr(triplet, "node2") and triplet.node2 and triplet.node2.id not in seen_ids: + entity_nodes.append({"id": str(triplet.node2.id)}) + seen_ids.add(triplet.node2.id) - return entity_nodes + return entity_nodes diff --git a/cognee/modules/retrieval/chunks_retriever.py b/cognee/modules/retrieval/chunks_retriever.py index b7a90238a..ce9b8233b 100644 --- a/cognee/modules/retrieval/chunks_retriever.py +++ b/cognee/modules/retrieval/chunks_retriever.py @@ -5,7 +5,7 @@ from cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError -from datetime import datetime, timezone +from datetime import datetime, timezone logger = get_logger("ChunksRetriever") @@ -28,7 +28,7 @@ class ChunksRetriever(BaseRetriever): ): self.top_k = top_k - async def get_context(self, query: str) -> Any: + async def get_context(self, query: str) -> Any: """ Retrieves document chunks context based on the query. Searches for document chunks relevant to the specified query using a vector engine. diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py index 317d7cd9a..bb8b34327 100644 --- a/cognee/modules/retrieval/graph_completion_retriever.py +++ b/cognee/modules/retrieval/graph_completion_retriever.py @@ -148,8 +148,8 @@ class GraphCompletionRetriever(BaseGraphRetriever): # context = await self.resolve_edges_to_text(triplets) entity_nodes = get_entity_nodes_from_triplets(triplets) - - await update_node_access_timestamps(entity_nodes) + + await update_node_access_timestamps(entity_nodes) return triplets async def convert_retrieved_objects_to_context(self, triplets: List[Edge]): diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py index 0df750d22..13972bb8d 100644 --- a/cognee/modules/retrieval/summaries_retriever.py +++ b/cognee/modules/retrieval/summaries_retriever.py @@ -55,9 +55,9 @@ class SummariesRetriever(BaseRetriever): "TextSummary_text", query, limit=self.top_k ) logger.info(f"Found {len(summaries_results)} summaries from vector search") - + await update_node_access_timestamps(summaries_results) - + except CollectionNotFoundError as error: logger.error("TextSummary_text collection not found in vector database") raise NoDataError("No data found in the system, please add data first.") from error diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py index 54fd043b9..e93606c1b 100644 --- a/cognee/modules/retrieval/utils/access_tracking.py +++ b/cognee/modules/retrieval/utils/access_tracking.py @@ -1,82 +1,88 @@ -"""Utilities for tracking data access in retrievers.""" - -import json -from datetime import datetime, timezone -from typing import List, Any -from uuid import UUID -import os -from cognee.infrastructure.databases.graph import get_graph_engine -from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.data.models import Data -from cognee.shared.logging_utils import get_logger -from sqlalchemy import update -from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph - -logger = get_logger(__name__) - - -async def update_node_access_timestamps(items: List[Any]): - if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": - return - - if not items: - return - - graph_engine = await get_graph_engine() - timestamp_dt = datetime.now(timezone.utc) - - # Extract node IDs - node_ids = [] - for item in items: - item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id") - if item_id: - node_ids.append(str(item_id)) - - if not node_ids: - return - - # Focus on document-level tracking via projection - try: - doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids) - if doc_ids: - await _update_sql_records(doc_ids, timestamp_dt) - except Exception as e: - logger.error(f"Failed to update SQL timestamps: {e}") - raise - - -async def _find_origin_documents_via_projection(graph_engine, node_ids): - """Find origin documents using graph projection instead of DB queries""" - # Project the entire graph with necessary properties - memory_fragment = CogneeGraph() - await memory_fragment.project_graph_from_db( - graph_engine, - node_properties_to_project=["id", "type"], - edge_properties_to_project=["relationship_name"] - ) - - # Find origin documents by traversing the in-memory graph - doc_ids = set() - for node_id in node_ids: - node = memory_fragment.get_node(node_id) - if node and node.get_attribute("type") == "DocumentChunk": - # Traverse edges to find connected documents - for edge in node.get_skeleton_edges(): - # Get the neighbor node - neighbor = edge.get_destination_node() if edge.get_source_node().id == node_id else edge.get_source_node() - if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]: - doc_ids.add(neighbor.id) - - return list(doc_ids) - - -async def _update_sql_records(doc_ids, timestamp_dt): - """Update SQL Data table (same for all providers)""" - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - stmt = update(Data).where( - Data.id.in_([UUID(doc_id) for doc_id in doc_ids]) - ).values(last_accessed=timestamp_dt) - - await session.execute(stmt) +"""Utilities for tracking data access in retrievers.""" + +import json +from datetime import datetime, timezone +from typing import List, Any +from uuid import UUID +import os +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.data.models import Data +from cognee.shared.logging_utils import get_logger +from sqlalchemy import update +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph + +logger = get_logger(__name__) + + +async def update_node_access_timestamps(items: List[Any]): + if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": + return + + if not items: + return + + graph_engine = await get_graph_engine() + timestamp_dt = datetime.now(timezone.utc) + + # Extract node IDs + node_ids = [] + for item in items: + item_id = item.payload.get("id") if hasattr(item, "payload") else item.get("id") + if item_id: + node_ids.append(str(item_id)) + + if not node_ids: + return + + # Focus on document-level tracking via projection + try: + doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids) + if doc_ids: + await _update_sql_records(doc_ids, timestamp_dt) + except Exception as e: + logger.error(f"Failed to update SQL timestamps: {e}") + raise + + +async def _find_origin_documents_via_projection(graph_engine, node_ids): + """Find origin documents using graph projection instead of DB queries""" + # Project the entire graph with necessary properties + memory_fragment = CogneeGraph() + await memory_fragment.project_graph_from_db( + graph_engine, + node_properties_to_project=["id", "type"], + edge_properties_to_project=["relationship_name"], + ) + + # Find origin documents by traversing the in-memory graph + doc_ids = set() + for node_id in node_ids: + node = memory_fragment.get_node(node_id) + if node and node.get_attribute("type") == "DocumentChunk": + # Traverse edges to find connected documents + for edge in node.get_skeleton_edges(): + # Get the neighbor node + neighbor = ( + edge.get_destination_node() + if edge.get_source_node().id == node_id + else edge.get_source_node() + ) + if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]: + doc_ids.add(neighbor.id) + + return list(doc_ids) + + +async def _update_sql_records(doc_ids, timestamp_dt): + """Update SQL Data table (same for all providers)""" + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + stmt = ( + update(Data) + .where(Data.id.in_([UUID(doc_id) for doc_id in doc_ids])) + .values(last_accessed=timestamp_dt) + ) + + await session.execute(stmt) await session.commit() diff --git a/cognee/tasks/cleanup/cleanup_unused_data.py b/cognee/tasks/cleanup/cleanup_unused_data.py index 34cde1b6f..c8fa5d05c 100644 --- a/cognee/tasks/cleanup/cleanup_unused_data.py +++ b/cognee/tasks/cleanup/cleanup_unused_data.py @@ -1,187 +1,172 @@ -""" -Task for automatically deleting unused data from the memify pipeline. - -This task identifies and removes entire documents that haven't -been accessed by retrievers for a specified period, helping maintain system -efficiency and storage optimization through whole-document removal. -""" - -import json -from datetime import datetime, timezone, timedelta -from typing import Optional, Dict, Any -from uuid import UUID -import os -from cognee.infrastructure.databases.graph import get_graph_engine -from cognee.infrastructure.databases.vector import get_vector_engine -from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.data.models import Data, DatasetData -from cognee.shared.logging_utils import get_logger -from sqlalchemy import select, or_ -import cognee -import sqlalchemy as sa -from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph - -logger = get_logger(__name__) - - -async def cleanup_unused_data( - minutes_threshold: Optional[int], - dry_run: bool = True, - user_id: Optional[UUID] = None -) -> Dict[str, Any]: - """ - Identify and remove unused data from the memify pipeline. - - Parameters - ---------- - minutes_threshold : int - Minutes since last access to consider data unused - dry_run : bool - If True, only report what would be deleted without actually deleting (default: True) - user_id : UUID, optional - Limit cleanup to specific user's data (default: None) - - Returns - ------- - Dict[str, Any] - Cleanup results with status, counts, and timestamp - """ - # Check 1: Environment variable must be enabled - if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": - logger.warning( - "Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled." - ) - return { - "status": "skipped", - "reason": "ENABLE_LAST_ACCESSED not enabled", - "unused_count": 0, - "deleted_count": {}, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - - # Check 2: Verify tracking has actually been running - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - # Count records with non-NULL last_accessed - tracked_count = await session.execute( - select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None)) - ) - tracked_records = tracked_count.scalar() - - if tracked_records == 0: - logger.warning( - "Cleanup skipped: No records have been tracked yet. " - "ENABLE_LAST_ACCESSED may have been recently enabled. " - "Wait for retrievers to update timestamps before running cleanup." - ) - return { - "status": "skipped", - "reason": "No tracked records found - tracking may be newly enabled", - "unused_count": 0, - "deleted_count": {}, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - - logger.info( - "Starting cleanup task", - minutes_threshold=minutes_threshold, - dry_run=dry_run, - user_id=str(user_id) if user_id else None - ) - - # Calculate cutoff timestamp - cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold) - - # Document-level approach (recommended) - return await _cleanup_via_sql(cutoff_date, dry_run, user_id) - - -async def _cleanup_via_sql( - cutoff_date: datetime, - dry_run: bool, - user_id: Optional[UUID] = None -) -> Dict[str, Any]: - """ - SQL-based cleanup: Query Data table for unused documents and use cognee.delete(). - - Parameters - ---------- - cutoff_date : datetime - Cutoff date for last_accessed filtering - dry_run : bool - If True, only report what would be deleted - user_id : UUID, optional - Filter by user ID if provided - - Returns - ------- - Dict[str, Any] - Cleanup results - """ - db_engine = get_relational_engine() - - async with db_engine.get_async_session() as session: - # Query for Data records with old last_accessed timestamps - query = select(Data, DatasetData).join( - DatasetData, Data.id == DatasetData.data_id - ).where( - or_( - Data.last_accessed < cutoff_date, - Data.last_accessed.is_(None) - ) - ) - - if user_id: - from cognee.modules.data.models import Dataset - query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where( - Dataset.owner_id == user_id - ) - - result = await session.execute(query) - unused_data = result.all() - - logger.info(f"Found {len(unused_data)} unused documents in SQL") - - if dry_run: - return { - "status": "dry_run", - "unused_count": len(unused_data), - "deleted_count": { - "data_items": 0, - "documents": 0 - }, - "cleanup_date": datetime.now(timezone.utc).isoformat(), - "preview": { - "documents": len(unused_data) - } - } - - # Delete each document using cognee.delete() - deleted_count = 0 - from cognee.modules.users.methods import get_default_user - user = await get_default_user() if user_id is None else None - - for data, dataset_data in unused_data: - try: - await cognee.delete( - data_id=data.id, - dataset_id=dataset_data.dataset_id, - mode="hard", # Use hard mode to also remove orphaned entities - user=user - ) - deleted_count += 1 - logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}") - except Exception as e: - logger.error(f"Failed to delete document {data.id}: {e}") - - logger.info("Cleanup completed", deleted_count=deleted_count) - - return { - "status": "completed", - "unused_count": len(unused_data), - "deleted_count": { - "data_items": deleted_count, - "documents": deleted_count - }, - "cleanup_date": datetime.now(timezone.utc).isoformat() +""" +Task for automatically deleting unused data from the memify pipeline. + +This task identifies and removes entire documents that haven't +been accessed by retrievers for a specified period, helping maintain system +efficiency and storage optimization through whole-document removal. +""" + +import json +from datetime import datetime, timezone, timedelta +from typing import Optional, Dict, Any +from uuid import UUID +import os +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.infrastructure.databases.vector import get_vector_engine +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.data.models import Data, DatasetData +from cognee.shared.logging_utils import get_logger +from sqlalchemy import select, or_ +import cognee +import sqlalchemy as sa +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph + +logger = get_logger(__name__) + + +async def cleanup_unused_data( + minutes_threshold: Optional[int], dry_run: bool = True, user_id: Optional[UUID] = None +) -> Dict[str, Any]: + """ + Identify and remove unused data from the memify pipeline. + + Parameters + ---------- + minutes_threshold : int + Minutes since last access to consider data unused + dry_run : bool + If True, only report what would be deleted without actually deleting (default: True) + user_id : UUID, optional + Limit cleanup to specific user's data (default: None) + + Returns + ------- + Dict[str, Any] + Cleanup results with status, counts, and timestamp + """ + # Check 1: Environment variable must be enabled + if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": + logger.warning("Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled.") + return { + "status": "skipped", + "reason": "ENABLE_LAST_ACCESSED not enabled", + "unused_count": 0, + "deleted_count": {}, + "cleanup_date": datetime.now(timezone.utc).isoformat(), + } + + # Check 2: Verify tracking has actually been running + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + # Count records with non-NULL last_accessed + tracked_count = await session.execute( + select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None)) + ) + tracked_records = tracked_count.scalar() + + if tracked_records == 0: + logger.warning( + "Cleanup skipped: No records have been tracked yet. " + "ENABLE_LAST_ACCESSED may have been recently enabled. " + "Wait for retrievers to update timestamps before running cleanup." + ) + return { + "status": "skipped", + "reason": "No tracked records found - tracking may be newly enabled", + "unused_count": 0, + "deleted_count": {}, + "cleanup_date": datetime.now(timezone.utc).isoformat(), + } + + logger.info( + "Starting cleanup task", + minutes_threshold=minutes_threshold, + dry_run=dry_run, + user_id=str(user_id) if user_id else None, + ) + + # Calculate cutoff timestamp + cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold) + + # Document-level approach (recommended) + return await _cleanup_via_sql(cutoff_date, dry_run, user_id) + + +async def _cleanup_via_sql( + cutoff_date: datetime, dry_run: bool, user_id: Optional[UUID] = None +) -> Dict[str, Any]: + """ + SQL-based cleanup: Query Data table for unused documents and use cognee.delete(). + + Parameters + ---------- + cutoff_date : datetime + Cutoff date for last_accessed filtering + dry_run : bool + If True, only report what would be deleted + user_id : UUID, optional + Filter by user ID if provided + + Returns + ------- + Dict[str, Any] + Cleanup results + """ + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + # Query for Data records with old last_accessed timestamps + query = ( + select(Data, DatasetData) + .join(DatasetData, Data.id == DatasetData.data_id) + .where(or_(Data.last_accessed < cutoff_date, Data.last_accessed.is_(None))) + ) + + if user_id: + from cognee.modules.data.models import Dataset + + query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where( + Dataset.owner_id == user_id + ) + + result = await session.execute(query) + unused_data = result.all() + + logger.info(f"Found {len(unused_data)} unused documents in SQL") + + if dry_run: + return { + "status": "dry_run", + "unused_count": len(unused_data), + "deleted_count": {"data_items": 0, "documents": 0}, + "cleanup_date": datetime.now(timezone.utc).isoformat(), + "preview": {"documents": len(unused_data)}, + } + + # Delete each document using cognee.delete() + deleted_count = 0 + from cognee.modules.users.methods import get_default_user + + user = await get_default_user() if user_id is None else None + + for data, dataset_data in unused_data: + try: + await cognee.delete( + data_id=data.id, + dataset_id=dataset_data.dataset_id, + mode="hard", # Use hard mode to also remove orphaned entities + user=user, + ) + deleted_count += 1 + logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}") + except Exception as e: + logger.error(f"Failed to delete document {data.id}: {e}") + + logger.info("Cleanup completed", deleted_count=deleted_count) + + return { + "status": "completed", + "unused_count": len(unused_data), + "deleted_count": {"data_items": deleted_count, "documents": deleted_count}, + "cleanup_date": datetime.now(timezone.utc).isoformat(), } diff --git a/cognee/tasks/summarization/models.py b/cognee/tasks/summarization/models.py index 8420cfaa5..b3c39aa98 100644 --- a/cognee/tasks/summarization/models.py +++ b/cognee/tasks/summarization/models.py @@ -1,4 +1,3 @@ - from typing import Union from cognee.infrastructure.engine import DataPoint from cognee.modules.chunking.models import DocumentChunk diff --git a/cognee/tests/test_cleanup_unused_data.py b/cognee/tests/test_cleanup_unused_data.py index e738dcba0..fa5c174c2 100644 --- a/cognee/tests/test_cleanup_unused_data.py +++ b/cognee/tests/test_cleanup_unused_data.py @@ -1,172 +1,165 @@ -import os -import pathlib -import cognee -from datetime import datetime, timezone, timedelta -from uuid import UUID -from sqlalchemy import select, update -from cognee.modules.data.models import Data, DatasetData -from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.users.methods import get_default_user -from cognee.shared.logging_utils import get_logger -from cognee.modules.search.types import SearchType - -logger = get_logger() - - -async def test_textdocument_cleanup_with_sql(): - """ - End-to-end test for TextDocument cleanup based on last_accessed timestamps. - """ - # Enable last accessed tracking BEFORE any cognee operations - os.environ["ENABLE_LAST_ACCESSED"] = "true" - - # Setup test directories - data_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup") - ).resolve() - ) - cognee_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup") - ).resolve() - ) - - cognee.config.data_root_directory(data_directory_path) - cognee.config.system_root_directory(cognee_directory_path) - - # Initialize database - from cognee.modules.engine.operations.setup import setup - - # Clean slate - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - - logger.info("๐Ÿงช Testing TextDocument cleanup based on last_accessed") - - # Step 1: Add and cognify a test document - dataset_name = "test_cleanup_dataset" +import os +import pathlib +import cognee +from datetime import datetime, timezone, timedelta +from uuid import UUID +from sqlalchemy import select, update +from cognee.modules.data.models import Data, DatasetData +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.users.methods import get_default_user +from cognee.shared.logging_utils import get_logger +from cognee.modules.search.types import SearchType + +logger = get_logger() + + +async def test_textdocument_cleanup_with_sql(): + """ + End-to-end test for TextDocument cleanup based on last_accessed timestamps. + """ + # Enable last accessed tracking BEFORE any cognee operations + os.environ["ENABLE_LAST_ACCESSED"] = "true" + + # Setup test directories + data_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup") + ).resolve() + ) + cognee_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup") + ).resolve() + ) + + cognee.config.data_root_directory(data_directory_path) + cognee.config.system_root_directory(cognee_directory_path) + + # Initialize database + from cognee.modules.engine.operations.setup import setup + + # Clean slate + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + logger.info("๐Ÿงช Testing TextDocument cleanup based on last_accessed") + + # Step 1: Add and cognify a test document + dataset_name = "test_cleanup_dataset" test_text = """ Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience without being explicitly programmed. Deep learning uses neural networks with multiple layers to process data. - """ - - await setup() - user = await get_default_user() - await cognee.add([test_text], dataset_name=dataset_name, user=user) - - cognify_result = await cognee.cognify([dataset_name], user=user) - - # Extract dataset_id from cognify result - dataset_id = None - for ds_id, pipeline_result in cognify_result.items(): - dataset_id = ds_id - break - - assert dataset_id is not None, "Failed to get dataset_id from cognify result" - logger.info(f"โœ… Document added and cognified. Dataset ID: {dataset_id}") - - # Step 2: Perform search to trigger last_accessed update - logger.info("Triggering search to update last_accessed...") - search_results = await cognee.search( - query_type=SearchType.CHUNKS, - query_text="machine learning", - datasets=[dataset_name], - user=user - ) - logger.info(f"โœ… Search completed, found {len(search_results)} results") - assert len(search_results) > 0, "Search should return results" - - # Step 3: Verify last_accessed was set and get data_id - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - result = await session.execute( - select(Data, DatasetData) - .join(DatasetData, Data.id == DatasetData.data_id) - .where(DatasetData.dataset_id == dataset_id) - ) - data_records = result.all() - assert len(data_records) > 0, "No Data records found for the dataset" - data_record = data_records[0][0] - data_id = data_record.id - - # Verify last_accessed is set - assert data_record.last_accessed is not None, ( - "last_accessed should be set after search operation" - ) - - original_last_accessed = data_record.last_accessed - logger.info(f"โœ… last_accessed verified: {original_last_accessed}") - - # Step 4: Manually age the timestamp - minutes_threshold = 30 - aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10) - - async with db_engine.get_async_session() as session: - stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) - await session.execute(stmt) - await session.commit() - - # Verify timestamp was updated - async with db_engine.get_async_session() as session: - result = await session.execute(select(Data).where(Data.id == data_id)) - updated_data = result.scalar_one_or_none() - assert updated_data is not None, "Data record should exist" - retrieved_timestamp = updated_data.last_accessed - if retrieved_timestamp.tzinfo is None: - retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc) - assert retrieved_timestamp == aged_timestamp, ( - f"Timestamp should be updated to aged value" - ) - - # Step 5: Test cleanup (document-level is now the default) - from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data - - # First do a dry run - logger.info("Testing dry run...") - dry_run_result = await cleanup_unused_data( - minutes_threshold=10, - dry_run=True, - user_id=user.id - ) - - # Debug: Print the actual result - logger.info(f"Dry run result: {dry_run_result}") - - assert dry_run_result['status'] == 'dry_run', f"Status should be 'dry_run', got: {dry_run_result['status']}" - assert dry_run_result['unused_count'] > 0, ( - "Should find at least one unused document" - ) - logger.info(f"โœ… Dry run found {dry_run_result['unused_count']} unused documents") - - # Now run actual cleanup - logger.info("Executing cleanup...") - cleanup_result = await cleanup_unused_data( - minutes_threshold=30, - dry_run=False, - user_id=user.id - ) - - assert cleanup_result["status"] == "completed", "Cleanup should complete successfully" - assert cleanup_result["deleted_count"]["documents"] > 0, ( - "At least one document should be deleted" - ) - logger.info(f"โœ… Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents") - - # Step 6: Verify deletion - async with db_engine.get_async_session() as session: - deleted_data = ( - await session.execute(select(Data).where(Data.id == data_id)) - ).scalar_one_or_none() - assert deleted_data is None, "Data record should be deleted" - logger.info("โœ… Confirmed: Data record was deleted") - - logger.info("๐ŸŽ‰ All cleanup tests passed!") - return True - - -if __name__ == "__main__": - import asyncio - success = asyncio.run(test_textdocument_cleanup_with_sql()) + """ + + await setup() + user = await get_default_user() + await cognee.add([test_text], dataset_name=dataset_name, user=user) + + cognify_result = await cognee.cognify([dataset_name], user=user) + + # Extract dataset_id from cognify result + dataset_id = None + for ds_id, pipeline_result in cognify_result.items(): + dataset_id = ds_id + break + + assert dataset_id is not None, "Failed to get dataset_id from cognify result" + logger.info(f"โœ… Document added and cognified. Dataset ID: {dataset_id}") + + # Step 2: Perform search to trigger last_accessed update + logger.info("Triggering search to update last_accessed...") + search_results = await cognee.search( + query_type=SearchType.CHUNKS, + query_text="machine learning", + datasets=[dataset_name], + user=user, + ) + logger.info(f"โœ… Search completed, found {len(search_results)} results") + assert len(search_results) > 0, "Search should return results" + + # Step 3: Verify last_accessed was set and get data_id + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + result = await session.execute( + select(Data, DatasetData) + .join(DatasetData, Data.id == DatasetData.data_id) + .where(DatasetData.dataset_id == dataset_id) + ) + data_records = result.all() + assert len(data_records) > 0, "No Data records found for the dataset" + data_record = data_records[0][0] + data_id = data_record.id + + # Verify last_accessed is set + assert data_record.last_accessed is not None, ( + "last_accessed should be set after search operation" + ) + + original_last_accessed = data_record.last_accessed + logger.info(f"โœ… last_accessed verified: {original_last_accessed}") + + # Step 4: Manually age the timestamp + minutes_threshold = 30 + aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10) + + async with db_engine.get_async_session() as session: + stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) + await session.execute(stmt) + await session.commit() + + # Verify timestamp was updated + async with db_engine.get_async_session() as session: + result = await session.execute(select(Data).where(Data.id == data_id)) + updated_data = result.scalar_one_or_none() + assert updated_data is not None, "Data record should exist" + retrieved_timestamp = updated_data.last_accessed + if retrieved_timestamp.tzinfo is None: + retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc) + assert retrieved_timestamp == aged_timestamp, "Timestamp should be updated to aged value" + + # Step 5: Test cleanup (document-level is now the default) + from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data + + # First do a dry run + logger.info("Testing dry run...") + dry_run_result = await cleanup_unused_data(minutes_threshold=10, dry_run=True, user_id=user.id) + + # Debug: Print the actual result + logger.info(f"Dry run result: {dry_run_result}") + + assert dry_run_result["status"] == "dry_run", ( + f"Status should be 'dry_run', got: {dry_run_result['status']}" + ) + assert dry_run_result["unused_count"] > 0, "Should find at least one unused document" + logger.info(f"โœ… Dry run found {dry_run_result['unused_count']} unused documents") + + # Now run actual cleanup + logger.info("Executing cleanup...") + cleanup_result = await cleanup_unused_data(minutes_threshold=30, dry_run=False, user_id=user.id) + + assert cleanup_result["status"] == "completed", "Cleanup should complete successfully" + assert cleanup_result["deleted_count"]["documents"] > 0, ( + "At least one document should be deleted" + ) + logger.info( + f"โœ… Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents" + ) + + # Step 6: Verify deletion + async with db_engine.get_async_session() as session: + deleted_data = ( + await session.execute(select(Data).where(Data.id == data_id)) + ).scalar_one_or_none() + assert deleted_data is None, "Data record should be deleted" + logger.info("โœ… Confirmed: Data record was deleted") + + logger.info("๐ŸŽ‰ All cleanup tests passed!") + return True + + +if __name__ == "__main__": + import asyncio + + success = asyncio.run(test_textdocument_cleanup_with_sql()) exit(0 if success else 1) diff --git a/cognee/tests/test_permissions.py b/cognee/tests/test_permissions.py index 3c8a4606a..10696441e 100644 --- a/cognee/tests/test_permissions.py +++ b/cognee/tests/test_permissions.py @@ -1,227 +1,212 @@ +import asyncio import os -import cognee import pathlib -from cognee.modules.users.exceptions import PermissionDeniedError -from cognee.shared.logging_utils import get_logger +import pytest +import pytest_asyncio +from unittest.mock import AsyncMock, patch + +import cognee +from cognee.context_global_variables import backend_access_control_enabled +from cognee.modules.engine.operations.setup import setup as engine_setup from cognee.modules.search.types import SearchType -from cognee.modules.users.methods import get_default_user, create_user +from cognee.modules.users.exceptions import PermissionDeniedError +from cognee.modules.users.methods import create_user, get_user from cognee.modules.users.permissions.methods import authorized_give_permission_on_datasets -from cognee.modules.data.methods import get_dataset_data +from cognee.modules.users.roles.methods import add_user_to_role, create_role +from cognee.modules.users.tenants.methods import ( + add_user_to_tenant, + create_tenant, + select_tenant, +) -logger = get_logger() +pytestmark = pytest.mark.asyncio -async def main(): - # Enable permissions feature - os.environ["ENABLE_BACKEND_ACCESS_CONTROL"] = "True" +def _extract_dataset_id_from_cognify(cognify_result: dict): + """Extract dataset_id from cognify output dictionary.""" + for dataset_id, _pipeline_result in cognify_result.items(): + return dataset_id + return None - # Clean up test directories before starting - data_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_permissions") - ).resolve() - ) - cognee_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_permissions") - ).resolve() + +async def _reset_engines_and_prune() -> None: + """Reset db engine caches and prune data/system.""" + try: + from cognee.infrastructure.databases.vector import get_vector_engine + + vector_engine = get_vector_engine() + if hasattr(vector_engine, "engine") and hasattr(vector_engine.engine, "dispose"): + await vector_engine.engine.dispose(close=True) + except Exception: + pass + + from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine + from cognee.infrastructure.databases.relational.create_relational_engine import ( + create_relational_engine, ) + from cognee.infrastructure.databases.vector.create_vector_engine import create_vector_engine - cognee.config.data_root_directory(data_directory_path) - cognee.config.system_root_directory(cognee_directory_path) + create_graph_engine.cache_clear() + create_vector_engine.cache_clear() + create_relational_engine.cache_clear() await cognee.prune.prune_data() await cognee.prune.prune_system(metadata=True) - explanation_file_path_nlp = os.path.join( - pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt" - ) - # Add document for default user - await cognee.add([explanation_file_path_nlp], dataset_name="NLP") - default_user = await get_default_user() - - explanation_file_path_quantum = os.path.join( - pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt" - ) - - # Add document for test user - test_user = await create_user("user@example.com", "example") - await cognee.add([explanation_file_path_quantum], dataset_name="QUANTUM", user=test_user) - - nlp_cognify_result = await cognee.cognify(["NLP"], user=default_user) - quantum_cognify_result = await cognee.cognify(["QUANTUM"], user=test_user) - - # Extract dataset_ids from cognify results - def extract_dataset_id_from_cognify(cognify_result): - """Extract dataset_id from cognify output dictionary""" - for dataset_id, pipeline_result in cognify_result.items(): - return dataset_id # Return the first (and likely only) dataset_id - return None - - # Get dataset IDs from cognify results - default_user_dataset_id = extract_dataset_id_from_cognify(nlp_cognify_result) - print("User is", default_user_dataset_id) - test_user_dataset_id = extract_dataset_id_from_cognify(quantum_cognify_result) - - # Check if default_user can only see information from the NLP dataset - search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, - query_text="What is in the document?", - user=default_user, - ) - assert len(search_results) == 1, "The search results list lenght is not one." - print("\n\nExtracted sentences are:\n") - for result in search_results: - print(f"{result}\n") - assert search_results[0]["dataset_name"] == "NLP", ( - f"Dict must contain dataset name 'NLP': {search_results[0]}" - ) - - # Check if test_user can only see information from the QUANTUM dataset - search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, - query_text="What is in the document?", - user=test_user, - ) - assert len(search_results) == 1, "The search results list lenght is not one." - print("\n\nExtracted sentences are:\n") - for result in search_results: - print(f"{result}\n") - assert search_results[0]["dataset_name"] == "QUANTUM", ( - f"Dict must contain dataset name 'QUANTUM': {search_results[0]}" - ) - - # Try to add document with default_user to test_users dataset (test write permission enforcement) - add_error = False +@pytest.fixture(scope="module") +def event_loop(): + """Single event loop for this module (avoids cross-loop futures).""" + loop = asyncio.new_event_loop() try: - await cognee.add( - [explanation_file_path_nlp], - dataset_name="QUANTUM", - dataset_id=test_user_dataset_id, - user=default_user, + yield loop + finally: + loop.close() + + +@pytest_asyncio.fixture(scope="module") +async def permissions_example_env(tmp_path_factory): + """One-time environment setup for the permissions example test.""" + # Ensure permissions feature is enabled (example requires it), but don't override if caller set it already. + os.environ.setdefault("ENABLE_BACKEND_ACCESS_CONTROL", "True") + + root = tmp_path_factory.mktemp("permissions_example") + cognee.config.data_root_directory(str(root / "data")) + cognee.config.system_root_directory(str(root / "system")) + + await _reset_engines_and_prune() + await engine_setup() + + assert backend_access_control_enabled(), ( + "Expected permissions to be enabled via ENABLE_BACKEND_ACCESS_CONTROL=True" + ) + + yield + + await _reset_engines_and_prune() + + +async def test_permissions_example_flow(permissions_example_env): + """Pytest version of `examples/python/permissions_example.py` (same scenarios, asserts instead of prints).""" + # Patch LLM calls so GRAPH_COMPLETION can run without external API keys. + llm_patch = patch( + "cognee.infrastructure.llm.LLMGateway.LLMGateway.acreate_structured_output", + new_callable=AsyncMock, + return_value="MOCK_ANSWER", + ) + + # Resolve example data file path (repo-shipped PDF). + repo_root = pathlib.Path(__file__).resolve().parent + explanation_file_path = str(repo_root / "test_data" / "artificial-intelligence.pdf") + assert pathlib.Path(explanation_file_path).exists(), ( + f"Expected example PDF to exist at {explanation_file_path}" + ) + + # Same QUANTUM text as in the example. + text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena. + At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages + this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the + preparation and manipulation of quantum states. + """ + + # Create user_1, add AI dataset. + user_1 = await create_user("user_1@example.com", "example") + await cognee.add([explanation_file_path], dataset_name="AI", user=user_1) + + # Create user_2, add QUANTUM dataset. + user_2 = await create_user("user_2@example.com", "example") + await cognee.add([text], dataset_name="QUANTUM", user=user_2) + + ai_cognify_result = await cognee.cognify(["AI"], user=user_1) + quantum_cognify_result = await cognee.cognify(["QUANTUM"], user=user_2) + + ai_dataset_id = _extract_dataset_id_from_cognify(ai_cognify_result) + quantum_dataset_id = _extract_dataset_id_from_cognify(quantum_cognify_result) + assert ai_dataset_id is not None + assert quantum_dataset_id is not None + + with llm_patch: + # user_1 can read own dataset. + search_results = await cognee.search( + query_type=SearchType.GRAPH_COMPLETION, + query_text="What is in the document?", + user=user_1, + datasets=[ai_dataset_id], ) - except PermissionDeniedError: - add_error = True - assert add_error, "PermissionDeniedError was not raised during add as expected" + assert isinstance(search_results, list) and len(search_results) == 1 + assert search_results[0]["dataset_name"] == "AI" + assert search_results[0]["search_result"] == ["MOCK_ANSWER"] - # Try to cognify with default_user the test_users dataset (test write permission enforcement) - cognify_error = False - try: - await cognee.cognify(datasets=[test_user_dataset_id], user=default_user) - except PermissionDeniedError: - cognify_error = True - assert cognify_error, "PermissionDeniedError was not raised during cognify as expected" + # user_1 can't read dataset owned by user_2. + with pytest.raises(PermissionDeniedError): + await cognee.search( + query_type=SearchType.GRAPH_COMPLETION, + query_text="What is in the document?", + user=user_1, + datasets=[quantum_dataset_id], + ) - # Try to add permission for a dataset default_user does not have share permission for - give_permission_error = False - try: + # user_1 can't add to user_2's dataset. + with pytest.raises(PermissionDeniedError): + await cognee.add([explanation_file_path], dataset_id=quantum_dataset_id, user=user_1) + + # user_2 grants read permission to user_1 for QUANTUM dataset. await authorized_give_permission_on_datasets( - default_user.id, - [test_user_dataset_id], - "write", - default_user.id, + user_1.id, [quantum_dataset_id], "read", user_2.id ) - except PermissionDeniedError: - give_permission_error = True - assert give_permission_error, ( - "PermissionDeniedError was not raised during assignment of permission as expected" - ) - # Actually give permission to default_user to write on test_users dataset - await authorized_give_permission_on_datasets( - default_user.id, - [test_user_dataset_id], - "write", - test_user.id, - ) + with llm_patch: + # Now user_1 can read QUANTUM dataset via dataset_id. + search_results = await cognee.search( + query_type=SearchType.GRAPH_COMPLETION, + query_text="What is in the document?", + user=user_1, + dataset_ids=[quantum_dataset_id], + ) + assert isinstance(search_results, list) and len(search_results) == 1 + assert search_results[0]["dataset_name"] == "QUANTUM" + assert search_results[0]["search_result"] == ["MOCK_ANSWER"] - # Add new data to test_users dataset from default_user - await cognee.add( - [explanation_file_path_nlp], - dataset_name="QUANTUM", - dataset_id=test_user_dataset_id, - user=default_user, - ) - await cognee.cognify(datasets=[test_user_dataset_id], user=default_user) + # Tenant + role scenario. + tenant_id = await create_tenant("CogneeLab", user_2.id) + await select_tenant(user_id=user_2.id, tenant_id=tenant_id) + role_id = await create_role(role_name="Researcher", owner_id=user_2.id) - # Actually give permission to default_user to read on test_users dataset - await authorized_give_permission_on_datasets( - default_user.id, - [test_user_dataset_id], - "read", - test_user.id, - ) + user_3 = await create_user("user_3@example.com", "example") + await add_user_to_tenant(user_id=user_3.id, tenant_id=tenant_id, owner_id=user_2.id) + await add_user_to_role(user_id=user_3.id, role_id=role_id, owner_id=user_2.id) + await select_tenant(user_id=user_3.id, tenant_id=tenant_id) - # Check if default_user can see from test_users datasets now - search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, - query_text="What is in the document?", - user=default_user, - dataset_ids=[test_user_dataset_id], - ) - assert len(search_results) == 1, "The search results list length is not one." - print("\n\nExtracted sentences are:\n") - for result in search_results: - print(f"{result}\n") + # Can't grant role permission on a dataset that isn't part of the active tenant. + with pytest.raises(PermissionDeniedError): + await authorized_give_permission_on_datasets( + role_id, [quantum_dataset_id], "read", user_2.id + ) - assert search_results[0]["dataset_name"] == "QUANTUM", ( - f"Dict must contain dataset name 'QUANTUM': {search_results[0]}" - ) - - # Check if default_user can only see information from both datasets now - search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, - query_text="What is in the document?", - user=default_user, - ) - assert len(search_results) == 2, "The search results list length is not two." - print("\n\nExtracted sentences are:\n") - for result in search_results: - print(f"{result}\n") - - # Try deleting data from test_user dataset with default_user without delete permission - delete_error = False - try: - # Get the dataset data to find the ID of the first data item (text) - test_user_dataset_data = await get_dataset_data(test_user_dataset_id) - text_data_id = test_user_dataset_data[0].id - - await cognee.delete( - data_id=text_data_id, dataset_id=test_user_dataset_id, user=default_user + # Re-create QUANTUM dataset in CogneeLab tenant so role permissions can be assigned. + user_2 = await get_user(user_2.id) # refresh tenant context + await cognee.add([text], dataset_name="QUANTUM_COGNEE_LAB", user=user_2) + quantum_cognee_lab_cognify_result = await cognee.cognify( + ["QUANTUM_COGNEE_LAB"], user=user_2 ) - except PermissionDeniedError: - delete_error = True + quantum_cognee_lab_dataset_id = _extract_dataset_id_from_cognify( + quantum_cognee_lab_cognify_result + ) + assert quantum_cognee_lab_dataset_id is not None - assert delete_error, "PermissionDeniedError was not raised during delete operation as expected" + await authorized_give_permission_on_datasets( + role_id, [quantum_cognee_lab_dataset_id], "read", user_2.id + ) - # Try deleting data from test_user dataset with test_user - # Get the dataset data to find the ID of the first data item (text) - test_user_dataset_data = await get_dataset_data(test_user_dataset_id) - text_data_id = test_user_dataset_data[0].id - - await cognee.delete(data_id=text_data_id, dataset_id=test_user_dataset_id, user=test_user) - - # Actually give permission to default_user to delete data for test_users dataset - await authorized_give_permission_on_datasets( - default_user.id, - [test_user_dataset_id], - "delete", - test_user.id, - ) - - # Try deleting data from test_user dataset with default_user after getting delete permission - # Get the dataset data to find the ID of the remaining data item (explanation_file_path_nlp) - test_user_dataset_data = await get_dataset_data(test_user_dataset_id) - explanation_file_data_id = test_user_dataset_data[0].id - - await cognee.delete( - data_id=explanation_file_data_id, dataset_id=test_user_dataset_id, user=default_user - ) - - -if __name__ == "__main__": - import asyncio - - asyncio.run(main()) + with llm_patch: + # user_3 can read via role permission. + search_results = await cognee.search( + query_type=SearchType.GRAPH_COMPLETION, + query_text="What is in the document?", + user=user_3, + dataset_ids=[quantum_cognee_lab_dataset_id], + ) + assert isinstance(search_results, list) and len(search_results) == 1 + assert search_results[0]["dataset_name"] == "QUANTUM_COGNEE_LAB" + assert search_results[0]["search_result"] == ["MOCK_ANSWER"]