fix: fallback and document deletion

This commit is contained in:
chinu0609 2025-12-02 22:25:03 +05:30
parent 6a4d31356b
commit 5f00abf3e4
2 changed files with 68 additions and 46 deletions

View file

@ -36,16 +36,22 @@ async def update_node_access_timestamps(items: List[Any]):
return
try:
# Update nodes using graph projection ( database-agnostic approach
# Try to update nodes in graph database (may fail for unsupported DBs)
await _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms)
except Exception as e:
logger.warning(
f"Failed to update node timestamps in graph database: {e}. "
"Will update document-level timestamps in SQL instead."
)
# Find origin documents and update SQL
# Always try to find origin documents and update SQL
# This ensures document-level tracking works even if graph updates fail
try:
doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
if doc_ids:
await _update_sql_records(doc_ids, timestamp_dt)
except Exception as e:
logger.error(f"Failed to update timestamps: {e}")
logger.error(f"Failed to update SQL timestamps: {e}")
raise
async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms):
@ -59,37 +65,42 @@ async def _update_nodes_via_projection(graph_engine, node_ids, timestamp_ms):
)
# Update each node's last_accessed_at property
provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node:
# Update the node in the database
provider = os.getenv("GRAPH_DATABASE_PROVIDER", "kuzu").lower()
if provider == "kuzu":
# Kuzu stores properties as JSON
result = await graph_engine.query(
"MATCH (n:Node {id: $id}) RETURN n.properties",
{"id": node_id}
)
if result and result[0]:
props = json.loads(result[0][0]) if result[0][0] else {}
props["last_accessed_at"] = timestamp_ms
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.properties = $props",
{"id": node_id, "props": json.dumps(props)}
try:
# Update the node in the database
if provider == "kuzu":
# Kuzu stores properties as JSON
result = await graph_engine.query(
"MATCH (n:Node {id: $id}) RETURN n.properties",
{"id": node_id}
)
elif provider == "neo4j":
await graph_engine.query(
"MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
elif provider == "neptune":
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
if result and result[0]:
props = json.loads(result[0][0]) if result[0][0] else {}
props["last_accessed_at"] = timestamp_ms
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.properties = $props",
{"id": node_id, "props": json.dumps(props)}
)
elif provider == "neo4j":
await graph_engine.query(
"MATCH (n:__Node__ {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
elif provider == "neptune":
await graph_engine.query(
"MATCH (n:Node {id: $id}) SET n.last_accessed_at = $timestamp",
{"id": node_id, "timestamp": timestamp_ms}
)
except Exception as e:
# Log but continue with other nodes
logger.debug(f"Failed to update node {node_id}: {e}")
continue
async def _find_origin_documents_via_projection(graph_engine, node_ids):
"""Find origin documents using graph projection instead of DB queries"""

View file

@ -1,9 +1,9 @@
"""
Task for automatically deleting unused data from the memify pipeline.
This task identifies and removes data (chunks, entities, summaries)) that hasn't
This task identifies and removes entire documents that haven't
been accessed by retrievers for a specified period, helping maintain system
efficiency and storage optimization.
efficiency and storage optimization through whole-document removal.
"""
import json
@ -28,22 +28,26 @@ async def cleanup_unused_data(
minutes_threshold: Optional[int],
dry_run: bool = True,
user_id: Optional[UUID] = None,
text_doc: bool = False
text_doc: bool = True, # Changed default to True for document-level cleanup
node_level: bool = False # New parameter for explicit node-level cleanup
) -> Dict[str, Any]:
"""
Identify and remove unused data from the memify pipeline.
Parameters
----------
minutes_threshold : int
days since last access to consider data unused
Minutes since last access to consider data unused
dry_run : bool
If True, only report what would be delete without actually deleting (default: True)
If True, only report what would be deleted without actually deleting (default: True)
user_id : UUID, optional
Limit cleanup to specific user's data (default: None)
text_doc : bool
If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete()
for proper whole-document deletion (default: False)
If True (default), use SQL-based filtering to find unused TextDocuments and call cognee.delete()
for proper whole-document deletion
node_level : bool
If True, perform chaotic node-level deletion of unused chunks, entities, and summaries
(default: False - deprecated in favor of document-level cleanup)
Returns
-------
@ -91,17 +95,19 @@ async def cleanup_unused_data(
minutes_threshold=minutes_threshold,
dry_run=dry_run,
user_id=str(user_id) if user_id else None,
text_doc=text_doc
text_doc=text_doc,
node_level=node_level
)
# Calculate cutoff timestamp
cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
if text_doc:
# SQL-based approach: Find unused TextDocuments and use cognee.delete()
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
else:
# Graph-based approach: Find unused nodes using projection (database-agnostic)
if node_level:
# Deprecated: Node-level approach (chaotic)
logger.warning(
"Node-level cleanup is deprecated and may lead to fragmented knowledge graphs. "
"Consider using document-level cleanup (default) instead."
)
cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")
@ -147,6 +153,9 @@ async def cleanup_unused_data(
},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
else:
# Default: Document-level approach (recommended)
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
async def _cleanup_via_sql(
@ -243,6 +252,7 @@ async def _cleanup_via_sql(
async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[str, list]:
"""
Find unused nodes using graph projection - database-agnostic approach.
NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
Parameters
----------
@ -291,6 +301,7 @@ async def _find_unused_nodes_via_projection(cutoff_timestamp_ms: int) -> Dict[st
async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
"""
Delete unused nodes from graph and vector databases.
NOTE: This function is deprecated as it leads to fragmented knowledge graphs.
Parameters
----------
@ -325,7 +336,7 @@ async def _delete_unused_nodes(unused_nodes: Dict[str, list]) -> Dict[str, int]:
if not node_ids:
continue
# Count edges connected to these nodes
# Count edges from the in-memory graph
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node: