fix: use graph projection instead of conditions

This commit is contained in:
chinu0609 2025-12-02 18:55:47 +05:30
parent 12ce80005c
commit 6a4d31356b
2 changed files with 418 additions and 497 deletions

View file

@ -4,118 +4,116 @@ import json
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import List, Any from typing import List, Any
from uuid import UUID from uuid import UUID
import os import os
from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data from cognee.modules.data.models import Data
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from sqlalchemy import update from sqlalchemy import update
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__) logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any]): async def update_node_access_timestamps(items: List[Any]):
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true": if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
return return
if not items: if not items:
return return
graph_engine = await get_graph_engine() graph_engine = await get_graph_engine()
timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000) timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
timestamp_dt = datetime.now(timezone.utc) timestamp_dt = datetime.now(timezone.utc)
# Extract node IDs # Extract node IDs
node_ids = [] node_ids = []
for item in items: for item in items:
item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id") item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
if item_id: if item_id:
node_ids.append(str(item_id)) node_ids.append(str(item_id))
if not node_ids: if not node_ids:
return return
try:
# Detect database provider and use appropriate queries
provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()
if provider == "kuzu": try:
await _update_kuzu_nodes(graph_engine, node_ids, timestamp_ms) # Update nodes using graph projection (database-agnostic approach)
elif provider == "neo4j": await _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms)
await _update_neo4j_nodes(graph_engine, node_ids, timestamp_ms)
elif provider == "neptune":
await _update_neptune_nodes(graph_engine, node_ids, timestamp_ms)
else:
logger.warning(f"Unsupported graph provider: {provider}")
return
# Find origin documents and update SQL # Find origin documents and update SQL
doc_ids = await _find_origin_documents(graph_engine, node_ids, provider) doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
if doc_ids: if doc_ids:
await _update_sql_records(doc_ids, timestamp_dt) await _update_sql_records(doc_ids, timestamp_dt)
except Exception as e: except Exception as e:
logger.error(f"Failed to update timestamps: {e}") logger.error(f"Failed to update timestamps: {e}")
raise raise
async def _update_kuzu_nodes(graph_engine, node_ids, timestamp_ms): async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms):
"""Kuzu-specific node updates""" """Update nodes using graph projection - works with any graph database"""
for node_id in node_ids: # Project the graph with necessary properties
result = await graph_engine.query( memory_fragment = CogneeGraph()
"MATCH (n:Node {id: $id}) RETURN n.properties", await memory_fragment.project_graph_from_db(
{"id": node_id} graph_engine,
) node_properties_to_project=["id"],
edge_properties_to_project=[]
if result and result[0]: )
props = json.loads(result[0][0]) if result[0][0] else {}
props["last_accessed_at"] = timestamp_ms
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.properties = $props",
{"id": node_id, "props": json.dumps(props)}
)
async def _update_neo4j_nodes(graph_engine, node_ids, timestamp_ms):
"""Neo4j-specific node updates"""
for node_id in node_ids:
await graph_engine.query(
"MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
async def _update_neptune_nodes(graph_engine, node_ids, timestamp_ms):
"""Neptune-specific node updates"""
for node_id in node_ids:
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
async def _find_origin_documents(graph_engine, node_ids, provider):
"""Find origin documents with provider-specific queries"""
if provider == "kuzu":
query = """
UNWIND $node_ids AS node_id
MATCH (chunk:Node {id: node_id})-[e:EDGE]-(doc:Node)
WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document']
RETURN DISTINCT doc.id
"""
elif provider == "neo4j":
query = """
UNWIND $node_ids AS node_id
MATCH (chunk:__Node__ {id: node_id})-[e:EDGE]-(doc:__Node__)
WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document']
RETURN DISTINCT doc.id
"""
elif provider == "neptune":
query = """
UNWIND $node_ids AS node_id
MATCH (chunk:Node {id: node_id})-[e:EDGE]-(doc:Node)
WHERE chunk.type = 'DocumentChunk' AND doc.type IN ['TextDocument', 'Document']
RETURN DISTINCT doc.id
"""
result = await graph_engine.query(query, {"node_ids": node_ids}) # Update each node's last_accessed_at property
return list(set([row[0] for row in result if row and row[0]])) if result else [] for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node:
# Update the node in the database
provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()
if provider == "kuzu":
# Kuzu stores properties as JSON
result = await graph_engine.query(
"MATCH (n:Node {id: $id}) RETURN n.properties",
{"id": node_id}
)
if result and result[0]:
props = json.loads(result[0][0]) if result[0][0] else {}
props["last_accessed_at"] = timestamp_ms
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.properties = $props",
{"id": node_id, "props": json.dumps(props)}
)
elif provider == "neo4j":
await graph_engine.query(
"MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
elif provider == "neptune":
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
async def _find_origin_documents_via_projection(graph_engine, node_ids):
"""Find origin documents using graph projection instead of DB queries"""
# Project the entire graph with necessary properties
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id", "type"],
edge_properties_to_project=["relationship_name"]
)
# Find origin documents by traversing the in-memory graph
doc_ids = set()
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node and node.get_attribute("type") == "DocumentChunk":
# Traverse edges to find connected documents
for edge in node.get_skeleton_edges():
# Get the neighbor node
neighbor = edge.get_destination_node() if edge.get_source_node().id == node_id else edge.get_source_node()
if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]:
doc_ids.add(neighbor.id)
return list(doc_ids)
async def _update_sql_records(doc_ids, timestamp_dt): async def _update_sql_records(doc_ids, timestamp_dt):
"""Update SQL Data table (same for all providers)""" """Update SQL Data table (same for all providers)"""
@ -124,6 +122,6 @@ async def _update_sql_records(doc_ids, timestamp_dt):
stmt = update(Data).where( stmt = update(Data).where(
Data.id.in_([UUID(doc_id) for doc_id in doc_ids]) Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
).values(last_accessed=timestamp_dt) ).values(last_accessed=timestamp_dt)
await session.execute(stmt) await session.execute(stmt)
await session.commit() await session.commit()

View file

@ -1,448 +1,371 @@
""" """
Task for automatically deleting unused data from the memify pipeline. Task for automatically deleting unused data from the memify pipeline.
This task identifies and removes data (chunks, entities, summaries) that hasn't This task identifies and removes data (chunks, entities, summaries) that hasn't
been accessed by retrievers for a specified period, helping maintain system been accessed by retrievers for a specified period, helping maintain system
efficiency and storage optimization. efficiency and storage optimization.
""" """
import json import json
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from uuid import UUID from uuid import UUID
import os import os
from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, DatasetData from cognee.modules.data.models import Data, DatasetData
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from sqlalchemy import select, or_ from sqlalchemy import select, or_
import cognee import cognee
import sqlalchemy as sa import sqlalchemy as sa
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__)
logger = get_logger(__name__)
async def cleanup_unused_data(
minutes_threshold: Optional[int],
dry_run: bool = True,
user_id: Optional[UUID] = None,
text_doc: bool = False
) -> Dict[str, Any]:
"""
Identify and remove unused data from the memify pipeline.
Parameters
----------
minutes_threshold : int
Minutes since last access to consider data unused
dry_run : bool
If True, only report what would be deleted without actually deleting (default: True)
user_id : UUID, optional
Limit cleanup to specific user's data (default: None)
text_doc : bool
If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete()
for proper whole-document deletion (default: False)
Returns
-------
Dict[str, Any]
Cleanup results with status, counts, and timestamp
"""
# Check 1: Environment variable must be enabled
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
logger.warning(
"Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled."
)
return {
"status": "skipped",
"reason": "ENABLE_LAST_ACCESSED not enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
# Check 2: Verify tracking has actually been running
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# Count records with non-NULL last_accessed
tracked_count = await session.execute(
select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
)
tracked_records = tracked_count.scalar()
if tracked_records == 0:
logger.warning(
"Cleanup skipped: No records have been tracked yet. "
"ENABLE_LAST_ACCESSED may have been recently enabled. "
"Wait for retrievers to update timestamps before running cleanup."
)
return {
"status": "skipped",
"reason": "No tracked records found - tracking may be newly enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
logger.info(
"Starting cleanup task",
minutes_threshold=minutes_threshold,
dry_run=dry_run,
user_id=str(user_id) if user_id else None,
text_doc=text_doc
)
# Calculate cutoff timestamp
cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
if text_doc:
# SQL-based approach: Find unused TextDocuments and use cognee.delete()
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
else:
# Graph-based approach: Find unused nodes using projection (database-agnostic)
cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")
# Find unused nodes using graph projection
unused_nodes = await _find_unused_nodes_via_projection(cutoff_timestamp_ms)
total_unused = sum(len(nodes) for nodes in unused_nodes.values())
logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()})
if dry_run:
return {
"status": "dry_run",
"unused_count": total_unused,
"deleted_count": {
"data_items": 0,
"chunks": 0,
"entities": 0,
"summaries": 0,
"associations": 0
},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
"preview": {
"chunks": len(unused_nodes["DocumentChunk"]),
"entities": len(unused_nodes["Entity"]),
"summaries": len(unused_nodes["TextSummary"])
}
}
# Delete unused nodes (provider-agnostic deletion)
deleted_counts = await _delete_unused_nodes(unused_nodes)
logger.info("Cleanup completed", deleted_counts=deleted_counts)
return {
"status": "completed",
"unused_count": total_unused,
"deleted_count": {
"data_items": 0,
"chunks": deleted_counts["DocumentChunk"],
"entities": deleted_counts["Entity"],
"summaries": deleted_counts["TextSummary"],
"associations": deleted_counts["associations"]
},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
async def cleanup_unused_data( async def _cleanup_via_sql(
minutes_threshold: Optional[int], cutoff_date: datetime,
dry_run: bool = True, dry_run: bool,
user_id: Optional[UUID] = None, user_id: Optional[UUID] = None
text_doc: bool = False
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Identify and remove unused data from the memify pipeline. SQL-based cleanup: Query Data table for unused documents and use cognee.delete().
Parameters Parameters
---------- ----------
minutes_threshold : int cutoff_date : datetime
Minutes since last access to consider data unused Cutoff date for last_accessed filtering
dry_run : bool dry_run : bool
If True, only report what would be deleted without actually deleting (default: True) If True, only report what would be deleted
user_id : UUID, optional user_id : UUID, optional
Limit cleanup to specific user's data (default: None) Filter by user ID if provided
text_doc : bool
If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete()
for proper whole-document deletion (default: False)
Returns Returns
------- -------
Dict[str, Any] Dict[str, Any]
Cleanup results with status, counts, and timestamp Cleanup results
""" """
# Check 1: Environment variable must be enabled db_engine = get_relational_engine()
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
logger.warning(
"Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled."
)
return {
"status": "skipped",
"reason": "ENABLE_LAST_ACCESSED not enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
# Check 2: Verify tracking has actually been running
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# Count records with non-NULL last_accessed
tracked_count = await session.execute(
select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
)
tracked_records = tracked_count.scalar()
if tracked_records == 0:
logger.warning(
"Cleanup skipped: No records have been tracked yet. "
"ENABLE_LAST_ACCESSED may have been recently enabled. "
"Wait for retrievers to update timestamps before running cleanup."
)
return {
"status": "skipped",
"reason": "No tracked records found - tracking may be newly enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
logger.info(
"Starting cleanup task",
minutes_threshold=minutes_threshold,
dry_run=dry_run,
user_id=str(user_id) if user_id else None,
text_doc=text_doc
)
# Calculate cutoff timestamp async with db_engine.get_async_session() as session:
cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold) # Query for Data records with old last_accessed timestamps
query = select(Data, DatasetData).join(
if text_doc: DatasetData, Data.id == DatasetData.data_id
# SQL-based approach: Find unused TextDocuments and use cognee.delete() ).where(
return await _cleanup_via_sql(cutoff_date, dry_run, user_id) or_(
else: Data.last_accessed < cutoff_date,
# Graph-based approach: Find unused nodes directly from graph Data.last_accessed.is_(None)
cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000) )
logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)") )
# Detect database provider and find unused nodes if user_id:
provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower() from cognee.modules.data.models import Dataset
unused_nodes = await _find_unused_nodes(cutoff_timestamp_ms, user_id, provider) query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
Dataset.owner_id == user_id
total_unused = sum(len(nodes) for nodes in unused_nodes.values()) )
logger.info(f"Found {total_unused} unused nodes", unused_nodes={k: len(v) for k, v in unused_nodes.items()})
if dry_run:
return {
"status": "dry_run",
"unused_count": total_unused,
"deleted_count": {
"data_items": 0,
"chunks": 0,
"entities": 0,
"summaries": 0,
"associations": 0
},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
"preview": {
"chunks": len(unused_nodes["DocumentChunk"]),
"entities": len(unused_nodes["Entity"]),
"summaries": len(unused_nodes["TextSummary"])
}
}
# Delete unused nodes with provider-specific logic
deleted_counts = await _delete_unused_nodes(unused_nodes, provider)
logger.info("Cleanup completed", deleted_counts=deleted_counts)
result = await session.execute(query)
unused_data = result.all()
logger.info(f"Found {len(unused_data)} unused documents in SQL")
if dry_run:
return { return {
"status": "completed", "status": "dry_run",
"unused_count": total_unused, "unused_count": len(unused_data),
"deleted_count": { "deleted_count": {
"data_items": 0, "data_items": 0,
"chunks": deleted_counts["DocumentChunk"], "documents": 0
"entities": deleted_counts["Entity"],
"summaries": deleted_counts["TextSummary"],
"associations": deleted_counts["associations"]
}, },
"cleanup_date": datetime.now(timezone.utc).isoformat() "cleanup_date": datetime.now(timezone.utc).isoformat(),
} "preview": {
"documents": len(unused_data)
}
async def _cleanup_via_sql( }
cutoff_date: datetime,
dry_run: bool, # Delete each document using cognee.delete()
user_id: Optional[UUID] = None deleted_count = 0
) -> Dict[str, Any]: from cognee.modules.users.methods import get_default_user
""" user = await get_default_user() if user_id is None else None
SQL-based cleanup: Query Data table for unused documents and use cognee.delete().
for data, dataset_data in unused_data:
try:
await cognee.delete(
data_id=data.id,
dataset_id=dataset_data.dataset_id,
mode="hard", # Use hard mode to also remove orphaned entities
user=user
)
deleted_count += 1
logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
except Exception as e:
logger.error(f"Failed to delete document {data.id}: {e}")
logger.info("Cleanup completed", deleted_count=deleted_count)
return {
"status": "completed",
"unused_count": len(unused_data),
"deleted_count": {
"data_items": deleted_count,
"documents": deleted_count
},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
Parameters
----------
cutoff_date : datetime
Cutoff date for last_accessed filtering
dry_run : bool
If True, only report what would be deleted
user_id : UUID, optional
Filter by user ID if provided
Returns async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[str, list]:
------- """
Dict[str, Any] Find unused nodes using graph projection - database-agnostic approach.
Cleanup results
""" Parameters
db_engine = get_relational_engine() ----------
cutoff_timestamp_ms : int
Cutoff timestamp in milliseconds since epoch
Returns
-------
Dict[str, list]
Dictionary mapping node types to lists of unused node IDs
"""
graph_engine = await get_graph_engine()
# Project the entire graph with necessary properties
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id", "type", "last_accessed_at"],
edge_properties_to_project=[]
)
async with db_engine.get_async_session() as session: unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []}
# Query for Data records with old last_accessed timestamps
query = select(Data, DatasetData).join( # Get all nodes from the projected graph
DatasetData, Data.id == DatasetData.data_id all_nodes = memory_fragment.get_nodes()
).where(
or_( for node in all_nodes:
Data.last_accessed < cutoff_date, node_type = node.get_attribute("type")
Data.last_accessed.is_(None) if node_type not in unused_nodes:
continue
# Check last_accessed_at property
last_accessed = node.get_attribute("last_accessed_at")
if last_accessed is None or last_accessed < cutoff_timestamp_ms:
unused_nodes[node_type].append(node.id)
logger.debug(
f"Found unused {node_type}",
node_id=node.id,
last_accessed=last_accessed
) )
return unused_nodes
async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
"""
Delete unused nodes from graph and vector databases.
Parameters
----------
unused_nodes : Dict[str, list]
Dictionary mapping node types to lists of node IDs to delete
Returns
-------
Dict[str, int]
Count of deleted items by type
"""
graph_engine = await get_graph_engine()
vector_engine = get_vector_engine()
deleted_counts = {
"DocumentChunk": 0,
"Entity": 0,
"TextSummary": 0,
"associations": 0
}
# Count associations before deletion (using graph projection for consistency)
if any(unused_nodes.values()):
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id"],
edge_properties_to_project=[]
) )
if user_id: for node_type, node_ids in unused_nodes.items():
from cognee.modules.data.models import Dataset if not node_ids:
query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where( continue
Dataset.owner_id == user_id
) # Count edges connected to these nodes
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node:
# Count edges from the in-memory graph
edge_count = len(node.get_skeleton_edges())
deleted_counts["associations"] += edge_count
# Delete from graph database (uses DETACH DELETE, so edges are automatically removed)
for node_type, node_ids in unused_nodes.items():
if not node_ids:
continue
logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database")
# Delete nodes in batches (database-agnostic)
await graph_engine.delete_nodes(node_ids)
deleted_counts[node_type] = len(node_ids)
result = await session.execute(query) # Delete from vector database
unused_data = result.all() vector_collections = {
"DocumentChunk": "DocumentChunk_text",
logger.info(f"Found {len(unused_data)} unused documents in SQL") "Entity": "Entity_name",
"TextSummary": "TextSummary_text"
if dry_run: }
return {
"status": "dry_run",
"unused_count": len(unused_data), for node_type, collection_name in vector_collections.items():
"deleted_count": { node_ids = unused_nodes[node_type]
"data_items": 0, if not node_ids:
"documents": 0 continue
},
"cleanup_date": datetime.now(timezone.utc).isoformat(), logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from vector database")
"preview": {
"documents": len(unused_data)
}
}
# Delete each document using cognee.delete()
deleted_count = 0
from cognee.modules.users.methods import get_default_user
user = await get_default_user() if user_id is None else None
for data, dataset_data in unused_data:
try: try:
await cognee.delete( if await vector_engine.has_collection(collection_name):
data_id=data.id, await vector_engine.delete_data_points(
dataset_id=dataset_data.dataset_id, collection_name,
mode="hard", # Use hard mode to also remove orphaned entities [str(node_id) for node_id in node_ids]
user=user )
)
deleted_count += 1
logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
except Exception as e: except Exception as e:
logger.error(f"Failed to delete document {data.id}: {e}") logger.error(f"Error deleting from vector collection {collection_name}: {e}")
logger.info("Cleanup completed", deleted_count=deleted_count)
return {
"status": "completed",
"unused_count": len(unused_data),
"deleted_count": {
"data_items": deleted_count,
"documents": deleted_count
},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
async def _find_unused_nodes(
cutoff_timestamp_ms: int,
user_id: Optional[UUID] = None,
provider: str = "kuzu"
) -> Dict[str, list]:
"""
Find unused nodes with provider-specific queries.
Parameters
----------
cutoff_timestamp_ms : int
Cutoff timestamp in milliseconds since epoch
user_id : UUID, optional
Filter by user ID if provided
provider : str
Graph database provider (kuzu, neo4j, neptune)
Returns
-------
Dict[str, list]
Dictionary mapping node types to lists of unused node IDs
"""
graph_engine = await get_graph_engine()
if provider == "kuzu":
return await _find_unused_nodes_kuzu(graph_engine, cutoff_timestamp_ms)
elif provider == "neo4j":
return await _find_unused_nodes_neo4j(graph_engine, cutoff_timestamp_ms)
elif provider == "neptune":
return await _find_unused_nodes_neptune(graph_engine, cutoff_timestamp_ms)
else:
logger.warning(f"Unsupported graph provider: {provider}")
return {"DocumentChunk": [], "Entity": [], "TextSummary": []}
async def _find_unused_nodes_kuzu(graph_engine, cutoff_timestamp_ms):
"""Kuzu-specific unused node detection"""
query = "MATCH (n:Node) RETURN n.id, n.type, n.properties"
results = await graph_engine.query(query)
unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []}
for node_id, node_type, props_json in results:
if node_type not in unused_nodes:
continue
if props_json:
try:
props = json.loads(props_json)
last_accessed = props.get("last_accessed_at")
if last_accessed is None or last_accessed < cutoff_timestamp_ms:
unused_nodes[node_type].append(node_id)
logger.debug(
f"Found unused {node_type}",
node_id=node_id,
last_accessed=last_accessed
)
except json.JSONDecodeError:
logger.warning(f"Failed to parse properties for node {node_id}")
continue
return unused_nodes
async def _find_unused_nodes_neo4j(graph_engine, cutoff_timestamp_ms):
"""Neo4j-specific unused node detection"""
query = "MATCH (n:__Node__) RETURN n.id, n.type, n.last_accessed_at"
results = await graph_engine.query(query)
unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []}
for row in results:
node_id = row["n"]["id"]
node_type = row["n"]["type"]
last_accessed = row["n"].get("last_accessed_at")
if node_type not in unused_nodes:
continue
if last_accessed is None or last_accessed < cutoff_timestamp_ms:
unused_nodes[node_type].append(node_id)
logger.debug(
f"Found unused {node_type}",
node_id=node_id,
last_accessed=last_accessed
)
return unused_nodes
async def _find_unused_nodes_neptune(graph_engine, cutoff_timestamp_ms):
"""Neptune-specific unused node detection"""
query = "MATCH (n:Node) RETURN n.id, n.type, n.last_accessed_at"
results = await graph_engine.query(query)
unused_nodes = {"DocumentChunk": [], "Entity": [], "TextSummary": []}
for row in results:
node_id = row["n"]["id"]
node_type = row["n"]["type"]
last_accessed = row["n"].get("last_accessed_at")
if node_type not in unused_nodes:
continue
if last_accessed is None or last_accessed < cutoff_timestamp_ms:
unused_nodes[node_type].append(node_id)
logger.debug(
f"Found unused {node_type}",
node_id=node_id,
last_accessed=last_accessed
)
return unused_nodes
async def _delete_unused_nodes(unused_nodes: Dict[str, list], provider: str) -> Dict[str, int]:
"""
Delete unused nodes from graph and vector databases.
Parameters
----------
unused_nodes : Dict[str, list]
Dictionary mapping node types to lists of node IDs to delete
provider : str
Graph database provider (kuzu, neo4j, neptune)
Returns
-------
Dict[str, int]
Count of deleted items by type
"""
graph_engine = await get_graph_engine()
vector_engine = get_vector_engine()
deleted_counts = {
"DocumentChunk": 0,
"Entity": 0,
"TextSummary": 0,
"associations": 0
}
# Count associations before deletion
for node_type, node_ids in unused_nodes.items():
if not node_ids:
continue
# Count edges connected to these nodes
for node_id in node_ids:
if provider == "kuzu":
result = await graph_engine.query(
"MATCH (n:Node {id: $id})-[r:EDGE]-() RETURN count(r)",
{"id": node_id}
)
elif provider == "neo4j":
result = await graph_engine.query(
"MATCH (n:__Node__ {id: $id})-[r:EDGE]-() RETURN count(r)",
{"id": node_id}
)
elif provider == "neptune":
result = await graph_engine.query(
"MATCH (n:Node {id: $id})-[r:EDGE]-() RETURN count(r)",
{"id": node_id}
)
if result and len(result) > 0:
count = result[0][0] if provider == "kuzu" else result[0]["count_count(r)"]
deleted_counts["associations"] += count
# Delete from graph database (uses DETACH DELETE, so edges are automatically removed)
for node_type, node_ids in unused_nodes.items():
if not node_ids:
continue
logger.info(f"Deleting {len(node_ids)} {node_type} nodes from graph database")
# Delete nodes in batches
await graph_engine.delete_nodes(node_ids)
deleted_counts[node_type] = len(node_ids)
# Delete from vector database
vector_collections = {
"DocumentChunk": "DocumentChunk_text",
"Entity": "Entity_name",
"TextSummary": "TextSummary_text"
}
for node_type, collection_name in vector_collections.items():
node_ids = unused_nodes[node_type]
if not node_ids:
continue
logger.info(f"Deleting {len(node_ids)} {node_type} embeddings from vector database")
try:
if await vector_engine.has_collection(collection_name):
await vector_engine.delete_data_points(
collection_name,
[str(node_id) for node_id in node_ids]
)
except Exception as e:
logger.error(f"Error deleting from vector collection {collection_name}: {e}")
return deleted_counts return deleted_counts