From 3372679f7bb40c01ffd9e337ead27fe9f8981d54 Mon Sep 17 00:00:00 2001 From: chinu0609 Date: Wed, 29 Oct 2025 20:12:14 +0530 Subject: [PATCH] feat: adding last_accessed_at field to the models and updating the retrievers to update the timestamp --- .../modules/chunking/models/DocumentChunk.py | 7 +++ cognee/modules/engine/models/Entity.py | 7 ++- cognee/modules/retrieval/chunks_retriever.py | 55 +++++++---------- .../modules/retrieval/summaries_retriever.py | 28 ++++----- .../retrieval/utils/access_tracking.py | 61 +++++++++++++++++++ cognee/tasks/summarization/models.py | 8 ++- 6 files changed, 115 insertions(+), 51 deletions(-) create mode 100644 cognee/modules/retrieval/utils/access_tracking.py diff --git a/cognee/modules/chunking/models/DocumentChunk.py b/cognee/modules/chunking/models/DocumentChunk.py index 9f8c57486..c4c6a2ed3 100644 --- a/cognee/modules/chunking/models/DocumentChunk.py +++ b/cognee/modules/chunking/models/DocumentChunk.py @@ -1,5 +1,7 @@ from typing import List, Union +from pydantic import BaseModel, Field +from datetime import datetime, timezone from cognee.infrastructure.engine import DataPoint from cognee.modules.data.processing.document_types import Document from cognee.modules.engine.models import Entity @@ -22,6 +24,7 @@ class DocumentChunk(DataPoint): - cut_type: The type of cut that defined this chunk. - is_part_of: The document to which this chunk belongs. - contains: A list of entities or events contained within the chunk (default is None). + - last_accessed_at: The timestamp of the last time the chunk was accessed. - metadata: A dictionary to hold meta information related to the chunk, including index fields. 
class Entity(DataPoint):
    """A named entity extracted from text, optionally linked to its EntityType.

    `name` is declared as an index field, so it is what the vector engine
    embeds and searches on for this node type.
    """

    name: str
    is_a: Optional[EntityType] = None
    description: str
    # Milliseconds since the Unix epoch (UTC). Initialized at creation time;
    # intended to be refreshed by retrieval-time access tracking — TODO confirm
    # a retriever actually updates the 'Entity' node type.
    last_accessed_at: int = Field(
        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
    )
    metadata: dict = {"index_fields": ["name"]}
the query. - - Searches for document chunks relevant to the specified query using a vector engine. - Raises a NoDataError if no data is found in the system. - - Parameters: - ----------- - - - query (str): The query string to search for relevant document chunks. - - Returns: - -------- - - - Any: A list of document chunk payloads retrieved from the search. - """ - logger.info( - f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'" - ) - - vector_engine = get_vector_engine() - - try: - found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k) - logger.info(f"Found {len(found_chunks)} chunks from vector search") - except CollectionNotFoundError as error: - logger.error("DocumentChunk_text collection not found in vector database") - raise NoDataError("No data found in the system, please add data first.") from error - - chunk_payloads = [result.payload for result in found_chunks] - logger.info(f"Returning {len(chunk_payloads)} chunk payloads") + async def get_context(self, query: str) -> Any: + """Retrieves document chunks context based on the query.""" + logger.info( + f"Starting chunk retrieval for query: '{query[:100]}{'...' 
if len(query) > 100 else ''}'" + ) + + vector_engine = get_vector_engine() + + try: + found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k) + logger.info(f"Found {len(found_chunks)} chunks from vector search") + + # NEW: Update access timestamps + await update_node_access_timestamps(found_chunks, "DocumentChunk") + except CollectionNotFoundError as error: + logger.error("DocumentChunk_text collection not found in vector database") + raise NoDataError("No data found in the system, please add data first.") from error + + chunk_payloads = [result.payload for result in found_chunks] + logger.info(f"Returning {len(chunk_payloads)} chunk payloads") return chunk_payloads async def get_completion( diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py index 87b224946..7f996274e 100644 --- a/cognee/modules/retrieval/summaries_retriever.py +++ b/cognee/modules/retrieval/summaries_retriever.py @@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.vector import get_vector_engine from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.exceptions.exceptions import NoDataError +from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError logger = get_logger("SummariesRetriever") @@ -47,20 +48,19 @@ class SummariesRetriever(BaseRetriever): f"Starting summary retrieval for query: '{query[:100]}{'...' 
async def update_node_access_timestamps(items: List[Any], node_type: str):
    """
    Update ``last_accessed_at`` for matching nodes in the Kuzu graph database.

    Best-effort by design: each item is processed independently, and any
    per-item failure is logged and skipped so that access tracking can never
    break a retrieval.

    Parameters
    ----------
    items : List[Any]
        Vector-search results (objects exposing a ``payload`` dict) or plain
        dicts; each must carry an 'id' field identifying the graph node.
    node_type : str
        Type of node to update (e.g., 'DocumentChunk', 'Entity', 'TextSummary').
    """
    if not items:
        return

    graph_engine = await get_graph_engine()
    # Milliseconds since epoch, matching the models' last_accessed_at format.
    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)

    updated_count = 0
    for item in items:
        # Vector search results wrap fields in `payload`; accept bare dicts too.
        item_id = item.payload.get("id") if hasattr(item, "payload") else item.get("id")
        if not item_id:
            continue

        try:
            # Node properties are stored as a JSON string on Kuzu's generic Node table.
            result = await graph_engine.query(
                "MATCH (n:Node {id: $id}) WHERE n.type = $node_type RETURN n.properties as props",
                {"id": str(item_id), "node_type": node_type},
            )

            # Guard each indexing step so a missing/empty row is a skip, not an error.
            if result and result[0] and result[0][0]:
                props = json.loads(result[0][0])
                props["last_accessed_at"] = timestamp_ms

                # Write the updated properties blob back to the graph database.
                await graph_engine.query(
                    "MATCH (n:Node {id: $id}) WHERE n.type = $node_type SET n.properties = $props",
                    {"id": str(item_id), "node_type": node_type, "props": json.dumps(props)},
                )
                updated_count += 1
        except Exception as e:
            # Deliberately broad: timestamp tracking must never fail the caller.
            logger.warning(f"Failed to update timestamp for {node_type} {item_id}: {e}")
            continue

    # Report actual successes, not len(items): skipped/failed items don't count.
    logger.debug(f"Updated access timestamps for {updated_count}/{len(items)} {node_type} nodes")
    # The summarized text; an index field, so this is what gets embedded
    # into the "TextSummary_text" vector collection.
    text: str
    # The document chunk this summary was generated from.
    made_from: DocumentChunk
    # Milliseconds since the Unix epoch (UTC). Initialized at creation time
    # and refreshed by the retrieval-side access-tracking utility when this
    # summary is returned from a search.
    last_accessed_at: int = Field(
        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
    )
    metadata: dict = {"index_fields": ["text"]}