From 5f00abf3e4f3b913ae67391d487104ea3b9ae872 Mon Sep 17 00:00:00 2001
From: chinu0609
Date: Tue, 2 Dec 2025 22:25:03 +0530
Subject: [PATCH] fix: fallback and document deletion

---
 .../retrieval/utils/access_tracking.py      | 73 +++++++++++--------
 cognee/tasks/cleanup/cleanup_unused_data.py | 41 +++++++----
 2 files changed, 68 insertions(+), 46 deletions(-)

diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
index 935c47157..c7b06ee17 100644
--- a/cognee/modules/retrieval/utils/access_tracking.py
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -36,16 +36,22 @@ async def update_node_access_timestamps(items: List[Any]):
         return
 
     try:
-        # Update nodes using graph projection ( database-agnostic approach
+        # Try to update nodes in the graph database (may fail for unsupported DBs)
         await _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms)
+    except Exception as e:
+        logger.warning(
+            f"Failed to update node timestamps in graph database: {e}. "
+            "Will update document-level timestamps in SQL instead."
+        )
 
-        # Find origin documents and update SQL
+    # Always try to find origin documents and update SQL.
+    # This ensures document-level tracking works even if graph updates fail.
+    try:
         doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
         if doc_ids:
             await _update_sql_records(doc_ids, timestamp_dt)
     except Exception as e:
-        logger.error(f"Failed to update timestamps: {e}")
+        logger.error(f"Failed to update SQL timestamps: {e}")
+        raise
 
 
@@ -59,37 +65,42 @@ async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms):
     )
 
     # Update each node's last_accessed_at property
+    provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()
+
     for node_id in node_ids:
         node = memory_fragment.get_node(node_id)
         if node:
-            # Update the node in the database
-            provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()
-
-            if provider == "kuzu":
-                # Kuzu stores properties as JSON
-                result = await graph_engine.query(
-                    "MATCH (n:Node {id: $id}) RETURN n.properties",
-                    {"id": node_id}
-                )
-
-                if result and result[0]:
-                    props = json.loads(result[0][0]) if result[0][0] else {}
-                    props["last_accessed_at"] = timestamp_ms
-
-                    await graph_engine.query(
-                        "MATCH (n:Node {id: $id}) SET n.properties = $props",
-                        {"id": node_id, "props": json.dumps(props)}
-                    )
-            elif provider == "neo4j":
-                await graph_engine.query(
-                    "MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
-                    {"id": node_id, "timestamp": timestamp_ms}
-                )
-            elif provider == "neptune":
-                await graph_engine.query(
-                    "MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
-                    {"id": node_id, "timestamp": timestamp_ms}
-                )
+            try:
+                # Update the node in the database
+                if provider == "kuzu":
+                    # Kuzu stores properties as JSON
+                    result = await graph_engine.query(
+                        "MATCH (n:Node {id: $id}) RETURN n.properties",
+                        {"id": node_id}
+                    )
+
+                    if result and result[0]:
+                        props = json.loads(result[0][0]) if result[0][0] else {}
+                        props["last_accessed_at"] = timestamp_ms
+
+                        await graph_engine.query(
+                            "MATCH (n:Node {id: $id}) SET n.properties = $props",
+                            {"id": node_id, "props": json.dumps(props)}
+                        )
+                elif provider == "neo4j":
+                    await graph_engine.query(
+                        "MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
+                        {"id": node_id, "timestamp": timestamp_ms}
+                    )
+                elif provider == "neptune":
+                    await graph_engine.query(
+                        "MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
+                        {"id": node_id, "timestamp": timestamp_ms}
+                    )
+            except Exception as e:
+                # Log but continue with other nodes
+                logger.debug(f"Failed to update node {node_id}: {e}")
+                continue
 
 
 async def _find_origin_documents_via_projection(graph_engine, node_ids):
     """Find origin documents using graph projection instead of DB queries"""
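The fallback contract above, stripped of the cognee plumbing, looks like this. A minimal runnable sketch; FlakyGraphEngine and the dict-backed store are illustrative stand-ins, not cognee APIs:

    import asyncio
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("access_tracking_sketch")


    class FlakyGraphEngine:
        """Stand-in for a graph engine whose provider rejects the update."""

        async def query(self, cypher, params):
            raise RuntimeError("provider does not support SET on this label")


    async def update_timestamps(graph_engine, node_ids, sql_store, timestamp_ms):
        # Step 1: best-effort graph update; failure is downgraded to a warning.
        try:
            for node_id in node_ids:
                await graph_engine.query(
                    "MATCH (n:Node {id: $id}) SET n.last_accessed_at = $ts",
                    {"id": node_id, "ts": timestamp_ms},
                )
        except Exception as e:
            logger.warning(f"Graph update failed: {e}. Falling back to SQL.")

        # Step 2: always record document-level access; here a failure is fatal.
        # (In cognee, node ids are first mapped to their origin documents.)
        try:
            for node_id in node_ids:
                sql_store[node_id] = timestamp_ms
        except Exception as e:
            logger.error(f"SQL update failed: {e}")
            raise


    sql_store = {}
    asyncio.run(update_timestamps(FlakyGraphEngine(), ["doc-1"], sql_store, 1733150000000))
    print(sql_store)  # {'doc-1': 1733150000000} despite the graph failure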
diff --git a/cognee/tasks/cleanup/cleanup_unused_data.py b/cognee/tasks/cleanup/cleanup_unused_data.py
index c70b97a00..3894635dd 100644
--- a/cognee/tasks/cleanup/cleanup_unused_data.py
+++ b/cognee/tasks/cleanup/cleanup_unused_data.py
@@ -1,9 +1,9 @@
 """
 Task for automatically deleting unused data from the memify pipeline.
 
-This task identifies and removes data (chunks, entities, summaries)) that hasn't
+This task identifies and removes entire documents that haven't
 been accessed by retrievers for a specified period, helping maintain system
-efficiency and storage optimization.
+efficiency and storage optimization through whole-document removal.
 """
 
 import json
@@ -28,22 +28,26 @@ async def cleanup_unused_data(
     minutes_threshold: Optional[int],
     dry_run: bool = True,
     user_id: Optional[UUID] = None,
-    text_doc: bool = False
+    text_doc: bool = True,  # Changed default to True for document-level cleanup
+    node_level: bool = False  # New parameter for explicit node-level cleanup
 ) -> Dict[str, Any]:
     """
     Identify and remove unused data from the memify pipeline.
-    
+
     Parameters
     ----------
     minutes_threshold : int
-        days since last access to consider data unused
+        Minutes since last access to consider data unused
     dry_run : bool
-        If True, only report what would be delete without actually deleting (default: True)
+        If True, only report what would be deleted without actually deleting (default: True)
     user_id : UUID, optional
         Limit cleanup to specific user's data (default: None)
     text_doc : bool
-        If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete()
-        for proper whole-document deletion (default: False)
+        If True (default), use SQL-based filtering to find unused TextDocuments and call
+        cognee.delete() for proper whole-document deletion
+    node_level : bool
+        If True, perform node-level deletion of unused chunks, entities, and summaries
+        (default: False; deprecated in favor of document-level cleanup)
 
     Returns
     -------
@@ -91,17 +95,19 @@ async def cleanup_unused_data(
         minutes_threshold=minutes_threshold,
         dry_run=dry_run,
         user_id=str(user_id) if user_id else None,
-        text_doc=text_doc
+        text_doc=text_doc,
+        node_level=node_level
     )
 
     # Calculate cutoff timestamp
     cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
 
-    if text_doc:
-        # SQL-based approach: Find unused TextDocuments and use cognee.delete()
-        return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
-    else:
-        # Graph-based approach: Find unused nodes using projection (database-agnostic)
+    if node_level:
+        # Deprecated: node-level approach
+        logger.warning(
+            "Node-level cleanup is deprecated and may lead to fragmented knowledge graphs. "
+            "Consider using document-level cleanup (default) instead."
+        )
         cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
         logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")
@@ -147,6 +153,9 @@ async def cleanup_unused_data(
             },
             "cleanup_date": datetime.now(timezone.utc).isoformat()
         }
+    else:
+        # Default: document-level approach (recommended)
+        return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
 
 
 async def _cleanup_via_sql(
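Both the document-level and the node-level branches share the same staleness test: data is unused when its last access predates now minus minutes_threshold. A minimal sketch of that arithmetic, assuming last_accessed_at is stored as epoch milliseconds as on the graph side; the rows are hypothetical:

    from datetime import datetime, timedelta, timezone

    minutes_threshold = 1440  # one day, for illustration
    cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
    cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)

    # Hypothetical rows of (document id, last_accessed_at in epoch ms).
    documents = [
        ("doc-1", cutoff_timestamp_ms - 1),  # stale: last touched before the cutoff
        ("doc-2", cutoff_timestamp_ms + 1),  # fresh: touched after the cutoff
    ]
    stale = [doc_id for doc_id, ts in documents if ts < cutoff_timestamp_ms]
    print(stale)  # ['doc-1']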
@@ -243,6 +252,7 @@ async def _cleanup_via_sql(
 async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[str, list]:
     """
     Find unused nodes using graph projection - database-agnostic approach.
+    NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
 
     Parameters
     ----------
@@ -291,6 +301,7 @@ async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[st
 async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
     """
     Delete unused nodes from graph and vector databases.
+    NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
 
     Parameters
     ----------
@@ -325,7 +336,7 @@ async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
         if not node_ids:
             continue
 
-        # Count edges connected to these nodes
+        # Count edges from the in-memory graph
         for node_id in node_ids:
             node = memory_fragment.get_node(node_id)
             if node:
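With the new defaults, callers get document-level cleanup without passing any flags, and the deprecated node-level path must be requested explicitly. A hypothetical call site, assuming the module is importable at the path this patch touches:

    import asyncio

    from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data


    async def main():
        # Dry run first: reports what would be deleted, touches nothing.
        report = await cleanup_unused_data(minutes_threshold=1440, dry_run=True)
        print(report)

        # Document-level deletion is the default (text_doc=True); the
        # deprecated node-level path needs node_level=True and logs a warning.
        await cleanup_unused_data(minutes_threshold=1440, dry_run=False)


    asyncio.run(main())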