diff --git a/level_3/api.py b/level_3/api.py index 9eb64b3e8..10c640d0b 100644 --- a/level_3/api.py +++ b/level_3/api.py @@ -11,6 +11,7 @@ from level_3.database.database import AsyncSessionLocal from level_3.database.database_crud import session_scope from vectorstore_manager import Memory from dotenv import load_dotenv + # Set up logging logging.basicConfig( level=logging.INFO, # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL) @@ -20,20 +21,23 @@ logging.basicConfig( logger = logging.getLogger(__name__) - load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") app = FastAPI(debug=True) from auth.cognito.JWTBearer import JWTBearer from auth.auth import jwks + auth = JWTBearer(jwks) from fastapi import Depends + + class ImageResponse(BaseModel): success: bool message: str + @app.get( "/", ) @@ -43,15 +47,19 @@ async def root(): """ return {"message": "Hello, World, I am alive!"} + @app.get("/health") def health_check(): """ Health check endpoint that returns the server status. """ return {"status": "OK"} + + class Payload(BaseModel): payload: Dict[str, Any] + def memory_factory(memory_type): load_dotenv() @@ -67,24 +75,34 @@ def memory_factory(memory_type): logging.info(" Adding to Memory ") decoded_payload = payload.payload async with session_scope(session=AsyncSessionLocal()) as session: - - memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY') + memory = await Memory.create_memory( + decoded_payload["user_id"], session, namespace="SEMANTICMEMORY" + ) # Adding a memory instance await memory.add_memory_instance(decoded_payload["memory_object"]) # Managing memory attributes - existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session) + existing_user = await Memory.check_existing_user( + decoded_payload["user_id"], session + ) await memory.manage_memory_attributes(existing_user) - await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper()) + await memory.add_dynamic_memory_class( + decoded_payload["memory_object"], + decoded_payload["memory_object"].upper(), + ) memory_class = decoded_payload["memory_object"] + "_class" dynamic_memory_class = getattr(memory, memory_class.lower(), None) - await memory.add_method_to_class(dynamic_memory_class, 'add_memories') + await memory.add_method_to_class(dynamic_memory_class, "add_memories") # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') - output = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories', - observation='some_observation', params=decoded_payload["params"], - loader_settings=decoded_payload["loader_settings"]) + output = await memory.dynamic_method_call( + dynamic_memory_class, + "add_memories", + observation="some_observation", + params=decoded_payload["params"], + loader_settings=decoded_payload["loader_settings"], + ) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: @@ -101,23 +119,32 @@ def memory_factory(memory_type): logging.info(" Adding to Memory ") decoded_payload = payload.payload async with session_scope(session=AsyncSessionLocal()) as session: - - memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY') + memory = await Memory.create_memory( + decoded_payload["user_id"], session, namespace="SEMANTICMEMORY" + ) # Adding a memory instance await memory.add_memory_instance(decoded_payload["memory_object"]) # Managing memory attributes - existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session) + existing_user = await Memory.check_existing_user( + decoded_payload["user_id"], session + ) await memory.manage_memory_attributes(existing_user) - await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper()) + await memory.add_dynamic_memory_class( + decoded_payload["memory_object"], + decoded_payload["memory_object"].upper(), + ) memory_class = decoded_payload["memory_object"] + "_class" dynamic_memory_class = getattr(memory, memory_class.lower(), None) - await memory.add_method_to_class(dynamic_memory_class, 'add_memories') + await memory.add_method_to_class(dynamic_memory_class, "add_memories") # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') - output = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories', - observation=decoded_payload['observation']) + output = await memory.dynamic_method_call( + dynamic_memory_class, + "fetch_memories", + observation=decoded_payload["observation"], + ) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: @@ -134,23 +161,34 @@ def memory_factory(memory_type): logging.info(" Adding to Memory ") decoded_payload = payload.payload async with session_scope(session=AsyncSessionLocal()) as session: - - memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY') + memory = await Memory.create_memory( + decoded_payload["user_id"], session, namespace="SEMANTICMEMORY" + ) # Adding a memory instance await memory.add_memory_instance(decoded_payload["memory_object"]) # Managing memory attributes - existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session) + existing_user = await Memory.check_existing_user( + decoded_payload["user_id"], session + ) await memory.manage_memory_attributes(existing_user) - await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper()) + await memory.add_dynamic_memory_class( + decoded_payload["memory_object"], + decoded_payload["memory_object"].upper(), + ) memory_class = decoded_payload["memory_object"] + "_class" dynamic_memory_class = getattr(memory, memory_class.lower(), None) - await memory.add_method_to_class(dynamic_memory_class, 'delete_memories') + await memory.add_method_to_class( + dynamic_memory_class, "delete_memories" + ) # await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') - output = await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories', - namespace=decoded_payload["memory_object"].upper()) + output = await memory.dynamic_method_call( + dynamic_memory_class, + "delete_memories", + namespace=decoded_payload["memory_object"].upper(), + ) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: @@ -158,6 +196,7 @@ def memory_factory(memory_type): content={"response": {"error": str(e)}}, status_code=503 ) + memory_list = ["episodic", "buffer", "semantic"] for memory_type in memory_list: memory_factory(memory_type) diff --git a/level_3/rag_test_manager.py b/level_3/rag_test_manager.py index ff86ae356..1b395764e 100644 --- a/level_3/rag_test_manager.py +++ b/level_3/rag_test_manager.py @@ -20,7 +20,9 @@ logging.basicConfig(level=logging.INFO) import marvin from dotenv import load_dotenv from sqlalchemy.orm import sessionmaker -from database.database import engine # Ensure you have database engine defined somewhere +from database.database import ( + engine, +) # Ensure you have database engine defined somewhere from models.user import User from models.memory import MemoryModel from models.sessions import Session @@ -28,6 +30,7 @@ from models.testset import TestSet from models.testoutput import TestOutput from models.metadatas import MetaDatas from models.operation import Operation + load_dotenv() import ast import tracemalloc @@ -43,12 +46,13 @@ load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY") -from vectordb.basevectordb import BaseMemory +from vectordb.basevectordb import BaseMemory from vectorstore_manager import Memory import asyncio from database.database_crud import session_scope from database.database import AsyncSessionLocal + openai.api_key = os.getenv("OPENAI_API_KEY", "") @@ -58,13 +62,19 @@ async def retrieve_latest_test_case(session, user_id, memory_id): result = await session.execute( session.query(TestSet.attributes_list) .filter_by(user_id=user_id, memory_id=memory_id) - .order_by(TestSet.created_at).first() + .order_by(TestSet.created_at) + .first() ) - return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call + return ( + result.scalar_one_or_none() + ) # scalar_one_or_none() is a non-blocking call except Exception as e: - logging.error(f"An error occurred while retrieving the latest test case: {str(e)}") + logging.error( + f"An error occurred while retrieving the latest test case: {str(e)}" + ) return None + async def add_entity(session, entity): async with session_scope(session) as s: # Use your async session_scope s.add(entity) # No need to commit; session_scope takes care of it @@ -72,6 +82,7 @@ async def add_entity(session, entity): return "Successfully added entity" + async def retrieve_job_by_id(session, user_id, job_id): try: result = await session.execute( @@ -84,12 +95,14 @@ async def retrieve_job_by_id(session, user_id, job_id): logging.error(f"An error occurred while retrieving the job: {str(e)}") return None + async def fetch_job_id(session, user_id=None, memory_id=None, job_id=None): try: result = await session.execute( session.query(Session.id) .filter_by(user_id=user_id, id=job_id) - .order_by(Session.created_at).first() + .order_by(Session.created_at) + .first() ) return result.scalar_one_or_none() except Exception as e: @@ -103,16 +116,24 @@ async def fetch_test_set_id(session, user_id, id): result = await session.execute( session.query(TestSet.id) .filter_by(user_id=user_id, id=id) - .order_by(TestSet.created_at).desc().first() + .order_by(TestSet.created_at) + .desc() + .first() ) - return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call + return ( + result.scalar_one_or_none() + ) # scalar_one_or_none() is a non-blocking call except Exception as e: logging.error(f"An error occurred while retrieving the test set: {str(e)}") return None + # Adding "embeddings" to the parameter variants function -def generate_param_variants(base_params=None, increments=None, ranges=None, included_params=None): + +def generate_param_variants( + base_params=None, increments=None, ranges=None, included_params=None +): """Generate parameter variants for testing. Args: @@ -128,22 +149,22 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl # Default values defaults = { - 'chunk_size': 250, - 'chunk_overlap': 20, - 'similarity_score': 0.5, - 'metadata_variation': 0, - 'search_type': 'hybrid', - 'embeddings': 'openai' # Default value added for 'embeddings' + "chunk_size": 250, + "chunk_overlap": 20, + "similarity_score": 0.5, + "metadata_variation": 0, + "search_type": "hybrid", + "embeddings": "openai", # Default value added for 'embeddings' } # Update defaults with provided base parameters params = {**defaults, **(base_params or {})} default_increments = { - 'chunk_size': 150, - 'chunk_overlap': 10, - 'similarity_score': 0.1, - 'metadata_variation': 1 + "chunk_size": 150, + "chunk_overlap": 10, + "similarity_score": 0.1, + "metadata_variation": 1, } # Update default increments with provided increments @@ -151,10 +172,10 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl # Default ranges default_ranges = { - 'chunk_size': 2, - 'chunk_overlap': 2, - 'similarity_score': 2, - 'metadata_variation': 2 + "chunk_size": 2, + "chunk_overlap": 2, + "similarity_score": 2, + "metadata_variation": 2, } # Update default ranges with provided ranges @@ -162,22 +183,43 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl # Generate parameter variant ranges param_ranges = { - key: [params[key] + i * increments.get(key, 1) for i in range(ranges.get(key, 1))] - for key in ['chunk_size', 'chunk_overlap', 'similarity_score', 'metadata_variation'] + key: [ + params[key] + i * increments.get(key, 1) for i in range(ranges.get(key, 1)) + ] + for key in [ + "chunk_size", + "chunk_overlap", + "similarity_score", + "metadata_variation", + ] } # Add search_type and embeddings with possible values - param_ranges['search_type'] = ['text', 'hybrid', 'bm25', 'generate', 'generate_grouped'] - param_ranges['embeddings'] = ['openai', 'cohere', 'huggingface'] # Added 'embeddings' values + param_ranges["search_type"] = [ + "text", + "hybrid", + "bm25", + "generate", + "generate_grouped", + ] + param_ranges["embeddings"] = [ + "openai", + "cohere", + "huggingface", + ] # Added 'embeddings' values # Filter param_ranges based on included_params if included_params is not None: - param_ranges = {key: val for key, val in param_ranges.items() if key in included_params} + param_ranges = { + key: val for key, val in param_ranges.items() if key in included_params + } # Generate all combinations of parameter variants keys = param_ranges.keys() values = param_ranges.values() - param_variants = [dict(zip(keys, combination)) for combination in itertools.product(*values)] + param_variants = [ + dict(zip(keys, combination)) for combination in itertools.product(*values) + ] return param_variants @@ -185,27 +227,60 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl # Generate parameter variants and display a sample of the generated combinations -async def generate_chatgpt_output(query:str, context:str=None): - response = openai.ChatCompletion.create( - model="gpt-3.5-turbo", - messages=[ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "assistant", "content": f"{context}"}, - {"role": "user", "content": query} - ] - ) - llm_output = response.choices[0].message.content - # print(llm_output) - return llm_output -async def eval_test(query=None, output=None, expected_output=None, context=None, synthetic_test_set=False): +async def generate_chatgpt_output(query: str, context: str = None, api_key=None, model_name="gpt-3.5-turbo"): + """ + Generate a response from the OpenAI ChatGPT model. + + Args: + query (str): The user's query or message. + context (str, optional): Additional context for the conversation. Defaults to an empty string. + api_key (str, optional): Your OpenAI API key. If not provided, the globally configured API key will be used. + model_name (str, optional): The name of the ChatGPT model to use. Defaults to "gpt-3.5-turbo". + + Returns: + str: The response generated by the ChatGPT model. + + Raises: + Exception: If an error occurs during the API call, an error message is returned for the caller to handle. + """ + if not context: + context = "" + + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "assistant", "content": context}, + {"role": "user", "content": query}, + ] + + try: + openai.api_key = api_key if api_key else openai.api_key # Use the provided API key or the one set globally + + response = openai.ChatCompletion.create( + model=model_name, + messages=messages, + ) + + llm_output = response.choices[0].message.content + return llm_output + except Exception as e: + return f"An error occurred: {e}" # Return the error message for the caller to handle + + + +async def eval_test( + query=None, + output=None, + expected_output=None, + context=None, + synthetic_test_set=False, +): result_output = await generate_chatgpt_output(query, context) if synthetic_test_set: test_case = synthetic_test_set else: - test_case = LLMTestCase( query=query, output=result_output, @@ -226,10 +301,10 @@ async def eval_test(query=None, output=None, expected_output=None, context=None, "output": test_result.output, "expected_output": test_result.expected_output, "metadata": test_result.metadata, - "context": test_result.context + "context": test_result.context, } - test_result_dict =[] + test_result_dict = [] for test in test_result: test_result_it = test_result_to_dict(test) test_result_dict.append(test_result_it) @@ -238,13 +313,11 @@ async def eval_test(query=None, output=None, expected_output=None, context=None, # print(test_result) - -def data_format_route( data_string: str): +def data_format_route(data_string: str): @ai_classifier class FormatRoute(Enum): """Represents classifier for the data format""" - PDF = "PDF" UNSTRUCTURED_WEB = "UNSTRUCTURED_WEB" GITHUB = "GITHUB" @@ -258,7 +331,7 @@ def data_format_route( data_string: str): def data_location_route(data_string: str): @ai_classifier class LocationRoute(Enum): - """Represents classifier for the data location, if it is device, or database connections string or URL """ + """Represents classifier for the data location, if it is device, or database connections string or URL""" DEVICE = "file_path_starting_with_.data_or_containing_it" # URL = "url starting with http or https" @@ -269,71 +342,92 @@ def data_location_route(data_string: str): def dynamic_test_manager(context=None): from deepeval.dataset import create_evaluation_query_answer_pairs + # fetch random chunks from the document - #feed them to the evaluation pipeline - dataset = create_evaluation_query_answer_pairs(openai_api_key=os.environ.get("OPENAI_API_KEY"), context= context ,n=10) + # feed them to the evaluation pipeline + dataset = create_evaluation_query_answer_pairs( + openai_api_key=os.environ.get("OPENAI_API_KEY"), context=context, n=10 + ) return dataset - def generate_letter_uuid(length=8): """Generate a random string of uppercase letters with the specified length.""" letters = string.ascii_uppercase # A-Z - return ''.join(random.choice(letters) for _ in range(length)) - -async def start_test(data, test_set=None, user_id=None, params=None, job_id=None, metadata=None, generate_test_set=False, retriever_type:str=None): + return "".join(random.choice(letters) for _ in range(length)) - """retriever_type = "llm_context, single_document_context, multi_document_context, "cognitive_architecture""""" - +async def start_test( + data, + test_set=None, + user_id=None, + params=None, + job_id=None, + metadata=None, + generate_test_set=False, + retriever_type: str = None, +): + """retriever_type = "llm_context, single_document_context, multi_document_context, "cognitive_architecture""" "" async with session_scope(session=AsyncSessionLocal()) as session: - - job_id = await fetch_job_id(session, user_id=user_id, job_id=job_id) test_set_id = await fetch_test_set_id(session, user_id=user_id, id=job_id) - memory = await Memory.create_memory(user_id, session, namespace="SEMANTICMEMORY") + memory = await Memory.create_memory( + user_id, session, namespace="SEMANTICMEMORY" + ) await memory.add_memory_instance("ExampleMemory") existing_user = await Memory.check_existing_user(user_id, session) - - if test_set_id is None: test_set_id = str(uuid.uuid4()) - await add_entity(session, TestSet(id=test_set_id, user_id=user_id, content=str(test_set))) + await add_entity( + session, TestSet(id=test_set_id, user_id=user_id, content=str(test_set)) + ) if params is None: - data_format = data_format_route(data) # Assume data_format_route is predefined + data_format = data_format_route( + data + ) # Assume data_format_route is predefined logging.info("Data format is %s", data_format) data_location = data_location_route(data) - logging.info("Data location is %s",data_location)# Assume data_location_route is predefined - test_params = generate_param_variants( - included_params=['chunk_size']) + logging.info( + "Data location is %s", data_location + ) # Assume data_location_route is predefined + test_params = generate_param_variants(included_params=["chunk_size"]) print("Here are the test params", str(test_params)) loader_settings = { "format": f"{data_format}", "source": f"{data_location}", - "path": data + "path": data, } if job_id is None: job_id = str(uuid.uuid4()) - await add_entity(session, Operation(id=job_id, user_id=user_id, operation_params =str(test_params), operation_type=retriever_type, test_set_id=test_set_id)) - - - async def run_test(test, loader_settings, metadata, test_id=None,retriever_type=False): + await add_entity( + session, + Operation( + id=job_id, + user_id=user_id, + operation_params=str(test_params), + operation_type=retriever_type, + test_set_id=test_set_id, + ), + ) + async def run_test( + test, loader_settings, metadata, test_id=None, retriever_type=False + ): if test_id is None: - test_id = str(generate_letter_uuid()) + "_" +"SEMANTICMEMORY" + test_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY" await memory.manage_memory_attributes(existing_user) test_class = test_id + "_class" await memory.add_dynamic_memory_class(test_id.lower(), test_id) dynamic_memory_class = getattr(memory, test_class.lower(), None) - methods_to_add = ['add_memories', 'fetch_memories', 'delete_memories'] + methods_to_add = ["add_memories", "fetch_memories", "delete_memories"] if dynamic_memory_class is not None: for method_name in methods_to_add: @@ -348,98 +442,132 @@ async def start_test(data, test_set=None, user_id=None, params=None, job_id=None loader_settings.update(test) test_class = test_id + "_class" dynamic_memory_class = getattr(memory, test_class.lower(), None) - async def run_load_test_element( loader_settings=loader_settings, metadata=metadata, test_id=test_id, test_set=test_set): + async def run_load_test_element( + loader_settings=loader_settings, + metadata=metadata, + test_id=test_id, + test_set=test_set, + ): print(f"Trying to access: {test_class.lower()}") - await memory.dynamic_method_call(dynamic_memory_class, 'add_memories', - observation='Observation loaded', params=metadata, - loader_settings=loader_settings) + await memory.dynamic_method_call( + dynamic_memory_class, + "add_memories", + observation="Observation loaded", + params=metadata, + loader_settings=loader_settings, + ) return "Loaded test element" + async def run_search_element(test_item, test_id): - retrieve_action = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories', - observation=str(test_item["question"])) - print("Here is the test result", str(retrieve_action["data"]['Get'][test_id][0]["text"])) - return retrieve_action["data"]['Get'][test_id][0]["text"] + retrieve_action = await memory.dynamic_method_call( + dynamic_memory_class, + "fetch_memories", + observation=str(test_item["question"]), + ) + print( + "Here is the test result", + str(retrieve_action["data"]["Get"][test_id][0]["text"]), + ) + return retrieve_action["data"]["Get"][test_id][0]["text"] async def run_eval(test_item, search_result): - test_eval= await eval_test(query=test_item["question"], expected_output=test_item["answer"], - context=str(search_result)) + test_eval = await eval_test( + query=test_item["question"], + expected_output=test_item["answer"], + context=str(search_result), + ) return test_eval - async def run_generate_test_set( test_id): + async def run_generate_test_set(test_id): test_class = test_id + "_class" # await memory.add_dynamic_memory_class(test_id.lower(), test_id) dynamic_memory_class = getattr(memory, test_class.lower(), None) print(dynamic_memory_class) - retrieve_action = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories', - observation="Generate a short summary of this document", - search_type="generative") + retrieve_action = await memory.dynamic_method_call( + dynamic_memory_class, + "fetch_memories", + observation="Generate a short summary of this document", + search_type="generative", + ) return dynamic_test_manager(retrieve_action) - test_eval_pipeline =[] + test_eval_pipeline = [] if retriever_type == "llm_context": for test_qa in test_set: - context="" + context = "" + logging.info("Loading and evaluating test set for LLM context") test_result = await run_eval(test_qa, context) test_eval_pipeline.append(test_result) + elif retriever_type == "single_document_context": + if test_set: + logging.info( + "Loading and evaluating test set for a single document context" + ) + await run_load_test_element( + loader_settings, metadata, test_id, test_set + ) + for test_qa in test_set: + result = await run_search_element(test_qa, test_id) + test_result = await run_eval(test_qa, result) + test_eval_pipeline.append(test_result) + await memory.dynamic_method_call( + dynamic_memory_class, "delete_memories", namespace=test_id + ) + else: + pass if generate_test_set is True: synthetic_test_set = run_generate_test_set(test_id) else: pass - if test_set: - logging.info("Loading and evaluating test set") - await run_load_test_element(loader_settings, metadata, test_id, test_set) - for test_qa in test_set: - result = await run_search_element(test_qa, test_id) - test_result = await run_eval(test_qa, result) - test_eval_pipeline.append( test_result) - - else: - pass - - await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories', - namespace=test_id) return test_id, test_eval_pipeline results = [] if retriever_type == "llm_context": - test_id, result = await run_test(test=None, loader_settings=loader_settings, metadata=metadata, - retriever_type=retriever_type) # No params for this case + test_id, result = await run_test( + test=None, + loader_settings=loader_settings, + metadata=metadata, + retriever_type=retriever_type, + ) # No params for this case results.append([result, "No params"]) elif retriever_type == "single_document_context": for param in test_params: - test_id, result = await run_test(param, loader_settings, metadata, retriever_type=retriever_type) # Add the params to the result + test_id, result = await run_test( + param, loader_settings, metadata, retriever_type=retriever_type + ) # Add the params to the result results.append([result, param]) for b, r in results: - print("Here is the result", r) for result_list in b: for result in result_list: - print("Here is one result", result) - await add_entity(session, TestOutput( - id=test_id, - test_set_id=test_set_id, - operation_id=job_id, - set_id=str(uuid.uuid4()), - user_id=user_id, - test_results=result['success'], - test_score=str(result['score']), - test_metric_name=result['metric_name'], - test_query=result['query'], - test_output=result['output'], - test_expected_output=str(['expected_output']), - test_context=result['context'][0], - test_params=str(r) # Add params to the database table - )) + await add_entity( + session, + TestOutput( + id=test_id, + test_set_id=test_set_id, + operation_id=job_id, + set_id=str(uuid.uuid4()), + user_id=user_id, + test_results=result["success"], + test_score=str(result["score"]), + test_metric_name=result["metric_name"], + test_query=result["query"], + test_output=result["output"], + test_expected_output=str(["expected_output"]), + test_context=result["context"][0], + test_params=str(r), # Add params to the database table + ), + ) return results -async def main(): +async def main(): metadata = { "version": "1.0", "agreement_id": "AG123456", @@ -457,28 +585,32 @@ async def main(): test_set = [ { "question": "Who is the main character in 'The Call of the Wild'?", - "answer": "Buck" - }, - { - "question": "Who wrote 'The Call of the Wild'?", - "answer": "Jack London" + "answer": "Buck", }, + {"question": "Who wrote 'The Call of the Wild'?", "answer": "Jack London"}, { "question": "Where does Buck live at the start of the book?", - "answer": "In the Santa Clara Valley, at Judge Miller’s place." + "answer": "In the Santa Clara Valley, at Judge Miller’s place.", }, { "question": "Why is Buck kidnapped?", - "answer": "He is kidnapped to be sold as a sled dog in the Yukon during the Klondike Gold Rush." + "answer": "He is kidnapped to be sold as a sled dog in the Yukon during the Klondike Gold Rush.", }, { "question": "How does Buck become the leader of the sled dog team?", - "answer": "Buck becomes the leader after defeating the original leader, Spitz, in a fight." - } + "answer": "Buck becomes the leader after defeating the original leader, Spitz, in a fight.", + }, ] # "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" - #http://public-library.uk/ebooks/59/83.pdf - result = await start_test(".data/3ZCCCW.pdf", test_set=test_set, user_id="677", params=None, metadata=metadata, retriever_type='llm_context') + # http://public-library.uk/ebooks/59/83.pdf + result = await start_test( + ".data/3ZCCCW.pdf", + test_set=test_set, + user_id="677", + params=None, + metadata=metadata, + retriever_type="llm_context", + ) # # parser = argparse.ArgumentParser(description="Run tests against a document.") # parser.add_argument("--url", required=True, help="URL of the document to test.") @@ -520,8 +652,9 @@ async def main(): # params = None # #clean up params here # await start_test(args.url, test_set, args.user_id, params=None, metadata=metadata) -if __name__ == "__main__": + +if __name__ == "__main__": asyncio.run(main()) # delete_mems = await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories', @@ -541,4 +674,4 @@ if __name__ == "__main__": # results = await asyncio.gather( # *(run_testo(test, loader_settings, metadata) for test in test_params) # ) -# return results \ No newline at end of file +# return results diff --git a/level_3/vectorstore_manager.py b/level_3/vectorstore_manager.py index df0f652c2..217318b7e 100644 --- a/level_3/vectorstore_manager.py +++ b/level_3/vectorstore_manager.py @@ -6,7 +6,9 @@ logging.basicConfig(level=logging.INFO) import marvin from dotenv import load_dotenv from sqlalchemy.orm import sessionmaker -from database.database import engine # Ensure you have database engine defined somewhere +from database.database import ( + engine, +) # Ensure you have database engine defined somewhere from models.user import User from models.memory import MemoryModel from models.sessions import Session @@ -14,6 +16,7 @@ from models.testset import TestSet from models.testoutput import TestOutput from models.metadatas import MetaDatas from models.operation import Operation + load_dotenv() import ast import tracemalloc @@ -29,7 +32,7 @@ load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY") -from vectordb.basevectordb import BaseMemory +from vectordb.basevectordb import BaseMemory class DynamicBaseMemory(BaseMemory): @@ -41,7 +44,7 @@ class DynamicBaseMemory(BaseMemory): index_name: str, db_type: str, namespace: str, - embeddings=None + embeddings=None, ): super().__init__(user_id, memory_id, index_name, db_type, namespace, embeddings) self.name = name @@ -50,7 +53,6 @@ class DynamicBaseMemory(BaseMemory): self.inheritance = None self.associations = [] - async def add_method(self, method_name): """ Add a method to the memory class. @@ -102,6 +104,7 @@ class DynamicBaseMemory(BaseMemory): # Optionally, establish a bidirectional association associated_memory.associations.append(self) + class Attribute: def __init__(self, name): """ @@ -115,6 +118,7 @@ class Attribute: """ self.name = name + class Method: def __init__(self, name): """ @@ -130,9 +134,17 @@ class Method: class Memory: - def __init__(self, user_id: str = "676", session=None, index_name: str = None, - knowledge_source: str = None, knowledge_type: str = None, - db_type: str = "weaviate", namespace: str = None, memory_id:str =None) -> None: + def __init__( + self, + user_id: str = "676", + session=None, + index_name: str = None, + knowledge_source: str = None, + knowledge_type: str = None, + db_type: str = "weaviate", + namespace: str = None, + memory_id: str = None, + ) -> None: self.load_environment_variables() self.memory_id = memory_id self.user_id = user_id @@ -145,12 +157,16 @@ class Memory: self.short_term_memory = None self.namespace = namespace self.memory_instances = [] - #inspect and fix this - self.memory_class = DynamicBaseMemory('Memory', user_id, str(self.memory_id), index_name, db_type, namespace) + # inspect and fix this + self.memory_class = DynamicBaseMemory( + "Memory", user_id, str(self.memory_id), index_name, db_type, namespace + ) + def load_environment_variables(self) -> None: load_dotenv() self.OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0)) self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") + @classmethod async def create_memory(cls, user_id: str, session, **kwargs): """ @@ -160,14 +176,17 @@ class Memory: existing_user = await cls.check_existing_user(user_id, session) if existing_user: - # Handle existing user scenario... memory_id = await cls.check_existing_memory(user_id, session) - logging.info(f"Existing user {user_id} found in the DB. Memory ID: {memory_id}") + logging.info( + f"Existing user {user_id} found in the DB. Memory ID: {memory_id}" + ) else: # Handle new user scenario... memory_id = await cls.handle_new_user(user_id, session) - logging.info(f"New user {user_id} created in the DB. Memory ID: {memory_id}") + logging.info( + f"New user {user_id} created in the DB. Memory ID: {memory_id}" + ) return cls(user_id=user_id, session=session, memory_id=memory_id, **kwargs) @@ -181,9 +200,7 @@ class Memory: @staticmethod async def check_existing_user(user_id: str, session): """Check if a user exists in the DB and return it.""" - result = await session.execute( - select(User).where(User.id == user_id) - ) + result = await session.execute(select(User).where(User.id == user_id)) return result.scalar_one_or_none() @staticmethod @@ -198,21 +215,42 @@ class Memory: async def handle_new_user(user_id: str, session): """Handle new user creation in the DB and return the new memory ID.""" - #handle these better in terms of retry and error handling + # handle these better in terms of retry and error handling memory_id = str(uuid.uuid4()) new_user = User(id=user_id) await add_entity(session, new_user) - memory = MemoryModel(id=memory_id, user_id=user_id, methods_list=str(['Memory', 'SemanticMemory', 'EpisodicMemory']), - attributes_list=str(['user_id', 'index_name', 'db_type', 'knowledge_source', 'knowledge_type', 'memory_id', 'long_term_memory', 'short_term_memory', 'namespace'])) + memory = MemoryModel( + id=memory_id, + user_id=user_id, + methods_list=str(["Memory", "SemanticMemory", "EpisodicMemory"]), + attributes_list=str( + [ + "user_id", + "index_name", + "db_type", + "knowledge_source", + "knowledge_type", + "memory_id", + "long_term_memory", + "short_term_memory", + "namespace", + ] + ), + ) await add_entity(session, memory) return memory_id async def add_memory_instance(self, memory_class_name: str): """Add a new memory instance to the memory_instances list.""" - instance = DynamicBaseMemory(memory_class_name, self.user_id, - self.memory_id, self.index_name, - self.db_type, self.namespace) + instance = DynamicBaseMemory( + memory_class_name, + self.user_id, + self.memory_id, + self.index_name, + self.db_type, + self.namespace, + ) print("The following instance was defined", instance) self.memory_instances.append(instance) @@ -228,8 +266,6 @@ class Memory: if existing_user: print(f"ID before query: {self.memory_id}, type: {type(self.memory_id)}") - - # attributes_list = await self.session.query(MemoryModel.attributes_list).filter_by(id=self.memory_id[0]).scalar() attributes_list = await self.query_method() logging.info(f"Attributes list: {attributes_list}") @@ -239,10 +275,17 @@ class Memory: else: logging.warning("attributes_list is None!") else: - attributes_list = ['user_id', 'index_name', 'db_type', - 'knowledge_source', 'knowledge_type', - 'memory_id', 'long_term_memory', - 'short_term_memory', 'namespace'] + attributes_list = [ + "user_id", + "index_name", + "db_type", + "knowledge_source", + "knowledge_type", + "memory_id", + "long_term_memory", + "short_term_memory", + "namespace", + ] await self.handle_attributes(attributes_list) async def handle_attributes(self, attributes_list): @@ -259,49 +302,66 @@ class Memory: # methods_list = await self.session.query(MemoryModel.methods_list).filter_by(id=self.memory_id).scalar() methods_list = await self.session.execute( - select(MemoryModel.methods_list).where(MemoryModel.id == self.memory_id[0]) + select(MemoryModel.methods_list).where( + MemoryModel.id == self.memory_id[0] + ) ) methods_list = methods_list.scalar_one_or_none() methods_list = ast.literal_eval(methods_list) else: # Define default methods for a new user methods_list = [ - 'async_create_long_term_memory', 'async_init', 'add_memories', "fetch_memories", "delete_memories", - 'async_create_short_term_memory', '_create_buffer_context', '_get_task_list', - '_run_main_buffer', '_available_operations', '_provide_feedback' + "async_create_long_term_memory", + "async_init", + "add_memories", + "fetch_memories", + "delete_memories", + "async_create_short_term_memory", + "_create_buffer_context", + "_get_task_list", + "_run_main_buffer", + "_available_operations", + "_provide_feedback", ] # Apply methods to memory instances for class_instance in self.memory_instances: for method in methods_list: class_instance.add_method(method) - - async def dynamic_method_call(self, dynamic_base_memory_instance, method_name: str, *args, **kwargs): + async def dynamic_method_call( + self, dynamic_base_memory_instance, method_name: str, *args, **kwargs + ): if method_name in dynamic_base_memory_instance.methods: method = getattr(dynamic_base_memory_instance, method_name, None) if method: return await method(*args, **kwargs) - raise AttributeError(f"{dynamic_base_memory_instance.name} object has no attribute {method_name}") + raise AttributeError( + f"{dynamic_base_memory_instance.name} object has no attribute {method_name}" + ) async def add_dynamic_memory_class(self, class_name: str, namespace: str): logging.info("Here is the memory id %s", self.memory_id[0]) - new_memory_class = DynamicBaseMemory(class_name, self.user_id, self.memory_id[0], self.index_name, - self.db_type, namespace) + new_memory_class = DynamicBaseMemory( + class_name, + self.user_id, + self.memory_id[0], + self.index_name, + self.db_type, + namespace, + ) setattr(self, f"{class_name.lower()}_class", new_memory_class) return new_memory_class async def add_attribute_to_class(self, class_instance, attribute_name: str): - #add this to database for a particular user and load under memory id + # add this to database for a particular user and load under memory id await class_instance.add_attribute(attribute_name) async def add_method_to_class(self, class_instance, method_name: str): - #add this to database for a particular user and load under memory id + # add this to database for a particular user and load under memory id await class_instance.add_method(method_name) - async def main(): - # if you want to run the script as a standalone script, do so with the examples below # memory = Memory(user_id="TestUser") # await memory.async_init() @@ -318,10 +378,10 @@ async def main(): "validity_start": "2023-08-01", "validity_end": "2024-07-31", } - loader_settings = { - "format": "PDF", - "source": "URL", - "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" + loader_settings = { + "format": "PDF", + "source": "URL", + "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf", } # memory_instance = Memory(namespace='SEMANTICMEMORY') # sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation') @@ -330,26 +390,28 @@ async def main(): from database.database import AsyncSessionLocal async with session_scope(AsyncSessionLocal()) as session: - memory = await Memory.create_memory("676", session, namespace='SEMANTICMEMORY') + memory = await Memory.create_memory("676", session, namespace="SEMANTICMEMORY") # Adding a memory instance await memory.add_memory_instance("ExampleMemory") - # Managing memory attributes existing_user = await Memory.check_existing_user("676", session) print("here is the existing user", existing_user) await memory.manage_memory_attributes(existing_user) # aeehuvyq_semanticememory_class - await memory.add_dynamic_memory_class('semanticmemory', 'SEMANTICMEMORY') - await memory.add_method_to_class(memory.semanticmemory_class, 'add_memories') - await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') + await memory.add_dynamic_memory_class("semanticmemory", "SEMANTICMEMORY") + await memory.add_method_to_class(memory.semanticmemory_class, "add_memories") + await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories") # sss = await memory.dynamic_method_call(memory.semanticmemory_class, 'add_memories', # observation='some_observation', params=params, loader_settings=loader_settings) - susu = await memory.dynamic_method_call(memory.semanticmemory_class, 'fetch_memories', - observation='some_observation') + susu = await memory.dynamic_method_call( + memory.semanticmemory_class, + "fetch_memories", + observation="some_observation", + ) print(susu) # Adding a dynamic memory class @@ -360,16 +422,14 @@ async def main(): # memory_instance.add_method_to_class(procedural_memory_class, 'add_memories') # - # print(sss) # load_jack_london = await memory._add_semantic_memory(observation = "bla", loader_settings=loader_settings, params=params) # print(load_jack_london) - modulator = {"relevance": 0.1, "frequency": 0.1} + modulator = {"relevance": 0.1, "frequency": 0.1} if __name__ == "__main__": import asyncio asyncio.run(main()) -