Merge pull request #5 from chinu0609/delete-last-acessed

feat: adding cleanup function and adding update_node_access_timestamps…
This commit is contained in:
Chinmay Bhosale 2025-11-06 23:02:18 +05:30 committed by GitHub
commit bd71540d75
8 changed files with 468 additions and 34 deletions

View file

@ -0,0 +1,46 @@
"""add_last_accessed_to_data
Revision ID: e1ec1dcb50b6
Revises: 211ab850ef3d
Create Date: 2025-11-04 21:45:52.642322
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'e1ec1dcb50b6'
down_revision: Union[str, None] = '211ab850ef3d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema):
if col["name"] == name:
return col
return None
def upgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
last_accessed_column = _get_column(insp, "data", "last_accessed")
if not last_accessed_column:
op.add_column('data',
sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
)
# Optionally initialize with created_at values for existing records
op.execute("UPDATE data SET last_accessed = created_at")
def downgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
last_accessed_column = _get_column(insp, "data", "last_accessed")
if last_accessed_column:
op.drop_column('data', 'last_accessed')
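
For context, a minimal sketch of applying and verifying this migration locally, assuming a standard Alembic setup (run "alembic upgrade head" from the project root) and a SQLAlchemy engine pointed at the same database; the connection URL below is a placeholder:

# Sketch: verify the column added by revision e1ec1dcb50b6 exists after upgrading.
import sqlalchemy as sa

engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/cognee")  # placeholder URL
inspector = sa.inspect(engine)
column_names = {col["name"] for col in inspector.get_columns("data")}
assert "last_accessed" in column_names, "run 'alembic upgrade head' first"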

View file

@ -36,6 +36,7 @@ class Data(Base):
    data_size = Column(Integer, nullable=True)  # File size in bytes
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
    last_accessed = Column(DateTime(timezone=True), nullable=True)

    datasets = relationship(
        "Dataset",

View file

@ -5,3 +5,4 @@ from .retrieve_existing_edges import retrieve_existing_edges
from .convert_node_to_data_point import convert_node_to_data_point
from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
from .resolve_edges_to_text import resolve_edges_to_text
from .get_entity_nodes_from_triplets import get_entity_nodes_from_triplets

View file

@ -0,0 +1,13 @@
def get_entity_nodes_from_triplets(triplets):
    entity_nodes = []
    seen_ids = set()

    for triplet in triplets:
        if hasattr(triplet, 'node1') and triplet.node1 and triplet.node1.id not in seen_ids:
            entity_nodes.append({"id": str(triplet.node1.id)})
            seen_ids.add(triplet.node1.id)

        if hasattr(triplet, 'node2') and triplet.node2 and triplet.node2.id not in seen_ids:
            entity_nodes.append({"id": str(triplet.node2.id)})
            seen_ids.add(triplet.node2.id)

    return entity_nodes
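
A minimal usage sketch with duck-typed stand-ins for the triplet and node objects (hypothetical classes; real triplets come from the graph retriever):

# Sketch: shared nodes are emitted only once thanks to the seen_ids set.
from dataclasses import dataclass
from uuid import uuid4

@dataclass
class FakeNode:      # hypothetical stand-in for a graph node
    id: object

@dataclass
class FakeTriplet:   # hypothetical stand-in for a retrieved triplet
    node1: FakeNode
    node2: FakeNode

a, b = FakeNode(uuid4()), FakeNode(uuid4())
print(get_entity_nodes_from_triplets([FakeTriplet(a, b), FakeTriplet(b, a)]))
# -> [{"id": "<uuid of a>"}, {"id": "<uuid of b>"}]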

View file

@ -8,6 +8,7 @@ from cognee.modules.retrieval.utils.session_cache import (
save_conversation_history,
get_conversation_history,
)
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.infrastructure.databases.vector.exceptions import CollectionNotFoundError
@ -65,7 +66,7 @@ class CompletionRetriever(BaseRetriever):
        if len(found_chunks) == 0:
            return ""

        await update_node_access_timestamps(found_chunks)

        # Combine all chunks text returned from vector search (number of chunks is determined by top_k)
        chunks_payload = [found_chunk.payload["text"] for found_chunk in found_chunks]
        combined_context = "\n".join(chunks_payload)

View file

@ -16,11 +16,13 @@ from cognee.modules.retrieval.utils.session_cache import (
)
from cognee.shared.logging_utils import get_logger
from cognee.modules.retrieval.utils.extract_uuid_from_node import extract_uuid_from_node
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
from cognee.modules.retrieval.utils.models import CogneeUserInteraction
from cognee.modules.engine.models.node_set import NodeSet
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.context_global_variables import session_user
from cognee.infrastructure.databases.cache.config import CacheConfig
from cognee.modules.graph.utils import get_entity_nodes_from_triplets
logger = get_logger("GraphCompletionRetriever")
@ -139,6 +141,9 @@ class GraphCompletionRetriever(BaseGraphRetriever):
        # context = await self.resolve_edges_to_text(triplets)
        entity_nodes = get_entity_nodes_from_triplets(triplets)
        await update_node_access_timestamps(entity_nodes)

        return triplets

    async def get_completion(

View file

@ -1,20 +1,27 @@
"""Utilities for tracking data access in retrievers."""
import json
from datetime import datetime, timezone
from typing import List, Any
from uuid import UUID
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.shared.logging_utils import get_logger
from sqlalchemy import update
logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any]):
"""
Update last_accessed_at for nodes in Kuzu graph database.
Automatically determines node type from the graph database.
Update last_accessed_at for nodes in graph database and corresponding Data records in SQL.
This function:
1. Updates last_accessed_at in the graph database nodes (in properties JSON)
2. Traverses to find origin TextDocument nodes (without hardcoded relationship names)
3. Updates last_accessed in the SQL Data table for those documents
Parameters
----------
@ -26,39 +33,63 @@ async def update_node_access_timestamps(items: List[Any]):
    graph_engine = await get_graph_engine()
    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    timestamp_dt = datetime.now(timezone.utc)

    # Extract node IDs
    node_ids = []
    for item in items:
        # Extract ID from payload
        item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
        if item_id:
            node_ids.append(str(item_id))

    if not node_ids:
        return

    try:
        # Step 1: Batch update graph nodes
        for node_id in node_ids:
            result = await graph_engine.query(
                "MATCH (n:Node {id: $id}) RETURN n.properties",
                {"id": node_id}
            )
            if result and result[0]:
                props = json.loads(result[0][0]) if result[0][0] else {}
                props["last_accessed_at"] = timestamp_ms

                await graph_engine.query(
                    "MATCH (n:Node {id: $id}) SET n.properties = $props",
                    {"id": node_id, "props": json.dumps(props)}
                )

        logger.debug(f"Updated access timestamps for {len(node_ids)} graph nodes")

        # Step 2: Find origin TextDocument nodes (without hardcoded relationship names)
        origin_query = """
            UNWIND $node_ids AS node_id
            MATCH (chunk:Node {id: node_id})-[e:EDGE]-(doc:Node)
            WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document']
            RETURN DISTINCT doc.id
        """
        result = await graph_engine.query(origin_query, {"node_ids": node_ids})

        # Extract and deduplicate document IDs
        doc_ids = list(set([row[0] for row in result if row and row[0]])) if result else []

        # Step 3: Update SQL Data table
        if doc_ids:
            db_engine = get_relational_engine()
            async with db_engine.get_async_session() as session:
                stmt = update(Data).where(
                    Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
                ).values(last_accessed=timestamp_dt)
                await session.execute(stmt)
                await session.commit()
            logger.debug(f"Updated last_accessed for {len(doc_ids)} Data records in SQL")
    except Exception as e:
        logger.error(f"Failed to update timestamps: {e}")
        raise
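
For illustration, a hedged sketch of the two item shapes this function accepts, assuming a configured cognee graph and relational database; the class name and IDs below are placeholders:

# Sketch: vector search hits expose a payload dict; entity nodes are plain dicts with an "id" key.
import asyncio

class FakeScoredResult:  # hypothetical stand-in for a vector search result
    def __init__(self, node_id, text):
        self.payload = {"id": node_id, "text": text}

async def main():
    found_chunks = [FakeScoredResult("chunk-id-placeholder", "some chunk text")]
    entity_nodes = [{"id": "entity-id-placeholder"}]

    await update_node_access_timestamps(found_chunks)   # CompletionRetriever path
    await update_node_access_timestamps(entity_nodes)   # GraphCompletionRetriever path

asyncio.run(main())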

View file

@ -0,0 +1,336 @@
"""
Task for automatically deleting unused data from the memify pipeline.
This task identifies and removes data (chunks, entities, summaries) that hasn't
been accessed by retrievers for a specified period, helping maintain system
efficiency and storage optimization.
"""
import json
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any
from uuid import UUID
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, DatasetData
from cognee.shared.logging_utils import get_logger
from sqlalchemy import select, or_
import cognee
logger = get_logger(__name__)
async def cleanup_unused_data(
    days_threshold: Optional[int],
    dry_run: bool = True,
    user_id: Optional[UUID] = None,
    text_doc: bool = False
) -> Dict[str, Any]:
    """
    Identify and remove unused data from the memify pipeline.

    Parameters
    ----------
    days_threshold : int
        Days since last access to consider data unused
    dry_run : bool
        If True, only report what would be deleted without actually deleting (default: True)
    user_id : UUID, optional
        Limit cleanup to specific user's data (default: None)
    text_doc : bool
        If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete()
        for proper whole-document deletion (default: False)

    Returns
    -------
    Dict[str, Any]
        Cleanup results with status, counts, and timestamp
    """
    logger.info(
        "Starting cleanup task",
        days_threshold=days_threshold,
        dry_run=dry_run,
        user_id=str(user_id) if user_id else None,
        text_doc=text_doc
    )

    # Calculate cutoff timestamp
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_threshold)

    if text_doc:
        # SQL-based approach: Find unused TextDocuments and use cognee.delete()
        return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
    else:
        # Graph-based approach: Find unused nodes directly from graph
        cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
        logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")

        # Find unused nodes
        unused_nodes = await _find_unused_nodes(cutoff_timestamp_ms, user_id)
        total_unused = sum(len(nodes) for nodes in unused_nodes.values())

        logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()})

        if dry_run:
            return {
                "status": "dry_run",
                "unused_count": total_unused,
                "deleted_count": {
                    "data_items": 0,
                    "chunks": 0,
                    "entities": 0,
                    "summaries": 0,
                    "associations": 0
                },
                "cleanup_date": datetime.now(timezone.utc).isoformat(),
                "preview": {
                    "chunks": len(unused_nodes["DocumentChunk"]),
                    "entities": len(unused_nodes["Entity"]),
                    "summaries": len(unused_nodes["TextSummary"])
                }
            }

        # Delete unused nodes
        deleted_counts = await _delete_unused_nodes(unused_nodes)

        logger.info("Cleanup completed", deleted_counts=deleted_counts)

        return {
            "status": "completed",
            "unused_count": total_unused,
            "deleted_count": {
                "data_items": 0,
                "chunks": deleted_counts["DocumentChunk"],
                "entities": deleted_counts["Entity"],
                "summaries": deleted_counts["TextSummary"],
                "associations": deleted_counts["associations"]
            },
            "cleanup_date": datetime.now(timezone.utc).isoformat()
        }
async def _cleanup_via_sql(
    cutoff_date: datetime,
    dry_run: bool,
    user_id: Optional[UUID] = None
) -> Dict[str, Any]:
    """
    SQL-based cleanup: Query Data table for unused documents and use cognee.delete().

    Parameters
    ----------
    cutoff_date : datetime
        Cutoff date for last_accessed filtering
    dry_run : bool
        If True, only report what would be deleted
    user_id : UUID, optional
        Filter by user ID if provided

    Returns
    -------
    Dict[str, Any]
        Cleanup results
    """
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        # Query for Data records with old last_accessed timestamps
        query = select(Data, DatasetData).join(
            DatasetData, Data.id == DatasetData.data_id
        ).where(
            or_(
                Data.last_accessed < cutoff_date,
                Data.last_accessed.is_(None)
            )
        )

        if user_id:
            from cognee.modules.data.models import Dataset
            query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
                Dataset.owner_id == user_id
            )

        result = await session.execute(query)
        unused_data = result.all()

    logger.info(f"Found {len(unused_data)} unused documents in SQL")

    if dry_run:
        return {
            "status": "dry_run",
            "unused_count": len(unused_data),
            "deleted_count": {
                "data_items": 0,
                "documents": 0
            },
            "cleanup_date": datetime.now(timezone.utc).isoformat(),
            "preview": {
                "documents": len(unused_data)
            }
        }

    # Delete each document using cognee.delete()
    deleted_count = 0
    from cognee.modules.users.methods import get_default_user

    user = await get_default_user() if user_id is None else None

    for data, dataset_data in unused_data:
        try:
            await cognee.delete(
                data_id=data.id,
                dataset_id=dataset_data.dataset_id,
                mode="hard",  # Use hard mode to also remove orphaned entities
                user=user
            )
            deleted_count += 1
            logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
        except Exception as e:
            logger.error(f"Failed to delete document {data.id}: {e}")

    logger.info("Cleanup completed", deleted_count=deleted_count)

    return {
        "status": "completed",
        "unused_count": len(unused_data),
        "deleted_count": {
            "data_items": deleted_count,
            "documents": deleted_count
        },
        "cleanup_date": datetime.now(timezone.utc).isoformat()
    }
async def _find_unused_nodes(
    cutoff_timestamp_ms: int,
    user_id: Optional[UUID] = None
) -> Dict[str, list]:
    """
    Query Kuzu for nodes with old last_accessed_at timestamps.

    Parameters
    ----------
    cutoff_timestamp_ms : int
        Cutoff timestamp in milliseconds since epoch
    user_id : UUID, optional
        Filter by user ID if provided

    Returns
    -------
    Dict[str, list]
        Dictionary mapping node types to lists of unused node IDs
    """
    graph_engine = await get_graph_engine()

    # Query all nodes with their properties
    query = "MATCH (n:Node) RETURN n.id, n.type, n.properties"
    results = await graph_engine.query(query)

    unused_nodes = {
        "DocumentChunk": [],
        "Entity": [],
        "TextSummary": []
    }

    for node_id, node_type, props_json in results:
        # Only process tracked node types
        if node_type not in unused_nodes:
            continue

        # Parse properties JSON
        if props_json:
            try:
                props = json.loads(props_json)
                last_accessed = props.get("last_accessed_at")

                # Check if node is unused (never accessed or accessed before cutoff)
                if last_accessed is None or last_accessed < cutoff_timestamp_ms:
                    unused_nodes[node_type].append(node_id)
                    logger.debug(
                        f"Found unused {node_type}",
                        node_id=node_id,
                        last_accessed=last_accessed
                    )
            except json.JSONDecodeError:
                logger.warning(f"Failed to parse properties for node {node_id}")
                continue

    return unused_nodes
async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
    """
    Delete unused nodes from graph and vector databases.

    Parameters
    ----------
    unused_nodes : Dict[str, list]
        Dictionary mapping node types to lists of node IDs to delete

    Returns
    -------
    Dict[str, int]
        Count of deleted items by type
    """
    graph_engine = await get_graph_engine()
    vector_engine = get_vector_engine()

    deleted_counts = {
        "DocumentChunk": 0,
        "Entity": 0,
        "TextSummary": 0,
        "associations": 0
    }

    # Count associations before deletion
    for node_type, node_ids in unused_nodes.items():
        if not node_ids:
            continue

        # Count edges connected to these nodes
        for node_id in node_ids:
            result = await graph_engine.query(
                "MATCH (n:Node {id: $id})-[r:EDGE]-() RETURN count(r)",
                {"id": node_id}
            )
            if result and len(result) > 0:
                deleted_counts["associations"] += result[0][0]

    # Delete from graph database (uses DETACH DELETE, so edges are automatically removed)
    for node_type, node_ids in unused_nodes.items():
        if not node_ids:
            continue

        logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database")

        # Delete nodes in batches
        await graph_engine.delete_nodes(node_ids)
        deleted_counts[node_type] = len(node_ids)

    # Delete from vector database
    vector_collections = {
        "DocumentChunk": "DocumentChunk_text",
        "Entity": "Entity_name",
        "TextSummary": "TextSummary_text"
    }

    for node_type, collection_name in vector_collections.items():
        node_ids = unused_nodes[node_type]
        if not node_ids:
            continue

        logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from vector database")

        try:
            # Delete from vector collection
            if await vector_engine.has_collection(collection_name):
                for node_id in node_ids:
                    try:
                        await vector_engine.delete(collection_name, {"id": str(node_id)})
                    except Exception as e:
                        logger.warning(f"Failed to delete {node_id} from {collection_name}: {e}")
        except Exception as e:
            logger.error(f"Error deleting from vector collection {collection_name}: {e}")

    return deleted_counts
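
Finally, a hedged sketch of how the cleanup task might be invoked once cognee is configured; the import path is an assumption, since the diff view does not show the file location:

import asyncio
from cognee.tasks.cleanup import cleanup_unused_data  # assumed import path

async def main():
    # Dry run: report what would be removed after 30 days without access.
    report = await cleanup_unused_data(days_threshold=30, dry_run=True)
    print(report["status"], report["unused_count"], report.get("preview"))

    # Graph-based deletion of unused chunks, entities, and summaries.
    await cleanup_unused_data(days_threshold=30, dry_run=False)

    # SQL-based whole-document deletion via cognee.delete().
    await cleanup_unused_data(days_threshold=30, dry_run=False, text_doc=True)

asyncio.run(main())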