diff --git a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
new file mode 100644
index 000000000..f1a36ae59
--- /dev/null
+++ b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
@@ -0,0 +1,52 @@
+"""add_last_accessed_to_data
+
+Revision ID: e1ec1dcb50b6
+Revises: 211ab850ef3d
+Create Date: 2025-11-04 21:45:52.642322
+
+"""
+import os
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'e1ec1dcb50b6'
+down_revision: Union[str, None] = '211ab850ef3d'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def _get_column(inspector, table, name, schema=None):
+    for col in inspector.get_columns(table, schema=schema):
+        if col["name"] == name:
+            return col
+    return None
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    insp = sa.inspect(conn)
+
+    last_accessed_column = _get_column(insp, "data", "last_accessed")
+    if not last_accessed_column:
+        # Always create the column for schema consistency
+        op.add_column('data',
+            sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
+        )
+
+    # Only initialize existing records if feature is enabled
+    enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true"
+    if enable_last_accessed:
+        op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    insp = sa.inspect(conn)
+
+    last_accessed_column = _get_column(insp, "data", "last_accessed")
+    if last_accessed_column:
+        op.drop_column('data', 'last_accessed')
diff --git a/cognee/modules/chunking/models/DocumentChunk.py b/cognee/modules/chunking/models/DocumentChunk.py
index e024bf00b..e2b216a9b 100644
--- a/cognee/modules/chunking/models/DocumentChunk.py
+++ b/cognee/modules/chunking/models/DocumentChunk.py
@@ -33,5 +33,4 @@ class DocumentChunk(DataPoint):
     cut_type: str
     is_part_of: Document
     contains: List[Union[Entity, Event, tuple[Edge, Entity]]] = None
-    metadata: dict = {"index_fields": ["text"]}
diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index 3cdead9d9..f04155718 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -36,6 +36,7 @@ class Data(Base):
     data_size = Column(Integer, nullable=True)  # File size in bytes
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
+    last_accessed = Column(DateTime(timezone=True), nullable=True)
 
     datasets = relationship(
         "Dataset",
diff --git a/cognee/modules/engine/models/Entity.py b/cognee/modules/engine/models/Entity.py
index 36da2e344..a34a6503c 100644
--- a/cognee/modules/engine/models/Entity.py
+++ b/cognee/modules/engine/models/Entity.py
@@ -2,10 +2,8 @@
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.engine.models.EntityType import EntityType
 from typing import Optional
-
 
 class Entity(DataPoint):
     name: str
     is_a: Optional[EntityType] = None
     description: str
-    metadata: dict = {"index_fields": ["name"]}
diff --git a/cognee/modules/graph/utils/__init__.py b/cognee/modules/graph/utils/__init__.py
index ebc648495..4c0b29d47 100644
--- a/cognee/modules/graph/utils/__init__.py
+++ b/cognee/modules/graph/utils/__init__.py
@@ -5,3 +5,4 @@ from .retrieve_existing_edges import retrieve_existing_edges
 from .convert_node_to_data_point import convert_node_to_data_point
 from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
 from .resolve_edges_to_text import resolve_edges_to_text
+from .get_entity_nodes_from_triplets import get_entity_nodes_from_triplets
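# Illustrative sketch (not part of the patch): how the guarded migration above is
# expected to be applied. Re-running it is safe because the column is only added when
# the inspector does not find it, and existing rows are only backfilled when
# ENABLE_LAST_ACCESSED is set. The "alembic.ini" path is an assumption about the local
# Alembic setup, not something this PR defines.
import os

from alembic import command
from alembic.config import Config

os.environ["ENABLE_LAST_ACCESSED"] = "true"  # opt in so existing rows get a timestamp
command.upgrade(Config("alembic.ini"), "head")  # applies e1ec1dcb50b6 if it is pending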
diff --git a/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py
new file mode 100644
index 000000000..598a36854
--- /dev/null
+++ b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py
@@ -0,0 +1,13 @@
+
+def get_entity_nodes_from_triplets(triplets):
+    entity_nodes = []
+    seen_ids = set()
+    for triplet in triplets:
+        if hasattr(triplet, 'node1') and triplet.node1 and triplet.node1.id not in seen_ids:
+            entity_nodes.append({"id": str(triplet.node1.id)})
+            seen_ids.add(triplet.node1.id)
+        if hasattr(triplet, 'node2') and triplet.node2 and triplet.node2.id not in seen_ids:
+            entity_nodes.append({"id": str(triplet.node2.id)})
+            seen_ids.add(triplet.node2.id)
+
+    return entity_nodes
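# Illustrative sketch (not part of the patch): what get_entity_nodes_from_triplets
# returns for two triplets that share a node. The Node and Triplet dataclasses below
# are hypothetical stand-ins for the retriever's edge objects, used only to show the
# id extraction and deduplication behaviour.
from dataclasses import dataclass
from uuid import UUID, uuid4

from cognee.modules.graph.utils import get_entity_nodes_from_triplets


@dataclass
class Node:
    id: UUID


@dataclass
class Triplet:
    node1: Node
    node2: Node


a, b, c = Node(uuid4()), Node(uuid4()), Node(uuid4())

# b occurs in both triplets but is emitted only once:
# [{"id": str(a.id)}, {"id": str(b.id)}, {"id": str(c.id)}]
entity_nodes = get_entity_nodes_from_triplets([Triplet(a, b), Triplet(b, c)])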
""" logger.info( @@ -53,6 +49,8 @@ class ChunksRetriever(BaseRetriever): try: found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k) logger.info(f"Found {len(found_chunks)} chunks from vector search") + await update_node_access_timestamps(found_chunks) + except CollectionNotFoundError as error: logger.error("DocumentChunk_text collection not found in vector database") raise NoDataError("No data found in the system, please add data first.") from error diff --git a/cognee/modules/retrieval/completion_retriever.py b/cognee/modules/retrieval/completion_retriever.py index 126ebcab8..0e9a4167c 100644 --- a/cognee/modules/retrieval/completion_retriever.py +++ b/cognee/modules/retrieval/completion_retriever.py @@ -8,6 +8,7 @@ from cognee.modules.retrieval.utils.session_cache import ( save_conversation_history, get_conversation_history, ) +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError from cognee.infrastructure.databases.vector.exceptions import CollectionNotFoundError @@ -65,7 +66,7 @@ class CompletionRetriever(BaseRetriever): if len(found_chunks) == 0: return "" - + await update_node_access_timestamps(found_chunks) # Combine all chunks text returned from vector search (number of chunks is determined by top_k chunks_payload = [found_chunk.payload["text"] for found_chunk in found_chunks] combined_context = "\n".join(chunks_payload) diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py index 89e9e47ce..317d7cd9a 100644 --- a/cognee/modules/retrieval/graph_completion_retriever.py +++ b/cognee/modules/retrieval/graph_completion_retriever.py @@ -16,11 +16,13 @@ from cognee.modules.retrieval.utils.session_cache import ( ) from cognee.shared.logging_utils import get_logger from cognee.modules.retrieval.utils.extract_uuid_from_node import extract_uuid_from_node +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.modules.retrieval.utils.models import CogneeUserInteraction from cognee.modules.engine.models.node_set import NodeSet from cognee.infrastructure.databases.graph import get_graph_engine from cognee.context_global_variables import session_user from cognee.infrastructure.databases.cache.config import CacheConfig +from cognee.modules.graph.utils import get_entity_nodes_from_triplets logger = get_logger("GraphCompletionRetriever") @@ -145,6 +147,9 @@ class GraphCompletionRetriever(BaseGraphRetriever): # context = await self.resolve_edges_to_text(triplets) + entity_nodes = get_entity_nodes_from_triplets(triplets) + + await update_node_access_timestamps(entity_nodes) return triplets async def convert_retrieved_objects_to_context(self, triplets: List[Edge]): diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py index 87b224946..0df750d22 100644 --- a/cognee/modules/retrieval/summaries_retriever.py +++ b/cognee/modules/retrieval/summaries_retriever.py @@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from 
diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py
index 87b224946..0df750d22 100644
--- a/cognee/modules/retrieval/summaries_retriever.py
+++ b/cognee/modules/retrieval/summaries_retriever.py
@@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger
 from cognee.infrastructure.databases.vector import get_vector_engine
 from cognee.modules.retrieval.base_retriever import BaseRetriever
 from cognee.modules.retrieval.exceptions.exceptions import NoDataError
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
 
 logger = get_logger("SummariesRetriever")
@@ -54,6 +55,9 @@
                 "TextSummary_text", query, limit=self.top_k
             )
             logger.info(f"Found {len(summaries_results)} summaries from vector search")
+
+            await update_node_access_timestamps(summaries_results)
+
         except CollectionNotFoundError as error:
             logger.error("TextSummary_text collection not found in vector database")
             raise NoDataError("No data found in the system, please add data first.") from error
diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
new file mode 100644
index 000000000..54fd043b9
--- /dev/null
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -0,0 +1,82 @@
+"""Utilities for tracking data access in retrievers."""
+
+import json
+from datetime import datetime, timezone
+from typing import List, Any
+from uuid import UUID
+import os
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
+from cognee.shared.logging_utils import get_logger
+from sqlalchemy import update
+from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
+
+logger = get_logger(__name__)
+
+
+async def update_node_access_timestamps(items: List[Any]):
+    if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
+        return
+
+    if not items:
+        return
+
+    graph_engine = await get_graph_engine()
+    timestamp_dt = datetime.now(timezone.utc)
+
+    # Extract node IDs
+    node_ids = []
+    for item in items:
+        item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
+        if item_id:
+            node_ids.append(str(item_id))
+
+    if not node_ids:
+        return
+
+    # Focus on document-level tracking via projection
+    try:
+        doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
+        if doc_ids:
+            await _update_sql_records(doc_ids, timestamp_dt)
+    except Exception as e:
+        logger.error(f"Failed to update SQL timestamps: {e}")
+        raise
+
+
+async def _find_origin_documents_via_projection(graph_engine, node_ids):
+    """Find origin documents using graph projection instead of DB queries"""
+    # Project the entire graph with necessary properties
+    memory_fragment = CogneeGraph()
+    await memory_fragment.project_graph_from_db(
+        graph_engine,
+        node_properties_to_project=["id", "type"],
+        edge_properties_to_project=["relationship_name"]
+    )
+
+    # Find origin documents by traversing the in-memory graph
+    doc_ids = set()
+    for node_id in node_ids:
+        node = memory_fragment.get_node(node_id)
+        if node and node.get_attribute("type") == "DocumentChunk":
+            # Traverse edges to find connected documents
+            for edge in node.get_skeleton_edges():
+                # Get the neighbor node
+                neighbor = edge.get_destination_node() if edge.get_source_node().id == node_id else edge.get_source_node()
+                if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]:
+                    doc_ids.add(neighbor.id)
+
+    return list(doc_ids)
+
+
+async def _update_sql_records(doc_ids, timestamp_dt):
+    """Update SQL Data table (same for all providers)"""
+    db_engine = get_relational_engine()
+    async with db_engine.get_async_session() as session:
+        stmt = update(Data).where(
+            Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
+        ).values(last_accessed=timestamp_dt)
+
+        await session.execute(stmt)
+        await session.commit()
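# Illustrative sketch (not part of the patch): how a retriever is expected to use
# update_node_access_timestamps. Search hits expose node ids through .payload (plain
# dicts with an "id" key also work), chunk hits are mapped to their origin documents
# via the graph projection, and Data.last_accessed is bumped for those documents.
# The call is a no-op unless ENABLE_LAST_ACCESSED=true; the query string and top_k
# value are arbitrary example inputs.
import os

from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps

os.environ["ENABLE_LAST_ACCESSED"] = "true"


async def search_and_track(query: str, top_k: int = 5):
    vector_engine = get_vector_engine()
    found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=top_k)
    await update_node_access_timestamps(found_chunks)
    return found_chunks


# e.g. asyncio.run(search_and_track("machine learning"))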
diff --git a/cognee/tasks/cleanup/cleanup_unused_data.py b/cognee/tasks/cleanup/cleanup_unused_data.py
new file mode 100644
index 000000000..34cde1b6f
--- /dev/null
+++ b/cognee/tasks/cleanup/cleanup_unused_data.py
@@ -0,0 +1,187 @@
+"""
+Task for automatically deleting unused data from the memify pipeline.
+
+This task identifies and removes entire documents that haven't
+been accessed by retrievers for a specified period, helping maintain system
+efficiency and storage optimization through whole-document removal.
+"""
+
+import json
+from datetime import datetime, timezone, timedelta
+from typing import Optional, Dict, Any
+from uuid import UUID
+import os
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.infrastructure.databases.vector import get_vector_engine
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data, DatasetData
+from cognee.shared.logging_utils import get_logger
+from sqlalchemy import select, or_
+import cognee
+import sqlalchemy as sa
+from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
+
+logger = get_logger(__name__)
+
+
+async def cleanup_unused_data(
+    minutes_threshold: Optional[int],
+    dry_run: bool = True,
+    user_id: Optional[UUID] = None
+) -> Dict[str, Any]:
+    """
+    Identify and remove unused data from the memify pipeline.
+
+    Parameters
+    ----------
+    minutes_threshold : int
+        Minutes since last access to consider data unused
+    dry_run : bool
+        If True, only report what would be deleted without actually deleting (default: True)
+    user_id : UUID, optional
+        Limit cleanup to specific user's data (default: None)
+
+    Returns
+    -------
+    Dict[str, Any]
+        Cleanup results with status, counts, and timestamp
+    """
+    # Check 1: Environment variable must be enabled
+    if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
+        logger.warning(
+            "Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled."
+        )
+        return {
+            "status": "skipped",
+            "reason": "ENABLE_LAST_ACCESSED not enabled",
+            "unused_count": 0,
+            "deleted_count": {},
+            "cleanup_date": datetime.now(timezone.utc).isoformat()
+        }
+
+    # Check 2: Verify tracking has actually been running
+    db_engine = get_relational_engine()
+    async with db_engine.get_async_session() as session:
+        # Count records with non-NULL last_accessed
+        tracked_count = await session.execute(
+            select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
+        )
+        tracked_records = tracked_count.scalar()
+
+        if tracked_records == 0:
+            logger.warning(
+                "Cleanup skipped: No records have been tracked yet. "
+                "ENABLE_LAST_ACCESSED may have been recently enabled. "
+                "Wait for retrievers to update timestamps before running cleanup."
+            )
+            return {
+                "status": "skipped",
+                "reason": "No tracked records found - tracking may be newly enabled",
+                "unused_count": 0,
+                "deleted_count": {},
+                "cleanup_date": datetime.now(timezone.utc).isoformat()
+            }
+
+    logger.info(
+        "Starting cleanup task",
+        minutes_threshold=minutes_threshold,
+        dry_run=dry_run,
+        user_id=str(user_id) if user_id else None
+    )
+
+    # Calculate cutoff timestamp
+    cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
+
+    # Document-level approach (recommended)
+    return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
+
+
+async def _cleanup_via_sql(
+    cutoff_date: datetime,
+    dry_run: bool,
+    user_id: Optional[UUID] = None
+) -> Dict[str, Any]:
+    """
+    SQL-based cleanup: Query Data table for unused documents and use cognee.delete().
+
+    Parameters
+    ----------
+    cutoff_date : datetime
+        Cutoff date for last_accessed filtering
+    dry_run : bool
+        If True, only report what would be deleted
+    user_id : UUID, optional
+        Filter by user ID if provided
+
+    Returns
+    -------
+    Dict[str, Any]
+        Cleanup results
+    """
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        # Query for Data records with old last_accessed timestamps
+        query = select(Data, DatasetData).join(
+            DatasetData, Data.id == DatasetData.data_id
+        ).where(
+            or_(
+                Data.last_accessed < cutoff_date,
+                Data.last_accessed.is_(None)
+            )
+        )
+
+        if user_id:
+            from cognee.modules.data.models import Dataset
+            query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
+                Dataset.owner_id == user_id
+            )
+
+        result = await session.execute(query)
+        unused_data = result.all()
+
+        logger.info(f"Found {len(unused_data)} unused documents in SQL")
+
+        if dry_run:
+            return {
+                "status": "dry_run",
+                "unused_count": len(unused_data),
+                "deleted_count": {
+                    "data_items": 0,
+                    "documents": 0
+                },
+                "cleanup_date": datetime.now(timezone.utc).isoformat(),
+                "preview": {
+                    "documents": len(unused_data)
+                }
+            }
+
+        # Delete each document using cognee.delete()
+        deleted_count = 0
+        from cognee.modules.users.methods import get_default_user
+        user = await get_default_user() if user_id is None else None
+
+        for data, dataset_data in unused_data:
+            try:
+                await cognee.delete(
+                    data_id=data.id,
+                    dataset_id=dataset_data.dataset_id,
+                    mode="hard",  # Use hard mode to also remove orphaned entities
+                    user=user
+                )
+                deleted_count += 1
+                logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
+            except Exception as e:
+                logger.error(f"Failed to delete document {data.id}: {e}")
+
+        logger.info("Cleanup completed", deleted_count=deleted_count)
+
+        return {
+            "status": "completed",
+            "unused_count": len(unused_data),
+            "deleted_count": {
+                "data_items": deleted_count,
+                "documents": deleted_count
+            },
+            "cleanup_date": datetime.now(timezone.utc).isoformat()
+        }
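# Illustrative sketch (not part of the patch): invoking the cleanup task manually.
# A dry run reports what would be removed without deleting anything; a second call
# with dry_run=False performs the deletion. The 43200-minute (30-day) threshold is
# an arbitrary example value, and ENABLE_LAST_ACCESSED must be "true" for the task
# to do anything.
import asyncio

from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data


async def run_cleanup():
    preview = await cleanup_unused_data(minutes_threshold=43200, dry_run=True)
    print(f"Would delete {preview['unused_count']} documents")

    if preview["status"] == "dry_run" and preview["unused_count"] > 0:
        result = await cleanup_unused_data(minutes_threshold=43200, dry_run=False)
        print(f"Deleted {result['deleted_count']['documents']} documents")


asyncio.run(run_cleanup())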
+ """ + # Enable last accessed tracking BEFORE any cognee operations + os.environ["ENABLE_LAST_ACCESSED"] = "true" + + # Setup test directories + data_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup") + ).resolve() + ) + cognee_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup") + ).resolve() + ) + + cognee.config.data_root_directory(data_directory_path) + cognee.config.system_root_directory(cognee_directory_path) + + # Initialize database + from cognee.modules.engine.operations.setup import setup + + # Clean slate + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + logger.info("๐Ÿงช Testing TextDocument cleanup based on last_accessed") + + # Step 1: Add and cognify a test document + dataset_name = "test_cleanup_dataset" + test_text = """ + Machine learning is a subset of artificial intelligence that enables systems to learn + and improve from experience without being explicitly programmed. Deep learning uses + neural networks with multiple layers to process data. + """ + + await setup() + user = await get_default_user() + await cognee.add([test_text], dataset_name=dataset_name, user=user) + + cognify_result = await cognee.cognify([dataset_name], user=user) + + # Extract dataset_id from cognify result + dataset_id = None + for ds_id, pipeline_result in cognify_result.items(): + dataset_id = ds_id + break + + assert dataset_id is not None, "Failed to get dataset_id from cognify result" + logger.info(f"โœ… Document added and cognified. Dataset ID: {dataset_id}") + + # Step 2: Perform search to trigger last_accessed update + logger.info("Triggering search to update last_accessed...") + search_results = await cognee.search( + query_type=SearchType.CHUNKS, + query_text="machine learning", + datasets=[dataset_name], + user=user + ) + logger.info(f"โœ… Search completed, found {len(search_results)} results") + assert len(search_results) > 0, "Search should return results" + + # Step 3: Verify last_accessed was set and get data_id + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + result = await session.execute( + select(Data, DatasetData) + .join(DatasetData, Data.id == DatasetData.data_id) + .where(DatasetData.dataset_id == dataset_id) + ) + data_records = result.all() + assert len(data_records) > 0, "No Data records found for the dataset" + data_record = data_records[0][0] + data_id = data_record.id + + # Verify last_accessed is set + assert data_record.last_accessed is not None, ( + "last_accessed should be set after search operation" + ) + + original_last_accessed = data_record.last_accessed + logger.info(f"โœ… last_accessed verified: {original_last_accessed}") + + # Step 4: Manually age the timestamp + minutes_threshold = 30 + aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10) + + async with db_engine.get_async_session() as session: + stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) + await session.execute(stmt) + await session.commit() + + # Verify timestamp was updated + async with db_engine.get_async_session() as session: + result = await session.execute(select(Data).where(Data.id == data_id)) + updated_data = result.scalar_one_or_none() + assert updated_data is not None, "Data record should exist" + retrieved_timestamp = updated_data.last_accessed + if retrieved_timestamp.tzinfo is None: + retrieved_timestamp = 
+
+    # Step 5: Test cleanup (document-level is now the default)
+    from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data
+
+    # First do a dry run
+    logger.info("Testing dry run...")
+    dry_run_result = await cleanup_unused_data(
+        minutes_threshold=10,
+        dry_run=True,
+        user_id=user.id
+    )
+
+    # Debug: Print the actual result
+    logger.info(f"Dry run result: {dry_run_result}")
+
+    assert dry_run_result['status'] == 'dry_run', f"Status should be 'dry_run', got: {dry_run_result['status']}"
+    assert dry_run_result['unused_count'] > 0, (
+        "Should find at least one unused document"
+    )
+    logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents")
+
+    # Now run actual cleanup
+    logger.info("Executing cleanup...")
+    cleanup_result = await cleanup_unused_data(
+        minutes_threshold=30,
+        dry_run=False,
+        user_id=user.id
+    )
+
+    assert cleanup_result["status"] == "completed", "Cleanup should complete successfully"
+    assert cleanup_result["deleted_count"]["documents"] > 0, (
+        "At least one document should be deleted"
+    )
+    logger.info(f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents")
+
+    # Step 6: Verify deletion
+    async with db_engine.get_async_session() as session:
+        deleted_data = (
+            await session.execute(select(Data).where(Data.id == data_id))
+        ).scalar_one_or_none()
+        assert deleted_data is None, "Data record should be deleted"
+        logger.info("✅ Confirmed: Data record was deleted")
+
+    logger.info("🎉 All cleanup tests passed!")
+    return True
+
+
+if __name__ == "__main__":
+    import asyncio
+    success = asyncio.run(test_textdocument_cleanup_with_sql())
+    exit(0 if success else 1)
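# Illustrative sketch (not part of the patch): driving the cleanup task on a schedule
# once the test above passes. The one-hour interval and 30-day threshold are arbitrary
# example values; any external scheduler (cron, Celery beat, etc.) could make the same
# call instead of this asyncio loop.
import asyncio

from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data


async def periodic_cleanup(interval_seconds: int = 3600):
    while True:
        result = await cleanup_unused_data(minutes_threshold=43200, dry_run=False)
        print(f"Cleanup run finished with status: {result['status']}")
        await asyncio.sleep(interval_seconds)


if __name__ == "__main__":
    asyncio.run(periodic_cleanup())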