From d34fd9237bf41c6b421bd556541b50ea68246e45 Mon Sep 17 00:00:00 2001
From: chinu0609
Date: Tue, 4 Nov 2025 22:04:32 +0530
Subject: [PATCH] feat: add last_accessed to the Data model

---
 .../e1ec1dcb50b6_add_last_accessed_to_data.py |  30 ++++++
 cognee/modules/data/models/Data.py            |   1 +
 .../retrieval/utils/access_tracking.py        | 102 ++++++++++++------
 3 files changed, 100 insertions(+), 33 deletions(-)
 create mode 100644 alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py

diff --git a/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
new file mode 100644
index 000000000..0ccefa63b
--- /dev/null
+++ b/alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
@@ -0,0 +1,30 @@
+"""add_last_accessed_to_data
+
+Revision ID: e1ec1dcb50b6
+Revises: 211ab850ef3d
+Create Date: 2025-11-04 21:45:52.642322
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'e1ec1dcb50b6'
+down_revision: Union[str, None] = '211ab850ef3d'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    op.add_column(
+        'data',
+        sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True)
+    )
+    # Backfill existing rows so they start out with their created_at values
+    op.execute("UPDATE data SET last_accessed = created_at")
+
+
+def downgrade() -> None:
+    op.drop_column('data', 'last_accessed')
diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index ef228f2e1..27ab7481e 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -36,6 +36,7 @@ class Data(Base):
     data_size = Column(Integer, nullable=True)  # File size in bytes
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
+    last_accessed = Column(DateTime(timezone=True), nullable=True)
 
     datasets = relationship(
         "Dataset",
diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
index 79afd25db..621e09e27 100644
--- a/cognee/modules/retrieval/utils/access_tracking.py
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -1,20 +1,27 @@
-
 """Utilities for tracking data access in retrievers."""
 
 import json
 from datetime import datetime, timezone
 from typing import List, Any
+from uuid import UUID
 
 from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.infrastructure.databases.relational import get_relational_engine
+from cognee.modules.data.models import Data
 from cognee.shared.logging_utils import get_logger
+from sqlalchemy import update
 
 logger = get_logger(__name__)
 
 
 async def update_node_access_timestamps(items: List[Any]):
     """
-    Update last_accessed_at for nodes in Kuzu graph database.
-    Automatically determines node type from the graph database.
+    Update last_accessed_at for graph nodes and last_accessed for the corresponding SQL Data records.
+
+    This function:
+    1. Updates last_accessed_at inside each graph node's properties JSON
+    2. Traverses chunk edges to find the origin document nodes
+    3. Updates last_accessed in the SQL Data table for those documents
 
     Parameters
     ----------
@@ -26,39 +33,68 @@ async def update_node_access_timestamps(items: List[Any]):
     graph_engine = await get_graph_engine()
     timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
+    timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)  # same instant, as a datetime for SQL
 
+    # Extract node IDs
+    node_ids = []
     for item in items:
-        # Extract ID from payload
         item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
-        if not item_id:
-            continue
-
-        # try:
-        # Query to get both node type and properties in one call
-        result = await graph_engine.query(
-            "MATCH (n:Node {id: $id}) RETURN n.type as node_type, n.properties as props",
-            {"id": str(item_id)}
-        )
-
-        if result and len(result) > 0 and result[0]:
-            node_type = result[0][0]  # First column: node_type
-            props_json = result[0][1]  # Second column: properties
-
-            # Parse existing properties JSON
-            props = json.loads(props_json) if props_json else {}
-            # Update last_accessed_at with millisecond timestamp
-            props["last_accessed_at"] = timestamp_ms
-
-            # Write back to graph database
-            await graph_engine.query(
-                "MATCH (n:Node {id: $id}) SET n.properties = $props",
-                {"id": str(item_id), "props": json.dumps(props)}
+        if item_id:
+            node_ids.append(str(item_id))
+
+    if not node_ids:
+        return
+
+    try:
+        # Step 1: Update each graph node's properties JSON
+        for node_id in node_ids:
+            result = await graph_engine.query(
+                "MATCH (n:Node {id: $id}) RETURN n.properties",
+                {"id": node_id}
             )
-            logger.debug(f"Updated access timestamp for {node_type} node {item_id}")
 
-        # except Exception as e:
-        #     logger.error(f"Failed to update timestamp for node {item_id}: {e}")
-        #     continue
-
-    logger.debug(f"Updated access timestamps for {len(items)} nodes")
+            if result and result[0]:
+                props = json.loads(result[0][0]) if result[0][0] else {}
+                props["last_accessed_at"] = timestamp_ms
+
+                await graph_engine.query(
+                    "MATCH (n:Node {id: $id}) SET n.properties = $props",
+                    {"id": node_id, "props": json.dumps(props)}
+                )
+
+        logger.debug(f"Updated access timestamps for {len(node_ids)} graph nodes")
+
+        # Step 2: Walk chunk edges back to the origin document nodes
+        origin_query = """
+        UNWIND $node_ids AS node_id
+        MATCH (n:Node {id: node_id})
+        OPTIONAL MATCH (n)-[e:EDGE]-(chunk:Node)
+        WHERE (e.relationship_name = 'contains' OR e.relationship_name = 'made_from')
+          AND chunk.type = 'DocumentChunk'
+        OPTIONAL MATCH (chunk)-[e2:EDGE]->(doc:Node)
+        WHERE e2.relationship_name = 'is_part_of'
+          AND doc.type IN ['TextDocument', 'PdfDocument', 'AudioDocument', 'ImageDocument', 'UnstructuredDocument']
+        RETURN DISTINCT doc.id AS doc_id
+        """
+
+        result = await graph_engine.query(origin_query, {"node_ids": node_ids})
+
+        # Extract document IDs
+        doc_ids = [row[0] for row in result if row and row[0]] if result else []
+
+        # Step 3: Update the SQL Data table in one statement
+        if doc_ids:
+            db_engine = get_relational_engine()
+            async with db_engine.get_async_session() as session:
+                stmt = update(Data).where(
+                    Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
+                ).values(last_accessed=timestamp_dt)
+
+                await session.execute(stmt)
+                await session.commit()
+
+            logger.debug(f"Updated last_accessed for {len(doc_ids)} Data records in SQL")
+
+    except Exception as e:
+        logger.error(f"Failed to update timestamps: {e}")
+        raise
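
Usage note (an editor's sketch, not part of the patch): after running the new
migration (for example, alembic upgrade head), a retriever can report the items
it returned by calling the helper added above. The dict-shaped results below
are hypothetical; as access_tracking.py shows, any item carrying an "id" either
directly or under a payload attribute is accepted.

    import asyncio

    from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps


    async def main():
        # Hypothetical search results; real retrievers would pass their scored
        # items. Plain dicts work because the helper falls back to item.get("id").
        results = [
            {"id": "0f8fad5b-d9cb-469f-a165-70867728950e"},
            {"id": "7c9e6679-7425-40de-944b-e07fc1f90ae7"},
        ]

        # Touches last_accessed_at on the matching graph nodes and, via the
        # chunk-to-document traversal, last_accessed on the SQL Data rows.
        await update_node_access_timestamps(results)


    asyncio.run(main())

Because the relational side batches every document id into a single
UPDATE ... WHERE id IN (...) via update(Data).where(Data.id.in_(...)), the
per-search overhead stays at one SQL round trip regardless of result count.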