feat: adding last_accessed in the Data model
This commit is contained in:
parent
5080e8f8a5
commit
d34fd9237b
3 changed files with 100 additions and 33 deletions
30
alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
Normal file
30
alembic/versions/e1ec1dcb50b6_add_last_accessed_to_data.py
Normal file
|
|
@ -0,0 +1,30 @@
|
||||||
|
"""add_last_accessed_to_data
|
||||||
|
|
||||||
|
Revision ID: e1ec1dcb50b6
|
||||||
|
Revises: 211ab850ef3d
|
||||||
|
Create Date: 2025-11-04 21:45:52.642322
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = 'e1ec1dcb50b6'
|
||||||
|
down_revision: Union[str, None] = '211ab850ef3d'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
    """Add a nullable, timezone-aware ``last_accessed`` column to ``data``.

    Existing rows are backfilled from ``created_at`` so they carry a
    sensible initial access time instead of NULL.
    """
    op.add_column(
        'data',
        sa.Column('last_accessed', sa.DateTime(timezone=True), nullable=True),
    )
    # Optionally initialize with created_at values for existing records
    op.execute("UPDATE data SET last_accessed = created_at")
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Revert the migration by dropping ``data.last_accessed``."""
    op.drop_column('data', 'last_accessed')
|
||||||
|
|
@ -36,6 +36,7 @@ class Data(Base):
|
||||||
data_size = Column(Integer, nullable=True) # File size in bytes
|
data_size = Column(Integer, nullable=True) # File size in bytes
|
||||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||||
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
||||||
|
last_accessed = Column(DateTime(timezone=True), nullable=True)
|
||||||
|
|
||||||
datasets = relationship(
|
datasets = relationship(
|
||||||
"Dataset",
|
"Dataset",
|
||||||
|
|
|
||||||
|
|
@ -1,20 +1,27 @@
|
||||||
|
|
||||||
"""Utilities for tracking data access in retrievers."""
|
"""Utilities for tracking data access in retrievers."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import List, Any
|
from typing import List, Any
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
from cognee.modules.data.models import Data
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
|
from sqlalchemy import update
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def update_node_access_timestamps(items: List[Any]):
    """
    Update last_accessed_at for nodes in graph database and corresponding Data records in SQL.

    This function:
    1. Updates last_accessed_at in the graph database nodes (in properties JSON)
    2. Traverses to find origin document nodes (TextDocument, PdfDocument, etc.)
    3. Updates last_accessed in the SQL Data table for those documents

    Parameters
    ----------
    items : List[Any]
        Retrieval results. Each item is either an object with a ``payload``
        mapping or a plain dict; the graph node id is read from the ``"id"``
        key in either case.

    Raises
    ------
    Exception
        Any failure from the graph or relational engine is logged and
        re-raised to the caller.
    """
    graph_engine = await get_graph_engine()
    # Graph nodes store the timestamp as epoch milliseconds inside their JSON
    # properties blob; the SQL column stores a timezone-aware datetime.
    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    timestamp_dt = datetime.now(timezone.utc)

    # Extract node IDs, tolerating both payload-style objects and plain dicts;
    # items without an id are skipped.
    node_ids = []
    for item in items:
        item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
        if item_id:
            node_ids.append(str(item_id))

    if not node_ids:
        return

    try:
        # Step 1: Update each graph node's properties JSON with the new access
        # timestamp. NOTE(review): this is one read + one write query per node
        # (N+1), not a single batched statement — acceptable for small result
        # sets, revisit if retrieval batches grow large.
        for node_id in node_ids:
            result = await graph_engine.query(
                "MATCH (n:Node {id: $id}) RETURN n.properties",
                {"id": node_id}
            )

            if result and result[0]:
                props = json.loads(result[0][0]) if result[0][0] else {}
                props["last_accessed_at"] = timestamp_ms

                await graph_engine.query(
                    "MATCH (n:Node {id: $id}) SET n.properties = $props",
                    {"id": node_id, "props": json.dumps(props)}
                )

        logger.debug(f"Updated access timestamps for {len(node_ids)} graph nodes")

        # Step 2: Traverse chunk edges to find the origin document nodes.
        origin_query = """
        UNWIND $node_ids AS node_id
        MATCH (n:Node {id: node_id})
        OPTIONAL MATCH (n)-[e:EDGE]-(chunk:Node)
        WHERE (e.relationship_name = 'contains' OR e.relationship_name = 'made_from')
        AND chunk.type = 'DocumentChunk'
        OPTIONAL MATCH (chunk)-[e2:EDGE]->(doc:Node)
        WHERE e2.relationship_name = 'is_part_of'
        AND doc.type IN ['TextDocument', 'PdfDocument', 'AudioDocument', 'ImageDocument', 'UnstructuredDocument']
        RETURN DISTINCT doc.id as doc_id
        """

        result = await graph_engine.query(origin_query, {"node_ids": node_ids})

        # Extract document IDs; OPTIONAL MATCH can yield null rows, drop them.
        doc_ids = [row[0] for row in result if row and row[0]] if result else []

        # Step 3: Mirror the access time onto the matching SQL Data rows.
        if doc_ids:
            db_engine = get_relational_engine()
            async with db_engine.get_async_session() as session:
                stmt = update(Data).where(
                    Data.id.in_([UUID(doc_id) for doc_id in doc_ids])
                ).values(last_accessed=timestamp_dt)

                await session.execute(stmt)
                await session.commit()

            logger.debug(f"Updated last_accessed for {len(doc_ids)} Data records in SQL")

    except Exception as e:
        logger.error(f"Failed to update timestamps: {e}")
        raise
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue