Merge pull request #4 from chinu0609/delete-last-acessed

Delete last accessed
This commit is contained in:
Chinmay Bhosale 2025-10-31 00:25:33 +05:30 committed by GitHub
commit 4b43afcdab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 84 additions and 13 deletions

View file

@ -43,6 +43,9 @@ class DataPoint(BaseModel):
updated_at: int = Field(
default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
)
last_accessed_at: int = Field(
default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
)
ontology_valid: bool = False
version: int = 1 # Default version
topological_rank: Optional[int] = 0

View file

@ -1,5 +1,7 @@
from typing import List, Union
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document
from cognee.modules.engine.models import Entity
@ -22,6 +24,7 @@ class DocumentChunk(DataPoint):
- cut_type: The type of cut that defined this chunk.
- is_part_of: The document to which this chunk belongs.
- contains: A list of entities or events contained within the chunk (default is None).
- last_accessed_at: The timestamp of the last time the chunk was accessed.
- metadata: A dictionary to hold meta information related to the chunk, including index
fields.
"""
@ -32,5 +35,4 @@ class DocumentChunk(DataPoint):
cut_type: str
is_part_of: Document
contains: List[Union[Entity, Event]] = None
metadata: dict = {"index_fields": ["text"]}

View file

@ -1,11 +1,11 @@
from cognee.infrastructure.engine import DataPoint
from cognee.modules.engine.models.EntityType import EntityType
from typing import Optional
from datetime import datetime, timezone
from pydantic import BaseModel, Field
class Entity(DataPoint):
    """A named entity extracted from content, stored as a graph data point."""

    # Human-readable name of the entity; listed in metadata["index_fields"],
    # so it is the field indexed for vector search.
    name: str
    # Optional type classification (e.g. person, place) — None when unknown.
    is_a: Optional[EntityType] = None
    # Free-text description of the entity.
    description: str
    # NOTE(review): class-level mutable dict — presumably read-only engine
    # config declaring which fields get indexed; confirm it is never mutated
    # per-instance, since the default is shared across instances.
    metadata: dict = {"index_fields": ["name"]}

View file

@ -1,10 +1,11 @@
from typing import Any, Optional
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
from datetime import datetime, timezone
logger = get_logger("ChunksRetriever")
@ -27,21 +28,16 @@ class ChunksRetriever(BaseRetriever):
):
self.top_k = top_k
async def get_context(self, query: str) -> Any:
async def get_context(self, query: str) -> Any:
"""
Retrieves document chunks context based on the query.
Searches for document chunks relevant to the specified query using a vector engine.
Raises a NoDataError if no data is found in the system.
Parameters:
-----------
- query (str): The query string to search for relevant document chunks.
Returns:
--------
- Any: A list of document chunk payloads retrieved from the search.
"""
logger.info(
@ -53,13 +49,14 @@ class ChunksRetriever(BaseRetriever):
try:
found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
logger.info(f"Found {len(found_chunks)} chunks from vector search")
await update_node_access_timestamps(found_chunks)
except CollectionNotFoundError as error:
logger.error("DocumentChunk_text collection not found in vector database")
raise NoDataError("No data found in the system, please add data first.") from error
chunk_payloads = [result.payload for result in found_chunks]
logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
return chunk_payloads
async def get_completion(
self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None

View file

@ -4,6 +4,7 @@ from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.modules.retrieval.utils.access_tracking import update_node_access_timestamps
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
logger = get_logger("SummariesRetriever")
@ -54,6 +55,9 @@ class SummariesRetriever(BaseRetriever):
"TextSummary_text", query, limit=self.top_k
)
logger.info(f"Found {len(summaries_results)} summaries from vector search")
await update_node_access_timestamps(summaries_results)
except CollectionNotFoundError as error:
logger.error("TextSummary_text collection not found in vector database")
raise NoDataError("No data found in the system, please add data first.") from error

View file

@ -0,0 +1,64 @@
"""Utilities for tracking data access in retrievers."""
import json
from datetime import datetime, timezone
from typing import List, Any
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
async def update_node_access_timestamps(items: List[Any]):
    """
    Update ``last_accessed_at`` for nodes in the graph database.

    Access tracking is best-effort: a failure on one node is logged and
    skipped so that a single bad record cannot abort the whole batch (or
    the retrieval call this runs inside of).

    Parameters
    ----------
    items : List[Any]
        Items carrying an ``id`` — either vector search results exposing a
        ``payload`` dict, or plain dicts.
    """
    if not items:
        return

    graph_engine = await get_graph_engine()
    # Single timestamp for the whole batch, in milliseconds to match the
    # DataPoint timestamp fields.
    timestamp_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    updated_count = 0

    for item in items:
        # Vector search results wrap data in .payload; accept bare dicts too.
        item_id = item.payload.get("id") if hasattr(item, "payload") else item.get("id")

        if not item_id:
            continue

        try:
            # Fetch node type and serialized properties in one round trip.
            result = await graph_engine.query(
                "MATCH (n:Node {id: $id}) RETURN n.type as node_type, n.properties as props",
                {"id": str(item_id)},
            )

            if result and len(result) > 0 and result[0]:
                node_type = result[0][0]  # First column: node_type
                props_json = result[0][1]  # Second column: properties

                # Properties are stored as a JSON string; missing/empty means
                # no properties yet.
                props = json.loads(props_json) if props_json else {}
                props["last_accessed_at"] = timestamp_ms

                await graph_engine.query(
                    "MATCH (n:Node {id: $id}) SET n.properties = $props",
                    {"id": str(item_id), "props": json.dumps(props)},
                )
                updated_count += 1
                logger.debug(f"Updated access timestamp for {node_type} node {item_id}")
        except Exception as e:
            # Best-effort: log and move on to the next node.
            logger.error(f"Failed to update timestamp for node {item_id}: {e}")
            continue

    logger.debug(f"Updated access timestamps for {updated_count} of {len(items)} nodes")

View file

@ -1,5 +1,7 @@
from typing import Union
from pydantic import BaseModel, Field
from typing import Union
from datetime import datetime, timezone
from cognee.infrastructure.engine import DataPoint
from cognee.modules.chunking.models import DocumentChunk
from cognee.shared.CodeGraphEntities import CodeFile, CodePart
@ -17,7 +19,6 @@ class TextSummary(DataPoint):
text: str
made_from: DocumentChunk
metadata: dict = {"index_fields": ["text"]}