From 07e67e268b6b8c383ad1184e49c49c9ad5d93e29 Mon Sep 17 00:00:00 2001 From: chinu0609 Date: Fri, 26 Dec 2025 09:04:39 +0530 Subject: [PATCH 1/6] fix: implementing deletion in search.py --- .../retrieval/utils/access_tracking.py | 13 ++- cognee/modules/search/methods/search.py | 106 ++++++++++++++---- 2 files changed, 95 insertions(+), 24 deletions(-) diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py index 54fd043b9..b2d98924a 100644 --- a/cognee/modules/retrieval/utils/access_tracking.py +++ b/cognee/modules/retrieval/utils/access_tracking.py @@ -25,12 +25,17 @@ async def update_node_access_timestamps(items: List[Any]): graph_engine = await get_graph_engine() timestamp_dt = datetime.now(timezone.utc) - # Extract node IDs + # Extract node IDs - updated for graph node format node_ids = [] for item in items: - item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id") - if item_id: - node_ids.append(str(item_id)) + # Handle graph nodes from prepare_search_result (direct id attribute) + if hasattr(item, 'id'): + node_ids.append(str(item.id)) + # Fallback for original retriever format + elif hasattr(item, 'payload') and item.payload.get("id"): + node_ids.append(str(item.payload.get("id"))) + elif isinstance(item, dict) and item.get("id"): + node_ids.append(str(item.get("id"))) if not node_ids: return diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index b4278424b..8709a34a1 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -28,6 +28,7 @@ from cognee import __version__ as cognee_version from .get_search_type_tools import get_search_type_tools from .no_access_control_search import no_access_control_search from ..utils.prepare_search_result import prepare_search_result +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps # Import your function logger = get_logger() @@ -47,6 +48,9 @@ async def search( only_context: bool = False, use_combined_context: bool = False, session_id: Optional[str] = None, + wide_search_top_k: Optional[int] = 100, + triplet_distance_penalty: Optional[float] = 3.5, + verbose: bool = False, ) -> Union[CombinedSearchResult, List[SearchResult]]: """ @@ -73,9 +77,11 @@ async def search( }, ) + actual_accessed_items = [] # Collect all accessed items here + # Use search function filtered by permissions if access control is enabled if backend_access_control_enabled(): - search_results = await authorized_search( + raw_search_results = await authorized_search( query_type=query_type, query_text=query_text, user=user, @@ -90,9 +96,22 @@ async def search( only_context=only_context, use_combined_context=use_combined_context, session_id=session_id, + wide_search_top_k=wide_search_top_k, + triplet_distance_penalty=triplet_distance_penalty, ) + if use_combined_context: + # raw_search_results is (completion, context, datasets) + _, context_data, _ = raw_search_results + if isinstance(context_data, list): # Expecting a list of Edge or similar + actual_accessed_items.extend(context_data) + # If context_data is a string, it's already textual and might not map to specific nodes for timestamp updates + else: + for result_tuple in raw_search_results: + _, context_data, _ = result_tuple + if isinstance(context_data, list): # Expecting a list of Edge or similar + actual_accessed_items.extend(context_data) else: - search_results = [ + raw_search_results = [ await no_access_control_search( query_type=query_type, query_text=query_text, @@ -105,8 +124,19 @@ async def search( last_k=last_k, only_context=only_context, session_id=session_id, + wide_search_top_k=wide_search_top_k, + triplet_distance_penalty=triplet_distance_penalty, ) ] + # In this case, raw_search_results is a list containing a single tuple + if raw_search_results: + _, context_data, _ = raw_search_results[0] + if isinstance(context_data, list): # Expecting a list of Edge or similar + actual_accessed_items.extend(context_data) + + # Call the update_node_access_timestamps function here + # Pass the collected actual_accessed_items + await update_node_access_timestamps(actual_accessed_items) send_telemetry( "cognee.search EXECUTION COMPLETED", @@ -117,6 +147,19 @@ async def search( }, ) + # The rest of the code for logging and preparing results remains largely the same + # Ensure search_results is correctly defined for the subsequent logging/preparation logic + # based on how it was processed in the if/else blocks above. + # For now, let's assume 'search_results' should refer to 'raw_search_results' + # for the purpose of this part of the code, or be re-structured to use the + # collected components for the final output. + + # This part needs careful adjustment based on the exact structure expected by prepare_search_result + # and the final return type. + # For simplicity here, let's re-assign search_results to raw_search_results for the original flow + search_results = raw_search_results + # ... rest of the original function ... + await log_result( query.id, json.dumps( @@ -134,6 +177,7 @@ async def search( ) if use_combined_context: + # Note: combined context search must always be verbose and return a CombinedSearchResult with graphs info prepared_search_results = await prepare_search_result( search_results[0] if isinstance(search_results, list) else search_results ) @@ -167,25 +211,30 @@ async def search( datasets = prepared_search_results["datasets"] if only_context: - return_value.append( - { - "search_result": [context] if context else None, - "dataset_id": datasets[0].id, - "dataset_name": datasets[0].name, - "dataset_tenant_id": datasets[0].tenant_id, - "graphs": graphs, - } - ) + search_result_dict = { + "search_result": [context] if context else None, + "dataset_id": datasets[0].id, + "dataset_name": datasets[0].name, + "dataset_tenant_id": datasets[0].tenant_id, + } + if verbose: + # Include graphs only in verbose mode + search_result_dict["graphs"] = graphs + + return_value.append(search_result_dict) else: - return_value.append( - { - "search_result": [result] if result else None, - "dataset_id": datasets[0].id, - "dataset_name": datasets[0].name, - "dataset_tenant_id": datasets[0].tenant_id, - "graphs": graphs, - } - ) + search_result_dict = { + "search_result": [result] if result else None, + "dataset_id": datasets[0].id, + "dataset_name": datasets[0].name, + "dataset_tenant_id": datasets[0].tenant_id, + } + if verbose: + # Include graphs only in verbose mode + search_result_dict["graphs"] = graphs + + return_value.append(search_result_dict) + return return_value else: return_value = [] @@ -219,6 +268,8 @@ async def authorized_search( only_context: bool = False, use_combined_context: bool = False, session_id: Optional[str] = None, + wide_search_top_k: Optional[int] = 100, + triplet_distance_penalty: Optional[float] = 3.5, ) -> Union[ Tuple[Any, Union[List[Edge], str], List[Dataset]], List[Tuple[Any, Union[List[Edge], str], List[Dataset]]], @@ -246,6 +297,8 @@ async def authorized_search( last_k=last_k, only_context=True, session_id=session_id, + wide_search_top_k=wide_search_top_k, + triplet_distance_penalty=triplet_distance_penalty, ) context = {} @@ -267,6 +320,8 @@ async def authorized_search( node_name=node_name, save_interaction=save_interaction, last_k=last_k, + wide_search_top_k=wide_search_top_k, + triplet_distance_penalty=triplet_distance_penalty, ) search_tools = specific_search_tools if len(search_tools) == 2: @@ -306,6 +361,8 @@ async def authorized_search( last_k=last_k, only_context=only_context, session_id=session_id, + wide_search_top_k=wide_search_top_k, + triplet_distance_penalty=triplet_distance_penalty, ) return search_results @@ -325,6 +382,8 @@ async def search_in_datasets_context( only_context: bool = False, context: Optional[Any] = None, session_id: Optional[str] = None, + wide_search_top_k: Optional[int] = 100, + triplet_distance_penalty: Optional[float] = 3.5, ) -> List[Tuple[Any, Union[str, List[Edge]], List[Dataset]]]: """ Searches all provided datasets and handles setting up of appropriate database context based on permissions. @@ -345,6 +404,8 @@ async def search_in_datasets_context( only_context: bool = False, context: Optional[Any] = None, session_id: Optional[str] = None, + wide_search_top_k: Optional[int] = 100, + triplet_distance_penalty: Optional[float] = 3.5, ) -> Tuple[Any, Union[str, List[Edge]], List[Dataset]]: # Set database configuration in async context for each dataset user has access for await set_database_global_context_variables(dataset.id, dataset.owner_id) @@ -378,6 +439,8 @@ async def search_in_datasets_context( node_name=node_name, save_interaction=save_interaction, last_k=last_k, + wide_search_top_k=wide_search_top_k, + triplet_distance_penalty=triplet_distance_penalty, ) search_tools = specific_search_tools if len(search_tools) == 2: @@ -413,7 +476,10 @@ async def search_in_datasets_context( only_context=only_context, context=context, session_id=session_id, + wide_search_top_k=wide_search_top_k, + triplet_distance_penalty=triplet_distance_penalty, ) ) return await asyncio.gather(*tasks) + From 4860d1c5500d43a497287bdf720ffaf8cf36d753 Mon Sep 17 00:00:00 2001 From: chinu0609 Date: Fri, 26 Dec 2025 09:13:29 +0530 Subject: [PATCH 2/6] fix: implementing deletion in search.py --- cognee/modules/retrieval/chunks_retriever.py | 2 -- cognee/modules/retrieval/completion_retriever.py | 2 -- cognee/modules/retrieval/graph_completion_retriever.py | 2 -- cognee/modules/retrieval/summaries_retriever.py | 2 -- 4 files changed, 8 deletions(-) diff --git a/cognee/modules/retrieval/chunks_retriever.py b/cognee/modules/retrieval/chunks_retriever.py index b7a90238a..4ddc841fd 100644 --- a/cognee/modules/retrieval/chunks_retriever.py +++ b/cognee/modules/retrieval/chunks_retriever.py @@ -1,5 +1,4 @@ from typing import Any, Optional -from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.retrieval.base_retriever import BaseRetriever @@ -49,7 +48,6 @@ class ChunksRetriever(BaseRetriever): try: found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k) logger.info(f"Found {len(found_chunks)} chunks from vector search") - await update_node_access_timestamps(found_chunks) except CollectionNotFoundError as error: logger.error("DocumentChunk_text collection not found in vector database") diff --git a/cognee/modules/retrieval/completion_retriever.py b/cognee/modules/retrieval/completion_retriever.py index 0e9a4167c..58e4b03eb 100644 --- a/cognee/modules/retrieval/completion_retriever.py +++ b/cognee/modules/retrieval/completion_retriever.py @@ -8,7 +8,6 @@ from cognee.modules.retrieval.utils.session_cache import ( save_conversation_history, get_conversation_history, ) -from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError from cognee.infrastructure.databases.vector.exceptions import CollectionNotFoundError @@ -66,7 +65,6 @@ class CompletionRetriever(BaseRetriever): if len(found_chunks) == 0: return "" - await update_node_access_timestamps(found_chunks) # Combine all chunks text returned from vector search (number of chunks is determined by top_k chunks_payload = [found_chunk.payload["text"] for found_chunk in found_chunks] combined_context = "\n".join(chunks_payload) diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py index d26091eee..bc36fa753 100644 --- a/cognee/modules/retrieval/graph_completion_retriever.py +++ b/cognee/modules/retrieval/graph_completion_retriever.py @@ -16,7 +16,6 @@ from cognee.modules.retrieval.utils.session_cache import ( ) from cognee.shared.logging_utils import get_logger from cognee.modules.retrieval.utils.extract_uuid_from_node import extract_uuid_from_node -from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.modules.retrieval.utils.models import CogneeUserInteraction from cognee.modules.engine.models.node_set import NodeSet from cognee.infrastructure.databases.graph import get_graph_engine @@ -143,7 +142,6 @@ class GraphCompletionRetriever(BaseGraphRetriever): entity_nodes = get_entity_nodes_from_triplets(triplets) - await update_node_access_timestamps(entity_nodes) return triplets async def get_completion( diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py index 0df750d22..e24f22e82 100644 --- a/cognee/modules/retrieval/summaries_retriever.py +++ b/cognee/modules/retrieval/summaries_retriever.py @@ -4,7 +4,6 @@ from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError -from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError logger = get_logger("SummariesRetriever") @@ -56,7 +55,6 @@ class SummariesRetriever(BaseRetriever): ) logger.info(f"Found {len(summaries_results)} summaries from vector search") - await update_node_access_timestamps(summaries_results) except CollectionNotFoundError as error: logger.error("TextSummary_text collection not found in vector database") From 09c338c668d0ebaf1cc5da029e8d6ddcb14582b2 Mon Sep 17 00:00:00 2001 From: chinu0609 Date: Fri, 26 Dec 2025 09:23:01 +0530 Subject: [PATCH 3/6] fix: removing comments --- cognee/modules/search/methods/search.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index e5ddf52de..5b436d8e0 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -147,18 +147,7 @@ async def search( }, ) - # The rest of the code for logging and preparing results remains largely the same - # Ensure search_results is correctly defined for the subsequent logging/preparation logic - # based on how it was processed in the if/else blocks above. - # For now, let's assume 'search_results' should refer to 'raw_search_results' - # for the purpose of this part of the code, or be re-structured to use the - # collected components for the final output. - - # This part needs careful adjustment based on the exact structure expected by prepare_search_result - # and the final return type. - # For simplicity here, let's re-assign search_results to raw_search_results for the original flow search_results = raw_search_results - # ... rest of the original function ... await log_result( query.id, From 7d5dc69c5a995b3013df00463bf9be9ff970e006 Mon Sep 17 00:00:00 2001 From: chinu0609 Date: Fri, 26 Dec 2025 09:56:45 +0530 Subject: [PATCH 4/6] fix: change in .env.template --- .env.template | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.env.template b/.env.template index fe168cf91..1f7941a26 100644 --- a/.env.template +++ b/.env.template @@ -216,6 +216,11 @@ TOKENIZERS_PARALLELISM="false" # LITELLM Logging Level. Set to quiet down logging LITELLM_LOG="ERROR" + +# Enable or disable the last accessed timestamp tracking and cleanup functionality. + +ENABLE_LAST_ACCESSED="false" + # Set this environment variable to disable sending telemetry data # TELEMETRY_DISABLED=1 From a0808f50f7fc773f940cf002e9f05375f6752644 Mon Sep 17 00:00:00 2001 From: chinu0609 Date: Mon, 19 Jan 2026 11:30:48 +0530 Subject: [PATCH 5/6] fix: removing search from individual retrievers --- cognee/modules/search/methods/search.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index 6fb3ee0bc..91b88ef6b 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -13,8 +13,9 @@ from cognee.context_global_variables import backend_access_control_enabled from cognee.modules.engine.models.node_set import NodeSet from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge from cognee.modules.search.types import ( - SearchResult, - SearchType, + SearchResultDataset, + SearchResult, + SearchType, ) from cognee.modules.search.operations import log_query, log_result from cognee.modules.users.models import User @@ -44,11 +45,12 @@ async def search( save_interaction: bool = False, last_k: Optional[int] = None, only_context: bool = False, + use_combined_context: bool = False, session_id: Optional[str] = None, wide_search_top_k: Optional[int] = 100, triplet_distance_penalty: Optional[float] = 3.5, verbose: bool = False, -) -> Union[CombinedSearchResult, List[SearchResult]]: +) -> List[SearchResult]: """ Args: From 50ccb147c3a6d159c431f63030bf44941a1579d3 Mon Sep 17 00:00:00 2001 From: chinu0609 Date: Mon, 19 Jan 2026 11:43:46 +0530 Subject: [PATCH 6/6] fix: adding tests --- cognee/modules/search/methods/search.py | 10 +- cognee/tests/test_cleanup_unused_data.py | 360 ++++++++++++----------- 2 files changed, 202 insertions(+), 168 deletions(-) diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index 6fb3ee0bc..eed3dc85d 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -13,8 +13,9 @@ from cognee.context_global_variables import backend_access_control_enabled from cognee.modules.engine.models.node_set import NodeSet from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge from cognee.modules.search.types import ( - SearchResult, - SearchType, + SearchResultDataset, + SearchResult, + SearchType, ) from cognee.modules.search.operations import log_query, log_result from cognee.modules.users.models import User @@ -26,7 +27,7 @@ from cognee import __version__ as cognee_version from .get_search_type_tools import get_search_type_tools from .no_access_control_search import no_access_control_search from ..utils.prepare_search_result import prepare_search_result -from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps # Import your function +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps logger = get_logger() @@ -44,11 +45,12 @@ async def search( save_interaction: bool = False, last_k: Optional[int] = None, only_context: bool = False, + use_combined_context: bool = False, session_id: Optional[str] = None, wide_search_top_k: Optional[int] = 100, triplet_distance_penalty: Optional[float] = 3.5, verbose: bool = False, -) -> Union[CombinedSearchResult, List[SearchResult]]: +) -> List[SearchResult]: """ Args: diff --git a/cognee/tests/test_cleanup_unused_data.py b/cognee/tests/test_cleanup_unused_data.py index abb6d8f3e..01bbd9744 100644 --- a/cognee/tests/test_cleanup_unused_data.py +++ b/cognee/tests/test_cleanup_unused_data.py @@ -1,165 +1,197 @@ -import os -import pathlib -import cognee -from datetime import datetime, timezone, timedelta -from uuid import UUID -from sqlalchemy import select, update -from cognee.modules.data.models import Data, DatasetData -from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.users.methods import get_default_user -from cognee.shared.logging_utils import get_logger -from cognee.modules.search.types import SearchType - -logger = get_logger() - - -async def test_textdocument_cleanup_with_sql(): - """ - End-to-end test for TextDocument cleanup based on last_accessed timestamps. - """ - # Enable last accessed tracking BEFORE any cognee operations - os.environ["ENABLE_LAST_ACCESSED"] = "true" - - # Setup test directories - data_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup") - ).resolve() - ) - cognee_directory_path = str( - pathlib.Path( - os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup") - ).resolve() - ) - - cognee.config.data_root_directory(data_directory_path) - cognee.config.system_root_directory(cognee_directory_path) - - # Initialize database - from cognee.modules.engine.operations.setup import setup - - # Clean slate - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - - logger.info("๐Ÿงช Testing TextDocument cleanup based on last_accessed") - - # Step 1: Add and cognify a test document - dataset_name = "test_cleanup_dataset" - test_text = """ - Machine learning is a subset of artificial intelligence that enables systems to learn - and improve from experience without being explicitly programmed. Deep learning uses - neural networks with multiple layers to process data. - """ - - await setup() - user = await get_default_user() - await cognee.add([test_text], dataset_name=dataset_name, user=user) - - cognify_result = await cognee.cognify([dataset_name], user=user) - - # Extract dataset_id from cognify result - dataset_id = None - for ds_id, pipeline_result in cognify_result.items(): - dataset_id = ds_id - break - - assert dataset_id is not None, "Failed to get dataset_id from cognify result" - logger.info(f"โœ… Document added and cognified. Dataset ID: {dataset_id}") - - # Step 2: Perform search to trigger last_accessed update - logger.info("Triggering search to update last_accessed...") - search_results = await cognee.search( - query_type=SearchType.CHUNKS, - query_text="machine learning", - datasets=[dataset_name], - user=user, - ) - logger.info(f"โœ… Search completed, found {len(search_results)} results") - assert len(search_results) > 0, "Search should return results" - - # Step 3: Verify last_accessed was set and get data_id - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - result = await session.execute( - select(Data, DatasetData) - .join(DatasetData, Data.id == DatasetData.data_id) - .where(DatasetData.dataset_id == dataset_id) - ) - data_records = result.all() - assert len(data_records) > 0, "No Data records found for the dataset" - data_record = data_records[0][0] - data_id = data_record.id - - # Verify last_accessed is set - assert data_record.last_accessed is not None, ( - "last_accessed should be set after search operation" - ) - - original_last_accessed = data_record.last_accessed - logger.info(f"โœ… last_accessed verified: {original_last_accessed}") - - # Step 4: Manually age the timestamp - minutes_threshold = 30 - aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10) - - async with db_engine.get_async_session() as session: - stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) - await session.execute(stmt) - await session.commit() - - # Verify timestamp was updated - async with db_engine.get_async_session() as session: - result = await session.execute(select(Data).where(Data.id == data_id)) - updated_data = result.scalar_one_or_none() - assert updated_data is not None, "Data record should exist" - retrieved_timestamp = updated_data.last_accessed - if retrieved_timestamp.tzinfo is None: - retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc) - assert retrieved_timestamp == aged_timestamp, "Timestamp should be updated to aged value" - - # Step 5: Test cleanup (document-level is now the default) - from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data - - # First do a dry run - logger.info("Testing dry run...") - dry_run_result = await cleanup_unused_data(minutes_threshold=10, dry_run=True, user_id=user.id) - - # Debug: Print the actual result - logger.info(f"Dry run result: {dry_run_result}") - - assert dry_run_result["status"] == "dry_run", ( - f"Status should be 'dry_run', got: {dry_run_result['status']}" - ) - assert dry_run_result["unused_count"] > 0, "Should find at least one unused document" - logger.info(f"โœ… Dry run found {dry_run_result['unused_count']} unused documents") - - # Now run actual cleanup - logger.info("Executing cleanup...") - cleanup_result = await cleanup_unused_data(minutes_threshold=30, dry_run=False, user_id=user.id) - - assert cleanup_result["status"] == "completed", "Cleanup should complete successfully" - assert cleanup_result["deleted_count"]["documents"] > 0, ( - "At least one document should be deleted" - ) - logger.info( - f"โœ… Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents" - ) - - # Step 6: Verify deletion - async with db_engine.get_async_session() as session: - deleted_data = ( - await session.execute(select(Data).where(Data.id == data_id)) - ).scalar_one_or_none() - assert deleted_data is None, "Data record should be deleted" - logger.info("โœ… Confirmed: Data record was deleted") - - logger.info("๐ŸŽ‰ All cleanup tests passed!") - return True - - -if __name__ == "__main__": - import asyncio - - success = asyncio.run(test_textdocument_cleanup_with_sql()) +import os +import pathlib +import cognee +from datetime import datetime, timezone, timedelta +from uuid import UUID +from sqlalchemy import select, update +from cognee.modules.data.models import Data, DatasetData +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.users.methods import get_default_user +from cognee.shared.logging_utils import get_logger +from cognee.modules.search.types import SearchType + +logger = get_logger() + + +async def test_all_search_types_cleanup(): + """ + End-to-end test for TextDocument cleanup based on last_accessed timestamps + across all search types. + """ + # Enable last accessed tracking BEFORE any cognee operations + os.environ["ENABLE_LAST_ACCESSED"] = "true" + + # Setup test directories + data_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup") + ).resolve() + ) + cognee_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup") + ).resolve() + ) + + cognee.config.data_root_directory(data_directory_path) + cognee.config.system_root_directory(cognee_directory_path) + + # Initialize database + from cognee.modules.engine.operations.setup import setup + + # Clean slate + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + logger.info("๐Ÿงช Testing TextDocument cleanup for all search types") + + # Step 1: Add and cognify a test document + dataset_name = "test_cleanup_dataset" + test_text = """ + Machine learning is a subset of artificial intelligence that enables systems to learn + and improve from experience without being explicitly programmed. Deep learning uses + neural networks with multiple layers to process data. + """ + + await setup() + user = await get_default_user() + await cognee.add([test_text], dataset_name=dataset_name, user=user) + + cognify_result = await cognee.cognify([dataset_name], user=user) + + # Extract dataset_id from cognify result + dataset_id = None + for ds_id, pipeline_result in cognify_result.items(): + dataset_id = ds_id + break + + assert dataset_id is not None, "Failed to get dataset_id from cognify result" + logger.info(f"โœ… Document added and cognified. Dataset ID: {dataset_id}") + + # All available search types to test (excluding CODE) + search_types_to_test = [ + SearchType.CHUNKS, + SearchType.SUMMARIES, + SearchType.RAG_COMPLETION, + SearchType.GRAPH_COMPLETION, + SearchType.GRAPH_SUMMARY_COMPLETION, + SearchType.GRAPH_COMPLETION_COT, + SearchType.GRAPH_COMPLETION_CONTEXT_EXTENSION, + SearchType.FEELING_LUCKY, + SearchType.CHUNKS_LEXICAL, + ] + + # Skip search types that require special data or permissions + skip_types = { + SearchType.CYPHER, # Requires ALLOW_CYPHER_QUERY=true + SearchType.NATURAL_LANGUAGE, # Requires ALLOW_CYPHER_QUERY=true + SearchType.FEEDBACK, # Requires previous search interaction + SearchType.TEMPORAL, # Requires temporal data + SearchType.CODING_RULES, # Requires coding rules data + } + + tested_data_ids = [] + + # Test each search type + for search_type in search_types_to_test: + if search_type in skip_types: + logger.info(f"โญ๏ธ Skipping {search_type.value} (requires special setup)") + continue + + logger.info(f"๐Ÿ” Testing {search_type.value} search...") + + try: + # Perform search to trigger last_accessed update + search_results = await cognee.search( + query_type=search_type, + query_text="machine learning", + datasets=[dataset_name], + user=user, + ) + + logger.info(f"โœ… {search_type.value} search completed, found {len(search_results)} results") + + # Verify last_accessed was set + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + result = await session.execute( + select(Data, DatasetData) + .join(DatasetData, Data.id == DatasetData.data_id) + .where(DatasetData.dataset_id == dataset_id) + ) + data_records = result.all() + assert len(data_records) > 0, "No Data records found for the dataset" + data_record = data_records[0][0] + data_id = data_record.id + + # Verify last_accessed is set + assert data_record.last_accessed is not None, ( + f"last_accessed should be set after {search_type.value} search operation" + ) + + original_last_accessed = data_record.last_accessed + logger.info(f"โœ… {search_type.value} last_accessed verified: {original_last_accessed}") + + if data_id not in tested_data_ids: + tested_data_ids.append(data_id) + + except Exception as e: + logger.warning(f"โš ๏ธ {search_type.value} search failed: {str(e)}") + continue + + # Step 3: Test cleanup with aged timestamps + from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data + + minutes_threshold = 30 + aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10) + + # Age all tested data records + db_engine = get_relational_engine() + for data_id in tested_data_ids: + async with db_engine.get_async_session() as session: + stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) + await session.execute(stmt) + await session.commit() + + # First do a dry run + logger.info("Testing dry run...") + dry_run_result = await cleanup_unused_data(minutes_threshold=10, dry_run=True, user_id=user.id) + + logger.info(f"Dry run result: {dry_run_result}") + + assert dry_run_result["status"] == "dry_run", ( + f"Status should be 'dry_run', got: {dry_run_result['status']}" + ) + assert dry_run_result["unused_count"] > 0, "Should find at least one unused document" + logger.info(f"โœ… Dry run found {dry_run_result['unused_count']} unused documents") + + # Now run actual cleanup + logger.info("Executing cleanup...") + cleanup_result = await cleanup_unused_data(minutes_threshold=30, dry_run=False, user_id=user.id) + + assert cleanup_result["status"] == "completed", "Cleanup should complete successfully" + assert cleanup_result["deleted_count"]["documents"] > 0, ( + "At least one document should be deleted" + ) + logger.info( + f"โœ… Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents" + ) + + # Step 4: Verify deletion + for data_id in tested_data_ids: + async with db_engine.get_async_session() as session: + deleted_data = ( + await session.execute(select(Data).where(Data.id == data_id)) + ).scalar_one_or_none() + assert deleted_data is None, f"Data record {data_id} should be deleted" + + logger.info("โœ… Confirmed: All tested data records were deleted") + logger.info("๐ŸŽ‰ All cleanup tests passed for all search types!") + return True + + +if __name__ == "__main__": + import asyncio + + success = asyncio.run(test_all_search_types_cleanup()) exit(0 if success else 1)