diff --git a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
new file mode 100644
index 000000000..267e11fb2
--- /dev/null
+++ b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
@@ -0,0 +1,46 @@
+"""add_last_accessed_to_data
+
+Revision ID: e1ec1dcb50b6
+Revises: 211ab850ef3d
+Create Date: 2025-11-04 21:45:52.642322
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'e1ec1dcb50b6'
+down_revision: Union[str, None] = '211ab850ef3d'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+def _get_column(inspector, table, name, schema=None):
+    for col in inspector.get_columns(table, schema=schema):
+        if col["name"] == name:
+            return col
+    return None
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    insp = sa.inspect(conn)
+
+    last_accessed_column = _get_column(insp, "data", "last_accessed")
+    if not last_accessed_column:
+        op.add_column('data',
+            sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
+        )
+        # Optionally initialize with created_at values for existing records
+        op.execute("UPDATE data SET last_accessed = created_at")
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    insp = sa.inspect(conn)
+
+    last_accessed_column = _get_column(insp, "data", "last_accessed")
+    if last_accessed_column:
+        op.drop_column('data', 'last_accessed')
diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index ef228f2e1..27ab7481e 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -36,6 +36,7 @@ class Data(Base):
     data_size = Column(Integer, nullable=True)  # File size in bytes
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
+    last_accessed = Column(DateTime(timezone=True), nullable=True)
 
     datasets = relationship(
         "Dataset",
diff --git a/cognee/modules/graph/utils/__init__.py b/cognee/modules/graph/utils/__init__.py
index ebc648495..4c0b29d47 100644
--- a/cognee/modules/graph/utils/__init__.py
+++ b/cognee/modules/graph/utils/__init__.py
@@ -5,3 +5,4 @@ from .retrieve_existing_edges import retrieve_existing_edges
 from .convert_node_to_data_point import convert_node_to_data_point
 from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
 from .resolve_edges_to_text import resolve_edges_to_text
+from .get_entity_nodes_from_triplets import get_entity_nodes_from_triplets
diff --git a/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py
new file mode 100644
index 000000000..598a36854
--- /dev/null
+++ b/cognee/modules/graph/utils/get_entity_nodes_from_triplets.py
@@ -0,0 +1,13 @@
+def get_entity_nodes_from_triplets(triplets):
+    """Collect unique {"id": ...} dicts from the node1/node2 endpoints of the given triplets."""
+    entity_nodes = []
+    seen_ids = set()
+    for triplet in triplets:
+        if hasattr(triplet, 'node1') and triplet.node1 and triplet.node1.id not in seen_ids:
+            entity_nodes.append({"id": str(triplet.node1.id)})
+            seen_ids.add(triplet.node1.id)
+        if hasattr(triplet, 'node2') and triplet.node2 and triplet.node2.id not in seen_ids:
+            entity_nodes.append({"id": str(triplet.node2.id)})
+            seen_ids.add(triplet.node2.id)
+
+    return entity_nodes
diff --git a/cognee/modules/retrieval/completion_retriever.py b/cognee/modules/retrieval/completion_retriever.py
index bb568924d..fc8ef747f 100644
--- a/cognee/modules/retrieval/completion_retriever.py
+++ b/cognee/modules/retrieval/completion_retriever.py
@@ -8,6 +8,7 @@ from cognee.modules.retrieval.utils.session_cache import (
     save_conversation_history,
     get_conversation_history,
 )
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 from cognee.modules.retrieval.base_retriever import BaseRetriever
 from cognee.modules.retrieval.exceptions.exceptions import NoDataError
 from cognee.infrastructure.databases.vector.exceptions import CollectionNotFoundError
@@ -65,7 +66,7 @@ class CompletionRetriever(BaseRetriever):
 
         if len(found_chunks) == 0:
             return ""
-
+        await update_node_access_timestamps(found_chunks)
         # Combine all chunks text returned from vector search (number of chunks is determined by top_k
         chunks_payload = [found_chunk.payload["text"] for found_chunk in found_chunks]
         combined_context = "\n".join(chunks_payload)
diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py
index b7ab4edae..122cc943f 100644
--- a/cognee/modules/retrieval/graph_completion_retriever.py
+++ b/cognee/modules/retrieval/graph_completion_retriever.py
@@ -16,11 +16,13 @@ from cognee.modules.retrieval.utils.session_cache import (
 )
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.retrieval.utils.extract_uuid_from_node import extract_uuid_from_node
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 from cognee.modules.retrieval.utils.models import CogneeUserInteraction
 from cognee.modules.engine.models.node_set import NodeSet
 from cognee.infrastructure.databases.graph import get_graph_engine
 from cognee.context_global_variables import session_user
 from cognee.infrastructure.databases.cache.config import CacheConfig
+from cognee.modules.graph.utils import get_entity_nodes_from_triplets
 
 logger = get_logger("GraphCompletionRetriever")
 
@@ -139,6 +141,9 @@ class GraphCompletionRetriever(BaseGraphRetriever):
 
         # context = await self.resolve_edges_to_text(triplets)
 
+        entity_nodes = get_entity_nodes_from_triplets(triplets)
+
+        await update_node_access_timestamps(entity_nodes)
         return triplets
 
     async def get_completion(
diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
index 79afd25db..65d597a93 100644
--- a/cognee/modules/retrieval/utils/access_tracking.py
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -1,20 +1,27 @@
-
 """Utilities for tracking data access in retrievers."""
 
 import json
 from datetime import datetime, timezone
 from typing import List, Any
+from uuid import UUID
 
 from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
 from cognee.shared.logging_utils import get_logger
+from sqlalchemy import update
 
 logger = get_logger(__name__)
 
 
 async def update_node_access_timestamps(items: List[Any]):
     """
-    Update last_accessed_at for nodes in Kuzu graph database.
-    Automatically determines node type from the graph database.
+    Update last_accessed_at for nodes in the graph database and corresponding Data records in SQL.
+
+    This function:
+    1. Updates last_accessed_at in the graph database nodes (stored in the properties JSON)
+    2. Traverses to find origin TextDocument nodes (without hardcoded relationship names)
+    3. Updates last_accessed in the SQL Data table for those documents
 
     Parameters
     ----------
@@ -26,39 +33,63 @@ async def update_node_access_timestamps(items: List[Any]):
 
     graph_engine = await get_graph_engine()
     timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
+    timestamp_dt = datetime.now(timezone.utc)
 
+    # Extract node IDs
+    node_ids = []
     for item in items:
-        # Extract ID from payload
         item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
-        if not item_id:
-            continue
-
-        # try:
-        # Query to get both node type and properties in one call
-        result = await graph_engine.query(
-            "MATCH (n:Node {id: $id}) RETURN n.type as node_type, n.properties as props",
-            {"id": str(item_id)}
-        )
-
-        if result and len(result) > 0 and result[0]:
-            node_type = result[0][0]  # First column: node_type
-            props_json = result[0][1]  # Second column: properties
-
-            # Parse existing properties JSON
-            props = json.loads(props_json) if props_json else {}
-            # Update last_accessed_at with millisecond timestamp
-            props["last_accessed_at"] = timestamp_ms
-
-            # Write back to graph database
-            await graph_engine.query(
-                "MATCH (n:Node {id: $id}) SET n.properties = $props",
-                {"id": str(item_id), "props": json.dumps(props)}
+        if item_id:
+            node_ids.append(str(item_id))
+
+    if not node_ids:
+        return
+
+    try:
+        # Step 1: Update each graph node's properties with the new access timestamp
+        for node_id in node_ids:
+            result = await graph_engine.query(
+                "MATCH (n:Node {id: $id}) RETURN n.properties",
+                {"id": node_id}
             )
-            logger.debug(f"Updated access timestamp for {node_type} node {item_id}")
+            if result and result[0]:
+                props = json.loads(result[0][0]) if result[0][0] else {}
+                props["last_accessed_at"] = timestamp_ms
 
-        # except Exception as e:
-        #     logger.error(f"Failed to update timestamp for node {item_id}: {e}")
-        #     continue
-
-    logger.debug(f"Updated access timestamps for {len(items)} nodes")
+                await graph_engine.query(
+                    "MATCH (n:Node {id: $id}) SET n.properties = $props",
+                    {"id": node_id, "props": json.dumps(props)}
+                )
+
+        logger.debug(f"Updated access timestamps for {len(node_ids)} graph nodes")
+
+        # Step 2: Find origin TextDocument nodes (without hardcoded relationship names)
+        origin_query = """
+            UNWIND $node_ids AS node_id
+            MATCH (chunk:Node {id: node_id})-[e:EDGE]-(doc:Node)
+            WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document']
+            RETURN DISTINCT doc.id
+        """
+
+        result = await graph_engine.query(origin_query, {"node_ids": node_ids})
+
+        # Extract and deduplicate document IDs
+        doc_ids = list(set([row[0] for row in result if row and row[0]])) if result else []
+
+        # Step 3: Update SQL Data table
+        if doc_ids:
+            db_engine = get_relational_engine()
+            async with db_engine.get_async_session() as session:
+                stmt = update(Data).where(
+                    Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
+                ).values(last_accessed=timestamp_dt)
+
+                await session.execute(stmt)
+                await session.commit()
+
+            logger.debug(f"Updated last_accessed for {len(doc_ids)} Data records in SQL")
+
+    except Exception as e:
+        logger.error(f"Failed to update timestamps: {e}")
+        raise
 
diff --git a/cognee/tasks/cleanup/cleanup_unused_data.py b/cognee/tasks/cleanup/cleanup_unused_data.py
new file mode 100644
index 000000000..4df622a2c
--- /dev/null
+++ b/cognee/tasks/cleanup/cleanup_unused_data.py
@@ -0,0 +1,336 @@
+"""
+Task for automatically deleting unused data from the memify pipeline.
+
+This task identifies and removes data (chunks, entities, summaries) that hasn't
+been accessed by retrievers for a specified period, helping maintain system
+efficiency and reduce storage usage.
+"""
+
+import json
+from datetime import datetime, timezone, timedelta
+from typing import Optional, Dict, Any
+from uuid import UUID
+
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.infrastructure.databases.vector import get_vector_engine
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data, DatasetData
+from cognee.shared.logging_utils import get_logger
+from sqlalchemy import select, or_
+import cognee
+
+logger = get_logger(__name__)
+
+
+async def cleanup_unused_data(
+    days_threshold: int,
+    dry_run: bool = True,
+    user_id: Optional[UUID] = None,
+    text_doc: bool = False
+) -> Dict[str, Any]:
+    """
+    Identify and remove unused data from the memify pipeline.
+
+    Parameters
+    ----------
+    days_threshold : int
+        Days since last access after which data is considered unused
+    dry_run : bool
+        If True, only report what would be deleted without actually deleting (default: True)
+    user_id : UUID, optional
+        Limit cleanup to specific user's data (default: None)
+    text_doc : bool
+        If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete()
+        for proper whole-document deletion (default: False)
+
+    Returns
+    -------
+    Dict[str, Any]
+        Cleanup results with status, counts, and timestamp
+    """
+    logger.info(
+        "Starting cleanup task",
+        days_threshold=days_threshold,
+        dry_run=dry_run,
+        user_id=str(user_id) if user_id else None,
+        text_doc=text_doc
+    )
+
+    # Calculate cutoff timestamp
+    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_threshold)
+
+    if text_doc:
+        # SQL-based approach: Find unused TextDocuments and use cognee.delete()
+        return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
+    else:
+        # Graph-based approach: Find unused nodes directly from graph
+        cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
+        logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")
+
+        # Find unused nodes
+        unused_nodes = await _find_unused_nodes(cutoff_timestamp_ms, user_id)
+
+        total_unused = sum(len(nodes) for nodes in unused_nodes.values())
+        logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()})
+
+        if dry_run:
+            return {
+                "status": "dry_run",
+                "unused_count": total_unused,
+                "deleted_count": {
+                    "data_items": 0,
+                    "chunks": 0,
+                    "entities": 0,
+                    "summaries": 0,
+                    "associations": 0
+                },
+                "cleanup_date": datetime.now(timezone.utc).isoformat(),
+                "preview": {
+                    "chunks": len(unused_nodes["DocumentChunk"]),
+                    "entities": len(unused_nodes["Entity"]),
+                    "summaries": len(unused_nodes["TextSummary"])
+                }
+            }
+
+        # Delete unused nodes
+        deleted_counts = await _delete_unused_nodes(unused_nodes)
+
+        logger.info("Cleanup completed", deleted_counts=deleted_counts)
+
+        return {
+            "status": "completed",
+            "unused_count": total_unused,
+            "deleted_count": {
+                "data_items": 0,
+                "chunks": deleted_counts["DocumentChunk"],
+                "entities": deleted_counts["Entity"],
+                "summaries": deleted_counts["TextSummary"],
+                "associations": deleted_counts["associations"]
+            },
+            "cleanup_date": datetime.now(timezone.utc).isoformat()
+        }
+
+
+async def _cleanup_via_sql(
+    cutoff_date: datetime,
+    dry_run: bool,
+    user_id: Optional[UUID] = None
+) -> Dict[str, Any]:
+    """
+    SQL-based cleanup: query the Data table for unused documents and use cognee.delete().
+
+    Parameters
+    ----------
+    cutoff_date : datetime
+        Cutoff date for last_accessed filtering
+    dry_run : bool
+        If True, only report what would be deleted
+    user_id : UUID, optional
+        Filter by user ID if provided
+
+    Returns
+    -------
+    Dict[str, Any]
+        Cleanup results
+    """
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        # Query for Data records with old last_accessed timestamps
+        query = select(Data, DatasetData).join(
+            DatasetData, Data.id == DatasetData.data_id
+        ).where(
+            or_(
+                Data.last_accessed < cutoff_date,
+                Data.last_accessed.is_(None)
+            )
+        )
+
+        if user_id:
+            from cognee.modules.data.models import Dataset
+            query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
+                Dataset.owner_id == user_id
+            )
+
+        result = await session.execute(query)
+        unused_data = result.all()
+
+        logger.info(f"Found {len(unused_data)} unused documents in SQL")
+
+        if dry_run:
+            return {
+                "status": "dry_run",
+                "unused_count": len(unused_data),
+                "deleted_count": {
+                    "data_items": 0,
+                    "documents": 0
+                },
+                "cleanup_date": datetime.now(timezone.utc).isoformat(),
+                "preview": {
+                    "documents": len(unused_data)
+                }
+            }
+
+        # Delete each document using cognee.delete()
+        deleted_count = 0
+        from cognee.modules.users.methods import get_default_user
+        user = await get_default_user() if user_id is None else None
+
+        for data, dataset_data in unused_data:
+            try:
+                await cognee.delete(
+                    data_id=data.id,
+                    dataset_id=dataset_data.dataset_id,
+                    mode="hard",  # Use hard mode to also remove orphaned entities
+                    user=user
+                )
+                deleted_count += 1
+                logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
+            except Exception as e:
+                logger.error(f"Failed to delete document {data.id}: {e}")
+
+        logger.info("Cleanup completed", deleted_count=deleted_count)
+
+        return {
+            "status": "completed",
+            "unused_count": len(unused_data),
+            "deleted_count": {
+                "data_items": deleted_count,
+                "documents": deleted_count
+            },
+            "cleanup_date": datetime.now(timezone.utc).isoformat()
+        }
+
+
+async def _find_unused_nodes(
+    cutoff_timestamp_ms: int,
+    user_id: Optional[UUID] = None
+) -> Dict[str, list]:
+    """
+    Query Kuzu for nodes with old last_accessed_at timestamps.
+
+    Parameters
+    ----------
+    cutoff_timestamp_ms : int
+        Cutoff timestamp in milliseconds since epoch
+    user_id : UUID, optional
+        Reserved for future per-user filtering; not currently applied to the graph query
+
+    Returns
+    -------
+    Dict[str, list]
+        Dictionary mapping node types to lists of unused node IDs
+    """
+    graph_engine = await get_graph_engine()
+
+    # Query all nodes with their properties
+    query = "MATCH (n:Node) RETURN n.id, n.type, n.properties"
+    results = await graph_engine.query(query)
+
+    unused_nodes = {
+        "DocumentChunk": [],
+        "Entity": [],
+        "TextSummary": []
+    }
+
+    for node_id, node_type, props_json in results:
+        # Only process tracked node types
+        if node_type not in unused_nodes:
+            continue
+
+        # Parse properties JSON
+        if props_json:
+            try:
+                props = json.loads(props_json)
+                last_accessed = props.get("last_accessed_at")
+
+                # Check if node is unused (never accessed or accessed before cutoff)
+                if last_accessed is None or last_accessed < cutoff_timestamp_ms:
+                    unused_nodes[node_type].append(node_id)
+                    logger.debug(
+                        f"Found unused {node_type}",
+                        node_id=node_id,
+                        last_accessed=last_accessed
+                    )
+            except json.JSONDecodeError:
+                logger.warning(f"Failed to parse properties for node {node_id}")
+                continue
+
+    return unused_nodes
+
+
+async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
+    """
+    Delete unused nodes from graph and vector databases.
+
+    Parameters
+    ----------
+    unused_nodes : Dict[str, list]
+        Dictionary mapping node types to lists of node IDs to delete
+
+    Returns
+    -------
+    Dict[str, int]
+        Count of deleted items by type
+    """
+    graph_engine = await get_graph_engine()
+    vector_engine = get_vector_engine()
+
+    deleted_counts = {
+        "DocumentChunk": 0,
+        "Entity": 0,
+        "TextSummary": 0,
+        "associations": 0
+    }
+
+    # Count associations before deletion
+    for node_type, node_ids in unused_nodes.items():
+        if not node_ids:
+            continue
+
+        # Count edges connected to these nodes
+        for node_id in node_ids:
+            result = await graph_engine.query(
+                "MATCH (n:Node {id: $id})-[r:EDGE]-() RETURN count(r)",
+                {"id": node_id}
+            )
+            if result and len(result) > 0:
+                deleted_counts["associations"] += result[0][0]
+
+    # Delete from graph database (uses DETACH DELETE, so edges are automatically removed)
+    for node_type, node_ids in unused_nodes.items():
+        if not node_ids:
+            continue
+
+        logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database")
+
+        # Remove all collected nodes of this type in one call
+        await graph_engine.delete_nodes(node_ids)
+        deleted_counts[node_type] = len(node_ids)
+
+    # Delete from vector database
+    vector_collections = {
+        "DocumentChunk": "DocumentChunk_text",
+        "Entity": "Entity_name",
+        "TextSummary": "TextSummary_text"
+    }
+
+    for node_type, collection_name in vector_collections.items():
+        node_ids = unused_nodes[node_type]
+        if not node_ids:
+            continue
+
+        logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from vector database")
+
+        try:
+            # Delete from vector collection
+            if await vector_engine.has_collection(collection_name):
+                for node_id in node_ids:
+                    try:
+                        await vector_engine.delete(collection_name, {"id": str(node_id)})
+                    except Exception as e:
+                        logger.warning(f"Failed to delete {node_id} from {collection_name}: {e}")
+        except Exception as e:
+            logger.error(f"Error deleting from vector collection {collection_name}: {e}")
+
+    return deleted_counts
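
Usage sketch (illustrative, not part of the patch): the snippet below shows one way the new cleanup task might be driven, first as a dry run and then as a real deletion of whole unused documents via the SQL path. The import path and the returned "status"/"unused_count" keys follow the code added above; the 30-day threshold and the asyncio entry point are illustrative assumptions.

import asyncio

from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data


async def main():
    # Dry run first: report how many documents have not been accessed in the last 30 days.
    report = await cleanup_unused_data(days_threshold=30, dry_run=True, text_doc=True)
    print(report["status"], report["unused_count"])

    # Real run: delete the unused TextDocuments through cognee.delete().
    if report["unused_count"] > 0:
        await cleanup_unused_data(days_threshold=30, dry_run=False, text_doc=True)


asyncio.run(main())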