Added a few fixes and refactored the base app

This commit is contained in:
Vasilije 2023-10-27 11:11:22 +02:00
parent 5f6f2974b1
commit f869548dfe
3 changed files with 446 additions and 214 deletions

View file

@ -11,6 +11,7 @@ from level_3.database.database import AsyncSessionLocal
from level_3.database.database_crud import session_scope
from vectorstore_manager import Memory
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(
level=logging.INFO, # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
@ -20,20 +21,23 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
app = FastAPI(debug=True)
from auth.cognito.JWTBearer import JWTBearer
from auth.auth import jwks
auth = JWTBearer(jwks)
from fastapi import Depends
class ImageResponse(BaseModel):
success: bool
message: str
@app.get(
"/",
)
@ -43,15 +47,19 @@ async def root():
"""
return {"message": "Hello, World, I am alive!"}
@app.get("/health")
def health_check():
"""
Health check endpoint that returns the server status.
"""
return {"status": "OK"}
class Payload(BaseModel):
payload: Dict[str, Any]
def memory_factory(memory_type):
load_dotenv()
@ -67,24 +75,34 @@ def memory_factory(memory_type):
logging.info(" Adding to Memory ")
decoded_payload = payload.payload
async with session_scope(session=AsyncSessionLocal()) as session:
memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY')
memory = await Memory.create_memory(
decoded_payload["user_id"], session, namespace="SEMANTICMEMORY"
)
# Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
# Managing memory attributes
existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session)
existing_user = await Memory.check_existing_user(
decoded_payload["user_id"], session
)
await memory.manage_memory_attributes(existing_user)
await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper())
await memory.add_dynamic_memory_class(
decoded_payload["memory_object"],
decoded_payload["memory_object"].upper(),
)
memory_class = decoded_payload["memory_object"] + "_class"
dynamic_memory_class = getattr(memory, memory_class.lower(), None)
await memory.add_method_to_class(dynamic_memory_class, 'add_memories')
await memory.add_method_to_class(dynamic_memory_class, "add_memories")
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
observation='some_observation', params=decoded_payload["params"],
loader_settings=decoded_payload["loader_settings"])
output = await memory.dynamic_method_call(
dynamic_memory_class,
"add_memories",
observation="some_observation",
params=decoded_payload["params"],
loader_settings=decoded_payload["loader_settings"],
)
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e:
@ -101,23 +119,32 @@ def memory_factory(memory_type):
logging.info(" Adding to Memory ")
decoded_payload = payload.payload
async with session_scope(session=AsyncSessionLocal()) as session:
memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY')
memory = await Memory.create_memory(
decoded_payload["user_id"], session, namespace="SEMANTICMEMORY"
)
# Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
# Managing memory attributes
existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session)
existing_user = await Memory.check_existing_user(
decoded_payload["user_id"], session
)
await memory.manage_memory_attributes(existing_user)
await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper())
await memory.add_dynamic_memory_class(
decoded_payload["memory_object"],
decoded_payload["memory_object"].upper(),
)
memory_class = decoded_payload["memory_object"] + "_class"
dynamic_memory_class = getattr(memory, memory_class.lower(), None)
await memory.add_method_to_class(dynamic_memory_class, 'add_memories')
await memory.add_method_to_class(dynamic_memory_class, "add_memories")
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
observation=decoded_payload['observation'])
output = await memory.dynamic_method_call(
dynamic_memory_class,
"fetch_memories",
observation=decoded_payload["observation"],
)
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e:
@ -134,23 +161,34 @@ def memory_factory(memory_type):
logging.info(" Adding to Memory ")
decoded_payload = payload.payload
async with session_scope(session=AsyncSessionLocal()) as session:
memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY')
memory = await Memory.create_memory(
decoded_payload["user_id"], session, namespace="SEMANTICMEMORY"
)
# Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
# Managing memory attributes
existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session)
existing_user = await Memory.check_existing_user(
decoded_payload["user_id"], session
)
await memory.manage_memory_attributes(existing_user)
await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper())
await memory.add_dynamic_memory_class(
decoded_payload["memory_object"],
decoded_payload["memory_object"].upper(),
)
memory_class = decoded_payload["memory_object"] + "_class"
dynamic_memory_class = getattr(memory, memory_class.lower(), None)
await memory.add_method_to_class(dynamic_memory_class, 'delete_memories')
await memory.add_method_to_class(
dynamic_memory_class, "delete_memories"
)
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories',
namespace=decoded_payload["memory_object"].upper())
output = await memory.dynamic_method_call(
dynamic_memory_class,
"delete_memories",
namespace=decoded_payload["memory_object"].upper(),
)
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e:
@ -158,6 +196,7 @@ def memory_factory(memory_type):
content={"response": {"error": str(e)}}, status_code=503
)
memory_list = ["episodic", "buffer", "semantic"]
for memory_type in memory_list:
memory_factory(memory_type)

View file

@ -20,7 +20,9 @@ logging.basicConfig(level=logging.INFO)
import marvin
from dotenv import load_dotenv
from sqlalchemy.orm import sessionmaker
from database.database import engine # Ensure you have database engine defined somewhere
from database.database import (
engine,
) # Ensure you have database engine defined somewhere
from models.user import User
from models.memory import MemoryModel
from models.sessions import Session
@ -28,6 +30,7 @@ from models.testset import TestSet
from models.testoutput import TestOutput
from models.metadatas import MetaDatas
from models.operation import Operation
load_dotenv()
import ast
import tracemalloc
@ -43,12 +46,13 @@ load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY")
from vectordb.basevectordb import BaseMemory
from vectordb.basevectordb import BaseMemory
from vectorstore_manager import Memory
import asyncio
from database.database_crud import session_scope
from database.database import AsyncSessionLocal
openai.api_key = os.getenv("OPENAI_API_KEY", "")
@ -58,13 +62,19 @@ async def retrieve_latest_test_case(session, user_id, memory_id):
result = await session.execute(
session.query(TestSet.attributes_list)
.filter_by(user_id=user_id, memory_id=memory_id)
.order_by(TestSet.created_at).first()
.order_by(TestSet.created_at)
.first()
)
return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call
return (
result.scalar_one_or_none()
) # scalar_one_or_none() is a non-blocking call
except Exception as e:
logging.error(f"An error occurred while retrieving the latest test case: {str(e)}")
logging.error(
f"An error occurred while retrieving the latest test case: {str(e)}"
)
return None
async def add_entity(session, entity):
async with session_scope(session) as s: # Use your async session_scope
s.add(entity) # No need to commit; session_scope takes care of it
@ -72,6 +82,7 @@ async def add_entity(session, entity):
return "Successfully added entity"
async def retrieve_job_by_id(session, user_id, job_id):
try:
result = await session.execute(
@ -84,12 +95,14 @@ async def retrieve_job_by_id(session, user_id, job_id):
logging.error(f"An error occurred while retrieving the job: {str(e)}")
return None
async def fetch_job_id(session, user_id=None, memory_id=None, job_id=None):
try:
result = await session.execute(
session.query(Session.id)
.filter_by(user_id=user_id, id=job_id)
.order_by(Session.created_at).first()
.order_by(Session.created_at)
.first()
)
return result.scalar_one_or_none()
except Exception as e:
@ -103,16 +116,24 @@ async def fetch_test_set_id(session, user_id, id):
result = await session.execute(
session.query(TestSet.id)
.filter_by(user_id=user_id, id=id)
.order_by(TestSet.created_at).desc().first()
.order_by(TestSet.created_at)
.desc()
.first()
)
return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call
return (
result.scalar_one_or_none()
) # scalar_one_or_none() is a non-blocking call
except Exception as e:
logging.error(f"An error occurred while retrieving the test set: {str(e)}")
return None
# Adding "embeddings" to the parameter variants function
def generate_param_variants(base_params=None, increments=None, ranges=None, included_params=None):
def generate_param_variants(
base_params=None, increments=None, ranges=None, included_params=None
):
"""Generate parameter variants for testing.
Args:
@ -128,22 +149,22 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl
# Default values
defaults = {
'chunk_size': 250,
'chunk_overlap': 20,
'similarity_score': 0.5,
'metadata_variation': 0,
'search_type': 'hybrid',
'embeddings': 'openai' # Default value added for 'embeddings'
"chunk_size": 250,
"chunk_overlap": 20,
"similarity_score": 0.5,
"metadata_variation": 0,
"search_type": "hybrid",
"embeddings": "openai", # Default value added for 'embeddings'
}
# Update defaults with provided base parameters
params = {**defaults, **(base_params or {})}
default_increments = {
'chunk_size': 150,
'chunk_overlap': 10,
'similarity_score': 0.1,
'metadata_variation': 1
"chunk_size": 150,
"chunk_overlap": 10,
"similarity_score": 0.1,
"metadata_variation": 1,
}
# Update default increments with provided increments
@ -151,10 +172,10 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl
# Default ranges
default_ranges = {
'chunk_size': 2,
'chunk_overlap': 2,
'similarity_score': 2,
'metadata_variation': 2
"chunk_size": 2,
"chunk_overlap": 2,
"similarity_score": 2,
"metadata_variation": 2,
}
# Update default ranges with provided ranges
@ -162,22 +183,43 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl
# Generate parameter variant ranges
param_ranges = {
key: [params[key] + i * increments.get(key, 1) for i in range(ranges.get(key, 1))]
for key in ['chunk_size', 'chunk_overlap', 'similarity_score', 'metadata_variation']
key: [
params[key] + i * increments.get(key, 1) for i in range(ranges.get(key, 1))
]
for key in [
"chunk_size",
"chunk_overlap",
"similarity_score",
"metadata_variation",
]
}
# Add search_type and embeddings with possible values
param_ranges['search_type'] = ['text', 'hybrid', 'bm25', 'generate', 'generate_grouped']
param_ranges['embeddings'] = ['openai', 'cohere', 'huggingface'] # Added 'embeddings' values
param_ranges["search_type"] = [
"text",
"hybrid",
"bm25",
"generate",
"generate_grouped",
]
param_ranges["embeddings"] = [
"openai",
"cohere",
"huggingface",
] # Added 'embeddings' values
# Filter param_ranges based on included_params
if included_params is not None:
param_ranges = {key: val for key, val in param_ranges.items() if key in included_params}
param_ranges = {
key: val for key, val in param_ranges.items() if key in included_params
}
# Generate all combinations of parameter variants
keys = param_ranges.keys()
values = param_ranges.values()
param_variants = [dict(zip(keys, combination)) for combination in itertools.product(*values)]
param_variants = [
dict(zip(keys, combination)) for combination in itertools.product(*values)
]
return param_variants
@ -185,27 +227,60 @@ def generate_param_variants(base_params=None, increments=None, ranges=None, incl
# Generate parameter variants and display a sample of the generated combinations
async def generate_chatgpt_output(query:str, context:str=None):
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "assistant", "content": f"{context}"},
{"role": "user", "content": query}
]
)
llm_output = response.choices[0].message.content
# print(llm_output)
return llm_output
async def eval_test(query=None, output=None, expected_output=None, context=None, synthetic_test_set=False):
async def generate_chatgpt_output(query: str, context: str = None, api_key=None, model_name="gpt-3.5-turbo"):
"""
Generate a response from the OpenAI ChatGPT model.
Args:
query (str): The user's query or message.
context (str, optional): Additional context for the conversation. Defaults to an empty string.
api_key (str, optional): Your OpenAI API key. If not provided, the globally configured API key will be used.
model_name (str, optional): The name of the ChatGPT model to use. Defaults to "gpt-3.5-turbo".
Returns:
str: The response generated by the ChatGPT model.
Raises:
Exception: If an error occurs during the API call, an error message is returned for the caller to handle.
"""
if not context:
context = ""
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "assistant", "content": context},
{"role": "user", "content": query},
]
try:
openai.api_key = api_key if api_key else openai.api_key # Use the provided API key or the one set globally
response = openai.ChatCompletion.create(
model=model_name,
messages=messages,
)
llm_output = response.choices[0].message.content
return llm_output
except Exception as e:
return f"An error occurred: {e}" # Return the error message for the caller to handle
async def eval_test(
query=None,
output=None,
expected_output=None,
context=None,
synthetic_test_set=False,
):
result_output = await generate_chatgpt_output(query, context)
if synthetic_test_set:
test_case = synthetic_test_set
else:
test_case = LLMTestCase(
query=query,
output=result_output,
@ -226,10 +301,10 @@ async def eval_test(query=None, output=None, expected_output=None, context=None,
"output": test_result.output,
"expected_output": test_result.expected_output,
"metadata": test_result.metadata,
"context": test_result.context
"context": test_result.context,
}
test_result_dict =[]
test_result_dict = []
for test in test_result:
test_result_it = test_result_to_dict(test)
test_result_dict.append(test_result_it)
@ -238,13 +313,11 @@ async def eval_test(query=None, output=None, expected_output=None, context=None,
# print(test_result)
def data_format_route( data_string: str):
def data_format_route(data_string: str):
@ai_classifier
class FormatRoute(Enum):
"""Represents classifier for the data format"""
PDF = "PDF"
UNSTRUCTURED_WEB = "UNSTRUCTURED_WEB"
GITHUB = "GITHUB"
@ -258,7 +331,7 @@ def data_format_route( data_string: str):
def data_location_route(data_string: str):
@ai_classifier
class LocationRoute(Enum):
"""Represents classifier for the data location, if it is device, or database connections string or URL """
"""Represents classifier for the data location, if it is device, or database connections string or URL"""
DEVICE = "file_path_starting_with_.data_or_containing_it"
# URL = "url starting with http or https"
@ -269,71 +342,92 @@ def data_location_route(data_string: str):
def dynamic_test_manager(context=None):
from deepeval.dataset import create_evaluation_query_answer_pairs
# fetch random chunks from the document
#feed them to the evaluation pipeline
dataset = create_evaluation_query_answer_pairs(openai_api_key=os.environ.get("OPENAI_API_KEY"), context= context ,n=10)
# feed them to the evaluation pipeline
dataset = create_evaluation_query_answer_pairs(
openai_api_key=os.environ.get("OPENAI_API_KEY"), context=context, n=10
)
return dataset
def generate_letter_uuid(length=8):
"""Generate a random string of uppercase letters with the specified length."""
letters = string.ascii_uppercase # A-Z
return ''.join(random.choice(letters) for _ in range(length))
async def start_test(data, test_set=None, user_id=None, params=None, job_id=None, metadata=None, generate_test_set=False, retriever_type:str=None):
return "".join(random.choice(letters) for _ in range(length))
"""retriever_type: one of "llm_context", "single_document_context", "multi_document_context", "cognitive_architecture"."""
async def start_test(
data,
test_set=None,
user_id=None,
params=None,
job_id=None,
metadata=None,
generate_test_set=False,
retriever_type: str = None,
):
"""retriever_type: one of "llm_context", "single_document_context", "multi_document_context", "cognitive_architecture"."""
async with session_scope(session=AsyncSessionLocal()) as session:
job_id = await fetch_job_id(session, user_id=user_id, job_id=job_id)
test_set_id = await fetch_test_set_id(session, user_id=user_id, id=job_id)
memory = await Memory.create_memory(user_id, session, namespace="SEMANTICMEMORY")
memory = await Memory.create_memory(
user_id, session, namespace="SEMANTICMEMORY"
)
await memory.add_memory_instance("ExampleMemory")
existing_user = await Memory.check_existing_user(user_id, session)
if test_set_id is None:
test_set_id = str(uuid.uuid4())
await add_entity(session, TestSet(id=test_set_id, user_id=user_id, content=str(test_set)))
await add_entity(
session, TestSet(id=test_set_id, user_id=user_id, content=str(test_set))
)
if params is None:
data_format = data_format_route(data) # Assume data_format_route is predefined
data_format = data_format_route(
data
) # Assume data_format_route is predefined
logging.info("Data format is %s", data_format)
data_location = data_location_route(data)
logging.info("Data location is %s",data_location)# Assume data_location_route is predefined
test_params = generate_param_variants(
included_params=['chunk_size'])
logging.info(
"Data location is %s", data_location
) # Assume data_location_route is predefined
test_params = generate_param_variants(included_params=["chunk_size"])
print("Here are the test params", str(test_params))
loader_settings = {
"format": f"{data_format}",
"source": f"{data_location}",
"path": data
"path": data,
}
if job_id is None:
job_id = str(uuid.uuid4())
await add_entity(session, Operation(id=job_id, user_id=user_id, operation_params =str(test_params), operation_type=retriever_type, test_set_id=test_set_id))
async def run_test(test, loader_settings, metadata, test_id=None,retriever_type=False):
await add_entity(
session,
Operation(
id=job_id,
user_id=user_id,
operation_params=str(test_params),
operation_type=retriever_type,
test_set_id=test_set_id,
),
)
async def run_test(
test, loader_settings, metadata, test_id=None, retriever_type=False
):
if test_id is None:
test_id = str(generate_letter_uuid()) + "_" +"SEMANTICMEMORY"
test_id = str(generate_letter_uuid()) + "_" + "SEMANTICMEMORY"
await memory.manage_memory_attributes(existing_user)
test_class = test_id + "_class"
await memory.add_dynamic_memory_class(test_id.lower(), test_id)
dynamic_memory_class = getattr(memory, test_class.lower(), None)
methods_to_add = ['add_memories', 'fetch_memories', 'delete_memories']
methods_to_add = ["add_memories", "fetch_memories", "delete_memories"]
if dynamic_memory_class is not None:
for method_name in methods_to_add:
@ -348,98 +442,132 @@ async def start_test(data, test_set=None, user_id=None, params=None, job_id=None
loader_settings.update(test)
test_class = test_id + "_class"
dynamic_memory_class = getattr(memory, test_class.lower(), None)
async def run_load_test_element( loader_settings=loader_settings, metadata=metadata, test_id=test_id, test_set=test_set):
async def run_load_test_element(
loader_settings=loader_settings,
metadata=metadata,
test_id=test_id,
test_set=test_set,
):
print(f"Trying to access: {test_class.lower()}")
await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
observation='Observation loaded', params=metadata,
loader_settings=loader_settings)
await memory.dynamic_method_call(
dynamic_memory_class,
"add_memories",
observation="Observation loaded",
params=metadata,
loader_settings=loader_settings,
)
return "Loaded test element"
async def run_search_element(test_item, test_id):
retrieve_action = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
observation=str(test_item["question"]))
print("Here is the test result", str(retrieve_action["data"]['Get'][test_id][0]["text"]))
return retrieve_action["data"]['Get'][test_id][0]["text"]
retrieve_action = await memory.dynamic_method_call(
dynamic_memory_class,
"fetch_memories",
observation=str(test_item["question"]),
)
print(
"Here is the test result",
str(retrieve_action["data"]["Get"][test_id][0]["text"]),
)
return retrieve_action["data"]["Get"][test_id][0]["text"]
async def run_eval(test_item, search_result):
test_eval= await eval_test(query=test_item["question"], expected_output=test_item["answer"],
context=str(search_result))
test_eval = await eval_test(
query=test_item["question"],
expected_output=test_item["answer"],
context=str(search_result),
)
return test_eval
async def run_generate_test_set( test_id):
async def run_generate_test_set(test_id):
test_class = test_id + "_class"
# await memory.add_dynamic_memory_class(test_id.lower(), test_id)
dynamic_memory_class = getattr(memory, test_class.lower(), None)
print(dynamic_memory_class)
retrieve_action = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
observation="Generate a short summary of this document",
search_type="generative")
retrieve_action = await memory.dynamic_method_call(
dynamic_memory_class,
"fetch_memories",
observation="Generate a short summary of this document",
search_type="generative",
)
return dynamic_test_manager(retrieve_action)
test_eval_pipeline =[]
test_eval_pipeline = []
if retriever_type == "llm_context":
for test_qa in test_set:
context=""
context = ""
logging.info("Loading and evaluating test set for LLM context")
test_result = await run_eval(test_qa, context)
test_eval_pipeline.append(test_result)
elif retriever_type == "single_document_context":
if test_set:
logging.info(
"Loading and evaluating test set for a single document context"
)
await run_load_test_element(
loader_settings, metadata, test_id, test_set
)
for test_qa in test_set:
result = await run_search_element(test_qa, test_id)
test_result = await run_eval(test_qa, result)
test_eval_pipeline.append(test_result)
await memory.dynamic_method_call(
dynamic_memory_class, "delete_memories", namespace=test_id
)
else:
pass
if generate_test_set is True:
synthetic_test_set = run_generate_test_set(test_id)
else:
pass
if test_set:
logging.info("Loading and evaluating test set")
await run_load_test_element(loader_settings, metadata, test_id, test_set)
for test_qa in test_set:
result = await run_search_element(test_qa, test_id)
test_result = await run_eval(test_qa, result)
test_eval_pipeline.append( test_result)
else:
pass
await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories',
namespace=test_id)
return test_id, test_eval_pipeline
results = []
if retriever_type == "llm_context":
test_id, result = await run_test(test=None, loader_settings=loader_settings, metadata=metadata,
retriever_type=retriever_type) # No params for this case
test_id, result = await run_test(
test=None,
loader_settings=loader_settings,
metadata=metadata,
retriever_type=retriever_type,
) # No params for this case
results.append([result, "No params"])
elif retriever_type == "single_document_context":
for param in test_params:
test_id, result = await run_test(param, loader_settings, metadata, retriever_type=retriever_type) # Add the params to the result
test_id, result = await run_test(
param, loader_settings, metadata, retriever_type=retriever_type
) # Add the params to the result
results.append([result, param])
for b, r in results:
print("Here is the result", r)
for result_list in b:
for result in result_list:
print("Here is one result", result)
await add_entity(session, TestOutput(
id=test_id,
test_set_id=test_set_id,
operation_id=job_id,
set_id=str(uuid.uuid4()),
user_id=user_id,
test_results=result['success'],
test_score=str(result['score']),
test_metric_name=result['metric_name'],
test_query=result['query'],
test_output=result['output'],
test_expected_output=str(['expected_output']),
test_context=result['context'][0],
test_params=str(r) # Add params to the database table
))
await add_entity(
session,
TestOutput(
id=test_id,
test_set_id=test_set_id,
operation_id=job_id,
set_id=str(uuid.uuid4()),
user_id=user_id,
test_results=result["success"],
test_score=str(result["score"]),
test_metric_name=result["metric_name"],
test_query=result["query"],
test_output=result["output"],
test_expected_output=str(["expected_output"]),
test_context=result["context"][0],
test_params=str(r), # Add params to the database table
),
)
return results
async def main():
async def main():
metadata = {
"version": "1.0",
"agreement_id": "AG123456",
@ -457,28 +585,32 @@ async def main():
test_set = [
{
"question": "Who is the main character in 'The Call of the Wild'?",
"answer": "Buck"
},
{
"question": "Who wrote 'The Call of the Wild'?",
"answer": "Jack London"
"answer": "Buck",
},
{"question": "Who wrote 'The Call of the Wild'?", "answer": "Jack London"},
{
"question": "Where does Buck live at the start of the book?",
"answer": "In the Santa Clara Valley, at Judge Millers place."
"answer": "In the Santa Clara Valley, at Judge Millers place.",
},
{
"question": "Why is Buck kidnapped?",
"answer": "He is kidnapped to be sold as a sled dog in the Yukon during the Klondike Gold Rush."
"answer": "He is kidnapped to be sold as a sled dog in the Yukon during the Klondike Gold Rush.",
},
{
"question": "How does Buck become the leader of the sled dog team?",
"answer": "Buck becomes the leader after defeating the original leader, Spitz, in a fight."
}
"answer": "Buck becomes the leader after defeating the original leader, Spitz, in a fight.",
},
]
# "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
#http://public-library.uk/ebooks/59/83.pdf
result = await start_test(".data/3ZCCCW.pdf", test_set=test_set, user_id="677", params=None, metadata=metadata, retriever_type='llm_context')
# http://public-library.uk/ebooks/59/83.pdf
result = await start_test(
".data/3ZCCCW.pdf",
test_set=test_set,
user_id="677",
params=None,
metadata=metadata,
retriever_type="llm_context",
)
#
# parser = argparse.ArgumentParser(description="Run tests against a document.")
# parser.add_argument("--url", required=True, help="URL of the document to test.")
@ -520,8 +652,9 @@ async def main():
# params = None
# #clean up params here
# await start_test(args.url, test_set, args.user_id, params=None, metadata=metadata)
if __name__ == "__main__":
if __name__ == "__main__":
asyncio.run(main())
# delete_mems = await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories',
@ -541,4 +674,4 @@ if __name__ == "__main__":
# results = await asyncio.gather(
# *(run_testo(test, loader_settings, metadata) for test in test_params)
# )
# return results
# return results

View file

@ -6,7 +6,9 @@ logging.basicConfig(level=logging.INFO)
import marvin
from dotenv import load_dotenv
from sqlalchemy.orm import sessionmaker
from database.database import engine # Ensure you have database engine defined somewhere
from database.database import (
engine,
) # Ensure you have database engine defined somewhere
from models.user import User
from models.memory import MemoryModel
from models.sessions import Session
@ -14,6 +16,7 @@ from models.testset import TestSet
from models.testoutput import TestOutput
from models.metadatas import MetaDatas
from models.operation import Operation
load_dotenv()
import ast
import tracemalloc
@ -29,7 +32,7 @@ load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY")
from vectordb.basevectordb import BaseMemory
from vectordb.basevectordb import BaseMemory
class DynamicBaseMemory(BaseMemory):
@ -41,7 +44,7 @@ class DynamicBaseMemory(BaseMemory):
index_name: str,
db_type: str,
namespace: str,
embeddings=None
embeddings=None,
):
super().__init__(user_id, memory_id, index_name, db_type, namespace, embeddings)
self.name = name
@ -50,7 +53,6 @@ class DynamicBaseMemory(BaseMemory):
self.inheritance = None
self.associations = []
async def add_method(self, method_name):
"""
Add a method to the memory class.
@ -102,6 +104,7 @@ class DynamicBaseMemory(BaseMemory):
# Optionally, establish a bidirectional association
associated_memory.associations.append(self)
class Attribute:
def __init__(self, name):
"""
@ -115,6 +118,7 @@ class Attribute:
"""
self.name = name
class Method:
def __init__(self, name):
"""
@ -130,9 +134,17 @@ class Method:
class Memory:
def __init__(self, user_id: str = "676", session=None, index_name: str = None,
knowledge_source: str = None, knowledge_type: str = None,
db_type: str = "weaviate", namespace: str = None, memory_id:str =None) -> None:
def __init__(
self,
user_id: str = "676",
session=None,
index_name: str = None,
knowledge_source: str = None,
knowledge_type: str = None,
db_type: str = "weaviate",
namespace: str = None,
memory_id: str = None,
) -> None:
self.load_environment_variables()
self.memory_id = memory_id
self.user_id = user_id
@ -145,12 +157,16 @@ class Memory:
self.short_term_memory = None
self.namespace = namespace
self.memory_instances = []
#inspect and fix this
self.memory_class = DynamicBaseMemory('Memory', user_id, str(self.memory_id), index_name, db_type, namespace)
# inspect and fix this
self.memory_class = DynamicBaseMemory(
"Memory", user_id, str(self.memory_id), index_name, db_type, namespace
)
def load_environment_variables(self) -> None:
load_dotenv()
self.OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
@classmethod
async def create_memory(cls, user_id: str, session, **kwargs):
"""
@ -160,14 +176,17 @@ class Memory:
existing_user = await cls.check_existing_user(user_id, session)
if existing_user:
# Handle existing user scenario...
memory_id = await cls.check_existing_memory(user_id, session)
logging.info(f"Existing user {user_id} found in the DB. Memory ID: {memory_id}")
logging.info(
f"Existing user {user_id} found in the DB. Memory ID: {memory_id}"
)
else:
# Handle new user scenario...
memory_id = await cls.handle_new_user(user_id, session)
logging.info(f"New user {user_id} created in the DB. Memory ID: {memory_id}")
logging.info(
f"New user {user_id} created in the DB. Memory ID: {memory_id}"
)
return cls(user_id=user_id, session=session, memory_id=memory_id, **kwargs)
@ -181,9 +200,7 @@ class Memory:
@staticmethod
async def check_existing_user(user_id: str, session):
"""Check if a user exists in the DB and return it."""
result = await session.execute(
select(User).where(User.id == user_id)
)
result = await session.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
@staticmethod
async def handle_new_user(user_id: str, session):
    """Create a new ``User`` plus an initial ``MemoryModel`` row.

    Args:
        user_id: Id of the user to create.
        session: Async SQLAlchemy session used for the inserts.

    Returns:
        The newly generated memory id (a UUID4 string).
    """
    # TODO(review): add retry / error handling around these inserts.
    memory_id = str(uuid.uuid4())
    new_user = User(id=user_id)
    await add_entity(session, new_user)
    # Fix: the MemoryModel was constructed twice (old + new merge-artifact
    # lines) — build and persist it exactly once. methods_list and
    # attributes_list are stored as stringified Python lists (they are read
    # back with ast.literal_eval elsewhere in this class).
    memory = MemoryModel(
        id=memory_id,
        user_id=user_id,
        methods_list=str(["Memory", "SemanticMemory", "EpisodicMemory"]),
        attributes_list=str(
            [
                "user_id",
                "index_name",
                "db_type",
                "knowledge_source",
                "knowledge_type",
                "memory_id",
                "long_term_memory",
                "short_term_memory",
                "namespace",
            ]
        ),
    )
    await add_entity(session, memory)
    return memory_id
async def add_memory_instance(self, memory_class_name: str):
    """Create a ``DynamicBaseMemory`` for *memory_class_name* and register it.

    The instance is appended to ``self.memory_instances`` so that later
    attribute/method management can be applied to it.

    Args:
        memory_class_name: Name used for the dynamic memory class.
    """
    # Fix: the instance was constructed twice (old + new merge-artifact
    # lines) — construct it exactly once.
    instance = DynamicBaseMemory(
        memory_class_name,
        self.user_id,
        self.memory_id,
        self.index_name,
        self.db_type,
        self.namespace,
    )
    # Use the module-wide logger instead of a bare print, matching the
    # logging style used throughout this file.
    logging.info("The following instance was defined: %s", instance)
    self.memory_instances.append(instance)
@ -228,8 +266,6 @@ class Memory:
# NOTE(review): fragment — the enclosing `def` (presumably
# manage_memory_attributes, called with existing_user from main) was elided
# by the diff view above this hunk.
if existing_user:
print(f"ID before query: {self.memory_id}, type: {type(self.memory_id)}")
# attributes_list = await self.session.query(MemoryModel.attributes_list).filter_by(id=self.memory_id[0]).scalar()
attributes_list = await self.query_method()
logging.info(f"Attributes list: {attributes_list}")
@ -239,10 +275,17 @@ class Memory:
else:
logging.warning("attributes_list is None!")
else:
# NOTE(review): the single-quoted list and the double-quoted list below
# are old/new versions of the same default assignment (diff artifact) —
# keep only one of them.
attributes_list = ['user_id', 'index_name', 'db_type',
'knowledge_source', 'knowledge_type',
'memory_id', 'long_term_memory',
'short_term_memory', 'namespace']
attributes_list = [
"user_id",
"index_name",
"db_type",
"knowledge_source",
"knowledge_type",
"memory_id",
"long_term_memory",
"short_term_memory",
"namespace",
]
await self.handle_attributes(attributes_list)
# Applies the given attribute names to the memory instances; body elided by
# the diff hunk header below — consult the full file.
async def handle_attributes(self, attributes_list):
@ -259,49 +302,66 @@ class Memory:
# NOTE(review): the lines below appear to belong to a *different* method
# (methods management) whose `def` was elided by the diff view.
# methods_list = await self.session.query(MemoryModel.methods_list).filter_by(id=self.memory_id).scalar()
methods_list = await self.session.execute(
# NOTE(review): the one-line select(...) and the wrapped select(...) are
# old/new versions of the same argument (diff artifact) — keep only one.
select(MemoryModel.methods_list).where(MemoryModel.id == self.memory_id[0])
select(MemoryModel.methods_list).where(
MemoryModel.id == self.memory_id[0]
)
)
methods_list = methods_list.scalar_one_or_none()
# Stored as a stringified Python list; parsed back with ast.literal_eval.
methods_list = ast.literal_eval(methods_list)
else:
# Define default methods for a new user
# NOTE(review): the single-quoted and double-quoted lists below are
# old/new versions of the same default (diff artifact) — keep only one.
methods_list = [
'async_create_long_term_memory', 'async_init', 'add_memories', "fetch_memories", "delete_memories",
'async_create_short_term_memory', '_create_buffer_context', '_get_task_list',
'_run_main_buffer', '_available_operations', '_provide_feedback'
"async_create_long_term_memory",
"async_init",
"add_memories",
"fetch_memories",
"delete_memories",
"async_create_short_term_memory",
"_create_buffer_context",
"_get_task_list",
"_run_main_buffer",
"_available_operations",
"_provide_feedback",
]
# Apply methods to memory instances
for class_instance in self.memory_instances:
for method in methods_list:
# NOTE(review): add_method is awaited in add_method_to_class — confirm
# whether this call should be awaited as well.
class_instance.add_method(method)
async def dynamic_method_call(
    self, dynamic_base_memory_instance, method_name: str, *args, **kwargs
):
    """Invoke a registered coroutine method on a dynamic memory instance.

    Args:
        dynamic_base_memory_instance: Object exposing ``methods`` (the set of
            registered method names) and ``name`` (used in error messages).
        method_name: Name of the coroutine method to call.
        *args: Positional arguments forwarded to the resolved method.
        **kwargs: Keyword arguments forwarded to the resolved method.

    Returns:
        Whatever the awaited method returns.

    Raises:
        AttributeError: If *method_name* is not registered on the instance or
            cannot be resolved via ``getattr``.
    """
    # Fix: the old one-line signature and the old raise statement were left
    # in place next to their reformatted versions (merge artifact), making
    # this block a syntax error — keep exactly one of each.
    if method_name in dynamic_base_memory_instance.methods:
        method = getattr(dynamic_base_memory_instance, method_name, None)
        if method:
            return await method(*args, **kwargs)
    raise AttributeError(
        f"{dynamic_base_memory_instance.name} object has no attribute {method_name}"
    )
async def add_dynamic_memory_class(self, class_name: str, namespace: str):
    """Create a ``DynamicBaseMemory`` instance and attach it to ``self``.

    The instance becomes reachable as ``self.<class_name.lower()>_class``
    (e.g. ``self.semanticmemory_class``), matching how main() uses it.

    Args:
        class_name: Name of the dynamic memory class to create.
        namespace: Vector-store namespace for the new class.

    Returns:
        The newly created ``DynamicBaseMemory`` instance.
    """
    # self.memory_id is indexed here, so it is expected to be a sequence
    # (e.g. a row tuple) — TODO(review): confirm and unify with other uses.
    logging.info("Here is the memory id %s", self.memory_id[0])
    # Fix: the constructor call appeared twice (old + new merge-artifact
    # lines) — construct the class exactly once.
    new_memory_class = DynamicBaseMemory(
        class_name,
        self.user_id,
        self.memory_id[0],
        self.index_name,
        self.db_type,
        namespace,
    )
    setattr(self, f"{class_name.lower()}_class", new_memory_class)
    return new_memory_class
async def add_attribute_to_class(self, class_instance, attribute_name: str):
    """Add *attribute_name* to the given dynamic memory class instance.

    Args:
        class_instance: A ``DynamicBaseMemory``-like object exposing an
            async ``add_attribute`` method.
        attribute_name: Name of the attribute to register.
    """
    # TODO: persist this per user in the DB and reload it under the memory id.
    await class_instance.add_attribute(attribute_name)
async def add_method_to_class(self, class_instance, method_name: str):
    """Add *method_name* to the given dynamic memory class instance.

    Args:
        class_instance: A ``DynamicBaseMemory``-like object exposing an
            async ``add_method`` method.
        method_name: Name of the method to register.
    """
    # TODO: persist this per user in the DB and reload it under the memory id.
    await class_instance.add_method(method_name)
async def main():
"""Demo / smoke-test workflow for the Memory class (script usage only)."""
# if you want to run the script as a standalone script, do so with the examples below
# memory = Memory(user_id="TestUser")
# await memory.async_init()
# NOTE(review): hunk header below — the opening of the `params` dict was
# elided by the diff view; restore it from the full file before editing.
@ -318,10 +378,10 @@ async def main():
"validity_start": "2023-08-01",
"validity_end": "2024-07-31",
}
# NOTE(review): the two `loader_settings = {` dicts below are old/new
# versions of the same assignment (diff artifact) — keep only one.
loader_settings = {
"format": "PDF",
"source": "URL",
"path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
loader_settings = {
"format": "PDF",
"source": "URL",
"path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf",
}
# memory_instance = Memory(namespace='SEMANTICMEMORY')
# sss = await memory_instance.dynamic_method_call(memory_instance.semantic_memory_class, 'fetch_memories', observation='some_observation')
@ -330,26 +390,28 @@ async def main():
from database.database import AsyncSessionLocal
async with session_scope(AsyncSessionLocal()) as session:
# NOTE(review): duplicated old/new create_memory call — keep only one.
memory = await Memory.create_memory("676", session, namespace='SEMANTICMEMORY')
memory = await Memory.create_memory("676", session, namespace="SEMANTICMEMORY")
# Adding a memory instance
await memory.add_memory_instance("ExampleMemory")
# Managing memory attributes
existing_user = await Memory.check_existing_user("676", session)
print("here is the existing user", existing_user)
await memory.manage_memory_attributes(existing_user)
# aeehuvyq_semanticememory_class
# NOTE(review): old/new duplicates of the three calls below (diff
# artifact) — keep only the double-quoted set.
await memory.add_dynamic_memory_class('semanticmemory', 'SEMANTICMEMORY')
await memory.add_method_to_class(memory.semanticmemory_class, 'add_memories')
await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
await memory.add_dynamic_memory_class("semanticmemory", "SEMANTICMEMORY")
await memory.add_method_to_class(memory.semanticmemory_class, "add_memories")
await memory.add_method_to_class(memory.semanticmemory_class, "fetch_memories")
# sss = await memory.dynamic_method_call(memory.semanticmemory_class, 'add_memories',
# observation='some_observation', params=params, loader_settings=loader_settings)
# NOTE(review): duplicated old/new fetch_memories call — keep only one.
susu = await memory.dynamic_method_call(memory.semanticmemory_class, 'fetch_memories',
observation='some_observation')
susu = await memory.dynamic_method_call(
memory.semanticmemory_class,
"fetch_memories",
observation="some_observation",
)
print(susu)
# Adding a dynamic memory class
@ -360,16 +422,14 @@ async def main():
# memory_instance.add_method_to_class(procedural_memory_class, 'add_memories')
#
# print(sss)
# load_jack_london = await memory._add_semantic_memory(observation = "bla", loader_settings=loader_settings, params=params)
# print(load_jack_london)
# NOTE(review): duplicated assignment below (diff artifact); `modulator`
# also appears unused in the visible code.
modulator = {"relevance": 0.1, "frequency": 0.1}
modulator = {"relevance": 0.1, "frequency": 0.1}
if __name__ == "__main__":
    # Entry point: run the async demo workflow when executed as a script.
    from asyncio import run

    run(main())