This commit is contained in:
hajdul88 2025-12-19 10:25:24 +01:00
parent 8a490b1c16
commit 4b71995a70
10 changed files with 486 additions and 504 deletions

View file

@@ -1,52 +1,51 @@
"""add_last_accessed_to_data
Revision ID: e1ec1dcb50b6
Revises: 211ab850ef3d
Create Date: 2025-11-04 21:45:52.642322
"""
import os
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'e1ec1dcb50b6'
down_revision: Union[str, None] = '211ab850ef3d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema):
if col["name"] == name:
return col
return None
def upgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
last_accessed_column = _get_column(insp, "data", "last_accessed")
if not last_accessed_column:
# Always create the column for schema consistency
op.add_column('data',
sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
)
# Only initialize existing records if feature is enabled
enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true"
if enable_last_accessed:
op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
def downgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
last_accessed_column = _get_column(insp, "data", "last_accessed")
if last_accessed_column:
op.drop_column('data', 'last_accessed')
"""add_last_accessed_to_data
Revision ID: e1ec1dcb50b6
Revises: 211ab850ef3d
Create Date: 2025-11-04 21:45:52.642322
"""
import os
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "e1ec1dcb50b6"
down_revision: Union[str, None] = "211ab850ef3d"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema):
if col["name"] == name:
return col
return None
def upgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
last_accessed_column = _get_column(insp, "data", "last_accessed")
if not last_accessed_column:
# Always create the column for schema consistency
op.add_column("data", sa.Column("last_accessed", sa.DateTime(timezone=True), nullable=True))
# Only initialize existing records if feature is enabled
enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true"
if enable_last_accessed:
op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
def downgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
last_accessed_column = _get_column(insp, "data", "last_accessed")
if last_accessed_column:
op.drop_column("data", "last_accessed")

View file

@@ -2,6 +2,7 @@ from cognee.infrastructure.engine import DataPoint
from cognee.modules.engine.models.EntityType import EntityType
from typing import Optional
class Entity(DataPoint):
name: str
is_a: Optional[EntityType] = None

View file

@@ -1,13 +1,12 @@
def get_entity_nodes_from_triplets(triplets):
entity_nodes = []
seen_ids = set()
for triplet in triplets:
if hasattr(triplet, 'node1') and triplet.node1 and triplet.node1.id not in seen_ids:
entity_nodes.append({"id": str(triplet.node1.id)})
seen_ids.add(triplet.node1.id)
if hasattr(triplet, 'node2') and triplet.node2 and triplet.node2.id not in seen_ids:
entity_nodes.append({"id": str(triplet.node2.id)})
seen_ids.add(triplet.node2.id)
entity_nodes = []
seen_ids = set()
for triplet in triplets:
if hasattr(triplet, "node1") and triplet.node1 and triplet.node1.id not in seen_ids:
entity_nodes.append({"id": str(triplet.node1.id)})
seen_ids.add(triplet.node1.id)
if hasattr(triplet, "node2") and triplet.node2 and triplet.node2.id not in seen_ids:
entity_nodes.append({"id": str(triplet.node2.id)})
seen_ids.add(triplet.node2.id)
return entity_nodes
return entity_nodes
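For illustration, a minimal sketch of the deduplication behaviour; the _Node and _Triplet stand-ins below are hypothetical and only carry the attributes the helper actually reads (node1, node2, and their id fields), and get_entity_nodes_from_triplets is assumed to be imported from the module above:
from dataclasses import dataclass
from uuid import UUID, uuid4

@dataclass
class _Node:
    id: UUID

@dataclass
class _Triplet:
    node1: _Node
    node2: _Node

shared = _Node(id=uuid4())
triplets = [
    _Triplet(node1=shared, node2=_Node(id=uuid4())),
    _Triplet(node1=shared, node2=_Node(id=uuid4())),
]

# `shared` appears in both triplets but is emitted only once, so three ids come back.
print(get_entity_nodes_from_triplets(triplets))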

View file

@@ -5,7 +5,7 @@ from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
from datetime import datetime, timezone
from datetime import datetime, timezone
logger = get_logger("ChunksRetriever")
@@ -28,7 +28,7 @@ class ChunksRetriever(BaseRetriever):
):
self.top_k = top_k
async def get_context(self, query: str) -> Any:
async def get_context(self, query: str) -> Any:
"""
Retrieves document chunks context based on the query.
Searches for document chunks relevant to the specified query using a vector engine.

View file

@@ -148,8 +148,8 @@ class GraphCompletionRetriever(BaseGraphRetriever):
# context = await self.resolve_edges_to_text(triplets)
entity_nodes = get_entity_nodes_from_triplets(triplets)
await update_node_access_timestamps(entity_nodes)
await update_node_access_timestamps(entity_nodes)
return triplets
async def convert_retrieved_objects_to_context(self, triplets: List[Edge]):

View file

@@ -55,9 +55,9 @@ class SummariesRetriever(BaseRetriever):
"TextSummary_text", query, limit=self.top_k
)
logger.info(f"Found {len(summaries_results)} summaries from vector search")
await update_node_access_timestamps(summaries_results)
except CollectionNotFoundError as error:
logger.error("TextSummary_text collection not found in vector database")
raise NoDataError("No data found in the system, please add data first.") from error

View file

@@ -1,82 +1,88 @@
"""Utilities for tracking data access in retrievers."""
import json
from datetime import datetime, timezone
from typing import List, Any
from uuid import UUID
import os
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.shared.logging_utils import get_logger
from sqlalchemy import update
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any]):
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
return
if not items:
return
graph_engine = await get_graph_engine()
timestamp_dt = datetime.now(timezone.utc)
# Extract node IDs
node_ids = []
for item in items:
item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
if item_id:
node_ids.append(str(item_id))
if not node_ids:
return
# Focus on document-level tracking via projection
try:
doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
if doc_ids:
await _update_sql_records(doc_ids, timestamp_dt)
except Exception as e:
logger.error(f"Failed to update SQL timestamps: {e}")
raise
async def _find_origin_documents_via_projection(graph_engine, node_ids):
"""Find origin documents using graph projection instead of DB queries"""
# Project the entire graph with necessary properties
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id", "type"],
edge_properties_to_project=["relationship_name"]
)
# Find origin documents by traversing the in-memory graph
doc_ids = set()
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node and node.get_attribute("type") == "DocumentChunk":
# Traverse edges to find connected documents
for edge in node.get_skeleton_edges():
# Get the neighbor node
neighbor = edge.get_destination_node() if edge.get_source_node().id == node_id else edge.get_source_node()
if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]:
doc_ids.add(neighbor.id)
return list(doc_ids)
async def _update_sql_records(doc_ids, timestamp_dt):
"""Update SQL Data table (same for all providers)"""
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
stmt = update(Data).where(
Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
).values(last_accessed=timestamp_dt)
await session.execute(stmt)
"""Utilities for tracking data access in retrievers."""
import json
from datetime import datetime, timezone
from typing import List, Any
from uuid import UUID
import os
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.shared.logging_utils import get_logger
from sqlalchemy import update
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any]):
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
return
if not items:
return
graph_engine = await get_graph_engine()
timestamp_dt = datetime.now(timezone.utc)
# Extract node IDs
node_ids = []
for item in items:
item_id = item.payload.get("id") if hasattr(item, "payload") else item.get("id")
if item_id:
node_ids.append(str(item_id))
if not node_ids:
return
# Focus on document-level tracking via projection
try:
doc_ids = await _find_origin_documents_via_projection(graph_engine, node_ids)
if doc_ids:
await _update_sql_records(doc_ids, timestamp_dt)
except Exception as e:
logger.error(f"Failed to update SQL timestamps: {e}")
raise
async def _find_origin_documents_via_projection(graph_engine, node_ids):
"""Find origin documents using graph projection instead of DB queries"""
# Project the entire graph with necessary properties
memory_fragment = CogneeGraph()
await memory_fragment.project_graph_from_db(
graph_engine,
node_properties_to_project=["id", "type"],
edge_properties_to_project=["relationship_name"],
)
# Find origin documents by traversing the in-memory graph
doc_ids = set()
for node_id in node_ids:
node = memory_fragment.get_node(node_id)
if node and node.get_attribute("type") == "DocumentChunk":
# Traverse edges to find connected documents
for edge in node.get_skeleton_edges():
# Get the neighbor node
neighbor = (
edge.get_destination_node()
if edge.get_source_node().id == node_id
else edge.get_source_node()
)
if neighbor and neighbor.get_attribute("type") in ["TextDocument", "Document"]:
doc_ids.add(neighbor.id)
return list(doc_ids)
async def _update_sql_records(doc_ids, timestamp_dt):
"""Update SQL Data table (same for all providers)"""
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
stmt = (
update(Data)
.where(Data.id.in_([UUID(doc_id) for doc_id in doc_ids]))
.values(last_accessed=timestamp_dt)
)
await session.execute(stmt)
await session.commit()
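A minimal usage sketch of the tracking helper, assuming ENABLE_LAST_ACCESSED is enabled and update_node_access_timestamps is imported from the module above. Plain dicts with an "id" key are enough, since the helper falls back to item.get("id") when the item has no payload attribute; ids that do not resolve to DocumentChunk nodes in the projected graph are simply ignored:
import asyncio
import os
from uuid import uuid4

os.environ["ENABLE_LAST_ACCESSED"] = "true"  # without this the helper returns immediately

async def main():
    # In the retrievers these items are vector search results; the ids below are placeholders.
    items = [{"id": str(uuid4())}, {"id": str(uuid4())}]
    await update_node_access_timestamps(items)

asyncio.run(main())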

View file

@@ -1,187 +1,172 @@
"""
Task for automatically deleting unused data from the memify pipeline.
This task identifies and removes entire documents that have not been
accessed by retrievers for a specified period, reclaiming storage and keeping
the system efficient by deleting data at the whole-document level.
"""
import json
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any
from uuid import UUID
import os
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, DatasetData
from cognee.shared.logging_utils import get_logger
from sqlalchemy import select, or_
import cognee
import sqlalchemy as sa
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__)
async def cleanup_unused_data(
minutes_threshold: Optional[int],
dry_run: bool = True,
user_id: Optional[UUID] = None
) -> Dict[str, Any]:
"""
Identify and remove unused data from the memify pipeline.
Parameters
----------
minutes_threshold : int
Minutes since last access to consider data unused
dry_run : bool
If True, only report what would be deleted without actually deleting (default: True)
user_id : UUID, optional
Limit cleanup to specific user's data (default: None)
Returns
-------
Dict[str, Any]
Cleanup results with status, counts, and timestamp
"""
# Check 1: Environment variable must be enabled
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
logger.warning(
"Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled."
)
return {
"status": "skipped",
"reason": "ENABLE_LAST_ACCESSED not enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
# Check 2: Verify tracking has actually been running
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# Count records with non-NULL last_accessed
tracked_count = await session.execute(
select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
)
tracked_records = tracked_count.scalar()
if tracked_records == 0:
logger.warning(
"Cleanup skipped: No records have been tracked yet. "
"ENABLE_LAST_ACCESSED may have been recently enabled. "
"Wait for retrievers to update timestamps before running cleanup."
)
return {
"status": "skipped",
"reason": "No tracked records found - tracking may be newly enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat()
}
logger.info(
"Starting cleanup task",
minutes_threshold=minutes_threshold,
dry_run=dry_run,
user_id=str(user_id) if user_id else None
)
# Calculate cutoff timestamp
cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
# Document-level approach (recommended)
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
async def _cleanup_via_sql(
cutoff_date: datetime,
dry_run: bool,
user_id: Optional[UUID] = None
) -> Dict[str, Any]:
"""
SQL-based cleanup: Query Data table for unused documents and use cognee.delete().
Parameters
----------
cutoff_date : datetime
Cutoff date for last_accessed filtering
dry_run : bool
If True, only report what would be deleted
user_id : UUID, optional
Filter by user ID if provided
Returns
-------
Dict[str, Any]
Cleanup results
"""
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# Query for Data records with old last_accessed timestamps
query = select(Data, DatasetData).join(
DatasetData, Data.id == DatasetData.data_id
).where(
or_(
Data.last_accessed < cutoff_date,
Data.last_accessed.is_(None)
)
)
if user_id:
from cognee.modules.data.models import Dataset
query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
Dataset.owner_id == user_id
)
result = await session.execute(query)
unused_data = result.all()
logger.info(f"Found {len(unused_data)} unused documents in SQL")
if dry_run:
return {
"status": "dry_run",
"unused_count": len(unused_data),
"deleted_count": {
"data_items": 0,
"documents": 0
},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
"preview": {
"documents": len(unused_data)
}
}
# Delete each document using cognee.delete()
deleted_count = 0
from cognee.modules.users.methods import get_default_user
user = await get_default_user() if user_id is None else None
for data, dataset_data in unused_data:
try:
await cognee.delete(
data_id=data.id,
dataset_id=dataset_data.dataset_id,
mode="hard", # Use hard mode to also remove orphaned entities
user=user
)
deleted_count += 1
logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
except Exception as e:
logger.error(f"Failed to delete document {data.id}: {e}")
logger.info("Cleanup completed", deleted_count=deleted_count)
return {
"status": "completed",
"unused_count": len(unused_data),
"deleted_count": {
"data_items": deleted_count,
"documents": deleted_count
},
"cleanup_date": datetime.now(timezone.utc).isoformat()
"""
Task for automatically deleting unused data from the memify pipeline.
This task identifies and removes entire documents that have not been
accessed by retrievers for a specified period, reclaiming storage and keeping
the system efficient by deleting data at the whole-document level.
"""
import json
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any
from uuid import UUID
import os
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, DatasetData
from cognee.shared.logging_utils import get_logger
from sqlalchemy import select, or_
import cognee
import sqlalchemy as sa
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
logger = get_logger(__name__)
async def cleanup_unused_data(
minutes_threshold: Optional[int], dry_run: bool = True, user_id: Optional[UUID] = None
) -> Dict[str, Any]:
"""
Identify and remove unused data from the memify pipeline.
Parameters
----------
minutes_threshold : int
Minutes since last access to consider data unused
dry_run : bool
If True, only report what would be deleted without actually deleting (default: True)
user_id : UUID, optional
Limit cleanup to specific user's data (default: None)
Returns
-------
Dict[str, Any]
Cleanup results with status, counts, and timestamp
"""
# Check 1: Environment variable must be enabled
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
logger.warning("Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled.")
return {
"status": "skipped",
"reason": "ENABLE_LAST_ACCESSED not enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
}
# Check 2: Verify tracking has actually been running
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# Count records with non-NULL last_accessed
tracked_count = await session.execute(
select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
)
tracked_records = tracked_count.scalar()
if tracked_records == 0:
logger.warning(
"Cleanup skipped: No records have been tracked yet. "
"ENABLE_LAST_ACCESSED may have been recently enabled. "
"Wait for retrievers to update timestamps before running cleanup."
)
return {
"status": "skipped",
"reason": "No tracked records found - tracking may be newly enabled",
"unused_count": 0,
"deleted_count": {},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
}
logger.info(
"Starting cleanup task",
minutes_threshold=minutes_threshold,
dry_run=dry_run,
user_id=str(user_id) if user_id else None,
)
# Calculate cutoff timestamp
cutoff_date = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold)
# Document-level approach (recommended)
return await _cleanup_via_sql(cutoff_date, dry_run, user_id)
async def _cleanup_via_sql(
cutoff_date: datetime, dry_run: bool, user_id: Optional[UUID] = None
) -> Dict[str, Any]:
"""
SQL-based cleanup: Query Data table for unused documents and use cognee.delete().
Parameters
----------
cutoff_date : datetime
Cutoff date for last_accessed filtering
dry_run : bool
If True, only report what would be deleted
user_id : UUID, optional
Filter by user ID if provided
Returns
-------
Dict[str, Any]
Cleanup results
"""
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
# Query for Data records with old last_accessed timestamps
query = (
select(Data, DatasetData)
.join(DatasetData, Data.id == DatasetData.data_id)
.where(or_(Data.last_accessed < cutoff_date, Data.last_accessed.is_(None)))
)
if user_id:
from cognee.modules.data.models import Dataset
query = query.join(Dataset, DatasetData.dataset_id == Dataset.id).where(
Dataset.owner_id == user_id
)
result = await session.execute(query)
unused_data = result.all()
logger.info(f"Found {len(unused_data)} unused documents in SQL")
if dry_run:
return {
"status": "dry_run",
"unused_count": len(unused_data),
"deleted_count": {"data_items": 0, "documents": 0},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
"preview": {"documents": len(unused_data)},
}
# Delete each document using cognee.delete()
deleted_count = 0
from cognee.modules.users.methods import get_default_user
user = await get_default_user() if user_id is None else None
for data, dataset_data in unused_data:
try:
await cognee.delete(
data_id=data.id,
dataset_id=dataset_data.dataset_id,
mode="hard", # Use hard mode to also remove orphaned entities
user=user,
)
deleted_count += 1
logger.info(f"Deleted document {data.id} from dataset {dataset_data.dataset_id}")
except Exception as e:
logger.error(f"Failed to delete document {data.id}: {e}")
logger.info("Cleanup completed", deleted_count=deleted_count)
return {
"status": "completed",
"unused_count": len(unused_data),
"deleted_count": {"data_items": deleted_count, "documents": deleted_count},
"cleanup_date": datetime.now(timezone.utc).isoformat(),
}
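A minimal usage sketch of the cleanup task, assuming ENABLE_LAST_ACCESSED has been enabled long enough for retrievers to populate last_accessed and that cleanup_unused_data is imported from the module above; a dry run only reports candidates, and the real pass deletes documents idle for longer than the threshold:
import asyncio
import os

os.environ["ENABLE_LAST_ACCESSED"] = "true"

async def main():
    # Report documents not accessed in the last 24 hours without deleting anything.
    report = await cleanup_unused_data(minutes_threshold=60 * 24, dry_run=True)
    print(report["status"], report["unused_count"])

    # Only delete once the dry-run numbers look reasonable.
    if report["status"] == "dry_run" and report["unused_count"] > 0:
        await cleanup_unused_data(minutes_threshold=60 * 24, dry_run=False)

asyncio.run(main())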

View file

@@ -1,4 +1,3 @@
from typing import Union
from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models import DocumentChunk

View file

@@ -1,172 +1,165 @@
import os
import pathlib
import cognee
from datetime import datetime, timezone, timedelta
from uuid import UUID
from sqlalchemy import select, update
from cognee.modules.data.models import Data, DatasetData
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType
logger = get_logger()
async def test_textdocument_cleanup_with_sql():
"""
End-to-end test for TextDocument cleanup based on last_accessed timestamps.
"""
# Enable last accessed tracking BEFORE any cognee operations
os.environ["ENABLE_LAST_ACCESSED"] = "true"
# Setup test directories
data_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup")
).resolve()
)
cognee_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup")
).resolve()
)
cognee.config.data_root_directory(data_directory_path)
cognee.config.system_root_directory(cognee_directory_path)
# Initialize database
from cognee.modules.engine.operations.setup import setup
# Clean slate
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
logger.info("🧪 Testing TextDocument cleanup based on last_accessed")
# Step 1: Add and cognify a test document
dataset_name = "test_cleanup_dataset"
import os
import pathlib
import cognee
from datetime import datetime, timezone, timedelta
from uuid import UUID
from sqlalchemy import select, update
from cognee.modules.data.models import Data, DatasetData
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType
logger = get_logger()
async def test_textdocument_cleanup_with_sql():
"""
End-to-end test for TextDocument cleanup based on last_accessed timestamps.
"""
# Enable last accessed tracking BEFORE any cognee operations
os.environ["ENABLE_LAST_ACCESSED"] = "true"
# Setup test directories
data_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_cleanup")
).resolve()
)
cognee_directory_path = str(
pathlib.Path(
os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_cleanup")
).resolve()
)
cognee.config.data_root_directory(data_directory_path)
cognee.config.system_root_directory(cognee_directory_path)
# Initialize database
from cognee.modules.engine.operations.setup import setup
# Clean slate
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
logger.info("🧪 Testing TextDocument cleanup based on last_accessed")
# Step 1: Add and cognify a test document
dataset_name = "test_cleanup_dataset"
test_text = """
Machine learning is a subset of artificial intelligence that enables systems to learn
and improve from experience without being explicitly programmed. Deep learning uses
neural networks with multiple layers to process data.
"""
await setup()
user = await get_default_user()
await cognee.add([test_text], dataset_name=dataset_name, user=user)
cognify_result = await cognee.cognify([dataset_name], user=user)
# Extract dataset_id from cognify result
dataset_id = None
for ds_id, pipeline_result in cognify_result.items():
dataset_id = ds_id
break
assert dataset_id is not None, "Failed to get dataset_id from cognify result"
logger.info(f"✅ Document added and cognified. Dataset ID: {dataset_id}")
# Step 2: Perform search to trigger last_accessed update
logger.info("Triggering search to update last_accessed...")
search_results = await cognee.search(
query_type=SearchType.CHUNKS,
query_text="machine learning",
datasets=[dataset_name],
user=user
)
logger.info(f"✅ Search completed, found {len(search_results)} results")
assert len(search_results) > 0, "Search should return results"
# Step 3: Verify last_accessed was set and get data_id
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
result = await session.execute(
select(Data, DatasetData)
.join(DatasetData, Data.id == DatasetData.data_id)
.where(DatasetData.dataset_id == dataset_id)
)
data_records = result.all()
assert len(data_records) > 0, "No Data records found for the dataset"
data_record = data_records[0][0]
data_id = data_record.id
# Verify last_accessed is set
assert data_record.last_accessed is not None, (
"last_accessed should be set after search operation"
)
original_last_accessed = data_record.last_accessed
logger.info(f"✅ last_accessed verified: {original_last_accessed}")
# Step 4: Manually age the timestamp
minutes_threshold = 30
aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10)
async with db_engine.get_async_session() as session:
stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp)
await session.execute(stmt)
await session.commit()
# Verify timestamp was updated
async with db_engine.get_async_session() as session:
result = await session.execute(select(Data).where(Data.id == data_id))
updated_data = result.scalar_one_or_none()
assert updated_data is not None, "Data record should exist"
retrieved_timestamp = updated_data.last_accessed
if retrieved_timestamp.tzinfo is None:
retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc)
assert retrieved_timestamp == aged_timestamp, (
f"Timestamp should be updated to aged value"
)
# Step 5: Test cleanup (document-level is now the default)
from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data
# First do a dry run
logger.info("Testing dry run...")
dry_run_result = await cleanup_unused_data(
minutes_threshold=10,
dry_run=True,
user_id=user.id
)
# Debug: Print the actual result
logger.info(f"Dry run result: {dry_run_result}")
assert dry_run_result['status'] == 'dry_run', f"Status should be 'dry_run', got: {dry_run_result['status']}"
assert dry_run_result['unused_count'] > 0, (
"Should find at least one unused document"
)
logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents")
# Now run actual cleanup
logger.info("Executing cleanup...")
cleanup_result = await cleanup_unused_data(
minutes_threshold=30,
dry_run=False,
user_id=user.id
)
assert cleanup_result["status"] == "completed", "Cleanup should complete successfully"
assert cleanup_result["deleted_count"]["documents"] > 0, (
"At least one document should be deleted"
)
logger.info(f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents")
# Step 6: Verify deletion
async with db_engine.get_async_session() as session:
deleted_data = (
await session.execute(select(Data).where(Data.id == data_id))
).scalar_one_or_none()
assert deleted_data is None, "Data record should be deleted"
logger.info("✅ Confirmed: Data record was deleted")
logger.info("🎉 All cleanup tests passed!")
return True
if __name__ == "__main__":
import asyncio
success = asyncio.run(test_textdocument_cleanup_with_sql())
"""
await setup()
user = await get_default_user()
await cognee.add([test_text], dataset_name=dataset_name, user=user)
cognify_result = await cognee.cognify([dataset_name], user=user)
# Extract dataset_id from cognify result
dataset_id = None
for ds_id, pipeline_result in cognify_result.items():
dataset_id = ds_id
break
assert dataset_id is not None, "Failed to get dataset_id from cognify result"
logger.info(f"✅ Document added and cognified. Dataset ID: {dataset_id}")
# Step 2: Perform search to trigger last_accessed update
logger.info("Triggering search to update last_accessed...")
search_results = await cognee.search(
query_type=SearchType.CHUNKS,
query_text="machine learning",
datasets=[dataset_name],
user=user,
)
logger.info(f"✅ Search completed, found {len(search_results)} results")
assert len(search_results) > 0, "Search should return results"
# Step 3: Verify last_accessed was set and get data_id
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
result = await session.execute(
select(Data, DatasetData)
.join(DatasetData, Data.id == DatasetData.data_id)
.where(DatasetData.dataset_id == dataset_id)
)
data_records = result.all()
assert len(data_records) > 0, "No Data records found for the dataset"
data_record = data_records[0][0]
data_id = data_record.id
# Verify last_accessed is set
assert data_record.last_accessed is not None, (
"last_accessed should be set after search operation"
)
original_last_accessed = data_record.last_accessed
logger.info(f"✅ last_accessed verified: {original_last_accessed}")
# Step 4: Manually age the timestamp
minutes_threshold = 30
aged_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes_threshold + 10)
async with db_engine.get_async_session() as session:
stmt = update(Data).where(Data.id == data_id).values(last_accessed=aged_timestamp)
await session.execute(stmt)
await session.commit()
# Verify timestamp was updated
async with db_engine.get_async_session() as session:
result = await session.execute(select(Data).where(Data.id == data_id))
updated_data = result.scalar_one_or_none()
assert updated_data is not None, "Data record should exist"
retrieved_timestamp = updated_data.last_accessed
if retrieved_timestamp.tzinfo is None:
retrieved_timestamp = retrieved_timestamp.replace(tzinfo=timezone.utc)
assert retrieved_timestamp == aged_timestamp, f"Timestamp should be updated to aged value"
# Step 5: Test cleanup (document-level is now the default)
from cognee.tasks.cleanup.cleanup_unused_data import cleanup_unused_data
# First do a dry run
logger.info("Testing dry run...")
dry_run_result = await cleanup_unused_data(minutes_threshold=10, dry_run=True, user_id=user.id)
# Debug: Print the actual result
logger.info(f"Dry run result: {dry_run_result}")
assert dry_run_result["status"] == "dry_run", (
f"Status should be 'dry_run', got: {dry_run_result['status']}"
)
assert dry_run_result["unused_count"] > 0, "Should find at least one unused document"
logger.info(f"✅ Dry run found {dry_run_result['unused_count']} unused documents")
# Now run actual cleanup
logger.info("Executing cleanup...")
cleanup_result = await cleanup_unused_data(minutes_threshold=30, dry_run=False, user_id=user.id)
assert cleanup_result["status"] == "completed", "Cleanup should complete successfully"
assert cleanup_result["deleted_count"]["documents"] > 0, (
"At least one document should be deleted"
)
logger.info(
f"✅ Cleanup completed. Deleted {cleanup_result['deleted_count']['documents']} documents"
)
# Step 6: Verify deletion
async with db_engine.get_async_session() as session:
deleted_data = (
await session.execute(select(Data).where(Data.id == data_id))
).scalar_one_or_none()
assert deleted_data is None, "Data record should be deleted"
logger.info("✅ Confirmed: Data record was deleted")
logger.info("🎉 All cleanup tests passed!")
return True
if __name__ == "__main__":
import asyncio
success = asyncio.run(test_textdocument_cleanup_with_sql())
exit(0 if success else 1)