diff --git a/level_4/cognitive_architecture/classifiers/classifier.py b/level_4/cognitive_architecture/classifiers/classifier.py
index 32a82132e..b1541761f 100644
--- a/level_4/cognitive_architecture/classifiers/classifier.py
+++ b/level_4/cognitive_architecture/classifiers/classifier.py
@@ -1,10 +1,67 @@
-
-
+from langchain.prompts import ChatPromptTemplate
+import json
 #TO DO, ADD ALL CLASSIFIERS HERE
+
+
+
+
+from langchain.chains import create_extraction_chain
+from langchain.chat_models import ChatOpenAI
+
+from ..config import Config
+
+config = Config()
+config.load()
+OPENAI_API_KEY = config.openai_key
+from langchain.document_loaders import TextLoader
+from langchain.document_loaders import DirectoryLoader
+
+
+async def classify_documents(query):
+
+    llm = ChatOpenAI(temperature=0, model=config.model)
+    prompt_classify = ChatPromptTemplate.from_template(
+        """You are a summarizer and classifier. Determine what book this is and where it belongs in the output: {query}"""
+    )
+    json_structure = [{
+        "name": "summarizer",
+        "description": "Summarization and classification",
+        "parameters": {
+            "type": "object",
+            "properties": {
+                "DocumentCategory": {
+                    "type": "string",
+                    "description": "The classification of documents into groups such as legal, medical, etc."
+                },
+                "Title": {
+                    "type": "string",
+                    "description": "The title of the document"
+                },
+                "Summary": {
+                    "type": "string",
+                    "description": "The summary of the document"
+                }
+            },
+            "required": ["DocumentCategory", "Title", "Summary"]
+        }
+    }]
+    chain_filter = prompt_classify | llm.bind(function_call={"name": "summarizer"}, functions=json_structure)
+    classifier_output = await chain_filter.ainvoke({"query": query})
+    arguments_str = classifier_output.additional_kwargs['function_call']['arguments']
+    print("This is the arguments string", arguments_str)
+    # The function-call arguments are the schema fields themselves
+    # (DocumentCategory, Title, Summary), so return the parsed dict directly.
+    classifier_value = json.loads(arguments_str)
+
+    print("This is the classifier value", classifier_value)
+
+    return classifier_value
+
+
+
 # classify retrievals according to type of retrieval
 def classify_retrieval():
     pass
@@ -12,4 +69,4 @@ def classify_retrieval():
 
 # classify documents according to type of document
 def classify_call():
-    pass
+    pass
\ No newline at end of file
diff --git a/level_4/cognitive_architecture/database/graph_database/graph.py b/level_4/cognitive_architecture/database/graph_database/graph.py
index 4d71b14eb..1fe6aa287 100644
--- a/level_4/cognitive_architecture/database/graph_database/graph.py
+++ b/level_4/cognitive_architecture/database/graph_database/graph.py
@@ -485,23 +485,14 @@ class Neo4jGraphDB(AbstractGraphDB):
 
     def update_document_node_with_namespace(self, user_id: str, vectordb_namespace: str, document_title: str):
         # Generate the Cypher query
-        cypher_query = '''
-        MATCH (user:User {userId: $user_id})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {title: $document_title})
-        SET document.vectordbNamespace = $vectordb_namespace
+        cypher_query = f'''
+        MATCH (user:User {{userId: '{user_id}'}})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{title: '{document_title}'}})
+        SET document.vectordbNamespace = '{vectordb_namespace}'
         RETURN document
         '''
-        # Parameters for the query
-        parameters = {
-            'user_id': user_id,
-            'vectordb_namespace': vectordb_namespace,
-            'document_title': document_title
-        }
-        # Execute the query with the provided parameters
-        result = self.query(cypher_query, parameters)
-
-        return result
+        return cypher_query
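Taken together, classify_documents and the reworked update_document_node_with_namespace are meant to be chained by a caller: the classification dict feeds create_document_node_cypher, and the namespace update now comes back as a Cypher string that the caller executes itself. A minimal sketch under those assumptions (the config object and Neo4jGraphDB usage mirror main.py further down; the namespace value is illustrative, and the f-string query assumes trusted inputs):

    from cognitive_architecture.classifiers.classifier import classify_documents

    async def attach_document(user_id: str):
        # Returns a dict with DocumentCategory, Title, and Summary.
        classification = await classify_documents("Lord of the Rings")

        neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url,
                                      username=config.graph_database_username,
                                      password=config.graph_database_password)
        rs = neo4j_graph_db.create_document_node_cypher(classification, user_id)
        neo4j_graph_db.query(rs, classification)

        # The method now returns the query string; run it explicitly.
        cypher = neo4j_graph_db.update_document_node_with_namespace(
            user_id,
            vectordb_namespace="ABCDEFGH_SEMANTICMEMORY",  # illustrative value
            document_title="Lord of the Rings")
        neo4j_graph_db.query(cypher)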
diff --git a/level_4/cognitive_architecture/database/postgres/models/operation.py b/level_4/cognitive_architecture/database/postgres/models/operation.py
index cf8168b25..39858503d 100644
--- a/level_4/cognitive_architecture/database/postgres/models/operation.py
+++ b/level_4/cognitive_architecture/database/postgres/models/operation.py
@@ -13,8 +13,6 @@ class Operation(Base):
     user_id = Column(String, ForeignKey('users.id'), index=True)  # Link to User
     operation_type = Column(String, nullable=True)
     operation_status = Column(String, nullable=True)
-    operation_params = Column(String, nullable=True)
-    number_of_files = Column(Integer, nullable=True)
     test_set_id = Column(String, ForeignKey('test_sets.id'), index=True)
     created_at = Column(DateTime, default=datetime.utcnow)
     updated_at = Column(DateTime, onupdate=datetime.utcnow)
diff --git a/level_4/cognitive_architecture/utils.py b/level_4/cognitive_architecture/utils.py
index 2eec84281..7d97149b2 100644
--- a/level_4/cognitive_architecture/utils.py
+++ b/level_4/cognitive_architecture/utils.py
@@ -1,3 +1,6 @@
+import os
+import random
+import string
 import uuid
 
 from graphviz import Digraph
@@ -32,7 +35,44 @@ class Edge:
 #     dot.render("knowledge_graph.gv", view=True)
 #
 #
+def get_document_names(doc_input):
+    """
+    Get a list of document names.
+
+    This function takes doc_input, which can be a list of names, a folder path,
+    a single document file path, or a document name as a string, and returns a
+    list of document names.
+
+    Args:
+        doc_input (list | str): A list of names, a folder path, a single
+            document file path, or a document name as a string.
+
+    Returns:
+        list: A list of document names.
+
+    Example usage:
+        - Folder path: get_document_names(".data")
+        - Single document file path: get_document_names(".data/example.pdf")
+        - Document name provided as a string: get_document_names("example.docx")
+    """
+    if isinstance(doc_input, list):
+        return doc_input
+    if os.path.isdir(doc_input):
+        # doc_input is a folder
+        folder_path = doc_input
+        document_names = []
+        for filename in os.listdir(folder_path):
+            if os.path.isfile(os.path.join(folder_path, filename)):
+                document_names.append(filename)
+        return document_names
+    elif os.path.isfile(doc_input):
+        # doc_input is a single document file
+        return [os.path.basename(doc_input)]
+    elif isinstance(doc_input, str):
+        # doc_input is a document name provided as a string
+        return [doc_input]
+    else:
+        # doc_input is not valid
+        return []
 
 def format_dict(d):
     # Initialize an empty list to store formatted items
@@ -76,4 +116,13 @@ def create_edge_variable_mapping(edges):
         # Construct a unique identifier for the edge
         variable_name = f"edge{edge['source']}to{edge['target']}".lower()
         mapping[(edge['source'], edge['target'])] = variable_name
-    return mapping
\ No newline at end of file
+    return mapping
+
+
+
+def generate_letter_uuid(length=8):
+    """Generate a random string of uppercase letters with the specified length."""
+    letters = string.ascii_uppercase  # A-Z
+    return "".join(random.choice(letters) for _ in range(length))
+
+
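These two helpers are combined later in main.py to enumerate input documents and derive a vector-store namespace. A minimal sketch of that pattern (the ".data" path is illustrative):

    from cognitive_architecture.utils import get_document_names, generate_letter_uuid

    # Enumerate documents from a folder, a file path, a name, or a list.
    document_names = get_document_names(".data")

    # Derive a random, letter-only namespace id, as load_documents_to_vectorstore does below.
    namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
    namespace_class = namespace_id + "_class"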
diff --git a/level_4/main.py b/level_4/main.py
index de77c6466..57517758f 100644
--- a/level_4/main.py
+++ b/level_4/main.py
@@ -8,10 +8,12 @@
 from dotenv import load_dotenv
 from level_4.cognitive_architecture.database.postgres.database_crud import session_scope
 from cognitive_architecture.database.postgres.database import AsyncSessionLocal
-
+from cognitive_architecture.utils import generate_letter_uuid
 import instructor
 from openai import OpenAI
 
+from level_4.cognitive_architecture.vectorstore_manager import Memory
+
 # Adds response_model to ChatCompletion
 # Allows the return of Pydantic model rather than raw JSON
 instructor.patch(OpenAI())
@@ -28,7 +30,7 @@
 config.load()
 
 print(config.model)
 print(config.openai_key)
-
+from cognitive_architecture.utils import get_document_names
 
 import logging
 
@@ -46,6 +48,17 @@ async def get_vectordb_namespace(session: AsyncSession, user_id: str):
         logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
         return None
 
+async def get_vectordb_document_name(session: AsyncSession, user_id: str):
+    try:
+        result = await session.execute(
+            select(DocsModel.doc_name).where(DocsModel.user_id == user_id).order_by(DocsModel.created_at.desc())
+        )
+        doc_names = [row[0] for row in result.fetchall()]
+        return doc_names
+    except Exception as e:
+        logging.error(f"An error occurred while retrieving the document names: {str(e)}")
+        return None
+
 # async def retrieve_job_by_id(session, user_id, job_id):
 #     try:
 #         result = await session.execute(
@@ -61,7 +74,7 @@ async def get_vectordb_namespace(session: AsyncSession, user_id: str):
 
 
 
-async def update_document_vectordb_namespace(postgres_session: AsyncSession, user_id: str, namespace: str = None):
+async def update_document_vectordb_namespace(postgres_session: AsyncSession, user_id: str, namespace: str = None, job_id: str = None):
     """
     Update the Document node with the Vectordb_namespace for the given user. If the namespace is not provided,
     it will be retrieved from the PostgreSQL database.
@@ -93,6 +106,124 @@
 
 
 
+async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id: str = None, loader_settings: dict = None):
+    if job_id is None:
+        job_id = str(uuid.uuid4())
+
+    await add_entity(
+        session,
+        Operation(
+            id=job_id,
+            user_id=user_id,
+            operation_status="RUNNING",
+            operation_type="DATA_LOAD",
+            test_set_id="none",
+        ),
+    )
+
+    document_names = get_document_names(loader_settings.get("path", "None"))
+    for doc in document_names:
+        await add_entity(
+            session,
+            DocsModel(
+                id=str(uuid.uuid4()),
+                operation_id=job_id,
+                doc_name=doc
+            )
+        )
+    namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
+    namespace_class = namespace_id + "_class"
+    memory = await Memory.create_memory(user_id, session, namespace=namespace_class)
+
+    # Managing memory attributes
+    existing_user = await Memory.check_existing_user(user_id, session)
+    print("here is the existing user", existing_user)
+    await memory.manage_memory_attributes(existing_user)
+
+    params = {
+        "version": "1.0",
+        "agreement_id": "AG123456",
+        "privacy_policy": "https://example.com/privacy",
+        "terms_of_service": "https://example.com/terms",
+        "format": "json",
+        "schema_version": "1.1",
+        "checksum": "a1b2c3d4e5f6",
+        "owner": "John Doe",
+        "license": "MIT",
+        "validity_start": "2023-08-01",
+        "validity_end": "2024-07-31",
+    }
+
+    dynamic_memory_class = getattr(memory, namespace_class.lower(), None)
+
+    await memory.add_dynamic_memory_class(dynamic_memory_class, namespace_class)
+    await memory.add_method_to_class(dynamic_memory_class, "add_memories")
+    # await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories")
+    sss = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
+                                           observation='some_observation', params=params,
+                                           loader_settings=loader_settings)
+    await add_entity(
+        session,
+        Operation(
+            id=job_id,
+            user_id=user_id,
+            operation_status="FINISHED",
+            operation_type="DATA_LOAD",
+            test_set_id="none",
+        ),
+    )
+
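A minimal sketch of invoking this loader on its own, assuming session_scope wraps an AsyncSessionLocal session as its import at the top of main.py suggests; the path value is illustrative, and job_id may be omitted, in which case a UUID is generated:

    async def run_data_load(user_id: str):
        async with session_scope(AsyncSessionLocal()) as session:
            # "path" is read inside load_documents_to_vectorstore via loader_settings.get("path", ...).
            await load_documents_to_vectorstore(
                session, user_id,
                loader_settings={"path": ".data"})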
+async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_input: str):
+
+    job_id = str(uuid.uuid4())
+
+    await add_entity(
+        session,
+        Operation(
+            id=job_id,
+            user_id=user_id,
+            operation_status="RUNNING",
+            operation_type="USER_QUERY_TO_GRAPH_DB",
+            test_set_id="none",
+        ),
+    )
+
+    neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, password=config.graph_database_password)
+    # Generate the Cypher query for this user's prompt decomposition
+    cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id, query_input)
+    result = neo4j_graph_db.query(cypher_query)
+    await add_entity(
+        session,
+        Operation(
+            id=job_id,
+            user_id=user_id,
+            operation_status="SUCCESS",
+            operation_type="USER_QUERY_TO_GRAPH_DB",
+            test_set_id="none",
+        ),
+    )
+    return result
+
+
+
+
+async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str):
+    """Classify the user's documents and link them to the graph with their vector-store namespace."""
+    try:
+        await update_document_vectordb_namespace(postgres_session, user_id)
+        from cognitive_architecture.classifiers.classifier import classify_documents
+
+        classification = await classify_documents("Lord of the Rings")
+        neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username,
+                                      password=config.graph_database_password)
+        rs = neo4j_graph_db.create_document_node_cypher(classification, user_id)
+        neo4j_graph_db.query(rs, classification)
+        # Fetch the stored namespaces and use the first returned value for the document node.
+        vectordb_namespaces = await get_vectordb_namespace(postgres_session, user_id)
+        if vectordb_namespaces:
+            cypher = neo4j_graph_db.update_document_node_with_namespace(
+                user_id, vectordb_namespace=vectordb_namespaces[0], document_title="Lord of the Rings")
+            neo4j_graph_db.query(cypher)
+    except Exception as e:
+        logging.error(f"An error occurred while adding documents to the graph: {str(e)}")
+
 # query_input = "I walked in the forest yesterday and added to my list I need to buy some milk in the store"
 #
@@ -199,7 +330,7 @@ async def main():
     neo4j_graph_db = Neo4jGraphDB(url='bolt://localhost:7687', username='neo4j', password='pleaseletmein')
     # # Generate the Cypher query for a specific user
     # user_id = 'user123'  # Replace with the actual user ID
-    cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,"I walked in the forest yesterday and added to my list I need to buy some milk in the store")
+    # cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id, "I walked in the forest yesterday and added to my list I need to buy some milk in the store")
     # result = neo4j_graph_db.query(cypher_query)
     call_of_the_wild_summary = {
         "user_id": user_id,
@@ -220,12 +351,30 @@ async def main():
                 "answering the primal call of nature."
         )
     }
-    rs = neo4j_graph_db.create_document_node_cypher(call_of_the_wild_summary, user_id)
+    # rs = neo4j_graph_db.create_document_node_cypher(call_of_the_wild_summary, user_id)
+    #
+    # neo4j_graph_db.query(rs, call_of_the_wild_summary)
+    # print(cypher_query)
 
-    neo4j_graph_db.query(rs, call_of_the_wild_summary)
-    print(cypher_query)
+    from cognitive_architecture.classifiers.classifier import classify_documents
 
-    neo4j_graph_db.update_document_node_with_namespace(user_id, document_title="The Call of the Wild")
+    ff = await classify_documents("Lord of the Rings")
+
+    print(ff)
+
+    # vector_db_namespaces = await get_vectordb_namespace(session, user_id)
+    #
+    # if vector_db_namespaces == []:
+    #     vector_db_namespaces = ["None"]
+    #
+    # print(vector_db_namespaces)
+    # for value in vector_db_namespaces:
+    #     print(value)
+    #
+    #     oo = neo4j_graph_db.update_document_node_with_namespace(user_id, vectordb_namespace=value, document_title="The Call of the Wild")
+    #     logging.info("gg", oo)
+    #     neo4j_graph_db.query(oo)
@@ -283,3 +432,7 @@ if __name__ == "__main__":
     asyncio.run(main())
 
 
+# 1. decompose query
+# 2. add document to vectordb
+# 3. add document to graph
+# 4. fetch relevant memories from semantic and episodic memory
\ No newline at end of file
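The four numbered steps above map onto the coroutines added in this diff; step 4 has no implementation here. A minimal sketch of running them in order, with the same hedged session_scope assumption as above and illustrative inputs:

    import asyncio

    async def run_pipeline(user_id: str):
        async with session_scope(AsyncSessionLocal()) as session:
            # 1. Decompose the user query and store it in the graph.
            await user_query_to_graph_db(session, user_id, "I need to buy some milk in the store")
            # 2. Add documents to the vector store.
            await load_documents_to_vectorstore(session, user_id, loader_settings={"path": ".data"})
            # 3. Classify the documents and attach them to the graph.
            await add_documents_to_graph_db(session, user_id)
            # 4. Fetching relevant semantic/episodic memories is not implemented in this diff.

    asyncio.run(run_pipeline("user123"))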