feat: add last_accessed_at field to the models and update the retrievers to refresh the timestamp

This commit is contained in:
chinu0609 2025-10-29 20:12:14 +05:30
parent 283c67546d
commit 3372679f7b
6 changed files with 115 additions and 51 deletions

View file

@@ -1,5 +1,7 @@
from typing import List, Union from typing import List, Union
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from cognee.infrastructure.engine import DataPoint from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document from cognee.modules.data.processing.document_types import Document
from cognee.modules.engine.models import Entity from cognee.modules.engine.models import Entity
@@ -22,6 +24,7 @@ class DocumentChunk(DataPoint):
- cut_type: The type of cut that defined this chunk. - cut_type: The type of cut that defined this chunk.
- is_part_of: The document to which this chunk belongs. - is_part_of: The document to which this chunk belongs.
- contains: A list of entities or events contained within the chunk (default is None). - contains: A list of entities or events contained within the chunk (default is None).
- last_accessed_at: The timestamp of the last time the chunk was accessed.
- metadata: A dictionary to hold meta information related to the chunk, including index - metadata: A dictionary to hold meta information related to the chunk, including index
fields. fields.
""" """
@@ -32,5 +35,9 @@ class DocumentChunk(DataPoint):
cut_type: str cut_type: str
is_part_of: Document is_part_of: Document
contains: List[Union[Entity, Event]] = None contains: List[Union[Entity, Event]] = None
last_accessed_at: int = Field(
default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
)
metadata: dict = {"index_fields": ["text"]} metadata: dict = {"index_fields": ["text"]}

View file

@@ -1,11 +1,14 @@
from cognee.infrastructure.engine import DataPoint from cognee.infrastructure.engine import DataPoint
from cognee.modules.engine.models.EntityType import EntityType from cognee.modules.engine.models.EntityType import EntityType
from typing import Optional from typing import Optional
from datetime import datetime, timezone
from pydantic import BaseModel, Field
class Entity(DataPoint): class Entity(DataPoint):
name: str name: str
is_a: Optional[EntityType] = None is_a: Optional[EntityType] = None
description: str description: str
last_accessed_at: int = Field(
default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
)
metadata: dict = {"index_fields": ["name"]} metadata: dict = {"index_fields": ["name"]}

View file

@@ -1,10 +1,11 @@
from typing import Any, Optional from typing import Any, Optional
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.exceptions.exceptions import NoDataError from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
from datetime import datetime, timezone
logger = get_logger("ChunksRetriever") logger = get_logger("ChunksRetriever")
@@ -27,38 +28,26 @@ class ChunksRetriever(BaseRetriever):
): ):
self.top_k = top_k self.top_k = top_k
async def get_context(self, query: str) -> Any: async def get_context(self, query: str) -> Any:
""" """Retrieves document chunks context based on the query."""
Retrieves document chunks context based on the query. logger.info(
f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
Searches for document chunks relevant to the specified query using a vector engine. )
Raises a NoDataError if no data is found in the system.
vector_engine = get_vector_engine()
Parameters:
----------- try:
found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
- query (str): The query string to search for relevant document chunks. logger.info(f"Found {len(found_chunks)} chunks from vector search")
Returns: # NEW: Update access timestamps
-------- await update_node_access_timestamps(found_chunks, "DocumentChunk")
except CollectionNotFoundError as error:
- Any: A list of document chunk payloads retrieved from the search. logger.error("DocumentChunk_text collection not found in vector database")
""" raise NoDataError("No data found in the system, please add data first.") from error
logger.info(
f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'" chunk_payloads = [result.payload for result in found_chunks]
) logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
vector_engine = get_vector_engine()
try:
found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
logger.info(f"Found {len(found_chunks)} chunks from vector search")
except CollectionNotFoundError as error:
logger.error("DocumentChunk_text collection not found in vector database")
raise NoDataError("No data found in the system, please add data first.") from error
chunk_payloads = [result.payload for result in found_chunks]
logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
return chunk_payloads return chunk_payloads
async def get_completion( async def get_completion(

View file

@@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.exceptions.exceptions import NoDataError from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
logger = get_logger("SummariesRetriever") logger = get_logger("SummariesRetriever")
@@ -47,20 +48,19 @@ class SummariesRetriever(BaseRetriever):
f"Starting summary retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'" f"Starting summary retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
) )
vector_engine = get_vector_engine() vector_engine = get_vector_engine()
try: try:
summaries_results = await vector_engine.search( summaries_results = await vector_engine.search(
"TextSummary_text", query, limit=self.top_k "TextSummary_text", query, limit=self.top_k
) )
logger.info(f"Found {len(summaries_results)} summaries from vector search")
except CollectionNotFoundError as error: await update_node_access_timestamps(summaries_results, "TextSummary")
logger.error("TextSummary_text collection not found in vector database")
raise NoDataError("No data found in the system, please add data first.") from error except CollectionNotFoundError as error:
raise NoDataError("No data found in the system, please add data first.") from error
summary_payloads = [summary.payload for summary in summaries_results]
logger.info(f"Returning {len(summary_payloads)} summary payloads") return [summary.payload for summary in summaries_results]
return summary_payloads
async def get_completion( async def get_completion(
self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None, **kwargs self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None, **kwargs

View file

@@ -0,0 +1,61 @@
"""Utilities for tracking data access in retrievers."""
import json
from datetime import datetime, timezone
from typing import List, Any
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any], node_type: str):
    """
    Update ``last_accessed_at`` for graph nodes matching vector search results.

    Best-effort: failures on individual nodes are logged and skipped so a
    timestamp bookkeeping error never breaks retrieval.

    Parameters
    ----------
    items : List[Any]
        Vector search results (objects exposing a ``payload`` dict) or plain
        dicts, each carrying an 'id' field identifying the graph node.
    node_type : str
        Type of node to update (e.g., 'DocumentChunk', 'Entity', 'TextSummary')
    """
    if not items:
        return

    graph_engine = await get_graph_engine()

    # Milliseconds since epoch, matching the models' last_accessed_at format.
    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)

    updated_count = 0
    for item in items:
        # Vector search results wrap fields in .payload; fall back to dict access.
        item_id = item.payload.get("id") if hasattr(item, 'payload') else item.get("id")
        if not item_id:
            continue

        try:
            # Node properties are stored as a JSON string on the generic Node table.
            result = await graph_engine.query(
                "MATCH (n:Node {id: $id}) WHERE n.type = $node_type RETURN n.properties as props",
                {"id": str(item_id), "node_type": node_type}
            )

            if result and len(result) > 0 and result[0][0]:
                # Truthiness of result[0][0] is guaranteed by the guard above,
                # so parse it directly (no redundant fallback to {}).
                props = json.loads(result[0][0])
                props["last_accessed_at"] = timestamp_ms

                # Write the updated properties JSON back to the graph database.
                await graph_engine.query(
                    "MATCH (n:Node {id: $id}) WHERE n.type = $node_type SET n.properties = $props",
                    {"id": str(item_id), "node_type": node_type, "props": json.dumps(props)}
                )
                updated_count += 1
        except Exception as e:
            logger.warning(f"Failed to update timestamp for {node_type} {item_id}: {e}")
            continue

    # Report how many nodes were actually updated, not just how many were passed in.
    logger.debug(f"Updated access timestamps for {updated_count}/{len(items)} {node_type} nodes")

View file

@@ -1,5 +1,7 @@
from typing import Union
from pydantic import BaseModel, Field
from typing import Union
from datetime import datetime, timezone
from cognee.infrastructure.engine import DataPoint from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models import DocumentChunk from cognee.modules.chunking.models import DocumentChunk
from cognee.shared.CodeGraphEntities import CodeFile, CodePart from cognee.shared.CodeGraphEntities import CodeFile, CodePart
@@ -17,7 +19,9 @@ class TextSummary(DataPoint):
text: str text: str
made_from: DocumentChunk made_from: DocumentChunk
last_accessed_at: int = Field(
default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
)
metadata: dict = {"index_fields": ["text"]} metadata: dict = {"index_fields": ["text"]}