diff --git a/cognee/infrastructure/engine/models/DataPoint.py b/cognee/infrastructure/engine/models/DataPoint.py
index 812380eaa..3178713c8 100644
--- a/cognee/infrastructure/engine/models/DataPoint.py
+++ b/cognee/infrastructure/engine/models/DataPoint.py
@@ -43,6 +43,9 @@ class DataPoint(BaseModel):
     updated_at: int = Field(
         default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
     )
+    last_accessed_at: int = Field(
+        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
+    )
     ontology_valid: bool = False
     version: int = 1  # Default version
     topological_rank: Optional[int] = 0
diff --git a/cognee/modules/chunking/models/DocumentChunk.py b/cognee/modules/chunking/models/DocumentChunk.py
index 9f8c57486..601454802 100644
--- a/cognee/modules/chunking/models/DocumentChunk.py
+++ b/cognee/modules/chunking/models/DocumentChunk.py
@@ -1,5 +1,7 @@
 from typing import List, Union
 
+from pydantic import BaseModel, Field
+from datetime import datetime, timezone
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.data.processing.document_types import Document
 from cognee.modules.engine.models import Entity
@@ -22,6 +24,7 @@ class DocumentChunk(DataPoint):
     - cut_type: The type of cut that defined this chunk.
     - is_part_of: The document to which this chunk belongs.
     - contains: A list of entities or events contained within the chunk (default is None).
+    - last_accessed_at: The timestamp of the last time the chunk was accessed.
     - metadata: A dictionary to hold meta information related to the chunk, including index
     fields.
     """
@@ -32,5 +35,4 @@ class DocumentChunk(DataPoint):
     cut_type: str
     is_part_of: Document
     contains: List[Union[Entity, Event]] = None
-
     metadata: dict = {"index_fields": ["text"]}
diff --git a/cognee/modules/engine/models/Entity.py b/cognee/modules/engine/models/Entity.py
index 36da2e344..4083cd2e6 100644
--- a/cognee/modules/engine/models/Entity.py
+++ b/cognee/modules/engine/models/Entity.py
@@ -1,11 +1,11 @@
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.engine.models.EntityType import EntityType
 from typing import Optional
-
+from datetime import datetime, timezone
+from pydantic import BaseModel, Field
 
 class Entity(DataPoint):
     name: str
     is_a: Optional[EntityType] = None
     description: str
-
     metadata: dict = {"index_fields": ["name"]}
diff --git a/cognee/modules/retrieval/chunks_retriever.py b/cognee/modules/retrieval/chunks_retriever.py
index 94b9d3fb9..be1f95811 100644
--- a/cognee/modules/retrieval/chunks_retriever.py
+++ b/cognee/modules/retrieval/chunks_retriever.py
@@ -1,10 +1,11 @@
 from typing import Any, Optional
-
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 from cognee.shared.logging_utils import get_logger
 from cognee.infrastructure.databases.vector import get_vector_engine
 from cognee.modules.retrieval.base_retriever import BaseRetriever
 from cognee.modules.retrieval.exceptions.exceptions import NoDataError
 from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
+from datetime import datetime, timezone
 
 logger = get_logger("ChunksRetriever")
 
@@ -27,21 +28,16 @@ class ChunksRetriever(BaseRetriever):
     ):
         self.top_k = top_k
 
     async def get_context(self, query: str) -> Any:
         """
         Retrieves document chunks context based on the query.
-
         Searches for document chunks relevant to the specified query using a vector engine.
         Raises a NoDataError if no data is found in the system.
-
         Parameters:
         -----------
-
             - query (str): The query string to search for relevant document chunks.
-
         Returns:
         --------
-
             - Any: A list of document chunk payloads retrieved from the search.
         """
         logger.info(
@@ -53,6 +49,8 @@ class ChunksRetriever(BaseRetriever):
         try:
             found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
             logger.info(f"Found {len(found_chunks)} chunks from vector search")
+            await update_node_access_timestamps(found_chunks)
+
         except CollectionNotFoundError as error:
             logger.error("DocumentChunk_text collection not found in vector database")
             raise NoDataError("No data found in the system, please add data first.") from error
diff --git a/cognee/modules/retrieval/summaries_retriever.py b/cognee/modules/retrieval/summaries_retriever.py
index 87b224946..0df750d22 100644
--- a/cognee/modules/retrieval/summaries_retriever.py
+++ b/cognee/modules/retrieval/summaries_retriever.py
@@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger
 from cognee.infrastructure.databases.vector import get_vector_engine
 from cognee.modules.retrieval.base_retriever import BaseRetriever
 from cognee.modules.retrieval.exceptions.exceptions import NoDataError
+from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
 from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
 
 logger = get_logger("SummariesRetriever")
@@ -54,6 +55,9 @@ class SummariesRetriever(BaseRetriever):
                 "TextSummary_text", query, limit=self.top_k
             )
             logger.info(f"Found {len(summaries_results)} summaries from vector search")
+
+            await update_node_access_timestamps(summaries_results)
+
         except CollectionNotFoundError as error:
             logger.error("TextSummary_text collection not found in vector database")
             raise NoDataError("No data found in the system, please add data first.") from error
diff --git a/cognee/modules/retrieval/utils/access_tracking.py b/cognee/modules/retrieval/utils/access_tracking.py
new file mode 100644
index 000000000..79afd25db
--- /dev/null
+++ b/cognee/modules/retrieval/utils/access_tracking.py
@@ -0,0 +1,76 @@
+"""Utilities for tracking data access in retrievers."""
+
+import json
+from datetime import datetime, timezone
+from typing import Any, List
+
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.shared.logging_utils import get_logger
+
+logger = get_logger(__name__)
+
+
+def _extract_item_id(item: Any) -> Any:
+    """Return the ``id`` from a result's ``payload`` or from a plain dict, else None."""
+    payload = getattr(item, "payload", item)
+    return payload.get("id") if isinstance(payload, dict) else None
+
+
+async def update_node_access_timestamps(items: List[Any]):
+    """
+    Stamp ``last_accessed_at`` on the graph nodes behind retrieved items.
+
+    Tracking is best-effort: a failure for one node is logged and that node is skipped, so
+    bookkeeping never breaks the retrieval that triggered it.
+
+    NOTE(review): the queries address a generic ``Node`` label whose ``properties`` value is
+    a JSON string (the Kuzu layout this helper was written for) -- confirm before relying
+    on it with another graph backend.
+
+    Parameters
+    ----------
+    items : List[Any]
+        Vector search results exposing a ``payload`` dict with an ``id`` key, or plain
+        dicts with an ``id`` key. Items without an id are ignored.
+    """
+    if not items:
+        return
+
+    graph_engine = await get_graph_engine()
+    # One UTC timestamp (epoch milliseconds) shared by every node in this batch.
+    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
+    updated_count = 0
+
+    for item in items:
+        item_id = _extract_item_id(item)
+        if not item_id:
+            continue
+
+        node_id = str(item_id)
+        try:
+            # Read node type and JSON-encoded properties in a single query.
+            result = await graph_engine.query(
+                "MATCH (n:Node {id: $id}) RETURN n.type as node_type, n.properties as props",
+                {"id": node_id},
+            )
+            if not result or not result[0]:
+                continue
+
+            node_type, props_json = result[0][0], result[0][1]
+            props = json.loads(props_json) if props_json else {}
+            props["last_accessed_at"] = timestamp_ms
+
+            # Write the merged properties back as JSON.
+            await graph_engine.query(
+                "MATCH (n:Node {id: $id}) SET n.properties = $props",
+                {"id": node_id, "props": json.dumps(props)},
+            )
+        except Exception as error:
+            # Deliberately broad: access bookkeeping must not fail the caller's retrieval.
+            logger.error(f"Failed to update timestamp for node {item_id}: {error}")
+            continue
+
+        updated_count += 1
+        logger.debug(f"Updated access timestamp for {node_type} node {item_id}")
+
+    logger.debug(f"Updated access timestamps for {updated_count} nodes")
diff --git a/cognee/tasks/summarization/models.py b/cognee/tasks/summarization/models.py
index 75ed82d50..8cee2ade3 100644
--- a/cognee/tasks/summarization/models.py
+++ b/cognee/tasks/summarization/models.py
@@ -1,5 +1,7 @@
+from pydantic import BaseModel, Field
 from typing import Union
+from datetime import datetime, timezone
 
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.chunking.models import DocumentChunk
 from cognee.shared.CodeGraphEntities import CodeFile, CodePart
@@ -17,7 +19,6 @@ class TextSummary(DataPoint):
 
     text: str
     made_from: DocumentChunk
-
     metadata: dict = {"index_fields": ["text"]}
 
 