diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py
index 6fb3ee0bc..eed3dc85d 100644
--- a/cognee/modules/search/methods/search.py
+++ b/cognee/modules/search/methods/search.py
@@ -13,8 +13,9 @@ from cognee.context_global_variables import backend_access_control_enabled
 from cognee.modules.engine.models.node_set import NodeSet
 from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge
 from cognee.modules.search.types import (
-    SearchResult,
-    SearchType,
+    SearchResultDataset,
+    SearchResult,
+    SearchType,
 )
 from cognee.modules.search.operations import log_query, log_result
 from cognee.modules.users.models import User
@@ -26,7 +27,7 @@ from cognee import __version__ as cognee_version
 from .get_search_type_tools import get_search_type_tools
 from .no_access_control_search import no_access_control_search
 from ..utils.prepare_search_result import prepare_search_result
-from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps  # Import your function
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 
 logger = get_logger()
 
@@ -44,11 +45,12 @@ async def search(
     save_interaction: bool = False,
     last_k: Optional[int] = None,
     only_context: bool = False,
+    use_combined_context: bool = False,
     session_id: Optional[str] = None,
     wide_search_top_k: Optional[int] = 100,
     triplet_distance_penalty: Optional[float] = 3.5,
     verbose: bool = False,
-) -> Union[CombinedSearchResult, List[SearchResult]]:
+) -> List[SearchResult]:
     """
 
     Args:
diff --git a/cognee/tests/test_cleanup_unused_data.py b/cognee/tests/test_cleanup_unused_data.py
index abb6d8f3e..01bbd9744 100644
--- a/cognee/tests/test_cleanup_unused_data.py
+++ b/cognee/tests/test_cleanup_unused_data.py
@@ -1,165 +1,197 @@
-import os
-import pathlib
-import cognee
-from datetime import datetime, timezone, timedelta
-from uuid import UUID
-from sqlalchemy import select, update
-from cognee.modules.data.models import Data, DatasetData
-from cognee.infrastructure.databases.relational import get_relational_engine
-from cognee.modules.users.methods import get_default_user
-from cognee.shared.logging_utils import get_logger
-from cognee.modules.search.types import SearchType
-
-logger = get_logger()
-
-
-async def test_textdocument_cleanup_with_sql():
-    """
-    End-to-end test for TextDocument cleanup based on last_accessed timestamps.
-    """
-    # Enable last accessed tracking BEFORE any cognee operations
-    os.environ["ENABLE_LAST_ACCESSED"] = "true"
-
-    # Setup test directories
-    data_directory_path = str(
-        pathlib.Path(
-            os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup")
-        ).resolve()
-    )
-    cognee_directory_path = str(
-        pathlib.Path(
-            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup")
-        ).resolve()
-    )
-
-    cognee.config.data_root_directory(data_directory_path)
-    cognee.config.system_root_directory(cognee_directory_path)
-
-    # Initialize database
-    from cognee.modules.engine.operations.setup import setup
-
-    # Clean slate
-    await cognee.prune.prune_data()
-    await cognee.prune.prune_system(metadata=True)
-
-    logger.info("🧪 Testing TextDocument cleanup based on last_accessed")
-
-    # Step 1: Add and cognify a test document
-    dataset_name = "test_cleanup_dataset"
-    test_text = """
-    Machine learning is a subset of artificial intelligence that enables systems to learn
-    and improve from experience without being explicitly programmed. Deep learning uses
-    neural networks with multiple layers to process data.
-    """
-
-    await setup()
-    user = await get_default_user()
-    await cognee.add([test_text], dataset_name=dataset_name, user=user)
-
-    cognify_result = await cognee.cognify([dataset_name], user=user)
-
-    # Extract dataset_id from cognify result
-    dataset_id = None
-    for ds_id, pipeline_result in cognify_result.items():
-        dataset_id = ds_id
-        break
-
-    assert dataset_id is not None, "Failed to get dataset_id from cognify result"
-    logger.info(f"✅ Document added and cognified. Dataset ID: {dataset_id}")
-
-    # Step 2: Perform search to trigger last_accessed update
-    logger.info("Triggering search to update last_accessed...")
-    search_results = await cognee.search(
-        query_type=SearchType.CHUNKS,
-        query_text="machine learning",
-        datasets=[dataset_name],
-        user=user,
-    )
-    logger.info(f"✅ Search completed, found {len(search_results)} results")
-    assert len(search_results) > 0, "Search should return results"
-
-    # Step 3: Verify last_accessed was set and get data_id
-    db_engine = get_relational_engine()
-    async with db_engine.get_async_session() as session:
-        result = await session.execute(
-            select(Data, DatasetData)
-            .join(DatasetData, Data.id == DatasetData.data_id)
-            .where(DatasetData.dataset_id == dataset_id)
-        )
-        data_records = result.all()
-        assert len(data_records) > 0, "No Data records found for the dataset"
-        data_record = data_records[0][0]
-        data_id = data_record.id
-
-        # Verify last_accessed is set
-        assert data_record.last_accessed is not None, (
-            "last_accessed should be set after search operation"
-        )
-
-        original_last_accessed = data_record.last_accessed
-        logger.info(f"✅ last_accessed verified: {original_last_accessed}")
-
-    # Step 4: Manually age the timestamp
-    minutes_threshold = 30
-    aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10)
-
-    async with db_engine.get_async_session() as session:
-        stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp)
-        await session.execute(stmt)
-        await session.commit()
-
-    # Verify timestamp was updated
-    async with db_engine.get_async_session() as session:
-        result = await session.execute(select(Data).where(Data.id == data_id))
-        updated_data = result.scalar_one_or_none()
-        assert updated_data is not None, "Data record should exist"
-        retrieved_timestamp = updated_data.last_accessed
-        if retrieved_timestamp.tzinfo is None:
-            retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc)
-        assert retrieved_timestamp == aged_timestamp, "Timestamp should be updated to aged value"
-
-    # Step 5: Test cleanup (document-level is now the default)
-    from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data
-
-    # First do a dry run
-    logger.info("Testing dry run...")
-    dry_run_result = await cleanup_unused_data(minutes_threshold=10, dry_run=True, user_id=user.id)
-
-    # Debug: Print the actual result
-    logger.info(f"Dry run result: {dry_run_result}")
-
-    assert dry_run_result["status"] == "dry_run", (
-        f"Status should be 'dry_run', got: {dry_run_result['status']}"
-    )
-    assert dry_run_result["unused_count"] > 0, "Should find at least one unused document"
-    logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents")
-
-    # Now run actual cleanup
-    logger.info("Executing cleanup...")
-    cleanup_result = await cleanup_unused_data(minutes_threshold=30, dry_run=False, user_id=user.id)
-
-    assert cleanup_result["status"] == "completed", "Cleanup should complete successfully"
-    assert cleanup_result["deleted_count"]["documents"] > 0, (
-        "At least one document should be deleted"
-    )
-    logger.info(
-        f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents"
-    )
-
-    # Step 6: Verify deletion
-    async with db_engine.get_async_session() as session:
-        deleted_data = (
-            await session.execute(select(Data).where(Data.id == data_id))
-        ).scalar_one_or_none()
-        assert deleted_data is None, "Data record should be deleted"
-        logger.info("✅ Confirmed: Data record was deleted")
-
-    logger.info("🎉 All cleanup tests passed!")
-    return True
-
-
-if __name__ == "__main__":
-    import asyncio
-
-    success = asyncio.run(test_textdocument_cleanup_with_sql())
+import os
+import pathlib
+import cognee
+from datetime import datetime, timezone, timedelta
+from uuid import UUID
+from sqlalchemy import select, update
+from cognee.modules.data.models import Data, DatasetData
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.users.methods import get_default_user
+from cognee.shared.logging_utils import get_logger
+from cognee.modules.search.types import SearchType
+
+logger = get_logger()
+
+
+async def test_all_search_types_cleanup():
+    """
+    End-to-end test for TextDocument cleanup based on last_accessed timestamps
+    across all search types.
+    """
+    # Enable last accessed tracking BEFORE any cognee operations
+    os.environ["ENABLE_LAST_ACCESSED"] = "true"
+
+    # Setup test directories
+    data_directory_path = str(
+        pathlib.Path(
+            os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup")
+        ).resolve()
+    )
+    cognee_directory_path = str(
+        pathlib.Path(
+            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup")
+        ).resolve()
+    )
+
+    cognee.config.data_root_directory(data_directory_path)
+    cognee.config.system_root_directory(cognee_directory_path)
+
+    # Initialize database
+    from cognee.modules.engine.operations.setup import setup
+
+    # Clean slate
+    await cognee.prune.prune_data()
+    await cognee.prune.prune_system(metadata=True)
+
+    logger.info("🧪 Testing TextDocument cleanup for all search types")
+
+    # Step 1: Add and cognify a test document
+    dataset_name = "test_cleanup_dataset"
+    test_text = """
+    Machine learning is a subset of artificial intelligence that enables systems to learn
+    and improve from experience without being explicitly programmed. Deep learning uses
+    neural networks with multiple layers to process data.
+    """
+
+    await setup()
+    user = await get_default_user()
+    await cognee.add([test_text], dataset_name=dataset_name, user=user)
+
+    cognify_result = await cognee.cognify([dataset_name], user=user)
+
+    # Extract dataset_id from cognify result
+    dataset_id = None
+    for ds_id, pipeline_result in cognify_result.items():
+        dataset_id = ds_id
+        break
+
+    assert dataset_id is not None, "Failed to get dataset_id from cognify result"
+    logger.info(f"✅ Document added and cognified. Dataset ID: {dataset_id}")
+
+    # All available search types to test (excluding CODE)
+    search_types_to_test = [
+        SearchType.CHUNKS,
+        SearchType.SUMMARIES,
+        SearchType.RAG_COMPLETION,
+        SearchType.GRAPH_COMPLETION,
+        SearchType.GRAPH_SUMMARY_COMPLETION,
+        SearchType.GRAPH_COMPLETION_COT,
+        SearchType.GRAPH_COMPLETION_CONTEXT_EXTENSION,
+        SearchType.FEELING_LUCKY,
+        SearchType.CHUNKS_LEXICAL,
+    ]
+
+    # Skip search types that require special data or permissions
+    skip_types = {
+        SearchType.CYPHER,  # Requires ALLOW_CYPHER_QUERY=true
+        SearchType.NATURAL_LANGUAGE,  # Requires ALLOW_CYPHER_QUERY=true
+        SearchType.FEEDBACK,  # Requires previous search interaction
+        SearchType.TEMPORAL,  # Requires temporal data
+        SearchType.CODING_RULES,  # Requires coding rules data
+    }
+
+    tested_data_ids = []
+
+    # Test each search type
+    for search_type in search_types_to_test:
+        if search_type in skip_types:
+            logger.info(f"⏭️ Skipping {search_type.value} (requires special setup)")
+            continue
+
+        logger.info(f"🔍 Testing {search_type.value} search...")
+
+        try:
+            # Perform search to trigger last_accessed update
+            search_results = await cognee.search(
+                query_type=search_type,
+                query_text="machine learning",
+                datasets=[dataset_name],
+                user=user,
+            )
+
+            logger.info(f"✅ {search_type.value} search completed, found {len(search_results)} results")
+
+            # Verify last_accessed was set
+            db_engine = get_relational_engine()
+            async with db_engine.get_async_session() as session:
+                result = await session.execute(
+                    select(Data, DatasetData)
+                    .join(DatasetData, Data.id == DatasetData.data_id)
+                    .where(DatasetData.dataset_id == dataset_id)
+                )
+                data_records = result.all()
+                assert len(data_records) > 0, "No Data records found for the dataset"
+                data_record = data_records[0][0]
+                data_id = data_record.id
+
+                # Verify last_accessed is set
+                assert data_record.last_accessed is not None, (
+                    f"last_accessed should be set after {search_type.value} search operation"
+                )
+
+                original_last_accessed = data_record.last_accessed
+                logger.info(f"✅ {search_type.value} last_accessed verified: {original_last_accessed}")
+
+                if data_id not in tested_data_ids:
+                    tested_data_ids.append(data_id)
+
+        except Exception as e:
+            logger.warning(f"⚠️ {search_type.value} search failed: {str(e)}")
+            continue
+
+    # Step 3: Test cleanup with aged timestamps
+    from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data
+
+    minutes_threshold = 30
+    aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10)
+
+    # Age all tested data records
+    db_engine = get_relational_engine()
+    for data_id in tested_data_ids:
+        async with db_engine.get_async_session() as session:
+            stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp)
+            await session.execute(stmt)
+            await session.commit()
+
+    # First do a dry run
+    logger.info("Testing dry run...")
+    dry_run_result = await cleanup_unused_data(minutes_threshold=10, dry_run=True, user_id=user.id)
+
+    logger.info(f"Dry run result: {dry_run_result}")
+
+    assert dry_run_result["status"] == "dry_run", (
+        f"Status should be 'dry_run', got: {dry_run_result['status']}"
+    )
+    assert dry_run_result["unused_count"] > 0, "Should find at least one unused document"
+    logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents")
+
+    # Now run actual cleanup
+    logger.info("Executing cleanup...")
+    cleanup_result = await cleanup_unused_data(minutes_threshold=30, dry_run=False, user_id=user.id)
+
+    assert cleanup_result["status"] == "completed", "Cleanup should complete successfully"
+    assert cleanup_result["deleted_count"]["documents"] > 0, (
+        "At least one document should be deleted"
+    )
+    logger.info(
+        f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents"
+    )
+
+    # Step 4: Verify deletion
+    for data_id in tested_data_ids:
+        async with db_engine.get_async_session() as session:
+            deleted_data = (
+                await session.execute(select(Data).where(Data.id == data_id))
+            ).scalar_one_or_none()
+            assert deleted_data is None, f"Data record {data_id} should be deleted"
+
+    logger.info("✅ Confirmed: All tested data records were deleted")
+    logger.info("🎉 All cleanup tests passed for all search types!")
+    return True
+
+
+if __name__ == "__main__":
+    import asyncio
+
+    success = asyncio.run(test_all_search_types_cleanup())
     exit(0 if success else 1)
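Note on the search() hunk: the return annotation narrows from Union[CombinedSearchResult, List[SearchResult]] to List[SearchResult], so callers no longer need to branch on the result shape, and the new use_combined_context flag takes over from the old combined-result return path. A minimal caller sketch, assuming use_combined_context folds the combined context into the regular result list (the diff does not show its implementation):

    results = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION,
        query_text="machine learning",
        datasets=["my_dataset"],
        use_combined_context=True,  # new flag in this diff; folding behavior is assumed
        user=user,
    )

    # After this change, search() always returns List[SearchResult].
    for search_result in results:
        print(search_result)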
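For reviewers, here is the end-to-end flow the new test exercises, condensed into a minimal sketch. It uses only calls that appear in the diff (cognee.add, cognee.cognify, cognee.search, cleanup_unused_data); the dataset name, sample text, and threshold are illustrative:

    import asyncio
    import os

    import cognee
    from cognee.modules.search.types import SearchType
    from cognee.modules.users.methods import get_default_user
    from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data


    async def main():
        # Opt in to last-accessed tracking before any cognee calls,
        # exactly as the test does.
        os.environ["ENABLE_LAST_ACCESSED"] = "true"

        user = await get_default_user()
        await cognee.add(["Some sample text."], dataset_name="my_dataset", user=user)
        await cognee.cognify(["my_dataset"], user=user)

        # Any search stamps last_accessed on the Data rows it touches.
        await cognee.search(
            query_type=SearchType.CHUNKS,
            query_text="sample",
            datasets=["my_dataset"],
            user=user,
        )

        # Preview which documents have sat idle past the threshold,
        # then actually delete them. Nothing is deleted until a document
        # has been idle for 30+ minutes; the test fakes this by rewriting
        # last_accessed with a SQL UPDATE.
        preview = await cleanup_unused_data(minutes_threshold=30, dry_run=True, user_id=user.id)
        print(f"Would delete {preview['unused_count']} documents")

        result = await cleanup_unused_data(minutes_threshold=30, dry_run=False, user_id=user.id)
        print(f"Deleted {result['deleted_count']['documents']} documents")


    asyncio.run(main())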