fix: add text_doc flag

This commit is contained in:
chinu0609 2025-11-05 18:32:49 +05:30
parent 3c0e915812
commit 9041a804ec

View file

@ -13,7 +13,11 @@ from uuid import UUID
from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, DatasetData
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from sqlalchemy import select, or_
import cognee
logger = get_logger(__name__) logger = get_logger(__name__)
@ -21,7 +25,8 @@ logger = get_logger(__name__)
async def cleanup_unused_data(
    minutes_threshold: int = 30,
    dry_run: bool = True,
    user_id: Optional[UUID] = None,
    text_doc: bool = False
) -> Dict[str, Any]:
    """
    Identify and remove unused data from the memify pipeline.

    Parameters
    ----------
    minutes_threshold : int
        Data not accessed within this many minutes is considered unused (default: 30)
    dry_run : bool
        If True, only report what would be deleted without actually deleting (default: True)
    user_id : UUID, optional
        Limit cleanup to a specific user's data (default: None)
    text_doc : bool
        If True, use SQL-based filtering to find unused TextDocuments and call cognee.delete()
        for proper whole-document deletion (default: False)

    Returns
    -------
    Dict[str, Any]
        Cleanup results: status, unused_count, per-category deleted_count,
        cleanup_date, and (for dry runs) a preview of what would be removed.
    """
    logger.info(
        "Starting cleanup task",
        minutes_threshold=minutes_threshold,
        dry_run=dry_run,
        user_id=str(user_id) if user_id else None,
        text_doc=text_doc
    )

    # Calculate cutoff timestamp
    cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)

    if text_doc:
        # SQL-based approach: find unused TextDocuments via the relational
        # store and delete whole documents through cognee.delete().
        return await _cleanup_via_sql(cutoff_date, dry_run, user_id)

    # Graph-based approach: find unused nodes directly from the graph.
    # Graph nodes store last_accessed in epoch milliseconds, so convert once here.
    cutoff_timestamp_ms = int(cutoff_date.timestamp() * 1000)
    logger.debug(f"Cutoff timestamp: {cutoff_date.isoformat()} ({cutoff_timestamp_ms}ms)")

    # Find unused nodes, grouped by node type.
    unused_nodes = await _find_unused_nodes(cutoff_timestamp_ms, user_id)
    total_unused = sum(len(nodes) for nodes in unused_nodes.values())
    logger.info(
        f"Found {total_unused} unused nodes",
        unused_nodes={k: len(v) for k, v in unused_nodes.items()}
    )

    if dry_run:
        # Report only — nothing is deleted on a dry run.
        return {
            "status": "dry_run",
            "unused_count": total_unused,
            "deleted_count": {
                "data_items": 0,
                "chunks": 0,
                "entities": 0,
                "summaries": 0,
                "associations": 0
            },
            "cleanup_date": datetime.now(timezone.utc).isoformat(),
            "preview": {
                "chunks": len(unused_nodes["DocumentChunk"]),
                "entities": len(unused_nodes["Entity"]),
                "summaries": len(unused_nodes["TextSummary"])
            }
        }

    # Delete unused nodes from the graph (and their associations).
    deleted_counts = await _delete_unused_nodes(unused_nodes)
    logger.info("Cleanup completed", deleted_counts=deleted_counts)

    return {
        "status": "completed",
        "unused_count": total_unused,
        "deleted_count": {
            "data_items": 0,
            "chunks": deleted_counts["DocumentChunk"],
            "entities": deleted_counts["Entity"],
            "summaries": deleted_counts["TextSummary"],
            "associations": deleted_counts["associations"]
        },
        "cleanup_date": datetime.now(timezone.utc).isoformat()
    }
async def _cleanup_via_sql(
    cutoff_date: datetime,
    dry_run: bool,
    user_id: Optional[UUID] = None
) -> Dict[str, Any]:
    """
    SQL-based cleanup: query the Data table for unused documents and delete
    each one through cognee.delete().

    Parameters
    ----------
    cutoff_date : datetime
        Documents with last_accessed older than this (or never accessed)
        are considered unused.
    dry_run : bool
        If True, only report what would be deleted.
    user_id : UUID, optional
        Restrict to documents in datasets owned by this user.

    Returns
    -------
    Dict[str, Any]
        Cleanup results (status, unused_count, deleted_count, cleanup_date,
        and a preview on dry runs).
    """
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        # Data rows that were never accessed, or not accessed since the cutoff.
        query = select(Data, DatasetData).join(
            DatasetData, Data.id == DatasetData.data_id
        ).where(
            or_(
                Data.last_accessed < cutoff_date,
                Data.last_accessed.is_(None)
            )
        )

        if user_id is not None:
            # Local import to avoid widening the module's import surface.
            from cognee.modules.data.models import Dataset

            query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
                Dataset.owner_id == user_id
            )

        result = await session.execute(query)
        # Materialize only the ids we need so the session is released before
        # the potentially slow per-document deletions below (cognee.delete()
        # opens its own sessions; holding ours open risks pool exhaustion).
        doc_refs = [
            (data.id, dataset_data.dataset_id)
            for data, dataset_data in result.all()
        ]

    logger.info(f"Found {len(doc_refs)} unused documents in SQL")

    if dry_run:
        return {
            "status": "dry_run",
            "unused_count": len(doc_refs),
            "deleted_count": {
                "data_items": 0,
                "documents": 0
            },
            "cleanup_date": datetime.now(timezone.utc).isoformat(),
            "preview": {
                "documents": len(doc_refs)
            }
        }

    from cognee.modules.users.methods import get_default_user

    # NOTE(review): when user_id is provided, no user object is resolved and
    # cognee.delete() receives user=None — confirm cognee.delete handles None
    # for a non-default user, or resolve the user by id here instead.
    user = await get_default_user() if user_id is None else None

    # Delete each document using cognee.delete(); best-effort so one bad
    # document does not abort the whole sweep.
    deleted_count = 0
    for data_id, dataset_id in doc_refs:
        try:
            await cognee.delete(
                data_id=data_id,
                dataset_id=dataset_id,
                mode="hard",  # Use hard mode to also remove orphaned entities
                user=user
            )
            deleted_count += 1
            logger.info(f"Deleted document {data_id} from dataset {dataset_id}")
        except Exception as e:
            logger.error(f"Failed to delete document {data_id}: {e}")

    logger.info("Cleanup completed", deleted_count=deleted_count)

    return {
        "status": "completed",
        "unused_count": len(doc_refs),
        "deleted_count": {
            "data_items": deleted_count,
            "documents": deleted_count
        },
        "cleanup_date": datetime.now(timezone.utc).isoformat()
    }
@ -141,7 +246,6 @@ async def _find_unused_nodes(
# Check if node is unused (never accessed or accessed before cutoff) # Check if node is unused (never accessed or accessed before cutoff)
if last_accessed is None or last_accessed < cutoff_timestamp_ms: if last_accessed is None or last_accessed < cutoff_timestamp_ms:
# TODO: Add user_id filtering when user ownership is implemented
unused_nodes[node_type].append(node_id) unused_nodes[node_type].append(node_id)
logger.debug( logger.debug(
f"Found unused {node_type}", f"Found unused {node_type}",