From 51552891b58a9fec2ccfcc8d8043c10576c6b0db Mon Sep 17 00:00:00 2001
From: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
Date: Fri, 17 Nov 2023 18:56:46 +0100
Subject: [PATCH] Added initial API logic, crud for graph, connected vectordb
 and graph

---
 level_4/Dockerfile | 9 +-
 level_4/api.py | 489 ++++++++++++++++
 level_4/cognitive_architecture/api.py | 521 ------------------
 .../classifiers/classifier.py | 44 +-
 .../{ => database}/create_database.py | 11 +-
 .../database/graph_database/graph.py | 58 +-
 .../database/postgres/database_crud.py | 2 -
 .../database/postgres/models/operation.py | 2 -
 .../database/postgres/models/testoutput.py | 44 --
 .../database/postgres/models/testset.py | 24 -
 .../database/postgres/models/user.py | 2 -
 .../database/vectordb/basevectordb.py | 14 +-
 .../database/vectordb/chunkers/chunkers.py | 2 +-
 .../database/vectordb/loaders/loaders.py | 5 +-
 .../database/vectordb/vectordb.py | 2 +-
 level_4/cognitive_architecture/fetch_secret.py | 13 +-
 .../vectorstore_manager.py | 12 +-
 level_4/{cognitive_architecture => }/entrypoint.sh | 4 +-
 level_4/main.py | 251 ++++++---
 19 files changed, 815 insertions(+), 694 deletions(-)
 create mode 100644 level_4/api.py
 delete mode 100644 level_4/cognitive_architecture/api.py
 rename level_4/cognitive_architecture/{ => database}/create_database.py (88%)
 delete mode 100644 level_4/cognitive_architecture/database/postgres/models/testoutput.py
 delete mode 100644 level_4/cognitive_architecture/database/postgres/models/testset.py
 rename level_4/{cognitive_architecture => }/entrypoint.sh (81%)

diff --git a/level_4/Dockerfile b/level_4/Dockerfile
index 8633d2419..8faf4d439 100644
--- a/level_4/Dockerfile
+++ b/level_4/Dockerfile
@@ -42,9 +42,12 @@ RUN apt-get update -q && \
 #RUN playwright install-deps
 
 WORKDIR /app
-COPY cognitive_architecture/ /app
+COPY cognitive_architecture/ /app/cognitive_architecture
+COPY main.py /app
+COPY api.py /app
 
-COPY cognitive_architecture/entrypoint.sh /app/entrypoint.sh
+
+COPY entrypoint.sh /app/entrypoint.sh
 RUN chmod +x /app/entrypoint.sh
 
-ENTRYPOINT ["/app/entrypoint.sh"]
\ No newline at end of file
+ENTRYPOINT ["/app/entrypoint.sh"]
\ No newline at end of file
diff --git a/level_4/api.py b/level_4/api.py
new file mode 100644
index 000000000..9d5970456
--- /dev/null
+++ b/level_4/api.py
@@ -0,0 +1,489 @@
+import json
+import logging
+import os
+from enum import Enum
+from typing import Dict, Any
+
+import uvicorn
+from fastapi import FastAPI, BackgroundTasks, HTTPException
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+
+from cognitive_architecture.database.postgres.database import AsyncSessionLocal
+from cognitive_architecture.database.postgres.database_crud import session_scope
+from cognitive_architecture.vectorstore_manager import Memory
+from dotenv import load_dotenv
+
+from cognitive_architecture.config import Config
+
+# Set up logging
+logging.basicConfig(
+    level=logging.INFO,  # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
+    format="%(asctime)s [%(levelname)s] %(message)s",  # Set the log message format
+)
+
+logger = logging.getLogger(__name__)
+
+
+load_dotenv()
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
+app = FastAPI(debug=True)
+#
+# from auth.cognito.JWTBearer import JWTBearer
+# from auth.auth import jwks
+#
+# auth = JWTBearer(jwks)
+
+from fastapi import Depends
+
+
+config = Config()
+config.load()
+
+class ImageResponse(BaseModel):
+    success: bool
+    message: str
+
+
+@app.get(
+    "/",
+)
+async def root():
+    """
+    Root endpoint that returns a welcome message.
+    """
+    return {"message": "Hello, World, I am alive!"}
+
+
+@app.get("/health")
+def health_check():
+    """
+    Health check endpoint that returns the server status.
+    """
+    return {"status": "OK"}
+
+
+
+
+class Payload(BaseModel):
+    payload: Dict[str, Any]
+
+@app.post("/add-memory", response_model=dict)
+async def add_memory(
+    payload: Payload,
+    # files: List[UploadFile] = File(...),
+):
+    try:
+        logging.info(" Adding to Memory ")
+        decoded_payload = payload.payload
+        async with session_scope(session=AsyncSessionLocal()) as session:
+            from main import load_documents_to_vectorstore
+
+            output = await load_documents_to_vectorstore(session, decoded_payload['user_id'], loader_settings=decoded_payload['settings'])
+            return JSONResponse(content={"response": output}, status_code=200)
+
+    except Exception as e:
+        return JSONResponse(
+            content={"response": {"error": str(e)}}, status_code=503
+        )
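For reference, a minimal sketch of how a client might call the endpoint above. The wrapper `payload` key comes from the Payload model, `user_id` and `settings` are the only keys the handler reads, and the loader settings shown are illustrative values based on the defaults in the loaders module:

    import requests

    # Hypothetical client call; adjust host/port and settings to your deployment.
    body = {
        "payload": {
            "user_id": "user_123",  # illustrative id
            "settings": {"strategy": "VANILLA", "chunk_size": 500, "chunk_overlap": 20},
        }
    }
    r = requests.post("http://localhost:8000/add-memory", json=body)
    print(r.json())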
+@app.post("/user-to-graph-query")
+async def generate_cypher_query(payload: Payload,):
+    try:
+
+
+        from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB
+        neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username, config.graph_database_password)
+        decoded_payload = payload.payload
+        cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(decoded_payload['user_id'],
+                                                                                                decoded_payload['prompt'])
+
+        # Execute the query - replace this with the actual execution method
+        async with session_scope(session=AsyncSessionLocal()) as session:
+            # Assuming you have a method in Neo4jGraphDB to execute the query
+            result = await neo4j_graph_db.query(cypher_query, session)
+
+        return result
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
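The endpoint below persists a hard-coded document summary through `create_document_node_cypher`. That helper's body is not part of this diff; based on the labels, relationship types, and properties used elsewhere in the patch (`User.userId`, `HAS_SEMANTIC_MEMORY`, `HAS_DOCUMENT`, `Document.title`/`documentCategory`/`d_id`), the generated statement plausibly resembles the following sketch. The MERGE/CREATE layout and the `d_id` value are assumptions:

    # Illustrative only; the real helper may emit different Cypher.
    example_cypher = """
    MERGE (user:User {userId: 'user_123'})
    MERGE (user)-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)
    CREATE (document:Document {title: 'The Call of the Wild',
                               documentCategory: 'Classic Literature',
                               d_id: 'doc_001'})
    CREATE (semantic)-[:HAS_DOCUMENT]->(document)
    """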
+@app.post("/user-to-document-summary")
+async def generate_document_summary(payload: Payload,):
+    try:
+        from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB
+        neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username, config.graph_database_password)
+        decoded_payload = payload.payload
+        call_of_the_wild_summary = {
+            "user_id": decoded_payload["user_id"],
+            "document_category": "Classic Literature",
+            "title": "The Call of the Wild",
+            "summary": (
+                "'The Call of the Wild' is a novel by Jack London set in the Yukon during the 1890s Klondike "
+                "Gold Rush—a period when strong sled dogs were in high demand. The novel's central character "
+                "is a dog named Buck, a domesticated dog living at a ranch in the Santa Clara Valley of California "
+                "as the story opens. Stolen from his home and sold into the brutal existence of an Alaskan sled dog, "
+                "he reverts to atavistic traits. Buck is forced to adjust to, and survive, cruel treatments and fight "
+                "to dominate other dogs in a harsh climate. Eventually, he sheds the veneer of civilization, relying "
+                "on primordial instincts and lessons he learns, to emerge as a leader in the wild. London drew on his "
+                "own experiences in the Klondike, and the book provides a snapshot of the epical gold rush and the "
+                "harsh realities of life in the wilderness. The novel explores themes of morality versus instinct, "
+                "the struggle for survival in the natural world, and the intrusion of civilization on the wilderness. "
+                "As Buck's wild nature is awakened, he rises to become a respected and feared leader in the wild, "
+                "answering the primal call of nature."
+            )
+        }
+        cypher_query = neo4j_graph_db.create_document_node_cypher(call_of_the_wild_summary, decoded_payload['user_id'])
+
+        neo4j_graph_db.query(cypher_query, call_of_the_wild_summary)
+
+        # Execute the query - replace this with the actual execution method
+        async with session_scope(session=AsyncSessionLocal()) as session:
+            # Assuming you have a method in Neo4jGraphDB to execute the query
+            result = await neo4j_graph_db.query(cypher_query, session)
+
+        return result
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+@app.post("/user-document-vectordb")
+async def generate_document_to_vector_db(payload: Payload, ):
+    try:
+        from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB
+        neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username,
+                                      config.graph_database_password)
+        decoded_payload = payload.payload
+
+        # NOTE: the updated graph helper takes a vectordb namespace and a document
+        # id rather than a title; the payload keys used here are assumed for illustration.
+        neo4j_graph_db.update_document_node_with_namespace(decoded_payload['user_id'],
+                                                           vectordb_namespace=decoded_payload['namespace'],
+                                                           document_id=decoded_payload['document_id'])
+
+        # Execute the query - replace this with the actual execution method
+        # async with session_scope(session=AsyncSessionLocal()) as session:
+        #     # Assuming you have a method in Neo4jGraphDB to execute the query
+        #     result = await neo4j_graph_db.query(cypher_query, session)
+        #
+        #     return result
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+
+@app.post("/fetch-memory", response_model=dict)
+async def fetch_memory(
+    payload: Payload,
+    # files: List[UploadFile] = File(...),
+):
+    try:
+        logging.info(" Fetching memories ")
+        decoded_payload = payload.payload
+        async with session_scope(session=AsyncSessionLocal()) as session:
+            memory = await Memory.create_memory(
+                decoded_payload["user_id"], session, "SEMANTICMEMORY", namespace="SEMANTICMEMORY"
+            )
+
+            # Adding a memory instance
+            await memory.add_memory_instance(decoded_payload["memory_object"])
+
+            # Managing memory attributes
+            existing_user = await Memory.check_existing_user(
+                decoded_payload["user_id"], session
+            )
+            await memory.manage_memory_attributes(existing_user)
+            await memory.add_dynamic_memory_class(
+                decoded_payload["memory_object"],
+                decoded_payload["memory_object"].upper(),
+            )
+            memory_class = decoded_payload["memory_object"] + "_class"
+            dynamic_memory_class = getattr(memory, memory_class.lower(), None)
+
+            await memory.add_method_to_class(dynamic_memory_class, "add_memories")
+            # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
+            output = await memory.dynamic_method_call(
+                dynamic_memory_class,
+                "fetch_memories",
+                observation=decoded_payload["observation"],
+            )
+            return JSONResponse(content={"response": output}, status_code=200)
+
+    except Exception as e:
+        return JSONResponse(
+            content={"response": {"error": str(e)}}, status_code=503
+        )
+
+@app.post("/delete-memory", response_model=dict)
+async def delete_memory(
+    payload: Payload,
+    # files: List[UploadFile] = File(...),
+):
+    try:
+        logging.info(" Deleting memories ")
+        decoded_payload = payload.payload
+        async with session_scope(session=AsyncSessionLocal()) as session:
+            memory = await Memory.create_memory(
+                decoded_payload["user_id"], session, namespace="SEMANTICMEMORY"
+            )
+
+            # Adding a memory instance
+            await memory.add_memory_instance(decoded_payload["memory_object"])
+
+            # Managing memory attributes
+            existing_user = await Memory.check_existing_user(
+                decoded_payload["user_id"], session
+            )
+            await memory.manage_memory_attributes(existing_user)
+            await memory.add_dynamic_memory_class(
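+                # registers the payload's memory_object as a dynamic class; it is
+                # retrieved below via getattr as "<memory_object>_class"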
decoded_payload["memory_object"], + decoded_payload["memory_object"].upper(), + ) + memory_class = decoded_payload["memory_object"] + "_class" + dynamic_memory_class = getattr(memory, memory_class.lower(), None) + + await memory.add_method_to_class( + dynamic_memory_class, "delete_memories" + ) + # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') + output = await memory.dynamic_method_call( + dynamic_memory_class, + "delete_memories", + namespace=decoded_payload["memory_object"].upper(), + ) + return JSONResponse(content={"response": output}, status_code=200) + + except Exception as e: + return JSONResponse( + content={"response": {"error": str(e)}}, status_code=503 + ) + + +class TestSetType(Enum): + SAMPLE = "sample" + MANUAL = "manual" + +def get_test_set(test_set_type, folder_path="example_data", payload=None): + if test_set_type == TestSetType.SAMPLE: + file_path = os.path.join(folder_path, "test_set.json") + if os.path.isfile(file_path): + with open(file_path, "r") as file: + return json.load(file) + elif test_set_type == TestSetType.MANUAL: + # Check if the manual test set is provided in the payload + if payload and "manual_test_set" in payload: + return payload["manual_test_set"] + else: + # Attempt to load the manual test set from a file + pass + + return None + + +class MetadataType(Enum): + SAMPLE = "sample" + MANUAL = "manual" + +def get_metadata(metadata_type, folder_path="example_data", payload=None): + if metadata_type == MetadataType.SAMPLE: + file_path = os.path.join(folder_path, "metadata.json") + if os.path.isfile(file_path): + with open(file_path, "r") as file: + return json.load(file) + elif metadata_type == MetadataType.MANUAL: + # Check if the manual metadata is provided in the payload + if payload and "manual_metadata" in payload: + return payload["manual_metadata"] + else: + pass + + return None + + + + + +# @app.post("/rag-test/rag_test_run", response_model=dict) +# async def rag_test_run( +# payload: Payload, +# background_tasks: BackgroundTasks, +# ): +# try: +# logging.info("Starting RAG Test") +# decoded_payload = payload.payload +# test_set_type = TestSetType(decoded_payload['test_set']) +# +# metadata_type = MetadataType(decoded_payload['metadata']) +# +# metadata = get_metadata(metadata_type, payload=decoded_payload) +# if metadata is None: +# return JSONResponse(content={"response": "Invalid metadata value"}, status_code=400) +# +# test_set = get_test_set(test_set_type, payload=decoded_payload) +# if test_set is None: +# return JSONResponse(content={"response": "Invalid test_set value"}, status_code=400) +# +# async def run_start_test(data, test_set, user_id, params, metadata, retriever_type): +# result = await start_test(data = data, test_set = test_set, user_id =user_id, params =params, metadata =metadata, retriever_type=retriever_type) +# +# logging.info("Retriever DATA type", type(decoded_payload['data'])) +# +# background_tasks.add_task( +# run_start_test, +# decoded_payload['data'], +# test_set, +# decoded_payload['user_id'], +# decoded_payload['params'], +# metadata, +# decoded_payload['retriever_type'] +# ) +# +# logging.info("Retriever type", decoded_payload['retriever_type']) +# return JSONResponse(content={"response": "Task has been started"}, status_code=200) +# +# except Exception as e: +# return JSONResponse( +# +# content={"response": {"error": str(e)}}, status_code=503 +# +# ) + + +# @app.get("/rag-test/{task_id}") +# async def check_task_status(task_id: int): +# task_status = task_status_db.get(task_id, 
"not_found") +# +# if task_status == "not_found": +# return {"status": "Task not found"} +# +# return {"status": task_status} + +# @app.get("/available-buffer-actions", response_model=dict) +# async def available_buffer_actions( +# payload: Payload, +# # files: List[UploadFile] = File(...), +# ): +# try: +# decoded_payload = payload.payload +# +# Memory_ = Memory(user_id=decoded_payload["user_id"]) +# +# await Memory_.async_init() +# +# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) +# output = await Memory_._available_operations() +# return JSONResponse(content={"response": output}, status_code=200) +# +# except Exception as e: +# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + + +# @app.post("/run-buffer", response_model=dict) +# async def run_buffer( +# payload: Payload, +# # files: List[UploadFile] = File(...), +# ): +# try: +# decoded_payload = payload.payload +# +# Memory_ = Memory(user_id=decoded_payload["user_id"]) +# +# await Memory_.async_init() +# +# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) +# output = await Memory_._run_main_buffer( +# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] +# ) +# return JSONResponse(content={"response": output}, status_code=200) +# +# except Exception as e: +# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) +# +# +# @app.post("/buffer/create-context", response_model=dict) +# async def create_context( +# payload: Payload, +# # files: List[UploadFile] = File(...), +# ): +# try: +# decoded_payload = payload.payload +# +# Memory_ = Memory(user_id=decoded_payload["user_id"]) +# +# await Memory_.async_init() +# +# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) +# output = await Memory_._create_buffer_context( +# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] +# ) +# return JSONResponse(content={"response": output}, status_code=200) +# +# except Exception as e: +# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) +# +# +# @app.post("/buffer/get-tasks", response_model=dict) +# async def create_context( +# payload: Payload, +# # files: List[UploadFile] = File(...), +# ): +# try: +# decoded_payload = payload.payload +# +# Memory_ = Memory(user_id=decoded_payload["user_id"]) +# +# await Memory_.async_init() +# +# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) +# output = await Memory_._get_task_list( +# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] +# ) +# return JSONResponse(content={"response": output}, status_code=200) +# +# except Exception as e: +# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) +# +# +# @app.post("/buffer/provide-feedback", response_model=dict) +# async def provide_feedback( +# payload: Payload, +# # files: List[UploadFile] = File(...), +# ): +# try: +# decoded_payload = payload.payload +# +# Memory_ = Memory(user_id=decoded_payload["user_id"]) +# +# await Memory_.async_init() +# +# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) +# if decoded_payload["total_score"] is None: +# +# output = await Memory_._provide_feedback( +# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=None, 
total_score=decoded_payload["total_score"] +# ) +# return JSONResponse(content={"response": output}, status_code=200) +# else: +# output = await Memory_._provide_feedback( +# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"], total_score=None +# ) +# return JSONResponse(content={"response": output}, status_code=200) +# +# +# except Exception as e: +# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) +def start_api_server(host: str = "0.0.0.0", port: int = 8000): + """ + Start the API server using uvicorn. + + Parameters: + host (str): The host for the server. + port (int): The port for the server. + """ + try: + logger.info(f"Starting server at {host}:{port}") + uvicorn.run(app, host=host, port=port) + except Exception as e: + logger.exception(f"Failed to start server: {e}") + # Here you could add any cleanup code or error recovery code. + + +if __name__ == "__main__": + start_api_server() diff --git a/level_4/cognitive_architecture/api.py b/level_4/cognitive_architecture/api.py deleted file mode 100644 index 086a321eb..000000000 --- a/level_4/cognitive_architecture/api.py +++ /dev/null @@ -1,521 +0,0 @@ -import json -import logging -import os -from enum import Enum -from typing import Dict, Any - -import uvicorn -from fastapi import FastAPI, BackgroundTasks, HTTPException -from fastapi.responses import JSONResponse -from pydantic import BaseModel - -from database.postgres.database import AsyncSessionLocal -from database.postgres.database_crud import session_scope -from vectorstore_manager import Memory -from dotenv import load_dotenv - - -# Set up logging -logging.basicConfig( - level=logging.INFO, # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL) - format="%(asctime)s [%(levelname)s] %(message)s", # Set the log message format -) - -logger = logging.getLogger(__name__) - - -load_dotenv() -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") -app = FastAPI(debug=True) -# -# from auth.cognito.JWTBearer import JWTBearer -# from auth.auth import jwks -# -# auth = JWTBearer(jwks) - -from fastapi import Depends - -from config import Config - -config = Config() -config.load() - -class ImageResponse(BaseModel): - success: bool - message: str - - -@app.get( - "/", -) -async def root(): - """ - Root endpoint that returns a welcome message. - """ - return {"message": "Hello, World, I am alive!"} - - -@app.get("/health") -def health_check(): - """ - Health check endpoint that returns the server status. 
- """ - return {"status": "OK"} - - -class Payload(BaseModel): - payload: Dict[str, Any] - - -def memory_factory(memory_type): - load_dotenv() - - class Payload(BaseModel): - payload: Dict[str, Any] - - @app.post("/{memory_type}/add-memory", response_model=dict) - async def add_memory( - payload: Payload, - # files: List[UploadFile] = File(...), - ): - try: - logging.info(" Adding to Memory ") - decoded_payload = payload.payload - async with session_scope(session=AsyncSessionLocal()) as session: - memory = await Memory.create_memory( - decoded_payload["user_id"], session,"SEMANTICMEMORY", namespace="SEMANTICMEMORY" - ) - - # Adding a memory instance - await memory.add_memory_instance(decoded_payload["memory_object"]) - - # Managing memory attributes - existing_user = await Memory.check_existing_user( - decoded_payload["user_id"], session - ) - await memory.manage_memory_attributes(existing_user) - await memory.add_dynamic_memory_class( - decoded_payload["memory_object"], - decoded_payload["memory_object"].upper(), - ) - memory_class = decoded_payload["memory_object"] + "_class" - dynamic_memory_class = getattr(memory, memory_class.lower(), None) - - await memory.add_method_to_class(dynamic_memory_class, "add_memories") - # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') - output = await memory.dynamic_method_call( - dynamic_memory_class, - "add_memories", - observation="some_observation", - params=decoded_payload["params"], - loader_settings=decoded_payload["loader_settings"], - ) - return JSONResponse(content={"response": output}, status_code=200) - - except Exception as e: - return JSONResponse( - content={"response": {"error": str(e)}}, status_code=503 - ) - - @app.post("/user-to-graph-query") - async def generate_cypher_query(payload: Payload,): - try: - - from database.graph_database.graph import Neo4jGraphDB - neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username, config.graph_database_password) - decoded_payload = payload.payload - cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(decoded_payload['user_id'], - decoded_payload['prompt']) - - # Execute the query - replace this with the actual execution method - async with session_scope(session=AsyncSessionLocal()) as session: - # Assuming you have a method in Neo4jGraphDB to execute the query - result = await neo4j_graph_db.query(cypher_query, session) - - return result - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - @app.post("/user-to-document-summary") - async def generate_document_summary(payload: Payload,): - try: - from database.graph_database.graph import Neo4jGraphDB - neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username, config.graph_database_password) - decoded_payload = payload.payload - call_of_the_wild_summary = { - "user_id": decoded_payload["user_id"], - "document_category": "Classic Literature", - "title": "The Call of the Wild", - "summary": ( - "'The Call of the Wild' is a novel by Jack London set in the Yukon during the 1890s Klondike " - "Gold Rush—a period when strong sled dogs were in high demand. The novel's central character " - "is a dog named Buck, a domesticated dog living at a ranch in the Santa Clara Valley of California " - "as the story opens. Stolen from his home and sold into the brutal existence of an Alaskan sled dog, " - "he reverts to atavistic traits. 
Buck is forced to adjust to, and survive, cruel treatments and fight " - "to dominate other dogs in a harsh climate. Eventually, he sheds the veneer of civilization, relying " - "on primordial instincts and lessons he learns, to emerge as a leader in the wild. London drew on his " - "own experiences in the Klondike, and the book provides a snapshot of the epical gold rush and the " - "harsh realities of life in the wilderness. The novel explores themes of morality versus instinct, " - "the struggle for survival in the natural world, and the intrusion of civilization on the wilderness. " - "As Buck's wild nature is awakened, he rises to become a respected and feared leader in the wild, " - "answering the primal call of nature." - ) - } - cypher_query = neo4j_graph_db.create_document_node_cypher(call_of_the_wild_summary, decoded_payload['user_id']) - - neo4j_graph_db.query(cypher_query, call_of_the_wild_summary) - - # Execute the query - replace this with the actual execution method - async with session_scope(session=AsyncSessionLocal()) as session: - # Assuming you have a method in Neo4jGraphDB to execute the query - result = await neo4j_graph_db.query(cypher_query, session) - - return result - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - @app.post("/user-document-vectordb") - async def generate_document_to_vector_db(payload: Payload, ): - try: - from database.graph_database.graph import Neo4jGraphDB - neo4j_graph_db = Neo4jGraphDB(config.graph_database_url, config.graph_database_username, - config.graph_database_password) - decoded_payload = payload.payload - - - neo4j_graph_db.update_document_node_with_namespace(decoded_payload['user_id'], document_title="The Call of the Wild") - - # Execute the query - replace this with the actual execution method - # async with session_scope(session=AsyncSessionLocal()) as session: - # # Assuming you have a method in Neo4jGraphDB to execute the query - # result = await neo4j_graph_db.query(cypher_query, session) - # - # return result - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - - - @app.post("/{memory_type}/fetch-memory", response_model=dict) - async def fetch_memory( - payload: Payload, - # files: List[UploadFile] = File(...), - ): - try: - logging.info(" Adding to Memory ") - decoded_payload = payload.payload - async with session_scope(session=AsyncSessionLocal()) as session: - memory = await Memory.create_memory( - decoded_payload["user_id"], session, "SEMANTICMEMORY", namespace="SEMANTICMEMORY" - ) - - # Adding a memory instance - await memory.add_memory_instance(decoded_payload["memory_object"]) - - # Managing memory attributes - existing_user = await Memory.check_existing_user( - decoded_payload["user_id"], session - ) - await memory.manage_memory_attributes(existing_user) - await memory.add_dynamic_memory_class( - decoded_payload["memory_object"], - decoded_payload["memory_object"].upper(), - ) - memory_class = decoded_payload["memory_object"] + "_class" - dynamic_memory_class = getattr(memory, memory_class.lower(), None) - - await memory.add_method_to_class(dynamic_memory_class, "add_memories") - # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') - output = await memory.dynamic_method_call( - dynamic_memory_class, - "fetch_memories", - observation=decoded_payload["observation"], - ) - return JSONResponse(content={"response": output}, status_code=200) - - except Exception as e: - return JSONResponse( - content={"response": {"error": 
str(e)}}, status_code=503 - ) - - @app.post("/{memory_type}/delete-memory", response_model=dict) - async def delete_memory( - payload: Payload, - # files: List[UploadFile] = File(...), - ): - try: - logging.info(" Adding to Memory ") - decoded_payload = payload.payload - async with session_scope(session=AsyncSessionLocal()) as session: - memory = await Memory.create_memory( - decoded_payload["user_id"], session, namespace="SEMANTICMEMORY" - ) - - # Adding a memory instance - await memory.add_memory_instance(decoded_payload["memory_object"]) - - # Managing memory attributes - existing_user = await Memory.check_existing_user( - decoded_payload["user_id"], session - ) - await memory.manage_memory_attributes(existing_user) - await memory.add_dynamic_memory_class( - decoded_payload["memory_object"], - decoded_payload["memory_object"].upper(), - ) - memory_class = decoded_payload["memory_object"] + "_class" - dynamic_memory_class = getattr(memory, memory_class.lower(), None) - - await memory.add_method_to_class( - dynamic_memory_class, "delete_memories" - ) - # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') - output = await memory.dynamic_method_call( - dynamic_memory_class, - "delete_memories", - namespace=decoded_payload["memory_object"].upper(), - ) - return JSONResponse(content={"response": output}, status_code=200) - - except Exception as e: - return JSONResponse( - content={"response": {"error": str(e)}}, status_code=503 - ) - - -memory_list = [ "semantic"] -for memory_type in memory_list: - memory_factory(memory_type) -class TestSetType(Enum): - SAMPLE = "sample" - MANUAL = "manual" - -def get_test_set(test_set_type, folder_path="example_data", payload=None): - if test_set_type == TestSetType.SAMPLE: - file_path = os.path.join(folder_path, "test_set.json") - if os.path.isfile(file_path): - with open(file_path, "r") as file: - return json.load(file) - elif test_set_type == TestSetType.MANUAL: - # Check if the manual test set is provided in the payload - if payload and "manual_test_set" in payload: - return payload["manual_test_set"] - else: - # Attempt to load the manual test set from a file - pass - - return None - - -class MetadataType(Enum): - SAMPLE = "sample" - MANUAL = "manual" - -def get_metadata(metadata_type, folder_path="example_data", payload=None): - if metadata_type == MetadataType.SAMPLE: - file_path = os.path.join(folder_path, "metadata.json") - if os.path.isfile(file_path): - with open(file_path, "r") as file: - return json.load(file) - elif metadata_type == MetadataType.MANUAL: - # Check if the manual metadata is provided in the payload - if payload and "manual_metadata" in payload: - return payload["manual_metadata"] - else: - pass - - return None - - - - - -# @app.post("/rag-test/rag_test_run", response_model=dict) -# async def rag_test_run( -# payload: Payload, -# background_tasks: BackgroundTasks, -# ): -# try: -# logging.info("Starting RAG Test") -# decoded_payload = payload.payload -# test_set_type = TestSetType(decoded_payload['test_set']) -# -# metadata_type = MetadataType(decoded_payload['metadata']) -# -# metadata = get_metadata(metadata_type, payload=decoded_payload) -# if metadata is None: -# return JSONResponse(content={"response": "Invalid metadata value"}, status_code=400) -# -# test_set = get_test_set(test_set_type, payload=decoded_payload) -# if test_set is None: -# return JSONResponse(content={"response": "Invalid test_set value"}, status_code=400) -# -# async def run_start_test(data, test_set, user_id, params, metadata, 
retriever_type): -# result = await start_test(data = data, test_set = test_set, user_id =user_id, params =params, metadata =metadata, retriever_type=retriever_type) -# -# logging.info("Retriever DATA type", type(decoded_payload['data'])) -# -# background_tasks.add_task( -# run_start_test, -# decoded_payload['data'], -# test_set, -# decoded_payload['user_id'], -# decoded_payload['params'], -# metadata, -# decoded_payload['retriever_type'] -# ) -# -# logging.info("Retriever type", decoded_payload['retriever_type']) -# return JSONResponse(content={"response": "Task has been started"}, status_code=200) -# -# except Exception as e: -# return JSONResponse( -# -# content={"response": {"error": str(e)}}, status_code=503 -# -# ) - - -# @app.get("/rag-test/{task_id}") -# async def check_task_status(task_id: int): -# task_status = task_status_db.get(task_id, "not_found") -# -# if task_status == "not_found": -# return {"status": "Task not found"} -# -# return {"status": task_status} - -# @app.get("/available-buffer-actions", response_model=dict) -# async def available_buffer_actions( -# payload: Payload, -# # files: List[UploadFile] = File(...), -# ): -# try: -# decoded_payload = payload.payload -# -# Memory_ = Memory(user_id=decoded_payload["user_id"]) -# -# await Memory_.async_init() -# -# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) -# output = await Memory_._available_operations() -# return JSONResponse(content={"response": output}, status_code=200) -# -# except Exception as e: -# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) - - -# @app.post("/run-buffer", response_model=dict) -# async def run_buffer( -# payload: Payload, -# # files: List[UploadFile] = File(...), -# ): -# try: -# decoded_payload = payload.payload -# -# Memory_ = Memory(user_id=decoded_payload["user_id"]) -# -# await Memory_.async_init() -# -# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) -# output = await Memory_._run_main_buffer( -# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] -# ) -# return JSONResponse(content={"response": output}, status_code=200) -# -# except Exception as e: -# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) -# -# -# @app.post("/buffer/create-context", response_model=dict) -# async def create_context( -# payload: Payload, -# # files: List[UploadFile] = File(...), -# ): -# try: -# decoded_payload = payload.payload -# -# Memory_ = Memory(user_id=decoded_payload["user_id"]) -# -# await Memory_.async_init() -# -# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) -# output = await Memory_._create_buffer_context( -# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] -# ) -# return JSONResponse(content={"response": output}, status_code=200) -# -# except Exception as e: -# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) -# -# -# @app.post("/buffer/get-tasks", response_model=dict) -# async def create_context( -# payload: Payload, -# # files: List[UploadFile] = File(...), -# ): -# try: -# decoded_payload = payload.payload -# -# Memory_ = Memory(user_id=decoded_payload["user_id"]) -# -# await Memory_.async_init() -# -# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) -# output = await Memory_._get_task_list( -# user_input=decoded_payload["prompt"], 
params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] -# ) -# return JSONResponse(content={"response": output}, status_code=200) -# -# except Exception as e: -# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) -# -# -# @app.post("/buffer/provide-feedback", response_model=dict) -# async def provide_feedback( -# payload: Payload, -# # files: List[UploadFile] = File(...), -# ): -# try: -# decoded_payload = payload.payload -# -# Memory_ = Memory(user_id=decoded_payload["user_id"]) -# -# await Memory_.async_init() -# -# # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) -# if decoded_payload["total_score"] is None: -# -# output = await Memory_._provide_feedback( -# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=None, total_score=decoded_payload["total_score"] -# ) -# return JSONResponse(content={"response": output}, status_code=200) -# else: -# output = await Memory_._provide_feedback( -# user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"], total_score=None -# ) -# return JSONResponse(content={"response": output}, status_code=200) -# -# -# except Exception as e: -# return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) -def start_api_server(host: str = "0.0.0.0", port: int = 8000): - """ - Start the API server using uvicorn. - - Parameters: - host (str): The host for the server. - port (int): The port for the server. - """ - try: - logger.info(f"Starting server at {host}:{port}") - uvicorn.run(app, host=host, port=port) - except Exception as e: - logger.exception(f"Failed to start server: {e}") - # Here you could add any cleanup code or error recovery code. - - -if __name__ == "__main__": - start_api_server() diff --git a/level_4/cognitive_architecture/classifiers/classifier.py b/level_4/cognitive_architecture/classifiers/classifier.py index b1541761f..9d37c5b52 100644 --- a/level_4/cognitive_architecture/classifiers/classifier.py +++ b/level_4/cognitive_architecture/classifiers/classifier.py @@ -21,11 +21,11 @@ from langchain.document_loaders import TextLoader from langchain.document_loaders import DirectoryLoader -async def classify_documents(query): +async def classify_documents(query, document_id): llm = ChatOpenAI(temperature=0, model=config.model) prompt_classify = ChatPromptTemplate.from_template( - """You are a summarizer and classifier. Determine what book this is and where does it belong in the output : {query}""" + """You are a summarizer and classifier. 
Determine what book this is and where does it belong in the output : {query}, Id: {d_id}"""
     )
     json_structure = [{
         "name": "summarizer",
@@ -44,13 +44,17 @@ async def classify_documents(query):
             "Summary": {
                 "type": "string",
                 "description": "The summary of the document"
+            },
+            "d_id": {
+                "type": "string",
+                "description": "The id of the document"
             }
-        }, "required": ["DocumentCategory", "Title", "Summary"] }
+        }, "required": ["DocumentCategory", "Title", "Summary","d_id"] }
     }]
     chain_filter = prompt_classify | llm.bind(function_call={"name": "summarizer"}, functions=json_structure)
-    classifier_output = await chain_filter.ainvoke({"query": query})
+    classifier_output = await chain_filter.ainvoke({"query": query, "d_id": document_id})
     arguments_str = classifier_output.additional_kwargs['function_call']['arguments']
     print("This is the arguments string", arguments_str)
     arguments_dict = json.loads(arguments_str)
@@ -68,5 +72,33 @@ def classify_retrieval():
 
 # classify documents according to type of document
-def classify_call():
-    pass
\ No newline at end of file
+async def classify_call(query, context, document_types):
+
+    llm = ChatOpenAI(temperature=0, model=config.model)
+    prompt_classify = ChatPromptTemplate.from_template(
+        """You are a classifier. Determine what document types are relevant : {query}, Context: {context}, Book_types:{document_types}"""
+    )
+    json_structure = [{
+        "name": "classifier",
+        "description": "Classification",
+        "parameters": {
+            "type": "object",
+            "properties": {
+                "DocumentCategory": {
+                    "type": "string",
+                    "description": "The classification of documents in groups such as legal, medical, etc."
+                }
+
+
+            }, "required": ["DocumentCategory"] }
+    }]
+    chain_filter = prompt_classify | llm.bind(function_call={"name": "classifier"}, functions=json_structure)
+    classifier_output = await chain_filter.ainvoke({"query": query, "context": context, "document_types": document_types})
+    arguments_str = classifier_output.additional_kwargs['function_call']['arguments']
+    print("This is the arguments string", arguments_str)
+    arguments_dict = json.loads(arguments_str)
+    classifier_value = arguments_dict.get('DocumentCategory', None)
+
+    print("This is the classifier value", classifier_value)
+
+    return classifier_value
\ No newline at end of file
diff --git a/level_4/cognitive_architecture/create_database.py b/level_4/cognitive_architecture/database/create_database.py
similarity index 88%
rename from level_4/cognitive_architecture/create_database.py
rename to level_4/cognitive_architecture/database/create_database.py
index 468096d8d..578d80b15 100644
--- a/level_4/cognitive_architecture/create_database.py
+++ b/level_4/cognitive_architecture/database/create_database.py
@@ -3,8 +3,15 @@
 # # Get the parent directory of your script and add it to sys.path
 # parent_dir = os.path.dirname(script_dir)
 # sys.path.append(parent_dir)
+from postgres.models import memory
+from postgres.models import metadatas
+from postgres.models import operation
+from postgres.models import sessions
+from postgres.models import user
+from postgres.models import docs
 
-from database.postgres.database import Base
+
+from postgres.database import Base
 from sqlalchemy import create_engine, text
 import psycopg2
@@ -62,6 +69,8 @@ if __name__ == "__main__":
     engine = create_admin_engine(username, password, host, database_name)
 
+    print(Base.metadata.tables)
+
     if not database_exists(username, password, host, database_name):
         print(f"Database {database_name} does not exist. Creating...")
Creating...") create_database(username, password, host, database_name) diff --git a/level_4/cognitive_architecture/database/graph_database/graph.py b/level_4/cognitive_architecture/database/graph_database/graph.py index 1fe6aa287..516ca6439 100644 --- a/level_4/cognitive_architecture/database/graph_database/graph.py +++ b/level_4/cognitive_architecture/database/graph_database/graph.py @@ -2,7 +2,7 @@ # from pydantic_settings import BaseSettings # from marvin import ai_classifier # marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY") - +import logging import os print(os.getcwd()) @@ -430,8 +430,34 @@ class Neo4jGraphDB(AbstractGraphDB): return create_statements + async def get_document_categories(self, user_id: str): + """ + Retrieve a list of categories for all documents associated with a given user. + This function executes a Cypher query in a Neo4j database to fetch the categories + of all 'Document' nodes that are linked to the 'SemanticMemory' node of the specified user. + Parameters: + - session (AsyncSession): The database session for executing the query. + - user_id (str): The unique identifier of the user. + + Returns: + - List[str]: A list of document categories associated with the user. + + Raises: + - Exception: If an error occurs during the database query execution. + """ + try: + query = f''' + MATCH (user:User {{userId: {user_id} }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document) + RETURN document.documentCategory AS category + ''' + result = self.query(query) + categories = [record["category"] for record in result] + return categories + except Exception as e: + logging.error(f"An error occurred while retrieving document categories: {str(e)}") + return None def create_document_node_cypher(self, document_summary: dict, user_id: str) -> str: """ @@ -483,10 +509,10 @@ class Neo4jGraphDB(AbstractGraphDB): return cypher_query - def update_document_node_with_namespace(self, user_id: str, vectordb_namespace: str, document_title: str): + def update_document_node_with_namespace(self, user_id: str, vectordb_namespace: str, document_id: str): # Generate the Cypher query cypher_query = f''' - MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{title: '{document_title}'}}) + MATCH (user:User {{userId: '{user_id}' }})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document {{d_id: '{document_id}'}}) SET document.vectordbNamespace = '{vectordb_namespace}' RETURN document ''' @@ -494,9 +520,35 @@ class Neo4jGraphDB(AbstractGraphDB): return cypher_query + def get_namespaces_by_document_category(self, user_id: str, category: str): + """ + Retrieve a list of Vectordb namespaces for documents of a specified category associated with a given user. + This function executes a Cypher query in a Neo4j database to fetch the 'vectordbNamespace' of all 'Document' nodes + that are linked to the 'SemanticMemory' node of the specified user and belong to the specified category. + Parameters: + - user_id (str): The unique identifier of the user. + - category (str): The category to filter the documents by. + Returns: + - List[str]: A list of Vectordb namespaces for documents in the specified category. + + Raises: + - Exception: If an error occurs during the database query execution. 
+ """ + try: + query = f''' + MATCH (user:User {{userId: '{user_id}'}})-[:HAS_SEMANTIC_MEMORY]->(semantic:SemanticMemory)-[:HAS_DOCUMENT]->(document:Document) + WHERE document.documentCategory = '{category}' + RETURN document.vectordbNamespace AS namespace + ''' + result = self.query(query) + namespaces = [record["namespace"] for record in result] + return namespaces + except Exception as e: + logging.error(f"An error occurred while retrieving namespaces by document category: {str(e)}") + return None class NetworkXGraphDB(AbstractGraphDB): diff --git a/level_4/cognitive_architecture/database/postgres/database_crud.py b/level_4/cognitive_architecture/database/postgres/database_crud.py index 4e8b755af..3f713acb0 100644 --- a/level_4/cognitive_architecture/database/postgres/database_crud.py +++ b/level_4/cognitive_architecture/database/postgres/database_crud.py @@ -5,8 +5,6 @@ from .models.sessions import Session from .models.memory import MemoryModel from .models.user import User from .models.operation import Operation -from .models.testset import TestSet -from .models.testoutput import TestOutput from .models.metadatas import MetaDatas from .models.docs import DocsModel diff --git a/level_4/cognitive_architecture/database/postgres/models/operation.py b/level_4/cognitive_architecture/database/postgres/models/operation.py index 39858503d..c9375ea61 100644 --- a/level_4/cognitive_architecture/database/postgres/models/operation.py +++ b/level_4/cognitive_architecture/database/postgres/models/operation.py @@ -13,14 +13,12 @@ class Operation(Base): user_id = Column(String, ForeignKey('users.id'), index=True) # Link to User operation_type = Column(String, nullable=True) operation_status = Column(String, nullable=True) - test_set_id = Column(String, ForeignKey('test_sets.id'), index=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, onupdate=datetime.utcnow) memories = relationship("MemoryModel", back_populates="operation") # Relationships user = relationship("User", back_populates="operations") - test_set = relationship("TestSet", back_populates="operations") docs = relationship("DocsModel", back_populates="operations") def __repr__(self): diff --git a/level_4/cognitive_architecture/database/postgres/models/testoutput.py b/level_4/cognitive_architecture/database/postgres/models/testoutput.py deleted file mode 100644 index ae7dd7681..000000000 --- a/level_4/cognitive_architecture/database/postgres/models/testoutput.py +++ /dev/null @@ -1,44 +0,0 @@ -# test_output.py -import os -import sys -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - -from datetime import datetime -from sqlalchemy import Column, String, DateTime, ForeignKey, JSON -from sqlalchemy.orm import relationship -import os -import sys -from ..database import Base - -class TestOutput(Base): - """ - Represents the output result of a specific test set. 
- """ - __tablename__ = 'test_outputs' - - set_id = Column(String, primary_key=True) - id = Column(String, nullable=True) - user_id = Column(String, ForeignKey('users.id'), index=True) # Added user_id field - test_set_id = Column(String, ForeignKey('test_sets.id'), index=True) - operation_id = Column(String, ForeignKey('operations.id'), index=True) - test_params= Column(String, nullable=True) - test_result = Column(String, nullable=True) - test_score = Column(String, nullable=True) - test_metric_name = Column(String, nullable=True) - test_query = Column(String, nullable=True) - test_output = Column(String, nullable=True) - test_expected_output = Column(String, nullable=True) - test_context = Column(String, nullable=True) - number_of_memories = Column(String, nullable=True) - - test_results = Column(JSON, nullable=True) - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, onupdate=datetime.utcnow) - - # Relationships - user = relationship("User", back_populates="test_outputs") # Added relationship with User - test_set = relationship("TestSet", back_populates="test_outputs") - operation = relationship("Operation", backref="test_outputs") - - def __repr__(self): - return f"" diff --git a/level_4/cognitive_architecture/database/postgres/models/testset.py b/level_4/cognitive_architecture/database/postgres/models/testset.py deleted file mode 100644 index f3de61aa9..000000000 --- a/level_4/cognitive_architecture/database/postgres/models/testset.py +++ /dev/null @@ -1,24 +0,0 @@ -# test_set.py -from datetime import datetime -from sqlalchemy import Column, String, DateTime, ForeignKey -from sqlalchemy.orm import relationship -import os -import sys -from ..database import Base -class TestSet(Base): - __tablename__ = 'test_sets' - - id = Column(String, primary_key=True) - user_id = Column(String, ForeignKey('users.id'), index=True) # Ensure uniqueness - - content = Column(String, nullable=True) - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, onupdate=datetime.utcnow) - - user = relationship("User", back_populates="test_sets") - operations = relationship("Operation", back_populates="test_set") - - test_outputs = relationship("TestOutput", back_populates="test_set", cascade="all, delete-orphan") - - def __repr__(self): - return f"" diff --git a/level_4/cognitive_architecture/database/postgres/models/user.py b/level_4/cognitive_architecture/database/postgres/models/user.py index 45f71e3f5..b23beb8f5 100644 --- a/level_4/cognitive_architecture/database/postgres/models/user.py +++ b/level_4/cognitive_architecture/database/postgres/models/user.py @@ -20,8 +20,6 @@ class User(Base): memories = relationship("MemoryModel", back_populates="user", cascade="all, delete-orphan") operations = relationship("Operation", back_populates="user", cascade="all, delete-orphan") sessions = relationship("Session", back_populates="user", cascade="all, delete-orphan") - test_sets = relationship("TestSet", back_populates="user", cascade="all, delete-orphan") - test_outputs = relationship("TestOutput", back_populates="user", cascade="all, delete-orphan") metadatas = relationship("MetaDatas", back_populates="user") def __repr__(self): diff --git a/level_4/cognitive_architecture/database/vectordb/basevectordb.py b/level_4/cognitive_architecture/database/vectordb/basevectordb.py index c9b474b72..230e78c18 100644 --- a/level_4/cognitive_architecture/database/vectordb/basevectordb.py +++ b/level_4/cognitive_architecture/database/vectordb/basevectordb.py @@ 
-16,14 +16,12 @@ from langchain.retrievers import WeaviateHybridSearchRetriever from weaviate.gql.get import HybridFusion -from database.postgres.models.sessions import Session -from database.postgres.models.testset import TestSet -from database.postgres.models.testoutput import TestOutput -from database.postgres.models.metadatas import MetaDatas -from database.postgres.models.operation import Operation -from database.postgres.models.docs import DocsModel +from cognitive_architecture.database.postgres.models.sessions import Session +from cognitive_architecture.database.postgres.models.metadatas import MetaDatas +from cognitive_architecture.database.postgres.models.operation import Operation +from cognitive_architecture.database.postgres.models.docs import DocsModel from sqlalchemy.orm import sessionmaker -from database.postgres.database import engine +from cognitive_architecture.database.postgres.database import engine load_dotenv() from typing import Optional import time @@ -33,7 +31,7 @@ tracemalloc.start() from datetime import datetime from langchain.embeddings.openai import OpenAIEmbeddings -from database.vectordb.vectordb import PineconeVectorDB, WeaviateVectorDB +from cognitive_architecture.database.vectordb.vectordb import PineconeVectorDB, WeaviateVectorDB from langchain.schema import Document import uuid import weaviate diff --git a/level_4/cognitive_architecture/database/vectordb/chunkers/chunkers.py b/level_4/cognitive_architecture/database/vectordb/chunkers/chunkers.py index b18fb86d1..1ec2525ac 100644 --- a/level_4/cognitive_architecture/database/vectordb/chunkers/chunkers.py +++ b/level_4/cognitive_architecture/database/vectordb/chunkers/chunkers.py @@ -1,7 +1,7 @@ from langchain.document_loaders import PyPDFLoader import sys, os -from shared.chunk_strategy import ChunkStrategy +from cognitive_architecture.shared.chunk_strategy import ChunkStrategy import re def chunk_data(chunk_strategy=None, source_data=None, chunk_size=None, chunk_overlap=None): diff --git a/level_4/cognitive_architecture/database/vectordb/loaders/loaders.py b/level_4/cognitive_architecture/database/vectordb/loaders/loaders.py index 6da5010d7..72e064980 100644 --- a/level_4/cognitive_architecture/database/vectordb/loaders/loaders.py +++ b/level_4/cognitive_architecture/database/vectordb/loaders/loaders.py @@ -3,7 +3,7 @@ import fitz import os import sys -from database.vectordb.chunkers.chunkers import chunk_data +from cognitive_architecture.database.vectordb.chunkers.chunkers import chunk_data from langchain.document_loaders import UnstructuredURLLoader from langchain.document_loaders import DirectoryLoader @@ -17,8 +17,7 @@ async def _document_loader( observation: str, loader_settings: dict): loader_strategy = loader_settings.get("strategy", "VANILLA") chunk_size = loader_settings.get("chunk_size", 500) chunk_overlap = loader_settings.get("chunk_overlap", 20) - import logging - import os + logging.info("LOADER SETTINGS %s", loader_settings) diff --git a/level_4/cognitive_architecture/database/vectordb/vectordb.py b/level_4/cognitive_architecture/database/vectordb/vectordb.py index 380107f00..bccbfdc1d 100644 --- a/level_4/cognitive_architecture/database/vectordb/vectordb.py +++ b/level_4/cognitive_architecture/database/vectordb/vectordb.py @@ -4,7 +4,7 @@ import logging from langchain.text_splitter import RecursiveCharacterTextSplitter from marshmallow import Schema, fields -from database.vectordb.loaders.loaders import _document_loader +from cognitive_architecture.database.vectordb.loaders.loaders import 
_document_loader # Add the parent directory to sys.path diff --git a/level_4/cognitive_architecture/fetch_secret.py b/level_4/cognitive_architecture/fetch_secret.py index 6c422d1af..c9c070035 100644 --- a/level_4/cognitive_architecture/fetch_secret.py +++ b/level_4/cognitive_architecture/fetch_secret.py @@ -1,6 +1,17 @@ import os from dotenv import load_dotenv -from api import start_api_server +import os +import sys + +# Get the directory that contains your script +current_dir = os.path.dirname(os.path.abspath(__file__)) + +# Get the parent directory +parent_dir = os.path.dirname(current_dir) + +# Add the parent directory to sys.path +sys.path.insert(0, parent_dir) + # API_ENABLED = os.environ.get("API_ENABLED", "False").lower() == "true" import boto3 diff --git a/level_4/cognitive_architecture/vectorstore_manager.py b/level_4/cognitive_architecture/vectorstore_manager.py index 4d59b920d..5cc2c6cff 100644 --- a/level_4/cognitive_architecture/vectorstore_manager.py +++ b/level_4/cognitive_architecture/vectorstore_manager.py @@ -9,12 +9,14 @@ import os print(os.getcwd()) -from database.postgres.models.user import User -from database.postgres.models.memory import MemoryModel + + +from cognitive_architecture.database.postgres.models.user import User +from cognitive_architecture.database.postgres.models.memory import MemoryModel import ast import tracemalloc -from database.postgres.database_crud import add_entity +from cognitive_architecture.database.postgres.database_crud import add_entity tracemalloc.start() @@ -26,9 +28,9 @@ load_dotenv() -from database.vectordb.basevectordb import BaseMemory +from cognitive_architecture.database.vectordb.basevectordb import BaseMemory -from config import Config +from cognitive_architecture.config import Config config = Config() config.load() diff --git a/level_4/cognitive_architecture/entrypoint.sh b/level_4/entrypoint.sh similarity index 81% rename from level_4/cognitive_architecture/entrypoint.sh rename to level_4/entrypoint.sh index 4cf551d01..300813101 100755 --- a/level_4/cognitive_architecture/entrypoint.sh +++ b/level_4/entrypoint.sh @@ -2,14 +2,14 @@ export ENVIRONMENT # Run Python scripts with error handling echo "Running fetch_secret.py" -python fetch_secret.py +python cognitive_architecture/fetch_secret.py if [ $? -ne 0 ]; then echo "Error: fetch_secret.py failed" exit 1 fi echo "Running create_database.py" -python create_database.py +python cognitive_architecture/database/create_database.py if [ $? 
-ne 0 ]; then echo "Error: create_database.py failed" exit 1 diff --git a/level_4/main.py b/level_4/main.py index 57517758f..573c11e35 100644 --- a/level_4/main.py +++ b/level_4/main.py @@ -1,5 +1,7 @@ # from marvin import ai_classifier # marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY") +from pydantic import BaseModel + from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB from cognitive_architecture.database.postgres.models.memory import MemoryModel @@ -12,7 +14,16 @@ from cognitive_architecture.utils import generate_letter_uuid import instructor from openai import OpenAI -from level_4.cognitive_architecture.vectorstore_manager import Memory +from cognitive_architecture.vectorstore_manager import Memory +from cognitive_architecture.database.postgres.database_crud import fetch_job_id +import uuid +from cognitive_architecture.database.postgres.models.sessions import Session +from cognitive_architecture.database.postgres.models.operation import Operation +from cognitive_architecture.database.postgres.database_crud import session_scope, add_entity, update_entity, fetch_job_id +from cognitive_architecture.database.postgres.models.metadatas import MetaDatas + +from cognitive_architecture.database.postgres.models.docs import DocsModel +from cognitive_architecture.database.postgres.models.memory import MemoryModel # Adds response_model to ChatCompletion # Allows the return of Pydantic model rather than raw JSON @@ -59,6 +70,29 @@ async def get_vectordb_document_name(session: AsyncSession, user_id: str): logging.error(f"An error occurred while retrieving the Vectordb_namespace: {str(e)}") return None +from sqlalchemy.orm import selectinload + +async def get_vectordb_data(session: AsyncSession, user_id: str): + try: + # Query to join Operations with MemoryModel and DocsModel + result = await session.execute( + select(Operation) + .options(selectinload(Operation.memories), selectinload(Operation.docs)) + .where((Operation.user_id == user_id) & (Operation.operation_status == 'SUCCESS')) + .order_by(Operation.created_at.desc()) + ) + operations = result.scalars().all() + + # Extract memory names and document names and IDs + memory_names = [memory.memory_name for op in operations for memory in op.memories] + docs = [(doc.doc_name, doc.id) for op in operations for doc in op.docs] + + return memory_names, docs + except Exception as e: + logging.error(f"An error occurred while retrieving Vectordb data: {str(e)}") + return None, None + + # async def retrieve_job_by_id(session, user_id, job_id): # try: # result = await session.execute( @@ -74,39 +108,44 @@ async def get_vectordb_document_name(session: AsyncSession, user_id: str): -async def update_document_vectordb_namespace(postgres_session: AsyncSession, user_id: str, namespace: str = None, job_id:str=None): - """ - Update the Document node with the Vectordb_namespace for the given user. If the namespace is not provided, - it will be retrieved from the PostgreSQL database. - - Args: - postgres_session (AsyncSession): The async session for connecting to the PostgreSQL database. - user_id (str): The user's unique identifier. - namespace (str, optional): The Vectordb_namespace. If None, it will be retrieved from the database. - - Returns: - The result of the update operation or None if an error occurred. 
- """ - vectordb_namespace = namespace - - # Retrieve namespace from the database if not provided - if vectordb_namespace is None: - vectordb_namespace = await get_vectordb_namespace(postgres_session, user_id) - if not vectordb_namespace: - logging.error("Vectordb_namespace could not be retrieved.") - return None - from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB - # Example initialization (replace with your actual connection details) - neo4j_graph_db = Neo4jGraphDB(url='bolt://localhost:7687', username='neo4j', password='pleaseletmein') - results = [] - for namespace in vectordb_namespace: - update_result = neo4j_graph_db.update_document_node_with_namespace(user_id, namespace) - results.append(update_result) - return results +# async def update_document_vectordb_namespace(postgres_session: AsyncSession, user_id: str, namespace: str = None, job_id:str=None): +# """ +# Update the Document node with the Vectordb_namespace for the given user. If the namespace is not provided, +# it will be retrieved from the PostgreSQL database. +# +# Args: +# postgres_session (AsyncSession): The async session for connecting to the PostgreSQL database. +# user_id (str): The user's unique identifier. +# namespace (str, optional): The Vectordb_namespace. If None, it will be retrieved from the database. +# +# Returns: +# The result of the update operation or None if an error occurred. +# """ +# vectordb_namespace = namespace +# +# # Retrieve namespace from the database if not provided +# if vectordb_namespace is None: +# vectordb_namespace = await get_vectordb_namespace(postgres_session, user_id) +# if not vectordb_namespace: +# logging.error("Vectordb_namespace could not be retrieved.") +# return None +# from cognitive_architecture.database.graph_database.graph import Neo4jGraphDB +# # Example initialization (replace with your actual connection details) +# neo4j_graph_db = Neo4jGraphDB(url='bolt://localhost:7687', username='neo4j', password='pleaseletmein') +# results = [] +# for namespace in vectordb_namespace: +# update_result = neo4j_graph_db.update_document_node_with_namespace(user_id, namespace) +# results.append(update_result) +# return results async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job_id:str=None, loader_settings:dict=None): + namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY" + namespace_class = namespace_id + "_class" + + memory = await Memory.create_memory(user_id, session, namespace=namespace_id, memory_label=namespace_id) + if job_id is None: job_id = str(uuid.uuid4()) @@ -117,7 +156,6 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job user_id=user_id, operation_status="RUNNING", operation_type="DATA_LOAD", - test_set_id="none", ), ) @@ -131,15 +169,14 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job doc_name=doc ) ) - namespace_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY" - namespace_class = namespace_id + "_class" - memory = await Memory.create_memory(user_id, session, namespace=namespace_class) + # Managing memory attributes existing_user = await Memory.check_existing_user(user_id, session) print("here is the existing user", existing_user) await memory.manage_memory_attributes(existing_user) + params = { "version": "1.0", "agreement_id": "AG123456", @@ -153,11 +190,23 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job "validity_start": "2023-08-01", "validity_end": "2024-07-31", } + print("Namespace 
id is %s", namespace_id) + await memory.add_dynamic_memory_class(namespace_id.lower(), namespace_id) dynamic_memory_class = getattr(memory, namespace_class.lower(), None) - await memory.add_dynamic_memory_class(dynamic_memory_class, namespace_class) - await memory.add_method_to_class(dynamic_memory_class, "add_memories") + methods_to_add = ["add_memories", "fetch_memories", "delete_memories"] + + if dynamic_memory_class is not None: + for method_name in methods_to_add: + await memory.add_method_to_class(dynamic_memory_class, method_name) + print(f"Memory method {method_name} has been added") + else: + print(f"No attribute named in memory.") + + print("Available memory classes:", await memory.list_memory_classes()) + + # await memory.add_method_to_class(dynamic_memory_class, "add_memories") # await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories") sss = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories', observation='some_observation', params=params, loader_settings=loader_settings) @@ -166,9 +215,8 @@ async def load_documents_to_vectorstore(session: AsyncSession, user_id: str, job Operation( id=job_id, user_id=user_id, - operation_status="FINISHED", + operation_status="SUCCESS", operation_type="DATA_LOAD", - test_set_id="none", ), ) @@ -183,7 +231,6 @@ async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_inpu user_id=user_id, operation_status="RUNNING", operation_type="USER_QUERY_TO_GRAPH_DB", - test_set_id="none", ), ) @@ -199,7 +246,6 @@ async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_inpu user_id=user_id, operation_status="SUCCESS", operation_type="USER_QUERY_TO_GRAPH_DB", - test_set_id="none", ), ) return result @@ -210,21 +256,104 @@ async def user_query_to_graph_db(session: AsyncSession, user_id: str, query_inpu async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str): """""" try: - await update_document_vectordb_namespace(postgres_session, user_id) + # await update_document_vectordb_namespace(postgres_session, user_id) from cognitive_architecture.classifiers.classifier import classify_documents + memory_names, docs = await get_vectordb_data(postgres_session, user_id) - classification = await classify_documents("Lord of the Rings") - neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, - password=config.graph_database_password) - rs = neo4j_graph_db.create_document_node_cypher(classification, user_id) - neo4j_graph_db.query(rs, classification) - namespace_title_dict = get_vectordb_namespace #fix this - neo4j_graph_db.update_document_node_with_namespace(user_id, namespace=, document_title="Lord of the Rings") + for doc, memory_name in zip(docs, memory_names): + + + doc_name, doc_id = doc + + + classification = await classify_documents(doc_name, document_id=doc_id) + neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, + password=config.graph_database_password) + rs = neo4j_graph_db.create_document_node_cypher(classification, user_id) + neo4j_graph_db.query(rs, classification) + + # select doc from the store + neo4j_graph_db.update_document_node_with_namespace(user_id, vectordb_namespace=memory_name, document_id=doc_id) except: pass + +async def user_context_enrichment(session, user_id, query): + """""" + neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username, + password=config.graph_database_password) + + semantic_mem = 
+
+async def user_context_enrichment(session, user_id, query):
+    """Enrich the user's query with episodic and semantic memory drawn from the graph."""
+    neo4j_graph_db = Neo4jGraphDB(url=config.graph_database_url, username=config.graph_database_username,
+                                  password=config.graph_database_password)
+
+    semantic_mem = neo4j_graph_db.retrieve_semantic_memory(user_id=user_id)
+    episodic_mem = neo4j_graph_db.retrieve_episodic_memory(user_id=user_id)
+    context = f""" You are a memory system that uses cognitive architecture to enrich the user context.
+    You have access to the following information:
+    EPISODIC MEMORY: {episodic_mem}
+    SEMANTIC MEMORY: {semantic_mem}
+    PROCEDURAL MEMORY: NULL
+    The original user query: {query}
+    """
+
+    classifier_prompt = """ Your task is to determine if any of the following document categories are relevant to the user query and context: {query}
+    Context is: {context}
+    Document categories: {document_categories}"""
+    from cognitive_architecture.classifiers.classifier import classify_call
+
+    document_categories = neo4j_graph_db.get_document_categories(user_id=user_id)
+    relevant_categories = await classify_call(query, context, document_types=document_categories)
+    fetch_namespace_from_graph = neo4j_graph_db.get_namespaces_by_document_category(user_id=user_id, category=relevant_categories)
+
+    results = []
+
+    for namespace in fetch_namespace_from_graph:
+        namespace_class = namespace
+        memory = await Memory.create_memory(user_id, session, namespace=namespace_class)
+
+        # Managing memory attributes
+        existing_user = await Memory.check_existing_user(user_id, session)
+        print("here is the existing user", existing_user)
+        await memory.manage_memory_attributes(existing_user)
+
+        dynamic_memory_class = getattr(memory, namespace_class.lower(), None)
+
+        await memory.add_dynamic_memory_class(dynamic_memory_class, namespace_class)
+        await memory.add_method_to_class(dynamic_memory_class, "fetch_memories")
+        raw_document_output = await memory.dynamic_method_call(
+            dynamic_memory_class,
+            "fetch_memories",
+            observation=context,
+        )
+        from openai import OpenAI
+        import instructor
+
+        # Enables `response_model`
+        client = instructor.patch(OpenAI())
+
+        format_query_via_gpt = f""" Provide an answer to the user query: {query}
+        Context is: {context}
+        Document store information is {raw_document_output} """
+
+        class UserResponse(BaseModel):
+            response: str
+
+        user = client.chat.completions.create(
+            model=config.model,
+            response_model=UserResponse,
+            messages=[
+                {"role": "user", "content": format_query_via_gpt},
+            ]
+        )
+
+        results.append(user.response)
+
+    return results
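A hypothetical driver for the new user_context_enrichment entrypoint; the user id and query are placeholders, and the session helpers are assumed as above. The function returns one response per relevant namespace, so the caller iterates.

async def answer_with_context(user_id: str, query: str):
    async with session_scope(session=AsyncSessionLocal()) as session:
        answers = await user_context_enrichment(session, user_id, query)
        for answer in answers:  # one model response per relevant namespace
            print(answer)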
 
 # query_input = "I walked in the forest yesterday and added to my list I need to buy some milk in the store"
 #
 # # Generate the knowledge graph from the user input
@@ -283,16 +412,7 @@ async def add_documents_to_graph_db(postgres_session: AsyncSession, user_id: str
 # }
 #
 # execute_cypher_query(rs, parameters)
-from cognitive_architecture.database.postgres.database_crud import fetch_job_id
-import uuid
-from cognitive_architecture.database.postgres.models.sessions import Session
-from cognitive_architecture.database.postgres.models.operation import Operation
-from cognitive_architecture.database.postgres.database_crud import session_scope, add_entity, update_entity, fetch_job_id
-from cognitive_architecture.database.postgres.models.metadatas import MetaDatas
-from cognitive_architecture.database.postgres.models.testset import TestSet
-from cognitive_architecture.database.postgres.models.testoutput import TestOutput
-from cognitive_architecture.database.postgres.models.docs import DocsModel
-from cognitive_architecture.database.postgres.models.memory import MemoryModel
+
 
 async def main():
     user_id = "user"
@@ -327,7 +447,7 @@ async def main():
     # await update_document_vectordb_namespace(session, user_id)
     # from cognitive_architecture.graph_database.graph import Neo4jGraphDB
     # # Example initialization (replace with your actual connection details)
-    neo4j_graph_db = Neo4jGraphDB(url='bolt://localhost:7687', username='neo4j', password='pleaseletmein')
+    # neo4j_graph_db = Neo4jGraphDB(url='bolt://localhost:7687', username='neo4j', password='pleaseletmein')
     # # Generate the Cypher query for a specific user
     # user_id = 'user123'  # Replace with the actual user ID
     #cypher_query = await neo4j_graph_db.generate_cypher_query_for_user_prompt_decomposition(user_id,"I walked in the forest yesterday and added to my list I need to buy some milk in the store")
@@ -356,11 +476,11 @@ async def main():
     # neo4j_graph_db.query(rs, call_of_the_wild_summary)
     # print(cypher_query)
-    from cognitive_architecture.classifiers.classifier import classify_documents
-
-    ff = await classify_documents("Lord of the Rings")
-
-    print(ff)
+    # from cognitive_architecture.classifiers.classifier import classify_documents
+    #
+    # ff = await classify_documents("Lord of the Rings")
+    #
+    # print(ff)
     # vector_db_namespaces = await get_vectordb_namespace(session, user_id)
     #
@@ -400,8 +520,9 @@ async def main():
     loader_settings = {
         "format": "PDF",
         "source": "URL",
-        "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf",
+        "path": ["https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"],
    }
+    await load_documents_to_vectorstore(session, user_id, loader_settings=loader_settings)
     # memory_instance = Memory(namespace='SEMANTICMEMORY')
     # sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation')
     # from cognitive_architecture.vectorstore_manager import Memory
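Finally, a hedged example of invoking the loader directly with the new list-valued "path", mirroring the loader_settings shape main() now uses; the URL and user id are placeholders.

import asyncio

async def load_one(user_id: str):
    loader_settings = {
        "format": "PDF",
        "source": "URL",
        "path": ["https://example.com/some-document.pdf"],  # "path" now takes a list of URLs
    }
    async with session_scope(session=AsyncSessionLocal()) as session:
        await load_documents_to_vectorstore(session, user_id, loader_settings=loader_settings)

# asyncio.run(load_one("user"))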