769 lines
32 KiB
Python
769 lines
32 KiB
Python
|
|
import logging
|
|
import os
|
|
|
|
from neo4j import AsyncSession
|
|
from neo4j.exceptions import Neo4jError
|
|
|
|
print(os.getcwd())
|
|
|
|
import networkx as nx
|
|
|
|
from langchain.graphs import Neo4jGraph
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
import openai
|
|
import instructor
|
|
from openai import OpenAI
|
|
from openai import AsyncOpenAI
|
|
import pickle
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
# Adds response_model to ChatCompletion
|
|
# Allows the return of Pydantic model rather than raw JSON
|
|
|
|
from pydantic import BaseModel, Field
|
|
from typing import List, Dict, Optional
|
|
from ...utils import format_dict, append_uuid_to_variable_names, create_edge_variable_mapping, \
|
|
create_node_variable_mapping, get_unsumarized_vector_db_namespace
|
|
from ...llm.queries import generate_summary, generate_graph
|
|
|
|
DEFAULT_PRESET = "promethai_chat"
|
|
preset_options = [DEFAULT_PRESET]
|
|
PROMETHAI_DIR = os.path.join(os.path.expanduser("~"), ".")
|
|
load_dotenv()
|
|
|
|
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
|
|
from ...config import Config
|
|
|
|
from ...shared.data_models import Node, Edge, KnowledgeGraph, GraphQLQuery, MemorySummary
|
|
|
|
config = Config()
|
|
config.load()
|
|
|
|
print(config.model)
|
|
print(config.openai_key)
|
|
|
|
OPENAI_API_KEY = config.openai_key
|
|
|
|
aclient = instructor.patch(OpenAI())
|
|
|
|
class AbstractGraphDB(ABC):
|
|
|
|
@abstractmethod
|
|
def query(self, query: str, params=None):
|
|
pass
|
|
|
|
# @abstractmethod
|
|
# def create_nodes(self, nodes: List[dict]):
|
|
# pass
|
|
#
|
|
# @abstractmethod
|
|
# def create_edges(self, edges: List[dict]):
|
|
# pass
|
|
#
|
|
# @abstractmethod
|
|
# def create_memory_type_relationships(self, nodes: List[dict], memory_type: str):
|
|
# pass
|
|
|
|
|
|
class Neo4jGraphDB(AbstractGraphDB):
|
|
def __init__(self, url, username, password):
|
|
# self.graph = Neo4jGraph(url=url, username=username, password=password)
|
|
from neo4j import GraphDatabase
|
|
self.driver = GraphDatabase.driver(url, auth=(username, password))
|
|
self.openai_key = config.openai_key
|
|
|
|
|
|
|
|
def close(self):
|
|
# Method to close the Neo4j driver instance
|
|
self.driver.close()
|
|
|
|
def query(self, query, params=None):
|
|
try:
|
|
with self.driver.session() as session:
|
|
result = session.run(query, params).data()
|
|
return result
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while executing the query: {e}")
|
|
raise e
|
|
|
|
|
|
|
|
def create_base_cognitive_architecture(self, user_id: str):
|
|
# Create the user and memory components if they don't exist
|
|
user_memory_cypher = f"""
|
|
MERGE (user:User {{userId: '{user_id}'}})
|
|
MERGE (semantic:SemanticMemory {{description: 'SemanticMemory', userId: '{user_id}' }})
|
|
MERGE (episodic:EpisodicMemory {{description: 'EpisodicMemory' , userId: '{user_id}'}})
|
|
MERGE (buffer:Buffer {{description: 'Buffer' , userId: '{user_id}' }})
|
|
MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(semantic)
|
|
MERGE (user)-[:HAS_EPISODIC_MEMORY]->(episodic)
|
|
MERGE (user)-[:HAS_BUFFER]->(buffer)
|
|
"""
|
|
return user_memory_cypher
|
|
|
|
async def retrieve_memory(self, user_id: str, memory_type: str, timestamp: float = None, summarized: bool = None):
|
|
if memory_type == 'SemanticMemory':
|
|
relationship = 'SEMANTIC_MEMORY'
|
|
memory_rel = 'HAS_KNOWLEDGE'
|
|
elif memory_type == 'EpisodicMemory':
|
|
relationship = 'EPISODIC_MEMORY'
|
|
memory_rel = 'HAS_EVENT'
|
|
elif memory_type == 'Buffer':
|
|
relationship = 'BUFFER'
|
|
memory_rel = 'CURRENTLY_HOLDING'
|
|
if timestamp is not None and summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_{relationship}]->(memory:{memory_type})
|
|
MATCH (memory)-[:{memory_rel}]->(item)
|
|
WHERE item.created_at >= {timestamp} AND item.summarized = {str(summarized).lower()}
|
|
RETURN item
|
|
"""
|
|
elif timestamp is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_{relationship}]->(memory:{memory_type})
|
|
MATCH (memory)-[:{memory_rel}]->(item)
|
|
WHERE item.created_at >= {timestamp}
|
|
RETURN item
|
|
"""
|
|
elif summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_{relationship}]->(memory:{memory_type})
|
|
MATCH (memory)-[:{memory_rel}]->(item)
|
|
WHERE item.summarized = {str(summarized).lower()}
|
|
RETURN item
|
|
"""
|
|
print(query)
|
|
else:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_{relationship}]->(memory:{memory_type})
|
|
MATCH (memory)-[:{memory_rel}]->(item)
|
|
RETURN item
|
|
"""
|
|
output = self.query(query, params={"user_id": user_id})
|
|
print("Here is the output", output)
|
|
|
|
reduced_graph = await generate_summary(input = output)
|
|
return reduced_graph
|
|
|
|
|
|
def cypher_statement_correcting(self, input: str) ->str:
|
|
return aclient.chat.completions.create(
|
|
model=config.model,
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": f"""Check the cypher query for syntax issues, and fix any if found and return it as is: {input}. """,
|
|
|
|
},
|
|
{"role": "system", "content": """You are a top-tier algorithm
|
|
designed for checking cypher queries for neo4j graph databases. You have to return input provided to you as is"""}
|
|
],
|
|
response_model=GraphQLQuery,
|
|
)
|
|
|
|
def generate_create_statements_for_nodes_with_uuid(self, nodes, unique_mapping, base_node_mapping):
|
|
create_statements = []
|
|
for node in nodes:
|
|
original_variable_name = base_node_mapping[node['id']]
|
|
unique_variable_name = unique_mapping[original_variable_name]
|
|
node_label = node['category'].capitalize()
|
|
properties = {k: v for k, v in node.items() if k not in ['id', 'category']}
|
|
try:
|
|
properties = format_dict(properties)
|
|
except:
|
|
pass
|
|
create_statements.append(f"CREATE ({unique_variable_name}:{node_label} {properties})")
|
|
return create_statements
|
|
|
|
# Update the function to generate Cypher CREATE statements for edges with unique variable names
|
|
def generate_create_statements_for_edges_with_uuid(self, user_id, edges, unique_mapping, base_node_mapping):
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_mapping.values())}, user , semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
for edge in edges:
|
|
# print("HERE IS THE EDGE", edge)
|
|
source_variable = unique_mapping[base_node_mapping[edge['source']]]
|
|
target_variable = unique_mapping[base_node_mapping[edge['target']]]
|
|
relationship = edge['description'].replace(" ", "_").upper()
|
|
create_statements.append(f"CREATE ({source_variable})-[:{relationship}]->({target_variable})")
|
|
return create_statements
|
|
|
|
def generate_memory_type_relationships_with_uuid_and_time_context(self, user_id, nodes, unique_mapping, base_node_mapping):
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_mapping.values())}, user, semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
# Loop through each node and create relationships based on memory_type
|
|
for node in nodes:
|
|
original_variable_name = base_node_mapping[node['id']]
|
|
unique_variable_name = unique_mapping[original_variable_name]
|
|
if node['memory_type'] == 'semantic':
|
|
create_statements.append(f"CREATE (semantic)-[:HAS_KNOWLEDGE]->({unique_variable_name})")
|
|
elif node['memory_type'] == 'episodic':
|
|
create_statements.append(f"CREATE (episodic)-[:HAS_EVENT]->({unique_variable_name})")
|
|
if node['category'] == 'time':
|
|
create_statements.append(f"CREATE (buffer)-[:HAS_TIME_CONTEXT]->({unique_variable_name})")
|
|
|
|
# Assuming buffer holds all actions and times
|
|
# if node['category'] in ['action', 'time']:
|
|
create_statements.append(f"CREATE (buffer)-[:CURRENTLY_HOLDING]->({unique_variable_name})")
|
|
|
|
return create_statements
|
|
|
|
async def generate_cypher_query_for_user_prompt_decomposition(self, user_id:str, query:str):
|
|
|
|
graph: KnowledgeGraph = generate_graph(query)
|
|
import time
|
|
for node in graph.nodes:
|
|
node.created_at = time.time()
|
|
node.summarized = False
|
|
|
|
for edge in graph.edges:
|
|
edge.created_at = time.time()
|
|
edge.summarized = False
|
|
graph_dic = graph.dict()
|
|
|
|
node_variable_mapping = create_node_variable_mapping(graph_dic['nodes'])
|
|
edge_variable_mapping = create_edge_variable_mapping(graph_dic['edges'])
|
|
# Create unique variable names for each node
|
|
unique_node_variable_mapping = append_uuid_to_variable_names(node_variable_mapping)
|
|
unique_edge_variable_mapping = append_uuid_to_variable_names(edge_variable_mapping)
|
|
create_nodes_statements = self.generate_create_statements_for_nodes_with_uuid(graph_dic['nodes'], unique_node_variable_mapping, node_variable_mapping)
|
|
create_edges_statements =self.generate_create_statements_for_edges_with_uuid(user_id, graph_dic['edges'], unique_node_variable_mapping, node_variable_mapping)
|
|
|
|
memory_type_statements_with_uuid_and_time_context = self.generate_memory_type_relationships_with_uuid_and_time_context(user_id,
|
|
graph_dic['nodes'], unique_node_variable_mapping, node_variable_mapping)
|
|
|
|
# # Combine all statements
|
|
cypher_statements = [self.create_base_cognitive_architecture(user_id)] + create_nodes_statements + create_edges_statements + memory_type_statements_with_uuid_and_time_context
|
|
cypher_statements_joined = "\n".join(cypher_statements)
|
|
logging.info("User Cypher Query raw: %s", cypher_statements_joined)
|
|
# corrected_cypher_statements = self.cypher_statement_correcting(input = cypher_statements_joined)
|
|
# logging.info("User Cypher Query: %s", corrected_cypher_statements.query)
|
|
# return corrected_cypher_statements.query
|
|
return cypher_statements_joined
|
|
|
|
|
|
def update_user_query_for_user_prompt_decomposition(self, user_id, user_query):
|
|
pass
|
|
|
|
|
|
def delete_all_user_memories(self, user_id):
|
|
try:
|
|
# Check if the user exists
|
|
user_exists = self.query(f"MATCH (user:User {{userId: '{user_id}'}}) RETURN user")
|
|
if not user_exists:
|
|
return f"No user found with ID: {user_id}"
|
|
|
|
# Delete all memory nodes and relationships for the given user
|
|
delete_query = f"""
|
|
MATCH (user:User {{userId: '{user_id}'}})-[r]-()
|
|
DELETE r
|
|
WITH user
|
|
MATCH (user)-[:HAS_SEMANTIC_MEMORY]->(semantic)
|
|
MATCH (user)-[:HAS_EPISODIC_MEMORY]->(episodic)
|
|
MATCH (user)-[:HAS_BUFFER]->(buffer)
|
|
DETACH DELETE semantic, episodic, buffer
|
|
"""
|
|
self.query(delete_query)
|
|
return f"All memories deleted for user ID: {user_id}"
|
|
except Exception as e:
|
|
return f"An error occurred: {str(e)}"
|
|
|
|
def delete_specific_memory_type(self, user_id, memory_type):
|
|
try:
|
|
# Check if the user exists
|
|
user_exists = self.query(f"MATCH (user:User {{userId: '{user_id}'}}) RETURN user")
|
|
if not user_exists:
|
|
return f"No user found with ID: {user_id}"
|
|
|
|
# Validate memory type
|
|
if memory_type not in ['SemanticMemory', 'EpisodicMemory', 'Buffer']:
|
|
return "Invalid memory type. Choose from 'SemanticMemory', 'EpisodicMemory', or 'Buffer'."
|
|
|
|
# Delete specific memory type nodes and relationships for the given user
|
|
delete_query = f"""
|
|
MATCH (user:User {{userId: '{user_id}'}})-[:HAS_{memory_type.upper()}]->(memory)
|
|
DETACH DELETE memory
|
|
"""
|
|
self.query(delete_query)
|
|
return f"{memory_type} deleted for user ID: {user_id}"
|
|
except Exception as e:
|
|
return f"An error occurred: {str(e)}"
|
|
|
|
def retrieve_semantic_memory(self, user_id: str, timestamp: float = None, summarized: bool = None):
|
|
if timestamp is not None and summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)
|
|
MATCH (semantic)-[:HAS_KNOWLEDGE]->(knowledge)
|
|
WHERE knowledge.created_at >= {timestamp} AND knowledge.summarized = {str(summarized).lower()}
|
|
RETURN knowledge
|
|
"""
|
|
elif timestamp is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)
|
|
MATCH (semantic)-[:HAS_KNOWLEDGE]->(knowledge)
|
|
WHERE knowledge.created_at >= {timestamp}
|
|
RETURN knowledge
|
|
"""
|
|
elif summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)
|
|
MATCH (semantic)-[:HAS_KNOWLEDGE]->(knowledge)
|
|
WHERE knowledge.summarized = {str(summarized).lower()}
|
|
RETURN knowledge
|
|
"""
|
|
else:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)
|
|
MATCH (semantic)-[:HAS_KNOWLEDGE]->(knowledge)
|
|
RETURN knowledge
|
|
"""
|
|
return self.query(query, params={"user_id": user_id})
|
|
|
|
def retrieve_episodic_memory(self, user_id: str, timestamp: float = None, summarized: bool = None):
|
|
if timestamp is not None and summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_EPISODIC_MEMORY]->(episodic:EpisodicMemory)
|
|
MATCH (episodic)-[:HAS_EVENT]->(event)
|
|
WHERE event.created_at >= {timestamp} AND event.summarized = {str(summarized).lower()}
|
|
RETURN event
|
|
"""
|
|
elif timestamp is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_EPISODIC_MEMORY]->(episodic:EpisodicMemory)
|
|
MATCH (episodic)-[:HAS_EVENT]->(event)
|
|
WHERE event.created_at >= {timestamp}
|
|
RETURN event
|
|
"""
|
|
elif summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_EPISODIC_MEMORY]->(episodic:EpisodicMemory)
|
|
MATCH (episodic)-[:HAS_EVENT]->(event)
|
|
WHERE event.summarized = {str(summarized).lower()}
|
|
RETURN event
|
|
"""
|
|
else:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_EPISODIC_MEMORY]->(episodic:EpisodicMemory)
|
|
MATCH (episodic)-[:HAS_EVENT]->(event)
|
|
RETURN event
|
|
"""
|
|
return self.query(query, params={"user_id": user_id})
|
|
|
|
|
|
def retrieve_buffer_memory(self, user_id: str, timestamp: float = None, summarized: bool = None):
|
|
if timestamp is not None and summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_BUFFER]->(buffer:Buffer)
|
|
MATCH (buffer)-[:CURRENTLY_HOLDING]->(item)
|
|
WHERE item.created_at >= {timestamp} AND item.summarized = {str(summarized).lower()}
|
|
RETURN item
|
|
"""
|
|
elif timestamp is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_BUFFER]->(buffer:Buffer)
|
|
MATCH (buffer)-[:CURRENTLY_HOLDING]->(item)
|
|
WHERE item.created_at >= {timestamp}
|
|
RETURN item
|
|
"""
|
|
elif summarized is not None:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_BUFFER]->(buffer:Buffer)
|
|
MATCH (buffer)-[:CURRENTLY_HOLDING]->(item)
|
|
WHERE item.summarized = {str(summarized).lower()}
|
|
RETURN item
|
|
"""
|
|
else:
|
|
query = f"""
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_BUFFER]->(buffer:Buffer)
|
|
MATCH (buffer)-[:CURRENTLY_HOLDING]->(item)
|
|
RETURN item
|
|
"""
|
|
return self.query(query, params={"user_id": user_id})
|
|
|
|
|
|
|
|
def retrieve_public_memory(self, user_id: str):
|
|
query = """
|
|
MATCH (user:User {userId: $user_id})-[:HAS_PUBLIC_MEMORY]->(public:PublicMemory)
|
|
MATCH (public)-[:HAS_DOCUMENT]->(document)
|
|
RETURN document
|
|
"""
|
|
return self.query(query, params={"user_id": user_id})
|
|
def generate_graph_semantic_memory_document_summary(self, document_summary : str, unique_graphdb_mapping_values: dict, document_namespace: str):
|
|
""" This function takes a document and generates a document summary in Semantic Memory"""
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_graphdb_mapping_values.values())}, user, semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
# Loop through each node and create relationships based on memory_type
|
|
|
|
create_statements.append(f"CREATE (semantic)-[:HAS_KNOWLEDGE]->({unique_graphdb_mapping_values})")
|
|
|
|
|
|
return create_statements
|
|
|
|
|
|
def generate_document_summary(self, document_summary : str, unique_graphdb_mapping_values: dict, document_namespace: str):
|
|
""" This function takes a document and generates a document summary in Semantic Memory"""
|
|
|
|
|
|
# fetch namespace from postgres db
|
|
# fetch 1st and last page from vector store
|
|
# summarize the text, add document type
|
|
# write to postgres
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_graphdb_mapping_values.values())}, user, semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
# Loop through each node and create relationships based on memory_type
|
|
|
|
create_statements.append(f"CREATE (semantic)-[:HAS_KNOWLEDGE]->({unique_graphdb_mapping_values})")
|
|
|
|
|
|
return create_statements
|
|
|
|
async def get_memory_linked_document_summaries(self, user_id: str, memory_type: str = "PublicMemory"):
|
|
"""
|
|
Retrieve a list of summaries for all documents associated with a given memory type for a user.
|
|
|
|
Args:
|
|
user_id (str): The unique identifier of the user.
|
|
memory_type (str): The type of memory node ('SemanticMemory' or 'PublicMemory').
|
|
|
|
Returns:
|
|
List[Dict[str, Union[str, None]]]: A list of dictionaries containing document summary and d_id.
|
|
|
|
Raises:
|
|
Exception: If an error occurs during the database query execution.
|
|
"""
|
|
if memory_type == "PublicMemory":
|
|
relationship = "HAS_PUBLIC_MEMORY"
|
|
elif memory_type == "SemanticMemory":
|
|
relationship = "HAS_SEMANTIC_MEMORY"
|
|
try:
|
|
query = f'''
|
|
MATCH (user:User {{userId: '{user_id}'}})-[:{relationship}]->(memory:{memory_type})-[:HAS_DOCUMENT]->(document:Document)
|
|
RETURN document.d_id AS d_id, document.summary AS summary
|
|
'''
|
|
logging.info(f"Generated Cypher query: {query}")
|
|
result = self.query(query)
|
|
logging.info(f"Result: {result}")
|
|
return [{"d_id": record.get("d_id", None), "summary": record.get("summary", "No summary available")} for
|
|
record in result]
|
|
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while retrieving document summary: {str(e)}")
|
|
return None
|
|
|
|
|
|
|
|
async def get_memory_linked_document_ids(self, user_id: str, summary_id: str, memory_type: str = "PublicMemory"):
|
|
"""
|
|
Retrieve a list of document IDs for a specific category associated with a given memory type for a user.
|
|
|
|
Args:
|
|
user_id (str): The unique identifier of the user.
|
|
summary_id (str): The specific document summary id to filter by.
|
|
memory_type (str): The type of memory node ('SemanticMemory' or 'PublicMemory').
|
|
|
|
Returns:
|
|
List[str]: A list of document IDs in the specified category associated with the memory type for the user.
|
|
|
|
Raises:
|
|
Exception: If an error occurs during the database query execution.
|
|
"""
|
|
|
|
if memory_type == "PublicMemory":
|
|
relationship = "HAS_PUBLIC_MEMORY"
|
|
elif memory_type == "SemanticMemory":
|
|
relationship = "HAS_SEMANTIC_MEMORY"
|
|
try:
|
|
query = f'''
|
|
MATCH (user:User {{userId: '{user_id}'}})-[:{relationship}]->(memory:{memory_type})-[:HAS_DOCUMENT]->(document:Document)
|
|
WHERE document.d_id = '{summary_id}'
|
|
RETURN document.d_id AS d_id
|
|
'''
|
|
logging.info(f"Generated Cypher query: {query}")
|
|
result = self.query(query)
|
|
return [record["d_id"] for record in result]
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while retrieving document IDs: {str(e)}")
|
|
return None
|
|
|
|
|
|
def create_document_node_cypher(self, document_summary: dict, user_id: str,
|
|
memory_type: str = "PublicMemory",public_memory_id:str=None) -> str:
|
|
"""
|
|
Generate a Cypher query to create a Document node. If the memory type is 'Semantic',
|
|
link it to a SemanticMemory node for a user. If the memory type is 'PublicMemory',
|
|
only link the Document node to the PublicMemory node.
|
|
|
|
Parameters:
|
|
- document_summary (dict): A dictionary containing the document's category, title, summary, and document ID.
|
|
- user_id (str): The unique identifier for the user.
|
|
- memory_type (str): The type of memory node to link ("Semantic" or "PublicMemory"). Default is "PublicMemory".
|
|
|
|
Returns:
|
|
- str: A Cypher query string with parameters.
|
|
|
|
Raises:
|
|
- ValueError: If any required data is missing or invalid.
|
|
"""
|
|
|
|
# Validate the input parameters
|
|
if not isinstance(document_summary, dict):
|
|
raise ValueError("The document_summary must be a dictionary.")
|
|
if not all(key in document_summary for key in ['DocumentCategory', 'Title', 'Summary', 'd_id']):
|
|
raise ValueError("The document_summary dictionary is missing required keys.")
|
|
if not isinstance(user_id, str) or not user_id:
|
|
raise ValueError("The user_id must be a non-empty string.")
|
|
if memory_type not in ["SemanticMemory", "PublicMemory"]:
|
|
raise ValueError("The memory_type must be either 'Semantic' or 'PublicMemory'.")
|
|
|
|
# Escape single quotes in the document summary data
|
|
title = document_summary['Title'].replace("'", "\\'")
|
|
summary = document_summary['Summary'].replace("'", "\\'")
|
|
document_category = document_summary['DocumentCategory'].replace("'", "\\'")
|
|
d_id = document_summary['d_id'].replace("'", "\\'")
|
|
|
|
memory_node_type = "SemanticMemory" if memory_type == "SemanticMemory" else "PublicMemory"
|
|
|
|
user_memory_link = ''
|
|
if memory_type == "SemanticMemory":
|
|
user_memory_link = f'''
|
|
// Ensure the User node exists
|
|
MERGE (user:User {{ userId: '{user_id}' }})
|
|
MERGE (memory:SemanticMemory {{ userId: '{user_id}' }})
|
|
MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(memory)
|
|
'''
|
|
elif memory_type == "PublicMemory":
|
|
logging.info(f"Public memory id: {public_memory_id}")
|
|
user_memory_link = f'''
|
|
// Merge with the existing PublicMemory node or create a new one if it does not exist
|
|
MATCH (memory:PublicMemory {{ memoryId: {public_memory_id} }})
|
|
'''
|
|
|
|
cypher_query = f'''
|
|
{user_memory_link}
|
|
|
|
// Create the Document node with its properties
|
|
CREATE (document:Document {{
|
|
title: '{title}',
|
|
summary: '{summary}',
|
|
documentCategory: '{document_category}',
|
|
d_id: '{d_id}',
|
|
created_at: timestamp(),
|
|
}})
|
|
|
|
// Link the Document node to the {memory_node_type} node
|
|
MERGE (memory)-[:HAS_DOCUMENT]->(document)
|
|
'''
|
|
|
|
logging.info(f"Generated Cypher query: {cypher_query}")
|
|
|
|
return cypher_query
|
|
|
|
def update_document_node_with_db_ids(self, vectordb_namespace: str, document_id: str, user_id: str = None):
|
|
"""
|
|
Update the namespace of a Document node in the database. The document can be linked
|
|
either to a SemanticMemory node (if a user ID is provided) or to a PublicMemory node.
|
|
|
|
Parameters:
|
|
- vectordb_namespace (str): The namespace to set for the vectordb.
|
|
- document_id (str): The unique identifier of the document.
|
|
- user_id (str, optional): The unique identifier for the user. Default is None.
|
|
|
|
Returns:
|
|
- str: A Cypher query string to perform the update.
|
|
"""
|
|
|
|
if user_id:
|
|
# Update for a document linked to a SemanticMemory node
|
|
cypher_query = f'''
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{d_id: '{document_id}'}})
|
|
SET document.vectordbNamespace = '{vectordb_namespace}'
|
|
RETURN document
|
|
'''
|
|
else:
|
|
# Update for a document linked to a PublicMemory node
|
|
cypher_query = f'''
|
|
MATCH (:PublicMemory)-[:HAS_DOCUMENT]->(document:Document {{d_id: '{document_id}'}})
|
|
SET document.vectordbNamespace = '{vectordb_namespace}'
|
|
RETURN document
|
|
'''
|
|
|
|
return cypher_query
|
|
|
|
def get_namespaces_by_document_category(self, user_id: str, category: str):
|
|
"""
|
|
Retrieve a list of Vectordb namespaces for documents of a specified category associated with a given user.
|
|
|
|
This function executes a Cypher query in a Neo4j database to fetch the 'vectordbNamespace' of all 'Document' nodes
|
|
that are linked to the 'SemanticMemory' node of the specified user and belong to the specified category.
|
|
|
|
Parameters:
|
|
- user_id (str): The unique identifier of the user.
|
|
- category (str): The category to filter the documents by.
|
|
|
|
Returns:
|
|
- List[str]: A list of Vectordb namespaces for documents in the specified category.
|
|
|
|
Raises:
|
|
- Exception: If an error occurs during the database query execution.
|
|
"""
|
|
try:
|
|
query = f'''
|
|
MATCH (user:User {{userId: '{user_id}'}})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document)
|
|
WHERE document.documentCategory = '{category}'
|
|
RETURN document.vectordbNamespace AS namespace
|
|
'''
|
|
result = self.query(query)
|
|
namespaces = [record["namespace"] for record in result]
|
|
return namespaces
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while retrieving namespaces by document category: {str(e)}")
|
|
return None
|
|
|
|
async def create_memory_node(self, labels, topic=None):
|
|
"""
|
|
Create or find a memory node of the specified type with labels and a description.
|
|
|
|
Args:
|
|
labels (List[str]): A list of labels for the node.
|
|
topic (str, optional): The type of memory node to create or find. Defaults to "PublicMemory".
|
|
|
|
Returns:
|
|
int: The ID of the created or found memory node.
|
|
|
|
Raises:
|
|
ValueError: If input parameters are invalid.
|
|
Neo4jError: If an error occurs during the database operation.
|
|
"""
|
|
if topic is None:
|
|
topic = "PublicMemory"
|
|
|
|
# Prepare labels as a string
|
|
label_list = ', '.join(f"'{label}'" for label in labels)
|
|
|
|
# Cypher query to find or create the memory node with the given description and labels
|
|
memory_cypher = f"""
|
|
MERGE (memory:{topic} {{description: '{topic}', label: [{label_list}]}})
|
|
SET memory.memoryId = ID(memory)
|
|
RETURN id(memory) AS memoryId
|
|
"""
|
|
|
|
try:
|
|
result = self.query(memory_cypher)
|
|
# Assuming the result is a list of records, where each record contains 'memoryId'
|
|
memory_id = result[0]['memoryId'] if result else None
|
|
self.close()
|
|
return memory_id
|
|
except Neo4jError as e:
|
|
logging.error(f"Error creating or finding memory node: {e}")
|
|
raise
|
|
|
|
def link_user_to_public(self, user_id: str, public_property_value: str, public_property_name: str = 'name',
|
|
relationship_type: str = 'HAS_PUBLIC'):
|
|
if not user_id or not public_property_value:
|
|
raise ValueError("Valid User ID and Public property value are required for linking.")
|
|
|
|
try:
|
|
link_cypher = f"""
|
|
MATCH (user:User {{userId: '{user_id}'}})
|
|
MATCH (public:Public {{{public_property_name}: '{public_property_value}'}})
|
|
MERGE (user)-[:{relationship_type}]->(public)
|
|
"""
|
|
self.query(link_cypher)
|
|
except Neo4jError as e:
|
|
logging.error(f"Error linking Public node to user: {e}")
|
|
raise
|
|
|
|
def delete_memory_node(self, memory_id: int, topic: str) -> None:
|
|
if not memory_id or not topic:
|
|
raise ValueError("Memory ID and Topic are required for deletion.")
|
|
|
|
try:
|
|
delete_cypher = f"""
|
|
MATCH ({topic.lower()}: {topic}) WHERE id({topic.lower()}) = {memory_id}
|
|
DETACH DELETE {topic.lower()}
|
|
"""
|
|
logging.info("Delete Cypher Query: %s", delete_cypher)
|
|
self.query(delete_cypher)
|
|
except Neo4jError as e:
|
|
logging.error(f"Error deleting {topic} memory node: {e}")
|
|
raise
|
|
|
|
def unlink_memory_from_user(self, memory_id: int, user_id: str, topic: str='PublicMemory') -> None:
|
|
"""
|
|
Unlink a memory node from a user node.
|
|
|
|
Parameters:
|
|
- memory_id (int): The internal ID of the memory node.
|
|
- user_id (str): The unique identifier for the user.
|
|
- memory_type (str): The type of memory node to unlink ("SemanticMemory" or "PublicMemory").
|
|
|
|
Raises:
|
|
- ValueError: If any required data is missing or invalid.
|
|
"""
|
|
|
|
if not user_id or not isinstance(memory_id, int):
|
|
raise ValueError("Valid User ID and Memory ID are required for unlinking.")
|
|
|
|
if topic not in ["SemanticMemory", "PublicMemory"]:
|
|
raise ValueError("The memory_type must be either 'SemanticMemory' or 'PublicMemory'.")
|
|
|
|
relationship_type = "HAS_SEMANTIC_MEMORY" if topic == "SemanticMemory" else "HAS_PUBLIC_MEMORY"
|
|
|
|
try:
|
|
unlink_cypher = f"""
|
|
MATCH (user:User {{userId: '{user_id}'}})-[r:{relationship_type}]->(memory:{topic}) WHERE id(memory) = {memory_id}
|
|
DELETE r
|
|
"""
|
|
self.query(unlink_cypher)
|
|
except Neo4jError as e:
|
|
logging.error(f"Error unlinking {topic} from user: {e}")
|
|
raise
|
|
|
|
|
|
def link_public_memory_to_user(self, memory_id, user_id):
|
|
# Link an existing Public Memory node to a User node
|
|
link_cypher = f"""
|
|
MATCH (user:User {{userId: '{user_id}'}})
|
|
MATCH (publicMemory:PublicMemory) WHERE id(publicMemory) = {memory_id}
|
|
MERGE (user)-[:HAS_PUBLIC_MEMORY]->(publicMemory)
|
|
"""
|
|
self.query(link_cypher)
|
|
|
|
def retrieve_node_id_for_memory_type(self, topic: str = 'SemanticMemory'):
|
|
link_cypher = f""" MATCH(publicMemory: {topic})
|
|
RETURN
|
|
id(publicMemory)
|
|
AS
|
|
memoryId """
|
|
node_ids = self.query(link_cypher)
|
|
return node_ids
|
|
|
|
|
|
|
|
|
|
from networkx_graph import NetworkXGraphDB
|
|
class GraphDBFactory:
|
|
def create_graph_db(self, db_type, **kwargs):
|
|
if db_type == 'neo4j':
|
|
return Neo4jGraphDB(**kwargs)
|
|
elif db_type == 'networkx':
|
|
return NetworkXGraphDB(**kwargs)
|
|
else:
|
|
raise ValueError(f"Unsupported database type: {db_type}")
|
|
|
|
|
|
|
|
|