feat: Add a task that deletes the old data that has not been accessed in a while (#1751)
<!-- .github/pull_request_template.md --> ## Description This PR implements a data deletion system for unused DataPoint models based on last access tracking. The system tracks when data is accessed during search operations and provides cleanup functionality to remove data that hasn't been accessed within a configurable time threshold. **Key Changes:** 1. Added `last_accessed` timestamp field to the SQL `Data` model 2. Added `last_accessed_at` timestamp field to the graph `DataPoint` model 3. Implemented `update_node_access_timestamps()` function that updates both graph nodes and SQL records during search operations 4. Created `cleanup_unused_data()` function with SQL-based deletion mode for whole document cleanup 5. Added Alembic migration to add `last_accessed` column to the `data` table 6. Integrated timestamp tracking into in retrievers 7. Added comprehensive end-to-end test for the cleanup functionality ## Related Issues Fixes #[issue_number] ## Type of Change - [x] New feature (non-breaking change that adds functionality) - [ ] Bug fix (non-breaking change that fixes an issue) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [ ] Performance improvement ## Database Changes - [x] This PR includes database schema changes - [x] Alembic migration included: `add_last_accessed_to_data` - [x] Migration adds `last_accessed` column to `data` table - [x] Migration includes backward compatibility (nullable column) - [x] Migration tested locally ## Implementation Details ### Files Modified: 1. **cognee/modules/data/models/Data.py** - Added `last_accessed` column 2. **cognee/infrastructure/engine/models/DataPoint.py** - Added `last_accessed_at` field 3. **cognee/modules/retrieval/chunks_retriever.py** - Integrated timestamp tracking in `get_context()` 4. **cognee/modules/retrieval/utils/update_node_access_timestamps.py** (new file) - Core tracking logic 5. **cognee/tasks/cleanup/cleanup_unused_data.py** (new file) - Cleanup implementation 6. **alembic/versions/[revision]_add_last_accessed_to_data.py** (new file) - Database migration 7. **cognee/tests/test_cleanup_unused_data.py** (new file) - End-to-end test ### Key Functions: - `update_node_access_timestamps(items)` - Updates timestamps in both graph and SQL - `cleanup_unused_data(minutes_threshold, dry_run, text_doc)` - Main cleanup function - SQL-based cleanup mode uses `cognee.delete()` for proper document deletion ## Testing - [x] Added end-to-end test: `test_textdocument_cleanup_with_sql()` - [x] Test covers: add → cognify → search → timestamp verification → aging → cleanup → deletion verification - [x] Test verifies cleanup across all storage systems (SQL, graph, vector) - [x] All existing tests pass - [x] Manual testing completed ## Screenshots/Videos N/A - Backend functionality ## Pre-submission Checklist - [x] **I have tested my changes thoroughly before submitting this PR** - [x] **This PR contains minimal changes necessary to address the issue/feature** - [x] My code follows the project's coding standards and style guidelines - [x] I have added tests that prove my fix is effective or that my feature works - [x] I have added necessary documentation (if applicable) - [x] All new and existing tests pass - [x] I have searched existing PRs to ensure this change hasn't been submitted already - [x] I have linked any relevant issues in the description - [x] My commits have clear and descriptive messages ## Breaking Changes None - This is a new feature that doesn't affect existing functionality. ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin. Resolves #1335 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added access timestamp tracking to monitor when data is last retrieved. * Introduced automatic cleanup of unused data based on configurable time thresholds and access history. * Retrieval operations now update access timestamps to ensure accurate tracking of data usage. * **Tests** * Added integration test validating end-to-end cleanup workflow across storage layers. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
commit
eb444ca18f
14 changed files with 525 additions and 13 deletions
52
alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
Normal file
52
alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
"""add_last_accessed_to_data
|
||||
|
||||
Revision ID: e1ec1dcb50b6
|
||||
Revises: 211ab850ef3d
|
||||
Create Date: 2025-11-04 21:45:52.642322
|
||||
|
||||
"""
|
||||
import os
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'e1ec1dcb50b6'
|
||||
down_revision: Union[str, None] = '211ab850ef3d'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def _get_column(inspector, table, name, schema=None):
|
||||
for col in inspector.get_columns(table, schema=schema):
|
||||
if col["name"] == name:
|
||||
return col
|
||||
return None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
insp = sa.inspect(conn)
|
||||
|
||||
last_accessed_column = _get_column(insp, "data", "last_accessed")
|
||||
if not last_accessed_column:
|
||||
# Always create the column for schema consistency
|
||||
op.add_column('data',
|
||||
sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
|
||||
)
|
||||
|
||||
# Only initialize existing records if feature is enabled
|
||||
enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true"
|
||||
if enable_last_accessed:
|
||||
op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
insp = sa.inspect(conn)
|
||||
|
||||
last_accessed_column = _get_column(insp, "data", "last_accessed")
|
||||
if last_accessed_column:
|
||||
op.drop_column('data', 'last_accessed')
|
||||
|
|
@ -33,5 +33,4 @@ class DocumentChunk(DataPoint):
|
|||
cut_type: str
|
||||
is_part_of: Document
|
||||
contains: List[Union[Entity, Event, tuple[Edge, Entity]]] = None
|
||||
|
||||
metadata: dict = {"index_fields": ["text"]}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ class Data(Base):
|
|||
data_size = Column(Integer, nullable=True) # File size in bytes
|
||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
||||
last_accessed = Column(DateTime(timezone=True), nullable=True)
|
||||
|
||||
datasets = relationship(
|
||||
"Dataset",
|
||||
|
|
|
|||
|
|
@ -2,10 +2,8 @@ from cognee.infrastructure.engine import DataPoint
|
|||
from cognee.modules.engine.models.EntityType import EntityType
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Entity(DataPoint):
|
||||
name: str
|
||||
is_a: Optional[EntityType] = None
|
||||
description: str
|
||||
|
||||
metadata: dict = {"index_fields": ["name"]}
|
||||
|
|
|
|||
|
|
@ -5,3 +5,4 @@ from .retrieve_existing_edges import retrieve_existing_edges
|
|||
from .convert_node_to_data_point import convert_node_to_data_point
|
||||
from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
|
||||
from .resolve_edges_to_text import resolve_edges_to_text
|
||||
from .get_entity_nodes_from_triplets import get_entity_nodes_from_triplets
|
||||
|
|
|
|||
13
cognee/modules/graph/utils/get_entity_nodes_from_triplets.py
Normal file
13
cognee/modules/graph/utils/get_entity_nodes_from_triplets.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
|
||||
def get_entity_nodes_from_triplets(triplets):
|
||||
entity_nodes = []
|
||||
seen_ids = set()
|
||||
for triplet in triplets:
|
||||
if hasattr(triplet, 'node1') and triplet.node1 and triplet.node1.id not in seen_ids:
|
||||
entity_nodes.append({"id": str(triplet.node1.id)})
|
||||
seen_ids.add(triplet.node1.id)
|
||||
if hasattr(triplet, 'node2') and triplet.node2 and triplet.node2.id not in seen_ids:
|
||||
entity_nodes.append({"id": str(triplet.node2.id)})
|
||||
seen_ids.add(triplet.node2.id)
|
||||
|
||||
return entity_nodes
|
||||
|
|
@ -1,10 +1,11 @@
|
|||
from typing import Any, Optional
|
||||
|
||||
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||
from cognee.modules.retrieval.base_retriever import BaseRetriever
|
||||
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
|
||||
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
|
||||
from datetime import datetime, timezone
|
||||
|
||||
logger = get_logger("ChunksRetriever")
|
||||
|
||||
|
|
@ -27,21 +28,16 @@ class ChunksRetriever(BaseRetriever):
|
|||
):
|
||||
self.top_k = top_k
|
||||
|
||||
async def get_context(self, query: str) -> Any:
|
||||
async def get_context(self, query: str) -> Any:
|
||||
"""
|
||||
Retrieves document chunks context based on the query.
|
||||
|
||||
Searches for document chunks relevant to the specified query using a vector engine.
|
||||
Raises a NoDataError if no data is found in the system.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
|
||||
- query (str): The query string to search for relevant document chunks.
|
||||
|
||||
Returns:
|
||||
--------
|
||||
|
||||
- Any: A list of document chunk payloads retrieved from the search.
|
||||
"""
|
||||
logger.info(
|
||||
|
|
@ -53,6 +49,8 @@ class ChunksRetriever(BaseRetriever):
|
|||
try:
|
||||
found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
|
||||
logger.info(f"Found {len(found_chunks)} chunks from vector search")
|
||||
await update_node_access_timestamps(found_chunks)
|
||||
|
||||
except CollectionNotFoundError as error:
|
||||
logger.error("DocumentChunk_text collection not found in vector database")
|
||||
raise NoDataError("No data found in the system, please add data first.") from error
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from cognee.modules.retrieval.utils.session_cache import (
|
|||
save_conversation_history,
|
||||
get_conversation_history,
|
||||
)
|
||||
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
|
||||
from cognee.modules.retrieval.base_retriever import BaseRetriever
|
||||
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
|
||||
from cognee.infrastructure.databases.vector.exceptions import CollectionNotFoundError
|
||||
|
|
@ -65,7 +66,7 @@ class CompletionRetriever(BaseRetriever):
|
|||
|
||||
if len(found_chunks) == 0:
|
||||
return ""
|
||||
|
||||
await update_node_access_timestamps(found_chunks)
|
||||
# Combine all chunks text returned from vector search (number of chunks is determined by top_k
|
||||
chunks_payload = [found_chunk.payload["text"] for found_chunk in found_chunks]
|
||||
combined_context = "\n".join(chunks_payload)
|
||||
|
|
|
|||
|
|
@ -16,11 +16,13 @@ from cognee.modules.retrieval.utils.session_cache import (
|
|||
)
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.modules.retrieval.utils.extract_uuid_from_node import extract_uuid_from_node
|
||||
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
|
||||
from cognee.modules.retrieval.utils.models import CogneeUserInteraction
|
||||
from cognee.modules.engine.models.node_set import NodeSet
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
from cognee.context_global_variables import session_user
|
||||
from cognee.infrastructure.databases.cache.config import CacheConfig
|
||||
from cognee.modules.graph.utils import get_entity_nodes_from_triplets
|
||||
|
||||
logger = get_logger("GraphCompletionRetriever")
|
||||
|
||||
|
|
@ -145,6 +147,9 @@ class GraphCompletionRetriever(BaseGraphRetriever):
|
|||
|
||||
# context = await self.resolve_edges_to_text(triplets)
|
||||
|
||||
entity_nodes = get_entity_nodes_from_triplets(triplets)
|
||||
|
||||
await update_node_access_timestamps(entity_nodes)
|
||||
return triplets
|
||||
|
||||
async def convert_retrieved_objects_to_context(self, triplets: List[Edge]):
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger
|
|||
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||
from cognee.modules.retrieval.base_retriever import BaseRetriever
|
||||
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
|
||||
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
|
||||
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
|
||||
|
||||
logger = get_logger("SummariesRetriever")
|
||||
|
|
@ -54,6 +55,9 @@ class SummariesRetriever(BaseRetriever):
|
|||
"TextSummary_text", query, limit=self.top_k
|
||||
)
|
||||
logger.info(f"Found {len(summaries_results)} summaries from vector search")
|
||||
|
||||
await update_node_access_timestamps(summaries_results)
|
||||
|
||||
except CollectionNotFoundError as error:
|
||||
logger.error("TextSummary_text collection not found in vector database")
|
||||
raise NoDataError("No data found in the system, please add data first.") from error
|
||||
|
|
|
|||
82
cognee/modules/retrieval/utils/access_tracking.py
Normal file
82
cognee/modules/retrieval/utils/access_tracking.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
"""Utilities for tracking data access in retrievers."""
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Any
|
||||
from uuid import UUID
|
||||
import os
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||
from cognee.modules.data.models import Data
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from sqlalchemy import update
|
||||
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
async def update_node_access_timestamps(items: List[Any]):
|
||||
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
|
||||
return
|
||||
|
||||
if not items:
|
||||
return
|
||||
|
||||
graph_engine = await get_graph_engine()
|
||||
timestamp_dt = datetime.now(timezone.utc)
|
||||
|
||||
# Extract node IDs
|
||||
node_ids = []
|
||||
for item in items:
|
||||
item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
|
||||
if item_id:
|
||||
node_ids.append(str(item_id))
|
||||
|
||||
if not node_ids:
|
||||
return
|
||||
|
||||
# Focus on document-level tracking via projection
|
||||
try:
|
||||
doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
|
||||
if doc_ids:
|
||||
await _update_sql_records(doc_ids, timestamp_dt)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update SQL timestamps: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def _find_origin_documents_via_projection(graph_engine, node_ids):
|
||||
"""Find origin documents using graph projection instead of DB queries"""
|
||||
# Project the entire graph with necessary properties
|
||||
memory_fragment = CogneeGraph()
|
||||
await memory_fragment.project_graph_from_db(
|
||||
graph_engine,
|
||||
node_properties_to_project=["id", "type"],
|
||||
edge_properties_to_project=["relationship_name"]
|
||||
)
|
||||
|
||||
# Find origin documents by traversing the in-memory graph
|
||||
doc_ids = set()
|
||||
for node_id in node_ids:
|
||||
node = memory_fragment.get_node(node_id)
|
||||
if node and node.get_attribute("type") == "DocumentChunk":
|
||||
# Traverse edges to find connected documents
|
||||
for edge in node.get_skeleton_edges():
|
||||
# Get the neighbor node
|
||||
neighbor = edge.get_destination_node() if edge.get_source_node().id == node_id else edge.get_source_node()
|
||||
if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]:
|
||||
doc_ids.add(neighbor.id)
|
||||
|
||||
return list(doc_ids)
|
||||
|
||||
|
||||
async def _update_sql_records(doc_ids, timestamp_dt):
|
||||
"""Update SQL Data table (same for all providers)"""
|
||||
db_engine = get_relational_engine()
|
||||
async with db_engine.get_async_session() as session:
|
||||
stmt = update(Data).where(
|
||||
Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
|
||||
).values(last_accessed=timestamp_dt)
|
||||
|
||||
await session.execute(stmt)
|
||||
await session.commit()
|
||||
187
cognee/tasks/cleanup/cleanup_unused_data.py
Normal file
187
cognee/tasks/cleanup/cleanup_unused_data.py
Normal file
|
|
@ -0,0 +1,187 @@
|
|||
"""
|
||||
Task for automatically deleting unused data from the memify pipeline.
|
||||
|
||||
This task identifies and removes entire documents that haven't
|
||||
been accessed by retrievers for a specified period, helping maintain system
|
||||
efficiency and storage optimization through whole-document removal.
|
||||
"""
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from typing import Optional, Dict, Any
|
||||
from uuid import UUID
|
||||
import os
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||
from cognee.modules.data.models import Data, DatasetData
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from sqlalchemy import select, or_
|
||||
import cognee
|
||||
import sqlalchemy as sa
|
||||
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
async def cleanup_unused_data(
|
||||
minutes_threshold: Optional[int],
|
||||
dry_run: bool = True,
|
||||
user_id: Optional[UUID] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Identify and remove unused data from the memify pipeline.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
minutes_threshold : int
|
||||
Minutes since last access to consider data unused
|
||||
dry_run : bool
|
||||
If True, only report what would be deleted without actually deleting (default: True)
|
||||
user_id : UUID, optional
|
||||
Limit cleanup to specific user's data (default: None)
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, Any]
|
||||
Cleanup results with status, counts, and timestamp
|
||||
"""
|
||||
# Check 1: Environment variable must be enabled
|
||||
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
|
||||
logger.warning(
|
||||
"Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled."
|
||||
)
|
||||
return {
|
||||
"status": "skipped",
|
||||
"reason": "ENABLE_LAST_ACCESSED not enabled",
|
||||
"unused_count": 0,
|
||||
"deleted_count": {},
|
||||
"cleanup_date": datetime.now(timezone.utc).isoformat()
|
||||
}
|
||||
|
||||
# Check 2: Verify tracking has actually been running
|
||||
db_engine = get_relational_engine()
|
||||
async with db_engine.get_async_session() as session:
|
||||
# Count records with non-NULL last_accessed
|
||||
tracked_count = await session.execute(
|
||||
select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
|
||||
)
|
||||
tracked_records = tracked_count.scalar()
|
||||
|
||||
if tracked_records == 0:
|
||||
logger.warning(
|
||||
"Cleanup skipped: No records have been tracked yet. "
|
||||
"ENABLE_LAST_ACCESSED may have been recently enabled. "
|
||||
"Wait for retrievers to update timestamps before running cleanup."
|
||||
)
|
||||
return {
|
||||
"status": "skipped",
|
||||
"reason": "No tracked records found - tracking may be newly enabled",
|
||||
"unused_count": 0,
|
||||
"deleted_count": {},
|
||||
"cleanup_date": datetime.now(timezone.utc).isoformat()
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"Starting cleanup task",
|
||||
minutes_threshold=minutes_threshold,
|
||||
dry_run=dry_run,
|
||||
user_id=str(user_id) if user_id else None
|
||||
)
|
||||
|
||||
# Calculate cutoff timestamp
|
||||
cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
|
||||
|
||||
# Document-level approach (recommended)
|
||||
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
|
||||
|
||||
|
||||
async def _cleanup_via_sql(
|
||||
cutoff_date: datetime,
|
||||
dry_run: bool,
|
||||
user_id: Optional[UUID] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
SQL-based cleanup: Query Data table for unused documents and use cognee.delete().
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cutoff_date : datetime
|
||||
Cutoff date for last_accessed filtering
|
||||
dry_run : bool
|
||||
If True, only report what would be deleted
|
||||
user_id : UUID, optional
|
||||
Filter by user ID if provided
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, Any]
|
||||
Cleanup results
|
||||
"""
|
||||
db_engine = get_relational_engine()
|
||||
|
||||
async with db_engine.get_async_session() as session:
|
||||
# Query for Data records with old last_accessed timestamps
|
||||
query = select(Data, DatasetData).join(
|
||||
DatasetData, Data.id == DatasetData.data_id
|
||||
).where(
|
||||
or_(
|
||||
Data.last_accessed < cutoff_date,
|
||||
Data.last_accessed.is_(None)
|
||||
)
|
||||
)
|
||||
|
||||
if user_id:
|
||||
from cognee.modules.data.models import Dataset
|
||||
query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
|
||||
Dataset.owner_id == user_id
|
||||
)
|
||||
|
||||
result = await session.execute(query)
|
||||
unused_data = result.all()
|
||||
|
||||
logger.info(f"Found {len(unused_data)} unused documents in SQL")
|
||||
|
||||
if dry_run:
|
||||
return {
|
||||
"status": "dry_run",
|
||||
"unused_count": len(unused_data),
|
||||
"deleted_count": {
|
||||
"data_items": 0,
|
||||
"documents": 0
|
||||
},
|
||||
"cleanup_date": datetime.now(timezone.utc).isoformat(),
|
||||
"preview": {
|
||||
"documents": len(unused_data)
|
||||
}
|
||||
}
|
||||
|
||||
# Delete each document using cognee.delete()
|
||||
deleted_count = 0
|
||||
from cognee.modules.users.methods import get_default_user
|
||||
user = await get_default_user() if user_id is None else None
|
||||
|
||||
for data, dataset_data in unused_data:
|
||||
try:
|
||||
await cognee.delete(
|
||||
data_id=data.id,
|
||||
dataset_id=dataset_data.dataset_id,
|
||||
mode="hard", # Use hard mode to also remove orphaned entities
|
||||
user=user
|
||||
)
|
||||
deleted_count += 1
|
||||
logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete document {data.id}: {e}")
|
||||
|
||||
logger.info("Cleanup completed", deleted_count=deleted_count)
|
||||
|
||||
return {
|
||||
"status": "completed",
|
||||
"unused_count": len(unused_data),
|
||||
"deleted_count": {
|
||||
"data_items": deleted_count,
|
||||
"documents": deleted_count
|
||||
},
|
||||
"cleanup_date": datetime.now(timezone.utc).isoformat()
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
from typing import Union
|
||||
|
||||
from typing import Union
|
||||
from cognee.infrastructure.engine import DataPoint
|
||||
from cognee.modules.chunking.models import DocumentChunk
|
||||
from cognee.shared.CodeGraphEntities import CodeFile, CodePart
|
||||
|
|
@ -17,7 +17,6 @@ class TextSummary(DataPoint):
|
|||
|
||||
text: str
|
||||
made_from: DocumentChunk
|
||||
|
||||
metadata: dict = {"index_fields": ["text"]}
|
||||
|
||||
|
||||
|
|
|
|||
172
cognee/tests/test_cleanup_unused_data.py
Normal file
172
cognee/tests/test_cleanup_unused_data.py
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
import os
|
||||
import pathlib
|
||||
import cognee
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from uuid import UUID
|
||||
from sqlalchemy import select, update
|
||||
from cognee.modules.data.models import Data, DatasetData
|
||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||
from cognee.modules.users.methods import get_default_user
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.modules.search.types import SearchType
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def test_textdocument_cleanup_with_sql():
|
||||
"""
|
||||
End-to-end test for TextDocument cleanup based on last_accessed timestamps.
|
||||
"""
|
||||
# Enable last accessed tracking BEFORE any cognee operations
|
||||
os.environ["ENABLE_LAST_ACCESSED"] = "true"
|
||||
|
||||
# Setup test directories
|
||||
data_directory_path = str(
|
||||
pathlib.Path(
|
||||
os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup")
|
||||
).resolve()
|
||||
)
|
||||
cognee_directory_path = str(
|
||||
pathlib.Path(
|
||||
os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup")
|
||||
).resolve()
|
||||
)
|
||||
|
||||
cognee.config.data_root_directory(data_directory_path)
|
||||
cognee.config.system_root_directory(cognee_directory_path)
|
||||
|
||||
# Initialize database
|
||||
from cognee.modules.engine.operations.setup import setup
|
||||
|
||||
# Clean slate
|
||||
await cognee.prune.prune_data()
|
||||
await cognee.prune.prune_system(metadata=True)
|
||||
|
||||
logger.info("🧪 Testing TextDocument cleanup based on last_accessed")
|
||||
|
||||
# Step 1: Add and cognify a test document
|
||||
dataset_name = "test_cleanup_dataset"
|
||||
test_text = """
|
||||
Machine learning is a subset of artificial intelligence that enables systems to learn
|
||||
and improve from experience without being explicitly programmed. Deep learning uses
|
||||
neural networks with multiple layers to process data.
|
||||
"""
|
||||
|
||||
await setup()
|
||||
user = await get_default_user()
|
||||
await cognee.add([test_text], dataset_name=dataset_name, user=user)
|
||||
|
||||
cognify_result = await cognee.cognify([dataset_name], user=user)
|
||||
|
||||
# Extract dataset_id from cognify result
|
||||
dataset_id = None
|
||||
for ds_id, pipeline_result in cognify_result.items():
|
||||
dataset_id = ds_id
|
||||
break
|
||||
|
||||
assert dataset_id is not None, "Failed to get dataset_id from cognify result"
|
||||
logger.info(f"✅ Document added and cognified. Dataset ID: {dataset_id}")
|
||||
|
||||
# Step 2: Perform search to trigger last_accessed update
|
||||
logger.info("Triggering search to update last_accessed...")
|
||||
search_results = await cognee.search(
|
||||
query_type=SearchType.CHUNKS,
|
||||
query_text="machine learning",
|
||||
datasets=[dataset_name],
|
||||
user=user
|
||||
)
|
||||
logger.info(f"✅ Search completed, found {len(search_results)} results")
|
||||
assert len(search_results) > 0, "Search should return results"
|
||||
|
||||
# Step 3: Verify last_accessed was set and get data_id
|
||||
db_engine = get_relational_engine()
|
||||
async with db_engine.get_async_session() as session:
|
||||
result = await session.execute(
|
||||
select(Data, DatasetData)
|
||||
.join(DatasetData, Data.id == DatasetData.data_id)
|
||||
.where(DatasetData.dataset_id == dataset_id)
|
||||
)
|
||||
data_records = result.all()
|
||||
assert len(data_records) > 0, "No Data records found for the dataset"
|
||||
data_record = data_records[0][0]
|
||||
data_id = data_record.id
|
||||
|
||||
# Verify last_accessed is set
|
||||
assert data_record.last_accessed is not None, (
|
||||
"last_accessed should be set after search operation"
|
||||
)
|
||||
|
||||
original_last_accessed = data_record.last_accessed
|
||||
logger.info(f"✅ last_accessed verified: {original_last_accessed}")
|
||||
|
||||
# Step 4: Manually age the timestamp
|
||||
minutes_threshold = 30
|
||||
aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10)
|
||||
|
||||
async with db_engine.get_async_session() as session:
|
||||
stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp)
|
||||
await session.execute(stmt)
|
||||
await session.commit()
|
||||
|
||||
# Verify timestamp was updated
|
||||
async with db_engine.get_async_session() as session:
|
||||
result = await session.execute(select(Data).where(Data.id == data_id))
|
||||
updated_data = result.scalar_one_or_none()
|
||||
assert updated_data is not None, "Data record should exist"
|
||||
retrieved_timestamp = updated_data.last_accessed
|
||||
if retrieved_timestamp.tzinfo is None:
|
||||
retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc)
|
||||
assert retrieved_timestamp == aged_timestamp, (
|
||||
f"Timestamp should be updated to aged value"
|
||||
)
|
||||
|
||||
# Step 5: Test cleanup (document-level is now the default)
|
||||
from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data
|
||||
|
||||
# First do a dry run
|
||||
logger.info("Testing dry run...")
|
||||
dry_run_result = await cleanup_unused_data(
|
||||
minutes_threshold=10,
|
||||
dry_run=True,
|
||||
user_id=user.id
|
||||
)
|
||||
|
||||
# Debug: Print the actual result
|
||||
logger.info(f"Dry run result: {dry_run_result}")
|
||||
|
||||
assert dry_run_result['status'] == 'dry_run', f"Status should be 'dry_run', got: {dry_run_result['status']}"
|
||||
assert dry_run_result['unused_count'] > 0, (
|
||||
"Should find at least one unused document"
|
||||
)
|
||||
logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents")
|
||||
|
||||
# Now run actual cleanup
|
||||
logger.info("Executing cleanup...")
|
||||
cleanup_result = await cleanup_unused_data(
|
||||
minutes_threshold=30,
|
||||
dry_run=False,
|
||||
user_id=user.id
|
||||
)
|
||||
|
||||
assert cleanup_result["status"] == "completed", "Cleanup should complete successfully"
|
||||
assert cleanup_result["deleted_count"]["documents"] > 0, (
|
||||
"At least one document should be deleted"
|
||||
)
|
||||
logger.info(f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents")
|
||||
|
||||
# Step 6: Verify deletion
|
||||
async with db_engine.get_async_session() as session:
|
||||
deleted_data = (
|
||||
await session.execute(select(Data).where(Data.id == data_id))
|
||||
).scalar_one_or_none()
|
||||
assert deleted_data is None, "Data record should be deleted"
|
||||
logger.info("✅ Confirmed: Data record was deleted")
|
||||
|
||||
logger.info("🎉 All cleanup tests passed!")
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
success = asyncio.run(test_textdocument_cleanup_with_sql())
|
||||
exit(0 if success else 1)
|
||||
Loading…
Add table
Reference in a new issue