From 829a6f0d04bcfec6e9c9f94219a29d6ab5cd909d Mon Sep 17 00:00:00 2001
From: chinu0609
Date: Wed, 10 Dec 2025 22:41:01 +0530
Subject: [PATCH] fix: only document level deletion

---
 .../retrieval/utils/access_tracking.py      |  80 +--
 cognee/tasks/cleanup/cleanup_unused_data.py | 521 ++++++------
 cognee/tests/test_cleanup_unused_data.py    | 388 ++++++-------
 3 files changed, 333 insertions(+), 656 deletions(-)

diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
index c7b06ee17..54fd043b9 100644
--- a/cognee/modules/retrieval/utils/access_tracking.py
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -4,7 +4,7 @@ import json
 from datetime import datetime, timezone
 from typing import List, Any
 from uuid import UUID
-import os 
+import os
 from cognee.infrastructure.databases.graph import get_graph_engine
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.models import Data
@@ -14,38 +14,28 @@ from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
 
 logger = get_logger(__name__)
 
+
 async def update_node_access_timestamps(items: List[Any]):
     if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
         return
-    
+
     if not items:
         return
-    
+
     graph_engine = await get_graph_engine()
-    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
     timestamp_dt = datetime.now(timezone.utc)
-    
+
     # Extract node IDs
     node_ids = []
     for item in items:
         item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
         if item_id:
             node_ids.append(str(item_id))
-    
+
     if not node_ids:
         return
-    
-    try:
-        # Try to update nodes in graph database (may fail for unsupported DBs)
-        await _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms)
-    except Exception as e:
-        logger.warning(
-            f"Failed to update node timestamps in graph database: {e}. "
-            "Will update document-level timestamps in SQL instead."
- ) - - # Always try to find origin documents and update SQL - # This ensures document-level tracking works even if graph updates fail + + # Focus on document-level tracking via projection try: doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids) if doc_ids: @@ -54,53 +44,6 @@ async def update_node_access_timestamps(items: List[Any]): logger.error(f"Failed to update SQL timestamps: {e}") raise -async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms): - """Update nodes using graph projection - works with any graph database""" - # Project the graph with necessary properties - memory_fragment = CogneeGraph() - await memory_fragment.project_graph_from_db( - graph_engine, - node_properties_to_project=["id"], - edge_properties_to_project=[] - ) - - # Update each node's last_accessed_at property - provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower() - - for node_id in node_ids: - node = memory_fragment.get_node(node_id) - if node: - try: - # Update the node in the database - if provider == "kuzu": - # Kuzu stores properties as JSON - result = await graph_engine.query( - "MATCH (n:Node {id: $id}) RETURN n.properties", - {"id": node_id} - ) - - if result and result[0]: - props = json.loads(result[0][0]) if result[0][0] else {} - props["last_accessed_at"] = timestamp_ms - - await graph_engine.query( - "MATCH (n:Node {id: $id}) SET n.properties = $props", - {"id": node_id, "props": json.dumps(props)} - ) - elif provider == "neo4j": - await graph_engine.query( - "MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp", - {"id": node_id, "timestamp": timestamp_ms} - ) - elif provider == "neptune": - await graph_engine.query( - "MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp", - {"id": node_id, "timestamp": timestamp_ms} - ) - except Exception as e: - # Log but continue with other nodes - logger.debug(f"Failed to update node {node_id}: {e}") - continue async def _find_origin_documents_via_projection(graph_engine, node_ids): """Find origin documents using graph projection instead of DB queries""" @@ -111,7 +54,7 @@ async def _find_origin_documents_via_projection(graph_engine, node_ids): node_properties_to_project=["id", "type"], edge_properties_to_project=["relationship_name"] ) - + # Find origin documents by traversing the in-memory graph doc_ids = set() for node_id in node_ids: @@ -123,9 +66,10 @@ async def _find_origin_documents_via_projection(graph_engine, node_ids): neighbor = edge.get_destination_node() if edge.get_source_node().id == node_id else edge.get_source_node() if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]: doc_ids.add(neighbor.id) - + return list(doc_ids) + async def _update_sql_records(doc_ids, timestamp_dt): """Update SQL Data table (same for all providers)""" db_engine = get_relational_engine() @@ -133,6 +77,6 @@ async def _update_sql_records(doc_ids, timestamp_dt): stmt = update(Data).where( Data.id.in_([UUID(doc_id) for doc_id in doc_ids]) ).values(last_accessed=timestamp_dt) - + await session.execute(stmt) await session.commit() diff --git a/cognee/tasks/cleanup/cleanup_unused_data.py b/cognee/tasks/cleanup/cleanup_unused_data.py index 3894635dd..34cde1b6f 100644 --- a/cognee/tasks/cleanup/cleanup_unused_data.py +++ b/cognee/tasks/cleanup/cleanup_unused_data.py @@ -1,382 +1,187 @@ -""" -Task for automatically deleting unused data from the memify pipeline. +""" +Task for automatically deleting unused data from the memify pipeline. 
+ +This task identifies and removes entire documents that haven't +been accessed by retrievers for a specified period, helping maintain system +efficiency and storage optimization through whole-document removal. +""" + +import json +from datetime import datetime, timezone, timedelta +from typing import Optional, Dict, Any +from uuid import UUID +import os +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.infrastructure.databases.vector import get_vector_engine +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.data.models import Data, DatasetData +from cognee.shared.logging_utils import get_logger +from sqlalchemy import select, or_ +import cognee +import sqlalchemy as sa +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph + +logger = get_logger(__name__) + + +async def cleanup_unused_data( + minutes_threshold: Optional[int], + dry_run: bool = True, + user_id: Optional[UUID] = None +) -> Dict[str, Any]: + """ + Identify and remove unused data from the memify pipeline. + + Parameters + ---------- + minutes_threshold : int + Minutes since last access to consider data unused + dry_run : bool + If True, only report what would be deleted without actually deleting (default: True) + user_id : UUID, optional + Limit cleanup to specific user's data (default: None) + + Returns + ------- + Dict[str, Any] + Cleanup results with status, counts, and timestamp + """ + # Check 1: Environment variable must be enabled + if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": + logger.warning( + "Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled." + ) + return { + "status": "skipped", + "reason": "ENABLE_LAST_ACCESSED not enabled", + "unused_count": 0, + "deleted_count": {}, + "cleanup_date": datetime.now(timezone.utc).isoformat() + } + + # Check 2: Verify tracking has actually been running + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + # Count records with non-NULL last_accessed + tracked_count = await session.execute( + select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None)) + ) + tracked_records = tracked_count.scalar() + + if tracked_records == 0: + logger.warning( + "Cleanup skipped: No records have been tracked yet. " + "ENABLE_LAST_ACCESSED may have been recently enabled. " + "Wait for retrievers to update timestamps before running cleanup." + ) + return { + "status": "skipped", + "reason": "No tracked records found - tracking may be newly enabled", + "unused_count": 0, + "deleted_count": {}, + "cleanup_date": datetime.now(timezone.utc).isoformat() + } + + logger.info( + "Starting cleanup task", + minutes_threshold=minutes_threshold, + dry_run=dry_run, + user_id=str(user_id) if user_id else None + ) + + # Calculate cutoff timestamp + cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold) -This task identifies and removes entire documents that haven't -been accessed by retrievers for a specified period, helping maintain system -efficiency and storage optimization through whole-document removal. 
-""" - -import json -from datetime import datetime, timezone, timedelta -from typing import Optional, Dict, Any -from uuid import UUID -import os -from cognee.infrastructure.databases.graph import get_graph_engine -from cognee.infrastructure.databases.vector import get_vector_engine -from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.data.models import Data, DatasetData -from cognee.shared.logging_utils import get_logger -from sqlalchemy import select, or_ -import cognee -import sqlalchemy as sa -from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph - -logger = get_logger(__name__) + # Document-level approach (recommended) + return await _cleanup_via_sql(cutoff_date, dry_run, user_id) -async def cleanup_unused_data( - minutes_threshold: Optional[int], - dry_run: bool = True, - user_id: Optional[UUID] = None, - text_doc: bool = True, # Changed default to True for document-level cleanup - node_level: bool = False # New parameter for explicit node-level cleanup +async def _cleanup_via_sql( + cutoff_date: datetime, + dry_run: bool, + user_id: Optional[UUID] = None ) -> Dict[str, Any]: """ - Identify and remove unused data from the memify pipeline. - + SQL-based cleanup: Query Data table for unused documents and use cognee.delete(). + Parameters ---------- - minutes_threshold : int - Minutes since last access to consider data unused + cutoff_date : datetime + Cutoff date for last_accessed filtering dry_run : bool - If True, only report what would be deleted without actually deleting (default: True) + If True, only report what would be deleted user_id : UUID, optional - Limit cleanup to specific user's data (default: None) - text_doc : bool - If True (default), use SQL-based filtering to find unused TextDocuments and call cognee.delete() - for proper whole-document deletion - node_level : bool - If True, perform chaotic node-level deletion of unused chunks, entities, and summaries - (default: False - deprecated in favor of document-level cleanup) + Filter by user ID if provided Returns ------- Dict[str, Any] - Cleanup results with status, counts, and timestamp - """ - # Check 1: Environment variable must be enabled - if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": - logger.warning( - "Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled." - ) - return { - "status": "skipped", - "reason": "ENABLE_LAST_ACCESSED not enabled", - "unused_count": 0, - "deleted_count": {}, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - - # Check 2: Verify tracking has actually been running - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - # Count records with non-NULL last_accessed - tracked_count = await session.execute( - select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None)) - ) - tracked_records = tracked_count.scalar() - - if tracked_records == 0: - logger.warning( - "Cleanup skipped: No records have been tracked yet. " - "ENABLE_LAST_ACCESSED may have been recently enabled. " - "Wait for retrievers to update timestamps before running cleanup." 
- ) - return { - "status": "skipped", - "reason": "No tracked records found - tracking may be newly enabled", - "unused_count": 0, - "deleted_count": {}, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - - logger.info( - "Starting cleanup task", - minutes_threshold=minutes_threshold, - dry_run=dry_run, - user_id=str(user_id) if user_id else None, - text_doc=text_doc, - node_level=node_level - ) + Cleanup results + """ + db_engine = get_relational_engine() - # Calculate cutoff timestamp - cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold) - - if node_level: - # Deprecated: Node-level approach (chaotic) - logger.warning( - "Node-level cleanup is deprecated and may lead to fragmented knowledge graphs. " - "Consider using document-level cleanup (default) instead." - ) - cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000) - logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)") + async with db_engine.get_async_session() as session: + # Query for Data records with old last_accessed timestamps + query = select(Data, DatasetData).join( + DatasetData, Data.id == DatasetData.data_id + ).where( + or_( + Data.last_accessed < cutoff_date, + Data.last_accessed.is_(None) + ) + ) - # Find unused nodes using graph projection - unused_nodes = await _find_unused_nodes_via_projection(cutoff_timestamp_ms) - - total_unused = sum(len(nodes) for nodes in unused_nodes.values()) - logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()}) - - if dry_run: - return { - "status": "dry_run", - "unused_count": total_unused, - "deleted_count": { - "data_items": 0, - "chunks": 0, - "entities": 0, - "summaries": 0, - "associations": 0 - }, - "cleanup_date": datetime.now(timezone.utc).isoformat(), - "preview": { - "chunks": len(unused_nodes["DocumentChunk"]), - "entities": len(unused_nodes["Entity"]), - "summaries": len(unused_nodes["TextSummary"]) - } - } - - # Delete unused nodes (provider-agnostic deletion) - deleted_counts = await _delete_unused_nodes(unused_nodes) - - logger.info("Cleanup completed", deleted_counts=deleted_counts) + if user_id: + from cognee.modules.data.models import Dataset + query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where( + Dataset.owner_id == user_id + ) + result = await session.execute(query) + unused_data = result.all() + + logger.info(f"Found {len(unused_data)} unused documents in SQL") + + if dry_run: return { - "status": "completed", - "unused_count": total_unused, + "status": "dry_run", + "unused_count": len(unused_data), "deleted_count": { "data_items": 0, - "chunks": deleted_counts["DocumentChunk"], - "entities": deleted_counts["Entity"], - "summaries": deleted_counts["TextSummary"], - "associations": deleted_counts["associations"] + "documents": 0 }, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - else: - # Default: Document-level approach (recommended) - return await _cleanup_via_sql(cutoff_date, dry_run, user_id) - - -async def _cleanup_via_sql( - cutoff_date: datetime, - dry_run: bool, - user_id: Optional[UUID] = None -) -> Dict[str, Any]: - """ - SQL-based cleanup: Query Data table for unused documents and use cognee.delete(). 
- - Parameters - ---------- - cutoff_date : datetime - Cutoff date for last_accessed filtering - dry_run : bool - If True, only report what would be deleted - user_id : UUID, optional - Filter by user ID if provided - - Returns - ------- - Dict[str, Any] - Cleanup results - """ - db_engine = get_relational_engine() - - async with db_engine.get_async_session() as session: - # Query for Data records with old last_accessed timestamps - query = select(Data, DatasetData).join( - DatasetData, Data.id == DatasetData.data_id - ).where( - or_( - Data.last_accessed < cutoff_date, - Data.last_accessed.is_(None) - ) - ) - - if user_id: - from cognee.modules.data.models import Dataset - query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where( - Dataset.owner_id == user_id - ) - - result = await session.execute(query) - unused_data = result.all() - - logger.info(f"Found {len(unused_data)} unused documents in SQL") - - if dry_run: - return { - "status": "dry_run", - "unused_count": len(unused_data), - "deleted_count": { - "data_items": 0, - "documents": 0 - }, - "cleanup_date": datetime.now(timezone.utc).isoformat(), - "preview": { - "documents": len(unused_data) - } - } - - # Delete each document using cognee.delete() - deleted_count = 0 - from cognee.modules.users.methods import get_default_user - user = await get_default_user() if user_id is None else None - - for data, dataset_data in unused_data: - try: - await cognee.delete( - data_id=data.id, - dataset_id=dataset_data.dataset_id, - mode="hard", # Use hard mode to also remove orphaned entities - user=user - ) - deleted_count += 1 - logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}") - except Exception as e: - logger.error(f"Failed to delete document {data.id}: {e}") - - logger.info("Cleanup completed", deleted_count=deleted_count) - - return { - "status": "completed", - "unused_count": len(unused_data), - "deleted_count": { - "data_items": deleted_count, - "documents": deleted_count - }, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - - -async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[str, list]: - """ - Find unused nodes using graph projection - database-agnostic approach. - NOTE: This function is deprecated as it leads to fragmented knowledge graphs. 
+ "cleanup_date": datetime.now(timezone.utc).isoformat(), + "preview": { + "documents": len(unused_data) + } + } - Parameters - ---------- - cutoff_timestamp_ms : int - Cutoff timestamp in milliseconds since epoch + # Delete each document using cognee.delete() + deleted_count = 0 + from cognee.modules.users.methods import get_default_user + user = await get_default_user() if user_id is None else None - Returns - ------- - Dict[str, list] - Dictionary mapping node types to lists of unused node IDs - """ - graph_engine = await get_graph_engine() + for data, dataset_data in unused_data: + try: + await cognee.delete( + data_id=data.id, + dataset_id=dataset_data.dataset_id, + mode="hard", # Use hard mode to also remove orphaned entities + user=user + ) + deleted_count += 1 + logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}") + except Exception as e: + logger.error(f"Failed to delete document {data.id}: {e}") - # Project the entire graph with necessary properties - memory_fragment = CogneeGraph() - await memory_fragment.project_graph_from_db( - graph_engine, - node_properties_to_project=["id", "type", "last_accessed_at"], - edge_properties_to_project=[] - ) - - unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []} - - # Get all nodes from the projected graph - all_nodes = memory_fragment.get_nodes() - - for node in all_nodes: - node_type = node.get_attribute("type") - if node_type not in unused_nodes: - continue - - # Check last_accessed_at property - last_accessed = node.get_attribute("last_accessed_at") + logger.info("Cleanup completed", deleted_count=deleted_count) - if last_accessed is None or last_accessed < cutoff_timestamp_ms: - unused_nodes[node_type].append(node.id) - logger.debug( - f"Found unused {node_type}", - node_id=node.id, - last_accessed=last_accessed - ) - - return unused_nodes - - -async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]: - """ - Delete unused nodes from graph and vector databases. - NOTE: This function is deprecated as it leads to fragmented knowledge graphs. 
- - Parameters - ---------- - unused_nodes : Dict[str, list] - Dictionary mapping node types to lists of node IDs to delete - - Returns - ------- - Dict[str, int] - Count of deleted items by type - """ - graph_engine = await get_graph_engine() - vector_engine = get_vector_engine() - - deleted_counts = { - "DocumentChunk": 0, - "Entity": 0, - "TextSummary": 0, - "associations": 0 - } - - # Count associations before deletion (using graph projection for consistency) - if any(unused_nodes.values()): - memory_fragment = CogneeGraph() - await memory_fragment.project_graph_from_db( - graph_engine, - node_properties_to_project=["id"], - edge_properties_to_project=[] - ) - - for node_type, node_ids in unused_nodes.items(): - if not node_ids: - continue - - # Count edges from the in-memory graph - for node_id in node_ids: - node = memory_fragment.get_node(node_id) - if node: - # Count edges from the in-memory graph - edge_count = len(node.get_skeleton_edges()) - deleted_counts["associations"] += edge_count - - # Delete from graph database (uses DETACH DELETE, so edges are automatically removed) - for node_type, node_ids in unused_nodes.items(): - if not node_ids: - continue - - logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database") - - # Delete nodes in batches (database-agnostic) - await graph_engine.delete_nodes(node_ids) - deleted_counts[node_type] = len(node_ids) - - # Delete from vector database - vector_collections = { - "DocumentChunk": "DocumentChunk_text", - "Entity": "Entity_name", - "TextSummary": "TextSummary_text" - } - - - for node_type, collection_name in vector_collections.items(): - node_ids = unused_nodes[node_type] - if not node_ids: - continue - - logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from vector database") - - try: - if await vector_engine.has_collection(collection_name): - await vector_engine.delete_data_points( - collection_name, - [str(node_id) for node_id in node_ids] - ) - except Exception as e: - logger.error(f"Error deleting from vector collection {collection_name}: {e}") - - return deleted_counts + return { + "status": "completed", + "unused_count": len(unused_data), + "deleted_count": { + "data_items": deleted_count, + "documents": deleted_count + }, + "cleanup_date": datetime.now(timezone.utc).isoformat() + } diff --git a/cognee/tests/test_cleanup_unused_data.py b/cognee/tests/test_cleanup_unused_data.py index c21b9f5ea..e738dcba0 100644 --- a/cognee/tests/test_cleanup_unused_data.py +++ b/cognee/tests/test_cleanup_unused_data.py @@ -1,244 +1,172 @@ -import os -import pathlib -import cognee -from datetime import datetime, timezone, timedelta -from uuid import UUID -from sqlalchemy import select, update -from cognee.modules.data.models import Data, DatasetData -from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.users.methods import get_default_user -from cognee.shared.logging_utils import get_logger -from cognee.modules.search.types import SearchType - -logger = get_logger() - - -async def test_textdocument_cleanup_with_sql(): - """ - End-to-end test for TextDocument cleanup based on last_accessed timestamps. 
+import os +import pathlib +import cognee +from datetime import datetime, timezone, timedelta +from uuid import UUID +from sqlalchemy import select, update +from cognee.modules.data.models import Data, DatasetData +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.users.methods import get_default_user +from cognee.shared.logging_utils import get_logger +from cognee.modules.search.types import SearchType - Tests: - 1. Add and cognify a document - 2. Perform search to populate last_accessed timestamp - 3. Verify last_accessed is set in SQL Data table - 4. Manually age the timestamp beyond cleanup threshold - 5. Run cleanup with text_doc=True - 6. Verify document was deleted from all databases (relational, graph, and vector) - """ - # Setup test directories - data_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup") - ).resolve() - ) - cognee_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup") - ).resolve() - ) +logger = get_logger() - cognee.config.data_root_directory(data_directory_path) - cognee.config.system_root_directory(cognee_directory_path) - # Initialize database - from cognee.modules.engine.operations.setup import setup +async def test_textdocument_cleanup_with_sql(): + """ + End-to-end test for TextDocument cleanup based on last_accessed timestamps. + """ + # Enable last accessed tracking BEFORE any cognee operations + os.environ["ENABLE_LAST_ACCESSED"] = "true" - # Clean slate - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - - logger.info("๐Ÿงช Testing TextDocument cleanup based on last_accessed") - - # Step 1: Add and cognify a test document - dataset_name = "test_cleanup_dataset" - test_text = """ - Machine learning is a subset of artificial intelligence that enables systems to learn - and improve from experience without being explicitly programmed. Deep learning uses - neural networks with multiple layers to process data. - """ - - await setup() - user = await get_default_user() - await cognee.add([test_text], dataset_name=dataset_name, user=user) - - cognify_result = await cognee.cognify([dataset_name], user=user) - - # Extract dataset_id from cognify result (ds_id is already a UUID) - dataset_id = None - for ds_id, pipeline_result in cognify_result.items(): - dataset_id = ds_id # Don't wrap in UUID() - it's already a UUID object - break - - assert dataset_id is not None, "Failed to get dataset_id from cognify result" - logger.info(f"โœ… Document added and cognified. 
Dataset ID: {dataset_id}") - - # Step 2: Perform search to trigger last_accessed update - logger.info("Triggering search to update last_accessed...") - search_results = await cognee.search( - query_type=SearchType.CHUNKS, - query_text="machine learning", - datasets=[dataset_name], - user=user - ) - logger.info(f"โœ… Search completed, found {len(search_results)} results") - - # Step 3: Verify last_accessed was set in SQL Data table - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - # Get the Data record for this dataset - result = await session.execute( - select(Data, DatasetData) - .join(DatasetData, Data.id == DatasetData.data_id) - .where(DatasetData.dataset_id == dataset_id) - ) - data_records = result.all() - assert len(data_records) > 0, "No Data records found for the dataset" - data_record = data_records[0][0] - data_id = data_record.id + # Setup test directories + data_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup") + ).resolve() + ) + cognee_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup") + ).resolve() + ) - # Verify last_accessed is set (should be set by search operation) - assert data_record.last_accessed is not None, ( - "last_accessed should be set after search operation" - ) + cognee.config.data_root_directory(data_directory_path) + cognee.config.system_root_directory(cognee_directory_path) - original_last_accessed = data_record.last_accessed - logger.info(f"โœ… last_accessed verified: {original_last_accessed}") - - # Step 4: Manually age the timestamp to be older than cleanup threshold - days_threshold = 30 - aged_timestamp = datetime.now(timezone.utc) - timedelta(days=days_threshold + 10) - - async with db_engine.get_async_session() as session: - stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) - await session.execute(stmt) - await session.commit() - - # Query in a NEW session to avoid cached values - async with db_engine.get_async_session() as session: - result = await session.execute(select(Data).where(Data.id == data_id)) - updated_data = result.scalar_one_or_none() + # Initialize database + from cognee.modules.engine.operations.setup import setup - # Make both timezone-aware for comparison - retrieved_timestamp = updated_data.last_accessed - if retrieved_timestamp.tzinfo is None: - # If database returned naive datetime, make it UTC-aware - retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc) + # Clean slate + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) - assert retrieved_timestamp == aged_timestamp, ( - f"Timestamp should be updated to aged value. 
" - f"Expected: {aged_timestamp}, Got: {retrieved_timestamp}" - ) - - # Step 5: Test cleanup with text_doc=True - from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data - - # First do a dry run - logger.info("Testing dry run with text_doc=True...") - dry_run_result = await cleanup_unused_data( - days_threshold=30, - dry_run=True, - user_id=user.id, - text_doc=True - ) - - assert dry_run_result['status'] == 'dry_run', "Status should be 'dry_run'" - assert dry_run_result['unused_count'] > 0, ( - "Should find at least one unused document" - ) - logger.info(f"โœ… Dry run found {dry_run_result['unused_count']} unused documents") - - # Now run actual cleanup - logger.info("Executing cleanup with text_doc=True...") - cleanup_result = await cleanup_unused_data( - days_threshold=30, - dry_run=False, - user_id=user.id, - text_doc=True - ) - - assert cleanup_result["status"] == "completed", "Cleanup should complete successfully" - assert cleanup_result["deleted_count"]["documents"] > 0, ( - "At least one document should be deleted" - ) - logger.info(f"โœ… Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents") - - # Step 6: Verify the document was actually deleted from SQL - async with db_engine.get_async_session() as session: - deleted_data = ( - await session.execute(select(Data).where(Data.id == data_id)) - ).scalar_one_or_none() + logger.info("๐Ÿงช Testing TextDocument cleanup based on last_accessed") - assert deleted_data is None, ( - "Data record should be deleted after cleanup" - ) - logger.info("โœ… Confirmed: Data record was deleted from SQL database") - - # Verify the dataset-data link was also removed - async with db_engine.get_async_session() as session: - dataset_data_link = ( - await session.execute( - select(DatasetData).where( - DatasetData.data_id == data_id, - DatasetData.dataset_id == dataset_id - ) - ) - ).scalar_one_or_none() + # Step 1: Add and cognify a test document + dataset_name = "test_cleanup_dataset" + test_text = """ + Machine learning is a subset of artificial intelligence that enables systems to learn + and improve from experience without being explicitly programmed. Deep learning uses + neural networks with multiple layers to process data. + """ - assert dataset_data_link is None, ( - "DatasetData link should be deleted after cleanup" - ) - logger.info("โœ… Confirmed: DatasetData link was deleted") + await setup() + user = await get_default_user() + await cognee.add([test_text], dataset_name=dataset_name, user=user) + + cognify_result = await cognee.cognify([dataset_name], user=user) + + # Extract dataset_id from cognify result + dataset_id = None + for ds_id, pipeline_result in cognify_result.items(): + dataset_id = ds_id + break + + assert dataset_id is not None, "Failed to get dataset_id from cognify result" + logger.info(f"โœ… Document added and cognified. 
Dataset ID: {dataset_id}") + + # Step 2: Perform search to trigger last_accessed update + logger.info("Triggering search to update last_accessed...") + search_results = await cognee.search( + query_type=SearchType.CHUNKS, + query_text="machine learning", + datasets=[dataset_name], + user=user + ) + logger.info(f"โœ… Search completed, found {len(search_results)} results") + assert len(search_results) > 0, "Search should return results" + + # Step 3: Verify last_accessed was set and get data_id + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + result = await session.execute( + select(Data, DatasetData) + .join(DatasetData, Data.id == DatasetData.data_id) + .where(DatasetData.dataset_id == dataset_id) + ) + data_records = result.all() + assert len(data_records) > 0, "No Data records found for the dataset" + data_record = data_records[0][0] + data_id = data_record.id + + # Verify last_accessed is set + assert data_record.last_accessed is not None, ( + "last_accessed should be set after search operation" + ) + + original_last_accessed = data_record.last_accessed + logger.info(f"โœ… last_accessed verified: {original_last_accessed}") + + # Step 4: Manually age the timestamp + minutes_threshold = 30 + aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10) + + async with db_engine.get_async_session() as session: + stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) + await session.execute(stmt) + await session.commit() + + # Verify timestamp was updated + async with db_engine.get_async_session() as session: + result = await session.execute(select(Data).where(Data.id == data_id)) + updated_data = result.scalar_one_or_none() + assert updated_data is not None, "Data record should exist" + retrieved_timestamp = updated_data.last_accessed + if retrieved_timestamp.tzinfo is None: + retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc) + assert retrieved_timestamp == aged_timestamp, ( + f"Timestamp should be updated to aged value" + ) + + # Step 5: Test cleanup (document-level is now the default) + from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data + + # First do a dry run + logger.info("Testing dry run...") + dry_run_result = await cleanup_unused_data( + minutes_threshold=10, + dry_run=True, + user_id=user.id + ) + + # Debug: Print the actual result + logger.info(f"Dry run result: {dry_run_result}") - # Verify graph nodes were cleaned up - from cognee.infrastructure.databases.graph import get_graph_engine + assert dry_run_result['status'] == 'dry_run', f"Status should be 'dry_run', got: {dry_run_result['status']}" + assert dry_run_result['unused_count'] > 0, ( + "Should find at least one unused document" + ) + logger.info(f"โœ… Dry run found {dry_run_result['unused_count']} unused documents") + + # Now run actual cleanup + logger.info("Executing cleanup...") + cleanup_result = await cleanup_unused_data( + minutes_threshold=30, + dry_run=False, + user_id=user.id + ) + + assert cleanup_result["status"] == "completed", "Cleanup should complete successfully" + assert cleanup_result["deleted_count"]["documents"] > 0, ( + "At least one document should be deleted" + ) + logger.info(f"โœ… Cleanup completed. 
Deleted {cleanup_result['deleted_count']['documents']} documents")
+
+    # Step 6: Verify deletion
+    async with db_engine.get_async_session() as session:
+        deleted_data = (
+            await session.execute(select(Data).where(Data.id == data_id))
+        ).scalar_one_or_none()
+        assert deleted_data is None, "Data record should be deleted"
+        logger.info("✅ Confirmed: Data record was deleted")
+
+    logger.info("🎉 All cleanup tests passed!")
+    return True
 
-    graph_engine = await get_graph_engine()
-    # Try to find the TextDocument node - it should not exist
-    result = await graph_engine.query(
-        "MATCH (n:Node {id: $id}) RETURN n",
-        {"id": str(data_id)}
-    )
-
-    assert len(result) == 0, (
-        "TextDocument node should be deleted from graph database"
-    )
-    logger.info("✅ Confirmed: TextDocument node was deleted from graph database")
-
-    # Verify vector database was cleaned up
-    from cognee.infrastructure.databases.vector import get_vector_engine
-
-    vector_engine = get_vector_engine()
-
-    # Check each collection that should have been cleaned up
-    vector_collections = [
-        "DocumentChunk_text",
-        "Entity_name",
-        "TextSummary_text"
-    ]
-
-    for collection_name in vector_collections:
-        if await vector_engine.has_collection(collection_name):
-            # Try to retrieve the deleted data points
-            try:
-                results = await vector_engine.retrieve(collection_name, [str(data_id)])
-                assert len(results) == 0, (
-                    f"Data points should be deleted from {collection_name} collection"
-                )
-                logger.info(f"✅ Confirmed: {collection_name} collection is clean")
-            except Exception as e:
-                # Collection might be empty or not exist, which is fine
-                logger.info(f"✅ Confirmed: {collection_name} collection is empty or doesn't exist")
-                pass
-
-    logger.info("✅ Confirmed: Vector database entries were deleted")
-
-    logger.info("🎉 All cleanup tests passed!")
-
-    return True
-
-
-if __name__ == "__main__":
-    import asyncio
-    success = asyncio.run(test_textdocument_cleanup_with_sql())
+if __name__ == "__main__":
+    import asyncio
+    success = asyncio.run(test_textdocument_cleanup_with_sql())
     exit(0 if success else 1)
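
Usage note: below is a minimal sketch of how the document-level cleanup introduced by this patch can be driven, based on the signatures and return keys in cleanup_unused_data.py. The 60-minute threshold and the standalone-script layout are illustrative, not part of the change; ENABLE_LAST_ACCESSED must already have been set while retrievers were running, otherwise the task returns a "skipped" result because no last_accessed timestamps exist.

import asyncio
import os

from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data


async def main():
    # Tracking must be enabled for cleanup to run at all; in practice it has to
    # be set before searches execute so that Data.last_accessed gets populated.
    os.environ["ENABLE_LAST_ACCESSED"] = "true"

    # Dry run: report documents whose last_accessed is older than 60 minutes
    # (or NULL) without deleting anything.
    report = await cleanup_unused_data(minutes_threshold=60, dry_run=True)
    print(report["status"], report["unused_count"])

    # Real run: each stale document is removed via cognee.delete(mode="hard"),
    # which also drops its graph nodes and vector entries.
    if report["unused_count"] > 0:
        result = await cleanup_unused_data(minutes_threshold=60, dry_run=False)
        print(result["deleted_count"])


if __name__ == "__main__":
    asyncio.run(main())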