diff --git a/level_4/api.py b/level_4/api.py index d7f420097..e8c2dcbba 100644 --- a/level_4/api.py +++ b/level_4/api.py @@ -87,21 +87,15 @@ async def add_memory( content={"response": {"error": str(e)}}, status_code=503 ) -@app.post("/user-to-graph-query") -async def generate_cypher_query(payload: Payload): +@app.post("/user-query-to-graph") +async def user_query_to_graph(payload: Payload): try: - - - from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB - neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username, config.graph_database_password) + from main import user_query_to_graph_db decoded_payload = payload.payload - cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(decoded_payload['user_id'], - decoded_payload['prompt']) - # Execute the query - replace this with the actual execution method async with session_scope(session=AsyncSessionLocal()) as session: # Assuming you have a method in Neo4jGraphDB to execute the query - result = await neo4j_graph_db.query(cypher_query, session) + result = await user_query_to_graph_db(decoded_payload['user_id'], decoded_payload['query'], session) return result diff --git a/level_4/cognitive_architecture/vectorstore_manager.py b/level_4/cognitive_architecture/vectorstore_manager.py index 5cc2c6cff..05cf3fa78 100644 --- a/level_4/cognitive_architecture/vectorstore_manager.py +++ b/level_4/cognitive_architecture/vectorstore_manager.py @@ -168,7 +168,7 @@ class Memory: self.OPENAI_API_KEY = config.openai_key @classmethod - async def create_memory(cls, user_id: str, session, memory_label:str, **kwargs): + async def create_memory(cls, user_id: str, session, job_id, memory_label:str, **kwargs): """ Class method that acts as a factory method for creating Memory instances. It performs necessary DB checks or updates before instance creation. @@ -180,7 +180,7 @@ class Memory: # Handle existing user scenario... memory_id = await cls.check_existing_memory(user_id,memory_label, session) if memory_id is None: - memory_id = await cls.handle_new_memory(user_id, session, memory_name= memory_label) + memory_id = await cls.handle_new_memory(user_id = user_id, session= session,job_id=job_id, memory_name= memory_label) logging.info( f"Existing user {user_id} found in the DB. Memory ID: {memory_id}" ) @@ -188,7 +188,7 @@ class Memory: # Handle new user scenario... await cls.handle_new_user(user_id, session) - memory_id = await cls.handle_new_memory(user_id, session, memory_name= memory_label) + memory_id = await cls.handle_new_memory(user_id =user_id, session=session, job_id=job_id, memory_name= memory_label) logging.info( f"New user {user_id} created in the DB. Memory ID: {memory_id}" ) diff --git a/level_4/docker-compose.yml b/level_4/docker-compose.yml index 55e0a9302..d8fb92483 100644 --- a/level_4/docker-compose.yml +++ b/level_4/docker-compose.yml @@ -31,6 +31,7 @@ services: - 80:80 depends_on: - postgres + - neo4j deploy: resources: limits: diff --git a/level_4/main.py b/level_4/main.py index e0dca6440..be253a13e 100644 --- a/level_4/main.py +++ b/level_4/main.py @@ -4,7 +4,7 @@ from pydantic import BaseModel from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB from cognitive_architecture.database.postgres.models.memory import MemoryModel - +from cognitive_architecture.classifiers.classifier import classify_documents import os from dotenv import load_dotenv @@ -70,17 +70,22 @@ async def get_vectordb_document_name(session: AsyncSession, user_id: str): logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}") return None -from sqlalchemy.orm import selectinload +from sqlalchemy.orm import selectinload, joinedload + async def get_vectordb_data(session: AsyncSession, user_id: str): try: - # Query to join Operations with MemoryModel and DocsModel result = await session.execute( - select(Operation) - .options(selectinload(Operation.memories), selectinload(Operation.docs)) - .where((Operation.user_id == user_id) & (Operation.operation_status == 'SUCCESS')) - .order_by(Operation.created_at.desc()) + select(Operation, DocsModel.doc_name, DocsModel.id, MemoryModel.memory_name) + .join(DocsModel, Operation.docs) # Correct join with docs table + .join(MemoryModel, Operation.memories) # Correct join with memories table + .where( + (Operation.user_id == user_id) # Filter by user_id + # (Operation.operation_status == 'SUCCESS') # Optional additional filter + ) + .order_by(Operation.created_at.desc()) # Order by creation date ) + operations = result.scalars().all() # Extract memory names and document names and IDs @@ -88,9 +93,11 @@ async def get_vectordb_data(session: AsyncSession, user_id: str): docs = [(doc.doc_name, doc.id) for op in operations for doc in op.docs] return memory_names, docs + except Exception as e: - logging.error(f"An error occurred while retrieving Vectordb data: {str(e)}") - return None, None + # Handle the exception as needed + print(f"An error occurred: {e}") + return None # async def retrieve_job_by_id(session, user_id, job_id): @@ -143,21 +150,21 @@ async def get_vectordb_data(session: AsyncSession, user_id: str): async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id:str=None, loader_settings:dict=None): namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY" namespace_class = namespace_id + "_class" - - memory = await Memory.create_memory(user_id, session, namespace=namespace_id, memory_label=namespace_id) - if job_id is None: job_id = str(uuid.uuid4()) - await add_entity( - session, - Operation( - id=job_id, - user_id=user_id, - operation_status="RUNNING", - operation_type="DATA_LOAD", - ), - ) + memory = await Memory.create_memory(user_id, session, namespace=namespace_id, job_id=job_id, memory_label=namespace_id) + + await add_entity( + session, + Operation( + id=job_id, + user_id=user_id, + operation_status="RUNNING", + operation_type="DATA_LOAD", + ), + ) + document_names = get_document_names(loader_settings.get("path", "None")) for doc in document_names: @@ -169,14 +176,10 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job doc_name=doc ) ) - - # Managing memory attributes existing_user = await Memory.check_existing_user(user_id, session) print("here is the existing user", existing_user) await memory.manage_memory_attributes(existing_user) - - params = { "version": "1.0", "agreement_id": "AG123456", @@ -205,20 +208,12 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job print(f"No attribute named in memory.") print("Available memory classes:", await memory.list_memory_classes()) - - # await memory.add_method_to_class(dynamic_memory_class, "add_memories") - # await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories") - sss = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories', + result = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories', observation='some_observation', params=params, loader_settings=loader_settings) - await add_entity( - session, - Operation( - id=job_id, - user_id=user_id, - operation_status="SUCCESS", - operation_type="DATA_LOAD", - ), - ) + + await update_entity(session, Operation, job_id, "SUCCESS") + return result + async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_input: str): @@ -239,15 +234,9 @@ async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_inpu # user_id = 'user123' # Replace with the actual user ID cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,query_input) result = neo4j_graph_db.query(cypher_query) - await add_entity( - session, - Operation( - id=job_id, - user_id=user_id, - operation_status="SUCCESS", - operation_type="USER_QUERY_TO_GRAPH_DB", - ), - ) + + await update_entity(session, Operation, job_id, "SUCCESS") + return result @@ -257,16 +246,9 @@ async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str """""" try: # await update_document_vectordb_namespace(postgres_session, user_id) - from cognitive_architecture.classifiers.classifier import classify_documents - memory_names, docs = await get_vectordb_data(postgres_session, user_id) - for doc, memory_name in zip(docs, memory_names): - - doc_name, doc_id = doc - - classification = await classify_documents(doc_name, document_id=doc_id) neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, password=config.graph_database_password) @@ -522,7 +504,9 @@ async def main(): "source": "URL", "path": ["https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"], } - await load_documents_to_vectorstore(session, user_id, loader_settings=loader_settings) + # await load_documents_to_vectorstore(session, user_id, loader_settings=loader_settings) + await user_query_to_graph_db(session, user_id, "I walked in the forest yesterday and added to my list I need to buy some milk in the store") + # await add_documents_to_graph_db(session, user_id) # memory_instance = Memory(namespace='SEMANTICMEMORY') # sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation') # from cognitive_architecture.vectorstore_manager import Memory