Merge pull request #6 from chinu0609/delete-last-acessed

fix: flag to enable and disable last_accessed
This commit is contained in:
Chinmay Bhosale 2025-12-10 00:03:16 +05:30 committed by GitHub
commit 6ecf719632
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 519 additions and 423 deletions

View file

@ -5,6 +5,7 @@ Revises: 211ab850ef3d
Create Date: 2025-11-04 21:45:52.642322 Create Date: 2025-11-04 21:45:52.642322
""" """
import os
from typing import Sequence, Union from typing import Sequence, Union
from alembic import op from alembic import op
@ -17,6 +18,7 @@ down_revision: Union[str, None] = '211ab850ef3d'
branch_labels: Union[str, Sequence[str], None] = None branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None): def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema): for col in inspector.get_columns(table, schema=schema):
if col["name"] == name: if col["name"] == name:
@ -30,11 +32,15 @@ def upgrade() -> None:
last_accessed_column = _get_column(insp, "data", "last_accessed") last_accessed_column = _get_column(insp, "data", "last_accessed")
if not last_accessed_column: if not last_accessed_column:
# Always create the column for schema consistency
op.add_column('data', op.add_column('data',
sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True) sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
) )
# Optionally initialize with created_at values for existing records
op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP") # Only initialize existing records if feature is enabled
enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true"
if enable_last_accessed:
op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
def downgrade() -> None: def downgrade() -> None:

View file

@ -4,30 +4,20 @@ import json
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import List, Any from typing import List, Any
from uuid import UUID from uuid import UUID
import os
from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data from cognee.modules.data.models import Data
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from sqlalchemy import update from sqlalchemy import update
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__) logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any]): async def update_node_access_timestamps(items: List[Any]):
""" if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
Update last_accessed_at for nodes in graph database and corresponding Data records in SQL. return
This function:
1. Updates last_accessed_at in the graph database nodes (in properties JSON)
2. Traverses to find origin TextDocument nodes (without hardcoded relationship names)
3. Updates last_accessed in the SQL Data table for those documents
Parameters
----------
items : List[Any]
List of items with payload containing 'id' field (from vector search results)
"""
if not items: if not items:
return return
@ -46,50 +36,103 @@ async def update_node_access_timestamps(items: List[Any]):
return return
try: try:
# Step 1: Batch update graph nodes # Try to update nodes in graph database (may fail for unsupported DBs)
for node_id in node_ids: await _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms)
result = await graph_engine.query(
"MATCH (n:Node {id: $id}) RETURN n.properties",
{"id": node_id}
)
if result and result[0]:
props = json.loads(result[0][0]) if result[0][0] else {}
props["last_accessed_at"] = timestamp_ms
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.properties = $props",
{"id": node_id, "props": json.dumps(props)}
)
logger.debug(f"Updated access timestamps for {len(node_ids)} graph nodes")
# Step 2: Find origin TextDocument nodes (without hardcoded relationship names)
origin_query = """
UNWIND $node_ids AS node_id
MATCH (chunk:Node {id: node_id})-[e:EDGE]-(doc:Node)
WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document']
RETURN DISTINCT doc.id
"""
result = await graph_engine.query(origin_query, {"node_ids": node_ids})
# Extract and deduplicate document IDs
doc_ids = list(set([row[0] for row in result if row and row[0]])) if result else []
# Step 3: Update SQL Data table
if doc_ids:
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
stmt = update(Data).where(
Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
).values(last_accessed=timestamp_dt)
await session.execute(stmt)
await session.commit()
logger.debug(f"Updated last_accessed for {len(doc_ids)} Data records in SQL")
except Exception as e: except Exception as e:
logger.error(f"Failed to update timestamps: {e}") logger.warning(
f"Failed to update node timestamps in graph database: {e}. "
"Will update document-level timestamps in SQL instead."
)
# Always try to find origin documents and update SQL
# This ensures document-level tracking works even if graph updates fail
try:
doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
if doc_ids:
await _update_sql_records(doc_ids, timestamp_dt)
except Exception as e:
logger.error(f"Failed to update SQL timestamps: {e}")
raise raise
async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms):
    """Stamp ``last_accessed_at`` on the given graph nodes.

    Database-agnostic entry point: the graph is first projected into
    memory (CogneeGraph) so each id can be checked for existence without
    a round trip, then a provider-specific update query is issued per
    node.  Failures on individual nodes are logged and skipped so one
    bad node cannot abort the whole batch.

    Parameters
    ----------
    graph_engine
        Graph database adapter exposing an async ``query`` method.
    node_ids
        Iterable of node id strings to update.
    timestamp_ms
        Access time in milliseconds since epoch.
    """
    # Project the graph with the minimal property set needed to test
    # whether each requested node actually exists.
    in_memory_graph = CogneeGraph()
    await in_memory_graph.project_graph_from_db(
        graph_engine,
        node_properties_to_project=["id"],
        edge_properties_to_project=[],
    )

    backend = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()

    for current_id in node_ids:
        if not in_memory_graph.get_node(current_id):
            continue
        try:
            if backend == "kuzu":
                # Kuzu stores node attributes in a JSON `properties`
                # blob: read it, mutate it, and write it back.
                rows = await graph_engine.query(
                    "MATCH (n:Node {id: $id}) RETURN n.properties",
                    {"id": current_id},
                )
                if rows and rows[0]:
                    properties = json.loads(rows[0][0]) if rows[0][0] else {}
                    properties["last_accessed_at"] = timestamp_ms
                    await graph_engine.query(
                        "MATCH (n:Node {id: $id}) SET n.properties = $props",
                        {"id": current_id, "props": json.dumps(properties)},
                    )
            elif backend == "neo4j":
                await graph_engine.query(
                    "MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
                    {"id": current_id, "timestamp": timestamp_ms},
                )
            elif backend == "neptune":
                await graph_engine.query(
                    "MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
                    {"id": current_id, "timestamp": timestamp_ms},
                )
        except Exception as e:
            # Best-effort: record the failure and move on to the next node.
            logger.debug(f"Failed to update node {current_id}: {e}")
async def _find_origin_documents_via_projection(graph_engine, node_ids):
    """Resolve DocumentChunk node ids to their originating document ids.

    Projects the graph into memory and walks each chunk's skeleton edges
    looking for a neighbour typed ``TextDocument`` or ``Document``,
    avoiding any provider-specific traversal query syntax.

    Returns
    -------
    list
        De-duplicated ids of the origin document nodes.
    """
    projected = CogneeGraph()
    await projected.project_graph_from_db(
        graph_engine,
        node_properties_to_project=["id", "type"],
        edge_properties_to_project=["relationship_name"],
    )

    document_ids = set()
    for chunk_id in node_ids:
        chunk = projected.get_node(chunk_id)
        # Only DocumentChunk nodes have an origin document to find.
        if not (chunk and chunk.get_attribute("type") == "DocumentChunk"):
            continue
        for edge in chunk.get_skeleton_edges():
            # Edges may point either way; take the endpoint that is not
            # the chunk itself.
            src = edge.get_source_node()
            neighbor = edge.get_destination_node() if src.id == chunk_id else src
            if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]:
                document_ids.add(neighbor.id)

    return list(document_ids)
async def _update_sql_records(doc_ids, timestamp_dt):
    """Set ``last_accessed`` on the SQL ``Data`` rows for the given ids.

    Provider-independent: one bulk UPDATE against the shared relational
    engine, committed in a single transaction.

    Parameters
    ----------
    doc_ids
        Iterable of document id strings (parseable as UUIDs).
    timestamp_dt
        Datetime to store as the new ``last_accessed`` value.
    """
    uuids = [UUID(identifier) for identifier in doc_ids]
    engine = get_relational_engine()
    async with engine.get_async_session() as session:
        statement = (
            update(Data)
            .where(Data.id.in_(uuids))
            .values(last_accessed=timestamp_dt)
        )
        await session.execute(statement)
        await session.commit()

View file

@ -1,16 +1,16 @@
""" """
Task for automatically deleting unused data from the memify pipeline. Task for automatically deleting unused data from the memify pipeline.
This task identifies and removes data (chunks, entities, summaries) that hasn't This task identifies and removes entire documents that haven't
been accessed by retrievers for a specified period, helping maintain system been accessed by retrievers for a specified period, helping maintain system
efficiency and storage optimization. efficiency and storage optimization through whole-document removal.
""" """
import json import json
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from uuid import UUID from uuid import UUID
import os
from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
@ -18,57 +18,101 @@ from cognee.modules.data.models import Data, DatasetData
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from sqlalchemy import select, or_ from sqlalchemy import select, or_
import cognee import cognee
import sqlalchemy as sa
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__) logger = get_logger(__name__)
async def cleanup_unused_data( async def cleanup_unused_data(
days_threshold: Optional[int], minutes_threshold: Optional[int],
dry_run: bool = True, dry_run: bool = True,
user_id: Optional[UUID] = None, user_id: Optional[UUID] = None,
text_doc: bool = False text_doc: bool = True, # Changed default to True for document-level cleanup
node_level: bool = False # New parameter for explicit node-level cleanup
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Identify and remove unused data from the memify pipeline. Identify and remove unused data from the memify pipeline.
Parameters Parameters
---------- ----------
days_threshold : int minutes_threshold : int
days since last access to consider data unused Minutes since last access to consider data unused
dry_run : bool dry_run : bool
If True, only report what would be deleted without actually deleting (default: True) If True, only report what would be deleted without actually deleting (default: True)
user_id : UUID, optional user_id : UUID, optional
Limit cleanup to specific user's data (default: None) Limit cleanup to specific user's data (default: None)
text_doc : bool text_doc : bool
If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete() If True (default), use SQL-based filtering to find unused TextDocuments and call cognee.delete()
for proper whole-document deletion (default: False) for proper whole-document deletion
node_level : bool
If True, perform chaotic node-level deletion of unused chunks, entities, and summaries
(default: False - deprecated in favor of document-level cleanup)
Returns Returns
------- -------
Dict[str, Any] Dict[str, Any]
Cleanup results with status, counts, and timestamp Cleanup results with status, counts, and timestamp
""" """
# Check 1: Environment variable must be enabled
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
logger.warning(
"Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled."
)
return {
"status": "skipped",
"reason": "ENABLE_LAST_ACCESSED not enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
# Check 2: Verify tracking has actually been running
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# Count records with non-NULL last_accessed
tracked_count = await session.execute(
select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
)
tracked_records = tracked_count.scalar()
if tracked_records == 0:
logger.warning(
"Cleanup skipped: No records have been tracked yet. "
"ENABLE_LAST_ACCESSED may have been recently enabled. "
"Wait for retrievers to update timestamps before running cleanup."
)
return {
"status": "skipped",
"reason": "No tracked records found - tracking may be newly enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
logger.info( logger.info(
"Starting cleanup task", "Starting cleanup task",
days_threshold=days_threshold, minutes_threshold=minutes_threshold,
dry_run=dry_run, dry_run=dry_run,
user_id=str(user_id) if user_id else None, user_id=str(user_id) if user_id else None,
text_doc=text_doc text_doc=text_doc,
node_level=node_level
) )
# Calculate cutoff timestamp # Calculate cutoff timestamp
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_threshold) cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
if text_doc: if node_level:
# SQL-based approach: Find unused TextDocuments and use cognee.delete() # Deprecated: Node-level approach (chaotic)
return await _cleanup_via_sql(cutoff_date, dry_run, user_id) logger.warning(
else: "Node-level cleanup is deprecated and may lead to fragmented knowledge graphs. "
# Graph-based approach: Find unused nodes directly from graph "Consider using document-level cleanup (default) instead."
)
cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000) cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)") logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")
# Find unused nodes # Find unused nodes using graph projection
unused_nodes = await _find_unused_nodes(cutoff_timestamp_ms, user_id) unused_nodes = await _find_unused_nodes_via_projection(cutoff_timestamp_ms)
total_unused = sum(len(nodes) for nodes in unused_nodes.values()) total_unused = sum(len(nodes) for nodes in unused_nodes.values())
logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()}) logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()})
@ -92,7 +136,7 @@ async def cleanup_unused_data(
} }
} }
# Delete unused nodes # Delete unused nodes (provider-agnostic deletion)
deleted_counts = await _delete_unused_nodes(unused_nodes) deleted_counts = await _delete_unused_nodes(unused_nodes)
logger.info("Cleanup completed", deleted_counts=deleted_counts) logger.info("Cleanup completed", deleted_counts=deleted_counts)
@ -109,6 +153,9 @@ async def cleanup_unused_data(
}, },
"cleanup_date": datetime.now(timezone.utc).isoformat() "cleanup_date": datetime.now(timezone.utc).isoformat()
} }
else:
# Default: Document-level approach (recommended)
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
async def _cleanup_via_sql( async def _cleanup_via_sql(
@ -202,19 +249,15 @@ async def _cleanup_via_sql(
} }
async def _find_unused_nodes( async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[str, list]:
cutoff_timestamp_ms: int,
user_id: Optional[UUID] = None
) -> Dict[str, list]:
""" """
Query Kuzu for nodes with old last_accessed_at timestamps. Find unused nodes using graph projection - database-agnostic approach.
NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
Parameters Parameters
---------- ----------
cutoff_timestamp_ms : int cutoff_timestamp_ms : int
Cutoff timestamp in milliseconds since epoch Cutoff timestamp in milliseconds since epoch
user_id : UUID, optional
Filter by user ID if provided
Returns Returns
------- -------
@ -223,38 +266,34 @@ async def _find_unused_nodes(
""" """
graph_engine = await get_graph_engine() graph_engine = await get_graph_engine()
# Query all nodes with their properties # Project the entire graph with necessary properties
query = "MATCH (n:Node) RETURN n.id, n.type, n.properties" memory_fragment = CogneeGraph()
results = await graph_engine.query(query) await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id", "type", "last_accessed_at"],
edge_properties_to_project=[]
)
unused_nodes = { unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []}
"DocumentChunk": [],
"Entity": [],
"TextSummary": []
}
for node_id, node_type, props_json in results: # Get all nodes from the projected graph
# Only process tracked node types all_nodes = memory_fragment.get_nodes()
for node in all_nodes:
node_type = node.get_attribute("type")
if node_type not in unused_nodes: if node_type not in unused_nodes:
continue continue
# Parse properties JSON # Check last_accessed_at property
if props_json: last_accessed = node.get_attribute("last_accessed_at")
try:
props = json.loads(props_json)
last_accessed = props.get("last_accessed_at")
# Check if node is unused (never accessed or accessed before cutoff) if last_accessed is None or last_accessed < cutoff_timestamp_ms:
if last_accessed is None or last_accessed < cutoff_timestamp_ms: unused_nodes[node_type].append(node.id)
unused_nodes[node_type].append(node_id) logger.debug(
logger.debug( f"Found unused {node_type}",
f"Found unused {node_type}", node_id=node.id,
node_id=node_id, last_accessed=last_accessed
last_accessed=last_accessed )
)
except json.JSONDecodeError:
logger.warning(f"Failed to parse properties for node {node_id}")
continue
return unused_nodes return unused_nodes
@ -262,6 +301,7 @@ async def _find_unused_nodes(
async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]: async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
""" """
Delete unused nodes from graph and vector databases. Delete unused nodes from graph and vector databases.
NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
Parameters Parameters
---------- ----------
@ -283,19 +323,26 @@ async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
"associations": 0 "associations": 0
} }
# Count associations before deletion # Count associations before deletion (using graph projection for consistency)
for node_type, node_ids in unused_nodes.items(): if any(unused_nodes.values()):
if not node_ids: memory_fragment = CogneeGraph()
continue await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id"],
edge_properties_to_project=[]
)
# Count edges connected to these nodes for node_type, node_ids in unused_nodes.items():
for node_id in node_ids: if not node_ids:
result = await graph_engine.query( continue
"MATCH (n:Node {id: $id})-[r:EDGE]-() RETURN count(r)",
{"id": node_id} # Count edges from the in-memory graph
) for node_id in node_ids:
if result and len(result) > 0: node = memory_fragment.get_node(node_id)
deleted_counts["associations"] += result[0][0] if node:
# Count edges from the in-memory graph
edge_count = len(node.get_skeleton_edges())
deleted_counts["associations"] += edge_count
# Delete from graph database (uses DETACH DELETE, so edges are automatically removed) # Delete from graph database (uses DETACH DELETE, so edges are automatically removed)
for node_type, node_ids in unused_nodes.items(): for node_type, node_ids in unused_nodes.items():
@ -304,7 +351,7 @@ async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database") logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database")
# Delete nodes in batches # Delete nodes in batches (database-agnostic)
await graph_engine.delete_nodes(node_ids) await graph_engine.delete_nodes(node_ids)
deleted_counts[node_type] = len(node_ids) deleted_counts[node_type] = len(node_ids)