Merge pull request #14 from topoteretes/added_backend_pg_and_dynamic_memory_classes

Added backend pg and dynamic memory classes
This commit is contained in:
Vasilije 2023-10-08 21:33:23 +02:00 committed by GitHub
commit c984610fac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 8628 additions and 180 deletions

View file

@ -1,180 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"ticketType": {
"type": "string",
"enum": ["online ticket", "ICE ticket"]
},
"departureDate": {
"type": "string",
"format": "date"
},
"priceType": {
"type": "string",
"enum": ["Flex price (single journey)"]
},
"class": {
"type": "integer",
"enum": [1]
},
"adult": {
"type": "object",
"properties": {
"quantity": {
"type": "integer"
},
"BC50": {
"type": "integer"
}
},
"required": ["quantity", "BC50"]
},
"journey": {
"type": "object",
"properties": {
"from": {
"type": "string"
},
"to": {
"type": "string"
},
"via": {
"type": "string"
},
"train": {
"type": "string",
"enum": ["ICE"]
}
},
"required": ["from", "to", "via", "train"]
},
"refundPolicy": {
"type": "string"
},
"payment": {
"type": "object",
"properties": {
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"quantity": {
"type": "integer"
},
"price": {
"type": "number"
},
"vat19": {
"type": "number"
},
"vat7": {
"type": "number"
}
},
"required": ["name", "quantity", "price", "vat19", "vat7"]
}
},
"total": {
"type": "number"
},
"method": {
"type": "string",
"enum": ["credit card"]
},
"transactionDetails": {
"type": "object",
"properties": {
"amount": {
"type": "number"
},
"VUNumber": {
"type": "integer"
},
"transactionNumber": {
"type": "integer"
},
"date": {
"type": "string",
"format": "date"
},
"genNumber": {
"type": "string"
}
},
"required": ["amount", "VUNumber", "transactionNumber", "date", "genNumber"]
}
},
"required": ["items", "total", "method", "transactionDetails"]
},
"bookingDetails": {
"type": "object",
"properties": {
"bookingDate": {
"type": "string",
"format": "date-time"
},
"bookingAddress": {
"type": "string"
},
"taxNumber": {
"type": "string"
}
},
"required": ["bookingDate", "bookingAddress", "taxNumber"]
},
"journeyDetails": {
"type": "object",
"properties": {
"validFrom": {
"type": "string",
"format": "date"
},
"passengerName": {
"type": "string"
},
"orderNumber": {
"type": "string"
},
"stops": {
"type": "array",
"items": {
"type": "object",
"properties": {
"stop": {
"type": "string"
},
"date": {
"type": "string",
"format": "date"
},
"time": {
"type": "string",
"format": "time"
},
"track": {
"type": "integer"
},
"product": {
"type": "string"
},
"reservation": {
"type": "string"
}
},
"required": ["stop", "date", "time", "track", "product", "reservation"]
}
}
},
"required": ["validFrom", "passengerName", "orderNumber", "stops"]
},
"usageNotes": {
"type": "string"
}
},
"required": ["ticketType", "departureDate", "priceType", "class", "adult", "journey", "refundPolicy", "payment", "bookingDetails", "journeyDetails", "usageNotes"]
}

3
level_3/.env.template Normal file
View file

@ -0,0 +1,3 @@
OPENAI_API_KEY=sk
WEAVIATE_URL=
WEAVIATE_API_KEY=

36
level_3/Dockerfile Normal file
View file

@ -0,0 +1,36 @@
# Base image: slim Python 3.11 keeps the final image small.
FROM python:3.11-slim

# Set build argument
ARG API_ENABLED

# Set environment variable based on the build argument
ENV API_ENABLED=${API_ENABLED} \
    PIP_NO_CACHE_DIR=true

# Make poetry discoverable if installed under /root/.poetry.
ENV PATH="${PATH}:/root/.poetry/bin"

RUN pip install poetry

WORKDIR /app

# Copy only the dependency manifests first so this layer stays cached
# until pyproject.toml / poetry.lock change.
COPY pyproject.toml poetry.lock /app/

# Install the dependencies
# (no virtualenv inside the container; skip dev deps and the project itself)
RUN poetry config virtualenvs.create false && \
    poetry install --no-root --no-dev

RUN apt-get update -q && \
    apt-get install curl zip jq netcat-traditional -y -q

# Install AWS CLI v2 from the official zip, then clean apt caches.
# NOTE(review): cleaning /var/lib/apt/lists here does not shrink the previous
# apt-get layer — merging the two RUNs would actually reduce image size.
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
    unzip -qq awscliv2.zip && ./aws/install && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

#RUN playwright install
#RUN playwright install-deps

WORKDIR /app
COPY . /app
COPY entrypoint.sh /app/entrypoint.sh
RUN chmod +x /app/entrypoint.sh
ENTRYPOINT ["/app/entrypoint.sh"]

146
level_3/Readme.md Normal file
View file

@ -0,0 +1,146 @@
## PromethAI Memory Manager
### Description
Initial code lets you do four operations:
1. Add to memory
2. Retrieve from memory
3. Structure the data to schema
4. Load to a database
# How to use
## Installation
```docker compose build promethai_mem ```
## Run
```docker compose up promethai_mem ```
## Clean database
```docker compose down promethai_mem ```
```docker volume prune ```
To force a full rebuild and restart: ```docker compose up --force-recreate --build promethai_mem ```
## Usage
The FastAPI endpoint accepts prompts and stores data with the help of the Memory Manager
The types of memory are: Episodic, Semantic, Buffer
Endpoint Overview
The Memory API provides the following endpoints:
- /[memory_type]/add-memory (POST)
- /[memory_type]/fetch-memory (POST)
- /[memory_type]/delete-memory (POST)
- /available-buffer-actions (GET)
- /run-buffer (POST)
- /buffer/create-context (POST)
## How To Get Started
1. We do a post request to add-memory endpoint with the following payload:
It will upload Jack London "Call of the Wild" to SEMANTIC memory
```
curl -X POST http://localhost:8000/semantic/add-memory -H "Content-Type: application/json" -d '{
"payload": {
"user_id": "681",
"prompt": "I am adding docs",
"params": {
"version": "1.0",
"agreement_id": "AG123456",
"privacy_policy": "https://example.com/privacy",
"terms_of_service": "https://example.com/terms",
"format": "json",
"schema_version": "1.1",
"checksum": "a1b2c3d4e5f6",
"owner": "John Doe",
"license": "MIT",
"validity_start": "2023-08-01",
"validity_end": "2024-07-31"
},
"loader_settings": {
"format": "PDF",
"source": "url",
"path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
}
}
}'
```
2. We run the buffer with the prompt "I want to know how does Buck adapt to life in the wild and then have that info translated to German "
```
curl -X POST http://localhost:8000/run-buffer -H "Content-Type: application/json" -d '{
"payload": {
"user_id": "681",
"prompt": "I want to know how does Buck adapt to life in the wild and then have that info translated to German ",
"params": {
"version": "1.0",
"agreement_id": "AG123456",
"privacy_policy": "https://example.com/privacy",
"terms_of_service": "https://example.com/terms",
"format": "json",
"schema_version": "1.1",
"checksum": "a1b2c3d4e5f6",
"owner": "John Doe",
"license": "MIT",
"validity_start": "2023-08-01",
"validity_end": "2024-07-31"
},
"attention_modulators": {
"relevance": 0.0,
"saliency": 0.1
}
}
}'
```
Other attention modulators that could be implemented:
"frequency": 0.5,
"repetition": 0.5,
"length": 0.5,
"position": 0.5,
"context": 0.5,
"emotion": 0.5,
"sentiment": 0.5,
"perspective": 0.5,
"style": 0.5,
"grammar": 0.5,
"spelling": 0.5,
"logic": 0.5,
"coherence": 0.5,
"cohesion": 0.5,
"plausibility": 0.5,
"consistency": 0.5,
"informativeness": 0.5,
"specificity": 0.5,
"detail": 0.5,
"accuracy": 0.5,
"topicality": 0.5,
"focus": 0.5,
"clarity": 0.5,
"simplicity": 0.5,
"naturalness": 0.5,
"fluency": 0.5,
"variety": 0.5,
"vividness": 0.5,
"originality": 0.5,
"creativity": 0.5,
"humor": 0.5,

0
level_3/__init__.py Normal file
View file

265
level_3/api.py Normal file
View file

@ -0,0 +1,265 @@
import logging
import os
from typing import Dict, Any
import uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from vectorstore_manager import Memory
from dotenv import load_dotenv
# Set up logging
logging.basicConfig(
    level=logging.INFO,  # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format="%(asctime)s [%(levelname)s] %(message)s",  # Set the log message format
)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file (see level_3/.env.template).
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")  # empty string when unset; downstream clients must cope

app = FastAPI(debug=True)  # NOTE(review): debug=True exposes tracebacks — disable in production

# NOTE: these imports sit mid-module, presumably so .env is loaded before
# auth.auth fetches the Cognito JWKS at import time — confirm before moving.
from auth.cognito.JWTBearer import JWTBearer
from auth.auth import jwks

auth = JWTBearer(jwks)  # NOTE(review): constructed but never applied to a route via Depends(auth) in this file
from fastapi import Depends
class ImageResponse(BaseModel):
    # Generic success/message response envelope.
    # NOTE(review): not referenced by any route in this file — possibly dead code.
    success: bool
    message: str
@app.get(
    "/",
)
async def root():
    """Welcome endpoint confirming the service is reachable."""
    greeting = "Hello, World, I am alive!"
    return {"message": greeting}
@app.get("/health")
def health_check():
    """Liveness probe reporting the server status."""
    status_report = {"status": "OK"}
    return status_report
class Payload(BaseModel):
    # Opaque request envelope: each endpoint unpacks the raw dict itself.
    # Expected keys vary per endpoint but always include "user_id" and "prompt".
    payload: Dict[str, Any]
def memory_factory(memory_type):
    """Register add/fetch/delete routes for one memory type ("episodic", "buffer", "semantic").

    BUG FIX: the routes were previously registered with the literal template
    path "/{memory_type}/add-memory" on every call, so FastAPI held three
    identical parameterised routes and every request (e.g. POST
    /semantic/add-memory) matched the first registration and always ran the
    "episodic" closures. f-string paths give each memory type its own
    concrete route.
    """
    load_dotenv()

    class Payload(BaseModel):
        # Raw request envelope; handlers unpack the dict themselves.
        payload: Dict[str, Any]

    @app.post(f"/{memory_type}/add-memory", response_model=dict)
    async def add_memory(
        payload: Payload,
        # files: List[UploadFile] = File(...),
    ):
        """Add an observation/document to this memory type for a user."""
        try:
            logging.info(" Init PDF processing")
            decoded_payload = payload.payload
            Memory_ = Memory(user_id=decoded_payload["user_id"])
            await Memory_.async_init()
            memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None)
            if memory_class is None:
                # Guard: previously a missing handler crashed with
                # "'NoneType' object is not callable".
                return JSONResponse(
                    content={"response": {"error": f"unsupported memory type: {memory_type}"}},
                    status_code=404,
                )
            output = await memory_class(
                observation=decoded_payload["prompt"],
                loader_settings=decoded_payload["loader_settings"],
                params=decoded_payload["params"],
            )
            return JSONResponse(content={"response": output}, status_code=200)
        except Exception as e:
            return JSONResponse(
                content={"response": {"error": str(e)}}, status_code=503
            )

    @app.post(f"/{memory_type}/fetch-memory", response_model=dict)
    async def fetch_memory(
        payload: Payload,
        # files: List[UploadFile] = File(...),
    ):
        """Fetch memories of this type matching the prompt."""
        try:
            decoded_payload = payload.payload
            Memory_ = Memory(user_id=decoded_payload["user_id"])
            await Memory_.async_init()
            memory_class = getattr(Memory_, f"_fetch_{memory_type}_memory", None)
            if memory_class is None:
                return JSONResponse(
                    content={"response": {"error": f"unsupported memory type: {memory_type}"}},
                    status_code=404,
                )
            # NOTE(review): unlike add_memory this call is not awaited — confirm
            # the _fetch_* handlers are synchronous.
            output = memory_class(observation=decoded_payload["prompt"])
            return JSONResponse(content={"response": output}, status_code=200)
        except Exception as e:
            return JSONResponse(
                content={"response": {"error": str(e)}}, status_code=503
            )

    @app.post(f"/{memory_type}/delete-memory", response_model=dict)
    async def delete_memory(
        payload: Payload,
        # files: List[UploadFile] = File(...),
    ):
        """Delete memories of this type matching the prompt."""
        try:
            decoded_payload = payload.payload
            Memory_ = Memory(user_id=decoded_payload["user_id"])
            await Memory_.async_init()
            memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
            if memory_class is None:
                return JSONResponse(
                    content={"response": {"error": f"unsupported memory type: {memory_type}"}},
                    status_code=404,
                )
            # NOTE(review): not awaited — confirm the _delete_* handlers are synchronous.
            output = memory_class(observation=decoded_payload["prompt"])
            return JSONResponse(content={"response": output}, status_code=200)
        except Exception as e:
            return JSONResponse(
                content={"response": {"error": str(e)}}, status_code=503
            )

memory_list = ["episodic", "buffer", "semantic"]
for memory_type in memory_list:
    memory_factory(memory_type)
@app.get("/available-buffer-actions", response_model=dict)
async def available_buffer_actions(
    payload: Payload,
    # files: List[UploadFile] = File(...),
):
    """List the operations the buffer can run for the given user."""
    # NOTE(review): a GET endpoint that requires a request body is unusual
    # HTTP; kept as-is because the public API documents it as GET.
    try:
        data = payload.payload
        memory = Memory(user_id=data["user_id"])
        await memory.async_init()
        operations = await memory._available_operations()
        return JSONResponse(content={"response": operations}, status_code=200)
    except Exception as exc:
        return JSONResponse(content={"response": {"error": str(exc)}}, status_code=503)
@app.post("/run-buffer", response_model=dict)
async def run_buffer(
    payload: Payload,
    # files: List[UploadFile] = File(...),
):
    """Run the main buffer loop for the user's prompt and return its result."""
    try:
        data = payload.payload
        memory = Memory(user_id=data["user_id"])
        await memory.async_init()
        result = await memory._run_main_buffer(
            user_input=data["prompt"],
            params=data["params"],
            attention_modulators=data["attention_modulators"],
        )
        return JSONResponse(content={"response": result}, status_code=200)
    except Exception as exc:
        return JSONResponse(content={"response": {"error": str(exc)}}, status_code=503)
@app.post("/buffer/create-context", response_model=dict)
async def create_context(
    payload: Payload,
    # files: List[UploadFile] = File(...),
):
    """Build the buffer context for the user's prompt without running the agent."""
    try:
        data = payload.payload
        memory = Memory(user_id=data["user_id"])
        await memory.async_init()
        context = await memory._create_buffer_context(
            user_input=data["prompt"],
            params=data["params"],
            attention_modulators=data["attention_modulators"],
        )
        return JSONResponse(content={"response": context}, status_code=200)
    except Exception as exc:
        return JSONResponse(content={"response": {"error": str(exc)}}, status_code=503)
@app.post("/buffer/get-tasks", response_model=dict)
async def get_tasks(
    payload: Payload,
    # files: List[UploadFile] = File(...),
):
    """Return the task list the buffer derives from the user's prompt.

    BUG FIX: this handler was previously also named `create_context`,
    redefining the module-level function registered for
    /buffer/create-context (flake8 F811) and making that name point at the
    wrong handler for any direct caller. Renamed to `get_tasks`; the route
    path is unchanged.
    """
    try:
        decoded_payload = payload.payload
        Memory_ = Memory(user_id=decoded_payload["user_id"])
        await Memory_.async_init()
        output = await Memory_._get_task_list(
            user_input=decoded_payload["prompt"],
            params=decoded_payload["params"],
            attention_modulators=decoded_payload["attention_modulators"],
        )
        return JSONResponse(content={"response": output}, status_code=200)
    except Exception as e:
        return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
@app.post("/buffer/provide-feedback", response_model=dict)
async def provide_feedback(
    payload: Payload,
    # files: List[UploadFile] = File(...),
):
    """Forward user feedback (optionally a total score) to the buffer.

    ROBUSTNESS FIX: `decoded_payload["total_score"]` raised KeyError (surfacing
    as a 503) whenever the caller omitted the key; `.get` treats a missing key
    as "no score provided", which matches the branch below.
    """
    try:
        decoded_payload = payload.payload
        Memory_ = Memory(user_id=decoded_payload["user_id"])
        await Memory_.async_init()
        total_score = decoded_payload.get("total_score")
        if total_score is None:
            output = await Memory_._provide_feedback(
                user_input=decoded_payload["prompt"],
                params=decoded_payload["params"],
                attention_modulators=None,
                total_score=total_score,
            )
        else:
            # NOTE(review): passing total_score=None when a score IS supplied
            # (and vice versa above) looks inverted — confirm intent with the
            # _provide_feedback contract before changing it.
            output = await Memory_._provide_feedback(
                user_input=decoded_payload["prompt"],
                params=decoded_payload["params"],
                attention_modulators=decoded_payload.get("attention_modulators"),
                total_score=None,
            )
        return JSONResponse(content={"response": output}, status_code=200)
    except Exception as e:
        return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
def start_api_server(host: str = "0.0.0.0", port: int = 8000):
    """
    Start the API server using uvicorn.
    Parameters:
        host (str): The host for the server.
        port (int): The port for the server.
    """
    try:
        logger.info("Starting server at %s:%s", host, port)
        uvicorn.run(app, host=host, port=port)
    except Exception as e:
        # Startup failures are logged with the traceback rather than re-raised.
        logger.exception(f"Failed to start server: {e}")
        # Here you could add any cleanup code or error recovery code.

if __name__ == "__main__":
    start_api_server()

0
level_3/auth/__init__.py Normal file
View file

35
level_3/auth/auth.py Normal file
View file

@ -0,0 +1,35 @@
import os
import requests
from dotenv import load_dotenv
from fastapi import Depends, HTTPException
from starlette.status import HTTP_403_FORBIDDEN
from auth.cognito.JWTBearer import JWKS, JWTBearer, JWTAuthorizationCredentials
load_dotenv()  # Automatically load environment variables from a '.env' file.

# Construct the Cognito User Pool URL using the correct syntax
region = "eu-west-1"
user_pool_id = "eu-west-1_viUyNCqKp"  # NOTE(review): hard-coded pool id — consider reading from the environment
cognito_url = f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}/.well-known/jwks.json"

# Fetch the JWKS using the updated URL.
# NOTE: this is a network request at import time — importing this module
# without connectivity (or with a wrong pool id) will raise.
jwks = JWKS.parse_obj(requests.get(cognito_url).json())

auth = JWTBearer(jwks)
async def get_current_user(
    credentials: JWTAuthorizationCredentials = Depends(auth)
) -> str:
    """Resolve the authenticated Cognito username from the verified JWT claims.

    Raises:
        HTTPException: 403 when the token carries no "username" claim.
    """
    try:
        return credentials.claims["username"]
    except KeyError:
        # BUG FIX: the HTTPException was previously constructed but never
        # raised, so a token without a username claim silently returned None.
        raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Username missing")

View file

@ -0,0 +1,60 @@
from cognito.JWTBearer import JWKS, JWTBearer, JWTAuthorizationCredentials
import os
import boto3
import requests
# Cognito configuration for the test login flow below.
region = "eu-west-1"
user_pool_id = "" #needed
cognito_url = f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}/.well-known/jwks.json"

# Fetch the JWKS using the updated URL (network request at import time).
jwks = JWKS.parse_obj(requests.get(cognito_url).json())
print(jwks)
auth = JWTBearer(jwks)

# Set the Cognito authentication endpoint URL
auth = JWTBearer(jwks)  # NOTE(review): duplicate of the assignment above — one can be removed

# Set the user credentials
username = os.getenv("COGNITO_USERNAME")
password = os.getenv("COGNITO_PASSWORD")

# Create the authentication payload
# NOTE(review): `payload` is never used; admin_initiate_auth below builds its own parameters.
payload = {
    "username": username,
    "password": password
}

# Set the Cognito authentication endpoint URL
# Set the Cognito token endpoint URL ("your-cognito-domain" is a placeholder; unused below)
token_endpoint = f"https://your-cognito-domain.auth.{region}.amazoncognito.com/oauth2/token"

# Set the client credentials
client_id = os.getenv("AWS_CLIENT_ID")
client_secret = os.getenv("AWS_CLIENT_SECRET")
def authenticate_and_get_token(username: str, password: str,
                               user_pool_id: str, app_client_id: str) -> None:
    """Log the user in via Cognito's ADMIN_NO_SRP_AUTH flow and print the outcome."""
    cognito_client = boto3.client('cognito-idp')
    auth_parameters = {
        "USERNAME": username,
        "PASSWORD": password,
    }
    response = cognito_client.admin_initiate_auth(
        UserPoolId=user_pool_id,
        ClientId=app_client_id,
        AuthFlow='ADMIN_NO_SRP_AUTH',
        AuthParameters=auth_parameters,
    )
    print("Log in success")
    # print("Access token:", response['AuthenticationResult']['AccessToken'])
    # print("ID token:", response['AuthenticationResult']['IdToken'])

authenticate_and_get_token(username, password, user_pool_id, client_id)

View file

@ -0,0 +1,72 @@
from typing import Dict, Optional, List
from fastapi import HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, jwk, JWTError
from jose.utils import base64url_decode
from pydantic import BaseModel
from starlette.requests import Request
from starlette.status import HTTP_403_FORBIDDEN
# A single JSON Web Key as served in Cognito's jwks.json (kty/kid/use/alg/n/e fields).
JWK = Dict[str, str]

class JWKS(BaseModel):
    # The "keys" array of a JWKS document.
    keys: List[JWK]

class JWTAuthorizationCredentials(BaseModel):
    # A bearer token decomposed for verification: the raw token, its unverified
    # header and claims, plus the signature and the signed message
    # ("header.payload") that the signature covers.
    jwt_token: str
    header: Dict[str, str]
    claims: Dict[str, str]
    signature: str
    message: str
class JWTBearer(HTTPBearer):
    """FastAPI security scheme validating Cognito-issued JWTs against a JWKS key set."""

    def __init__(self, jwks: JWKS, auto_error: bool = True):
        super().__init__(auto_error=auto_error)
        # Index the JWKS by key id. Renamed the loop variable (was `jwk`),
        # which shadowed the imported `jose.jwk` module and hurt readability.
        self.kid_to_jwk = {key_data["kid"]: key_data for key_data in jwks.keys}

    def verify_jwk_token(self, jwt_credentials: JWTAuthorizationCredentials) -> bool:
        """Return True iff the token's signature verifies against the JWK matching its `kid`."""
        try:
            public_key = self.kid_to_jwk[jwt_credentials.header["kid"]]
        except KeyError:
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN, detail="JWK public key not found"
            )
        key = jwk.construct(public_key)
        decoded_signature = base64url_decode(jwt_credentials.signature.encode())
        return key.verify(jwt_credentials.message.encode(), decoded_signature)

    async def __call__(self, request: Request) -> Optional[JWTAuthorizationCredentials]:
        """Extract, parse and verify the bearer token; 403 on any failure."""
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        if not credentials:
            # Only reachable with auto_error=False: no Authorization header.
            return None
        if credentials.scheme != "Bearer":
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN, detail="Wrong authentication method"
            )
        jwt_token = credentials.credentials
        try:
            # BUG FIX: the rsplit was previously outside the try, so a token
            # without any "." raised an uncaught ValueError (HTTP 500) instead
            # of a clean 403.
            message, signature = jwt_token.rsplit(".", 1)
            jwt_credentials = JWTAuthorizationCredentials(
                jwt_token=jwt_token,
                header=jwt.get_unverified_header(jwt_token),
                claims=jwt.get_unverified_claims(jwt_token),
                signature=signature,
                message=message,
            )
        except (JWTError, ValueError):
            raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")
        if not self.verify_jwk_token(jwt_credentials):
            raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")
        return jwt_credentials

View file

View file

View file

View file

@ -0,0 +1,151 @@
# Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client
import json
from typing import Any
import logging
logging.basicConfig(level=logging.INFO)
import marvin
from deep_translator import GoogleTranslator
from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain.output_parsers import PydanticOutputParser
from langchain.tools import tool
from pydantic import parse_obj_as
load_dotenv()
from langchain import OpenAI
from langchain.chat_models import ChatOpenAI
from typing import Optional, Dict, List, Union
import tracemalloc
tracemalloc.start()
import os
from datetime import datetime
from langchain import PromptTemplate
from langchain.chains.openai_functions import create_structured_output_chain
from langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from langchain.schema import SystemMessage, HumanMessage
import uuid
load_dotenv()
async def main_buffer(
    self, user_input=None, params=None, attention_modulators=None
):
    """AI buffer to run the AI agent to execute the set of tasks.

    Pipeline: build the document context and task list from the buffer
    helpers, run a langchain OPENAI_FUNCTIONS agent once per task, then
    have the base LLM format everything into an EpisodicList record that is
    persisted to EPISODICMEMORY and returned as JSON.
    """
    document_context_result_parsed = await self.buffer_context(
        user_input=user_input,
        params=params,
        attention_modulators=attention_modulators,
    )
    tasks_list = await self.get_task_list(
        user_input=user_input,
        params=params,
        attention_modulators=attention_modulators
    )
    result_tasks = []
    # Flatten the pydantic context result into a plain dict for prompting.
    document_context_result_parsed = document_context_result_parsed.dict()
    document_from_vectorstore = [doc["document_content"] for doc in document_context_result_parsed["docs"]]
    for task in tasks_list:
        print("HERE IS THE TASK", task)
        complete_agent_prompt = f" Document context is: {document_from_vectorstore} \n Task is : {task['task_order']} {task['task_name']} {task['operation']} "
        # task['vector_store_context_results']=document_context_result_parsed.dict()

        # Tool schemas/closures are (re)defined per task so they can capture
        # the current context; the agent sees them as callable tools.
        class FetchText(BaseModel):
            observation: str = Field(description="observation we want to translate")

        @tool("fetch_from_vector_store", args_schema=FetchText, return_direct=True)
        def fetch_from_vector_store(observation, args_schema=FetchText):
            """Fetch from vectorstore if data doesn't exist in the context"""
            if document_context_result_parsed:
                return document_context_result_parsed
            else:
                # NOTE(review): `observation` is a str per the schema, so
                # observation['original_query'] would raise TypeError — this
                # branch looks unreachable/untested; confirm.
                out = self.fetch_memories(observation['original_query'], namespace="SEMANTICMEMORY")
                return out

        class TranslateText(BaseModel):
            observation: str = Field(description="observation we want to translate")

        @tool("translate_to_de", args_schema=TranslateText, return_direct=True)
        def translate_to_de(observation, args_schema=TranslateText):
            # NOTE(review): this docstring is the tool description shown to the
            # LLM and says "English" although the tool translates to German —
            # likely a bug; change deliberately (it alters agent behavior).
            """Translate to English"""
            out = GoogleTranslator(source="auto", target="de").translate(
                text=observation
            )
            return out

        agent = initialize_agent(
            llm=self.llm,
            tools=[fetch_from_vector_store, translate_to_de],
            agent=AgentType.OPENAI_FUNCTIONS,
            verbose=True,
        )
        output = agent.run(input=complete_agent_prompt)
        # Interleave each task with the agent's output for the formatter below.
        result_tasks.append(task)
        result_tasks.append(output)
    # buffer_result = await self.fetch_memories(observation=str(user_input))

    class EpisodicTask(BaseModel):
        """Schema for an individual task."""
        task_order: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        task_name: str = Field(
            None, description="The task that needs to be performed"
        )
        operation: str = Field(None, description="The operation to be performed")
        operation_result: str = Field(
            None, description="The result of the operation"
        )

    class EpisodicList(BaseModel):
        """Schema for the record containing a list of tasks."""
        # NOTE(review): the descriptions for start_date/end_date/user_query are
        # copy-pasted from task_order; they feed the parser's format
        # instructions, so fixing them changes LLM-facing text — do so deliberately.
        tasks: List[EpisodicTask] = Field(..., description="List of tasks")
        start_date: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        end_date: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        user_query: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        attention_modulators: str = Field(..., description="List of attention modulators")

    parser = PydanticOutputParser(pydantic_object=EpisodicList)
    date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    prompt = PromptTemplate(
        template="Format the result.\n{format_instructions}\nOriginal query is: {query}\n Steps are: {steps}, buffer is: {buffer}, date is:{date}, attention modulators are: {attention_modulators} \n",
        input_variables=["query", "steps", "buffer", "date", "attention_modulators"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    _input = prompt.format_prompt(
        query=user_input, steps=str(tasks_list)
        , buffer=str(result_tasks), date=date, attention_modulators=attention_modulators
    )
    print("HERE ARE THE STEPS, BUFFER AND DATE", str(tasks_list))
    print("here are the result_tasks", str(result_tasks))
    # return "a few things to do like load episodic memory in a structured format"
    # Format the run into an EpisodicList via the base (completion) LLM.
    output = self.llm_base(_input.to_string())
    result_parsing = parser.parse(output)
    # Persist the structured record to episodic memory; return value unused.
    lookup_value = await self.add_memories(
        observation=str(result_parsing.json()), params=params, namespace='EPISODICMEMORY'
    )
    # await self.delete_memories()
    return result_parsing.json()

View file

@ -0,0 +1,111 @@
import logging
from typing import Any
logging.basicConfig(level=logging.INFO)
from dotenv import load_dotenv
load_dotenv()
from langchain import OpenAI
from langchain.chat_models import ChatOpenAI
from typing import Optional, Dict, List, Union
import tracemalloc
tracemalloc.start()
import os
import uuid
class EpisodicBuffer(DynamicBaseMemory):
    """Short-term working memory stored under the "BUFFERMEMORY" namespace.

    Inherits storage/retrieval primitives and the attention scorers
    (freshness/frequency/relevance/saliency) from DynamicBaseMemory, which is
    defined elsewhere in this package.
    """

    def __init__(
        self,
        user_id: str,
        memory_id: Optional[str],
        index_name: Optional[str],
        db_type: str = "weaviate",
    ):
        super().__init__('EpisodicBuffer',
            user_id, memory_id, index_name, db_type, namespace="BUFFERMEMORY"
        )
        # Fresh identifier for this buffer session.
        self.st_memory_id = str( uuid.uuid4())
        # Chat-style model used for agent/chat calls.
        self.llm = ChatOpenAI(
            temperature=0.0,
            max_tokens=1200,
            openai_api_key=os.environ.get("OPENAI_API_KEY"),
            model_name="gpt-4-0613",
            # callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
        )
        # Completion-style model used for plain-prompt formatting calls.
        self.llm_base = OpenAI(
            temperature=0.0,
            max_tokens=1200,
            openai_api_key=os.environ.get("OPENAI_API_KEY"),
            model_name="gpt-4-0613",
        )

    async def handle_modulator(
        self,
        modulator_name: str,
        attention_modulators: Dict[str, float],
        observation: str,
        namespace: Optional[str] = None,
        memory: Optional[Dict[str, Any]] = None,
    ) -> Optional[List[Union[str, float]]]:
        """
        Handle the given modulator based on the observation and namespace.
        Parameters:
        - modulator_name: Name of the modulator to handle.
        - attention_modulators: Dictionary of modulator values.
        - observation: The current observation.
        - namespace: An optional namespace.
        - memory: Optional memory record passed through to the scorer.
        Returns:
        - Result of the modulator if criteria met, else None.
        """
        # Caller's threshold for this modulator; 0.0 when not supplied.
        modulator_value = attention_modulators.get(modulator_name, 0.0)
        # Scorer implementations are inherited from DynamicBaseMemory.
        modulator_functions = {
            "freshness": lambda obs, ns, mem: self.freshness(observation=obs, namespace=ns, memory=mem),
            "frequency": lambda obs, ns, mem: self.frequency(observation=obs, namespace=ns, memory=mem),
            "relevance": lambda obs, ns, mem: self.relevance(observation=obs, namespace=ns, memory=mem),
            "saliency": lambda obs, ns, mem: self.saliency(observation=obs, namespace=ns, memory=mem),
        }
        result_func = modulator_functions.get(modulator_name)
        if not result_func:
            return None  # unknown modulator name
        result = await result_func(observation, namespace, memory)
        if not result:
            return None
        try:
            logging.info("Modulator %s", modulator_name)
            logging.info("Modulator value %s", modulator_value)
            logging.info("Result %s", result[0])
            # Keep the result only when its score meets the caller's threshold.
            # Assumes result[0] holds the numeric score — TODO confirm scorer contract.
            if float(result[0]) >= float(modulator_value):
                return result
        except ValueError:
            pass  # non-numeric score: treat as "criteria not met"
        return None

    async def available_operations(self) -> list[str]:
        """Determines what operations are available for the user to process PDFs"""
        return [
            "retrieve over time",
            "save to personal notes",
            "translate to german"
            # "load to semantic memory",
            # "load to episodic memory",
            # "load to buffer",
        ]

View file

@ -0,0 +1,377 @@
import json
from typing import Any
import logging
logging.basicConfig(level=logging.INFO)
import marvin
from deep_translator import GoogleTranslator
from langchain.agents import initialize_agent, AgentType
from langchain.output_parsers import PydanticOutputParser
from langchain.tools import tool
from pydantic import parse_obj_as
from langchain import OpenAI
from langchain.chat_models import ChatOpenAI
from typing import Optional, Dict, List, Union
import tracemalloc
tracemalloc.start()
import os
from datetime import datetime
from langchain import PromptTemplate
from langchain.chains.openai_functions import create_structured_output_chain
from langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from langchain.schema import SystemMessage, HumanMessage
import uuid
load_dotenv()
async def buffer_context(
self,
user_input=None,
params=None,
attention_modulators: dict = None,
):
"""Generates the context to be used for the buffer and passed to the agent"""
# we just filter the data here to make sure input is clean
prompt_filter = ChatPromptTemplate.from_template(
"""Filter and remove uneccessary information that is not relevant in the query to
the vector store to get more information, keep it as original as possbile: {query}"""
)
chain_filter = prompt_filter | self.llm
output = await chain_filter.ainvoke({"query": user_input})
# this part is partially done but the idea is to apply different attention modulators
# to the data to fetch the most relevant information from the vector stores
class BufferModulators(BaseModel):
"""Value of buffer modulators"""
frequency: str = Field(..., description="Frequency score of the document")
saliency: str = Field(..., description="Saliency score of the document")
relevance: str = Field(..., description="Relevance score of the document")
description: str = Field(..., description="Latest buffer modulators")
direction: str = Field(..., description="Increase or a decrease of the modulator")
parser = PydanticOutputParser(pydantic_object=BufferModulators)
prompt = PromptTemplate(
template="""Structure the buffer modulators to be used for the buffer. \n
{format_instructions} \nOriginal observation is:
{query}\n """,
input_variables=["query"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
# check if modulators exist, initialize the modulators if needed
if attention_modulators is None:
# try:
logging.info("Starting with attention mods")
attention_modulators = await self.fetch_memories(observation="Attention modulators",
namespace="BUFFERMEMORY")
logging.info("Attention modulators exist %s", str(attention_modulators))
lookup_value_episodic = await self.fetch_memories(
observation=str(output), namespace="EPISODICMEMORY"
)
# lookup_value_episodic= lookup_value_episodic["data"]["Get"]["EPISODICMEMORY"][0]["text"]
prompt_classify = ChatPromptTemplate.from_template(
"""You are a classifier. Determine if based on the previous query if the user was satisfied with the output : {query}"""
)
json_structure = [{
"name": "classifier",
"description": "Classification indicating if it's output is satisfactory",
"parameters": {
"type": "object",
"properties": {
"classification": {
"type": "boolean",
"description": "The classification true or false"
}
}, "required": ["classification"]}
}]
chain_filter = prompt_classify | self.llm.bind(function_call={"name": "classifier"}, functions=json_structure)
classifier_output = await chain_filter.ainvoke({"query": lookup_value_episodic})
arguments_str = classifier_output.additional_kwargs['function_call']['arguments']
print("This is the arguments string", arguments_str)
arguments_dict = json.loads(arguments_str)
classfier_value = arguments_dict.get('classification', None)
print("This is the classifier value", classfier_value)
if classfier_value:
# adjust the weights of the modulators by adding a positive value
print("Lookup value, episodic", lookup_value_episodic["data"]["Get"]["EPISODICMEMORY"][0]["text"])
prompt_classify = ChatPromptTemplate.from_template(
""" We know we need to increase the classifiers for our AI system. The classifiers are {modulators} The query is: {query}. Which of the classifiers should we decrease? Return just the modulator and desired value"""
)
chain_modulator = prompt_classify | self.llm
classifier_output = await chain_modulator.ainvoke(
{"query": lookup_value_episodic, "modulators": str(attention_modulators)})
print("classifier output 1", classifier_output)
diff_layer = DifferentiableLayer(attention_modulators)
adjusted_modulator = await diff_layer.adjust_weights(classifier_output)
_input = prompt.format_prompt(query=adjusted_modulator)
document_context_result = self.llm_base(_input.to_string())
document_context_result_parsed = parser.parse(document_context_result)
print("Updating with the following weights", str(document_context_result_parsed))
await self.add_memories(observation=str(document_context_result_parsed), params=params,
namespace="BUFFERMEMORY")
else:
# adjust the weights of the modulators by adding a negative value
print("Lookup value, episodic", lookup_value_episodic)
prompt_classify = ChatPromptTemplate.from_template(
""" We know we need to decrease the classifiers for our AI system. The classifiers are {modulators} The query is: {query}. Which of the classifiers should we decrease? Return just the modulator and desired value"""
)
chain_modulator_reduction = prompt_classify | self.llm
classifier_output = await chain_modulator_reduction.ainvoke(
{"query": lookup_value_episodic, "modulators": str(attention_modulators)})
print("classifier output 2", classifier_output)
diff_layer = DifferentiableLayer(attention_modulators)
adjusted_modulator = diff_layer.adjust_weights(classifier_output)
_input = prompt.format_prompt(query=adjusted_modulator)
document_context_result = self.llm_base(_input.to_string())
document_context_result_parsed = parser.parse(document_context_result)
print("Updating with the following weights", str(document_context_result_parsed))
await self.add_memories(observation=str(document_context_result_parsed), params=params,
namespace="BUFFERMEMORY")
# except:
# # initialize the modulators with default values if they are not provided
# print("Starting with default modulators")
# attention_modulators = {
# "freshness": 0.5,
# "frequency": 0.5,
# "relevance": 0.5,
# "saliency": 0.5,
# }
# _input = prompt.format_prompt(query=attention_modulators)
# document_context_result = self.llm_base(_input.to_string())
# document_context_result_parsed = parser.parse(document_context_result)
# await self.add_memories(observation=str(document_context_result_parsed), params=params, namespace="BUFFERMEMORY")
elif attention_modulators:
pass
lookup_value_semantic = await self.fetch_memories(
observation=str(output), namespace="SEMANTICMEMORY"
)
print("This is the lookup value semantic", len(lookup_value_semantic))
context = []
memory_scores = []
async def compute_score_for_memory(memory, output, attention_modulators):
modulators = list(attention_modulators.keys())
total_score = 0
num_scores = 0
individual_scores = {} # Store individual scores with their modulator names
for modulator in modulators:
result = await self.handle_modulator(
modulator_name=modulator,
attention_modulators=attention_modulators,
observation=str(output),
namespace="EPISODICMEMORY",
memory=memory,
)
if result:
score = float(result[0]) # Assuming the first value in result is the score
individual_scores[modulator] = score # Store the score with its modulator name
total_score += score
num_scores += 1
average_score = total_score / num_scores if num_scores else 0
return {
"memory": memory,
"average_score": average_score,
"individual_scores": individual_scores
}
tasks = [
compute_score_for_memory(memory=memory, output=output, attention_modulators=attention_modulators)
for memory in lookup_value_semantic["data"]["Get"]["SEMANTICMEMORY"]
]
memory_scores = await asyncio.gather(*tasks)
# Sort the memories based on their average scores
sorted_memories = sorted(memory_scores, key=lambda x: x["average_score"], reverse=True)[:5]
# Store the sorted memories in the context
context.extend([item for item in sorted_memories])
for item in context:
memory = item.get('memory', {})
text = memory.get('text', '')
prompt_sum = ChatPromptTemplate.from_template(
"""Based on this query: {query} Summarize the following text so it can be best used as a context summary for the user when running query: {text}"""
)
chain_sum = prompt_sum | self.llm
summary_context = await chain_sum.ainvoke({"query": output, "text": text})
item['memory']['text'] = summary_context
print("HERE IS THE CONTEXT", context)
lookup_value_episodic = await self.fetch_memories(
observation=str(output), namespace="EPISODICMEMORY"
)
class Event(BaseModel):
"""Schema for an individual event."""
event_order: str = Field(
..., description="The order at which the task needs to be performed"
)
event_name: str = Field(
None, description="The task that needs to be performed"
)
operation: str = Field(None, description="The operation that was performed")
original_query: str = Field(
None, description="Original user query provided"
)
class EventList(BaseModel):
"""Schema for the record containing a list of events of the user chronologically."""
tasks: List[Event] = Field(..., description="List of tasks")
prompt_filter_chunk = f" Based on available memories {lookup_value_episodic} determine only the relevant list of steps and operations sequentially "
prompt_msgs = [
SystemMessage(
content="You are a world class algorithm for determining what happened in the past and ordering events chronologically."
),
HumanMessage(content="Analyze the following memories and provide the relevant response:"),
HumanMessagePromptTemplate.from_template("{input}"),
HumanMessage(content="Tips: Make sure to answer in the correct format"),
HumanMessage(
content="Tips: Only choose actions that are relevant to the user query and ignore others"
)
]
prompt_ = ChatPromptTemplate(messages=prompt_msgs)
chain = create_structured_output_chain(
EventList, self.llm, prompt_, verbose=True
)
from langchain.callbacks import get_openai_callback
with get_openai_callback() as cb:
episodic_context = await chain.arun(input=prompt_filter_chunk, verbose=True)
print(cb)
print("HERE IS THE EPISODIC CONTEXT", episodic_context)
class BufferModulators(BaseModel):
attention_modulators: Dict[str, float] = Field(..., description="Attention modulators")
class BufferRawContextTerms(BaseModel):
"""Schema for documentGroups"""
semantic_search_term: str = Field(
...,
description="The search term to use to get relevant input based on user query",
)
document_content: str = Field(
None, description="Shortened original content of the document"
)
attention_modulators_list: List[BufferModulators] = Field(
..., description="List of modulators"
)
average_modulator_score: str = Field(None, description="Average modulator score")
class StructuredEpisodicEvents(BaseModel):
"""Schema for documentGroups"""
event_order: str = Field(
...,
description="Order when event occured",
)
event_type: str = Field(
None, description="Type of the event"
)
event_context: List[BufferModulators] = Field(
..., description="Context of the event"
)
class BufferRawContextList(BaseModel):
"""Buffer raw context processed by the buffer"""
docs: List[BufferRawContextTerms] = Field(..., description="List of docs")
events: List[StructuredEpisodicEvents] = Field(..., description="List of events")
user_query: str = Field(..., description="The original user query")
# we structure the data here to make it easier to work with
parser = PydanticOutputParser(pydantic_object=BufferRawContextList)
prompt = PromptTemplate(
template="""Summarize and create semantic search queries and relevant
document summaries for the user query.\n
{format_instructions}\nOriginal query is:
{query}\n Retrieved document context is: {context}. Retrieved memory context is {memory_context}""",
input_variables=["query", "context", "memory_context"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
_input = prompt.format_prompt(query=user_input, context=str(context), memory_context=str(episodic_context))
document_context_result = self.llm_base(_input.to_string())
document_context_result_parsed = parser.parse(document_context_result)
# print(document_context_result_parsed)
return document_context_result_parsed
async def get_task_list(
    self, user_input=None, params=None, attention_modulators=None,
):
    """Decompose the user's request into an ordered list of operation steps.

    Asks the LLM to map *user_input* onto the operations reported by
    ``self.available_operations()`` and returns them as plain dicts so the
    agent layer can consume them directly.

    Args:
        user_input: The raw user query to decompose.
        params: Unused here; kept for interface parity with sibling methods.
        attention_modulators: Unused here; kept for interface parity.

    Returns:
        list[dict]: Task dicts with ``task_order``, ``task_name``,
        ``operation`` and ``original_query`` keys, in execution order.
    """
    list_of_operations = await self.available_operations()

    class Task(BaseModel):
        """Schema for an individual task."""
        task_order: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        task_name: str = Field(
            None, description="The task that needs to be performed"
        )
        operation: str = Field(None, description="The operation to be performed")
        original_query: str = Field(
            None, description="Original user query provided"
        )

    class TaskList(BaseModel):
        """Schema for the record containing a list of tasks."""
        tasks: List[Task] = Field(..., description="List of tasks")

    prompt_filter_chunk = f" Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {user_input}"
    prompt_msgs = [
        SystemMessage(
            content="You are a world class algorithm for decomposing prompts into steps and operations and choosing relevant ones"
        ),
        HumanMessage(content="Decompose based on the following prompt and provide relevant document context reponse:"),
        HumanMessagePromptTemplate.from_template("{input}"),
        HumanMessage(content="Tips: Make sure to answer in the correct format"),
        HumanMessage(
            content="Tips: Only choose actions that are relevant to the user query and ignore others"
        )
    ]
    prompt_ = ChatPromptTemplate(messages=prompt_msgs)
    # Structured-output chain guarantees the LLM reply parses into TaskList.
    chain = create_structured_output_chain(
        TaskList, self.llm, prompt_, verbose=True
    )
    # Local import keeps the callback dependency out of module import time.
    from langchain.callbacks import get_openai_callback
    with get_openai_callback() as cb:
        output = await chain.arun(input=prompt_filter_chunk, verbose=True)
        print(cb)  # token/cost accounting for this call
    # output = json.dumps(output)
    # Re-validate, then round-trip through JSON to hand back plain dicts.
    my_object = parse_obj_as(TaskList, output)
    print("HERE IS THE OUTPUT", my_object.json())
    data = json.loads(my_object.json())
    # Extract the list of tasks
    tasks_list = data["tasks"]
    return tasks_list

View file

View file

@ -0,0 +1,239 @@
import numpy as np
# Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client
import json
from enum import Enum
from io import BytesIO
from typing import Dict, List, Union, Any
import logging
logging.basicConfig(level=logging.INFO)
import marvin
import requests
from deep_translator import GoogleTranslator
from dotenv import load_dotenv
from langchain.agents import initialize_agent, AgentType
from langchain.document_loaders import PyPDFLoader
from langchain.output_parsers import PydanticOutputParser
from langchain.retrievers import WeaviateHybridSearchRetriever
from langchain.tools import tool
from marvin import ai_classifier
from pydantic import parse_obj_as
from weaviate.gql.get import HybridFusion
import numpy as np
load_dotenv()
from langchain import OpenAI
from langchain.chat_models import ChatOpenAI
from typing import Optional, Dict, List, Union
import tracemalloc
tracemalloc.start()
import os
from datetime import datetime
from langchain import PromptTemplate
from langchain.chains.openai_functions import create_structured_output_chain
from langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain.embeddings.openai import OpenAIEmbeddings
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from langchain.schema import Document, SystemMessage, HumanMessage
import uuid
import humanize
import weaviate
class DifferentiableLayer:
    """Adapts attention-modulator weights from user feedback.

    Implements a tiny gradient-like update: weights move against the gap
    between average feedback and the ideal score of 1.0, with L2-style
    regularization, multiplicative weight decay, and a decaying learning
    rate so later feedback perturbs the weights less.
    """

    def __init__(self, attention_modulators: dict):
        # Every modulator starts at a neutral weight of 1.0.
        self.weights = {modulator: 1.0 for modulator in attention_modulators}
        self.learning_rate = 0.1
        self.regularization_lambda = 0.01
        self.weight_decay = 0.99

    async def adjust_weights(self, feedbacks: list[float]):
        """
        Adjusts the weights of the attention modulators based on user feedbacks.

        Parameters:
        - feedbacks: A list of feedback scores (between 0 and 1).

        Returns:
        - dict: The updated weights (also kept on self.weights).
        """
        if not feedbacks:
            # Fix: np.mean([]) returns NaN, which would silently poison every
            # weight; with no feedback there is nothing to learn from.
            return self.weights
        avg_feedback = float(np.mean(feedbacks))
        feedback_diff = 1.0 - avg_feedback
        # Adjust weights based on average feedback.
        for modulator in self.weights:
            self.weights[modulator] += (
                self.learning_rate * (-feedback_diff)
                - self.regularization_lambda * self.weights[modulator]
            )
            self.weights[modulator] *= self.weight_decay
        # Decay the learning rate so the layer stabilizes over time.
        self.learning_rate *= 0.99
        return self.weights

    async def get_weights(self):
        """Return the current modulator-weight mapping."""
        return self.weights
async def _summarizer(self, text: str, document: str, max_tokens: int = 1200):
    """Condense *document* into one short summary focused on *text*.

    Used by the modulator helpers (frequency, repetition, ...) to shrink raw
    memory records before they contribute to the buffer context.

    Args:
        text: The user query/observation the summary should focus on.
        document: The raw document or memory record to summarize.
        max_tokens: Currently unused in the body — kept for interface
            stability; TODO confirm whether it should cap the LLM call.

    Returns:
        str: The first summary produced by the LLM.
    """

    class Summaries(BaseModel):
        """Schema for one summarized document."""
        summary: str = Field(
            ...,
            description="Summarized document")

    class SummaryContextList(BaseModel):
        """Envelope returned by the LLM: summaries plus the original query."""
        summaries: List[Summaries] = Field(..., description="List of summaries")
        observation: str = Field(..., description="The original user query")

    # Ask the base LLM for structured output and parse it with Pydantic.
    parser = PydanticOutputParser(pydantic_object=SummaryContextList)
    prompt = PromptTemplate(
        template=" \n{format_instructions}\nSummarize the observation briefly based on the user query, observation is: {query}\n. The document is: {document}",
        input_variables=["query", "document"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    _input = prompt.format_prompt(query=text, document=document)
    document_context_result = self.llm_base(_input.to_string())
    document_context_result_parsed = parser.parse(document_context_result)
    # Round-trip through JSON to get plain dicts instead of Pydantic objects.
    document_context_result_parsed = json.loads(document_context_result_parsed.json())
    # Only the first summary is used.
    document_summary = document_context_result_parsed["summaries"][0]["summary"]
    return document_summary
async def memory_route(self, text_time_diff: str):
    """Map a humanized time difference (e.g. "2 days ago") to a freshness score.

    Relies on marvin's @ai_classifier to pick the closest enum member; the
    member's value is the freshness score, as a string between "0.1" and "1".
    """

    @ai_classifier
    class MemoryRoute(Enum):
        """Classifier buckets for how recently a memory was uploaded."""
        data_uploaded_now = "1"
        data_uploaded_very_recently = "0.9"
        data_uploaded_recently = "0.7"
        data_uploaded_more_than_a_month_ago = "0.5"
        data_uploaded_more_than_three_months_ago = "0.3"
        data_uploaded_more_than_six_months_ago = "0.1"

    namespace = MemoryRoute(str(text_time_diff))
    return namespace
async def freshness(self, observation: str, namespace: str = None, memory=None) -> list[str]:
    """Freshness - Score between 0 and 1 on how often was the information updated in episodic or semantic memory in the past"""
    logging.info("Starting with Freshness")
    lookup_value = await self.fetch_memories(
        observation=observation, namespace=namespace
    )
    # NOTE(review): the result is always read from the "EPISODICMEMORY" key
    # even though *namespace* is a parameter — confirm this is intentional.
    unix_t = lookup_value["data"]["Get"]["EPISODICMEMORY"][0]["_additional"][
        "lastUpdateTimeUnix"
    ]
    # Convert Unix timestamp to datetime (division by 1000 assumes the
    # timestamp is in milliseconds — TODO confirm against Weaviate output).
    last_update_datetime = datetime.fromtimestamp(int(unix_t) / 1000)
    time_difference = datetime.now() - last_update_datetime
    # Humanize ("2 days ago") so the ai_classifier in memory_route can bucket it.
    time_difference_text = humanize.naturaltime(time_difference)
    namespace_ = await self.memory_route(str(time_difference_text))
    # Returns [score_string, raw_lookup_result].
    return [namespace_.value, lookup_value]
async def frequency(self, observation: str, namespace: str, memory) -> list[str]:
    """Frequency - Score between 0 and 1 on how often was the information processed in episodic memory in the past
    Counts the number of times a memory was accessed in the past and divides it by the total number of memories in the episodic memory
    """
    logging.info("Starting with Frequency")
    weaviate_client = self.init_client(namespace=namespace)
    result_output = await self.fetch_memories(
        observation=observation, params=None, namespace=namespace
    )
    # Ratio of relevant hits to the total count in the class.
    # NOTE(review): both reads hardcode the "EPISODICMEMORY" key regardless
    # of *namespace* — confirm this is intentional.
    number_of_relevant_events = len(result_output["data"]["Get"]["EPISODICMEMORY"])
    number_of_total_events = (
        weaviate_client.query.aggregate(namespace).with_meta_count().do()
    )
    frequency = float(number_of_relevant_events) / float(
        number_of_total_events["data"]["Aggregate"]["EPISODICMEMORY"][0]["meta"][
            "count"
        ]
    )
    # Summarize only the top hit to keep the context contribution small.
    summary = await self._summarizer(text=observation, document=result_output["data"]["Get"]["EPISODICMEMORY"][0])
    logging.info("Frequency summary is %s", str(summary))
    return [str(frequency), summary]
async def repetition(self, observation: str, namespace: str, memory) -> list[str]:
    """Repetition - Score between 0 and 1 based on how often and at what intervals a memory has been revisited.
    Accounts for the spacing effect, where memories accessed at increasing intervals are given higher scores.
    # TO DO -> add metadata column to make sure that the access is not equal to update, and run update vector function each time a memory is accessed
    """
    logging.info("Starting with Repetition")
    result_output = await self.fetch_memories(
        observation=observation, params=None, namespace=namespace
    )
    # NOTE(review): lastUpdateTimeUnix of a single hit looks like a scalar
    # timestamp, but the code below treats it as a list (len, sorted,
    # element-wise subtraction). If it is a string, len() counts characters
    # and the interval math will raise — verify the Weaviate payload shape.
    access_times = result_output["data"]["Get"]["EPISODICMEMORY"][0]["_additional"]["lastUpdateTimeUnix"]
    # Calculate repetition score based on access times
    if not access_times or len(access_times) == 1:
        # Zero or one access: no spacing information, score 0.
        return ["0", result_output["data"]["Get"]["EPISODICMEMORY"][0]]
    # Sort access times
    access_times = sorted(access_times)
    # Calculate intervals between consecutive accesses
    intervals = [access_times[i + 1] - access_times[i] for i in range(len(access_times) - 1)]
    # A simple scoring mechanism: Longer intervals get higher scores, as they indicate spaced repetition
    repetition_score = sum([1.0 / (interval + 1) for interval in intervals]) / len(intervals)
    summary = await self._summarizer(text=observation, document=result_output["data"]["Get"]["EPISODICMEMORY"][0])
    logging.info("Repetition is %s", str(repetition_score))
    logging.info("Repetition summary is %s", str(summary))
    return [str(repetition_score), summary]
async def relevance(self, observation: str, namespace: str, memory) -> list[str]:
    """
    Fetches the fusion relevance score for a given observation from the episodic memory.
    Learn more about fusion scores here on Weaviate docs: https://weaviate.io/blog/hybrid-search-fusion-algorithms

    Parameters:
    - observation: The user's query or observation.
    - namespace: The namespace for the data.

    Returns:
    - The relevance score between 0 and 1, paired with a label identifying
      it as a fusion score.
    """
    logging.info("Starting with Relevance")
    # Weaviate attaches the hybrid-search fusion score to each hit; no extra
    # lookup is needed here — just read it off the memory record.
    fusion_score = memory["_additional"]["score"]
    logging.info("Relevance is %s", str(fusion_score))
    return [fusion_score, "fusion score"]
async def saliency(self, observation: str, namespace=None, memory=None) -> list[str]:
    """Determines saliency by scoring the set of retrieved documents against each other and trying to determine saliency

    Asks the LLM to compare the retrieved documents and emit a per-document
    saliency score plus a short summary; only the first document's values
    are returned.
    """
    logging.info("Starting with Saliency")

    class SaliencyRawList(BaseModel):
        """Schema for one scored document."""
        summary: str = Field(
            ...,
            description="Summarized document")
        saliency_score: str = Field(
            None, description="The score between 0 and 1")

    class SailencyContextList(BaseModel):
        """Envelope for the scored documents plus the original query."""
        docs: List[SaliencyRawList] = Field(..., description="List of docs")
        observation: str = Field(..., description="The original user query")

    parser = PydanticOutputParser(pydantic_object=SailencyContextList)
    # NOTE(review): only {query} is templated — the documents to be compared
    # are never injected into the prompt; confirm whether {context} is missing.
    prompt = PromptTemplate(
        template="Determine saliency of documents compared to the other documents retrieved \n{format_instructions}\nSummarize the observation briefly based on the user query, observation is: {query}\n",
        input_variables=["query"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    _input = prompt.format_prompt(query=observation)
    document_context_result = self.llm_base(_input.to_string())
    document_context_result_parsed = parser.parse(document_context_result)
    # Round-trip through JSON to get plain dicts instead of Pydantic objects.
    document_context_result_parsed = json.loads(document_context_result_parsed.json())
    saliency_score = document_context_result_parsed["docs"][0]["saliency_score"]
    saliency_values = document_context_result_parsed["docs"][0]["summary"]
    logging.info("Saliency is %s", str(saliency_score))
    logging.info("Saliency summary is %s", str(saliency_values))
    return [saliency_score, saliency_values]

View file

View file

@ -0,0 +1,13 @@
# Minimal setuptools packaging config for the cognitive_memory library.
from setuptools import setup, find_packages

setup(
    name='cognitive_memory',
    version='0.0.1',
    description='Library for cognitive memory in VectorDBs with RAG test framework',
    author='Vasilije Markovic',
    author_email='vasilije@topoteretes.com',
    packages=find_packages(),
    install_requires=[
        # List your dependencies here
        # NOTE(review): runtime deps appear to be managed by poetry
        # (pyproject.toml); confirm whether they must be mirrored here
        # before publishing this package independently.
    ],
)

View file

View file

@ -0,0 +1,59 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
from sqlalchemy.exc import OperationalError
from time import sleep
import sys
from dotenv import load_dotenv
load_dotenv()

# this is needed to import classes from other modules
script_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory of your script and add it to sys.path
parent_dir = os.path.dirname(script_dir)
sys.path.append(parent_dir)

# Retry policy for transient connection failures; RETRY_DELAY is in seconds.
MAX_RETRIES = 3
RETRY_DELAY = 5

# Connection settings come from the environment (.env); port is fixed at 5432.
username = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
database_name = os.getenv('POSTGRES_DB')
host = os.getenv('POSTGRES_HOST')

SQLALCHEMY_DATABASE_URL = f"postgresql://{username}:{password}@{host}:5432/{database_name}"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_recycle=3600,  # recycle connections after 1 hour
    pool_pre_ping=True  # test the connection for liveness upon each checkout
)

# Session factory and declarative base shared by all model modules.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
@contextmanager
def get_db():
    """Yield a fresh SQLAlchemy session and guarantee it is closed afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def safe_db_operation(db_op, *args, **kwargs):
    """Run ``db_op(db, *args, **kwargs)`` with retry on dropped connections.

    Each attempt gets a fresh session from get_db(). Only an
    OperationalError whose message indicates the server closed the
    connection is retried (up to MAX_RETRIES attempts, sleeping
    RETRY_DELAY seconds between attempts); any other OperationalError —
    or a matching one on the final attempt — is rolled back and re-raised.

    Returns:
        Whatever *db_op* returns on the first successful attempt.
    """
    for attempt in range(MAX_RETRIES):
        with get_db() as db:
            try:
                return db_op(db, *args, **kwargs)
            except OperationalError as e:
                db.rollback()
                if "server closed the connection unexpectedly" in str(e) and attempt < MAX_RETRIES - 1:
                    sleep(RETRY_DELAY)
                else:
                    raise

View file

@ -0,0 +1,31 @@
version: "3.9"
services:
promethai_mem:
networks:
- promethai_mem_backend
build:
context: ./
volumes:
- "./:/app"
environment:
- HOST=0.0.0.0
    profiles: ["exclude-from-up"] # Use `docker-compose run promethai_mem` to get an attached container
ports:
- 8000:8000
- 443:443
postgres:
image: postgres
container_name: postgres
environment:
      - POSTGRES_HOST_AUTH_METHOD=trust
- POSTGRES_USER=bla
- POSTGRES_PASSWORD=bla
- POSTGRES_DB=bubu
networks:
- promethai_mem_backend
ports:
- "5432:5432"
networks:
promethai_mem_backend:
name: promethai_mem_backend

7
level_3/entrypoint.sh Executable file
View file

@ -0,0 +1,7 @@
#!/bin/bash
# Container entrypoint: pull secrets, prepare the database, then serve the API.

export ENVIRONMENT

# Fetch the environment-specific .env from AWS Secrets Manager.
python fetch_secret.py

# Initialize the database schema (see scripts/create_database.py).
python scripts/create_database.py

# Start Gunicorn
gunicorn -w 2 -k uvicorn.workers.UvicornWorker -t 120 --bind=0.0.0.0:8000 --bind=0.0.0.0:443 --log-level debug api:app

75
level_3/fetch_secret.py Normal file
View file

@ -0,0 +1,75 @@
import os
from dotenv import load_dotenv
from api import start_api_server
# API_ENABLED = os.environ.get("API_ENABLED", "False").lower() == "true"
import boto3
environment = os.getenv("AWS_ENV", "dev")
def fetch_secret(secret_name, region_name, env_file_path):
    """Fetch a secret from AWS Secrets Manager and persist it as a .env file.

    Parameters:
    - secret_name: Name of the secret in Secrets Manager.
    - region_name: AWS region to query.
    - env_file_path: Local path where the secret payload is written.

    Returns:
    - str success message when the .env file was written, or None when the
      secret could not be retrieved or the file is missing afterwards.
      (Previously the success message was returned even when the file was
      not found, which made the caller's ``if secrets:`` check meaningless.)
    """
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)
    try:
        response = client.get_secret_value(SecretId=secret_name)
    except Exception as e:
        print(f"Error retrieving secret: {e}")
        return None
    # Secrets Manager returns either a text or a binary payload.
    if "SecretString" in response:
        secret = response["SecretString"]
    else:
        secret = response["SecretBinary"]
    with open(env_file_path, "w") as env_file:
        env_file.write(secret)
    if os.path.exists(env_file_path):
        print(f"The .env file is located at: {os.path.abspath(env_file_path)}")
        load_dotenv()
        PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "")
        print("LEN OF PINECONE_API_KEY", len(PINECONE_API_KEY))
        return "Success in loading env files"
    print("The .env file was not found.")
    return None
env_file = ".env"
if os.path.exists(env_file):
    # Load default environment variables (.env)
    load_dotenv()
    print("Talk to the AI!")
else:
    # No local .env: pull one for the current AWS environment.
    secrets = fetch_secret(
        f"promethai-{environment}-backend-secretso-promethaijs-dotenv",
        "eu-west-1",
        ".env",
    )
    if secrets:
        print(secrets)
    load_dotenv()
    # Check if "dev" is present in the task ARN
    # NOTE(review): this re-fetches a secret already covered by the call
    # above when environment contains "dev"/"prd" — confirm the duplication
    # is intentional.
    if "dev" in environment:
        # Fetch the secret
        secrets = fetch_secret(
            f"promethai-dev-backend-secretso-promethaijs-dotenv",
            "eu-west-1",
            ".env",
        )
        load_dotenv()
    elif "prd" in environment:
        # Fetch the secret
        secrets = fetch_secret(
            f"promethai-prd-backend-secretso-promethaijs-dotenv",
            "eu-west-1",
            ".env",
        )
        load_dotenv()

View file

24
level_3/models/memory.py Normal file
View file

@ -0,0 +1,24 @@
# memory.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
class MemoryModel(Base):
    """A stored memory belonging to a user, with attached metadata records."""
    __tablename__ = 'memories'

    id = Column(String, primary_key=True)
    # Owning user (users.id).
    user_id = Column(String, ForeignKey('users.id'), index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only populated on UPDATE; stays NULL after the initial insert.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    # Serialized as strings — presumably JSON lists; TODO confirm the format.
    methods_list = Column(String, nullable=True)
    attributes_list = Column(String, nullable=True)

    user = relationship("User", back_populates="memories")
    # Deleting a memory deletes its metadata rows too.
    metadatas = relationship("MetaDatas", back_populates="memory", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<Memory(id={self.id}, user_id={self.user_id}, created_at={self.created_at}, updated_at={self.updated_at})>"

View file

@ -0,0 +1,26 @@
# metadata.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
class MetaDatas(Base):
    """Versioned contract metadata attached to a memory and its owning user."""
    __tablename__ = 'metadatas'

    id = Column(String, primary_key=True)
    user_id = Column(String, ForeignKey('users.id'), index=True)
    version = Column(String, nullable=False)
    contract_metadata = Column(String, nullable=False)
    memory_id = Column(String, ForeignKey('memories.id'), index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only populated on UPDATE; stays NULL after the initial insert.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    user = relationship("User", back_populates="metadatas")
    memory = relationship("MemoryModel", back_populates="metadatas")

    def __repr__(self):
        # Fixed: the previous repr referenced self.field, which is not a
        # column on this model and raised AttributeError whenever an
        # instance was printed or logged.
        return f"<MetaData(id={self.id}, version={self.version}, memory_id={self.memory_id}, created_at={self.created_at}, updated_at={self.updated_at})>"

View file

@ -0,0 +1,25 @@
# operation.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
class Operation(Base):
    """A single operation run by a user, optionally tied to a test set."""
    __tablename__ = 'operations'

    id = Column(String, primary_key=True)
    user_id = Column(String, ForeignKey('users.id'), index=True)  # Link to User
    test_set_id = Column(String, ForeignKey('test_sets.id'), index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only populated on UPDATE; stays NULL after the initial insert.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    # Relationships
    user = relationship("User", back_populates="operations")
    test_set = relationship("TestSet", back_populates="operations")

    def __repr__(self):
        return f"<Operation(id={self.id}, user_id={self.user_id}, created_at={self.created_at}, updated_at={self.updated_at})>"

View file

@ -0,0 +1,25 @@
# session.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
class Session(Base):
    """A user session (job); rows are queried as job records elsewhere."""
    __tablename__ = 'sessions'

    id = Column(String, primary_key=True)
    user_id = Column(String, ForeignKey('users.id'), index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only populated on UPDATE; stays NULL after the initial insert.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    # Corrected relationship name
    user = relationship("User", back_populates="sessions")
    # operations = relationship("Operation", back_populates="session", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<Session(id={self.id}, user_id={self.user_id}, created_at={self.created_at}, updated_at={self.updated_at})>"

View file

@ -0,0 +1,40 @@
# test_output.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
from datetime import datetime
from sqlalchemy import Column, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
class TestOutput(Base):
    """
    Represents the output result of a specific test set.

    Links a user, a test set, and the operation that produced the result;
    the result payload itself is stored as a string in ``test_results``.
    """
    __tablename__ = 'test_outputs'

    id = Column(String, primary_key=True)
    user_id = Column(String, ForeignKey('users.id'), index=True)  # Added user_id field
    test_set_id = Column(String, ForeignKey('test_sets.id'), index=True)
    operation_id = Column(String, ForeignKey('operations.id'), index=True)
    # Serialized results — presumably JSON; TODO confirm the format.
    test_results = Column(String, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only populated on UPDATE; stays NULL after the initial insert.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    # Relationships
    user = relationship("User", back_populates="test_outputs")  # Added relationship with User
    test_set = relationship("TestSet", back_populates="test_outputs")
    operation = relationship("Operation", backref="test_outputs")

    def __repr__(self):
        return f"<TestOutput(id={self.id}, user_id={self.user_id}, test_set_id={self.test_set_id}, operation_id={self.operation_id}, created_at={self.created_at}, updated_at={self.updated_at})>"

27
level_3/models/testset.py Normal file
View file

@ -0,0 +1,27 @@
# test_set.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
class TestSet(Base):
    """A user-owned set of test content, with its operations and outputs."""
    __tablename__ = 'test_sets'

    id = Column(String, primary_key=True)
    user_id = Column(String, ForeignKey('users.id'), index=True)  # Ensure uniqueness
    content = Column(String, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only populated on UPDATE; stays NULL after the initial insert.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    user = relationship("User", back_populates="test_sets")
    operations = relationship("Operation", back_populates="test_set")
    # Deleting a test set deletes its outputs too.
    test_outputs = relationship("TestOutput", back_populates="test_set", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<TestSet(id={self.id}, user_id={self.user_id}, created_at={self.created_at}, updated_at={self.updated_at})>"

30
level_3/models/user.py Normal file
View file

@ -0,0 +1,30 @@
# user.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database.database import Base
class User(Base):
    """Root entity: owns memories, sessions, operations, test sets/outputs."""
    __tablename__ = 'users'

    id = Column(String, primary_key=True)
    name = Column(String, nullable=False, unique=True, index=True)
    session_id = Column(String, nullable=True, unique=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only populated on UPDATE; stays NULL after the initial insert.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    # Relationships: deleting a user cascades to everything except metadata
    # rows, which are also reachable (and cascaded) via their memory.
    memories = relationship("MemoryModel", back_populates="user", cascade="all, delete-orphan")
    operations = relationship("Operation", back_populates="user", cascade="all, delete-orphan")
    sessions = relationship("Session", back_populates="user", cascade="all, delete-orphan")
    test_sets = relationship("TestSet", back_populates="user", cascade="all, delete-orphan")
    test_outputs = relationship("TestOutput", back_populates="user", cascade="all, delete-orphan")
    metadatas = relationship("MetaDatas", back_populates="user")

    def __repr__(self):
        return f"<User(id={self.id}, name={self.name}, created_at={self.created_at}, updated_at={self.updated_at})>"

4834
level_3/poetry.lock generated Normal file

File diff suppressed because it is too large Load diff

55
level_3/pyproject.toml Normal file
View file

@ -0,0 +1,55 @@
[tool.poetry]
name = "PromethAI_memory"
version = "0.1.0"
description = "PromethAI memory manager"
authors = ["Vasilije Markovic"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.10"
#langchain = {git = "https://github.com/topoteretes/langchain.git" , tag = "v0.0.209"}
langchain = "v0.0.303"
nltk = "3.8.1"
openai = "0.27.8"
pinecone-client = "2.2.2"
python-dotenv = "1.0.0"
pyyaml = "6.0"
fastapi = "0.98.0"
uvicorn = "0.22.0"
googlemaps = "4.10.0"
jinja2 = "3.1.2"
replicate = "^0.8.4"
pexpect = "^4.8.0"
selenium = "^4.9.0"
playwright = "^1.32.1"
pytest-playwright = "^0.3.3"
boto3 = "^1.26.125"
gptcache = "^0.1.22"
redis = "^4.5.5"
gunicorn = "^20.1.0"
tiktoken = "^0.4.0"
google-search-results = "^2.4.2"
spacy = "^3.5.3"
python-jose = "^3.3.0"
pypdf = "^3.12.0"
fastjsonschema = "^2.18.0"
marvin = "^1.3.0"
dlt = { version ="^0.3.8", extras = ["duckdb"]}
weaviate-client = "^3.22.1"
python-multipart = "^0.0.6"
deep-translator = "^1.11.4"
humanize = "^4.8.0"
deepeval = "^0.20.1"
pymupdf = "^1.23.3"
psycopg2 = "^2.9.8"
llama-index = "^0.8.39.post2"
llama-hub = "^0.0.34"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

429
level_3/rag_test_manager.py Normal file
View file

@ -0,0 +1,429 @@
from enum import Enum
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from deepeval.metrics.overall_score import OverallScoreMetric
from deepeval.test_case import LLMTestCase
from deepeval.run_test import assert_test, run_test
from gptcache.embedding import openai
from marvin import ai_classifier
from models.sessions import Session
from models.testset import TestSet
from models.testoutput import TestOutput
from models.metadatas import MetaDatas
from models.operation import Operation
from sqlalchemy.orm import sessionmaker
from database.database import engine
from vectorstore_manager import Memory
import uuid
from contextlib import contextmanager
import random
import string
import itertools
import logging
import dotenv
dotenv.load_dotenv()
import openai
logger = logging.getLogger(__name__)
openai.api_key = os.getenv("OPENAI_API_KEY", "")
@contextmanager
def session_scope(session):
    """Provide a transactional scope around a series of operations.

    Commits on success; rolls back, logs, and re-raises on any exception;
    always closes the session. Because the session is closed on exit, a
    given session object can pass through this scope only once.
    """
    try:
        yield session
        session.commit()
    except Exception as e:
        session.rollback()
        logger.error(f"Session rollback due to: {str(e)}")
        raise
    finally:
        session.close()
def retrieve_latest_test_case(session, user_id, memory_id):
    """Return the newest ``TestSet.attributes_list`` row for a user/memory pair.

    Parameters:
    - session (Session): A database session.
    - user_id (int/str): The ID of the user to filter test cases by.
    - memory_id (int/str): The ID of the memory to filter test cases by.

    Returns:
    - Object: The most recent test case attributes, or None if an error occurs.
    """
    try:
        latest_first = (
            session.query(TestSet.attributes_list)
            .filter_by(user_id=user_id, memory_id=memory_id)
            .order_by(TestSet.created_at.desc())
        )
        return latest_first.first()
    except Exception as e:
        logger.error(f"An error occurred while retrieving the latest test case: {str(e)}")
        return None
def add_entity(session, entity):
    """Persist a single ORM entity inside a transactional scope.

    Parameters:
    - session (Session): A database session.
    - entity (Base): An instance of an SQLAlchemy model.

    Returns:
    - str: A confirmation message on success.

    Raises:
    - Exception: re-raised by ``session_scope`` (after rollback) if the
      insert fails. Note that ``session_scope`` also closes the session.
    """
    with session_scope(session):
        # session_scope commits on successful exit; the explicit
        # session.commit() that used to sit here was redundant (the scope
        # committed a second time immediately afterwards).
        session.add(entity)
    return "Successfully added entity"
def retrieve_job_by_id(session, user_id, job_id):
    """Return the most recent ``Session.id`` row for the given user and job.

    Parameters:
    - session (Session): A database session.
    - user_id (int/str): The ID of the user.
    - job_id (int/str): The ID of the job to retrieve.

    Returns:
    - Object: The job attributes filtered by user_id and job_id, or None if an error occurs.
    """
    try:
        matching_jobs = (
            session.query(Session.id)
            .filter_by(user_id=user_id, id=job_id)
            .order_by(Session.created_at.desc())
        )
        return matching_jobs.first()
    except Exception as e:
        logger.error(f"An error occurred while retrieving the job: {str(e)}")
        return None
def fetch_job_id(session, user_id=None, memory_id=None, job_id=None):
    """Return the newest ``Session.id`` matching user_id/job_id, or None.

    Parameters:
    - session (Session): A database session.
    - user_id (int/str, optional): The ID of the user.
    - memory_id: Currently unused; kept for signature compatibility with callers.
    - job_id (int/str, optional): The session/job ID to look up.

    Returns:
    - Object: The matching row, or None if an error occurs.
    """
    try:
        return (
            session.query(Session.id)
            .filter_by(user_id=user_id, id=job_id)
            .order_by(Session.created_at.desc())
            .first()
        )
    except Exception as e:
        # Log through the module logger (consistent with the sibling
        # retrieval helpers) instead of printing to stdout.
        logger.error(f"An error occurred: {str(e)}")
        return None
def compare_output(output, expected_output):
    """Compare the output against the expected output.

    TODO(review): not implemented — currently a stub that returns None;
    scoring is done via deepeval metrics in ``eval_test`` instead.
    """
    pass
def generate_param_variants(base_params=None, increments=None, ranges=None, included_params=None):
    """Generate parameter variants for testing.

    Args:
        base_params (dict): Base parameter values.
        increments (dict): Increment values for each parameter variant.
        ranges (dict): Range (number of variants) to generate for each parameter.
        included_params (list, optional): Parameters to include in the combinations.
            If None, all parameters are included.

    Returns:
        list: A list of dictionaries, one per combination of parameter values
        (the cartesian product of every parameter's candidate list).
    """
    numeric_keys = ['chunk_size', 'chunk_overlap', 'similarity_score', 'metadata_variation']

    def _with_overrides(defaults, overrides):
        # Caller-supplied values win; None means "use the defaults as-is".
        return {**defaults, **(overrides if overrides is not None else {})}

    base_values = _with_overrides(
        {'chunk_size': 500, 'chunk_overlap': 20, 'similarity_score': 0.5, 'metadata_variation': 0},
        base_params,
    )
    step_sizes = _with_overrides(
        {'chunk_size': 500, 'chunk_overlap': 10, 'similarity_score': 0.1, 'metadata_variation': 1},
        increments,
    )
    variant_counts = _with_overrides(
        {'chunk_size': 3, 'chunk_overlap': 3, 'similarity_score': 3, 'metadata_variation': 3},
        ranges,
    )

    # Candidate list per numeric parameter: base, base+step, base+2*step, ...
    grid = {}
    for key in numeric_keys:
        step = step_sizes.get(key, 1)
        count = variant_counts.get(key, 1)
        grid[key] = [base_values[key] + step * i for i in range(count)]

    # Categorical parameters have fixed option sets.
    grid['cognitive_architecture'] = ["simple_index", "cognitive_architecture"]
    grid['search_strategy'] = ["similarity_score", "fusion_score"]

    # Optionally restrict the grid to an explicit subset of parameters.
    if included_params is not None:
        grid = {name: options for name, options in grid.items() if name in included_params}

    names = list(grid.keys())
    return [dict(zip(names, combo)) for combo in itertools.product(*grid.values())]
async def generate_chatgpt_output(query: str, context: str = None):
    """Ask gpt-3.5-turbo to answer ``query``, optionally seeded with ``context``.

    Parameters:
    - query (str): The user question.
    - context (str, optional): Text injected as an assistant turn before the query.

    Returns:
    - str: The model's reply content.
    """
    # Use the async variant so the event loop is not blocked: the original
    # issued a synchronous HTTP call (ChatCompletion.create) inside an
    # async function.
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "assistant", "content": f"{context}"},
            {"role": "user", "content": query}
        ]
    )
    llm_output = response.choices[0].message.content
    return llm_output
async def eval_test(query=None, output=None, expected_output=None, context=None):
    """Generate a fresh LLM answer for ``query`` and score it with deepeval.

    Parameters:
    - query (str): The question to answer.
    - output: Unused; a fresh answer is always generated. Kept for signature
      compatibility.
    - expected_output (str): The ground-truth answer.
    - context (str): Context passed to the LLM.

    Returns:
    - The ``run_test`` result object.

    Raises:
    - AssertionError: from ``assert_test`` if the metric check fails.
    """
    result_output = await generate_chatgpt_output(query, context)
    test_case = LLMTestCase(
        query=query,
        output=result_output,
        expected_output=expected_output,
        context=context,
    )
    metric = OverallScoreMetric()
    # assert_test aborts the run if the metric fails.
    assert_test(test_case, metrics=[metric])
    # run_test returns the detailed result for collection.
    test_result = run_test(test_case, metrics=[metric])
    print(test_result)
    # BUG FIX: previously the result was only printed, so callers that
    # collected the return value accumulated None.
    return test_result
def data_format_route(data_string: str):
    """Classify ``data_string`` into one of the supported data formats.

    NOTE(review): ``@ai_classifier`` is marvin's decorator — presumably
    ``FormatRoute(data_string)`` is resolved by an LLM call rather than by
    plain enum value lookup; confirm against marvin's docs.
    """
    @ai_classifier
    class FormatRoute(Enum):
        """Represents classifier for the data format"""
        PDF = "PDF"
        UNSTRUCTURED_WEB = "UNSTRUCTURED_WEB"
        GITHUB = "GITHUB"
        TEXT = "TEXT"
        CSV = "CSV"
        WIKIPEDIA = "WIKIPEDIA"

    return FormatRoute(data_string).name
def data_location_route(data_string: str):
    """Classify ``data_string`` into a data-location category.

    NOTE(review): same marvin ``@ai_classifier`` mechanism as
    ``data_format_route`` — classification is presumably LLM-backed.
    """
    @ai_classifier
    class LocationRoute(Enum):
        """Represents classifier for the data location"""
        DEVICE = "DEVICE"
        URL = "URL"
        DATABASE = "DATABASE"

    return LocationRoute(data_string).name
def dynamic_test_manager(data, test_set=None, user=None, params=None):
    """Produce an evaluation dataset of query/answer pairs via deepeval.

    NOTE(review): ``data``, ``test_set``, ``user`` and ``params`` are
    currently unused — the dataset is generated from a hard-coded sample
    sentence. TODO: feed random chunks of ``data`` into the pipeline.
    """
    from deepeval.dataset import create_evaluation_query_answer_pairs
    # fetch random chunks from the document
    # feed them to the evaluation pipeline
    dataset = create_evaluation_query_answer_pairs(
        "Python is a great language for mathematical expression and machine learning.")
    return dataset
def generate_letter_uuid(length=8):
    """Return a random identifier made of uppercase ASCII letters.

    Args:
        length (int): Number of letters to generate (default 8).

    Returns:
        str: Random string drawn uniformly from A-Z.
    """
    return ''.join(random.choices(string.ascii_uppercase, k=length))
def fetch_test_set_id(session, user_id, id):
    """Return the newest ``TestSet.id`` for the given user and id, or None.

    Parameters:
    - session (Session): A database session.
    - user_id (int/str): The ID of the user.
    - id (int/str): The test-set ID to look up. NOTE(review): shadows the
      builtin ``id`` but is part of the public keyword interface (callers
      pass ``id=...``), so it is kept.

    Returns:
    - Object: The matching row, or None if an error occurs.
    """
    try:
        return (
            session.query(TestSet.id)
            .filter_by(user_id=user_id, id=id)
            .order_by(TestSet.created_at.desc())
            .first()
        )
    except Exception as e:
        logger.error(f"An error occurred while retrieving the job: {str(e)}")
        return None
async def start_test(data, test_set=None, user_id=None, params=None, job_id=None, metadata=None):
    """Run the dynamic RAG evaluation loop for one document.

    For each generated parameter combination: create an isolated memory
    namespace, ingest the document with those loader settings, answer every
    question in ``test_set`` from the retrieved context, score the answers,
    and persist the per-namespace results as a ``TestOutput`` row.

    Parameters:
    - data: Document reference fed to the format/location classifiers.
    - test_set (list[dict]): Items with "question" and "answer" keys.
    - user_id (str): Owner of the job/test-set/memory rows.
    - params: If None, a parameter grid is generated automatically.
      NOTE(review): if ``params`` is not None, ``test_params`` is never
      assigned and the loop below would raise NameError — confirm intent.
    - job_id (str, optional): Existing job to resume; a new one is created
      when no matching row exists.
    - metadata (dict, optional): Contract metadata forwarded to ingestion.
    """
    Session = sessionmaker(bind=engine)
    session = Session()
    # Resolve (or create) the job and its test set.
    job_id = fetch_job_id(session, user_id=user_id, job_id=job_id)
    test_set_id = fetch_test_set_id(session, user_id=user_id, id=job_id)
    if job_id is None:
        job_id = str(uuid.uuid4())
        logging.info("we are adding a new job ID")
        add_entity(session, Operation(id=job_id, user_id=user_id))
    if test_set_id is None:
        test_set_id = str(uuid.uuid4())
        add_entity(session, TestSet(id=test_set_id, user_id=user_id, content=str(test_set)))
    if params is None:
        # Classify the input, then build the grid of loader settings to sweep.
        data_format = data_format_route(data)
        data_location = data_location_route(data)
        test_params = generate_param_variants(included_params=['chunk_size', 'chunk_overlap', 'similarity_score'])
    loader_settings = {
        "format": f"{data_format}",
        "source": f"{data_location}",
        "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
    }
    for test in test_params:
        # NOTE(review): "SEMANTICEMEMORY" looks like a typo for
        # "SEMANTICMEMORY" but is a runtime namespace string — confirm
        # before changing.
        test_id = str(generate_letter_uuid()) + "_" + "SEMANTICEMEMORY"
        # handle test data here
        # A fresh session per parameter combination (add_entity's scope
        # closes the session it is given).
        Session = sessionmaker(bind=engine)
        session = Session()
        memory = Memory.create_memory(user_id, session, namespace=test_id)
        # Adding a memory instance
        memory.add_memory_instance("ExampleMemory")
        # Managing memory attributes
        existing_user = Memory.check_existing_user(user_id, session)
        print("here is the existing user", existing_user)
        memory.manage_memory_attributes(existing_user)
        test_class = test_id + "_class"
        # memory.test_class
        memory.add_dynamic_memory_class(test_id.lower(), test_id)
        dynamic_memory_class = getattr(memory, test_class.lower(), None)
        # Attach add/fetch methods to the dynamically created memory class.
        if dynamic_memory_class is not None:
            memory.add_method_to_class(dynamic_memory_class, 'add_memories')
        else:
            print(f"No attribute named {test_class.lower()} in memory.")
        if dynamic_memory_class is not None:
            memory.add_method_to_class(dynamic_memory_class, 'fetch_memories')
        else:
            print(f"No attribute named {test_class.lower()} in memory.")
        print(f"Trying to access: {test_class.lower()}")
        print("Available memory classes:", memory.list_memory_classes())
        print(f"Trying to check: ", test)
        # Merge this combination's parameters into the loader settings,
        # ingest, then strip them back out for the next iteration.
        loader_settings.update(test)
        load_action = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
                                                       observation='some_observation', params=metadata, loader_settings=loader_settings)
        loader_settings = {key: value for key, value in loader_settings.items() if key not in test}
        # NOTE(review): local name keeps the original spelling
        # "colletion" (harmless typo, local only).
        test_result_colletion = []
        for test in test_set:
            retrieve_action = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
                                                               observation=test["question"])
            test_results = await eval_test(query=test["question"], expected_output=test["answer"], context=str(retrieve_action))
            test_result_colletion.append(test_results)
            print(test_results)
        add_entity(session, TestOutput(id=test_id, user_id=user_id, content=str(test_result_colletion)))
async def main():
    """Run the RAG test harness end-to-end against 'The Call of the Wild'."""
    # Contract/metadata describing the ingested document.
    params = {
        "version": "1.0",
        "agreement_id": "AG123456",
        "privacy_policy": "https://example.com/privacy",
        "terms_of_service": "https://example.com/terms",
        "format": "json",
        "schema_version": "1.1",
        "checksum": "a1b2c3d4e5f6",
        "owner": "John Doe",
        "license": "MIT",
        "validity_start": "2023-08-01",
        "validity_end": "2024-07-31",
    }
    # Ground-truth QA pairs used to score retrieval quality.
    test_set = [
        {
            "question": "Who is the main character in 'The Call of the Wild'?",
            "answer": "Buck"
        },
        {
            "question": "Who wrote 'The Call of the Wild'?",
            "answer": "Jack London"
        },
        {
            "question": "Where does Buck live at the start of the book?",
            "answer": "In the Santa Clara Valley, at Judge Millers place."
        },
        {
            "question": "Why is Buck kidnapped?",
            "answer": "He is kidnapped to be sold as a sled dog in the Yukon during the Klondike Gold Rush."
        },
        {
            "question": "How does Buck become the leader of the sled dog team?",
            "answer": "Buck becomes the leader after defeating the original leader, Spitz, in a fight."
        }
    ]
    # NOTE(review): start_test currently returns None; `result` is unused.
    result = await start_test("https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf", test_set=test_set, user_id="666", params=None, metadata=params)
if __name__ == "__main__":
    # Script entry point: run the async harness to completion.
    import asyncio
    asyncio.run(main())

View file

View file

@ -0,0 +1,65 @@
import sys
import os
# this is needed to import classes from other modules
script_dir = os.path.dirname(os.path.abspath(__file__))
# Get the parent directory of your script and add it to sys.path
parent_dir = os.path.dirname(script_dir)
sys.path.append(parent_dir)
from database.database import Base, engine
import models.memory
import models.metadatas
import models.operation
import models.sessions
import models.testoutput
import models.testset
import models.user
from sqlalchemy import create_engine, text
import psycopg2
from dotenv import load_dotenv
load_dotenv()
def create_admin_engine(username, password, host, database_name, port=5432):
    """Build a SQLAlchemy engine URL for a Postgres database.

    Parameters:
    - username / password / host: Connection credentials and server address.
    - database_name (str): Database to connect to.
    - port (int): Server port; defaults to the Postgres standard 5432
      (previously hard-coded in the URL).

    Returns:
    - Engine: A lazily-connecting SQLAlchemy engine.
    """
    admin_url = f"postgresql://{username}:{password}@{host}:{port}/{database_name}"
    return create_engine(admin_url)
def database_exists(username, password, host, db_name):
    """Return True if database ``db_name`` exists on the server.

    Connects to the maintenance database ``postgres`` rather than to
    ``db_name`` itself: the original connected to the very database whose
    existence it was checking, so a missing database raised a connection
    error instead of returning False. The name is passed as a bound
    parameter instead of interpolated into the SQL string.
    """
    engine = create_admin_engine(username, password, host, "postgres")
    try:
        connection = engine.connect()
        try:
            query = text("SELECT 1 FROM pg_database WHERE datname = :name")
            result = connection.execute(query, {"name": db_name}).fetchone()
        finally:
            connection.close()
    finally:
        engine.dispose()
    return result is not None
def create_database(username, password, host, db_name):
    """Create database ``db_name`` on the server.

    Connects to the maintenance database ``postgres`` — the original
    connected to the not-yet-existing target database, which fails before
    CREATE DATABASE can run. The identifier is double-quoted (with embedded
    quotes escaped) so the created name matches exactly what
    ``database_exists`` later looks up in ``pg_database``.
    """
    engine = create_admin_engine(username, password, host, "postgres")
    connection = engine.raw_connection()
    try:
        # CREATE DATABASE cannot run inside a transaction block.
        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = connection.cursor()
        try:
            safe_name = db_name.replace('"', '""')
            cursor.execute(f'CREATE DATABASE "{safe_name}"')
        finally:
            cursor.close()
    finally:
        connection.close()
        engine.dispose()
def create_tables():
    """Create all tables declared on ``Base`` metadata (existing tables are left untouched)."""
    Base.metadata.create_all(bind=engine)
if __name__ == "__main__":
    # Connection settings come from the environment (.env loaded above).
    username = os.getenv('POSTGRES_USER')
    password = os.getenv('POSTGRES_PASSWORD')
    database_name = os.getenv('POSTGRES_DB')
    host = os.getenv('POSTGRES_HOST')
    # Bootstrap the database if missing, then ensure all ORM tables exist.
    if not database_exists(username, password, host, database_name):
        print(f"Database {database_name} does not exist. Creating...")
        create_database(username, password, host, database_name)
        print(f"Database {database_name} created successfully.")
    create_tables()

View file

View file

@ -0,0 +1,7 @@
from enum import Enum
class ChunkStrategy(Enum):
    """Strategies for splitting documents into chunks before embedding.

    NOTE(review): member semantics are not defined here — confirm exact
    behavior of each strategy against the loader that consumes this enum.
    """
    EXACT = 'exact'
    PARAGRAPH = 'paragraph'
    SENTENCE = 'sentence'
    VANILLA = 'vanilla'

View file

View file

@ -0,0 +1,38 @@
{
"q1": {
"question": "What does Buck learn from being harnessed and made to work with François?",
"answer": "Buck learns several important lessons from being harnessed and made to work with François. First, he learns the lesson of obedience and the consequences of disobedience. He quickly realizes that François demands instant obedience and uses his whip to enforce it, and that he must listen and follow commands promptly to avoid the whip [16]. Second, Buck learns the value of hard work and the satisfaction it brings. Despite the new and strange nature of the work, Buck learns quickly and makes remarkable progress, earning respect from François [16][19]. Furthermore, Buck learns important skills related to pulling a sled, such as stopping at command ('ho'), going ahead ('mush'), swinging wide on bends, and staying clear of the wheeler when the sled goes downhill. He also learns the dynamics of working in a team and how to respond to the lead dog's instructions [16][19]."
},
"q2": {
"question": "How many chapters does the document have",
"answer": "The document has a total of 7 chapters. The chapter titles are: Into the Primitive [Chapter 1012], The Law of Club and Fang [Chapter 1013], The Dominant Primordial Beast [Chapter 1014], Who Has Won to Mastership [Chapter 1015], The Toil of Trace and Trail [Chapter 1016], For the Love of a Man [Chapter 1017], The Sounding of the Call [Chapter 1018]."
},
"q3": {
"question": "Who kidnapped Buck?",
"answer": "Buck was kidnapped by one of the gardener's helpers named Manuel, who sold him to strangers for a profit."
},
"q4": {
"question": "What is the name of the gardener's helper who kidnapped Buck?",
"answer": "The name of the gardener's helper who kidnapped Buck is Manuel."
},
"q5": {
"question": "Where was Buck taken after being kidnapped?",
"answer": "After being kidnapped, Buck was taken by Manuel through the orchard to a little flag station known as College Park [7a]. Eventually, Buck was thrown into a baggage car of a train, where he remained unconscious until he woke up and watched the man in the red sweater [8] [11a]. The specific location Buck was taken to after being kidnapped is not explicitly mentioned in the given texts."
},
"q6": {
"question": "What is the law of club and fang?",
"answer": "The law of club and fang refers to the harsh and primal rules of survival in the wild, where physical strength and aggression determine dominance and power. The club represents the power wielded by humans over animals. Buck realizes that in order to survive in this new environment, he must adapt and submit to this law. The law of club and fang signifies the brutal and primitive nature of life in the wild."
},
"q7": {
"question": "What is the mother of Buck?",
"answer": "The mother of Buck, the dog in the story 'The Call of the Wild' by Jack London, is a Scottish shepherd dog named Shep."
},
"q8": {
"question": "How did Buck feel after being kidnapped?",
"answer": "After being kidnapped, Buck felt anger and resentment towards his captors [7]. He was initially cooperative but grew angry as he was mistreated and felt violated and vilely treated during his transportation [8]."
},
"q9": {
"question": "Was Buck beaten in captivity?",
"answer": "Yes, Buck was beaten while in captivity. In document snippet [11], Buck rushed at the man who had been tormenting him, and the man delivered a blow that rendered Buck senseless."
}
}

View file

@ -0,0 +1,78 @@
import unittest
import asyncio
import sys
sys.path.append("..") # Adds higher directory to python modules path.
from level_2.level_2_pdf_vectorstore__dlt_contracts import Memory
class TestMemory(unittest.TestCase):
    """Round-trip tests for the level-2 ``Memory`` semantic/episodic stores.

    NOTE(review): these are integration tests — setUp initialises a real
    Memory backend via ``async_init``, so they require live credentials.
    """

    def setUp(self):
        # One event loop per test case; Memory requires async initialisation.
        self.loop = asyncio.get_event_loop()
        self.memory = Memory(user_id="123")
        self.loop.run_until_complete(self.memory.async_init())

    def test_add_fetch_delete_semantic_memory(self):
        """Add, fetch, delete, then verify deletion for semantic memory."""
        async def semantic_workflow():
            params = {"sample_param": "value"}
            sample_memory = "sample semantic memory"
            # Add
            await self.memory._add_semantic_memory(sample_memory, params=params)
            # Fetch
            fetched = await self.memory._fetch_semantic_memory(sample_memory, params)
            # NOTE(review): this indexes the 'EPISODICMEMORY' result class even
            # though it is the SEMANTIC memory test — confirm the expected key.
            fetched_text = fetched['data']['Get']['EPISODICMEMORY'][0]['text']
            self.assertIn(sample_memory, fetched_text)  # Replace this with the appropriate validation
            # Delete
            await self.memory._delete_semantic_memory()
            # Verify Deletion
            after_delete = await self.memory._fetch_semantic_memory(sample_memory, params)
            self.assertNotIn(sample_memory, after_delete)  # Replace with the appropriate validation
        self.loop.run_until_complete(semantic_workflow())

    def test_add_fetch_delete_episodic_memory(self):
        """Add, fetch, delete, then verify deletion for episodic memory."""
        async def episodic_workflow():
            params = {"sample_param": "value"}
            sample_memory = """{
"sample_key": "sample_value"
}"""
            # Add
            await self.memory._add_episodic_memory(observation=sample_memory, params=params)
            # Fetch
            fetched = await self.memory._fetch_episodic_memory(sample_memory)
            fetched_text = fetched['data']['Get']['EPISODICMEMORY'][0]['text']
            self.assertIn(sample_memory, fetched_text)  # Replace this with the appropriate validation
            # Delete
            await self.memory._delete_episodic_memory()
            # Verify Deletion
            after_delete = await self.memory._fetch_episodic_memory(sample_memory)
            self.assertNotIn(sample_memory, after_delete)  # Replace with the appropriate validation
        self.loop.run_until_complete(episodic_workflow())

    # Buffer-memory round trip, kept disabled by the original authors.
    # def test_add_fetch_delete_buffer_memory(self):
    #     async def buffer_workflow():
    #         params = {"sample_param": "value"}
    #         user_input = "sample buffer input"
    #         namespace = "sample_namespace"
    #
    #         # Add
    #         await self.memory._add_buffer_memory(user_input=user_input, namespace=namespace, params=params)
    #         # Fetch
    #         fetched = await self.memory._fetch_buffer_memory(user_input, namespace)
    #         self.assertIn(user_input, fetched)  # Replace this with the appropriate validation
    #         # Delete
    #         await self.memory._delete_buffer_memory()
    #         # Verify Deletion
    #         after_delete = await self.memory._fetch_buffer_memory(user_input, namespace)
    #         self.assertNotIn(user_input, after_delete)  # Replace with the appropriate validation
    #
    #     self.loop.run_until_complete(buffer_workflow())
if __name__ == '__main__':
    # Run the unittest suite when invoked directly.
    unittest.main()

View file

@ -0,0 +1,125 @@
import os
import openai
from deepeval.metrics.factual_consistency import assert_factual_consistency
import dotenv
dotenv.load_dotenv()
from level_2.level_2_pdf_vectorstore__dlt_contracts import Memory
openai.api_key = os.getenv("OPENAI_API_KEY", "")
from deepeval.metrics.overall_score import assert_overall_score
import json
from deepeval.metrics.overall_score import OverallScoreMetric
# Needs to pass a QA test set
# Needs to separately fetch QA test set from DB
# Needs to have a function to run the tests that contain test set and store results in the DB
async def main():
    """Score the base QA test set against several context strategies.

    Loads QA pairs from ``base_test_set.json`` and computes a deepeval
    OverallScore per question, using context produced by the chosen
    ``context_type`` strategy.
    """
    async def generate_context(query: str = 'bla', context: str = None):
        """Fetch semantic-memory context for ``query``, ingesting the source
        document first if the store is empty.

        NOTE(review): the ``context`` parameter is accepted but unused.
        """
        memory = Memory(user_id="TestUser")
        await memory.async_init()
        memory_loaded = await memory._fetch_semantic_memory(observation=query, params=None)
        if memory_loaded:
            return memory_loaded["data"]["Get"]["SEMANTICMEMORY"][0]["text"]
        else:
            # Cold start: ingest 'The Call of the Wild' with this contract
            # metadata, then retry the fetch.
            params = {
                "version": "1.0",
                "agreement_id": "AG123456",
                "privacy_policy": "https://example.com/privacy",
                "terms_of_service": "https://example.com/terms",
                "format": "json",
                "schema_version": "1.1",
                "checksum": "a1b2c3d4e5f6",
                "owner": "John Doe",
                "license": "MIT",
                "validity_start": "2023-08-01",
                "validity_end": "2024-07-31",
            }
            loader_settings = {
                "format": "PDF",
                "source": "url",
                "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
            }
            load_jack_london = await memory._add_semantic_memory(observation=query, loader_settings=loader_settings, params=params)
            memory_loaded = await memory._fetch_semantic_memory(observation=query, params=None)
            return memory_loaded["data"]["Get"]["SEMANTICMEMORY"][0]["text"]
        # return load_jack_london
        #
        # modulator = {"relevance": 0.0, "saliency": 0.0, "frequency": 0.0}
        # # #
        # run_main_buffer = await memory._create_buffer_context(
        #     user_input="I want to know how does Buck adapt to life in the wild and then have that info translated to german ",
        #     params=params,
        #     attention_modulators=modulator,
        # )

    async def generate_chatgpt_output(query: str, context: str = None):
        """Answer ``query`` with gpt-3.5-turbo, generating context if absent."""
        if context is None:
            context = await generate_context(query=query)
            # print(context)
        else:
            pass
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "assistant", "content": f"{context}"},
                {"role": "user", "content": query}
            ]
        )
        llm_output = response.choices[0].message.content
        # print(llm_output)
        return llm_output

    with open('base_test_set.json', 'r') as f:
        data = json.load(f)
    #

    async def test_overall_score(query: str, output: str = None, expected_output: str = None, context: str = None, context_type: str = None):
        """Compute the deepeval OverallScore for one QA pair.

        NOTE(review): "memory_search" is a no-op branch, and "gpt_search"
        scores with empty context — confirm both are intentional.
        """
        if context_type == "gpt_search":
            context = ""
        elif context_type == "base_memory_context":
            context = await generate_context(query=query)
            output = context
        elif context_type == "hybrid_search":
            context = await generate_context(query=query)
            output = await generate_chatgpt_output(query)
        elif context_type == "memory_search":
            pass
        metric = OverallScoreMetric()
        score = metric.measure(
            query=query,
            output=output,
            expected_output=expected_output,
            context=context
        )
        print('here is the score', score)
        return score

    # await generate_chatgpt_output(query=" When was call of the wild written?")
    # Score every QA pair with the hybrid strategy and report the map.
    scores = {}
    for key, item in data.items():
        question = item['question']
        expected_ans = item['answer']
        values = await test_overall_score(query=question, expected_output=expected_ans, context_type="hybrid_search")
        scores[key] = values
    print(scores)
if __name__ == "__main__":
    # Script entry point: run the evaluation loop.
    import asyncio
    asyncio.run(main())

166
level_3/utils.py Normal file
View file

@ -0,0 +1,166 @@
import os
from datetime import datetime
from typing import List
from langchain import PromptTemplate, OpenAI
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
import dotenv
from level_2.level_2_pdf_vectorstore__dlt_contracts import Memory
dotenv.load_dotenv()
# Shared deterministic (temperature 0) LLM used by the prompt-formatting
# helpers in this module. NOTE(review): model_name is pinned to
# "gpt-4-0613" — confirm availability/cost before running.
llm_base = OpenAI(
    temperature=0.0,
    max_tokens=1200,
    openai_api_key=os.environ.get("OPENAI_API_KEY"),
    model_name="gpt-4-0613",
)
async def _add_to_episodic(user_input, tasks_list, result_tasks, attention_modulators, params):
    """Structure a completed task run with the LLM and store it as episodic memory.

    Parameters:
    - user_input (str): The original user query.
    - tasks_list: Planned steps (stringified into the prompt).
    - result_tasks: Executed steps/results (stringified into the prompt).
    - attention_modulators: Modulator values recorded with the episode.
    - params (dict): Metadata forwarded to ``_add_episodic_memory``.
    """
    memory = Memory(user_id="TestUser")
    await memory.async_init()

    class EpisodicTask(BaseModel):
        """Schema for an individual task."""
        task_order: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        task_name: str = Field(
            None, description="The task that needs to be performed"
        )
        operation: str = Field(None, description="The operation to be performed")
        operation_result: str = Field(
            None, description="The result of the operation"
        )

    class EpisodicList(BaseModel):
        """Schema for the record containing a list of tasks."""
        tasks: List[EpisodicTask] = Field(..., description="List of tasks")
        # NOTE(review): the three descriptions below are copy-pasted from
        # task_order and do not describe these fields; they are runtime
        # strings fed to the LLM format instructions, so they are left
        # unchanged here — confirm and fix upstream.
        start_date: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        end_date: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        user_query: str = Field(
            ..., description="The order at which the task needs to be performed"
        )
        attention_modulators: str = Field(..., description="List of attention modulators")

    parser = PydanticOutputParser(pydantic_object=EpisodicList)
    date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    prompt = PromptTemplate(
        template="Format the result.\n{format_instructions}\nOriginal query is: {query}\n Steps are: {steps}, buffer is: {buffer}, date is:{date}, attention modulators are: {attention_modulators} \n",
        input_variables=["query", "steps", "buffer", "date", "attention_modulators"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    _input = prompt.format_prompt(
        query=user_input, steps=str(tasks_list)
        , buffer=str(result_tasks), date=date, attention_modulators=attention_modulators
    )
    # return "a few things to do like load episodic memory in a structured format"
    # Have the LLM emit the structured record, parse it, and persist it.
    output = llm_base(_input.to_string())
    result_parsing = parser.parse(output)
    lookup_value = await memory._add_episodic_memory(
        observation=str(result_parsing.json()), params=params
    )
async def add_to_buffer(adjusted_modulator=None, params=None):
    """Format buffer modulators via the LLM and store them in buffer memory.

    Parameters:
    - adjusted_modulator: Raw modulator values to be structured by the LLM.
    - params (dict, optional): Metadata forwarded to ``_add_buffer_memory``.
      Defaults to an empty dict.

    Returns:
    - str: JSON representation of the parsed modulators.
    """
    # Avoid the mutable default argument (previously `params={}`) while
    # preserving the old behavior of passing an empty dict through.
    params = {} if params is None else params
    memory = Memory(user_id="TestUser")
    await memory.async_init()

    class BufferModulators(BaseModel):
        """Value of buffer modulators"""
        frequency: str = Field(..., description="Frequency score of the document")
        saliency: str = Field(..., description="Saliency score of the document")
        relevance: str = Field(..., description="Relevance score of the document")
        description: str = Field(..., description="Latest buffer modulators")
        direction: str = Field(..., description="Increase or a decrease of the modulator")

    parser = PydanticOutputParser(pydantic_object=BufferModulators)
    prompt = PromptTemplate(
        template="""Structure the buffer modulators to be used for the buffer. \n
{format_instructions} \nOriginal observation is:
{query}\n """,
        input_variables=["query"],
        partial_variables={"format_instructions": parser.get_format_instructions()},
    )
    _input = prompt.format_prompt(query=adjusted_modulator)
    document_context_result = llm_base(_input.to_string())
    document_context_result_parsed = parser.parse(document_context_result)
    await memory._add_buffer_memory(user_input=str(document_context_result_parsed), params=params)
    return document_context_result_parsed.json()
async def delete_from_buffer():
    """Wipe the TestUser buffer memory store."""
    from level_2.level_2_pdf_vectorstore__dlt_contracts import Memory
    memory = Memory(user_id="TestUser")
    await memory.async_init()
    await memory._delete_buffer_memory()
async def delete_from_episodic():
    """Wipe the TestUser episodic memory store."""
    from level_2.level_2_pdf_vectorstore__dlt_contracts import Memory
    memory = Memory(user_id="TestUser")
    await memory.async_init()
    await memory._delete_episodic_memory()
async def get_from_episodic(observation=None):
    """Fetch episodic memories for ``observation`` from the TestUser store."""
    from level_2.level_2_pdf_vectorstore__dlt_contracts import Memory
    memory = Memory(user_id="TestUser")
    await memory.async_init()
    return await memory._fetch_episodic_memory(observation=observation)
async def get_from_buffer(observation=None):
    """Fetch buffer memories for ``observation`` from the TestUser store."""
    from level_2.level_2_pdf_vectorstore__dlt_contracts import Memory
    memory = Memory(user_id="TestUser")
    await memory.async_init()
    return await memory._fetch_buffer_memory(user_input=observation)
async def main():
    """Ad-hoc driver for the memory helpers; most calls are toggled via comments."""
    # Contract metadata used by the (currently commented) episodic add.
    params = {
        "version": "1.0",
        "agreement_id": "AG123456",
        "privacy_policy": "https://example.com/privacy",
        "terms_of_service": "https://example.com/terms",
        "format": "json",
        "schema_version": "1.1",
        "checksum": "a1b2c3d4e5f6",
        "owner": "John Doe",
        "license": "MIT",
        "validity_start": "2023-08-01",
        "validity_end": "2024-07-31",
    }
    # NOTE(review): loader_settings and modulator are defined but unused in
    # the currently-active code path.
    loader_settings = {
        "format": "PDF",
        "source": "url",
        "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
    }
    modulator = {"relevance": 1.0, "saliency": 1.0, "frequency": 1.0, "freshness": 1.0, "repetition": 1.0}
    user_input = "I want to know how does Buck adapt to life in the wild"
    # tasks_list = """tasks": [{"task_order": "1", "task_name": "Fetch Information", "operation": "fetch from vector store", "original_query": "I want to know how does Buck adapt to life in the wild"]"""
    out_tasks = """here are the result_tasks [{'task_order': '1', 'task_name': 'Save Information', 'operation': 'save to vector store', 'original_query': 'Add to notes who is Buck and get info saved yesterday about him'}, {'docs': [{'semantic_search_term': "Add to notes who is Buck", 'document_summary': 'Buck was a dog stolen from his home', 'document_relevance': '0.75', 'attention_modulators_list': [{'frequency': '0.33', 'saliency': '0.75', 'relevance': '0.74'}]}], 'user_query': 'I want to know who buck is and check my notes from yesterday'}, {'task_order': '2', 'task_name': 'Check historical data', 'operation': 'check historical data', 'original_query': ' check my notes from yesterday'}, ' Data saved yesterday about Buck include informaton that he was stolen from home and that he was a pretty dog ']"""
    # await _add_to_episodic(user_input=user_input, result_tasks=out_tasks, tasks_list=None, attention_modulators=modulator, params=params)
    # await delete_from_episodic()
    # Active path: read episodic memories matching "summary" and print them.
    aa = await get_from_episodic(observation="summary")
    # await delete_from_buffer()
    modulator_changed = {"relevance": 0.9, "saliency": 0.9, "frequency": 0.9}
    # await add_to_buffer(adjusted_modulator=modulator_changed)
    # aa = await get_from_buffer(observation="summary")
    print(aa)
if __name__ == "__main__":
    # Script entry point: run the ad-hoc driver.
    import asyncio
    asyncio.run(main())

View file

View file

@ -0,0 +1,232 @@
import logging
from io import BytesIO
import os, sys
# Add the parent directory to sys.path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from vectordb.vectordb import PineconeVectorDB, WeaviateVectorDB
import sqlalchemy as sa
logging.basicConfig(level=logging.INFO)
import marvin
import requests
from dotenv import load_dotenv
from langchain.document_loaders import PyPDFLoader
from langchain.retrievers import WeaviateHybridSearchRetriever
from weaviate.gql.get import HybridFusion
from models.sessions import Session
from models.testset import TestSet
from models.testoutput import TestOutput
from models.metadatas import MetaDatas
from models.operation import Operation
from sqlalchemy.orm import sessionmaker
from database.database import engine
load_dotenv()
from typing import Optional
import time
import tracemalloc
tracemalloc.start()
from datetime import datetime
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.schema import Document
import uuid
import weaviate
from marshmallow import Schema, fields
import json
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY")
# Placeholder IDs used when a memory-hierarchy level has not been assigned a
# real identifier yet (passed as defaults into VectorDBFactory below).
LTM_MEMORY_ID_DEFAULT = "00000"
ST_MEMORY_ID_DEFAULT = "0000"
BUFFER_ID_DEFAULT = "0000"
class VectorDBFactory:
    """Factory that instantiates the configured vector-database backend."""

    def create_vector_db(
        self,
        user_id: str,
        index_name: str,
        memory_id: str,
        ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT,
        st_memory_id: str = ST_MEMORY_ID_DEFAULT,
        buffer_id: str = BUFFER_ID_DEFAULT,
        db_type: str = "pinecone",
        namespace: str = None,
    ):
        """Return a Pinecone or Weaviate vector DB wired with the given IDs.

        Raises:
            ValueError: if ``db_type`` is not a supported backend name.
        """
        backends = {
            "pinecone": PineconeVectorDB,
            "weaviate": WeaviateVectorDB,
        }
        backend_cls = backends.get(db_type)
        if backend_cls is None:
            raise ValueError(f"Unsupported database type: {db_type}")
        return backend_cls(
            user_id,
            index_name,
            memory_id,
            ltm_memory_id,
            st_memory_id,
            buffer_id,
            namespace,
        )
class BaseMemory:
    def __init__(
        self,
        user_id: str,
        memory_id: Optional[str],
        index_name: Optional[str],
        db_type: str,
        namespace: str,
    ):
        """Bind identifiers and build the backing vector DB via VectorDBFactory.

        Parameters:
        - user_id: Owner of this memory.
        - memory_id: Identifier of the memory record (may be None).
        - index_name: Vector index to use (may be None).
        - db_type: Backend name understood by VectorDBFactory ("pinecone"/"weaviate").
        - namespace: Vector-store namespace for this memory.
        """
        self.user_id = user_id
        self.memory_id = memory_id
        self.index_name = index_name
        self.namespace = namespace
        self.db_type = db_type
        factory = VectorDBFactory()
        # ltm/st/buffer IDs fall back to the module-level defaults.
        self.vector_db = factory.create_vector_db(
            self.user_id,
            self.index_name,
            self.memory_id,
            db_type=self.db_type,
            namespace=self.namespace,
        )
    def init_client(self, namespace: str):
        """Initialise and return a Weaviate client scoped to ``namespace``.

        NOTE(review): delegates to ``init_weaviate_client`` regardless of
        ``db_type`` — presumably only valid for the weaviate backend; confirm.
        """
        return self.vector_db.init_weaviate_client(namespace)
def create_field(self, field_type, **kwargs):
field_mapping = {
"Str": fields.Str,
"Int": fields.Int,
"Float": fields.Float,
"Bool": fields.Bool,
}
return field_mapping[field_type](**kwargs)
def create_dynamic_schema(self, params):
"""Create a dynamic schema based on provided parameters."""
dynamic_fields = {field_name: fields.Str() for field_name in params.keys()}
# Create a Schema instance with the dynamic fields
dynamic_schema_instance = Schema.from_dict(dynamic_fields)()
return dynamic_schema_instance
async def get_version_from_db(self, user_id, memory_id):
# Logic to retrieve the version from the database.
Session = sessionmaker(bind=engine)
session = Session()
try:
# Querying both fields: contract_metadata and created_at
result = (
session.query(MetaDatas.contract_metadata, MetaDatas.created_at)
.filter_by(user_id=user_id) # using parameter, not self.user_id
.order_by(MetaDatas.created_at.desc())
.first()
)
if result:
version_in_db, created_at = result
logging.info(f"version_in_db: {version_in_db}")
from ast import literal_eval
version_in_db= literal_eval(version_in_db)
version_in_db = version_in_db.get("version")
return [version_in_db, created_at]
else:
return None
finally:
session.close()
async def update_metadata(self, user_id, memory_id, version_in_params, params):
version_from_db = await self.get_version_from_db(user_id, memory_id)
Session = sessionmaker(bind=engine)
session = Session()
# If there is no metadata, insert it.
if version_from_db is None:
session.add(MetaDatas(id = str(uuid.uuid4()), user_id=self.user_id, version = str(int(time.time())) ,memory_id=self.memory_id, contract_metadata=params))
session.commit()
return params
# If params version is higher, update the metadata.
elif version_in_params > version_from_db[0]:
session.add(MetaDatas(id = str(uuid.uuid4()), user_id=self.user_id, memory_id=self.memory_id, contract_metadata=params))
session.commit()
return params
else:
return params
async def add_memories(
self,
observation: Optional[str] = None,
loader_settings: dict = None,
params: Optional[dict] = None,
namespace: Optional[str] = None,
custom_fields: Optional[str] = None,
):
from ast import literal_eval
class DynamicSchema(Schema):
pass
default_version = 'current_timestamp'
version_in_params = params.get("version", default_version)
# Check and update metadata version in DB.
schema_fields = params
def create_field(field_type, **kwargs):
field_mapping = {
"Str": fields.Str,
"Int": fields.Int,
"Float": fields.Float,
"Bool": fields.Bool,
}
return field_mapping[field_type](**kwargs)
# Dynamic Schema Creation
schema_instance = self.create_dynamic_schema(params) # Always creating Str field, adjust as needed
logging.info(f"params : {params}")
# Schema Validation
schema_instance = schema_instance
print("Schema fields: ", [field for field in schema_instance._declared_fields])
loaded_params = schema_instance.load(params)
return await self.vector_db.add_memories(
observation=observation, loader_settings=loader_settings,
params=loaded_params, namespace=namespace, metadata_schema_class = schema_instance
)
# Add other db_type conditions if necessary
async def fetch_memories(
self,
observation: str,
params: Optional[str] = None,
namespace: Optional[str] = None,
n_of_observations: Optional[int] = 2,
):
return await self.vector_db.fetch_memories(
observation=observation, params=params,
namespace=namespace,
n_of_observations=n_of_observations
)
async def delete_memories(self, params: Optional[str] = None):
return await self.vector_db.delete_memories(params)

View file

View file

@ -0,0 +1,85 @@
from langchain.document_loaders import PyPDFLoader
import sys, os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from level_3.shared.chunk_strategy import ChunkStrategy
import re
def chunk_data(chunk_strategy=None, source_data=None, chunk_size=None, chunk_overlap=None):
    """Dispatch ``source_data`` to the chunker matching ``chunk_strategy``.

    ``ChunkStrategy.VANILLA`` and any unrecognised/missing strategy fall back
    to the recursive vanilla splitter.
    """
    if chunk_strategy == ChunkStrategy.PARAGRAPH:
        return chunk_data_by_paragraph(source_data, chunk_size, chunk_overlap)
    if chunk_strategy == ChunkStrategy.SENTENCE:
        return chunk_by_sentence(source_data, chunk_size, chunk_overlap)
    if chunk_strategy == ChunkStrategy.EXACT:
        return chunk_data_exact(source_data, chunk_size, chunk_overlap)
    return vanilla_chunker(source_data, chunk_size, chunk_overlap)
def vanilla_chunker(source_data, chunk_size=100, chunk_overlap=20):
    """Split ``source_data`` into langchain Documents with the recursive splitter."""
    from langchain.text_splitter import RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return splitter.create_documents([source_data])
def chunk_data_exact(data_chunks, chunk_size, chunk_overlap):
    """Split the concatenation of ``data_chunks`` into fixed-size windows.

    Consecutive windows share ``chunk_overlap`` characters; the last window
    may be shorter than ``chunk_size``.

    Raises:
        ValueError: when ``chunk_overlap >= chunk_size`` — the window could
            never advance (the old code silently returned [] for a negative
            step and raised an obscure range() error for a zero step).
    """
    step = chunk_size - chunk_overlap
    if step <= 0:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    data = "".join(data_chunks)
    return [data[i:i + chunk_size] for i in range(0, len(data), step)]
def chunk_by_sentence(data_chunks, chunk_size, overlap):
    """Split the joined text at sentence boundaries.

    A boundary is one of ``. ! ? …`` followed by at least one space.
    Sentences longer than ``chunk_size`` are further broken up with
    ``chunk_data_exact``.
    """
    text = "".join(data_chunks)
    sentences = re.split(r'(?<=[.!?…]) +', text)
    result = []
    for sentence in sentences:
        if len(sentence) <= chunk_size:
            result.append(sentence)
        else:
            result.extend(chunk_data_exact([sentence], chunk_size, overlap))
    return result
def chunk_data_by_paragraph(data_chunks, chunk_size, overlap, bound=0.75):
    """Chunk text, preferring to cut at a paragraph break ('\\n\\n') that
    falls in the last ``1 - bound`` fraction of the current window.

    Each emitted piece additionally carries ``overlap`` characters from the
    following window.
    """
    text = "".join(data_chunks)
    length = len(text)
    # Earliest offset (within a window) at which a paragraph break may be taken.
    min_break = int(bound * chunk_size)
    pieces = []
    cursor = 0
    while cursor < length:
        window_end = min(cursor + chunk_size, length)
        break_at = text.find('\n\n', cursor + min_break, window_end)
        if break_at >= 0:
            # Keep the paragraph delimiter with the leading chunk.
            window_end = break_at + 2
        pieces.append(text[cursor:window_end + overlap])
        cursor = window_end
    return pieces

View file

View file

@ -0,0 +1,43 @@
from io import BytesIO
import fitz
# sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from level_3.vectordb.chunkers.chunkers import chunk_data
from llama_hub.file.base import SimpleDirectoryReader
import requests
def _document_loader( observation: str, loader_settings: dict):
    """Load and chunk a document according to ``loader_settings``.

    Parameters:
        observation: raw text to chunk when ``format`` is "text".
        loader_settings: dict with keys ``format`` ("PDF" or "text"),
            ``source`` ("URL" or "file"), ``path``, ``strategy``,
            ``chunk_size`` and ``chunk_overlap``.

    Returns:
        A list of chunks (or loader documents, for the file branch).

    Raises:
        ValueError: for an unsupported ``format``.
    """
    # Check the format of the document
    document_format = loader_settings.get("format", "text")
    loader_strategy = loader_settings.get("strategy", "VANILLA")
    chunk_size = loader_settings.get("chunk_size", 500)
    chunk_overlap = loader_settings.get("chunk_overlap", 20)
    print("LOADER SETTINGS", loader_settings)
    if document_format == "PDF":
        # NOTE(review): this comparison is case-sensitive ("URL"), but callers
        # in this PR pass "url" — confirm the intended casing.
        if loader_settings.get("source") == "URL":
            pdf_response = requests.get(loader_settings["path"])
            pdf_stream = BytesIO(pdf_response.content)
            with fitz.open(stream=pdf_stream, filetype='pdf') as doc:
                file_content = ""
                for page in doc:
                    file_content += page.get_text()
            pages = chunk_data(chunk_strategy= loader_strategy, source_data=file_content, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
            return pages
        elif loader_settings.get("source") == "file":
            # NOTE(review): ``path`` is ignored here ('./data' is hard-coded),
            # and load_data() returns a list, which has no load_and_split() —
            # this branch likely raises AttributeError. Verify llama_hub API.
            loader = SimpleDirectoryReader('./data', recursive=True, exclude_hidden=True)
            documents = loader.load_data()
            pages = documents.load_and_split()
            return pages
    elif document_format == "text":
        pages = chunk_data(chunk_strategy= loader_strategy, source_data=observation, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        return pages
    else:
        raise ValueError(f"Unsupported document format: {document_format}")

View file

@ -0,0 +1,252 @@
# Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client
import logging
from marshmallow import Schema, fields
from loaders.loaders import _document_loader
# Add the parent directory to sys.path
logging.basicConfig(level=logging.INFO)
from langchain.retrievers import WeaviateHybridSearchRetriever
from weaviate.gql.get import HybridFusion
import tracemalloc
tracemalloc.start()
import os
from langchain.embeddings.openai import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.schema import Document
import weaviate
load_dotenv()
# Sentinel ids used when a memory-hierarchy level has not been assigned yet.
LTM_MEMORY_ID_DEFAULT = "00000"
ST_MEMORY_ID_DEFAULT = "0000"
BUFFER_ID_DEFAULT = "0000"
class VectorDB:
    """Base container for the identifiers shared by every vector-DB backend."""

    # Read once at class-creation time; empty string when the env var is unset.
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

    def __init__(
        self,
        user_id: str,
        index_name: str,
        memory_id: str,
        ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT,
        st_memory_id: str = ST_MEMORY_ID_DEFAULT,
        buffer_id: str = BUFFER_ID_DEFAULT,
        namespace: str = None,
    ):
        # Store the full identity of this memory: owner, index/namespace, and
        # the hierarchy of memory ids (long-term / short-term / buffer).
        for attr_name, attr_value in (
            ("user_id", user_id),
            ("index_name", index_name),
            ("namespace", namespace),
            ("memory_id", memory_id),
            ("ltm_memory_id", ltm_memory_id),
            ("st_memory_id", st_memory_id),
            ("buffer_id", buffer_id),
        ):
            setattr(self, attr_name, attr_value)
class PineconeVectorDB(VectorDB):
    """Pinecone-backed vector store (initialisation is currently a stub)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.init_pinecone(self.index_name)

    def init_pinecone(self, index_name):
        """Placeholder for Pinecone initialisation logic; does nothing yet."""
        pass
class WeaviateVectorDB(VectorDB):
    """Vector store backed by Weaviate.

    Writes go through a langchain ``WeaviateHybridSearchRetriever``; queries,
    deletes and updates use the raw ``weaviate.Client``. The Weaviate class
    name used for storage is the ``namespace`` carried by ``VectorDB``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): the retriever built here is discarded — every method
        # re-creates its own client/retriever. Confirm whether this call is
        # only meant to create the schema eagerly.
        self.init_weaviate(self.namespace)

    def init_weaviate(self, namespace: str):
        """Build and return a hybrid-search retriever bound to the Weaviate
        class ``namespace`` (creating the schema when it is missing)."""
        embeddings = OpenAIEmbeddings()
        auth_config = weaviate.auth.AuthApiKey(
            api_key=os.environ.get("WEAVIATE_API_KEY")
        )
        client = weaviate.Client(
            url=os.environ.get("WEAVIATE_URL"),
            auth_client_secret=auth_config,
            additional_headers={"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")},
        )
        retriever = WeaviateHybridSearchRetriever(
            client=client,
            index_name=namespace,
            text_key="text",
            attributes=[],
            embedding=embeddings,
            create_schema_if_missing=True,
        )
        return retriever  # If this is part of the initialization, call it here.

    def init_weaviate_client(self, namespace: str):
        """Create and return a raw Weaviate client.

        ``namespace`` is accepted for interface symmetry but is not used —
        the client itself is not scoped to a class.
        """
        auth_config = weaviate.auth.AuthApiKey(
            api_key=os.environ.get("WEAVIATE_API_KEY")
        )
        client = weaviate.Client(
            url=os.environ.get("WEAVIATE_URL"),
            auth_client_secret=auth_config,
            additional_headers={"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")},
        )
        return client

    def _stuct(self, observation, params, metadata_schema_class =None):
        """Validate one observation and wrap it as
        ``[{"metadata": ..., "page_content": ...}]``.

        (The name looks like a typo for ``_struct``; kept for compatibility.)
        """
        # Construct document data
        document_data = {
            "metadata": params,
            "page_content": observation
        }
        def get_document_schema():
            # Nested schema: metadata is validated by the caller-supplied
            # dynamic schema instance.
            class DynamicDocumentSchema(Schema):
                metadata = fields.Nested(metadata_schema_class, required=True)
                page_content = fields.Str(required=True)
            return DynamicDocumentSchema
        # Validate and deserialize the document before handing it to Weaviate.
        CurrentDocumentSchema = get_document_schema()
        loaded_document = CurrentDocumentSchema().load(document_data)
        return [loaded_document]

    async def add_memories(self, observation, loader_settings=None, params=None, namespace=None, metadata_schema_class=None):
        """Store one observation (or a whole loaded document set) in Weaviate.

        When ``loader_settings`` is given, the observation is treated as a
        document reference and chunked by ``_document_loader`` first.
        """
        if namespace is None:
            namespace = self.namespace
        retriever = self.init_weaviate(namespace)
        if loader_settings:
            # Assuming _document_loader returns a list of documents
            documents = _document_loader(observation, loader_settings)
            logging.info("here are the docs %s", str(documents))
            for doc in documents:
                document_to_load = self._stuct(doc.page_content, params, metadata_schema_class)
                print("here is the doc to load1", document_to_load)
                retriever.add_documents([
                    Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])])
        else:
            document_to_load = self._stuct(observation, params, metadata_schema_class)
            print("here is the doc to load2", document_to_load)
            retriever.add_documents([
                Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])])

    async def fetch_memories(
        self, observation: str, namespace: str, params: dict = None, n_of_observations: int = 2
    ):
        """Fetch documents from Weaviate.

        Parameters:
            observation: user query text.
            namespace: Weaviate class to query; falls back to self.namespace.
            params: optional Weaviate ``where`` filter dict; when given, a
                near-text search filtered by ``params`` is run instead of the
                hybrid search.
            n_of_observations: autocut value for the hybrid search (1-3).

        Returns:
            The raw Weaviate GraphQL query output.
        """
        client = self.init_weaviate_client(self.namespace)
        if not namespace:
            namespace = self.namespace
        # Always restrict results to the owning user.
        params_user_id = {
            "path": ["user_id"],
            "operator": "Like",
            "valueText": self.user_id,
        }
        def list_objects_of_class(class_name, schema):
            # All property names declared on ``class_name`` in the schema.
            return [
                prop["name"]
                for class_obj in schema["classes"]
                if class_obj["class"] == class_name
                for prop in class_obj["properties"]
            ]
        base_query = client.query.get(
            namespace, list(list_objects_of_class(namespace, client.schema.get()))
        ).with_additional(
            ["id", "creationTimeUnix", "lastUpdateTimeUnix", "score", 'distance']
        ).with_where(params_user_id).with_limit(10)
        if params:
            query_output = (
                base_query
                .with_where(params)
                .with_near_text({"concepts": [observation]})
                .do()
            )
        else:
            query_output = (
                base_query
                .with_hybrid(
                    query=observation,
                    fusion_type=HybridFusion.RELATIVE_SCORE
                )
                .with_autocut(n_of_observations)
                .do()
            )
        return query_output

    async def delete_memories(self, params: dict = None):
        """Delete objects: by id when ``params`` carries one, otherwise all
        objects belonging to this user."""
        client = self.init_weaviate_client(self.namespace)
        if params:
            where_filter = {
                "path": ["id"],
                "operator": "Equal",
                "valueText": params.get("id", None),
            }
            return client.batch.delete_objects(
                class_name=self.namespace,
                # Same `where` filter as in the GraphQL API
                where=where_filter,
            )
        else:
            # Delete all objects owned by this user.
            print("HERE IS THE USER ID", self.user_id)
            return client.batch.delete_objects(
                class_name=self.namespace,
                where={
                    "path": ["user_id"],
                    "operator": "Equal",
                    "valueText": self.user_id,
                },
            )

    def update_memories(self, observation, namespace: str, params: dict = None):
        """Overwrite the metadata of one object identified by ``params['id']``.

        NOTE(review): ``class_name`` is hard-coded to "Test" rather than using
        ``namespace`` / ``self.namespace`` — confirm this is intentional.
        """
        client = self.init_weaviate_client(self.namespace)
        client.data_object.update(
            data_object={
                # "text": observation,
                "user_id": str(self.user_id),
                "memory_id": str(self.memory_id),
                "ltm_memory_id": str(self.ltm_memory_id),
                "st_memory_id": str(self.st_memory_id),
                "buffer_id": str(self.buffer_id),
                "version": params.get("version", None) or "",
                "agreement_id": params.get("agreement_id", None) or "",
                "privacy_policy": params.get("privacy_policy", None) or "",
                "terms_of_service": params.get("terms_of_service", None) or "",
                "format": params.get("format", None) or "",
                "schema_version": params.get("schema_version", None) or "",
                "checksum": params.get("checksum", None) or "",
                "owner": params.get("owner", None) or "",
                "license": params.get("license", None) or "",
                "validity_start": params.get("validity_start", None) or "",
                "validity_end": params.get("validity_end", None) or ""
                # **source_metadata,
            },
            class_name="Test",
            uuid=params.get("id", None),
            consistency_level=weaviate.data.replication.ConsistencyLevel.ALL,  # default QUORUM
        )
        return

View file

@ -0,0 +1,342 @@
import logging
logging.basicConfig(level=logging.INFO)
import marvin
from dotenv import load_dotenv
from sqlalchemy.orm import sessionmaker
from database.database import engine # Ensure you have database engine defined somewhere
from models.user import User
from models.memory import MemoryModel
from models.sessions import Session
from models.testset import TestSet
from models.testoutput import TestOutput
from models.metadatas import MetaDatas
from models.operation import Operation
load_dotenv()
import ast
import tracemalloc
tracemalloc.start()
import os
from dotenv import load_dotenv
import uuid
load_dotenv()
# OpenAI credentials: default to "" so module import never fails.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY")
from vectordb.basevectordb import BaseMemory
class DynamicBaseMemory(BaseMemory):
    """A ``BaseMemory`` whose attributes, methods and associations are
    declared at runtime rather than in code."""

    def __init__(self, name, user_id, memory_id, index_name, db_type, namespace):
        super().__init__(user_id, memory_id, index_name, db_type, namespace)
        self.name = name
        # Runtime-declared capabilities of this memory class.
        self.attributes = set()
        self.methods = set()
        self.inheritance = None
        self.associations = []

    def add_method(self, method_name):
        """Register ``method_name`` as callable on this memory."""
        self.methods.add(method_name)

    def add_attribute(self, attribute_name):
        """Register ``attribute_name`` on this memory."""
        self.attributes.add(attribute_name)

    def get_attribute(self, attribute_name):
        """Return True when ``attribute_name`` has been registered."""
        return attribute_name in self.attributes

    def add_association(self, associated_memory):
        """Create a bidirectional association with ``associated_memory``
        (no-op when the link already exists on this side)."""
        if associated_memory in self.associations:
            return
        self.associations.append(associated_memory)
        associated_memory.associations.append(self)
class Attribute:
    """A named attribute that can be attached to a dynamic memory class."""

    def __init__(self, name):
        self.name = name
class Method:
    """A named method that can be attached to a dynamic memory class."""

    def __init__(self, name):
        self.name = name
class Memory:
    """Facade that creates and manages dynamic memory classes for one user.

    Keeps DB bookkeeping (users, memories, their attribute/method lists)
    in sync with in-process ``DynamicBaseMemory`` instances.
    """

    def __init__(self, user_id: str = "676", session=None, index_name: str = None,
                 knowledge_source: str = None, knowledge_type: str = None,
                 db_type: str = "weaviate", namespace: str = None, memory_id: str = None) -> None:
        self.load_environment_variables()
        self.memory_id = memory_id
        self.user_id = user_id
        self.session = session
        self.index_name = index_name
        self.db_type = db_type
        self.knowledge_source = knowledge_source
        self.knowledge_type = knowledge_type
        self.long_term_memory = None
        self.short_term_memory = None
        self.namespace = namespace
        # Dynamic memory classes created for this user at runtime.
        self.memory_instances = []
        # NOTE(review): ``memory_id`` may be a SQLAlchemy Row (see
        # check_existing_memory), hence str()/[0] handling elsewhere — confirm.
        self.memory_class = DynamicBaseMemory('Memory', user_id, str(self.memory_id), index_name, db_type, namespace)

    def load_environment_variables(self) -> None:
        """Read OpenAI configuration from the environment (with defaults)."""
        load_dotenv()
        self.OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
        self.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

    @classmethod
    def create_memory(cls, user_id: str, session, **kwargs):
        """
        Class method that acts as a factory method for creating Memory instances.
        It performs necessary DB checks or updates before instance creation.
        """
        existing_user = cls.check_existing_user(user_id, session)
        if existing_user:
            # Handle existing user scenario...
            memory_id = cls.check_existing_memory(user_id, session)
            logging.info(f"Existing user {user_id} found in the DB. Memory ID: {memory_id}")
        else:
            # Handle new user scenario...
            memory_id = cls.handle_new_user(user_id, session)
            logging.info(f"New user {user_id} created in the DB. Memory ID: {memory_id}")
        return cls(user_id=user_id, session=session, memory_id=memory_id, **kwargs)

    def list_memory_classes(self):
        """
        Lists all available memory classes in the memory instance.
        """
        # Use a list comprehension to filter attributes that end with '_class'
        return [attr for attr in dir(self) if attr.endswith("_class")]

    @staticmethod
    def check_existing_user(user_id: str, session):
        """Check if a user exists in the DB and return it."""
        return session.query(User).filter_by(id=user_id).first()

    @staticmethod
    def check_existing_memory(user_id: str, session):
        """Check if a user memory exists in the DB and return it.

        NOTE(review): ``.first()`` on a single-column query returns a Row
        (tuple-like), not the bare id — callers compensate with ``[0]``.
        """
        return session.query(MemoryModel.id).filter_by(user_id=user_id).first()

    @staticmethod
    def handle_new_user(user_id: str, session):
        """Handle new user creation in the DB and return the new memory ID."""
        memory_id = str(uuid.uuid4())
        new_user = User(id=user_id, name="john doe")
        session.add(new_user)
        session.commit()
        # Seed the memory row with the default class/attribute registries.
        memory = MemoryModel(id=memory_id, user_id=user_id, methods_list=str(['Memory', 'SemanticMemory', 'EpisodicMemory']),
                             attributes_list=str(['user_id', 'index_name', 'db_type', 'knowledge_source', 'knowledge_type', 'memory_id', 'long_term_memory', 'short_term_memory', 'namespace']))
        session.add(memory)
        session.commit()
        return memory_id

    def add_memory_instance(self, memory_class_name: str):
        """Add a new memory instance to the memory_instances list."""
        instance = DynamicBaseMemory(memory_class_name, self.user_id,
                                     self.memory_id, self.index_name,
                                     self.db_type, self.namespace)
        print("The following instance was defined", instance)
        self.memory_instances.append(instance)

    def manage_memory_attributes(self, existing_user):
        """Manage memory attributes based on the user existence."""
        if existing_user:
            # NOTE(review): ``memory_id[0]`` assumes a Row from
            # check_existing_memory; manage_memory_methods below uses
            # ``self.memory_id`` directly — confirm which form is intended.
            print(f"ID before query: {self.memory_id}, type: {type(self.memory_id)}")
            attributes_list = self.session.query(MemoryModel.attributes_list).filter_by(id=self.memory_id[0]).scalar()
            logging.info(f"Attributes list: {attributes_list}")
            if attributes_list is not None:
                # Stored as a Python-literal string; parse it back to a list.
                attributes_list = ast.literal_eval(attributes_list)
                self.handle_attributes(attributes_list)
            else:
                logging.warning("attributes_list is None!")
        else:
            attributes_list = ['user_id', 'index_name', 'db_type',
                               'knowledge_source', 'knowledge_type',
                               'memory_id', 'long_term_memory',
                               'short_term_memory', 'namespace']
            self.handle_attributes(attributes_list)

    def handle_attributes(self, attributes_list):
        """Handle attributes for existing memory instances."""
        for attr in attributes_list:
            self.memory_class.add_attribute(attr)

    def manage_memory_methods(self, existing_user):
        """
        Manage memory methods based on the user existence.
        """
        if existing_user:
            # Fetch existing methods from the database
            methods_list = self.session.query(MemoryModel.methods_list).filter_by(id=self.memory_id).scalar()
            methods_list = ast.literal_eval(methods_list)
        else:
            # Define default methods for a new user
            methods_list = [
                'async_create_long_term_memory', 'async_init', 'add_memories', "fetch_memories",
                'async_create_short_term_memory', '_create_buffer_context', '_get_task_list',
                '_run_main_buffer', '_available_operations', '_provide_feedback'
            ]
        # Apply methods to memory instances
        for class_instance in self.memory_instances:
            for method in methods_list:
                class_instance.add_method(method)

    async def dynamic_method_call(self, dynamic_base_memory_instance, method_name: str, *args, **kwargs):
        """Invoke ``method_name`` on the instance only when it has been
        registered; raise AttributeError otherwise."""
        if method_name in dynamic_base_memory_instance.methods:
            method = getattr(dynamic_base_memory_instance, method_name, None)
            if method:
                return await method(*args, **kwargs)
        raise AttributeError(f"{dynamic_base_memory_instance.name} object has no attribute {method_name}")

    def add_dynamic_memory_class(self, class_name: str, namespace: str):
        """Create a new dynamic memory class and expose it as
        ``self.<class_name_lower>_class``."""
        logging.info("Here is the memory id %s", self.memory_id[0])
        new_memory_class = DynamicBaseMemory(class_name, self.user_id, self.memory_id[0], self.index_name,
                                             self.db_type, namespace)
        setattr(self, f"{class_name.lower()}_class", new_memory_class)
        return new_memory_class

    def add_attribute_to_class(self, class_instance, attribute_name: str):
        """Register an attribute on a dynamic memory class."""
        # add this to database for a particular user and load under memory id
        class_instance.add_attribute(attribute_name)

    def add_method_to_class(self, class_instance, method_name: str):
        """Register a method on a dynamic memory class."""
        # add this to database for a particular user and load under memory id
        class_instance.add_method(method_name)
async def main():
    """Example driver: build a Memory for a demo user and add one memory."""
    # Contract metadata attached to the stored observation.
    params = {
        "version": "1.0",
        "agreement_id": "AG123456",
        "privacy_policy": "https://example.com/privacy",
        "terms_of_service": "https://example.com/terms",
        "format": "json",
        "schema_version": "1.1",
        "checksum": "a1b2c3d4e5f6",
        "owner": "John Doe",
        "license": "MIT",
        "validity_start": "2023-08-01",
        "validity_end": "2024-07-31",
    }
    # Settings for loading an external PDF (not passed to any call below).
    loader_settings = {
        "format": "PDF",
        "source": "url",
        "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
    }
    # Open a DB session and create (or look up) the memory for user "676".
    Session = sessionmaker(bind=engine)
    session = Session()
    memory = Memory.create_memory("676", session, namespace='SEMANTICMEMORY')
    memory.add_memory_instance("ExampleMemory")
    existing_user = Memory.check_existing_user("676", session)
    print("here is the existing user", existing_user)
    memory.manage_memory_attributes(existing_user)
    # Declare a semantic memory class and allow add/fetch on it.
    memory.add_dynamic_memory_class('SemanticMemory', 'SEMANTICMEMORY')
    memory.add_method_to_class(memory.semanticmemory_class, 'add_memories')
    memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
    sss = await memory.dynamic_method_call(memory.semanticmemory_class, 'add_memories',
                                           observation='some_observation', params=params)
    # Example memory modulators (currently unused).
    modulator = {"relevance": 0.1, "frequency": 0.1}
# Allow running this module as a standalone demo script.
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
# Check for existing user
# existing_user = session.query(User).filter_by(id=user_id).first()
#
# if existing_user:
# self.memory_id = existing_user.memory_id
# existing_memories_classes = session.query(Memory).filter_by(id=user_id).first()
# self.memory_instances = []
#
# for memory in existing_memories_classes:
# instance = DynamicBaseMemory(memory, user_id, self.memory_id, index_name, db_type, namespace)
# self.memory_instances.append(instance)
# else:
# self.memory_id = str(uuid.uuid4())
# new_user = User(id=user_id, memory_id=self.memory_id) # Adjust as per your User model
# session.add(new_user)
# session.commit()
# memory_classes = ['Memory', 'SemanticMemory', 'EpisodicMemory']
# self.memory_instances = []
#
# for memory in memory_classes:
# instance = DynamicBaseMemory(memory, user_id, self.memory_id, index_name, db_type, namespace)
# self.memory_instances.append(instance)
# # fix this so it uploads relationships between memories
# session.add(Memory(id=self.memory_id, user_id=user_id))
# session.commit()
#
# if existing_user:
# attributes_list = session.query(Memory.attributes_list).filter_by(id=self.memory_id).scalar()
# attributes_list = ast.literal_eval(attributes_list)
# for attr in attributes_list:
# self.memory_class.add_attribute(attr)
# methods_list = session.query(Memory.methods_list).filter_by(id=self.memory_id).scalar()
# methods_list = ast.literal_eval(methods_list)
# for class_instance in self.memory_instances:
# # , self.episodic_buffer_class]:
# for method in methods_list:
# class_instance.add_method(method)
# else:
# attributes_list = ['user_id', 'index_name', 'db_type', 'knowledge_source', 'knowledge_type', 'memory_id',
# 'long_term_memory', 'short_term_memory', 'namespace']
# for attr in attributes_list:
# self.memory_class.add_attribute(attr)
# # if old user, fetch attributes from memory table and load them like above
# # if new user, load methods from a list
# methods_list = ['async_create_long_term_memory', 'async_init', 'add_memories', "fetch_memories",
# 'async_create_short_term_memory',
# '_create_buffer_context', '_get_task_list', '_run_main_buffer',
# '_available_operations', '_provide_feedback']
# session.add(Memory(id=self.memory_id, user_id=user_id, methods_list=str(methods_list),
# attributes_list=str(attributes_list)))
# session.commit()
# # if old user, load methods from db
# # if new user, use class inherintance like bellow
# for class_instance in self.memory_instances:
# # , self.episodic_buffer_class]:
# for method in methods_list:
# class_instance.add_method(method)
# # Safely convert string representation to a list