Fixed API issues and missing dependencies, and got the first endpoint to work

This commit is contained in:
Vasilije 2023-11-17 20:44:44 +01:00
parent 5b291368fd
commit f3a5ee929e
4 changed files with 47 additions and 68 deletions

View file

@ -87,21 +87,15 @@ async def add_memory(
content={"response": {"error": str(e)}}, status_code=503 content={"response": {"error": str(e)}}, status_code=503
) )
@app.post("/user-to-graph-query") @app.post("/user-query-to-graph")
async def generate_cypher_query(payload: Payload): async def user_query_to_graph(payload: Payload):
try: try:
from main import user_query_to_graph_db
from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB
neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username, config.graph_database_password)
decoded_payload = payload.payload decoded_payload = payload.payload
cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(decoded_payload['user_id'],
decoded_payload['prompt'])
# Execute the query - replace this with the actual execution method # Execute the query - replace this with the actual execution method
async with session_scope(session=AsyncSessionLocal()) as session: async with session_scope(session=AsyncSessionLocal()) as session:
# Assuming you have a method in Neo4jGraphDB to execute the query # Assuming you have a method in Neo4jGraphDB to execute the query
result = await neo4j_graph_db.query(cypher_query, session) result = await user_query_to_graph_db(decoded_payload['user_id'], decoded_payload['query'], session)
return result return result

View file

@ -168,7 +168,7 @@ class Memory:
self.OPENAI_API_KEY = config.openai_key self.OPENAI_API_KEY = config.openai_key
@classmethod @classmethod
async def create_memory(cls, user_id: str, session, memory_label:str, **kwargs): async def create_memory(cls, user_id: str, session, job_id, memory_label:str, **kwargs):
""" """
Class method that acts as a factory method for creating Memory instances. Class method that acts as a factory method for creating Memory instances.
It performs necessary DB checks or updates before instance creation. It performs necessary DB checks or updates before instance creation.
@ -180,7 +180,7 @@ class Memory:
# Handle existing user scenario... # Handle existing user scenario...
memory_id = await cls.check_existing_memory(user_id,memory_label, session) memory_id = await cls.check_existing_memory(user_id,memory_label, session)
if memory_id is None: if memory_id is None:
memory_id = await cls.handle_new_memory(user_id, session, memory_name= memory_label) memory_id = await cls.handle_new_memory(user_id = user_id, session= session,job_id=job_id, memory_name= memory_label)
logging.info( logging.info(
f"Existing user {user_id} found in the DB. Memory ID: {memory_id}" f"Existing user {user_id} found in the DB. Memory ID: {memory_id}"
) )
@ -188,7 +188,7 @@ class Memory:
# Handle new user scenario... # Handle new user scenario...
await cls.handle_new_user(user_id, session) await cls.handle_new_user(user_id, session)
memory_id = await cls.handle_new_memory(user_id, session, memory_name= memory_label) memory_id = await cls.handle_new_memory(user_id =user_id, session=session, job_id=job_id, memory_name= memory_label)
logging.info( logging.info(
f"New user {user_id} created in the DB. Memory ID: {memory_id}" f"New user {user_id} created in the DB. Memory ID: {memory_id}"
) )

View file

@ -31,6 +31,7 @@ services:
- 80:80 - 80:80
depends_on: depends_on:
- postgres - postgres
- neo4j
deploy: deploy:
resources: resources:
limits: limits:

View file

@ -4,7 +4,7 @@ from pydantic import BaseModel
from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB
from cognitive_architecture.database.postgres.models.memory import MemoryModel from cognitive_architecture.database.postgres.models.memory import MemoryModel
from cognitive_architecture.classifiers.classifier import classify_documents
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
@ -70,17 +70,22 @@ async def get_vectordb_document_name(session: AsyncSession, user_id: str):
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}") logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None return None
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload, joinedload
async def get_vectordb_data(session: AsyncSession, user_id: str): async def get_vectordb_data(session: AsyncSession, user_id: str):
try: try:
# Query to join Operations with MemoryModel and DocsModel
result = await session.execute( result = await session.execute(
select(Operation) select(Operation, DocsModel.doc_name, DocsModel.id, MemoryModel.memory_name)
.options(selectinload(Operation.memories), selectinload(Operation.docs)) .join(DocsModel, Operation.docs) # Correct join with docs table
.where((Operation.user_id == user_id) & (Operation.operation_status == 'SUCCESS')) .join(MemoryModel, Operation.memories) # Correct join with memories table
.order_by(Operation.created_at.desc()) .where(
(Operation.user_id == user_id) # Filter by user_id
# (Operation.operation_status == 'SUCCESS') # Optional additional filter
)
.order_by(Operation.created_at.desc()) # Order by creation date
) )
operations = result.scalars().all() operations = result.scalars().all()
# Extract memory names and document names and IDs # Extract memory names and document names and IDs
@ -88,9 +93,11 @@ async def get_vectordb_data(session: AsyncSession, user_id: str):
docs = [(doc.doc_name, doc.id) for op in operations for doc in op.docs] docs = [(doc.doc_name, doc.id) for op in operations for doc in op.docs]
return memory_names, docs return memory_names, docs
except Exception as e: except Exception as e:
logging.error(f"An error occurred while retrieving Vectordb data: {str(e)}") # Handle the exception as needed
return None, None print(f"An error occurred: {e}")
return None
# async def retrieve_job_by_id(session, user_id, job_id): # async def retrieve_job_by_id(session, user_id, job_id):
@ -143,21 +150,21 @@ async def get_vectordb_data(session: AsyncSession, user_id: str):
async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id:str=None, loader_settings:dict=None): async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id:str=None, loader_settings:dict=None):
namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY" namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
namespace_class = namespace_id + "_class" namespace_class = namespace_id + "_class"
memory = await Memory.create_memory(user_id, session, namespace=namespace_id, memory_label=namespace_id)
if job_id is None: if job_id is None:
job_id = str(uuid.uuid4()) job_id = str(uuid.uuid4())
await add_entity( memory = await Memory.create_memory(user_id, session, namespace=namespace_id, job_id=job_id, memory_label=namespace_id)
session,
Operation( await add_entity(
id=job_id, session,
user_id=user_id, Operation(
operation_status="RUNNING", id=job_id,
operation_type="DATA_LOAD", user_id=user_id,
), operation_status="RUNNING",
) operation_type="DATA_LOAD",
),
)
document_names = get_document_names(loader_settings.get("path", "None")) document_names = get_document_names(loader_settings.get("path", "None"))
for doc in document_names: for doc in document_names:
@ -169,14 +176,10 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job
doc_name=doc doc_name=doc
) )
) )
# Managing memory attributes # Managing memory attributes
existing_user = await Memory.check_existing_user(user_id, session) existing_user = await Memory.check_existing_user(user_id, session)
print("here is the existing user", existing_user) print("here is the existing user", existing_user)
await memory.manage_memory_attributes(existing_user) await memory.manage_memory_attributes(existing_user)
params = { params = {
"version": "1.0", "version": "1.0",
"agreement_id": "AG123456", "agreement_id": "AG123456",
@ -205,20 +208,12 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job
print(f"No attribute named in memory.") print(f"No attribute named in memory.")
print("Available memory classes:", await memory.list_memory_classes()) print("Available memory classes:", await memory.list_memory_classes())
result = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
# await memory.add_method_to_class(dynamic_memory_class, "add_memories")
# await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories")
sss = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
observation='some_observation', params=params, loader_settings=loader_settings) observation='some_observation', params=params, loader_settings=loader_settings)
await add_entity(
session, await update_entity(session, Operation, job_id, "SUCCESS")
Operation( return result
id=job_id,
user_id=user_id,
operation_status="SUCCESS",
operation_type="DATA_LOAD",
),
)
async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_input: str): async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_input: str):
@ -239,15 +234,9 @@ async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_inpu
# user_id = 'user123' # Replace with the actual user ID # user_id = 'user123' # Replace with the actual user ID
cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,query_input) cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,query_input)
result = neo4j_graph_db.query(cypher_query) result = neo4j_graph_db.query(cypher_query)
await add_entity(
session, await update_entity(session, Operation, job_id, "SUCCESS")
Operation(
id=job_id,
user_id=user_id,
operation_status="SUCCESS",
operation_type="USER_QUERY_TO_GRAPH_DB",
),
)
return result return result
@ -257,16 +246,9 @@ async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str
"""""" """"""
try: try:
# await update_document_vectordb_namespace(postgres_session, user_id) # await update_document_vectordb_namespace(postgres_session, user_id)
from cognitive_architecture.classifiers.classifier import classify_documents
memory_names, docs = await get_vectordb_data(postgres_session, user_id) memory_names, docs = await get_vectordb_data(postgres_session, user_id)
for doc, memory_name in zip(docs, memory_names): for doc, memory_name in zip(docs, memory_names):
doc_name, doc_id = doc doc_name, doc_id = doc
classification = await classify_documents(doc_name, document_id=doc_id) classification = await classify_documents(doc_name, document_id=doc_id)
neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username,
password=config.graph_database_password) password=config.graph_database_password)
@ -522,7 +504,9 @@ async def main():
"source": "URL", "source": "URL",
"path": ["https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"], "path": ["https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"],
} }
await load_documents_to_vectorstore(session, user_id, loader_settings=loader_settings) # await load_documents_to_vectorstore(session, user_id, loader_settings=loader_settings)
await user_query_to_graph_db(session, user_id, "I walked in the forest yesterday and added to my list I need to buy some milk in the store")
# await add_documents_to_graph_db(session, user_id)
# memory_instance = Memory(namespace='SEMANTICMEMORY') # memory_instance = Memory(namespace='SEMANTICMEMORY')
# sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation') # sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation')
# from cognitive_architecture.vectorstore_manager import Memory # from cognitive_architecture.vectorstore_manager import Memory