fix: use only document-level deletion for cleanup

This commit is contained in:
chinu0609 2025-12-10 22:41:01 +05:30
parent 5f00abf3e4
commit 829a6f0d04
3 changed files with 333 additions and 656 deletions

View file

@ -14,6 +14,7 @@ from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__) logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any]): async def update_node_access_timestamps(items: List[Any]):
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
return return
@ -22,7 +23,6 @@ async def update_node_access_timestamps(items: List[Any]):
return return
graph_engine = await get_graph_engine() graph_engine = await get_graph_engine()
timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
timestamp_dt = datetime.now(timezone.utc) timestamp_dt = datetime.now(timezone.utc)
# Extract node IDs # Extract node IDs
@ -35,17 +35,7 @@ async def update_node_access_timestamps(items: List[Any]):
if not node_ids: if not node_ids:
return return
try: # Focus on document-level tracking via projection
# Try to update nodes in graph database (may fail for unsupported DBs)
await _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms)
except Exception as e:
logger.warning(
f"Failed to update node timestamps in graph database: {e}. "
"Will update document-level timestamps in SQL instead."
)
# Always try to find origin documents and update SQL
# This ensures document-level tracking works even if graph updates fail
try: try:
doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids) doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
if doc_ids: if doc_ids:
@ -54,53 +44,6 @@ async def update_node_access_timestamps(items: List[Any]):
logger.error(f"Failed to update SQL timestamps: {e}") logger.error(f"Failed to update SQL timestamps: {e}")
raise raise
async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms):
"""Update nodes using graph projection - works with any graph database"""
# Project the graph with necessary properties
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id"],
edge_properties_to_project=[]
)
# Update each node's last_accessed_at property
provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node:
try:
# Update the node in the database
if provider == "kuzu":
# Kuzu stores properties as JSON
result = await graph_engine.query(
"MATCH (n:Node {id: $id}) RETURN n.properties",
{"id": node_id}
)
if result and result[0]:
props = json.loads(result[0][0]) if result[0][0] else {}
props["last_accessed_at"] = timestamp_ms
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.properties = $props",
{"id": node_id, "props": json.dumps(props)}
)
elif provider == "neo4j":
await graph_engine.query(
"MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
elif provider == "neptune":
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
except Exception as e:
# Log but continue with other nodes
logger.debug(f"Failed to update node {node_id}: {e}")
continue
async def _find_origin_documents_via_projection(graph_engine, node_ids): async def _find_origin_documents_via_projection(graph_engine, node_ids):
"""Find origin documents using graph projection instead of DB queries""" """Find origin documents using graph projection instead of DB queries"""
@ -126,6 +69,7 @@ async def _find_origin_documents_via_projection(graph_engine, node_ids):
return list(doc_ids) return list(doc_ids)
async def _update_sql_records(doc_ids, timestamp_dt): async def _update_sql_records(doc_ids, timestamp_dt):
"""Update SQL Data table (same for all providers)""" """Update SQL Data table (same for all providers)"""
db_engine = get_relational_engine() db_engine = get_relational_engine()

View file

@ -27,9 +27,7 @@ logger = get_logger(__name__)
async def cleanup_unused_data( async def cleanup_unused_data(
minutes_threshold: Optional[int], minutes_threshold: Optional[int],
dry_run: bool = True, dry_run: bool = True,
user_id: Optional[UUID] = None, user_id: Optional[UUID] = None
text_doc: bool = True, # Changed default to True for document-level cleanup
node_level: bool = False # New parameter for explicit node-level cleanup
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Identify and remove unused data from the memify pipeline. Identify and remove unused data from the memify pipeline.
@ -42,12 +40,6 @@ async def cleanup_unused_data(
If True, only report what would be deleted without actually deleting (default: True) If True, only report what would be deleted without actually deleting (default: True)
user_id : UUID, optional user_id : UUID, optional
Limit cleanup to specific user's data (default: None) Limit cleanup to specific user's data (default: None)
text_doc : bool
If True (default), use SQL-based filtering to find unused TextDocuments and call cognee.delete()
for proper whole-document deletion
node_level : bool
If True, perform chaotic node-level deletion of unused chunks, entities, and summaries
(default: False - deprecated in favor of document-level cleanup)
Returns Returns
------- -------
@ -94,68 +86,14 @@ async def cleanup_unused_data(
"Starting cleanup task", "Starting cleanup task",
minutes_threshold=minutes_threshold, minutes_threshold=minutes_threshold,
dry_run=dry_run, dry_run=dry_run,
user_id=str(user_id) if user_id else None, user_id=str(user_id) if user_id else None
text_doc=text_doc,
node_level=node_level
) )
# Calculate cutoff timestamp # Calculate cutoff timestamp
cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold) cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
if node_level: # Document-level approach (recommended)
# Deprecated: Node-level approach (chaotic) return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
logger.warning(
"Node-level cleanup is deprecated and may lead to fragmented knowledge graphs. "
"Consider using document-level cleanup (default) instead."
)
cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")
# Find unused nodes using graph projection
unused_nodes = await _find_unused_nodes_via_projection(cutoff_timestamp_ms)
total_unused = sum(len(nodes) for nodes in unused_nodes.values())
logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()})
if dry_run:
return {
"status": "dry_run",
"unused_count": total_unused,
"deleted_count": {
"data_items": 0,
"chunks": 0,
"entities": 0,
"summaries": 0,
"associations": 0
},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
"preview": {
"chunks": len(unused_nodes["DocumentChunk"]),
"entities": len(unused_nodes["Entity"]),
"summaries": len(unused_nodes["TextSummary"])
}
}
# Delete unused nodes (provider-agnostic deletion)
deleted_counts = await _delete_unused_nodes(unused_nodes)
logger.info("Cleanup completed", deleted_counts=deleted_counts)
return {
"status": "completed",
"unused_count": total_unused,
"deleted_count": {
"data_items": 0,
"chunks": deleted_counts["DocumentChunk"],
"entities": deleted_counts["Entity"],
"summaries": deleted_counts["TextSummary"],
"associations": deleted_counts["associations"]
},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
else:
# Default: Document-level approach (recommended)
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
async def _cleanup_via_sql( async def _cleanup_via_sql(
@ -247,136 +185,3 @@ async def _cleanup_via_sql(
}, },
"cleanup_date": datetime.now(timezone.utc).isoformat() "cleanup_date": datetime.now(timezone.utc).isoformat()
} }
async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[str, list]:
"""
Find unused nodes using graph projection - database-agnostic approach.
NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
Parameters
----------
cutoff_timestamp_ms : int
Cutoff timestamp in milliseconds since epoch
Returns
-------
Dict[str, list]
Dictionary mapping node types to lists of unused node IDs
"""
graph_engine = await get_graph_engine()
# Project the entire graph with necessary properties
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id", "type", "last_accessed_at"],
edge_properties_to_project=[]
)
unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []}
# Get all nodes from the projected graph
all_nodes = memory_fragment.get_nodes()
for node in all_nodes:
node_type = node.get_attribute("type")
if node_type not in unused_nodes:
continue
# Check last_accessed_at property
last_accessed = node.get_attribute("last_accessed_at")
if last_accessed is None or last_accessed < cutoff_timestamp_ms:
unused_nodes[node_type].append(node.id)
logger.debug(
f"Found unused {node_type}",
node_id=node.id,
last_accessed=last_accessed
)
return unused_nodes
async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
"""
Delete unused nodes from graph and vector databases.
NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
Parameters
----------
unused_nodes : Dict[str, list]
Dictionary mapping node types to lists of node IDs to delete
Returns
-------
Dict[str, int]
Count of deleted items by type
"""
graph_engine = await get_graph_engine()
vector_engine = get_vector_engine()
deleted_counts = {
"DocumentChunk": 0,
"Entity": 0,
"TextSummary": 0,
"associations": 0
}
# Count associations before deletion (using graph projection for consistency)
if any(unused_nodes.values()):
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id"],
edge_properties_to_project=[]
)
for node_type, node_ids in unused_nodes.items():
if not node_ids:
continue
# Count edges from the in-memory graph
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node:
# Count edges from the in-memory graph
edge_count = len(node.get_skeleton_edges())
deleted_counts["associations"] += edge_count
# Delete from graph database (uses DETACH DELETE, so edges are automatically removed)
for node_type, node_ids in unused_nodes.items():
if not node_ids:
continue
logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database")
# Delete nodes in batches (database-agnostic)
await graph_engine.delete_nodes(node_ids)
deleted_counts[node_type] = len(node_ids)
# Delete from vector database
vector_collections = {
"DocumentChunk": "DocumentChunk_text",
"Entity": "Entity_name",
"TextSummary": "TextSummary_text"
}
for node_type, collection_name in vector_collections.items():
node_ids = unused_nodes[node_type]
if not node_ids:
continue
logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from vector database")
try:
if await vector_engine.has_collection(collection_name):
await vector_engine.delete_data_points(
collection_name,
[str(node_id) for node_id in node_ids]
)
except Exception as e:
logger.error(f"Error deleting from vector collection {collection_name}: {e}")
return deleted_counts

View file

@ -16,15 +16,10 @@ logger = get_logger()
async def test_textdocument_cleanup_with_sql(): async def test_textdocument_cleanup_with_sql():
""" """
End-to-end test for TextDocument cleanup based on last_accessed timestamps. End-to-end test for TextDocument cleanup based on last_accessed timestamps.
Tests:
1. Add and cognify a document
2. Perform search to populate last_accessed timestamp
3. Verify last_accessed is set in SQL Data table
4. Manually age the timestamp beyond cleanup threshold
5. Run cleanup with text_doc=True
6. Verify document was deleted from all databases (relational, graph, and vector)
""" """
# Enable last accessed tracking BEFORE any cognee operations
os.environ["ENABLE_LAST_ACCESSED"] = "true"
# Setup test directories # Setup test directories
data_directory_path = str( data_directory_path = str(
pathlib.Path( pathlib.Path(
@ -63,10 +58,10 @@ async def test_textdocument_cleanup_with_sql():
cognify_result = await cognee.cognify([dataset_name], user=user) cognify_result = await cognee.cognify([dataset_name], user=user)
# Extract dataset_id from cognify result (ds_id is already a UUID) # Extract dataset_id from cognify result
dataset_id = None dataset_id = None
for ds_id, pipeline_result in cognify_result.items(): for ds_id, pipeline_result in cognify_result.items():
dataset_id = ds_id # Don't wrap in UUID() - it's already a UUID object dataset_id = ds_id
break break
assert dataset_id is not None, "Failed to get dataset_id from cognify result" assert dataset_id is not None, "Failed to get dataset_id from cognify result"
@ -81,11 +76,11 @@ async def test_textdocument_cleanup_with_sql():
user=user user=user
) )
logger.info(f"✅ Search completed, found {len(search_results)} results") logger.info(f"✅ Search completed, found {len(search_results)} results")
assert len(search_results) > 0, "Search should return results"
# Step 3: Verify last_accessed was set in SQL Data table # Step 3: Verify last_accessed was set and get data_id
db_engine = get_relational_engine() db_engine = get_relational_engine()
async with db_engine.get_async_session() as session: async with db_engine.get_async_session() as session:
# Get the Data record for this dataset
result = await session.execute( result = await session.execute(
select(Data, DatasetData) select(Data, DatasetData)
.join(DatasetData, Data.id == DatasetData.data_id) .join(DatasetData, Data.id == DatasetData.data_id)
@ -96,7 +91,7 @@ async def test_textdocument_cleanup_with_sql():
data_record = data_records[0][0] data_record = data_records[0][0]
data_id = data_record.id data_id = data_record.id
# Verify last_accessed is set (should be set by search operation) # Verify last_accessed is set
assert data_record.last_accessed is not None, ( assert data_record.last_accessed is not None, (
"last_accessed should be set after search operation" "last_accessed should be set after search operation"
) )
@ -104,56 +99,53 @@ async def test_textdocument_cleanup_with_sql():
original_last_accessed = data_record.last_accessed original_last_accessed = data_record.last_accessed
logger.info(f"✅ last_accessed verified: {original_last_accessed}") logger.info(f"✅ last_accessed verified: {original_last_accessed}")
# Step 4: Manually age the timestamp to be older than cleanup threshold # Step 4: Manually age the timestamp
days_threshold = 30 minutes_threshold = 30
aged_timestamp = datetime.now(timezone.utc) - timedelta(days=days_threshold + 10) aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10)
async with db_engine.get_async_session() as session: async with db_engine.get_async_session() as session:
stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp) stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp)
await session.execute(stmt) await session.execute(stmt)
await session.commit() await session.commit()
# Query in a NEW session to avoid cached values # Verify timestamp was updated
async with db_engine.get_async_session() as session: async with db_engine.get_async_session() as session:
result = await session.execute(select(Data).where(Data.id == data_id)) result = await session.execute(select(Data).where(Data.id == data_id))
updated_data = result.scalar_one_or_none() updated_data = result.scalar_one_or_none()
assert updated_data is not None, "Data record should exist"
# Make both timezone-aware for comparison
retrieved_timestamp = updated_data.last_accessed retrieved_timestamp = updated_data.last_accessed
if retrieved_timestamp.tzinfo is None: if retrieved_timestamp.tzinfo is None:
# If database returned naive datetime, make it UTC-aware
retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc) retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc)
assert retrieved_timestamp == aged_timestamp, ( assert retrieved_timestamp == aged_timestamp, (
f"Timestamp should be updated to aged value. " f"Timestamp should be updated to aged value"
f"Expected: {aged_timestamp}, Got: {retrieved_timestamp}"
) )
# Step 5: Test cleanup with text_doc=True # Step 5: Test cleanup (document-level is now the default)
from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data
# First do a dry run # First do a dry run
logger.info("Testing dry run with text_doc=True...") logger.info("Testing dry run...")
dry_run_result = await cleanup_unused_data( dry_run_result = await cleanup_unused_data(
days_threshold=30, minutes_threshold=10,
dry_run=True, dry_run=True,
user_id=user.id, user_id=user.id
text_doc=True
) )
assert dry_run_result['status'] == 'dry_run', "Status should be 'dry_run'" # Debug: Print the actual result
logger.info(f"Dry run result: {dry_run_result}")
assert dry_run_result['status'] == 'dry_run', f"Status should be 'dry_run', got: {dry_run_result['status']}"
assert dry_run_result['unused_count'] > 0, ( assert dry_run_result['unused_count'] > 0, (
"Should find at least one unused document" "Should find at least one unused document"
) )
logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents") logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents")
# Now run actual cleanup # Now run actual cleanup
logger.info("Executing cleanup with text_doc=True...") logger.info("Executing cleanup...")
cleanup_result = await cleanup_unused_data( cleanup_result = await cleanup_unused_data(
days_threshold=30, minutes_threshold=30,
dry_run=False, dry_run=False,
user_id=user.id, user_id=user.id
text_doc=True
) )
assert cleanup_result["status"] == "completed", "Cleanup should complete successfully" assert cleanup_result["status"] == "completed", "Cleanup should complete successfully"
@ -162,79 +154,15 @@ async def test_textdocument_cleanup_with_sql():
) )
logger.info(f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents") logger.info(f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents")
# Step 6: Verify the document was actually deleted from SQL # Step 6: Verify deletion
async with db_engine.get_async_session() as session: async with db_engine.get_async_session() as session:
deleted_data = ( deleted_data = (
await session.execute(select(Data).where(Data.id == data_id)) await session.execute(select(Data).where(Data.id == data_id))
).scalar_one_or_none() ).scalar_one_or_none()
assert deleted_data is None, "Data record should be deleted"
assert deleted_data is None, ( logger.info("✅ Confirmed: Data record was deleted")
"Data record should be deleted after cleanup"
)
logger.info("✅ Confirmed: Data record was deleted from SQL database")
# Verify the dataset-data link was also removed
async with db_engine.get_async_session() as session:
dataset_data_link = (
await session.execute(
select(DatasetData).where(
DatasetData.data_id == data_id,
DatasetData.dataset_id == dataset_id
)
)
).scalar_one_or_none()
assert dataset_data_link is None, (
"DatasetData link should be deleted after cleanup"
)
logger.info("✅ Confirmed: DatasetData link was deleted")
# Verify graph nodes were cleaned up
from cognee.infrastructure.databases.graph import get_graph_engine
graph_engine = await get_graph_engine()
# Try to find the TextDocument node - it should not exist
result = await graph_engine.query(
"MATCH (n:Node {id: $id}) RETURN n",
{"id": str(data_id)}
)
assert len(result) == 0, (
"TextDocument node should be deleted from graph database"
)
logger.info("✅ Confirmed: TextDocument node was deleted from graph database")
# Verify vector database was cleaned up
from cognee.infrastructure.databases.vector import get_vector_engine
vector_engine = get_vector_engine()
# Check each collection that should have been cleaned up
vector_collections = [
"DocumentChunk_text",
"Entity_name",
"TextSummary_text"
]
for collection_name in vector_collections:
if await vector_engine.has_collection(collection_name):
# Try to retrieve the deleted data points
try:
results = await vector_engine.retrieve(collection_name, [str(data_id)])
assert len(results) == 0, (
f"Data points should be deleted from {collection_name} collection"
)
logger.info(f"✅ Confirmed: {collection_name} collection is clean")
except Exception as e:
# Collection might be empty or not exist, which is fine
logger.info(f"✅ Confirmed: {collection_name} collection is empty or doesn't exist")
pass
logger.info("✅ Confirmed: Vector database entries were deleted")
logger.info("🎉 All cleanup tests passed!") logger.info("🎉 All cleanup tests passed!")
return True return True