From 3372679f7bb40c01ffd9e337ead27fe9f8981d54 Mon Sep 17 00:00:00 2001
From: chinu0609
Date: Wed, 29 Oct 2025 20:12:14 +0530
Subject: [PATCH 1/4] feat: add last_accessed_at field to the models and
 update the retrievers to refresh the timestamp

---
 .../modules/chunking/models/DocumentChunk.py  |  7 +++
 cognee/modules/engine/models/Entity.py        |  7 ++-
 cognee/modules/retrieval/chunks_retriever.py  | 55 +++++++----------
 .../modules/retrieval/summaries_retriever.py  | 28 ++++-----
 .../retrieval/utils/access_tracking.py        | 61 +++++++++++++++++++
 cognee/tasks/summarization/models.py          |  8 ++-
 6 files changed, 115 insertions(+), 51 deletions(-)
 create mode 100644 cognee/modules/retrieval/utils/access_tracking.py

diff --git a/cognee/modules/chunking/models/DocumentChunk.py b/cognee/modules/chunking/models/DocumentChunk.py
index 9f8c57486..c4c6a2ed3 100644
--- a/cognee/modules/chunking/models/DocumentChunk.py
+++ b/cognee/modules/chunking/models/DocumentChunk.py
@@ -1,5 +1,7 @@
 from typing import List, Union

+from pydantic import Field
+from datetime import datetime, timezone
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.data.processing.document_types import Document
 from cognee.modules.engine.models import Entity
@@ -22,6 +24,7 @@ class DocumentChunk(DataPoint):
     - cut_type: The type of cut that defined this chunk.
     - is_part_of: The document to which this chunk belongs.
     - contains: A list of entities or events contained within the chunk (default is None).
+    - last_accessed_at: The timestamp of the last time the chunk was accessed.
     - metadata: A dictionary to hold meta information related to the chunk, including index fields.
     """

@@ -32,5 +35,9 @@ class DocumentChunk(DataPoint):
     cut_type: str
     is_part_of: Document
     contains: List[Union[Entity, Event]] = None
+
+    last_accessed_at: int = Field(
+        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
+    )

     metadata: dict = {"index_fields": ["text"]}

diff --git a/cognee/modules/engine/models/Entity.py b/cognee/modules/engine/models/Entity.py
index 36da2e344..3e48ea02a 100644
--- a/cognee/modules/engine/models/Entity.py
+++ b/cognee/modules/engine/models/Entity.py
@@ -1,11 +1,14 @@
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.engine.models.EntityType import EntityType
 from typing import Optional
-
+from datetime import datetime, timezone
+from pydantic import Field

 class Entity(DataPoint):
     name: str
     is_a: Optional[EntityType] = None
     description: str
-
+    last_accessed_at: int = Field(
+        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
+    )
     metadata: dict = {"index_fields": ["name"]}

diff --git a/cognee/modules/retrieval/chunks_retriever.py b/cognee/modules/retrieval/chunks_retriever.py
index 94b9d3fb9..74634b71e 100644
--- a/cognee/modules/retrieval/chunks_retriever.py
+++ b/cognee/modules/retrieval/chunks_retriever.py
@@ -1,10 +1,11 @@
 from typing import Any, Optional
-
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 from cognee.shared.logging_utils import get_logger
 from cognee.infrastructure.databases.vector import get_vector_engine
 from cognee.modules.retrieval.base_retriever import BaseRetriever
 from cognee.modules.retrieval.exceptions.exceptions import NoDataError
 from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
+from datetime import datetime, timezone

 logger = get_logger("ChunksRetriever")

@@ -27,38 +28,26 @@ class ChunksRetriever(BaseRetriever):
     ):
        self.top_k = top_k

-    async def get_context(self, query: str) -> Any:
-        """
-        Retrieves document chunks context based on the query.
-
-        Searches for document chunks relevant to the specified query using a vector engine.
-        Raises a NoDataError if no data is found in the system.
-
-        Parameters:
-        -----------
-
-        - query (str): The query string to search for relevant document chunks.
-
-        Returns:
-        --------
-
-        - Any: A list of document chunk payloads retrieved from the search.
-        """
-        logger.info(
-            f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
-        )
-
-        vector_engine = get_vector_engine()
-
-        try:
-            found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
-            logger.info(f"Found {len(found_chunks)} chunks from vector search")
-        except CollectionNotFoundError as error:
-            logger.error("DocumentChunk_text collection not found in vector database")
-            raise NoDataError("No data found in the system, please add data first.") from error
-
-        chunk_payloads = [result.payload for result in found_chunks]
-        logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
+    async def get_context(self, query: str) -> Any:
+        """Retrieves document chunks context based on the query."""
+        logger.info(
+            f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
+        )
+
+        vector_engine = get_vector_engine()
+
+        try:
+            found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
+            logger.info(f"Found {len(found_chunks)} chunks from vector search")
+
+            # NEW: Update access timestamps
+            await update_node_access_timestamps(found_chunks, "DocumentChunk")
+        except CollectionNotFoundError as error:
+            logger.error("DocumentChunk_text collection not found in vector database")
+            raise NoDataError("No data found in the system, please add data first.") from error
+
+        chunk_payloads = [result.payload for result in found_chunks]
+        logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
         return chunk_payloads

     async def get_completion(

diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py
index 87b224946..7f996274e 100644
--- a/cognee/modules/retrieval/summaries_retriever.py
+++ b/cognee/modules/retrieval/summaries_retriever.py
@@ -4,6 +4,7 @@
 from cognee.shared.logging_utils import get_logger
 from cognee.infrastructure.databases.vector import get_vector_engine
 from cognee.modules.retrieval.base_retriever import BaseRetriever
 from cognee.modules.retrieval.exceptions.exceptions import NoDataError
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError

 logger = get_logger("SummariesRetriever")

@@ -47,20 +48,19 @@ class SummariesRetriever(BaseRetriever):
             f"Starting summary retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
         )

-        vector_engine = get_vector_engine()
-
-        try:
-            summaries_results = await vector_engine.search(
-                "TextSummary_text", query, limit=self.top_k
-            )
-            logger.info(f"Found {len(summaries_results)} summaries from vector search")
-        except CollectionNotFoundError as error:
-            logger.error("TextSummary_text collection not found in vector database")
-            raise NoDataError("No data found in the system, please add data first.") from error
-
-        summary_payloads = [summary.payload for summary in summaries_results]
-        logger.info(f"Returning {len(summary_payloads)} summary payloads")
-        return summary_payloads
+        vector_engine = get_vector_engine()
+
+        try:
+            summaries_results = await vector_engine.search(
+                "TextSummary_text", query, limit=self.top_k
+            )
+
+            await update_node_access_timestamps(summaries_results, "TextSummary")
+
+        except CollectionNotFoundError as error:
+            raise NoDataError("No data found in the system, please add data first.") from error
+
+        return [summary.payload for summary in summaries_results]

     async def get_completion(
         self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None, **kwargs

diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
new file mode 100644
index 000000000..ca5ed88cd
--- /dev/null
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -0,0 +1,61 @@
+
+"""Utilities for tracking data access in retrievers."""
+
+import json
+from datetime import datetime, timezone
+from typing import List, Any
+
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.shared.logging_utils import get_logger
+
+logger = get_logger(__name__)
+
+
+async def update_node_access_timestamps(items: List[Any], node_type: str):
+    """
+    Update last_accessed_at for nodes in Kuzu graph database.
+
+    Parameters
+    ----------
+    items : List[Any]
+        List of items with payload containing 'id' field (from vector search results)
+    node_type : str
+        Type of node to update (e.g., 'DocumentChunk', 'Entity', 'TextSummary')
+    """
+    if not items:
+        return
+
+    graph_engine = await get_graph_engine()
+    # Convert to milliseconds since epoch (matching the field format)
+    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
+
+    for item in items:
+        # Extract ID from payload (vector search results have this structure)
+        item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
+        if not item_id:
+            continue
+
+        try:
+            # Get current node properties from Kuzu's Node table
+            result = await graph_engine.query(
+                "MATCH (n:Node {id: $id}) WHERE n.type = $node_type RETURN n.properties as props",
+                {"id": str(item_id), "node_type": node_type}
+            )
+
+            if result and len(result) > 0 and result[0][0]:
+                # Parse existing properties JSON
+                props = json.loads(result[0][0]) if result[0][0] else {}
+                # Update last_accessed_at with millisecond timestamp
+                props["last_accessed_at"] = timestamp_ms
+
+                # Write back to graph database
+                await graph_engine.query(
+                    "MATCH (n:Node {id: $id}) WHERE n.type = $node_type SET n.properties = $props",
+                    {"id": str(item_id), "node_type": node_type, "props": json.dumps(props)}
+                )
+        except Exception as e:
+            logger.warning(f"Failed to update timestamp for {node_type} {item_id}: {e}")
+            continue
+
+    logger.debug(f"Updated access timestamps for {len(items)} {node_type} nodes")
+

diff --git a/cognee/tasks/summarization/models.py b/cognee/tasks/summarization/models.py
index 75ed82d50..46f9a8d8b 100644
--- a/cognee/tasks/summarization/models.py
+++ b/cognee/tasks/summarization/models.py
@@ -1,5 +1,7 @@
-from typing import Union
+from pydantic import Field

+from typing import Union
+from datetime import datetime, timezone
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.chunking.models import DocumentChunk
 from cognee.shared.CodeGraphEntities import CodeFile, CodePart
@@ -17,7 +19,9 @@ class TextSummary(DataPoint):

     text: str
     made_from: DocumentChunk
-
+    last_accessed_at: int = Field(
+        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
+    )
     metadata: dict = {"index_fields": ["text"]}

From 3f27c5592b58af29369125362510e96b72c56cbc Mon Sep 17 00:00:00 2001
From: chinu0609
Date: Wed, 29 Oct 2025 20:17:27 +0530
Subject: [PATCH 2/4] fix: restore retriever docstrings and logging around the
 access-timestamp updates

---
 cognee/modules/retrieval/chunks_retriever.py  | 49 ++++++++++++--------
 .../modules/retrieval/summaries_retriever.py  | 28 ++++++-----
 2 files changed, 45 insertions(+), 32 deletions(-)

diff --git a/cognee/modules/retrieval/chunks_retriever.py b/cognee/modules/retrieval/chunks_retriever.py
index 74634b71e..f821fc902 100644
--- a/cognee/modules/retrieval/chunks_retriever.py
+++ b/cognee/modules/retrieval/chunks_retriever.py
@@ -29,26 +29,35 @@ class ChunksRetriever(BaseRetriever):
         self.top_k = top_k

     async def get_context(self, query: str) -> Any:
-        """Retrieves document chunks context based on the query."""
-        logger.info(
-            f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
-        )
-
-        vector_engine = get_vector_engine()
-
-        try:
-            found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
-            logger.info(f"Found {len(found_chunks)} chunks from vector search")
-
-            # NEW: Update access timestamps
-            await update_node_access_timestamps(found_chunks, "DocumentChunk")
-        except CollectionNotFoundError as error:
-            logger.error("DocumentChunk_text collection not found in vector database")
-            raise NoDataError("No data found in the system, please add data first.") from error
-
-        chunk_payloads = [result.payload for result in found_chunks]
-        logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
-        return chunk_payloads
+        """
+        Retrieves document chunks context based on the query.
+        Searches for document chunks relevant to the specified query using a vector engine.
+        Raises a NoDataError if no data is found in the system.
+        Parameters:
+        -----------
+        - query (str): The query string to search for relevant document chunks.
+        Returns:
+        --------
+        - Any: A list of document chunk payloads retrieved from the search.
+        """
+        logger.info(
+            f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
+        )
+
+        vector_engine = get_vector_engine()
+
+        try:
+            found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
+            logger.info(f"Found {len(found_chunks)} chunks from vector search")
+            await update_node_access_timestamps(found_chunks, "DocumentChunk")
+
+        except CollectionNotFoundError as error:
+            logger.error("DocumentChunk_text collection not found in vector database")
+            raise NoDataError("No data found in the system, please add data first.") from error
+
+        chunk_payloads = [result.payload for result in found_chunks]
+        logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
+        return chunk_payloads

     async def get_completion(
         self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None

diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py
index 7f996274e..9ac8b096d 100644
--- a/cognee/modules/retrieval/summaries_retriever.py
+++ b/cognee/modules/retrieval/summaries_retriever.py
@@ -48,19 +48,23 @@ class SummariesRetriever(BaseRetriever):
             f"Starting summary retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
         )

-        vector_engine = get_vector_engine()
-
-        try:
-            summaries_results = await vector_engine.search(
-                "TextSummary_text", query, limit=self.top_k
-            )
-
-            await update_node_access_timestamps(summaries_results, "TextSummary")
-
-        except CollectionNotFoundError as error:
-            raise NoDataError("No data found in the system, please add data first.") from error
-
-        return [summary.payload for summary in summaries_results]
+        vector_engine = get_vector_engine()
+
+        try:
+            summaries_results = await vector_engine.search(
+                "TextSummary_text", query, limit=self.top_k
+            )
+            logger.info(f"Found {len(summaries_results)} summaries from vector search")
+
+            await update_node_access_timestamps(summaries_results, "TextSummary")
+
+        except CollectionNotFoundError as error:
+            logger.error("TextSummary_text collection not found in vector database")
+            raise NoDataError("No data found in the system, please add data first.") from error
+
+        summary_payloads = [summary.payload for summary in summaries_results]
+        logger.info(f"Returning {len(summary_payloads)} summary payloads")
+        return summary_payloads

     async def get_completion(
         self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None, **kwargs

From 5f6f0502c832d129749b453121c6f5be565044bc Mon Sep 17 00:00:00 2001
From: chinu0609
Date: Fri, 31 Oct 2025 00:00:18 +0530
Subject: [PATCH 3/4] fix: remove last_accessed_at from the individual models
 and add it to DataPoint

---
 cognee/infrastructure/engine/models/DataPoint.py | 3 +++
 cognee/modules/chunking/models/DocumentChunk.py  | 5 -----
 cognee/modules/engine/models/Entity.py           | 3 ---
 cognee/tasks/summarization/models.py             | 3 ---
 4 files changed, 3 insertions(+), 11 deletions(-)

diff --git a/cognee/infrastructure/engine/models/DataPoint.py b/cognee/infrastructure/engine/models/DataPoint.py
index 812380eaa..3178713c8 100644
--- a/cognee/infrastructure/engine/models/DataPoint.py
+++ b/cognee/infrastructure/engine/models/DataPoint.py
@@ -43,6 +43,9 @@ class DataPoint(BaseModel):
     updated_at: int = Field(
         default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
     )
+    last_accessed_at: int = Field(
+        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
+    )
     ontology_valid: bool = False
     version: int = 1  # Default version
     topological_rank: Optional[int] = 0

diff --git a/cognee/modules/chunking/models/DocumentChunk.py b/cognee/modules/chunking/models/DocumentChunk.py
index c4c6a2ed3..601454802 100644
--- a/cognee/modules/chunking/models/DocumentChunk.py
+++ b/cognee/modules/chunking/models/DocumentChunk.py
@@ -35,9 +35,4 @@ class DocumentChunk(DataPoint):
     cut_type: str
     is_part_of: Document
     contains: List[Union[Entity, Event]] = None
-
-    last_accessed_at: int = Field(
-        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
-    )
-
     metadata: dict = {"index_fields": ["text"]}

diff --git a/cognee/modules/engine/models/Entity.py b/cognee/modules/engine/models/Entity.py
index 3e48ea02a..4083cd2e6 100644
--- a/cognee/modules/engine/models/Entity.py
+++ b/cognee/modules/engine/models/Entity.py
@@ -8,7 +8,4 @@ class Entity(DataPoint):
     name: str
     is_a: Optional[EntityType] = None
     description: str
-    last_accessed_at: int = Field(
-        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
-    )
     metadata: dict = {"index_fields": ["name"]}

diff --git a/cognee/tasks/summarization/models.py b/cognee/tasks/summarization/models.py
index 46f9a8d8b..8cee2ade3 100644
--- a/cognee/tasks/summarization/models.py
+++ b/cognee/tasks/summarization/models.py
@@ -19,9 +19,6 @@ class TextSummary(DataPoint):

     text: str
     made_from: DocumentChunk
-    last_accessed_at: int = Field(
-        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
-    )
     metadata: dict = {"index_fields": ["text"]}

From 6f06e4a5eb1143ddcb2ad08132486630b8a2deae Mon Sep 17 00:00:00 2001
From: chinu0609
Date: Fri, 31 Oct 2025 00:17:13 +0530
Subject: [PATCH 4/4] fix: remove the node_type parameter and the leftover
 try/except from access tracking

---
 cognee/modules/retrieval/chunks_retriever.py  |  2 +-
 .../modules/retrieval/summaries_retriever.py  |  2 +-
 .../retrieval/utils/access_tracking.py        | 51 +++++++++---------
 3 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/cognee/modules/retrieval/chunks_retriever.py b/cognee/modules/retrieval/chunks_retriever.py
index f821fc902..be1f95811 100644
--- a/cognee/modules/retrieval/chunks_retriever.py
+++ b/cognee/modules/retrieval/chunks_retriever.py
@@ -49,7 +49,7 @@ class ChunksRetriever(BaseRetriever):
         try:
             found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
             logger.info(f"Found {len(found_chunks)} chunks from vector search")
-            await update_node_access_timestamps(found_chunks, "DocumentChunk")
+            await update_node_access_timestamps(found_chunks)

         except CollectionNotFoundError as error:
             logger.error("DocumentChunk_text collection not found in vector database")

diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py
index 9ac8b096d..0df750d22 100644
--- a/cognee/modules/retrieval/summaries_retriever.py
+++ b/cognee/modules/retrieval/summaries_retriever.py
@@ -56,7 +56,7 @@ class SummariesRetriever(BaseRetriever):
             )
             logger.info(f"Found {len(summaries_results)} summaries from vector search")

-            await update_node_access_timestamps(summaries_results, "TextSummary")
+            await update_node_access_timestamps(summaries_results)

         except CollectionNotFoundError as error:
             logger.error("TextSummary_text collection not found in vector database")

diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
index ca5ed88cd..79afd25db 100644
--- a/cognee/modules/retrieval/utils/access_tracking.py
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -1,4 +1,4 @@
-
+
 """Utilities for tracking data access in retrievers."""

 import json
@@ -11,51 +11,50 @@ from cognee.shared.logging_utils import get_logger
 logger = get_logger(__name__)


-async def update_node_access_timestamps(items: List[Any], node_type: str):
+async def update_node_access_timestamps(items: List[Any]):
     """
     Update last_accessed_at for nodes in Kuzu graph database.
+    Automatically determines node type from the graph database.

     Parameters
     ----------
     items : List[Any]
         List of items with payload containing 'id' field (from vector search results)
-    node_type : str
-        Type of node to update (e.g., 'DocumentChunk', 'Entity', 'TextSummary')
     """
     if not items:
         return

     graph_engine = await get_graph_engine()
-    # Convert to milliseconds since epoch (matching the field format)
     timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)

     for item in items:
-        # Extract ID from payload (vector search results have this structure)
+        # Extract ID from payload
         item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
         if not item_id:
             continue

-        try:
-            # Get current node properties from Kuzu's Node table
-            result = await graph_engine.query(
-                "MATCH (n:Node {id: $id}) WHERE n.type = $node_type RETURN n.properties as props",
-                {"id": str(item_id), "node_type": node_type}
-            )
+        # Query to get both node type and properties in one call
+        result = await graph_engine.query(
+            "MATCH (n:Node {id: $id}) RETURN n.type as node_type, n.properties as props",
+            {"id": str(item_id)}
+        )

-            if result and len(result) > 0 and result[0][0]:
-                # Parse existing properties JSON
-                props = json.loads(result[0][0]) if result[0][0] else {}
-                # Update last_accessed_at with millisecond timestamp
-                props["last_accessed_at"] = timestamp_ms
+        if result and len(result) > 0 and result[0]:
+            node_type = result[0][0]  # First column: node_type
+            props_json = result[0][1]  # Second column: properties

-                # Write back to graph database
-                await graph_engine.query(
-                    "MATCH (n:Node {id: $id}) WHERE n.type = $node_type SET n.properties = $props",
-                    {"id": str(item_id), "node_type": node_type, "props": json.dumps(props)}
-                )
-        except Exception as e:
-            logger.warning(f"Failed to update timestamp for {node_type} {item_id}: {e}")
-            continue
+            # Parse existing properties JSON
+            props = json.loads(props_json) if props_json else {}
+            # Update last_accessed_at with millisecond timestamp
+            props["last_accessed_at"] = timestamp_ms

-    logger.debug(f"Updated access timestamps for {len(items)} {node_type} nodes")
-
+            # Write back to graph database
+            await graph_engine.query(
+                "MATCH (n:Node {id: $id}) SET n.properties = $props",
+                {"id": str(item_id), "props": json.dumps(props)}
+            )
+
+            logger.debug(f"Updated access timestamp for {node_type} node {item_id}")
+
+    logger.debug(f"Updated access timestamps for {len(items)} nodes")
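---

A minimal usage sketch of the helper as it stands after PATCH 4/4: any code path
that gets results back from vector search can refresh last_accessed_at on the
matching graph nodes with one extra call. The collection name, query string, and
the asyncio entry point below are illustrative only, not part of the patches:

import asyncio

from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps


async def search_and_touch(query: str, top_k: int = 5):
    # Search an indexed collection; "DocumentChunk_text" is the collection the
    # chunks retriever queries in these patches.
    vector_engine = get_vector_engine()
    results = await vector_engine.search("DocumentChunk_text", query, limit=top_k)

    # Each result's payload carries the node id; since PATCH 4/4 the helper
    # resolves the node type from the graph itself, so no node_type argument.
    await update_node_access_timestamps(results)

    return [result.payload for result in results]


if __name__ == "__main__":
    print(asyncio.run(search_and_touch("example query")))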