added extended logic

This commit is contained in:
Vasilije 2023-08-23 00:14:40 +02:00
parent 8aa75481e4
commit 39140620f6
2 changed files with 389 additions and 220 deletions

View file

@@ -1,3 +1,5 @@
from io import BytesIO
from langchain.document_loaders import PyPDFLoader
from level_2_pdf_vectorstore__dlt_contracts import Memory
@@ -27,7 +29,7 @@ from dotenv import load_dotenv
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
app = FastAPI(debug=True)
@@ -63,82 +65,73 @@ def health_check():
# curl -X POST -H "Content-Type: application/json" -d '{"payload": {"pdf_url": "https://example.com/file.pdf"}}' http://127.0.0.1:8000/upload/
from fastapi import FastAPI, UploadFile, File
import requests
import os
import json
app = FastAPI()
from io import BytesIO
class Payload(BaseModel):
payload: Dict[str, Any]
@app.post("/upload/", response_model=dict)
async def upload_pdf_and_payload(
payload: Payload,
# files: List[UploadFile] = File(...),
):
try:
# Process the payload
decoded_payload = payload.payload
# except:
# pass
#
# return JSONResponse(content={"response": decoded_payload}, status_code=200)
# Download the remote PDF if URL is provided
if 'pdf_url' in decoded_payload:
pdf_response = requests.get(decoded_payload['pdf_url'])
pdf_content = pdf_response.content
logging.info("Downloaded PDF from URL")
# Create an in-memory file-like object for the PDF content
pdf_stream = BytesIO(pdf_content)
contents = pdf_stream.read()
tmp_location = os.path.join('/tmp', "tmp.pdf")
with open(tmp_location, 'wb') as tmp_file:
tmp_file.write(contents)
logging.info("Wrote PDF from URL")
# Process the PDF using PyPDFLoader
loader = PyPDFLoader(tmp_location)
pages = loader.load_and_split()
logging.info(" PDF split into pages")
Memory_ = Memory(index_name="my-agent", user_id='555' )
await Memory_.async_init()
Memory_._add_episodic_memory(user_input="I want to get a schema for my data", content =pages)
# Run the buffer
response = Memory_._run_buffer(user_input="I want to get a schema for my data")
return JSONResponse(content={"response": response}, status_code=200)
        # TODO: add the user id to the payload
        # TODO: add the raw pdf to the payload
# bb = await Memory_._run_buffer(user_input=decoded_payload['prompt'])
# print(bb)
except Exception as e:
return {"error": str(e)}
def memory_factory(memory_type):
load_dotenv()
class Payload(BaseModel):
payload: Dict[str, Any]
@app.post("/{memory_type}/add-memory", response_model=dict)
@@ -148,16 +141,40 @@ def memory_factory(memory_type):
):
try:
logging.info(" Init PDF processing")
decoded_payload = payload.payload
Memory_ = Memory( user_id='555')
if 'pdf_url' in decoded_payload:
pdf_response = requests.get(decoded_payload['pdf_url'])
pdf_content = pdf_response.content
await Memory_.async_init()
logging.info("Downloaded PDF from URL")
memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None)
output= memory_class(observation=decoded_payload['prompt'])
return JSONResponse(content={"response": output}, status_code=200)
# Create an in-memory file-like object for the PDF content
pdf_stream = BytesIO(pdf_content)
contents = pdf_stream.read()
tmp_location = os.path.join('/tmp', "tmp.pdf")
with open(tmp_location, 'wb') as tmp_file:
tmp_file.write(contents)
logging.info("Wrote PDF from URL")
# Process the PDF using PyPDFLoader
loader = PyPDFLoader(tmp_location)
# pages = loader.load_and_split()
logging.info(" PDF split into pages")
Memory_ = Memory(user_id='555')
await Memory_.async_init()
memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None)
output= await memory_class(observation=str(loader))
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e:
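
# Example request against a factory-generated route (a sketch; the memory
# type, host, and payload values are illustrative assumptions):
#
#   import requests
#   response = requests.post(
#       "http://127.0.0.1:8000/episodic/add-memory",
#       json={"payload": {"pdf_url": "https://example.com/sample.pdf",
#                         "prompt": "I want to get a schema for my data"}},
#   )
#   print(response.json())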

View file

@@ -1,4 +1,4 @@
# Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client
import dlt
from langchain import PromptTemplate, LLMChain
@@ -15,7 +15,7 @@ from deep_translator import (GoogleTranslator)
from langchain.chat_models import ChatOpenAI
from langchain.schema import LLMResult, HumanMessage
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from pydantic import BaseModel, Field, parse_obj_as
from langchain.memory import VectorStoreRetrieverMemory
from marvin import ai_classifier
from enum import Enum
@@ -29,14 +29,13 @@ from langchain.tools import tool
from langchain.vectorstores import Weaviate
import uuid
from dotenv import load_dotenv
load_dotenv()
from pathlib import Path
from langchain import OpenAI, LLMMathChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
import os
from datetime import datetime
@@ -56,12 +55,17 @@ from langchain.schema import Document, SystemMessage, HumanMessage
from langchain.vectorstores import Weaviate
import weaviate
import uuid
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
class MyCustomSyncHandler(BaseCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs) -> None:
print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")
class MyCustomAsyncHandler(AsyncCallbackHandler):
"""Async callback handler that can be used to handle callbacks from langchain."""
@@ -80,14 +84,17 @@ class MyCustomAsyncHandler(AsyncCallbackHandler):
await asyncio.sleep(0.3)
print("Hi! I just woke up. Your llm is ending")
class VectorDB:
OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
def __init__(self, user_id: str, index_name: str, memory_id: str, ltm_memory_id: str = '00000',
st_memory_id: str = '0000', buffer_id: str = '0000', db_type: str = "pinecone", namespace: str = None):
self.user_id = user_id
self.index_name = index_name
self.db_type = db_type
self.namespace = namespace
self.memory_id = memory_id
self.ltm_memory_id = ltm_memory_id
self.st_memory_id = st_memory_id
@@ -118,7 +125,7 @@ class VectorDB:
)
return vectorstore
def init_weaviate_client(self, namespace: str):
embeddings = OpenAIEmbeddings()
auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY'))
client = weaviate.Client(
@@ -131,7 +138,7 @@ class VectorDB:
)
return client
def init_weaviate(self, namespace: str):
embeddings = OpenAIEmbeddings()
auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY'))
client = weaviate.Client(
@@ -152,15 +159,12 @@ class VectorDB:
)
return retriever
async def add_memories(self, observation: str, params: dict = None):
if self.db_type == "pinecone":
# Update Pinecone memories here
vectorstore: Pinecone = Pinecone.from_existing_index(
index_name=self.index_name, embedding=OpenAIEmbeddings(), namespace=self.namespace
)
            retriever = vectorstore.as_retriever()
            params = params or {}  # guard: params may be None when no metadata is passed
            retriever.add_documents(
                [
@@ -170,8 +174,17 @@ class VectorDB:
"inserted_at": datetime.now(),
"text": observation,
"user_id": self.user_id,
"page": page,
"source": source
"version": params.get('version', None) or "",
"agreement_id": params.get('agreement_id', None) or "",
"privacy_policy": params.get('privacy_policy', None) or "",
"terms_of_service": params.get('terms_of_service', None) or "",
"format": params.get('format', None) or "",
"schema_version": params.get('schema_version', None) or "",
"checksum": params.get('checksum', None) or "",
"owner": params.get('owner', None) or "",
"license": params.get('license', None) or "",
"validity_start": params.get('validity_start', None) or "",
"validity_end": params.get('validity_end', None) or ""
},
namespace=self.namespace,
)
@@ -180,10 +193,9 @@ class VectorDB:
elif self.db_type == "weaviate":
# Update Weaviate memories here
print(self.namespace)
            retriever = self.init_weaviate(self.namespace)
            params = params or {}  # guard: params may be None when no metadata is passed
            return retriever.add_documents([
Document(
metadata={
"text": observation,
@@ -192,18 +204,30 @@ class VectorDB:
"ltm_memory_id": str(self.ltm_memory_id),
"st_memory_id": str(self.st_memory_id),
"buffer_id": str(self.buffer_id),
"version": params.get('version', None) or "",
"agreement_id": params.get('agreement_id', None) or "",
"privacy_policy": params.get('privacy_policy', None) or "",
"terms_of_service": params.get('terms_of_service', None) or "",
"format": params.get('format', None) or "",
"schema_version": params.get('schema_version', None) or "",
"checksum": params.get('checksum', None) or "",
"owner": params.get('owner', None) or "",
"license": params.get('license', None) or "",
"validity_start": params.get('validity_start', None) or "",
"validity_end": params.get('validity_end', None) or ""
# **source_metadata,
},
page_content=observation,
)]
)
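
    # Illustrative usage of add_memories (a sketch; the ids and values are
    # assumptions, not fixtures from this repository):
    #   vector_db = VectorDB(user_id="123", index_name="my-agent",
    #                        memory_id="m-1", namespace="EPISODICMEMORY")
    #   await vector_db.add_memories("observed text",
    #                                params={"version": "1.0", "owner": "John Doe"})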
# def get_pinecone_vectorstore(self, namespace: str) -> pinecone.VectorStore:
# return Pinecone.from_existing_index(
# index_name=self.index, embedding=OpenAIEmbeddings(), namespace=namespace
# )
async def fetch_memories(self, observation: str, namespace: str, params: dict = None):
if self.db_type == "pinecone":
# Fetch Pinecone memories here
pass
@@ -227,22 +251,57 @@ class VectorDB:
print(str(datetime.now()))
print(observation)
if namespace is None:
            namespace = self.namespace
params_user_id = {
"path": ["user_id"],
"operator": "Like",
"valueText": self.user_id
}
if params:
query_output = client.query.get(namespace, ["text","user_id", "memory_id", "ltm_memory_id", "st_memory_id", "buffer_id"]).with_where(params).with_additional(['id','creationTimeUnix','lastUpdateTimeUnix']).with_where(params_user_id).do()
query_output = client.query.get(namespace, ["text"
, "user_id"
, "memory_id"
, "ltm_memory_id"
, "st_memory_id"
, "buffer_id"
, "version",
"agreement_id",
"privacy_policy",
"terms_of_service",
"format",
"schema_version",
"checksum",
"owner",
"license",
"validity_start",
"validity_end"]).with_where(params).with_additional(
['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do()
return query_output
else:
query_output = client.query.get(namespace, ["text","user_id", "memory_id", "ltm_memory_id","st_memory_id", "buffer_id"]).with_additional(['id','creationTimeUnix','lastUpdateTimeUnix']).with_where(params_user_id).do()
query_output = client.query.get(namespace, ["text",
"user_id",
"memory_id",
"ltm_memory_id",
"st_memory_id",
"buffer_id",
"version",
"agreement_id",
"privacy_policy",
"terms_of_service",
"format",
"schema_version",
"checksum",
"owner",
"license",
"validity_start",
"validity_end"
]).with_additional(
['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do()
return query_output
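
    # The params argument is forwarded to Weaviate's where-filter. A minimal
    # filter could look like this sketch (field and value are illustrative):
    #   params = {"path": ["version"], "operator": "Equal", "valueText": "1.0"}
    #   out = await vector_db.fetch_memories("schema", namespace=None, params=params)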
async def delete_memories(self, params: dict = None):
client = self.init_weaviate_client(self.namespace)
if params:
where_filter = {
@@ -256,7 +315,7 @@ class VectorDB:
where=where_filter,
)
else:
# Delete all objects
return client.batch.delete_objects(
class_name=self.namespace,
@@ -267,8 +326,9 @@ class VectorDB:
}
)
def update_memories(self, observation, namespace: str, params: dict = None):
        client = self.init_weaviate_client(self.namespace)
        params = params or {}  # guard: params may be None when no metadata is passed
        client.data_object.update(
data_object={
"text": observation,
@@ -277,6 +337,17 @@ class VectorDB:
"ltm_memory_id": str(self.ltm_memory_id),
"st_memory_id": str(self.st_memory_id),
"buffer_id": str(self.buffer_id),
"version": params.get('version', None) or "",
"agreement_id": params.get('agreement_id', None) or "",
"privacy_policy": params.get('privacy_policy', None) or "",
"terms_of_service": params.get('terms_of_service', None) or "",
"format": params.get('format', None) or "",
"schema_version": params.get('schema_version', None) or "",
"checksum": params.get('checksum', None) or "",
"owner": params.get('owner', None) or "",
"license": params.get('license', None) or "",
"validity_start": params.get('validity_start', None) or "",
"validity_end": params.get('validity_end', None) or ""
# **source_metadata,
@@ -288,33 +359,39 @@ class VectorDB:
return
class SemanticMemory:
def __init__(self, user_id: str, memory_id: str, ltm_memory_id: str, index_name: str, db_type: str = "weaviate",
namespace: str = "SEMANTICMEMORY"):
# Add any semantic memory-related attributes or setup here
self.user_id = user_id
self.index_name = index_name
self.namespace = namespace
self.semantic_memory_id = str(uuid.uuid4())
self.memory_id = memory_id
self.ltm_memory_id = ltm_memory_id
self.vector_db = VectorDB(user_id=user_id, memory_id=self.memory_id, ltm_memory_id=self.ltm_memory_id,
index_name=index_name, db_type=db_type, namespace=self.namespace)
self.db_type = db_type
async def _add_memories(self, semantic_memory: str="None") ->list[str]:
async def _add_memories(self, semantic_memory: str = "None", params: dict = None) -> list[str]:
"""Update semantic memory for the user"""
if self.db_type == "weaviate":
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=20,
length_function=len,
is_separator_regex=False,
)
texts = text_splitter.create_documents([semantic_memory])
for text in texts:
out = await self.vector_db.add_memories(observation=text.page_content, params=params)
return out
elif self.db_type == "pinecone":
pass
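
    # Chunking note (approximate): with chunk_size=400 and chunk_overlap=20, a
    # 1000-character memory is stored as roughly three overlapping chunks, e.g.
    # [0:400], [380:780], [760:1000], each written as its own document.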
async def _fetch_memories(self, observation: str, params: str = None) -> Coroutine[Any, Any, Any]:
"""Fetch related characteristics, preferences or dislikes for a user."""
# self.init_pinecone(index_name=self.index)
@@ -325,42 +402,42 @@ class SemanticMemory:
elif self.db_type == "pinecone":
pass
async def _delete_memories(self, params: str = None) -> Coroutine[Any, Any, Any]:
"""Fetch related characteristics, preferences or dislikes for a user."""
# self.init_pinecone(index_name=self.index)
if self.db_type == "weaviate":
return await self.vector_db.delete_memories(params=params)
elif self.db_type == "pinecone":
pass
class EpisodicMemory:
def __init__(self, user_id: str, memory_id: str, ltm_memory_id: str, index_name: str, db_type: str = "weaviate",
namespace: str = "EPISODICMEMORY"):
# Add any semantic memory-related attributes or setup here
self.user_id = user_id
self.index_name = index_name
self.namespace = namespace
self.episodic_memory_id = str(uuid.uuid4())
self.memory_id = memory_id
self.ltm_memory_id = ltm_memory_id
self.vector_db = VectorDB(user_id=user_id, memory_id=self.memory_id, ltm_memory_id=self.ltm_memory_id,
index_name=index_name, db_type=db_type, namespace=self.namespace)
self.db_type = db_type
async def _add_memories(self, observation: str = None, params: dict = None) -> list[str]:
"""Update semantic memory for the user"""
if self.db_type == "weaviate":
return await self.vector_db.add_memories(observation=observation, params=params)
elif self.db_type == "pinecone":
pass
def _fetch_memories(self, observation: str, params: str = None) -> Coroutine[Any, Any, Any]:
"""Fetch related characteristics, preferences or dislikes for a user."""
# self.init_pinecone(index_name=self.index)
@@ -370,19 +447,22 @@ class EpisodicMemory:
elif self.db_type == "pinecone":
pass
async def _delete_memories(self, params: str = None) -> Coroutine[Any, Any, Any]:
"""Fetch related characteristics, preferences or dislikes for a user."""
# self.init_pinecone(index_name=self.index)
if self.db_type == "weaviate":
return await self.vector_db.delete_memories(params=params)
elif self.db_type == "pinecone":
pass
class LongTermMemory:
def __init__(self, user_id: str = "676", memory_id:str=None, index_name: str = None, namespace:str=None, db_type:str="weaviate"):
def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None, namespace: str = None,
db_type: str = "weaviate"):
self.user_id = user_id
self.memory_id = memory_id
self.ltm_memory_id = str(uuid.uuid4())
@@ -390,14 +470,17 @@ class LongTermMemory:
self.namespace = namespace
self.db_type = db_type
# self.episodic_memory = EpisodicMemory()
self.semantic_memory = SemanticMemory(user_id=self.user_id, memory_id=self.memory_id,
ltm_memory_id=self.ltm_memory_id, index_name=self.index_name,
db_type=self.db_type)
self.episodic_memory = EpisodicMemory(user_id=self.user_id, memory_id=self.memory_id,
ltm_memory_id=self.ltm_memory_id, index_name=self.index_name,
db_type=self.db_type)
class ShortTermMemory:
def __init__(self, user_id: str = "676", memory_id:str=None, index_name: str = None, namespace:str=None, db_type:str="weaviate"):
def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None, namespace: str = None,
db_type: str = "weaviate"):
# Add any short-term memory-related attributes or setup here
self.user_id = user_id
self.memory_id = memory_id
@@ -405,12 +488,13 @@ class ShortTermMemory:
self.db_type = db_type
self.stm_memory_id = str(uuid.uuid4())
self.index_name = index_name
self.episodic_buffer = EpisodicBuffer(user_id=self.user_id, memory_id=self.memory_id,
index_name=self.index_name, db_type=self.db_type)
class EpisodicBuffer:
def __init__(self, user_id: str = "676", memory_id:str=None, index_name: str = None, namespace:str='EPISODICBUFFER', db_type:str="weaviate"):
def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None,
namespace: str = 'EPISODICBUFFER', db_type: str = "weaviate"):
# Add any short-term memory-related attributes or setup here
self.user_id = user_id
self.memory_id = memory_id
@@ -418,7 +502,7 @@ class EpisodicBuffer:
self.db_type = db_type
self.st_memory_id = "blah"
self.index_name = index_name
self.llm = ChatOpenAI(
temperature=0.0,
max_tokens=1200,
openai_api_key=os.environ.get('OPENAI_API_KEY'),
@@ -426,10 +510,8 @@ class EpisodicBuffer:
callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
)
# self.vector_db = VectorDB(user_id=user_id, memory_id= self.memory_id, st_memory_id = self.st_memory_id, index_name=index_name, db_type=db_type, namespace=self.namespace)
def _compute_weights(self, context: str):
"""Computes the weights for the buffer"""
pass
@@ -456,57 +538,58 @@ class EpisodicBuffer:
# json_data = json.dumps(chain_result)
# return json_data
async def _fetch_memories(self, observation: str, namespace: str) -> str:
vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
index_name=self.index_name, db_type=self.db_type, namespace=namespace)
query = await vector_db.fetch_memories(observation=observation)
return query
async def _add_memories(self, observation: str, namespace: str, params: dict = None):
vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
index_name=self.index_name, db_type=self.db_type, namespace=namespace)
query = await vector_db.add_memories(observation, params=params)
return query
async def _delete_memories(self, params: str = None) -> Coroutine[Any, Any, Any]:
"""Fetch related characteristics, preferences or dislikes for a user."""
# self.init_pinecone(index_name=self.index)
if self.db_type == "weaviate":
return await self.vector_db.delete_memories(params=params)
elif self.db_type == "pinecone":
pass
    # async def freshness(self, observation: str, namespace: str) -> str:
    #     """Freshness - a score between 1 and 5 for how often the information was processed in episodic memory in the past"""
    #
    #     memory = Memory(user_id=self.user_id)
    #     await memory.async_init()
    #
    #     # gg = await memory._run_buffer(user_input="bla", content="blablabla")
    #     # print(gg)
    #
    #     ggur = await memory._fetch_episodic_memory(observation=observation)
    #     print(ggur)
    #
    #     # @ai_classifier
    #     # class MemoryRoute(Enum):
    #     #     """Represents a classifier for semantic fetching of memories"""
    #     #
    #     #     storage_of_documents_and_knowledge_to_memory = "SEMANTICMEMORY"
    #     #     raw_information_currently_processed_in_short_term_memory = "EPISODICBUFFER"
    #     #     raw_information_kept_in_short_term_memory = "SHORTTERMMEMORY"
    #     #     long_term_recollections_of_past_events_and_emotions = "EPISODICMEMORY"
    #     #     raw_information_to_store_as_events = "EVENTBUFFER"
    #     #
    #     # namespace = MemoryRoute(observation)
    #
    #     return ggur
async def encoding(self, document: str, namespace: str = "EPISODICBUFFER") -> None:
"""Encoding for the buffer, stores raw data in the buffer
@@ -522,21 +605,90 @@ class EpisodicBuffer:
# Here we define the user prompt and the structure of the output we desire
# prompt = output[0].page_content
# file_upload
#
if content is not None:
# operations -> translate, structure, load to db
list_of_operations = ["translate", "structure", "load to db"]
prompt_filter = ChatPromptTemplate.from_template(
"Filter and remove uneccessary information that is not relevant in the user query {query}")
chain_filter = prompt_filter | self.llm
output = await chain_filter.ainvoke({"query": user_input})
class Task(BaseModel):
"""Schema for an individual task."""
task_order: str = Field(..., description="The order at which the task needs to be performed")
task_name: str = Field(None, description="The task that needs to be performed")
operation: str = Field(None, description="The operation to be performed")
class TaskList(BaseModel):
"""Schema for the record containing a list of tasks."""
tasks: List[Task] = Field(..., description="List of tasks")
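
        # Expected shape of the structured output (illustrative values):
        #   {"tasks": [{"task_order": "1", "task_name": "translate text",
        #               "operation": "translate"}]}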
prompt_filter_chunk = f" Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {output}"
# chain_filter_chunk = prompt_filter_chunk | self.llm.bind(function_call={"TaskList": "tasks"}, functions=TaskList)
# output_chunk = await chain_filter_chunk.ainvoke({"query": output, "list_of_operations": list_of_operations})
prompt_msgs = [
SystemMessage(
content="You are a world class algorithm for decomposing prompts into steps and operations and choosing relevant ones"
),
HumanMessage(content="Decompose based on the following prompt:"),
HumanMessagePromptTemplate.from_template("{input}"),
HumanMessage(content="Tips: Make sure to answer in the correct format"),
HumanMessage(content="Tips: Only choose actions that are relevant to the user query and ignore others")
]
prompt_ = ChatPromptTemplate(messages=prompt_msgs)
chain = create_structured_output_chain(TaskList, self.llm, prompt_, verbose=True)
from langchain.callbacks import get_openai_callback
with get_openai_callback() as cb:
output = await chain.arun(input=prompt_filter_chunk, verbose=True)
print(cb)
# output = json.dumps(output)
my_object = parse_obj_as(TaskList, output)
print("HERE IS THE OUTPUT", my_object.json())
data = json.loads(my_object.json())
# Extract the list of tasks
tasks_list = data["tasks"]
for task in tasks_list:
class TranslateText(BaseModel):
observation: str = Field(
description="observation we want to translate"
)
@tool("translate_to_en", args_schema=TranslateText, return_direct=True)
            def translate_to_en(observation):
"""Translate to English"""
out = GoogleTranslator(source='auto', target='en').translate(text=observation)
return out
agent = initialize_agent(
llm=self.llm,
tools=[translate_to_en],
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
)
agent.run(task)
        # We need to encode the content. Note: this is not computer-science encoding, but encoding in the sense of storing the content in the buffer
# output_translated = GoogleTranslator(source='auto', target='en').translate(text=content)
# await self.encoding(output_translated)
# freshness_score =await self.freshness(output_translated, namespace="EPISODICBUFFER")
# print(freshness_score)
# shows how much the data is relevant for the user, provided by the user in a separate step, starts at 0
user_relevance_score ="0"
user_relevance_score = "0"
# similarity score between the user input and the content already available in the buffer
# write this to episodic memory
# prompt_filter = ChatPromptTemplate.from_template("Filter and remove uneccessary information that is not relevant in the user query {query}")
# chain_filter = prompt_filter | self.llm
@@ -546,12 +698,12 @@ class EpisodicBuffer:
if content is None:
# Sensory and Linguistic Processing
prompt_filter = ChatPromptTemplate.from_template("Filter and remove uneccessary information that is not relevant in the user query {query}")
prompt_filter = ChatPromptTemplate.from_template(
"Filter and remove uneccessary information that is not relevant in the user query {query}")
chain_filter = prompt_filter | self.llm
output = await chain_filter.ainvoke({"query": user_input})
translation = GoogleTranslator(source='auto', target='en').translate(text=output.content)
def top_down_processing():
"""Top-down processing"""
pass
@@ -560,29 +712,19 @@ class EpisodicBuffer:
"""Bottom-up processing"""
pass
def interactive_processing():
"""interactive processing"""
pass
working_memory_activation = "bla"
working_memory_activation = "bla"
prompt_chunk = ChatPromptTemplate.from_template("Can you break down the instruction 'Structure a PDF and load it into duckdb' into smaller tasks or actions? Return only tasks or actions. Be brief")
prompt_chunk = ChatPromptTemplate.from_template(
"Can you break down the instruction 'Structure a PDF and load it into duckdb' into smaller tasks or actions? Return only tasks or actions. Be brief")
chain_chunk = prompt_chunk | self.llm
output_chunks = await chain_chunk.ainvoke({"query": output.content})
print(output_chunks.content)
# vectorstore = Weaviate.from_documents(documents, embeddings, client=client, by_text=False)
# retriever = WeaviateHybridSearchRetriever(
# client=client,
@@ -598,7 +740,6 @@ class EpisodicBuffer:
# query = vector_db.
# retriever = vectorstore.as_retriever(search_kwargs=dict(k=1))
# memory = VectorStoreRetrieverMemory(retriever=retriever)
# class PromptWrapper(BaseModel):
@@ -728,10 +869,8 @@ class EpisodicBuffer:
# return output
# DEFINE STM
# DEFINE LTM
class Memory:
load_dotenv()
@@ -739,7 +878,7 @@ class Memory:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
def __init__(self, user_id: str = "676", index_name: str = None, knowledge_source: str = None,
knowledge_type: str = None, db_type:str="weaviate", namespace:str=None) -> None:
knowledge_type: str = None, db_type: str = "weaviate", namespace: str = None) -> None:
self.user_id = user_id
self.index_name = index_name
self.db_type = db_type
@@ -752,12 +891,13 @@ class Memory:
load_dotenv()
# Asynchronous factory function for creating LongTermMemory
async def async_create_long_term_memory(self, user_id, memory_id, index_name, namespace, db_type):
# Perform asynchronous initialization steps if needed
return LongTermMemory(
user_id=user_id, memory_id=memory_id, index_name=index_name,
namespace=namespace, db_type=db_type
)
async def async_init(self):
# Asynchronous initialization of LongTermMemory and ShortTermMemory
self.long_term_memory = await self.async_create_long_term_memory(
@@ -785,9 +925,9 @@ class Memory:
# user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type
# )
async def _add_semantic_memory(self, semantic_memory: str, params: dict = None):
return await self.long_term_memory.semantic_memory._add_memories(
semantic_memory=semantic_memory, params=params
)
@@ -795,67 +935,79 @@ class Memory:
return await self.long_term_memory.semantic_memory._fetch_memories(
observation=observation, params=params
)
async def _delete_semantic_memory(self, params: str = None):
return await self.long_term_memory.semantic_memory._delete_memories(
params=params
)
async def _add_episodic_memory(self, observation: str, params: dict = None):
return await self.long_term_memory.episodic_memory._add_memories(
observation=observation, params=params
)
async def _fetch_episodic_memory(self, observation, params: str = None):
return await self.long_term_memory.episodic_memory._fetch_memories(
observation=observation, params=params
)
async def _delete_episodic_memory(self, params: str = None):
return await self.long_term_memory.episodic_memory._delete_memories(
params=params
)
async def _run_buffer(self, user_input: str, content: str = None):
return await self.short_term_memory.episodic_buffer.main_buffer(user_input=user_input, content=content)
async def _add_buffer_memory(self, user_input: str, namespace: str = None, params: dict = None):
return await self.short_term_memory.episodic_buffer._add_memories(observation=user_input, namespace=namespace,
params=params)
async def _fetch_buffer_memory(self, user_input: str, namespace: str = None):
return await self.short_term_memory.episodic_buffer._fetch_memories(observation=user_input, namespace=namespace)
async def _delete_buffer_memory(self, params: str = None):
return await self.long_term_memory.episodic_buffer._delete_memories(
params=params
)
async def main():
memory = Memory(user_id="123")
await memory.async_init()
params = {
"version": "1.0",
"agreement_id": "AG123456",
"privacy_policy": "https://example.com/privacy",
"terms_of_service": "https://example.com/terms",
"format": "json",
"schema_version": "1.1",
"checksum": "a1b2c3d4e5f6",
"owner": "John Doe",
"license": "MIT",
"validity_start": "2023-08-01",
"validity_end": "2024-07-31"
}
# gg = await memory._run_buffer(user_input= "bla", content = "blablabla ")
# print(gg)
gg = await memory._delete_episodic_memory()
gg = await memory._run_buffer(user_input="i NEED TRANSLATION TO GERMAN ", content="i NEED TRANSLATION TO GERMAN ")
print(gg)
# ggur = await memory._add_episodic_memory(observation = "bla bla bla")
# gg = await memory._delete_episodic_memory()
# print(gg)
# ggur = await memory._add_episodic_memory(observation = "bla bla bla", params=params)
# print(ggur)
# ggur = await memory._fetch_episodic_memory(observation = "bla bla bla")
# print(ggur)
# fff = await memory._fetch_memories_buffer(user_input = "bla bla bla", namespace="Test")
# print(fff)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
# bb = agent._update_semantic_memory(semantic_memory="Users core summary")