fix: flag to enable and disable last_accessed
This commit is contained in:
parent
53d3b50f93
commit
b52c1a1e25
3 changed files with 90 additions and 45 deletions
|
|
@ -1,46 +1,52 @@
|
||||||
"""add_last_accessed_to_data
|
"""add_last_accessed_to_data
|
||||||
|
|
||||||
Revision ID: e1ec1dcb50b6
|
Revision ID: e1ec1dcb50b6
|
||||||
Revises: 211ab850ef3d
|
Revises: 211ab850ef3d
|
||||||
Create Date: 2025-11-04 21:45:52.642322
|
Create Date: 2025-11-04 21:45:52.642322
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import Sequence, Union
|
import os
|
||||||
|
from typing import Sequence, Union
|
||||||
from alembic import op
|
|
||||||
import sqlalchemy as sa
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision: str = 'e1ec1dcb50b6'
|
|
||||||
down_revision: Union[str, None] = '211ab850ef3d'
|
|
||||||
branch_labels: Union[str, Sequence[str], None] = None
|
|
||||||
depends_on: Union[str, Sequence[str], None] = None
|
|
||||||
|
|
||||||
def _get_column(inspector, table, name, schema=None):
|
|
||||||
for col in inspector.get_columns(table, schema=schema):
|
|
||||||
if col["name"] == name:
|
|
||||||
return col
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade() -> None:
|
# revision identifiers, used by Alembic.
|
||||||
conn = op.get_bind()
|
revision: str = 'e1ec1dcb50b6'
|
||||||
insp = sa.inspect(conn)
|
down_revision: Union[str, None] = '211ab850ef3d'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
last_accessed_column = _get_column(insp, "data", "last_accessed")
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
if not last_accessed_column:
|
|
||||||
op.add_column('data',
|
|
||||||
sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
|
|
||||||
)
|
|
||||||
# Optionally initialize with created_at values for existing records
|
|
||||||
op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
def _get_column(inspector, table, name, schema=None):
|
||||||
conn = op.get_bind()
|
for col in inspector.get_columns(table, schema=schema):
|
||||||
insp = sa.inspect(conn)
|
if col["name"] == name:
|
||||||
|
return col
|
||||||
last_accessed_column = _get_column(insp, "data", "last_accessed")
|
return None
|
||||||
if last_accessed_column:
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
conn = op.get_bind()
|
||||||
|
insp = sa.inspect(conn)
|
||||||
|
|
||||||
|
last_accessed_column = _get_column(insp, "data", "last_accessed")
|
||||||
|
if not last_accessed_column:
|
||||||
|
# Always create the column for schema consistency
|
||||||
|
op.add_column('data',
|
||||||
|
sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Only initialize existing records if feature is enabled
|
||||||
|
enable_last_accessed = os.getenv("ENABLE_LAST_ACCESSED", "false").lower() == "true"
|
||||||
|
if enable_last_accessed:
|
||||||
|
op.execute("UPDATE data SET last_accessed = CURRENT_TIMESTAMP")
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
conn = op.get_bind()
|
||||||
|
insp = sa.inspect(conn)
|
||||||
|
|
||||||
|
last_accessed_column = _get_column(insp, "data", "last_accessed")
|
||||||
|
if last_accessed_column:
|
||||||
op.drop_column('data', 'last_accessed')
|
op.drop_column('data', 'last_accessed')
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ import json
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import List, Any
|
from typing import List, Any
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
import os
|
||||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
from cognee.modules.data.models import Data
|
from cognee.modules.data.models import Data
|
||||||
|
|
@ -27,7 +27,10 @@ async def update_node_access_timestamps(items: List[Any]):
|
||||||
----------
|
----------
|
||||||
items : List[Any]
|
items : List[Any]
|
||||||
List of items with payload containing 'id' field (from vector search results)
|
List of items with payload containing 'id' field (from vector search results)
|
||||||
"""
|
"""
|
||||||
|
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
|
||||||
|
return
|
||||||
|
|
||||||
if not items:
|
if not items:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import json
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
from typing import Optional, Dict, Any
|
from typing import Optional, Dict, Any
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
import os
|
||||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
from cognee.infrastructure.databases.vector import get_vector_engine
|
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
|
@ -47,7 +47,43 @@ async def cleanup_unused_data(
|
||||||
-------
|
-------
|
||||||
Dict[str, Any]
|
Dict[str, Any]
|
||||||
Cleanup results with status, counts, and timestamp
|
Cleanup results with status, counts, and timestamp
|
||||||
"""
|
"""
|
||||||
|
# Check 1: Environment variable must be enabled
|
||||||
|
if os.getenv("ENABLE_LAST_ACCESSED", "false").lower() != "true":
|
||||||
|
logger.warning(
|
||||||
|
"Cleanup skipped: ENABLE_LAST_ACCESSED is not enabled."
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "skipped",
|
||||||
|
"reason": "ENABLE_LAST_ACCESSED not enabled",
|
||||||
|
"unused_count": 0,
|
||||||
|
"deleted_count": {},
|
||||||
|
"cleanup_date": datetime.now(timezone.utc).isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
|
# Check 2: Verify tracking has actually been running
|
||||||
|
db_engine = get_relational_engine()
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
# Count records with non-NULL last_accessed
|
||||||
|
tracked_count = await session.execute(
|
||||||
|
select(sa.func.count(Data.id)).where(Data.last_accessed.isnot(None))
|
||||||
|
)
|
||||||
|
tracked_records = tracked_count.scalar()
|
||||||
|
|
||||||
|
if tracked_records == 0:
|
||||||
|
logger.warning(
|
||||||
|
"Cleanup skipped: No records have been tracked yet. "
|
||||||
|
"ENABLE_LAST_ACCESSED may have been recently enabled. "
|
||||||
|
"Wait for retrievers to update timestamps before running cleanup."
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "skipped",
|
||||||
|
"reason": "No tracked records found - tracking may be newly enabled",
|
||||||
|
"unused_count": 0,
|
||||||
|
"deleted_count": {},
|
||||||
|
"cleanup_date": datetime.now(timezone.utc).isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Starting cleanup task",
|
"Starting cleanup task",
|
||||||
days_threshold=days_threshold,
|
days_threshold=days_threshold,
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue