diff --git a/cognitive_architecture/database/graph_database/graph.py b/cognitive_architecture/database/graph_database/graph.py index de7b665f1..98f16bb28 100644 --- a/cognitive_architecture/database/graph_database/graph.py +++ b/cognitive_architecture/database/graph_database/graph.py @@ -1,7 +1,4 @@ -# import marvin -# from pydantic_settings import BaseSettings -# from marvin import ai_classifier -# marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY") + import logging import os @@ -31,6 +28,7 @@ from pydantic import BaseModel, Field from typing import List, Dict, Optional from ...utils import format_dict, append_uuid_to_variable_names, create_edge_variable_mapping, \ create_node_variable_mapping, get_unsumarized_vector_db_namespace +from ...llm.queries import generate_summary, generate_graph DEFAULT_PRESET = "promethai_chat" preset_options = [DEFAULT_PRESET] @@ -40,6 +38,8 @@ load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") from ...config import Config +from ...shared.data_models import Node, Edge, KnowledgeGraph, GraphQLQuery, MemorySummary + config = Config() config.load() @@ -50,152 +50,6 @@ OPENAI_API_KEY = config.openai_key aclient = instructor.patch(OpenAI()) - -#Execute Cypher queries to create the user and memory components if they don't exist -# -# graph.query( -# f""" -# // Ensure the User node exists -# MERGE (user:User {{ userId: {user} }}) -# -# // Ensure the SemanticMemory node exists -# MERGE (semantic:SemanticMemory {{ userId: {user} }}) -# MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(semantic) -# -# // Ensure the EpisodicMemory node exists -# MERGE (episodic:EpisodicMemory {{ userId: {user} }}) -# MERGE (user)-[:HAS_EPISODIC_MEMORY]->(episodic) -# -# // Ensure the Buffer node exists -# MERGE (buffer:Buffer {{ userId: {user} }}) -# MERGE (user)-[:HAS_BUFFER]->(buffer) -# """ -# ) -# -# # Execute Cypher queries to create the cognitive components in the graph -# graph.query( -# f""" -# // Parsing the query into components and linking them to the user and memory components -# MERGE (user:User {{ userId: {user} }}) -# MERGE (semantic:SemanticMemory {{ userId: {user} }}) -# MERGE (episodic:EpisodicMemory {{ userId: {user} }}) -# MERGE (buffer:Buffer {{ userId: {user} }}) -# - # CREATE (action1:Event {{ description: 'take a walk', location: 'forest' }}) - # CREATE (action2:Event {{ description: 'get information', source: 'book' }}) - # CREATE (time:TimeContext {{ description: 'in the afternoon' }}) - # - # WITH user, semantic, episodic, buffer, action1, action2, time - # CREATE (knowledge:Knowledge {{ content: 'information from a book' }}) - # CREATE (semantic)-[:HAS_KNOWLEDGE]->(knowledge) - # CREATE (episodic)-[:HAS_EVENT]->(action1) - # CREATE (episodic)-[:HAS_EVENT]->(action2) - # CREATE (episodic)-[:HAS_TIME_CONTEXT]->(time) - # CREATE (buffer)-[:CURRENTLY_HOLDING]->(action1) - # CREATE (buffer)-[:CURRENTLY_HOLDING]->(action2) - # CREATE (buffer)-[:CURRENTLY_HOLDING]->(time) -# """ -# ) - - -class Node(BaseModel): - id: int - description: str - category: str - color: str ="blue" - memory_type: str - created_at: Optional[float] = None - summarized: Optional[bool] = None - - - -class Edge(BaseModel): - source: int - target: int - description: str - color: str= "blue" - created_at: Optional[float] = None - summarized: Optional[bool] = None - -class KnowledgeGraph(BaseModel): - nodes: List[Node] = Field(..., default_factory=list) - edges: List[Edge] = Field(..., default_factory=list) - -class GraphQLQuery(BaseModel): - query: str -# - -def generate_graph(input) -> KnowledgeGraph: - out = aclient.chat.completions.create( - model="gpt-4-1106-preview", - messages=[ - { - "role": "user", - "content": f"""Use the given format to extract information from the following input: {input}. """, - - }, - { "role":"system", "content": """You are a top-tier algorithm - designed for extracting information in structured formats to build a knowledge graph. - - **Nodes** represent entities and concepts. They're akin to Wikipedia nodes. - - The aim is to achieve simplicity and clarity in the - knowledge graph, making it accessible for a vast audience. - ## 2. Labeling Nodes - - **Consistency**: Ensure you use basic or elementary types for node labels. - - For example, when you identify an entity representing a person, - always label it as **"person"**. - Avoid using more specific terms like "mathematician" or "scientist". - - Include event, entity, time, or action nodes to the category. - - Classify the memory type as episodic or semantic. - - **Node IDs**: Never utilize integers as node IDs. - Node IDs should be names or human-readable identifiers found in the text. - ## 3. Handling Numerical Data and Dates - - Numerical data, like age or other related information, - should be incorporated as attributes or properties of the respective nodes. - - **No Separate Nodes for Dates/Numbers**: - Do not create separate nodes for dates or numerical values. - Always attach them as attributes or properties of nodes. - - **Property Format**: Properties must be in a key-value format. - - **Quotation Marks**: Never use escaped single or double quotes within property values. - - **Naming Convention**: Use camelCase for property keys, e.g., `birthDate`. - ## 4. Coreference Resolution - - **Maintain Entity Consistency**: - When extracting entities, it's vital to ensure consistency. - If an entity, such as "John Doe", is mentioned multiple times - in the text but is referred to by different names or pronouns (e.g., "Joe", "he"), - always use the most complete identifier for that entity throughout the knowledge graph. - In this example, use "John Doe" as the entity ID. - Remember, the knowledge graph should be coherent and easily understandable, - so maintaining consistency in entity references is crucial. - ## 5. Strict Compliance - Adhere to the rules strictly. Non-compliance will result in termination."""} - ], - response_model=KnowledgeGraph, - ) - return out - -class MemorySummary(BaseModel): - nodes: List[Node] = Field(..., default_factory=list) - edges: List[Edge] = Field(..., default_factory=list) - -async def generate_summary(input) -> MemorySummary: - out = aclient.chat.completions.create( - model="gpt-4-1106-preview", - messages=[ - { - "role": "user", - "content": f"""Use the given format summarize and reduce the following input: {input}. """, - - }, - { "role":"system", "content": """You are a top-tier algorithm - designed for summarizing existing knowledge graphs in structured formats based on a knowledge graph. - ## 1. Strict Compliance - Adhere to the rules strictly. Non-compliance will result in termination. - ## 2. Don't forget your main goal is to reduce the number of nodes in the knowledge graph while preserving the information contained in it."""} - ], - response_model=MemorySummary, - ) - return out - class AbstractGraphDB(ABC): @abstractmethod @@ -297,58 +151,6 @@ class Neo4jGraphDB(AbstractGraphDB): return reduced_graph - # def add_summarized_memory(self, user_id: str, memory_type: str, timestamp: float = None, summarized: bool = None): - # - - - def user_query_to_edges_and_nodes(self, input: str) ->KnowledgeGraph: - return aclient.chat.completions.create( - model=config.model, - messages=[ - { - "role": "user", - "content": f"""Use the given format to extract information from the following input: {input}. """, - - }, - {"role": "system", "content": """You are a top-tier algorithm - designed for extracting information in structured formats to build a knowledge graph. - - **Nodes** represent entities and concepts. They're akin to Wikipedia nodes. - - The aim is to achieve simplicity and clarity in the - knowledge graph, making it accessible for a vast audience. - ## 2. Labeling Nodes - - **Consistency**: Ensure you use basic or elementary types for node labels. - - For example, when you identify an entity representing a person, - always label it as **"person"**. - Avoid using more specific terms like "mathematician" or "scientist". - - Include event, entity, time, or action nodes to the category. - - Classify the memory type as episodic or semantic. - - **Node IDs**: Never utilize integers as node IDs. - Node IDs should be names or human-readable identifiers found in the text. - ## 3. Handling Numerical Data and Dates - - Numerical data, like age or other related information, - should be incorporated as attributes or properties of the respective nodes. - - **No Separate Nodes for Dates/Numbers**: - Do not create separate nodes for dates or numerical values. - Always attach them as attributes or properties of nodes. - - **Property Format**: Properties must be in a key-value format. - - **Quotation Marks**: Never use escaped single or double quotes within property values. - - **Naming Convention**: Use camelCase for property keys, e.g., `birthDate`. - ## 4. Coreference Resolution - - **Maintain Entity Consistency**: - When extracting entities, it's vital to ensure consistency. - If an entity, such as "John Doe", is mentioned multiple times - in the text but is referred to by different names or pronouns (e.g., "Joe", "he"), - always use the most complete identifier for that entity throughout the knowledge graph. - In this example, use "John Doe" as the entity ID. - Remember, the knowledge graph should be coherent and easily understandable, - so maintaining consistency in entity references is crucial. - ## 5. Strict Compliance - Adhere to the rules strictly. Non-compliance will result in termination."""} - ], - response_model=KnowledgeGraph, - ) - - def cypher_statement_correcting(self, input: str) ->str: return aclient.chat.completions.create( model=config.model, @@ -661,63 +463,7 @@ class Neo4jGraphDB(AbstractGraphDB): logging.error(f"An error occurred while retrieving document summary: {str(e)}") return None - # async def get_document_categories(self, user_id: str): - # """ - # Retrieve a list of categories for all documents associated with a given user. - # - # This function executes a Cypher query in a Neo4j database to fetch the categories - # of all 'Document' nodes that are linked to the 'SemanticMemory' node of the specified user. - # - # Parameters: - # - session (AsyncSession): The database session for executing the query. - # - user_id (str): The unique identifier of the user. - # - # Returns: - # - List[str]: A list of document categories associated with the user. - # - # Raises: - # - Exception: If an error occurs during the database query execution. - # """ - # try: - # query = f''' - # MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document) - # RETURN document.documentCategory AS category - # ''' - # logging.info(f"Generated Cypher query: {query}") - # return query - # - # except Exception as e: - # logging.error(f"An error occurred while retrieving document categories: {str(e)}") - # return None - # async def get_document_ids(self, user_id: str, category: str): - # """ - # Retrieve a list of document IDs for a specific category associated with a given user. - # - # This function executes a Cypher query in a Neo4j database to fetch the IDs - # of all 'Document' nodes in a specific category that are linked to the 'SemanticMemory' node of the specified user. - # - # Parameters: - # - user_id (str): The unique identifier of the user. - # - category (str): The specific document category to filter by. - # - # Returns: - # - List[str]: A list of document IDs in the specified category associated with the user. - # - # Raises: - # - Exception: If an error occurs during the database query execution. - # """ - # try: - # query = f''' - # MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{documentCategory: '{category}'}}) - # RETURN document.d_id AS d_id - # ''' - # logging.info(f"Generated Cypher query: {query}") - # return query - # - # except Exception as e: - # logging.error(f"An error occurred while retrieving document IDs: {str(e)}") - # return None async def get_memory_linked_document_ids(self, user_id: str, summary_id: str, memory_type: str = "PublicMemory"): """ @@ -1006,245 +752,9 @@ class Neo4jGraphDB(AbstractGraphDB): return node_ids - # def retrieve_linked_memory_for_user(self, user_id: str, topic: str, relationship_type: str = 'HAS_MEMORY'): - # query = f""" - # MATCH (user:User {{userId: $user_id}})-[:{relationship_type}]->({topic.lower()}:{topic}) - # RETURN {topic.lower()} - # """ - # return self.query(query, params={"user_id": user_id}) - # - # - # async def link_memory_to_user(self, memory_id: int, user_id: str, relationship_type: str = 'HAS_MEMORY') -> None: - # """ - # Link a memory node to a user with a specified relationship type. - # - # Args: - # memory_id (int): The ID of the memory node. - # user_id (str): The user ID to link the memory to. - # relationship_type (str): The type of relationship. - # - # Raises: - # ValueError: If input parameters are invalid. - # Neo4jError: If an error occurs during the database operation. - # """ - # if not user_id or not memory_id: - # raise ValueError("User ID and Memory ID are required for linking.") - # - # try: - # link_cypher = f""" - # MATCH (user:User {{userId: '{user_id}'}}) - # MATCH (memory) WHERE id(memory) = {memory_id} - # MERGE (user)-[:{relationship_type}]->(memory) - # """ - # await self.query(link_cypher) - # except Neo4jError as e: - # logging.error(f"Error linking memory to user: {e}") - # raise - # - # async def delete_memory_node(self, memory_id: int, memory_type: str) -> None: - # """ - # Delete a memory node of a specified type. - # - # Args: - # memory_id (int): The ID of the memory node to delete. - # memory_type (str): The type of the memory node. - # - # Raises: - # ValueError: If input parameters are invalid. - # Neo4jError: If an error occurs during the database operation. - # """ - # if not memory_id or not memory_type: - # raise ValueError("Memory ID and Memory Type are required for deletion.") - # - # try: - # delete_cypher = f""" - # MATCH (memory:{memory_type}) WHERE id(memory) = {memory_id} - # DETACH DELETE memory - # """ - # await self.query(delete_cypher) - # except Neo4jError as e: - # logging.error(f"Error deleting memory node: {e}") - # raise - # - # async def unlink_memory_from_user(self, memory_id: int, user_id: str, relationship_type: str = 'HAS_MEMORY') -> None: - # """ - # Unlink a memory node from a user. - # - # Args: - # memory_id (int): The ID of the memory node. - # user_id (str): The user ID to unlink from the memory. - # relationship_type (str): The type of relationship. - # - # Raises: - # ValueError: If input parameters are invalid. - # Neo4jError: If an error occurs during the database operation. - # """ - # if not user_id or not memory_id: - # raise ValueError("User ID and Memory ID are required for unlinking.") - # - # try: - # unlink_cypher = f""" - # MATCH (user:User {{userId: '{user_id}'}})-[r:{relationship_type}]->(memory) WHERE id(memory) = {memory_id} - # DELETE r - # """ - # await self.query(unlink_cypher) - # except Neo4jError as e: - # logging.error(f"Error unlinking memory from user: {e}") - # raise - # - - # - # def create_public_memory(self, labels, topic=None): - # if topic is None: - # topic = "SerbianArchitecture" - # topicMemoryId = topic + "MemoryId" - # # Create an independent Architecture Memory node with countries as properties - # label_list = ', '.join(f"'{label}'" for label in labels) # Prepare countries list as a string - # architecture_memory_cypher = f""" - # CREATE ({topic.lower()}:{topic} {{description: '{topic}', label: [{label_list}]}}) - # RETURN id({topic.lower()}) AS {topicMemoryId} - # """ - # return self.query(architecture_memory_cypher) - # - # def link_public_memory_to_user(self, public_memory_id, user_id): - # # Link an existing Public Memory node to a User node - # link_cypher = f""" - # MATCH (user:User {{userId: '{user_id}'}}) - # MATCH (publicMemory:PublicMemory) WHERE id(publicMemory) = {public_memory_id} - # MERGE (user)-[:HAS_PUBLIC_MEMORY]->(publicMemory) - # """ - # self.query(link_cypher) - # - # def link_public_memory_to_architecture(self, public_memory_id): - # # Link the Public Memory node to the Architecture Memory node - # link_cypher = f""" - # MATCH (publicMemory:PublicMemory) WHERE id(publicMemory) = {public_memory_id} - # MATCH (architecture:Architecture {{description: 'Architecture'}}) - # MERGE (publicMemory)-[:INCLUDES]->(architecture) - # """ - # self.query(link_cypher) - # - # def delete_public_memory(self, public_memory_id): - # # Delete a Public Memory node by its ID - # delete_cypher = f""" - # MATCH (publicMemory:PublicMemory) WHERE id(publicMemory) = {public_memory_id} - # DETACH DELETE publicMemory - # """ - # self.query(delete_cypher) - # - # def delete_architecture_memory(self, architecture_memory_id): - # # Delete an Architecture Memory node by its ID - # delete_cypher = f""" - # MATCH (architecture:Architecture) WHERE id(architecture) = {architecture_memory_id} - # DETACH DELETE architecture - # """ - # self.query(delete_cypher) - # - # def unlink_public_memory_from_user(self, public_memory_id, user_id): - # # Unlink a Public Memory node from a User node - # unlink_cypher = f""" - # MATCH (user:User {{userId: '{user_id}'}})-[r:HAS_PUBLIC_MEMORY]->(publicMemory:PublicMemory) WHERE id(publicMemory) = {public_memory_id} - # DELETE r - # """ - # self.query(unlink_cypher) - # - # def unlink_public_memory_from_architecture(self, public_memory_id): - # # Unlink the Public Memory node from the Architecture Memory node - # unlink_cypher = f""" - # MATCH (publicMemory:PublicMemory)-[r:INCLUDES]->(architecture:Architecture) WHERE id(publicMemory) = {public_memory_id} - # DELETE r - # """ - # self.query(unlink_cypher) - - -class NetworkXGraphDB: - def __init__(self, filename='networkx_graph.pkl'): - self.filename = filename - try: - self.graph = self.load_graph() # Attempt to load an existing graph - except (FileNotFoundError, EOFError, pickle.UnpicklingError): - self.graph = nx.Graph() # Create a new graph if loading failed - - def save_graph(self): - """ Save the graph to a file using pickle """ - with open(self.filename, 'wb') as f: - pickle.dump(self.graph, f) - - def load_graph(self): - """ Load the graph from a file using pickle """ - with open(self.filename, 'rb') as f: - return pickle.load(f) - - def create_base_cognitive_architecture(self, user_id: str): - # Add nodes for user and memory types if they don't exist - self.graph.add_node(user_id, type='User') - self.graph.add_node(f"{user_id}_semantic", type='SemanticMemory') - self.graph.add_node(f"{user_id}_episodic", type='EpisodicMemory') - self.graph.add_node(f"{user_id}_buffer", type='Buffer') - - # Add edges to connect user to memory types - self.graph.add_edge(user_id, f"{user_id}_semantic", relation='HAS_SEMANTIC_MEMORY') - self.graph.add_edge(user_id, f"{user_id}_episodic", relation='HAS_EPISODIC_MEMORY') - self.graph.add_edge(user_id, f"{user_id}_buffer", relation='HAS_BUFFER') - - self.save_graph() # Save the graph after modifying it - - def delete_all_user_memories(self, user_id: str): - # Remove nodes and edges related to the user's memories - for memory_type in ['semantic', 'episodic', 'buffer']: - memory_node = f"{user_id}_{memory_type}" - self.graph.remove_node(memory_node) - - self.save_graph() # Save the graph after modifying it - - def delete_specific_memory_type(self, user_id: str, memory_type: str): - # Remove a specific type of memory node and its related edges - memory_node = f"{user_id}_{memory_type.lower()}" - if memory_node in self.graph: - self.graph.remove_node(memory_node) - - self.save_graph() # Save the graph after modifying it - - def retrieve_semantic_memory(self, user_id: str): - return [n for n in self.graph.neighbors(f"{user_id}_semantic")] - - def retrieve_episodic_memory(self, user_id: str): - return [n for n in self.graph.neighbors(f"{user_id}_episodic")] - - def retrieve_buffer_memory(self, user_id: str): - return [n for n in self.graph.neighbors(f"{user_id}_buffer")] - - def generate_graph_semantic_memory_document_summary(self, document_summary, unique_graphdb_mapping_values, document_namespace, user_id): - for node, attributes in unique_graphdb_mapping_values.items(): - self.graph.add_node(node, **attributes) - self.graph.add_edge(f"{user_id}_semantic", node, relation='HAS_KNOWLEDGE') - self.save_graph() - - def generate_document_summary(self, document_summary, unique_graphdb_mapping_values, document_namespace, user_id): - self.generate_graph_semantic_memory_document_summary(document_summary, unique_graphdb_mapping_values, document_namespace, user_id) - - async def get_document_categories(self, user_id): - return [self.graph.nodes[n]['category'] for n in self.graph.neighbors(f"{user_id}_semantic") if 'category' in self.graph.nodes[n]] - - async def get_document_ids(self, user_id, category): - return [n for n in self.graph.neighbors(f"{user_id}_semantic") if self.graph.nodes[n].get('category') == category] - - def create_document_node(self, document_summary, user_id): - d_id = document_summary['d_id'] - self.graph.add_node(d_id, **document_summary) - self.graph.add_edge(f"{user_id}_semantic", d_id, relation='HAS_DOCUMENT') - self.save_graph() - - def update_document_node_with_namespace(self, user_id, vectordb_namespace, document_id): - if self.graph.has_node(document_id): - self.graph.nodes[document_id]['vectordbNamespace'] = vectordb_namespace - self.save_graph() - - def get_namespaces_by_document_category(self, user_id, category): - return [self.graph.nodes[n].get('vectordbNamespace') for n in self.graph.neighbors(f"{user_id}_semantic") if self.graph.nodes[n].get('category') == category] - +from networkx_graph import NetworkXGraphDB class GraphDBFactory: def create_graph_db(self, db_type, **kwargs): if db_type == 'neo4j': diff --git a/cognitive_architecture/database/graph_database/networkx_graph.py b/cognitive_architecture/database/graph_database/networkx_graph.py new file mode 100644 index 000000000..adcabf0a3 --- /dev/null +++ b/cognitive_architecture/database/graph_database/networkx_graph.py @@ -0,0 +1,90 @@ +import pickle + +import networkx as nx + + +class NetworkXGraphDB: + def __init__(self, filename='networkx_graph.pkl'): + self.filename = filename + try: + self.graph = self.load_graph() # Attempt to load an existing graph + except (FileNotFoundError, EOFError, pickle.UnpicklingError): + self.graph = nx.Graph() # Create a new graph if loading failed + + def save_graph(self): + """ Save the graph to a file using pickle """ + with open(self.filename, 'wb') as f: + pickle.dump(self.graph, f) + + def load_graph(self): + """ Load the graph from a file using pickle """ + with open(self.filename, 'rb') as f: + return pickle.load(f) + + def create_base_cognitive_architecture(self, user_id: str): + # Add nodes for user and memory types if they don't exist + self.graph.add_node(user_id, type='User') + self.graph.add_node(f"{user_id}_semantic", type='SemanticMemory') + self.graph.add_node(f"{user_id}_episodic", type='EpisodicMemory') + self.graph.add_node(f"{user_id}_buffer", type='Buffer') + + # Add edges to connect user to memory types + self.graph.add_edge(user_id, f"{user_id}_semantic", relation='HAS_SEMANTIC_MEMORY') + self.graph.add_edge(user_id, f"{user_id}_episodic", relation='HAS_EPISODIC_MEMORY') + self.graph.add_edge(user_id, f"{user_id}_buffer", relation='HAS_BUFFER') + + self.save_graph() # Save the graph after modifying it + + def delete_all_user_memories(self, user_id: str): + # Remove nodes and edges related to the user's memories + for memory_type in ['semantic', 'episodic', 'buffer']: + memory_node = f"{user_id}_{memory_type}" + self.graph.remove_node(memory_node) + + self.save_graph() # Save the graph after modifying it + + def delete_specific_memory_type(self, user_id: str, memory_type: str): + # Remove a specific type of memory node and its related edges + memory_node = f"{user_id}_{memory_type.lower()}" + if memory_node in self.graph: + self.graph.remove_node(memory_node) + + self.save_graph() # Save the graph after modifying it + + def retrieve_semantic_memory(self, user_id: str): + return [n for n in self.graph.neighbors(f"{user_id}_semantic")] + + def retrieve_episodic_memory(self, user_id: str): + return [n for n in self.graph.neighbors(f"{user_id}_episodic")] + + def retrieve_buffer_memory(self, user_id: str): + return [n for n in self.graph.neighbors(f"{user_id}_buffer")] + + def generate_graph_semantic_memory_document_summary(self, document_summary, unique_graphdb_mapping_values, document_namespace, user_id): + for node, attributes in unique_graphdb_mapping_values.items(): + self.graph.add_node(node, **attributes) + self.graph.add_edge(f"{user_id}_semantic", node, relation='HAS_KNOWLEDGE') + self.save_graph() + + def generate_document_summary(self, document_summary, unique_graphdb_mapping_values, document_namespace, user_id): + self.generate_graph_semantic_memory_document_summary(document_summary, unique_graphdb_mapping_values, document_namespace, user_id) + + async def get_document_categories(self, user_id): + return [self.graph.nodes[n]['category'] for n in self.graph.neighbors(f"{user_id}_semantic") if 'category' in self.graph.nodes[n]] + + async def get_document_ids(self, user_id, category): + return [n for n in self.graph.neighbors(f"{user_id}_semantic") if self.graph.nodes[n].get('category') == category] + + def create_document_node(self, document_summary, user_id): + d_id = document_summary['d_id'] + self.graph.add_node(d_id, **document_summary) + self.graph.add_edge(f"{user_id}_semantic", d_id, relation='HAS_DOCUMENT') + self.save_graph() + + def update_document_node_with_namespace(self, user_id, vectordb_namespace, document_id): + if self.graph.has_node(document_id): + self.graph.nodes[document_id]['vectordbNamespace'] = vectordb_namespace + self.save_graph() + + def get_namespaces_by_document_category(self, user_id, category): + return [self.graph.nodes[n].get('vectordbNamespace') for n in self.graph.neighbors(f"{user_id}_semantic") if self.graph.nodes[n].get('category') == category] diff --git a/cognitive_architecture/database/vectordb/basevectordb.py b/cognitive_architecture/database/vectordb/basevectordb.py index 6e734834f..f6a507247 100644 --- a/cognitive_architecture/database/vectordb/basevectordb.py +++ b/cognitive_architecture/database/vectordb/basevectordb.py @@ -231,38 +231,6 @@ class BaseMemory: embeddings: Optional[str] = None, ): - # from ast import literal_eval - # class DynamicSchema(Schema): - # pass - # - # default_version = 'current_timestamp' - # version_in_params = params.get("version", default_version) - # - # # Check and update metadata version in DB. - # schema_fields = params - # - # def create_field(field_type, **kwargs): - # field_mapping = { - # "Str": fields.Str, - # "Int": fields.Int, - # "Float": fields.Float, - # "Bool": fields.Bool, - # } - # return field_mapping[field_type](**kwargs) - # - # # Dynamic Schema Creation - # params['user_id'] = self.user_id - # - # - # schema_instance = self.create_dynamic_schema(params) # Always creating Str field, adjust as needed - # - # logging.info(f"params : {params}") - # - # # Schema Validation - # schema_instance = schema_instance - # print("Schema fields: ", [field for field in schema_instance._declared_fields]) - # loaded_params = schema_instance.load(params) - return await self.vector_db.add_memories( observation=observation, loader_settings=loader_settings, params=params, namespace=namespace, metadata_schema_class = None, embeddings=embeddings diff --git a/cognitive_architecture/graph_database/__init__.py b/cognitive_architecture/graph_database/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/cognitive_architecture/llm/prompts/generate_graph_prompt.txt b/cognitive_architecture/llm/prompts/generate_graph_prompt.txt new file mode 100644 index 000000000..744903cda --- /dev/null +++ b/cognitive_architecture/llm/prompts/generate_graph_prompt.txt @@ -0,0 +1,34 @@ +You are a top-tier algorithm +designed for extracting information in structured formats to build a knowledge graph. +- **Nodes** represent entities and concepts. They're akin to Wikipedia nodes. +- The aim is to achieve simplicity and clarity in the +knowledge graph, making it accessible for a vast audience. +## 2. Labeling Nodes +- **Consistency**: Ensure you use basic or elementary types for node labels. + - For example, when you identify an entity representing a person, + always label it as **"person"**. + Avoid using more specific terms like "mathematician" or "scientist". + - Include event, entity, time, or action nodes to the category. + - Classify the memory type as episodic or semantic. +- **Node IDs**: Never utilize integers as node IDs. + Node IDs should be names or human-readable identifiers found in the text. +## 3. Handling Numerical Data and Dates +- Numerical data, like age or other related information, +should be incorporated as attributes or properties of the respective nodes. +- **No Separate Nodes for Dates/Numbers**: +Do not create separate nodes for dates or numerical values. + Always attach them as attributes or properties of nodes. +- **Property Format**: Properties must be in a key-value format. +- **Quotation Marks**: Never use escaped single or double quotes within property values. +- **Naming Convention**: Use camelCase for property keys, e.g., `birthDate`. +## 4. Coreference Resolution +- **Maintain Entity Consistency**: +When extracting entities, it's vital to ensure consistency. +If an entity, such as "John Doe", is mentioned multiple times +in the text but is referred to by different names or pronouns (e.g., "Joe", "he"), +always use the most complete identifier for that entity throughout the knowledge graph. + In this example, use "John Doe" as the entity ID. +Remember, the knowledge graph should be coherent and easily understandable, + so maintaining consistency in entity references is crucial. +## 5. Strict Compliance +Adhere to the rules strictly. Non-compliance will result in termination \ No newline at end of file diff --git a/cognitive_architecture/llm/queries.py b/cognitive_architecture/llm/queries.py new file mode 100644 index 000000000..ac647d657 --- /dev/null +++ b/cognitive_architecture/llm/queries.py @@ -0,0 +1,84 @@ +import os + +from dotenv import load_dotenv + +from ..shared.data_models import Node, Edge, KnowledgeGraph, GraphQLQuery, MemorySummary +from ..config import Config +import instructor +from openai import OpenAI +config = Config() +config.load() + +print(config.model) +print(config.openai_key) + +OPENAI_API_KEY = config.openai_key + +aclient = instructor.patch(OpenAI()) + +load_dotenv() + + +# Function to read query prompts from files +def read_query_prompt(filename): + with open(filename, 'r') as file: + return file.read() + + +def generate_graph(input) -> KnowledgeGraph: + model = "gpt-4-1106-preview" # Define the model here + user_prompt = f"Use the given format to extract information from the following input: {input}." + system_prompt = read_query_prompt('prompts/generate_graph_prompt.txt') + + out = aclient.chat.completions.create( + model=model, + messages=[ + { + "role": "user", + "content": user_prompt, + }, + { + "role": "system", + "content": system_prompt, + }, + ], + response_model=KnowledgeGraph, + ) + return out + + + +async def generate_summary(input) -> MemorySummary: + out = aclient.chat.completions.create( + model="gpt-4-1106-preview", + messages=[ + { + "role": "user", + "content": f"""Use the given format summarize and reduce the following input: {input}. """, + + }, + { "role":"system", "content": """You are a top-tier algorithm + designed for summarizing existing knowledge graphs in structured formats based on a knowledge graph. + ## 1. Strict Compliance + Adhere to the rules strictly. Non-compliance will result in termination. + ## 2. Don't forget your main goal is to reduce the number of nodes in the knowledge graph while preserving the information contained in it."""} + ], + response_model=MemorySummary, + ) + return out + + +def user_query_to_edges_and_nodes( input: str) ->KnowledgeGraph: + system_prompt = read_query_prompt('prompts/generate_graph_prompt.txt') + return aclient.chat.completions.create( + model=config.model, + messages=[ + { + "role": "user", + "content": f"""Use the given format to extract information from the following input: {input}. """, + + }, + {"role": "system", "content":system_prompt} + ], + response_model=KnowledgeGraph, + ) \ No newline at end of file diff --git a/cognitive_architecture/shared/data_models.py b/cognitive_architecture/shared/data_models.py new file mode 100644 index 000000000..e74355cf6 --- /dev/null +++ b/cognitive_architecture/shared/data_models.py @@ -0,0 +1,33 @@ +from typing import Optional, List + +from pydantic import BaseModel, Field +class Node(BaseModel): + id: int + description: str + category: str + color: str ="blue" + memory_type: str + created_at: Optional[float] = None + summarized: Optional[bool] = None + +class Edge(BaseModel): + source: int + target: int + description: str + color: str= "blue" + created_at: Optional[float] = None + summarized: Optional[bool] = None + +class KnowledgeGraph(BaseModel): + nodes: List[Node] = Field(..., default_factory=list) + edges: List[Edge] = Field(..., default_factory=list) + +class GraphQLQuery(BaseModel): + query: str + +class MemorySummary(BaseModel): + nodes: List[Node] = Field(..., default_factory=list) + edges: List[Edge] = Field(..., default_factory=list) + + + diff --git a/main.py b/main.py index 61240bea6..0aa2bceca 100644 --- a/main.py +++ b/main.py @@ -414,6 +414,8 @@ async def user_context_enrichment(session, user_id:str, query:str, generative_re for _ in range(max_attempts): relevant_summary_id = await classify_call( query= query, document_summaries=str(summaries)) + logging.info("Relevant summary id is %s", relevant_summary_id) + if relevant_summary_id is not None: break @@ -456,7 +458,7 @@ async def user_context_enrichment(session, user_id:str, query:str, generative_re print("Available memory classes:", await memory.list_memory_classes()) results = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories', observation=query, params=postgres_id[0], search_type="summary_filter_by_object_name") - logging.info("Result is", str(results)) + logging.info("Result is %s", str(results)) search_context = "" @@ -627,7 +629,7 @@ async def relevance_feedback(query: str, input_type: str): return result async def main(): - user_id = "user" + user_id = "user_test_1_1" async with session_scope(AsyncSessionLocal()) as session: # await update_entity(session, DocsModel, "8cd9a022-5a7a-4af5-815a-f988415536ae", True) @@ -638,6 +640,9 @@ async def main(): class GraphQLQuery(BaseModel): query: str + gg = await user_query_to_graph_db(session, user_id, "How does cognitive architecture work?") + print(gg) + # def cypher_statement_correcting( input: str) -> str: # out = aclient.chat.completions.create( # model=config.model, @@ -704,13 +709,13 @@ async def main(): # await create_public_memory(user_id=user_id, labels=['sr'], topic="PublicMemory") # await add_documents_to_graph_db(session, user_id) # - neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, - password=config.graph_database_password) + # neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, + # password=config.graph_database_password) # await attach_user_to_memory(user_id=user_id, labels=['sr'], topic="PublicMemory") - return_ = await user_context_enrichment(user_id=user_id, query="what should the size of a staircase in an apartment building be", session=session, memory_type="PublicMemory", generative_response=True) - print(return_) + # return_ = await user_context_enrichment(user_id=user_id, query="Koja je minimalna širina vrata za osobe sa invaliditetom?", session=session, memory_type="PublicMemory", generative_response=True) + # print(return_) # aa = await relevance_feedback("I need to understand how to build a staircase in an apartment building", "PublicMemory") # print(aa)