604 lines
26 KiB
Python
604 lines
26 KiB
Python
# import marvin
|
|
# from pydantic_settings import BaseSettings
|
|
# from marvin import ai_classifier
|
|
# marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY")
|
|
import logging
|
|
import os
|
|
|
|
print(os.getcwd())
|
|
|
|
import networkx as nx
|
|
|
|
from langchain.graphs import Neo4jGraph
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
import openai
|
|
import instructor
|
|
from openai import OpenAI
|
|
from openai import AsyncOpenAI
|
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
# Adds response_model to ChatCompletion
|
|
# Allows the return of Pydantic model rather than raw JSON
|
|
|
|
from pydantic import BaseModel, Field
|
|
from typing import List
|
|
from ...utils import format_dict, append_uuid_to_variable_names, create_edge_variable_mapping, create_node_variable_mapping
|
|
DEFAULT_PRESET = "promethai_chat"
|
|
preset_options = [DEFAULT_PRESET]
|
|
PROMETHAI_DIR = os.path.join(os.path.expanduser("~"), ".")
|
|
load_dotenv()
|
|
|
|
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
|
|
from ...config import Config
|
|
|
|
config = Config()
|
|
config.load()
|
|
|
|
print(config.model)
|
|
print(config.openai_key)
|
|
|
|
OPENAI_API_KEY = config.openai_key
|
|
|
|
aclient = instructor.patch(OpenAI())
|
|
|
|
|
|
#Execute Cypher queries to create the user and memory components if they don't exist
|
|
#
|
|
# graph.query(
|
|
# f"""
|
|
# // Ensure the User node exists
|
|
# MERGE (user:User {{ userId: {user} }})
|
|
#
|
|
# // Ensure the SemanticMemory node exists
|
|
# MERGE (semantic:SemanticMemory {{ userId: {user} }})
|
|
# MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(semantic)
|
|
#
|
|
# // Ensure the EpisodicMemory node exists
|
|
# MERGE (episodic:EpisodicMemory {{ userId: {user} }})
|
|
# MERGE (user)-[:HAS_EPISODIC_MEMORY]->(episodic)
|
|
#
|
|
# // Ensure the Buffer node exists
|
|
# MERGE (buffer:Buffer {{ userId: {user} }})
|
|
# MERGE (user)-[:HAS_BUFFER]->(buffer)
|
|
# """
|
|
# )
|
|
#
|
|
# # Execute Cypher queries to create the cognitive components in the graph
|
|
# graph.query(
|
|
# f"""
|
|
# // Parsing the query into components and linking them to the user and memory components
|
|
# MERGE (user:User {{ userId: {user} }})
|
|
# MERGE (semantic:SemanticMemory {{ userId: {user} }})
|
|
# MERGE (episodic:EpisodicMemory {{ userId: {user} }})
|
|
# MERGE (buffer:Buffer {{ userId: {user} }})
|
|
#
|
|
# CREATE (action1:Event {{ description: 'take a walk', location: 'forest' }})
|
|
# CREATE (action2:Event {{ description: 'get information', source: 'book' }})
|
|
# CREATE (time:TimeContext {{ description: 'in the afternoon' }})
|
|
#
|
|
# WITH user, semantic, episodic, buffer, action1, action2, time
|
|
# CREATE (knowledge:Knowledge {{ content: 'information from a book' }})
|
|
# CREATE (semantic)-[:HAS_KNOWLEDGE]->(knowledge)
|
|
# CREATE (episodic)-[:HAS_EVENT]->(action1)
|
|
# CREATE (episodic)-[:HAS_EVENT]->(action2)
|
|
# CREATE (episodic)-[:HAS_TIME_CONTEXT]->(time)
|
|
# CREATE (buffer)-[:CURRENTLY_HOLDING]->(action1)
|
|
# CREATE (buffer)-[:CURRENTLY_HOLDING]->(action2)
|
|
# CREATE (buffer)-[:CURRENTLY_HOLDING]->(time)
|
|
# """
|
|
# )
|
|
|
|
|
|
class Node(BaseModel):
|
|
id: int
|
|
description: str
|
|
category: str
|
|
color: str ="blue"
|
|
memory_type: str
|
|
|
|
|
|
|
|
class Edge(BaseModel):
|
|
source: int
|
|
target: int
|
|
description: str
|
|
color: str= "blue"
|
|
|
|
|
|
class KnowledgeGraph(BaseModel):
|
|
nodes: List[Node] = Field(..., default_factory=list)
|
|
edges: List[Edge] = Field(..., default_factory=list)
|
|
|
|
|
|
#
|
|
|
|
def generate_graph(input) -> KnowledgeGraph:
|
|
out = aclient.chat.completions.create(
|
|
model="gpt-4-1106-preview",
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": f"""Use the given format to extract information from the following input: {input}. """,
|
|
|
|
},
|
|
{ "role":"system", "content": """You are a top-tier algorithm
|
|
designed for extracting information in structured formats to build a knowledge graph.
|
|
- **Nodes** represent entities and concepts. They're akin to Wikipedia nodes.
|
|
- The aim is to achieve simplicity and clarity in the
|
|
knowledge graph, making it accessible for a vast audience.
|
|
## 2. Labeling Nodes
|
|
- **Consistency**: Ensure you use basic or elementary types for node labels.
|
|
- For example, when you identify an entity representing a person,
|
|
always label it as **"person"**.
|
|
Avoid using more specific terms like "mathematician" or "scientist".
|
|
- Include event, entity, time, or action nodes to the category.
|
|
- Classify the memory type as episodic or semantic.
|
|
- **Node IDs**: Never utilize integers as node IDs.
|
|
Node IDs should be names or human-readable identifiers found in the text.
|
|
## 3. Handling Numerical Data and Dates
|
|
- Numerical data, like age or other related information,
|
|
should be incorporated as attributes or properties of the respective nodes.
|
|
- **No Separate Nodes for Dates/Numbers**:
|
|
Do not create separate nodes for dates or numerical values.
|
|
Always attach them as attributes or properties of nodes.
|
|
- **Property Format**: Properties must be in a key-value format.
|
|
- **Quotation Marks**: Never use escaped single or double quotes within property values.
|
|
- **Naming Convention**: Use camelCase for property keys, e.g., `birthDate`.
|
|
## 4. Coreference Resolution
|
|
- **Maintain Entity Consistency**:
|
|
When extracting entities, it's vital to ensure consistency.
|
|
If an entity, such as "John Doe", is mentioned multiple times
|
|
in the text but is referred to by different names or pronouns (e.g., "Joe", "he"),
|
|
always use the most complete identifier for that entity throughout the knowledge graph.
|
|
In this example, use "John Doe" as the entity ID.
|
|
Remember, the knowledge graph should be coherent and easily understandable,
|
|
so maintaining consistency in entity references is crucial.
|
|
## 5. Strict Compliance
|
|
Adhere to the rules strictly. Non-compliance will result in termination."""}
|
|
],
|
|
response_model=KnowledgeGraph,
|
|
)
|
|
return out
|
|
|
|
class AbstractGraphDB(ABC):
|
|
|
|
@abstractmethod
|
|
def query(self, query: str, params=None):
|
|
pass
|
|
|
|
# @abstractmethod
|
|
# def create_nodes(self, nodes: List[dict]):
|
|
# pass
|
|
#
|
|
# @abstractmethod
|
|
# def create_edges(self, edges: List[dict]):
|
|
# pass
|
|
#
|
|
# @abstractmethod
|
|
# def create_memory_type_relationships(self, nodes: List[dict], memory_type: str):
|
|
# pass
|
|
|
|
|
|
class Neo4jGraphDB(AbstractGraphDB):
|
|
def __init__(self, url, username, password):
|
|
self.graph = Neo4jGraph(url=url, username=username, password=password)
|
|
self.openai_key = config.openai_key
|
|
|
|
|
|
|
|
def query(self, query, params=None):
|
|
return self.graph.query(query, params)
|
|
# Initialize the Neo4j connection here
|
|
|
|
|
|
def create_base_cognitive_architecture(self, user_id: str):
|
|
# Create the user and memory components if they don't exist
|
|
user_memory_cypher = f"""
|
|
MERGE (user:User {{userId: '{user_id}'}})
|
|
MERGE (semantic:SemanticMemory {{userId: '{user_id}'}})
|
|
MERGE (episodic:EpisodicMemory {{userId: '{user_id}'}})
|
|
MERGE (buffer:Buffer {{userId: '{user_id}'}})
|
|
MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(semantic)
|
|
MERGE (user)-[:HAS_EPISODIC_MEMORY]->(episodic)
|
|
MERGE (user)-[:HAS_BUFFER]->(buffer)
|
|
"""
|
|
|
|
return user_memory_cypher
|
|
|
|
def user_query_to_edges_and_nodes(self, input: str) ->KnowledgeGraph:
|
|
return openai.ChatCompletion.create(
|
|
model=config.model,
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": f"""Use the given format to extract information from the following input: {input}. """,
|
|
|
|
},
|
|
{"role": "system", "content": """You are a top-tier algorithm
|
|
designed for extracting information in structured formats to build a knowledge graph.
|
|
- **Nodes** represent entities and concepts. They're akin to Wikipedia nodes.
|
|
- The aim is to achieve simplicity and clarity in the
|
|
knowledge graph, making it accessible for a vast audience.
|
|
## 2. Labeling Nodes
|
|
- **Consistency**: Ensure you use basic or elementary types for node labels.
|
|
- For example, when you identify an entity representing a person,
|
|
always label it as **"person"**.
|
|
Avoid using more specific terms like "mathematician" or "scientist".
|
|
- Include event, entity, time, or action nodes to the category.
|
|
- Classify the memory type as episodic or semantic.
|
|
- **Node IDs**: Never utilize integers as node IDs.
|
|
Node IDs should be names or human-readable identifiers found in the text.
|
|
## 3. Handling Numerical Data and Dates
|
|
- Numerical data, like age or other related information,
|
|
should be incorporated as attributes or properties of the respective nodes.
|
|
- **No Separate Nodes for Dates/Numbers**:
|
|
Do not create separate nodes for dates or numerical values.
|
|
Always attach them as attributes or properties of nodes.
|
|
- **Property Format**: Properties must be in a key-value format.
|
|
- **Quotation Marks**: Never use escaped single or double quotes within property values.
|
|
- **Naming Convention**: Use camelCase for property keys, e.g., `birthDate`.
|
|
## 4. Coreference Resolution
|
|
- **Maintain Entity Consistency**:
|
|
When extracting entities, it's vital to ensure consistency.
|
|
If an entity, such as "John Doe", is mentioned multiple times
|
|
in the text but is referred to by different names or pronouns (e.g., "Joe", "he"),
|
|
always use the most complete identifier for that entity throughout the knowledge graph.
|
|
In this example, use "John Doe" as the entity ID.
|
|
Remember, the knowledge graph should be coherent and easily understandable,
|
|
so maintaining consistency in entity references is crucial.
|
|
## 5. Strict Compliance
|
|
Adhere to the rules strictly. Non-compliance will result in termination."""}
|
|
],
|
|
response_model=KnowledgeGraph,
|
|
)
|
|
|
|
def generate_create_statements_for_nodes_with_uuid(self, nodes, unique_mapping, base_node_mapping):
|
|
create_statements = []
|
|
for node in nodes:
|
|
original_variable_name = base_node_mapping[node['id']]
|
|
unique_variable_name = unique_mapping[original_variable_name]
|
|
node_label = node['category'].capitalize()
|
|
properties = {k: v for k, v in node.items() if k not in ['id', 'category']}
|
|
try:
|
|
properties = format_dict(properties)
|
|
except:
|
|
pass
|
|
create_statements.append(f"CREATE ({unique_variable_name}:{node_label} {properties})")
|
|
return create_statements
|
|
|
|
# Update the function to generate Cypher CREATE statements for edges with unique variable names
|
|
def generate_create_statements_for_edges_with_uuid(self, user_id, edges, unique_mapping, base_node_mapping):
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_mapping.values())}, {user_id}, semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
for edge in edges:
|
|
# print("HERE IS THE EDGE", edge)
|
|
source_variable = unique_mapping[base_node_mapping[edge['source']]]
|
|
target_variable = unique_mapping[base_node_mapping[edge['target']]]
|
|
relationship = edge['description'].replace(" ", "_").upper()
|
|
create_statements.append(f"CREATE ({source_variable})-[:{relationship}]->({target_variable})")
|
|
return create_statements
|
|
|
|
def generate_memory_type_relationships_with_uuid_and_time_context(self, user_id, nodes, unique_mapping, base_node_mapping):
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_mapping.values())}, {user_id}, semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
# Loop through each node and create relationships based on memory_type
|
|
for node in nodes:
|
|
original_variable_name = base_node_mapping[node['id']]
|
|
unique_variable_name = unique_mapping[original_variable_name]
|
|
if node['memory_type'] == 'semantic':
|
|
create_statements.append(f"CREATE (semantic)-[:HAS_KNOWLEDGE]->({unique_variable_name})")
|
|
elif node['memory_type'] == 'episodic':
|
|
create_statements.append(f"CREATE (episodic)-[:HAS_EVENT]->({unique_variable_name})")
|
|
if node['category'] == 'time':
|
|
create_statements.append(f"CREATE (buffer)-[:HAS_TIME_CONTEXT]->({unique_variable_name})")
|
|
|
|
# Assuming buffer holds all actions and times
|
|
# if node['category'] in ['action', 'time']:
|
|
create_statements.append(f"CREATE (buffer)-[:CURRENTLY_HOLDING]->({unique_variable_name})")
|
|
|
|
return create_statements
|
|
|
|
async def generate_cypher_query_for_user_prompt_decomposition(self, user_id:str, query:str):
|
|
|
|
graph: KnowledgeGraph = generate_graph(query)
|
|
graph_dic = graph.dict()
|
|
|
|
node_variable_mapping = create_node_variable_mapping(graph_dic['nodes'])
|
|
edge_variable_mapping = create_edge_variable_mapping(graph_dic['edges'])
|
|
# Create unique variable names for each node
|
|
unique_node_variable_mapping = append_uuid_to_variable_names(node_variable_mapping)
|
|
unique_edge_variable_mapping = append_uuid_to_variable_names(edge_variable_mapping)
|
|
create_nodes_statements = self.generate_create_statements_for_nodes_with_uuid(graph_dic['nodes'], unique_node_variable_mapping, node_variable_mapping)
|
|
create_edges_statements =self.generate_create_statements_for_edges_with_uuid(user_id, graph_dic['edges'], unique_node_variable_mapping, node_variable_mapping)
|
|
|
|
memory_type_statements_with_uuid_and_time_context = self.generate_memory_type_relationships_with_uuid_and_time_context(user_id,
|
|
graph_dic['nodes'], unique_node_variable_mapping, node_variable_mapping)
|
|
|
|
# # Combine all statements
|
|
cypher_statements = [self.create_base_cognitive_architecture(user_id)] + create_nodes_statements + create_edges_statements + memory_type_statements_with_uuid_and_time_context
|
|
cypher_statements_joined = "\n".join(cypher_statements)
|
|
return cypher_statements_joined
|
|
|
|
|
|
def update_user_query_for_user_prompt_decomposition(self, user_id, user_query):
|
|
pass
|
|
|
|
|
|
def delete_all_user_memories(self, user_id):
|
|
try:
|
|
# Check if the user exists
|
|
user_exists = self.graph.query(f"MATCH (user:User {{userId: '{user_id}'}}) RETURN user")
|
|
if not user_exists:
|
|
return f"No user found with ID: {user_id}"
|
|
|
|
# Delete all memory nodes and relationships for the given user
|
|
delete_query = f"""
|
|
MATCH (user:User {{userId: '{user_id}'}})-[r]-()
|
|
DELETE r
|
|
WITH user
|
|
MATCH (user)-[:HAS_SEMANTIC_MEMORY]->(semantic)
|
|
MATCH (user)-[:HAS_EPISODIC_MEMORY]->(episodic)
|
|
MATCH (user)-[:HAS_BUFFER]->(buffer)
|
|
DETACH DELETE semantic, episodic, buffer
|
|
"""
|
|
self.graph.query(delete_query)
|
|
return f"All memories deleted for user ID: {user_id}"
|
|
except Exception as e:
|
|
return f"An error occurred: {str(e)}"
|
|
|
|
def delete_specific_memory_type(self, user_id, memory_type):
|
|
try:
|
|
# Check if the user exists
|
|
user_exists = self.graph.query(f"MATCH (user:User {{userId: '{user_id}'}}) RETURN user")
|
|
if not user_exists:
|
|
return f"No user found with ID: {user_id}"
|
|
|
|
# Validate memory type
|
|
if memory_type not in ['SemanticMemory', 'EpisodicMemory', 'Buffer']:
|
|
return "Invalid memory type. Choose from 'SemanticMemory', 'EpisodicMemory', or 'Buffer'."
|
|
|
|
# Delete specific memory type nodes and relationships for the given user
|
|
delete_query = f"""
|
|
MATCH (user:User {{userId: '{user_id}'}})-[:HAS_{memory_type.upper()}]->(memory)
|
|
DETACH DELETE memory
|
|
"""
|
|
self.graph.query(delete_query)
|
|
return f"{memory_type} deleted for user ID: {user_id}"
|
|
except Exception as e:
|
|
return f"An error occurred: {str(e)}"
|
|
def retrieve_semantic_memory(self, user_id: str):
|
|
query = f"""
|
|
MATCH (user:User {{userId: {user_id} }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)
|
|
MATCH (semantic)-[:HAS_KNOWLEDGE]->(knowledge)
|
|
RETURN knowledge
|
|
"""
|
|
return self.query(query, params={"user_id": user_id})
|
|
|
|
def retrieve_episodic_memory(self, user_id: str):
|
|
query = """
|
|
MATCH (user:User {userId: $user_id})-[:HAS_EPISODIC_MEMORY]->(episodic:EpisodicMemory)
|
|
MATCH (episodic)-[:HAS_EVENT]->(event)
|
|
RETURN event
|
|
"""
|
|
return self.query(query, params={"user_id": user_id})
|
|
|
|
def retrieve_buffer_memory(self, user_id: str):
|
|
query = """
|
|
MATCH (user:User {userId: $user_id})-[:HAS_BUFFER]->(buffer:Buffer)
|
|
MATCH (buffer)-[:CURRENTLY_HOLDING]->(item)
|
|
RETURN item
|
|
"""
|
|
return self.query(query, params={"user_id": user_id})
|
|
def generate_graph_semantic_memory_document_summary(self, document_summary : str, unique_graphdb_mapping_values: dict, document_namespace: str):
|
|
""" This function takes a document and generates a document summary in Semantic Memory"""
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_graphdb_mapping_values.values())}, user, semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
# Loop through each node and create relationships based on memory_type
|
|
|
|
create_statements.append(f"CREATE (semantic)-[:HAS_KNOWLEDGE]->({unique_graphdb_mapping_values})")
|
|
|
|
|
|
return create_statements
|
|
|
|
|
|
def generate_document_summary(self, document_summary : str, unique_graphdb_mapping_values: dict, document_namespace: str):
|
|
""" This function takes a document and generates a document summary in Semantic Memory"""
|
|
|
|
|
|
# fetch namespace from postgres db
|
|
# fetch 1st and last page from vector store
|
|
# summarize the text, add document type
|
|
# write to postgres
|
|
create_statements = []
|
|
with_statement = f"WITH {', '.join(unique_graphdb_mapping_values.values())}, user, semantic, episodic, buffer"
|
|
create_statements.append(with_statement)
|
|
|
|
# Loop through each node and create relationships based on memory_type
|
|
|
|
create_statements.append(f"CREATE (semantic)-[:HAS_KNOWLEDGE]->({unique_graphdb_mapping_values})")
|
|
|
|
|
|
return create_statements
|
|
|
|
async def get_document_categories(self, user_id: str):
|
|
"""
|
|
Retrieve a list of categories for all documents associated with a given user.
|
|
|
|
This function executes a Cypher query in a Neo4j database to fetch the categories
|
|
of all 'Document' nodes that are linked to the 'SemanticMemory' node of the specified user.
|
|
|
|
Parameters:
|
|
- session (AsyncSession): The database session for executing the query.
|
|
- user_id (str): The unique identifier of the user.
|
|
|
|
Returns:
|
|
- List[str]: A list of document categories associated with the user.
|
|
|
|
Raises:
|
|
- Exception: If an error occurs during the database query execution.
|
|
"""
|
|
try:
|
|
query = f'''
|
|
MATCH (user:User {{userId: {user_id} }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document)
|
|
RETURN document.documentCategory AS category
|
|
'''
|
|
result = self.query(query)
|
|
categories = [record["category"] for record in result]
|
|
return categories
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while retrieving document categories: {str(e)}")
|
|
return None
|
|
|
|
def create_document_node_cypher(self, document_summary: dict, user_id: str) -> str:
|
|
"""
|
|
Generate a Cypher query to create a Document node linked to a SemanticMemory node for a user.
|
|
|
|
Parameters:
|
|
- document_summary (dict): A dictionary containing the document's category, title, and summary.
|
|
- user_id (str): The unique identifier for the user.
|
|
|
|
Returns:
|
|
- str: A Cypher query string with parameters.
|
|
|
|
Raises:
|
|
- ValueError: If any required data is missing or invalid.
|
|
"""
|
|
|
|
# Validate the input parameters
|
|
if not isinstance(document_summary, dict):
|
|
raise ValueError("The document_summary must be a dictionary.")
|
|
if not all(key in document_summary for key in ['document_category', 'title', 'summary']):
|
|
raise ValueError("The document_summary dictionary is missing required keys.")
|
|
if not isinstance(user_id, str) or not user_id:
|
|
raise ValueError("The user_id must be a non-empty string.")
|
|
|
|
# Escape single quotes in the document summary data (if not using parameters)
|
|
# title = document_summary['title'].replace("'", "\\'")
|
|
# summary = document_summary['summary'].replace("'", "\\'")
|
|
# document_category = document_summary['document_category'].replace("'", "\\'")
|
|
|
|
# Generate the Cypher query using parameters
|
|
cypher_query = f'''
|
|
// Ensure the User node exists
|
|
MERGE (user:User {{ userId: $user_id }})
|
|
|
|
// Ensure the SemanticMemory node exists and is connected to the User
|
|
MERGE (semantic:SemanticMemory {{ userId: $user_id }})
|
|
MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(semantic)
|
|
|
|
// Create the Document node with its properties
|
|
CREATE (document:Document {{
|
|
title: $title,
|
|
summary: $summary,
|
|
documentCategory: $document_category
|
|
}})
|
|
|
|
// Link the Document node to the SemanticMemory node
|
|
CREATE (semantic)-[:HAS_DOCUMENT]->(document)
|
|
'''
|
|
|
|
return cypher_query
|
|
|
|
def update_document_node_with_namespace(self, user_id: str, vectordb_namespace: str, document_id: str):
|
|
# Generate the Cypher query
|
|
cypher_query = f'''
|
|
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{d_id: '{document_id}'}})
|
|
SET document.vectordbNamespace = '{vectordb_namespace}'
|
|
RETURN document
|
|
'''
|
|
|
|
|
|
return cypher_query
|
|
|
|
def get_namespaces_by_document_category(self, user_id: str, category: str):
|
|
"""
|
|
Retrieve a list of Vectordb namespaces for documents of a specified category associated with a given user.
|
|
|
|
This function executes a Cypher query in a Neo4j database to fetch the 'vectordbNamespace' of all 'Document' nodes
|
|
that are linked to the 'SemanticMemory' node of the specified user and belong to the specified category.
|
|
|
|
Parameters:
|
|
- user_id (str): The unique identifier of the user.
|
|
- category (str): The category to filter the documents by.
|
|
|
|
Returns:
|
|
- List[str]: A list of Vectordb namespaces for documents in the specified category.
|
|
|
|
Raises:
|
|
- Exception: If an error occurs during the database query execution.
|
|
"""
|
|
try:
|
|
query = f'''
|
|
MATCH (user:User {{userId: '{user_id}'}})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document)
|
|
WHERE document.documentCategory = '{category}'
|
|
RETURN document.vectordbNamespace AS namespace
|
|
'''
|
|
result = self.query(query)
|
|
namespaces = [record["namespace"] for record in result]
|
|
return namespaces
|
|
except Exception as e:
|
|
logging.error(f"An error occurred while retrieving namespaces by document category: {str(e)}")
|
|
return None
|
|
|
|
|
|
class NetworkXGraphDB(AbstractGraphDB):
|
|
def __init__(self):
|
|
self.graph = nx.Graph()
|
|
# Initialize other necessary properties or configurations
|
|
|
|
def create_base_cognitive_architecture(self, user_id: str):
|
|
# Add nodes for user and memory types if they don't exist
|
|
self.graph.add_node(user_id, type='User')
|
|
self.graph.add_node(f"{user_id}_semantic", type='SemanticMemory')
|
|
self.graph.add_node(f"{user_id}_episodic", type='EpisodicMemory')
|
|
self.graph.add_node(f"{user_id}_buffer", type='Buffer')
|
|
|
|
# Add edges to connect user to memory types
|
|
self.graph.add_edge(user_id, f"{user_id}_semantic", relation='HAS_SEMANTIC_MEMORY')
|
|
self.graph.add_edge(user_id, f"{user_id}_episodic", relation='HAS_EPISODIC_MEMORY')
|
|
self.graph.add_edge(user_id, f"{user_id}_buffer", relation='HAS_BUFFER')
|
|
|
|
def delete_all_user_memories(self, user_id: str):
|
|
# Remove nodes and edges related to the user's memories
|
|
for memory_type in ['semantic', 'episodic', 'buffer']:
|
|
memory_node = f"{user_id}_{memory_type}"
|
|
self.graph.remove_node(memory_node)
|
|
|
|
def delete_specific_memory_type(self, user_id: str, memory_type: str):
|
|
# Remove a specific type of memory node and its related edges
|
|
memory_node = f"{user_id}_{memory_type.lower()}"
|
|
if memory_node in self.graph:
|
|
self.graph.remove_node(memory_node)
|
|
|
|
# Methods for retrieving semantic, episodic, and buffer memories
|
|
def retrieve_semantic_memory(self, user_id: str):
|
|
return [n for n in self.graph.neighbors(f"{user_id}_semantic")]
|
|
|
|
def retrieve_episodic_memory(self, user_id: str):
|
|
return [n for n in self.graph.neighbors(f"{user_id}_episodic")]
|
|
|
|
def retrieve_buffer_memory(self, user_id: str):
|
|
return [n for n in self.graph.neighbors(f"{user_id}_buffer")]
|
|
|
|
class GraphDBFactory:
|
|
def create_graph_db(self, db_type, **kwargs):
|
|
if db_type == 'neo4j':
|
|
return Neo4jGraphDB(**kwargs)
|
|
elif db_type == 'networkx':
|
|
return NetworkXGraphDB(**kwargs)
|
|
else:
|
|
raise ValueError(f"Unsupported database type: {db_type}")
|
|
|
|
|
|
|
|
|