User context enrichment mostly works now, left to add job state management, retrieve id from vectorstore and do semantic search, and then run it to chatgpt with enriched context

This commit is contained in:
Vasilije 2023-11-18 13:58:51 +01:00
parent 4c72ebf2c7
commit 6fa13a8a9a
3 changed files with 187 additions and 516 deletions

View file

@ -13,7 +13,7 @@ from cognitive_architecture.database.postgres.database import AsyncSessionLocal
from cognitive_architecture.database.postgres.database_crud import session_scope
from cognitive_architecture.vectorstore_manager import Memory
from dotenv import load_dotenv
from main import add_documents_to_graph_db
from main import add_documents_to_graph_db, user_context_enrichment
from cognitive_architecture.config import Config
# Set up logging
@ -103,7 +103,7 @@ async def user_query_to_graph(payload: Payload):
raise HTTPException(status_code=500, detail=str(e))
@app.post("/document_to_graph_db")
@app.post("/document-to-graph-db")
async def document_to_graph_db(payload: Payload):
try:
decoded_payload = payload.payload
@ -117,329 +117,20 @@ async def document_to_graph_db(payload: Payload):
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# @app.post("/user-document-vectordb")
# async def generate_document_to_vector_db(payload: Payload, ):
# try:
# from database.graph_database.graph import Neo4jGraphDB
# neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username,
# config.graph_database_password)
# decoded_payload = payload.payload
#
#
# neo4j_graph_db.update_document_node_with_namespace(decoded_payload['user_id'], document_title="The Call of the Wild")
#
# # Execute the query - replace this with the actual execution method
# # async with session_scope(session=AsyncSessionLocal()) as session:
# # # Assuming you have a method in Neo4jGraphDB to execute the query
# # result = await neo4j_graph_db.query(cypher_query, session)
# #
# # return result
#
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
@app.post("/fetch-memory", response_model=dict)
async def fetch_memory(
payload: Payload,
# files: List[UploadFile] = File(...),
):
@app.post("/user-query-processor")
async def user_query_processor(payload: Payload):
try:
logging.info(" Adding to Memory ")
decoded_payload = payload.payload
# Execute the query - replace this with the actual execution method
async with session_scope(session=AsyncSessionLocal()) as session:
memory = await Memory.create_memory(
decoded_payload["user_id"], session, "SEMANTICMEMORY", namespace="SEMANTICMEMORY"
)
# Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
# Managing memory attributes
existing_user = await Memory.check_existing_user(
decoded_payload["user_id"], session
)
await memory.manage_memory_attributes(existing_user)
await memory.add_dynamic_memory_class(
decoded_payload["memory_object"],
decoded_payload["memory_object"].upper(),
)
memory_class = decoded_payload["memory_object"] + "_class"
dynamic_memory_class = getattr(memory, memory_class.lower(), None)
await memory.add_method_to_class(dynamic_memory_class, "add_memories")
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(
dynamic_memory_class,
"fetch_memories",
observation=decoded_payload["observation"],
)
return JSONResponse(content={"response": output}, status_code=200)
# Assuming you have a method in Neo4jGraphDB to execute the query
result = await user_context_enrichment(session, decoded_payload['user_id'], decoded_payload['query'])
return JSONResponse(content={"response": result}, status_code=200)
except Exception as e:
return JSONResponse(
content={"response": {"error": str(e)}}, status_code=503
)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/delete-memory", response_model=dict)
async def delete_memory(
payload: Payload,
# files: List[UploadFile] = File(...),
):
try:
logging.info(" Adding to Memory ")
decoded_payload = payload.payload
async with session_scope(session=AsyncSessionLocal()) as session:
memory = await Memory.create_memory(
decoded_payload["user_id"], session, namespace="SEMANTICMEMORY"
)
# Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
# Managing memory attributes
existing_user = await Memory.check_existing_user(
decoded_payload["user_id"], session
)
await memory.manage_memory_attributes(existing_user)
await memory.add_dynamic_memory_class(
decoded_payload["memory_object"],
decoded_payload["memory_object"].upper(),
)
memory_class = decoded_payload["memory_object"] + "_class"
dynamic_memory_class = getattr(memory, memory_class.lower(), None)
await memory.add_method_to_class(
dynamic_memory_class, "delete_memories"
)
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(
dynamic_memory_class,
"delete_memories",
namespace=decoded_payload["memory_object"].upper(),
)
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e:
return JSONResponse(
content={"response": {"error": str(e)}}, status_code=503
)
class TestSetType(Enum):
SAMPLE = "sample"
MANUAL = "manual"
def get_test_set(test_set_type, folder_path="example_data", payload=None):
if test_set_type == TestSetType.SAMPLE:
file_path = os.path.join(folder_path, "test_set.json")
if os.path.isfile(file_path):
with open(file_path, "r") as file:
return json.load(file)
elif test_set_type == TestSetType.MANUAL:
# Check if the manual test set is provided in the payload
if payload and "manual_test_set" in payload:
return payload["manual_test_set"]
else:
# Attempt to load the manual test set from a file
pass
return None
class MetadataType(Enum):
SAMPLE = "sample"
MANUAL = "manual"
def get_metadata(metadata_type, folder_path="example_data", payload=None):
if metadata_type == MetadataType.SAMPLE:
file_path = os.path.join(folder_path, "metadata.json")
if os.path.isfile(file_path):
with open(file_path, "r") as file:
return json.load(file)
elif metadata_type == MetadataType.MANUAL:
# Check if the manual metadata is provided in the payload
if payload and "manual_metadata" in payload:
return payload["manual_metadata"]
else:
pass
return None
# @app.post("/rag-test/rag_test_run", response_model=dict)
# async def rag_test_run(
# payload: Payload,
# background_tasks: BackgroundTasks,
# ):
# try:
# logging.info("Starting RAG Test")
# decoded_payload = payload.payload
# test_set_type = TestSetType(decoded_payload['test_set'])
#
# metadata_type = MetadataType(decoded_payload['metadata'])
#
# metadata = get_metadata(metadata_type, payload=decoded_payload)
# if metadata is None:
# return JSONResponse(content={"response": "Invalid metadata value"}, status_code=400)
#
# test_set = get_test_set(test_set_type, payload=decoded_payload)
# if test_set is None:
# return JSONResponse(content={"response": "Invalid test_set value"}, status_code=400)
#
# async def run_start_test(data, test_set, user_id, params, metadata, retriever_type):
# result = await start_test(data = data, test_set = test_set, user_id =user_id, params =params, metadata =metadata, retriever_type=retriever_type)
#
# logging.info("Retriever DATA type", type(decoded_payload['data']))
#
# background_tasks.add_task(
# run_start_test,
# decoded_payload['data'],
# test_set,
# decoded_payload['user_id'],
# decoded_payload['params'],
# metadata,
# decoded_payload['retriever_type']
# )
#
# logging.info("Retriever type", decoded_payload['retriever_type'])
# return JSONResponse(content={"response": "Task has been started"}, status_code=200)
#
# except Exception as e:
# return JSONResponse(
#
# content={"response": {"error": str(e)}}, status_code=503
#
# )
# @app.get("/rag-test/{task_id}")
# async def check_task_status(task_id: int):
# task_status = task_status_db.get(task_id, "not_found")
#
# if task_status == "not_found":
# return {"status": "Task not found"}
#
# return {"status": task_status}
# @app.get("/available-buffer-actions", response_model=dict)
# async def available_buffer_actions(
# payload: Payload,
# # files: List[UploadFile] = File(...),
# ):
# try:
# decoded_payload = payload.payload
#
# Memory_ = Memory(user_id=decoded_payload["user_id"])
#
# await Memory_.async_init()
#
# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
# output = await Memory_._available_operations()
# return JSONResponse(content={"response": output}, status_code=200)
#
# except Exception as e:
# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
# @app.post("/run-buffer", response_model=dict)
# async def run_buffer(
# payload: Payload,
# # files: List[UploadFile] = File(...),
# ):
# try:
# decoded_payload = payload.payload
#
# Memory_ = Memory(user_id=decoded_payload["user_id"])
#
# await Memory_.async_init()
#
# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
# output = await Memory_._run_main_buffer(
# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
# )
# return JSONResponse(content={"response": output}, status_code=200)
#
# except Exception as e:
# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
#
#
# @app.post("/buffer/create-context", response_model=dict)
# async def create_context(
# payload: Payload,
# # files: List[UploadFile] = File(...),
# ):
# try:
# decoded_payload = payload.payload
#
# Memory_ = Memory(user_id=decoded_payload["user_id"])
#
# await Memory_.async_init()
#
# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
# output = await Memory_._create_buffer_context(
# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
# )
# return JSONResponse(content={"response": output}, status_code=200)
#
# except Exception as e:
# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
#
#
# @app.post("/buffer/get-tasks", response_model=dict)
# async def create_context(
# payload: Payload,
# # files: List[UploadFile] = File(...),
# ):
# try:
# decoded_payload = payload.payload
#
# Memory_ = Memory(user_id=decoded_payload["user_id"])
#
# await Memory_.async_init()
#
# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
# output = await Memory_._get_task_list(
# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
# )
# return JSONResponse(content={"response": output}, status_code=200)
#
# except Exception as e:
# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
#
#
# @app.post("/buffer/provide-feedback", response_model=dict)
# async def provide_feedback(
# payload: Payload,
# # files: List[UploadFile] = File(...),
# ):
# try:
# decoded_payload = payload.payload
#
# Memory_ = Memory(user_id=decoded_payload["user_id"])
#
# await Memory_.async_init()
#
# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
# if decoded_payload["total_score"] is None:
#
# output = await Memory_._provide_feedback(
# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=None, total_score=decoded_payload["total_score"]
# )
# return JSONResponse(content={"response": output}, status_code=200)
# else:
# output = await Memory_._provide_feedback(
# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"], total_score=None
# )
# return JSONResponse(content={"response": output}, status_code=200)
#
#
# except Exception as e:
# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
def start_api_server(host: str = "0.0.0.0", port: int = 8000):
"""
Start the API server using uvicorn.

View file

@ -192,21 +192,20 @@ class Neo4jGraphDB(AbstractGraphDB):
def query(self, query, params=None):
return self.graph.query(query, params)
# Initialize the Neo4j connection here
def create_base_cognitive_architecture(self, user_id: str):
# Create the user and memory components if they don't exist
user_memory_cypher = f"""
MERGE (user:User {{userId: '{user_id}'}})
MERGE (semantic:SemanticMemory {{userId: '{user_id}'}})
MERGE (episodic:EpisodicMemory {{userId: '{user_id}'}})
MERGE (buffer:Buffer {{userId: '{user_id}'}})
MERGE (semantic:SemanticMemory {{userId: '{user_id}', name: 'SemanticMemory}}')
MERGE (episodic:EpisodicMemory {{userId: '{user_id}', name: 'EpisodicMemory'}})
MERGE (buffer:Buffer {{userId: '{user_id}', name: 'Buffer}})
MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(semantic)
MERGE (user)-[:HAS_EPISODIC_MEMORY]->(episodic)
MERGE (user)-[:HAS_BUFFER]->(buffer)
"""
return user_memory_cypher
def user_query_to_edges_and_nodes(self, input: str) ->KnowledgeGraph:
@ -459,6 +458,35 @@ class Neo4jGraphDB(AbstractGraphDB):
logging.error(f"An error occurred while retrieving document categories: {str(e)}")
return None
async def get_document_ids(self, user_id: str, category: str):
"""
Retrieve a list of document IDs for a specific category associated with a given user.
This function executes a Cypher query in a Neo4j database to fetch the IDs
of all 'Document' nodes in a specific category that are linked to the 'SemanticMemory' node of the specified user.
Parameters:
- user_id (str): The unique identifier of the user.
- category (str): The specific document category to filter by.
Returns:
- List[str]: A list of document IDs in the specified category associated with the user.
Raises:
- Exception: If an error occurs during the database query execution.
"""
try:
query = f'''
MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{documentCategory: '{category}'}})
RETURN document.d_id AS d_id
'''
logging.info(f"Generated Cypher query: {query}")
return query
except Exception as e:
logging.error(f"An error occurred while retrieving document IDs: {str(e)}")
return None
def create_document_node_cypher(self, document_summary: dict, user_id: str) -> str:
"""
Generate a Cypher query to create a Document node linked to a SemanticMemory node for a user.

View file

@ -1,5 +1,5 @@
from pydantic import BaseModel
from pydantic import BaseModel, Field
from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB
from cognitive_architecture.database.postgres.models.memory import MemoryModel
from cognitive_architecture.classifiers.classifier import classify_documents
@ -21,27 +21,18 @@ from cognitive_architecture.database.postgres.models.docs import DocsModel
from cognitive_architecture.database.postgres.models.memory import MemoryModel
from level_4.cognitive_architecture.database.postgres.models.user import User
from cognitive_architecture.classifiers.classifier import classify_call
# Adds response_model to ChatCompletion
# Allows the return of Pydantic model rather than raw JSON
instructor.patch(OpenAI())
aclient = instructor.patch(OpenAI())
DEFAULT_PRESET = "promethai_chat"
preset_options = [DEFAULT_PRESET]
PROMETHAI_DIR = os.path.join(os.path.expanduser("~"), ".")
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
from cognitive_architecture.config import Config
config = Config()
config.load()
print(config.model)
print(config.openai_key)
from cognitive_architecture.utils import get_document_names
from sqlalchemy.orm import selectinload, joinedload, contains_eager
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
@ -68,6 +59,18 @@ async def get_vectordb_document_name(session: AsyncSession, user_id: str):
return None
async def get_model_id_name(session: AsyncSession, id: str):
try:
result = await session.execute(
select(MemoryModel.memory_name).where(MemoryModel.id == id).order_by(MemoryModel.created_at.desc())
)
doc_names = [row[0] for row in result.fetchall()]
return doc_names
except Exception as e:
logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}")
return None
async def get_vectordb_data(session: AsyncSession, user_id: str):
"""
@ -119,6 +122,42 @@ async def get_vectordb_data(session: AsyncSession, user_id: str):
# Handle the exception as needed
print(f"An error occurred: {e}")
return None
async def get_memory_name_by_doc_id(session: AsyncSession, docs_id: str):
"""
Asynchronously retrieves memory names associated with a specific document ID.
This function executes a database query to fetch memory names linked to a document
through operations. The query is filtered based on a given document ID and retrieves
only the memory names without loading the entire Operation entity.
Parameters:
- session (AsyncSession): The database session for executing the query.
- docs_id (str): The unique identifier of the document.
Returns:
- List[str]: A list of memory names associated with the given document ID.
Returns None if an exception occurs.
Raises:
- Exception: Propagates any exceptions that occur during query execution.
"""
try:
result = await session.execute(
select(MemoryModel.memory_name)
.join(Operation, Operation.id == MemoryModel.operation_id) # Join with Operation
.join(DocsModel, DocsModel.operation_id == Operation.id) # Join with DocsModel
.where(DocsModel.id == docs_id) # Filtering based on the passed document ID
.distinct() # To avoid duplicate memory names
)
memory_names = [row[0] for row in result.fetchall()]
return memory_names
except Exception as e:
# Handle the exception as needed
print(f"An error occurred: {e}")
return None
async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id:str=None, loader_settings:dict=None):
@ -127,12 +166,6 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job
logging.info("Namespace created with id %s", namespace_id)
# try:
# new_user = User(id=user_id)
# await add_entity(session, new_user)
# except:
# pass
new_user = User(id=user_id)
await add_entity(session, new_user)
@ -149,6 +182,8 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job
),
)
memory = await Memory.create_memory(user_id, session, namespace=namespace_id, job_id=job_id, memory_label=namespace_id)
document_names = get_document_names(loader_settings.get("path", "None"))
for doc in document_names:
@ -220,8 +255,6 @@ async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_inpu
)
neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, password=config.graph_database_password)
# # Generate the Cypher query for a specific user
# user_id = 'user123' # Replace with the actual user ID
cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,query_input)
result = neo4j_graph_db.query(cypher_query)
@ -296,9 +329,58 @@ async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str
except:
pass
class ResponseString(BaseModel):
response: str = Field(..., default_factory=list)
async def user_context_enrichment(session, user_id, query):
""""""
#
def generate_graph(input) -> ResponseString:
out = aclient.chat.completions.create(
model="gpt-4-1106-preview",
messages=[
{
"role": "user",
"content": f"""Use the given context to answer query and use help of associated context: {input}. """,
},
{ "role":"system", "content": """You are a top-tier algorithm
designed for using context summaries based on cognitive psychology to answer user queries, and provide a simple response.
Do not mention anything explicit about cognitive architecture, but use the context to answer the query."""}
],
response_model=ResponseString,
)
return out
async def user_context_enrichment(session, user_id:str, query:str)->str:
"""
Asynchronously enriches the user context by integrating various memory systems and document classifications.
This function uses cognitive architecture to access and manipulate different memory systems (semantic, episodic, and procedural) associated with a user. It fetches memory details from a Neo4j graph database, classifies document categories based on the user's query, and retrieves document IDs for relevant categories. The function also dynamically manages memory attributes and methods, extending the context with document store information to enrich the user's query response.
Parameters:
- session (AsyncSession): The database session for executing queries.
- user_id (str): The unique identifier of the user.
- query (str): The original query from the user.
Returns:
- str: The final enriched context after integrating various memory systems and document classifications.
The function performs several key operations:
1. Retrieves semantic and episodic memory details for the user from the Neo4j graph database.
2. Logs and classifies document categories relevant to the user's query.
3. Fetches document IDs from Neo4j and corresponding memory names from a PostgreSQL database.
4. Dynamically manages memory attributes and methods, including the addition of methods like 'add_memories', 'fetch_memories', and 'delete_memories' to the memory class.
5. Extends the context with document store information relevant to the user's query.
6. Generates and logs the final result after processing and integrating all information.
Raises:
- Exception: Propagates any exceptions that occur during database operations or memory management.
Example Usage:
```python
enriched_context = await user_context_enrichment(session, "user123", "How does cognitive architecture work?")
```
"""
neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username,
password=config.graph_database_password)
@ -320,156 +402,49 @@ async def user_context_enrichment(session, user_id, query):
relevant_categories = await classify_call( query= query, context = context, document_types=str(categories))
logging.info("Relevant categories after the classifier are %s", relevant_categories)
# memory = await Memory.create_memory(user_id, session, namespace=namespace_id, job_id=job_id,
# memory_label=namespace_id)
#
# existing_user = await Memory.check_existing_user(user_id, session)
# print("here is the existing user", existing_user)
# await memory.manage_memory_attributes(existing_user)
#
# print("Namespace id is %s", namespace_id)
# await memory.add_dynamic_memory_class(namespace_id.lower(), namespace_id)
#
# dynamic_memory_class = getattr(memory, namespace_class.lower(), None)
#
# methods_to_add = ["add_memories", "fetch_memories", "delete_memories"]
#
# if dynamic_memory_class is not None:
# for method_name in methods_to_add:
# await memory.add_method_to_class(dynamic_memory_class, method_name)
# print(f"Memory method {method_name} has been added")
# else:
# print(f"No attribute named in memory.")
#
# print("Available memory classes:", await memory.list_memory_classes())
# result = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
# observation='some_observation')
get_doc_ids = await neo4j_graph_db.get_document_ids(user_id, relevant_categories)
postgres_id = neo4j_graph_db.query(get_doc_ids)
logging.info("Postgres ids are %s", postgres_id)
namespace_id = await get_memory_name_by_doc_id(session, postgres_id[0]["d_id"])
logging.info("Namespace ids are %s", namespace_id)
namespace_id = namespace_id[0]
namespace_class = namespace_id + "_class"
memory = await Memory.create_memory(user_id, session, namespace=namespace_id, job_id="23232",
memory_label=namespace_id)
existing_user = await Memory.check_existing_user(user_id, session)
print("here is the existing user", existing_user)
await memory.manage_memory_attributes(existing_user)
print("Namespace id is %s", namespace_id)
await memory.add_dynamic_memory_class(namespace_id.lower(), namespace_id)
dynamic_memory_class = getattr(memory, namespace_class.lower(), None)
methods_to_add = ["add_memories", "fetch_memories", "delete_memories"]
if dynamic_memory_class is not None:
for method_name in methods_to_add:
await memory.add_method_to_class(dynamic_memory_class, method_name)
print(f"Memory method {method_name} has been added")
else:
print(f"No attribute named in memory.")
print("Available memory classes:", await memory.list_memory_classes())
result = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
observation=query)
context_extension = "Document store information that can help and enrich the anwer is: " + str(result)
entire_context = context + context_extension
final_result = generate_graph(entire_context)
logging.info("Final result is %s", final_result)
return final_result
# fetch_namespace_from_graph = neo4j_graph_db.get_namespaces_by_document_category(user_id=user_id, category=relevant_categories)
#
# results = []
#
# for namespace in fetch_namespace_from_graph:
# memory = await Memory.create_memory(user_id, session, namespace=namespace)
#
# # Managing memory attributes
# existing_user = await Memory.check_existing_user(user_id, session)
# print("here is the existing user", existing_user)
# await memory.manage_memory_attributes(existing_user)
# namespace_class = namespace
# memory = await Memory.create_memory(user_id, session, namespace=namespace_class)
#
# # Managing memory attributes
# existing_user = await Memory.check_existing_user(user_id, session)
# print("here is the existing user", existing_user)
# await memory.manage_memory_attributes(existing_user)
#
#
# dynamic_memory_class = getattr(memory, namespace_class.lower(), None)
#
# await memory.add_dynamic_memory_class(dynamic_memory_class, namespace_class)
# await memory.add_method_to_class(dynamic_memory_class, "fetch_memories")
# raw_document_output = await memory.dynamic_method_call(
# memory.semanticmemory_class,
# "fetch_memories",
# observation=context,
# )
# from openai import OpenAI
# import instructor
#
# # Enables `response_model`
# client = instructor.patch(OpenAI())
#
# format_query_via_gpt = f""" Provide an answer to the user query: {query} Context is: {context}, Document store information is {raw_document_output} """
#
# class UserResponse(BaseModel):
# response: str
#
#
# user = client.chat.completions.create(
# model=config.model,
# response_model=UserResponse,
# messages=[
# {"role": "user", "content": format_query_via_gpt},
# ]
# )
#
# results.append(user.response)
#
# return results
# query_input = "I walked in the forest yesterday and added to my list I need to buy some milk in the store"
#
# # Generate the knowledge graph from the user input
# knowledge_graph = generate_graph(query_input)
# visualize_knowledge_graph(knowledge_graph)
# # out = knowledge_graph.dict()
# # print(out)
# #
# graph: KnowledgeGraph = generate_graph("I walked in the forest yesterday and added to my list I need to buy some milk in the store")
# graph_dic = graph.dict()
#
# node_variable_mapping = create_node_variable_mapping(graph_dic['nodes'])
# edge_variable_mapping = create_edge_variable_mapping(graph_dic['edges'])
# # Create unique variable names for each node
# unique_node_variable_mapping = append_uuid_to_variable_names(node_variable_mapping)
# unique_edge_variable_mapping = append_uuid_to_variable_names(edge_variable_mapping)
# create_nodes_statements = generate_create_statements_for_nodes_with_uuid(graph_dic['nodes'], unique_node_variable_mapping)
# create_edges_statements = generate_create_statements_for_edges_with_uuid(graph_dic['edges'], unique_node_variable_mapping)
#
# memory_type_statements_with_uuid_and_time_context = generate_memory_type_relationships_with_uuid_and_time_context(
# graph_dic['nodes'], unique_node_variable_mapping)
#
# # # Combine all statements
# cypher_statements = [create_base_queries_from_user(user_id)] + create_nodes_statements + create_edges_statements + memory_type_statements_with_uuid_and_time_context
# cypher_statements_joined = "\n".join(cypher_statements)
#
#
#
# execute_cypher_query(cypher_statements_joined)
# bartleby_summary = {
# "document_category": "Classic Literature",
# "title": "Bartleby, the Scrivener",
# "summary": (
# "Bartleby, the Scrivener: A Story of Wall Street' is a short story by Herman Melville "
# "that tells the tale of Bartleby, a scrivener, or copyist, who works for a Manhattan "
# "lawyer. Initially, Bartleby is a competent and industrious worker. However, one day, "
# "when asked to proofread a document, he responds with what becomes his constant refrain "
# "to any request: 'I would prefer not to.' As the story progresses, Bartleby becomes "
# "increasingly passive, refusing not just work but also food and eventually life itself, "
# "as he spirals into a state of passive resistance. The lawyer, the narrator of the story, "
# "is both fascinated and frustrated by Bartleby's behavior. Despite attempts to understand "
# "and help him, Bartleby remains an enigmatic figure, his motives and thoughts unexplained. "
# "He is eventually evicted from the office and later found dead in a prison yard, having "
# "preferred not to live. The story is a meditation on the themes of isolation, societal "
# "obligation, and the inexplicable nature of human behavior."
# )
# }
# rs = create_document_node_cypher(bartleby_summary, user_id)
#
# parameters = {
# 'user_id': user_id,
# 'title': bartleby_summary['title'],
# 'summary': bartleby_summary['summary'],
# 'document_category': bartleby_summary['document_category']
# }
#
# execute_cypher_query(rs, parameters)
async def main():
user_id = "user"
@ -498,28 +473,10 @@ async def main():
# await user_query_to_graph_db(session, user_id, "I walked in the forest yesterday and added to my list I need to buy some milk in the store and get a summary from a classical book i read yesterday")
# await add_documents_to_graph_db(session, user_id, loader_settings=loader_settings)
ee = await user_context_enrichment(session, user_id, query="I walked in the forest yesterday and added to my list I need to buy some milk in the store and i am curious about a book i read yesterday about Bartleby, the Scrivener")
ee = await user_context_enrichment(session, user_id, query="Tell me about the book I read yesterday")
# memory_instance = Memory(namespace='SEMANTICMEMORY')
# sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation')
# from cognitive_architecture.vectorstore_manager import Memory
#
#
# memory = await Memory.create_memory("676", session, namespace="SEMANTICMEMORY")
#
# # Adding a memory instance
# await memory.add_memory_instance("ExampleMemory")
#
# # Managing memory attributes
# existing_user = await Memory.check_existing_user("676", session)
# print("here is the existing user", existing_user)
# await memory.manage_memory_attributes(existing_user)
# # aeehuvyq_semanticememory_class
#
# await memory.add_dynamic_memory_class("semanticmemory", "SEMANTICMEMORY")
# await memory.add_method_to_class(memory.semanticmemory_class, "add_memories")
# # await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories")
# sss = await memory.dynamic_method_call(memory.semanticmemory_class, 'add_memories',
# observation='some_observation', params=params, loader_settings=loader_settings)
# print(rs)
@ -528,8 +485,3 @@ if __name__ == "__main__":
asyncio.run(main())
#1. decompose query
#2. add document to vectordb
#3. add document to graph
#4. fetch relevant memories from semantic, episodic