diff --git a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
index a16c99e9f..f1a36ae59 100644
--- a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
+++ b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
@@ -1,46 +1,52 @@
-"""add_last_accessed_to_data
-
-Revision ID: e1ec1dcb50b6
-Revises: 211ab850ef3d
-Create Date: 2025-11-04 21:45:52.642322
-
-"""
-from typing import Sequence, Union
-
-from alembic import op
-import sqlalchemy as sa
-
-
-# revision identifiers, used by Alembic.
-revision: str = 'e1ec1dcb50b6'
-down_revision: Union[str, None] = '211ab850ef3d'
-branch_labels: Union[str, Sequence[str], None] = None
-depends_on: Union[str, Sequence[str], None] = None
-
-def _get_column(inspector, table, name, schema=None):
-    for col in inspector.get_columns(table, schema=schema):
-        if col["name"] == name:
-            return col
-    return None
+"""add_last_accessed_to_data
+
+Revision ID: e1ec1dcb50b6
+Revises: 211ab850ef3d
+Create Date: 2025-11-04 21:45:52.642322
+
+"""
+import os
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
 
-def upgrade() -> None:
-    conn = op.get_bind()
-    insp = sa.inspect(conn)
-
-    last_accessed_column = _get_column(insp, "data", "last_accessed")
-    if not last_accessed_column:
-        op.add_column('data',
-            sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
-        )
-        # Optionally initialize with created_at values for existing records
-        op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
+# revision identifiers, used by Alembic.
+revision: str = 'e1ec1dcb50b6'
+down_revision: Union[str, None] = '211ab850ef3d'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
 
-def downgrade() -> None:
-    conn = op.get_bind()
-    insp = sa.inspect(conn)
-
-    last_accessed_column = _get_column(insp, "data", "last_accessed")
-    if last_accessed_column:
+def _get_column(inspector, table, name, schema=None):
+    for col in inspector.get_columns(table, schema=schema):
+        if col["name"] == name:
+            return col
+    return None
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    insp = sa.inspect(conn)
+
+    last_accessed_column = _get_column(insp, "data", "last_accessed")
+    if not last_accessed_column:
+        # Always create the column for schema consistency
+        op.add_column('data',
+            sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
+        )
+
+    # Only initialize existing records if feature is enabled
+    enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true"
+    if enable_last_accessed:
+        op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    insp = sa.inspect(conn)
+
+    last_accessed_column = _get_column(insp, "data", "last_accessed")
+    if last_accessed_column:
         op.drop_column('data', 'last_accessed')
diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
index 65d597a93..c7b06ee17 100644
--- a/cognee/modules/retrieval/utils/access_tracking.py
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -4,92 +4,135 @@ import json
 from datetime import datetime, timezone
 from typing import List, Any
 from uuid import UUID
-
+import os
 from cognee.infrastructure.databases.graph import get_graph_engine
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.models import Data
 from cognee.shared.logging_utils import
get_logger from sqlalchemy import update +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph logger = get_logger(__name__) - async def update_node_access_timestamps(items: List[Any]): - """ - Update last_accessed_at for nodes in graph database and corresponding Data records in SQL. - - This function: - 1. Updates last_accessed_at in the graph database nodes (in properties JSON) - 2. Traverses to find origin TextDocument nodes (without hardcoded relationship names) - 3. Updates last_accessed in the SQL Data table for those documents - - Parameters - ---------- - items : List[Any] - List of items with payload containing 'id' field (from vector search results) - """ + if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": + return + if not items: return - + graph_engine = await get_graph_engine() timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000) timestamp_dt = datetime.now(timezone.utc) - + # Extract node IDs node_ids = [] for item in items: item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id") if item_id: node_ids.append(str(item_id)) - + if not node_ids: return - + try: - # Step 1: Batch update graph nodes - for node_id in node_ids: - result = await graph_engine.query( - "MATCH (n:Node {id: $id}) RETURN n.properties", - {"id": node_id} - ) - - if result and result[0]: - props = json.loads(result[0][0]) if result[0][0] else {} - props["last_accessed_at"] = timestamp_ms - - await graph_engine.query( - "MATCH (n:Node {id: $id}) SET n.properties = $props", - {"id": node_id, "props": json.dumps(props)} - ) - - logger.debug(f"Updated access timestamps for {len(node_ids)} graph nodes") - - # Step 2: Find origin TextDocument nodes (without hardcoded relationship names) - origin_query = """ - UNWIND $node_ids AS node_id - MATCH (chunk:Node {id: node_id})-[e:EDGE]-(doc:Node) - WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document'] - RETURN DISTINCT doc.id - """ - - result = await graph_engine.query(origin_query, {"node_ids": node_ids}) - - # Extract and deduplicate document IDs - doc_ids = list(set([row[0] for row in result if row and row[0]])) if result else [] - - # Step 3: Update SQL Data table - if doc_ids: - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - stmt = update(Data).where( - Data.id.in_([UUID(doc_id) for doc_id in doc_ids]) - ).values(last_accessed=timestamp_dt) - - await session.execute(stmt) - await session.commit() - - logger.debug(f"Updated last_accessed for {len(doc_ids)} Data records in SQL") - + # Try to update nodes in graph database (may fail for unsupported DBs) + await _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms) except Exception as e: - logger.error(f"Failed to update timestamps: {e}") - raise + logger.warning( + f"Failed to update node timestamps in graph database: {e}. " + "Will update document-level timestamps in SQL instead." 
+ ) + + # Always try to find origin documents and update SQL + # This ensures document-level tracking works even if graph updates fail + try: + doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids) + if doc_ids: + await _update_sql_records(doc_ids, timestamp_dt) + except Exception as e: + logger.error(f"Failed to update SQL timestamps: {e}") + raise + +async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms): + """Update nodes using graph projection - works with any graph database""" + # Project the graph with necessary properties + memory_fragment = CogneeGraph() + await memory_fragment.project_graph_from_db( + graph_engine, + node_properties_to_project=["id"], + edge_properties_to_project=[] + ) + + # Update each node's last_accessed_at property + provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower() + + for node_id in node_ids: + node = memory_fragment.get_node(node_id) + if node: + try: + # Update the node in the database + if provider == "kuzu": + # Kuzu stores properties as JSON + result = await graph_engine.query( + "MATCH (n:Node {id: $id}) RETURN n.properties", + {"id": node_id} + ) + + if result and result[0]: + props = json.loads(result[0][0]) if result[0][0] else {} + props["last_accessed_at"] = timestamp_ms + + await graph_engine.query( + "MATCH (n:Node {id: $id}) SET n.properties = $props", + {"id": node_id, "props": json.dumps(props)} + ) + elif provider == "neo4j": + await graph_engine.query( + "MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp", + {"id": node_id, "timestamp": timestamp_ms} + ) + elif provider == "neptune": + await graph_engine.query( + "MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp", + {"id": node_id, "timestamp": timestamp_ms} + ) + except Exception as e: + # Log but continue with other nodes + logger.debug(f"Failed to update node {node_id}: {e}") + continue + +async def _find_origin_documents_via_projection(graph_engine, node_ids): + """Find origin documents using graph projection instead of DB queries""" + # Project the entire graph with necessary properties + memory_fragment = CogneeGraph() + await memory_fragment.project_graph_from_db( + graph_engine, + node_properties_to_project=["id", "type"], + edge_properties_to_project=["relationship_name"] + ) + + # Find origin documents by traversing the in-memory graph + doc_ids = set() + for node_id in node_ids: + node = memory_fragment.get_node(node_id) + if node and node.get_attribute("type") == "DocumentChunk": + # Traverse edges to find connected documents + for edge in node.get_skeleton_edges(): + # Get the neighbor node + neighbor = edge.get_destination_node() if edge.get_source_node().id == node_id else edge.get_source_node() + if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]: + doc_ids.add(neighbor.id) + + return list(doc_ids) + +async def _update_sql_records(doc_ids, timestamp_dt): + """Update SQL Data table (same for all providers)""" + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + stmt = update(Data).where( + Data.id.in_([UUID(doc_id) for doc_id in doc_ids]) + ).values(last_accessed=timestamp_dt) + + await session.execute(stmt) + await session.commit() diff --git a/cognee/tasks/cleanup/cleanup_unused_data.py b/cognee/tasks/cleanup/cleanup_unused_data.py index fd4b68204..3894635dd 100644 --- a/cognee/tasks/cleanup/cleanup_unused_data.py +++ b/cognee/tasks/cleanup/cleanup_unused_data.py @@ -1,335 +1,382 @@ -""" -Task for automatically 
deleting unused data from the memify pipeline. - -This task identifies and removes data (chunks, entities, summaries) that hasn't -been accessed by retrievers for a specified period, helping maintain system -efficiency and storage optimization. -""" - -import json -from datetime import datetime, timezone, timedelta -from typing import Optional, Dict, Any -from uuid import UUID - -from cognee.infrastructure.databases.graph import get_graph_engine -from cognee.infrastructure.databases.vector import get_vector_engine -from cognee.infrastructure.databases.relational import get_relational_engine -from cognee.modules.data.models import Data, DatasetData -from cognee.shared.logging_utils import get_logger -from sqlalchemy import select, or_ -import cognee - -logger = get_logger(__name__) - - -async def cleanup_unused_data( - days_threshold: Optional[int], - dry_run: bool = True, - user_id: Optional[UUID] = None, - text_doc: bool = False -) -> Dict[str, Any]: - """ - Identify and remove unused data from the memify pipeline. +""" +Task for automatically deleting unused data from the memify pipeline. - Parameters - ---------- - days_threshold : int - days since last access to consider data unused - dry_run : bool - If True, only report what would be deleted without actually deleting (default: True) - user_id : UUID, optional - Limit cleanup to specific user's data (default: None) - text_doc : bool - If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete() - for proper whole-document deletion (default: False) +This task identifies and removes entire documents that haven't +been accessed by retrievers for a specified period, helping maintain system +efficiency and storage optimization through whole-document removal. +""" - Returns - ------- - Dict[str, Any] - Cleanup results with status, counts, and timestamp - """ - logger.info( - "Starting cleanup task", - days_threshold=days_threshold, - dry_run=dry_run, - user_id=str(user_id) if user_id else None, - text_doc=text_doc +import json +from datetime import datetime, timezone, timedelta +from typing import Optional, Dict, Any +from uuid import UUID +import os +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.infrastructure.databases.vector import get_vector_engine +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.data.models import Data, DatasetData +from cognee.shared.logging_utils import get_logger +from sqlalchemy import select, or_ +import cognee +import sqlalchemy as sa +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph + +logger = get_logger(__name__) + + +async def cleanup_unused_data( + minutes_threshold: Optional[int], + dry_run: bool = True, + user_id: Optional[UUID] = None, + text_doc: bool = True, # Changed default to True for document-level cleanup + node_level: bool = False # New parameter for explicit node-level cleanup +) -> Dict[str, Any]: + """ + Identify and remove unused data from the memify pipeline. 
+ + Parameters + ---------- + minutes_threshold : int + Minutes since last access to consider data unused + dry_run : bool + If True, only report what would be deleted without actually deleting (default: True) + user_id : UUID, optional + Limit cleanup to specific user's data (default: None) + text_doc : bool + If True (default), use SQL-based filtering to find unused TextDocuments and call cognee.delete() + for proper whole-document deletion + node_level : bool + If True, perform chaotic node-level deletion of unused chunks, entities, and summaries + (default: False - deprecated in favor of document-level cleanup) + + Returns + ------- + Dict[str, Any] + Cleanup results with status, counts, and timestamp + """ + # Check 1: Environment variable must be enabled + if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": + logger.warning( + "Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled." + ) + return { + "status": "skipped", + "reason": "ENABLE_LAST_ACCESSED not enabled", + "unused_count": 0, + "deleted_count": {}, + "cleanup_date": datetime.now(timezone.utc).isoformat() + } + + # Check 2: Verify tracking has actually been running + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + # Count records with non-NULL last_accessed + tracked_count = await session.execute( + select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None)) + ) + tracked_records = tracked_count.scalar() + + if tracked_records == 0: + logger.warning( + "Cleanup skipped: No records have been tracked yet. " + "ENABLE_LAST_ACCESSED may have been recently enabled. " + "Wait for retrievers to update timestamps before running cleanup." + ) + return { + "status": "skipped", + "reason": "No tracked records found - tracking may be newly enabled", + "unused_count": 0, + "deleted_count": {}, + "cleanup_date": datetime.now(timezone.utc).isoformat() + } + + logger.info( + "Starting cleanup task", + minutes_threshold=minutes_threshold, + dry_run=dry_run, + user_id=str(user_id) if user_id else None, + text_doc=text_doc, + node_level=node_level + ) + + # Calculate cutoff timestamp + cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold) + + if node_level: + # Deprecated: Node-level approach (chaotic) + logger.warning( + "Node-level cleanup is deprecated and may lead to fragmented knowledge graphs. " + "Consider using document-level cleanup (default) instead." 
+ ) + cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000) + logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)") + + # Find unused nodes using graph projection + unused_nodes = await _find_unused_nodes_via_projection(cutoff_timestamp_ms) + + total_unused = sum(len(nodes) for nodes in unused_nodes.values()) + logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()}) + + if dry_run: + return { + "status": "dry_run", + "unused_count": total_unused, + "deleted_count": { + "data_items": 0, + "chunks": 0, + "entities": 0, + "summaries": 0, + "associations": 0 + }, + "cleanup_date": datetime.now(timezone.utc).isoformat(), + "preview": { + "chunks": len(unused_nodes["DocumentChunk"]), + "entities": len(unused_nodes["Entity"]), + "summaries": len(unused_nodes["TextSummary"]) + } + } + + # Delete unused nodes (provider-agnostic deletion) + deleted_counts = await _delete_unused_nodes(unused_nodes) + + logger.info("Cleanup completed", deleted_counts=deleted_counts) + + return { + "status": "completed", + "unused_count": total_unused, + "deleted_count": { + "data_items": 0, + "chunks": deleted_counts["DocumentChunk"], + "entities": deleted_counts["Entity"], + "summaries": deleted_counts["TextSummary"], + "associations": deleted_counts["associations"] + }, + "cleanup_date": datetime.now(timezone.utc).isoformat() + } + else: + # Default: Document-level approach (recommended) + return await _cleanup_via_sql(cutoff_date, dry_run, user_id) + + +async def _cleanup_via_sql( + cutoff_date: datetime, + dry_run: bool, + user_id: Optional[UUID] = None +) -> Dict[str, Any]: + """ + SQL-based cleanup: Query Data table for unused documents and use cognee.delete(). + + Parameters + ---------- + cutoff_date : datetime + Cutoff date for last_accessed filtering + dry_run : bool + If True, only report what would be deleted + user_id : UUID, optional + Filter by user ID if provided + + Returns + ------- + Dict[str, Any] + Cleanup results + """ + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + # Query for Data records with old last_accessed timestamps + query = select(Data, DatasetData).join( + DatasetData, Data.id == DatasetData.data_id + ).where( + or_( + Data.last_accessed < cutoff_date, + Data.last_accessed.is_(None) + ) + ) + + if user_id: + from cognee.modules.data.models import Dataset + query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where( + Dataset.owner_id == user_id + ) + + result = await session.execute(query) + unused_data = result.all() + + logger.info(f"Found {len(unused_data)} unused documents in SQL") + + if dry_run: + return { + "status": "dry_run", + "unused_count": len(unused_data), + "deleted_count": { + "data_items": 0, + "documents": 0 + }, + "cleanup_date": datetime.now(timezone.utc).isoformat(), + "preview": { + "documents": len(unused_data) + } + } + + # Delete each document using cognee.delete() + deleted_count = 0 + from cognee.modules.users.methods import get_default_user + user = await get_default_user() if user_id is None else None + + for data, dataset_data in unused_data: + try: + await cognee.delete( + data_id=data.id, + dataset_id=dataset_data.dataset_id, + mode="hard", # Use hard mode to also remove orphaned entities + user=user + ) + deleted_count += 1 + logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}") + except Exception as e: + logger.error(f"Failed to delete document {data.id}: {e}") + + 
logger.info("Cleanup completed", deleted_count=deleted_count) + + return { + "status": "completed", + "unused_count": len(unused_data), + "deleted_count": { + "data_items": deleted_count, + "documents": deleted_count + }, + "cleanup_date": datetime.now(timezone.utc).isoformat() + } + + +async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[str, list]: + """ + Find unused nodes using graph projection - database-agnostic approach. + NOTE: This function is deprecated as it leads to fragmented knowledge graphs. + + Parameters + ---------- + cutoff_timestamp_ms : int + Cutoff timestamp in milliseconds since epoch + + Returns + ------- + Dict[str, list] + Dictionary mapping node types to lists of unused node IDs + """ + graph_engine = await get_graph_engine() + + # Project the entire graph with necessary properties + memory_fragment = CogneeGraph() + await memory_fragment.project_graph_from_db( + graph_engine, + node_properties_to_project=["id", "type", "last_accessed_at"], + edge_properties_to_project=[] ) - # Calculate cutoff timestamp - cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_threshold) - - if text_doc: - # SQL-based approach: Find unused TextDocuments and use cognee.delete() - return await _cleanup_via_sql(cutoff_date, dry_run, user_id) - else: - # Graph-based approach: Find unused nodes directly from graph - cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000) - logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)") - - # Find unused nodes - unused_nodes = await _find_unused_nodes(cutoff_timestamp_ms, user_id) - - total_unused = sum(len(nodes) for nodes in unused_nodes.values()) - logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()}) - - if dry_run: - return { - "status": "dry_run", - "unused_count": total_unused, - "deleted_count": { - "data_items": 0, - "chunks": 0, - "entities": 0, - "summaries": 0, - "associations": 0 - }, - "cleanup_date": datetime.now(timezone.utc).isoformat(), - "preview": { - "chunks": len(unused_nodes["DocumentChunk"]), - "entities": len(unused_nodes["Entity"]), - "summaries": len(unused_nodes["TextSummary"]) - } - } - - # Delete unused nodes - deleted_counts = await _delete_unused_nodes(unused_nodes) - - logger.info("Cleanup completed", deleted_counts=deleted_counts) - - return { - "status": "completed", - "unused_count": total_unused, - "deleted_count": { - "data_items": 0, - "chunks": deleted_counts["DocumentChunk"], - "entities": deleted_counts["Entity"], - "summaries": deleted_counts["TextSummary"], - "associations": deleted_counts["associations"] - }, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - - -async def _cleanup_via_sql( - cutoff_date: datetime, - dry_run: bool, - user_id: Optional[UUID] = None -) -> Dict[str, Any]: - """ - SQL-based cleanup: Query Data table for unused documents and use cognee.delete(). 
- - Parameters - ---------- - cutoff_date : datetime - Cutoff date for last_accessed filtering - dry_run : bool - If True, only report what would be deleted - user_id : UUID, optional - Filter by user ID if provided - - Returns - ------- - Dict[str, Any] - Cleanup results - """ - db_engine = get_relational_engine() - - async with db_engine.get_async_session() as session: - # Query for Data records with old last_accessed timestamps - query = select(Data, DatasetData).join( - DatasetData, Data.id == DatasetData.data_id - ).where( - or_( - Data.last_accessed < cutoff_date, - Data.last_accessed.is_(None) - ) - ) - - if user_id: - from cognee.modules.data.models import Dataset - query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where( - Dataset.owner_id == user_id - ) - - result = await session.execute(query) - unused_data = result.all() - - logger.info(f"Found {len(unused_data)} unused documents in SQL") - - if dry_run: - return { - "status": "dry_run", - "unused_count": len(unused_data), - "deleted_count": { - "data_items": 0, - "documents": 0 - }, - "cleanup_date": datetime.now(timezone.utc).isoformat(), - "preview": { - "documents": len(unused_data) - } - } - - # Delete each document using cognee.delete() - deleted_count = 0 - from cognee.modules.users.methods import get_default_user - user = await get_default_user() if user_id is None else None - - for data, dataset_data in unused_data: - try: - await cognee.delete( - data_id=data.id, - dataset_id=dataset_data.dataset_id, - mode="hard", # Use hard mode to also remove orphaned entities - user=user - ) - deleted_count += 1 - logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}") - except Exception as e: - logger.error(f"Failed to delete document {data.id}: {e}") - - logger.info("Cleanup completed", deleted_count=deleted_count) - - return { - "status": "completed", - "unused_count": len(unused_data), - "deleted_count": { - "data_items": deleted_count, - "documents": deleted_count - }, - "cleanup_date": datetime.now(timezone.utc).isoformat() - } - - -async def _find_unused_nodes( - cutoff_timestamp_ms: int, - user_id: Optional[UUID] = None -) -> Dict[str, list]: - """ - Query Kuzu for nodes with old last_accessed_at timestamps. 
+ unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []} - Parameters - ---------- - cutoff_timestamp_ms : int - Cutoff timestamp in milliseconds since epoch - user_id : UUID, optional - Filter by user ID if provided + # Get all nodes from the projected graph + all_nodes = memory_fragment.get_nodes() - Returns - ------- - Dict[str, list] - Dictionary mapping node types to lists of unused node IDs - """ - graph_engine = await get_graph_engine() - - # Query all nodes with their properties - query = "MATCH (n:Node) RETURN n.id, n.type, n.properties" - results = await graph_engine.query(query) - - unused_nodes = { - "DocumentChunk": [], - "Entity": [], - "TextSummary": [] - } - - for node_id, node_type, props_json in results: - # Only process tracked node types + for node in all_nodes: + node_type = node.get_attribute("type") if node_type not in unused_nodes: continue + + # Check last_accessed_at property + last_accessed = node.get_attribute("last_accessed_at") - # Parse properties JSON - if props_json: - try: - props = json.loads(props_json) - last_accessed = props.get("last_accessed_at") - - # Check if node is unused (never accessed or accessed before cutoff) - if last_accessed is None or last_accessed < cutoff_timestamp_ms: - unused_nodes[node_type].append(node_id) - logger.debug( - f"Found unused {node_type}", - node_id=node_id, - last_accessed=last_accessed - ) - except json.JSONDecodeError: - logger.warning(f"Failed to parse properties for node {node_id}") - continue + if last_accessed is None or last_accessed < cutoff_timestamp_ms: + unused_nodes[node_type].append(node.id) + logger.debug( + f"Found unused {node_type}", + node_id=node.id, + last_accessed=last_accessed + ) return unused_nodes -async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]: - """ - Delete unused nodes from graph and vector databases. +async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]: + """ + Delete unused nodes from graph and vector databases. + NOTE: This function is deprecated as it leads to fragmented knowledge graphs. 
+ + Parameters + ---------- + unused_nodes : Dict[str, list] + Dictionary mapping node types to lists of node IDs to delete + + Returns + ------- + Dict[str, int] + Count of deleted items by type + """ + graph_engine = await get_graph_engine() + vector_engine = get_vector_engine() + + deleted_counts = { + "DocumentChunk": 0, + "Entity": 0, + "TextSummary": 0, + "associations": 0 + } + + # Count associations before deletion (using graph projection for consistency) + if any(unused_nodes.values()): + memory_fragment = CogneeGraph() + await memory_fragment.project_graph_from_db( + graph_engine, + node_properties_to_project=["id"], + edge_properties_to_project=[] + ) + + for node_type, node_ids in unused_nodes.items(): + if not node_ids: + continue + + # Count edges from the in-memory graph + for node_id in node_ids: + node = memory_fragment.get_node(node_id) + if node: + # Count edges from the in-memory graph + edge_count = len(node.get_skeleton_edges()) + deleted_counts["associations"] += edge_count - Parameters - ---------- - unused_nodes : Dict[str, list] - Dictionary mapping node types to lists of node IDs to delete - - Returns - ------- - Dict[str, int] - Count of deleted items by type - """ - graph_engine = await get_graph_engine() - vector_engine = get_vector_engine() - - deleted_counts = { - "DocumentChunk": 0, - "Entity": 0, - "TextSummary": 0, - "associations": 0 - } - - # Count associations before deletion - for node_type, node_ids in unused_nodes.items(): + # Delete from graph database (uses DETACH DELETE, so edges are automatically removed) + for node_type, node_ids in unused_nodes.items(): + if not node_ids: + continue + + logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database") + + # Delete nodes in batches (database-agnostic) + await graph_engine.delete_nodes(node_ids) + deleted_counts[node_type] = len(node_ids) + + # Delete from vector database + vector_collections = { + "DocumentChunk": "DocumentChunk_text", + "Entity": "Entity_name", + "TextSummary": "TextSummary_text" + } + + + for node_type, collection_name in vector_collections.items(): + node_ids = unused_nodes[node_type] if not node_ids: continue + + logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from vector database") + + try: + if await vector_engine.has_collection(collection_name): + await vector_engine.delete_data_points( + collection_name, + [str(node_id) for node_id in node_ids] + ) + except Exception as e: + logger.error(f"Error deleting from vector collection {collection_name}: {e}") - # Count edges connected to these nodes - for node_id in node_ids: - result = await graph_engine.query( - "MATCH (n:Node {id: $id})-[r:EDGE]-() RETURN count(r)", - {"id": node_id} - ) - if result and len(result) > 0: - deleted_counts["associations"] += result[0][0] - - # Delete from graph database (uses DETACH DELETE, so edges are automatically removed) - for node_type, node_ids in unused_nodes.items(): - if not node_ids: - continue - - logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database") - - # Delete nodes in batches - await graph_engine.delete_nodes(node_ids) - deleted_counts[node_type] = len(node_ids) - - # Delete from vector database - vector_collections = { - "DocumentChunk": "DocumentChunk_text", - "Entity": "Entity_name", - "TextSummary": "TextSummary_text" - } - - - for node_type, collection_name in vector_collections.items(): - node_ids = unused_nodes[node_type] - if not node_ids: - continue - - logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from 
vector database") - - try: - if await vector_engine.has_collection(collection_name): - await vector_engine.delete_data_points( - collection_name, - [str(node_id) for node_id in node_ids] - ) - except Exception as e: - logger.error(f"Error deleting from vector collection {collection_name}: {e}") - return deleted_counts