diff --git a/level_2/api.py b/level_2/api.py
index d2cc74e6c..e79309de4 100644
--- a/level_2/api.py
+++ b/level_2/api.py
@@ -1,3 +1,5 @@
+from io import BytesIO
+
 from langchain.document_loaders import PyPDFLoader
 from level_2_pdf_vectorstore__dlt_contracts import Memory
@@ -27,7 +29,7 @@ from dotenv import load_dotenv

 load_dotenv()

-
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

 app = FastAPI(debug=True)
@@ -63,82 +65,73 @@ def health_check():

 #curl -X POST -H "Content-Type: application/json" -d '{"data": "YourPayload"}' -F "files=@/path/to/your/pdf/file.pdf" http://127.0.0.1:8000/upload/

-from fastapi import FastAPI, UploadFile, File
-import requests
-import os
-import json
-
-app = FastAPI()
-
-
-from io import BytesIO
-
 class Payload(BaseModel):
     payload: Dict[str, Any]

-@app.post("/upload/", response_model=dict)
-async def upload_pdf_and_payload(
-    payload: Payload,
-    # files: List[UploadFile] = File(...),
-):
-    try:
-        # Process the payload
-        decoded_payload = payload.payload
-        # except:
-        #     pass
-        #
-        # return JSONResponse(content={"response": decoded_payload}, status_code=200)
-
-        # Download the remote PDF if URL is provided
-        if 'pdf_url' in decoded_payload:
-            pdf_response = requests.get(decoded_payload['pdf_url'])
-            pdf_content = pdf_response.content
-
-            logging.info("Downloaded PDF from URL")
-
-            # Create an in-memory file-like object for the PDF content
-            pdf_stream = BytesIO(pdf_content)
-
-            contents = pdf_stream.read()
-
-            tmp_location = os.path.join('/tmp', "tmp.pdf")
-            with open(tmp_location, 'wb') as tmp_file:
-                tmp_file.write(contents)
-
-            logging.info("Wrote PDF from URL")
-
-            # Process the PDF using PyPDFLoader
-            loader = PyPDFLoader(tmp_location)
-            pages = loader.load_and_split()
-            logging.info(" PDF split into pages")
-            Memory_ = Memory(index_name="my-agent", user_id='555' )
-            await Memory_.async_init()
-            Memory_._add_episodic_memory(user_input="I want to get a schema for my data", content =pages)
-
-
-            # Run the buffer
-            response = Memory_._run_buffer(user_input="I want to get a schema for my data")
-            return JSONResponse(content={"response": response}, status_code=200)
-
-        #to do: add the user id to the payload
-        #to do add the raw pdf to payload
-        # bb = await Memory_._run_buffer(user_input=decoded_payload['prompt'])
-        # print(bb)
-
-
-    except Exception as e:
-
-        return {"error": str(e)}
-        # Here you can perform your processing on the PDF contents
-        # results.append({"filename": file.filename, "size": len(contents)})
-
-        # Append the in-memory file to the files list
-        # files.append(UploadFile(pdf_stream, filename="downloaded.pdf"))
-
+# @app.post("/upload/", response_model=dict)
+# async def upload_pdf_and_payload(
+#     payload: Payload,
+#     # files: List[UploadFile] = File(...),
+# ):
+#     try:
+#         # Process the payload
+#         decoded_payload = payload.payload
+#         # except:
+#         #     pass
+#         #
+#         # return JSONResponse(content={"response": decoded_payload}, status_code=200)
+#
+#         # Download the remote PDF if URL is provided
+#         if 'pdf_url' in decoded_payload:
+#             pdf_response = requests.get(decoded_payload['pdf_url'])
+#             pdf_content = pdf_response.content
+#
+#             logging.info("Downloaded PDF from URL")
+#
+#             # Create an in-memory file-like object for the PDF content
+#             pdf_stream = BytesIO(pdf_content)
+#
+#             contents = pdf_stream.read()
+#
+#             tmp_location = os.path.join('/tmp', "tmp.pdf")
+#             with open(tmp_location, 'wb') as tmp_file:
+#                 tmp_file.write(contents)
+#
+#             logging.info("Wrote PDF from URL")
+#
+#             # Process the PDF using PyPDFLoader
+#             loader = PyPDFLoader(tmp_location)
+#             pages = loader.load_and_split()
+#             logging.info(" PDF split into pages")
+#             Memory_ = Memory(index_name="my-agent", user_id='555' )
+#             await Memory_.async_init()
+#             Memory_._add_episodic_memory(user_input="I want to get a schema for my data", content =pages)
+#
+#
+#             # Run the buffer
+#             response = Memory_._run_buffer(user_input="I want to get a schema for my data")
+#             return JSONResponse(content={"response": response}, status_code=200)
+#
+#         #to do: add the user id to the payload
+#         #to do add the raw pdf to payload
+#         # bb = await Memory_._run_buffer(user_input=decoded_payload['prompt'])
+#         # print(bb)
+#
+#
+#     except Exception as e:
+#
+#         return {"error": str(e)}
+#         # Here you can perform your processing on the PDF contents
+#         # results.append({"filename": file.filename, "size": len(contents)})
+#
+#         # Append the in-memory file to the files list
+#         # files.append(UploadFile(pdf_stream, filename="downloaded.pdf"))
+#

 def memory_factory(memory_type):
+    load_dotenv()

     class Payload(BaseModel):
         payload: Dict[str, Any]

     @app.post("/{memory_type}/add-memory", response_model=dict)
@@ -148,16 +141,40 @@ def memory_factory(memory_type):
     ):
         try:
+            logging.info(" Init PDF processing")
+
             decoded_payload = payload.payload

-            Memory_ = Memory( user_id='555')
+            if 'pdf_url' in decoded_payload:
+                pdf_response = requests.get(decoded_payload['pdf_url'])
+                pdf_content = pdf_response.content

-            await Memory_.async_init()
+                logging.info("Downloaded PDF from URL")

-            memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None)
-            output= memory_class(observation=decoded_payload['prompt'])
-            return JSONResponse(content={"response": output}, status_code=200)
+                # Create an in-memory file-like object for the PDF content
+                pdf_stream = BytesIO(pdf_content)
+
+                contents = pdf_stream.read()
+
+                tmp_location = os.path.join('/tmp', "tmp.pdf")
+                with open(tmp_location, 'wb') as tmp_file:
+                    tmp_file.write(contents)
+
+                logging.info("Wrote PDF from URL")
+
+                # Process the PDF using PyPDFLoader
+                loader = PyPDFLoader(tmp_location)
+                # pages = loader.load_and_split()
+                logging.info(" PDF split into pages")
+
+            Memory_ = Memory(user_id='555')
+
+            await Memory_.async_init()
+
+            memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None)
+            output= await memory_class(observation=str(loader))
+            return JSONResponse(content={"response": output}, status_code=200)

         except Exception as e:
diff --git a/level_2/level_2_pdf_vectorstore__dlt_contracts.py b/level_2/level_2_pdf_vectorstore__dlt_contracts.py
index 1f8b1cdf9..e087cafcd 100644
--- a/level_2/level_2_pdf_vectorstore__dlt_contracts.py
+++ b/level_2/level_2_pdf_vectorstore__dlt_contracts.py
@@ -1,4 +1,4 @@
-#Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client
+# Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client
 import dlt
 from langchain import PromptTemplate, LLMChain
@@ -15,7 +15,7 @@ from deep_translator import (GoogleTranslator)
 from langchain.chat_models import ChatOpenAI
 from langchain.schema import LLMResult, HumanMessage
 from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
-
+from pydantic import BaseModel, Field, parse_obj_as
 from langchain.memory import VectorStoreRetrieverMemory
 from marvin import ai_classifier
 from enum import Enum
@@ -29,14 +29,13 @@ from langchain.tools import tool
 from langchain.vectorstores import Weaviate
 import uuid
 from dotenv import load_dotenv
+
 load_dotenv()
 from pathlib import Path
 from langchain import OpenAI, LLMMathChain
 from langchain.chat_models import ChatOpenAI
 from langchain.prompts import ChatPromptTemplate
-
-
 import os
 from datetime import datetime
@@ -56,12 +55,17 @@ from langchain.schema import Document, SystemMessage, HumanMessage
 from langchain.vectorstores import Weaviate
 import weaviate
 import uuid
+
 load_dotenv()

+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
+

 class MyCustomSyncHandler(BaseCallbackHandler):
     def on_llm_new_token(self, token: str, **kwargs) -> None:
         print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")

+
+
 class MyCustomAsyncHandler(AsyncCallbackHandler):
     """Async callback handler that can be used to handle callbacks from langchain."""
@@ -80,14 +84,17 @@ class MyCustomAsyncHandler(AsyncCallbackHandler):
         await asyncio.sleep(0.3)
         print("Hi! I just woke up. Your llm is ending")

+
 class VectorDB:
     OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
     OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
-    def __init__(self, user_id: str, index_name: str, memory_id:str, ltm_memory_id:str='00000', st_memory_id:str='0000', buffer_id:str='0000', db_type: str = "pinecone", namespace:str = None):
+
+    def __init__(self, user_id: str, index_name: str, memory_id: str, ltm_memory_id: str = '00000',
+                 st_memory_id: str = '0000', buffer_id: str = '0000', db_type: str = "pinecone", namespace: str = None):
         self.user_id = user_id
         self.index_name = index_name
         self.db_type = db_type
-        self.namespace=namespace
+        self.namespace = namespace
         self.memory_id = memory_id
         self.ltm_memory_id = ltm_memory_id
         self.st_memory_id = st_memory_id
@@ -118,7 +125,7 @@ class VectorDB:
         )
         return vectorstore

-    def init_weaviate_client(self, namespace:str):
+    def init_weaviate_client(self, namespace: str):
         embeddings = OpenAIEmbeddings()
         auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY'))
         client = weaviate.Client(
@@ -131,7 +138,7 @@ class VectorDB:
         )
         return client

-    def init_weaviate(self, namespace:str):
+    def init_weaviate(self, namespace: str):
         embeddings = OpenAIEmbeddings()
         auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY'))
         client = weaviate.Client(
@@ -152,15 +159,12 @@ class VectorDB:
         )
         return retriever

-    async def add_memories(self, observation: str, page: str = "", source: str = ""):
+    async def add_memories(self, observation: str, params: dict = None):
         if self.db_type == "pinecone":
             # Update Pinecone memories here
             vectorstore: Pinecone = Pinecone.from_existing_index(
                 index_name=self.index_name, embedding=OpenAIEmbeddings(), namespace=self.namespace
             )
-
-
-
             retriever = vectorstore.as_retriever()
             retriever.add_documents(
                 [
@@ -170,8 +174,17 @@ class VectorDB:
                             "inserted_at": datetime.now(),
                             "text": observation,
                             "user_id": self.user_id,
-                            "page": page,
-                            "source": source
+                            "version": params.get('version', None) or "",
+                            "agreement_id": params.get('agreement_id', None) or "",
+                            "privacy_policy": params.get('privacy_policy', None) or "",
+                            "terms_of_service": params.get('terms_of_service', None) or "",
+                            "format": params.get('format', None) or "",
+                            "schema_version": params.get('schema_version', None) or "",
+                            "checksum": params.get('checksum', None) or "",
+                            "owner": params.get('owner', None) or "",
+                            "license": params.get('license', None) or "",
+                            "validity_start": params.get('validity_start', None) or "",
+                            "validity_end": params.get('validity_end', None) or ""
                         },
                         namespace=self.namespace,
                     )
@@ -180,10 +193,9 @@ class VectorDB:

         elif self.db_type == "weaviate":
             # Update Weaviate memories here
             print(self.namespace)
-            retriever = self.init_weaviate( self.namespace)
+            retriever = self.init_weaviate(self.namespace)

-
-            return retriever.add_documents([
+            return retriever.add_documents([
                 Document(
                     metadata={
                         "text": observation,
                         "user_id": str(self.user_id),
                         "memory_id": str(self.memory_id),
                         "ltm_memory_id": str(self.ltm_memory_id),
                         "st_memory_id": str(self.st_memory_id),
                         "buffer_id": str(self.buffer_id),
+                        "version": params.get('version', None) or "",
+                        "agreement_id": params.get('agreement_id', None) or "",
+                        "privacy_policy": params.get('privacy_policy', None) or "",
+                        "terms_of_service": params.get('terms_of_service', None) or "",
+                        "format": params.get('format', None) or "",
+                        "schema_version": params.get('schema_version', None) or "",
+                        "checksum": params.get('checksum', None) or "",
+                        "owner": params.get('owner', None) or "",
+                        "license": params.get('license', None) or "",
+                        "validity_start": params.get('validity_start', None) or "",
+                        "validity_end": params.get('validity_end', None) or ""
                         # **source_metadata,
                     },
                     page_content=observation,
                 )]
             )

+
 # def get_pinecone_vectorstore(self, namespace: str) -> pinecone.VectorStore:
 #     return Pinecone.from_existing_index(
 #         index_name=self.index, embedding=OpenAIEmbeddings(), namespace=namespace
 #     )

-    async def fetch_memories(self, observation: str, namespace:str, params = None):
+    async def fetch_memories(self, observation: str, namespace: str, params: dict = None):
         if self.db_type == "pinecone":
             # Fetch Pinecone memories here
             pass
@@ -227,22 +251,57 @@
         print(str(datetime.now()))
         print(observation)
         if namespace is None:
-            namespace= self.namespace
+            namespace = self.namespace

-        params_user_id= {
-            "path": ["user_id"],
-            "operator": "Like",
-            "valueText": self.user_id
+        params_user_id = {
+            "path": ["user_id"],
+            "operator": "Like",
+            "valueText": self.user_id
         }

         if params:
-            query_output = client.query.get(namespace, ["text","user_id", "memory_id", "ltm_memory_id", "st_memory_id", "buffer_id"]).with_where(params).with_additional(['id','creationTimeUnix','lastUpdateTimeUnix']).with_where(params_user_id).do()
+            query_output = client.query.get(namespace, [
+                "text",
+                "user_id",
+                "memory_id",
+                "ltm_memory_id",
+                "st_memory_id",
+                "buffer_id",
+                "version",
+                "agreement_id",
+                "privacy_policy",
+                "terms_of_service",
+                "format",
+                "schema_version",
+                "checksum",
+                "owner",
+                "license",
+                "validity_start",
+                "validity_end"
+            ]).with_where(params).with_additional(
+                ['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do()
             return query_output
         else:
-            query_output = client.query.get(namespace, ["text","user_id", "memory_id", "ltm_memory_id","st_memory_id", "buffer_id"]).with_additional(['id','creationTimeUnix','lastUpdateTimeUnix']).with_where(params_user_id).do()
+            query_output = client.query.get(namespace, [
+                "text",
+                "user_id",
+                "memory_id",
+                "ltm_memory_id",
+                "st_memory_id",
+                "buffer_id",
+                "version",
+                "agreement_id",
+                "privacy_policy",
+                "terms_of_service",
+                "format",
+                "schema_version",
+                "checksum",
+                "owner",
+                "license",
+                "validity_start",
+                "validity_end"
+            ]).with_additional(
+                ['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do()
             return query_output

-    async def delete_memories(self, params:dict = None):
+    async def delete_memories(self, params: dict = None):
         client = self.init_weaviate_client(self.namespace)
         if params:
             where_filter = {
@@ -256,7 +315,7 @@ class VectorDB:
                 where=where_filter,
             )
         else:
-            #Delete all objects
+            # Delete all objects
             return client.batch.delete_objects(
                 class_name=self.namespace,
@@ -267,8 +326,9 @@
                 }
             )

-    def update_memories(self, observation, namespace:str,params:dict = None):
+    def update_memories(self, observation, namespace: str, params: dict = None):
         client = self.init_weaviate_client(self.namespace)
+
         client.data_object.update(
             data_object={
                 "text": observation,
                 "user_id": str(self.user_id),
                 "memory_id": str(self.memory_id),
                 "ltm_memory_id": str(self.ltm_memory_id),
                 "st_memory_id": str(self.st_memory_id),
                 "buffer_id": str(self.buffer_id),
+                "version": params.get('version', None) or "",
+                "agreement_id": params.get('agreement_id', None) or "",
+                "privacy_policy": params.get('privacy_policy', None) or "",
+                "terms_of_service": params.get('terms_of_service', None) or "",
+                "format": params.get('format', None) or "",
+                "schema_version": params.get('schema_version', None) or "",
+                "checksum": params.get('checksum', None) or "",
+                "owner": params.get('owner', None) or "",
+                "license": params.get('license', None) or "",
+                "validity_start": params.get('validity_start', None) or "",
+                "validity_end": params.get('validity_end', None) or ""

                 # **source_metadata,
@@ -288,33 +359,39 @@
         return

-
 class SemanticMemory:
-    def __init__(self, user_id: str, memory_id:str, ltm_memory_id:str, index_name: str, db_type:str="weaviate", namespace:str="SEMANTICMEMORY"):
+    def __init__(self, user_id: str, memory_id: str, ltm_memory_id: str, index_name: str, db_type: str = "weaviate",
+                 namespace: str = "SEMANTICMEMORY"):
         # Add any semantic memory-related attributes or setup here
-        self.user_id=user_id
+        self.user_id = user_id
         self.index_name = index_name
         self.namespace = namespace
         self.semantic_memory_id = str(uuid.uuid4())
         self.memory_id = memory_id
         self.ltm_memory_id = ltm_memory_id
-        self.vector_db = VectorDB(user_id=user_id, memory_id= self.memory_id, ltm_memory_id = self.ltm_memory_id, index_name=index_name, db_type=db_type, namespace=self.namespace)
+        self.vector_db = VectorDB(user_id=user_id, memory_id=self.memory_id, ltm_memory_id=self.ltm_memory_id,
+                                  index_name=index_name, db_type=db_type, namespace=self.namespace)
         self.db_type = db_type

-
-
-    async def _add_memories(self, semantic_memory: str="None") ->list[str]:
+    async def _add_memories(self, semantic_memory: str = "None", params: dict = None) -> list[str]:
         """Update semantic memory for the user"""
         if self.db_type == "weaviate":
-            out = await self.vector_db.add_memories( observation = semantic_memory)
-            return out
+            text_splitter = RecursiveCharacterTextSplitter(
+                chunk_size=400,
+                chunk_overlap=20,
+                length_function=len,
+                is_separator_regex=False,
+            )
+            texts = text_splitter.create_documents([semantic_memory])
+            for text in texts:
+                out = await self.vector_db.add_memories(observation=text.page_content, params=params)
+            return out
         elif self.db_type == "pinecone":
             pass

-
-    async def _fetch_memories(self, observation: str,params:str=None) -> Coroutine[Any, Any, Any]:
+    async def _fetch_memories(self, observation: str, params: str = None) -> Coroutine[Any, Any, Any]:
         """Fetch related characteristics, preferences or dislikes for a user."""
         # self.init_pinecone(index_name=self.index)
@@ -325,42 +402,42 @@
         elif self.db_type == "pinecone":
             pass

-    async def _delete_memories(self,params:str=None) -> Coroutine[Any, Any, Any]:
+    async def _delete_memories(self, params: str = None) -> Coroutine[Any, Any, Any]:
         """Fetch related characteristics, preferences or dislikes for a user."""
         # self.init_pinecone(index_name=self.index)

         if self.db_type == "weaviate":
"weaviate": - return await self.vector_db.delete_memories( params=params) + return await self.vector_db.delete_memories(params=params) elif self.db_type == "pinecone": pass + class EpisodicMemory: - def __init__(self, user_id: str, memory_id:str, ltm_memory_id:str, index_name: str, db_type:str="weaviate", namespace:str="EPISODICMEMORY"): + def __init__(self, user_id: str, memory_id: str, ltm_memory_id: str, index_name: str, db_type: str = "weaviate", + namespace: str = "EPISODICMEMORY"): # Add any semantic memory-related attributes or setup here - self.user_id=user_id + self.user_id = user_id self.index_name = index_name self.namespace = namespace self.episodic_memory_id = str(uuid.uuid4()) self.memory_id = memory_id self.ltm_memory_id = ltm_memory_id - self.vector_db = VectorDB(user_id=user_id, memory_id= self.memory_id, ltm_memory_id = self.ltm_memory_id, index_name=index_name, db_type=db_type, namespace=self.namespace) + self.vector_db = VectorDB(user_id=user_id, memory_id=self.memory_id, ltm_memory_id=self.ltm_memory_id, + index_name=index_name, db_type=db_type, namespace=self.namespace) self.db_type = db_type - - - async def _add_memories(self ,observation:str=None) -> list[str]: + async def _add_memories(self, observation: str = None, params: dict = None) -> list[str]: """Update semantic memory for the user""" if self.db_type == "weaviate": - return await self.vector_db.add_memories( observation = observation) + return await self.vector_db.add_memories(observation=observation, params=params) elif self.db_type == "pinecone": pass - - def _fetch_memories(self, observation: str,params:str=None) -> Coroutine[Any, Any, Any]: + def _fetch_memories(self, observation: str, params: str = None) -> Coroutine[Any, Any, Any]: """Fetch related characteristics, preferences or dislikes for a user.""" # self.init_pinecone(index_name=self.index) @@ -370,19 +447,22 @@ class EpisodicMemory: elif self.db_type == "pinecone": pass - async def _delete_memories(self, params:str=None) -> Coroutine[Any, Any, Any]: + + async def _delete_memories(self, params: str = None) -> Coroutine[Any, Any, Any]: """Fetch related characteristics, preferences or dislikes for a user.""" # self.init_pinecone(index_name=self.index) if self.db_type == "weaviate": - return await self.vector_db.delete_memories( params=params) + return await self.vector_db.delete_memories(params=params) elif self.db_type == "pinecone": pass + class LongTermMemory: - def __init__(self, user_id: str = "676", memory_id:str=None, index_name: str = None, namespace:str=None, db_type:str="weaviate"): + def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None, namespace: str = None, + db_type: str = "weaviate"): self.user_id = user_id self.memory_id = memory_id self.ltm_memory_id = str(uuid.uuid4()) @@ -390,14 +470,17 @@ class LongTermMemory: self.namespace = namespace self.db_type = db_type # self.episodic_memory = EpisodicMemory() - self.semantic_memory = SemanticMemory(user_id = self.user_id, memory_id=self.memory_id, ltm_memory_id = self.ltm_memory_id, index_name=self.index_name, db_type=self.db_type) + self.semantic_memory = SemanticMemory(user_id=self.user_id, memory_id=self.memory_id, + ltm_memory_id=self.ltm_memory_id, index_name=self.index_name, + db_type=self.db_type) self.episodic_memory = EpisodicMemory(user_id=self.user_id, memory_id=self.memory_id, ltm_memory_id=self.ltm_memory_id, index_name=self.index_name, db_type=self.db_type) class ShortTermMemory: - def __init__(self, user_id: str = "676", 
memory_id:str=None, index_name: str = None, namespace:str=None, db_type:str="weaviate"): + def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None, namespace: str = None, + db_type: str = "weaviate"): # Add any short-term memory-related attributes or setup here self.user_id = user_id self.memory_id = memory_id @@ -405,12 +488,13 @@ class ShortTermMemory: self.db_type = db_type self.stm_memory_id = str(uuid.uuid4()) self.index_name = index_name - self.episodic_buffer = EpisodicBuffer(user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type) - + self.episodic_buffer = EpisodicBuffer(user_id=self.user_id, memory_id=self.memory_id, + index_name=self.index_name, db_type=self.db_type) class EpisodicBuffer: - def __init__(self, user_id: str = "676", memory_id:str=None, index_name: str = None, namespace:str='EPISODICBUFFER', db_type:str="weaviate"): + def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None, + namespace: str = 'EPISODICBUFFER', db_type: str = "weaviate"): # Add any short-term memory-related attributes or setup here self.user_id = user_id self.memory_id = memory_id @@ -418,7 +502,7 @@ class EpisodicBuffer: self.db_type = db_type self.st_memory_id = "blah" self.index_name = index_name - self.llm= ChatOpenAI( + self.llm = ChatOpenAI( temperature=0.0, max_tokens=1200, openai_api_key=os.environ.get('OPENAI_API_KEY'), @@ -426,10 +510,8 @@ class EpisodicBuffer: callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()], ) - # self.vector_db = VectorDB(user_id=user_id, memory_id= self.memory_id, st_memory_id = self.st_memory_id, index_name=index_name, db_type=db_type, namespace=self.namespace) - def _compute_weights(self, context: str): """Computes the weights for the buffer""" pass @@ -456,57 +538,58 @@ class EpisodicBuffer: # json_data = json.dumps(chain_result) # return json_data - async def _fetch_memories(self, observation: str,namespace:str) -> str: + async def _fetch_memories(self, observation: str, namespace: str) -> str: vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id, index_name=self.index_name, db_type=self.db_type, namespace=namespace) - query = await vector_db.fetch_memories(observation=observation) return query - async def _add_memories(self, observation: str,namespace:str): + async def _add_memories(self, observation: str, namespace: str, params: dict = None): vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id, index_name=self.index_name, db_type=self.db_type, namespace=namespace) - - query = await vector_db.add_memories(observation) + query = await vector_db.add_memories(observation, params=params) return query - async def _delete_memories(self, params:str=None) -> Coroutine[Any, Any, Any]: + async def _delete_memories(self, params: str = None) -> Coroutine[Any, Any, Any]: """Fetch related characteristics, preferences or dislikes for a user.""" # self.init_pinecone(index_name=self.index) if self.db_type == "weaviate": - return await self.vector_db.delete_memories( params=params) + return await self.vector_db.delete_memories(params=params) elif self.db_type == "pinecone": pass - async def freshness(self, observation: str,namespace:str) -> str: - """Freshness - Score between 1 and 5 on how often was the information processed in episodic memory in the past""" + # async def freshness(self, observation: str,namespace:str) -> str: + # """Freshness - Score between 1 and 5 on how 
+    #
+    #     memory = Memory(user_id=self.user_id)
+    #     await memory.async_init()
+    #
+    #     # gg = await memory._run_buffer(user_input= "bla", content = "blablabla ")
+    #     # print(gg)
+    #
+    #
+    #
+    #     ggur = await memory._fetch_episodic_memory(observation=observation)
+    #     print(ggur)

-        memory = Memory(user_id=self.user_id)
-        await memory.async_init()
+    # @ai_classifier
+    # class MemoryRoute(Enum):
+    #     """Represents classifier for semantic fetching of memories"""
+    #
+    #     storage_of_documents_and_knowledge_to_memory = "SEMANTICMEMORY"
+    #     raw_information_currently_processed_in_short_term_memory = "EPISODICBUFFER"
+    #     raw_information_kept_in_short_term_memory = "SHORTTERMMEMORY"
+    #     long_term_recollections_of_past_events_and_emotions = "EPISODICMEMORY"
+    #     raw_information_to_store_as_events = "EVENTBUFFER"
+    #
+    #     namespace= MemoryRoute(observation)

-        # gg = await memory._run_buffer(user_input= "bla", content = "blablabla ")
-        # print(gg)
-
-        ggur = await memory._fetch_episodic_memory(observation="bla bla bla")
-        print(ggur)
-        # @ai_classifier
-        # class MemoryRoute(Enum):
-        #     """Represents classifer for semantic fetching of memories"""
-        #
-        #     storage_of_documents_and_knowledge_to_memory = "SEMANTICMEMORY"
-        #     raw_information_currently_processed_in_short_term_memory = "EPISODICBUFFER"
-        #     raw_information_kept_in_short_term_memory = "SHORTTERMMEMORY"
-        #     long_term_recollections_of_past_events_and_emotions = "EPISODICMEMORY"
-        #     raw_information_to_store_as_events = "EVENTBUFFER"
-        #
-        #     namespace= MemoryRoute(observation)
-
-        return ggur
+    # return ggur

     async def encoding(self, document: str, namespace: str = "EPISODICBUFFER") -> None:
         """Encoding for the buffer, stores raw data in the buffer
@@ -522,21 +605,90 @@ class EpisodicBuffer:
         # Here we define the user prompt and the structure of the output we desire
         # prompt = output[0].page_content

+        # file_upload
+        #
         if content is not None:
-            #We need to encode the content. Note, this is not comp-sci encoding, but rather encoding in the sense of storing the content in the buffer
-            output_translated = GoogleTranslator(source='auto', target='en').translate(text=content)
-            await self.encoding(output_translated)
-            freshness_score =await self.freshness(output_translated, namespace="EPISODICBUFFER")
-            print(freshness_score)
+            # operations -> translate, structure, load to db
+
+            list_of_operations = ["translate", "structure", "load to db"]
+
+            prompt_filter = ChatPromptTemplate.from_template(
+                "Filter and remove unnecessary information that is not relevant in the user query {query}")
+            chain_filter = prompt_filter | self.llm
+            output = await chain_filter.ainvoke({"query": user_input})
+
+            class Task(BaseModel):
+                """Schema for an individual task."""
+                task_order: str = Field(..., description="The order at which the task needs to be performed")
+                task_name: str = Field(None, description="The task that needs to be performed")
+                operation: str = Field(None, description="The operation to be performed")
+
+            class TaskList(BaseModel):
+                """Schema for the record containing a list of tasks."""
+                tasks: List[Task] = Field(..., description="List of tasks")
+
+            prompt_filter_chunk = f" Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {output}"
+            # chain_filter_chunk = prompt_filter_chunk | self.llm.bind(function_call={"TaskList": "tasks"}, functions=TaskList)
+            # output_chunk = await chain_filter_chunk.ainvoke({"query": output, "list_of_operations": list_of_operations})
+            prompt_msgs = [
+                SystemMessage(
+                    content="You are a world class algorithm for decomposing prompts into steps and operations and choosing relevant ones"
+                ),
+                HumanMessage(content="Decompose based on the following prompt:"),
+                HumanMessagePromptTemplate.from_template("{input}"),
+                HumanMessage(content="Tips: Make sure to answer in the correct format"),
+                HumanMessage(content="Tips: Only choose actions that are relevant to the user query and ignore others")
+
+            ]
+            prompt_ = ChatPromptTemplate(messages=prompt_msgs)
+            chain = create_structured_output_chain(TaskList, self.llm, prompt_, verbose=True)
+            from langchain.callbacks import get_openai_callback
+            with get_openai_callback() as cb:
+                output = await chain.arun(input=prompt_filter_chunk, verbose=True)
+                print(cb)
+            # output = json.dumps(output)
+            my_object = parse_obj_as(TaskList, output)
+            print("HERE IS THE OUTPUT", my_object.json())
+
+            data = json.loads(my_object.json())
+
+            # Extract the list of tasks
+            tasks_list = data["tasks"]
+
+            for task in tasks_list:
+                class TranslateText(BaseModel):
+                    observation: str = Field(
+                        description="observation we want to translate"
+                    )
+
+                @tool("translate_to_en", args_schema=TranslateText, return_direct=True)
+                def translate_to_en(observation, args_schema=TranslateText):
+                    """Translate to English"""
+                    out = GoogleTranslator(source='auto', target='en').translate(text=observation)
+                    return out
+
+                agent = initialize_agent(
+                    llm=self.llm,
+                    tools=[translate_to_en],
+                    agent=AgentType.OPENAI_FUNCTIONS,
+
+                    verbose=True,
+                )
+
+                agent.run(task)
+
+            # We need to encode the content. Note, this is not comp-sci encoding, but rather encoding in the sense of storing the content in the buffer
+            # output_translated = GoogleTranslator(source='auto', target='en').translate(text=content)
+            # await self.encoding(output_translated)
+            # freshness_score =await self.freshness(output_translated, namespace="EPISODICBUFFER")
+            # print(freshness_score)

             # shows how much the data is relevant for the user, provided by the user in a separate step, starts at 0
-            user_relevance_score ="0"
+            user_relevance_score = "0"

             # similarity score between the user input and the content already available in the buffer
-
-
-
+
             # write this to episodic memory
             # prompt_filter = ChatPromptTemplate.from_template("Filter and remove uneccessary information that is not relevant in the user query {query}")
             # chain_filter = prompt_filter | self.llm
@@ -546,12 +698,12 @@ class EpisodicBuffer:

         if content is None:
             # Sensory and Linguistic Processing
-            prompt_filter = ChatPromptTemplate.from_template("Filter and remove uneccessary information that is not relevant in the user query {query}")
+            prompt_filter = ChatPromptTemplate.from_template(
+                "Filter and remove unnecessary information that is not relevant in the user query {query}")
             chain_filter = prompt_filter | self.llm
             output = await chain_filter.ainvoke({"query": user_input})
             translation = GoogleTranslator(source='auto', target='en').translate(text=output.content)

-
             def top_down_processing():
                 """Top-down processing"""
                 pass
@@ -560,29 +712,19 @@ class EpisodicBuffer:
                 """Bottom-up processing"""
                 pass

-
             def interactive_processing():
                 """interactive processing"""
                 pass

+            working_memory_activation = "bla"

-
-
-
-            working_memory_activation = "bla"
-
-
-            prompt_chunk = ChatPromptTemplate.from_template("Can you break down the instruction 'Structure a PDF and load it into duckdb' into smaller tasks or actions? Return only tasks or actions. Be brief")
+            prompt_chunk = ChatPromptTemplate.from_template(
+                "Can you break down the instruction 'Structure a PDF and load it into duckdb' into smaller tasks or actions? Return only tasks or actions. Be brief")
             chain_chunk = prompt_chunk | self.llm
             output_chunks = await chain_chunk.ainvoke({"query": output.content})
-
             print(output_chunks.content)
-
-
-
-
             # vectorstore = Weaviate.from_documents(documents, embeddings, client=client, by_text=False)
             # retriever = WeaviateHybridSearchRetriever(
             #     client=client,
@@ -598,7 +740,6 @@ class EpisodicBuffer:

 #             query = vector_db.
-
 # retriever = vectorstore.as_retriever(search_kwargs=dict(k=1))
 # memory = VectorStoreRetrieverMemory(retriever=retriever)
 # class PromptWrapper(BaseModel):
@@ -728,10 +869,8 @@ class EpisodicBuffer:
     #     return output

-
-
-#DEFINE STM
-#DEFINE LTM
+# DEFINE STM
+# DEFINE LTM
 class Memory:
     load_dotenv()
@@ -739,7 +878,7 @@ class Memory:
     OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

     def __init__(self, user_id: str = "676", index_name: str = None, knowledge_source: str = None,
-                 knowledge_type: str = None, db_type:str="weaviate", namespace:str=None) -> None:
+                 knowledge_type: str = None, db_type: str = "weaviate", namespace: str = None) -> None:
         self.user_id = user_id
         self.index_name = index_name
         self.db_type = db_type
@@ -752,12 +891,13 @@ class Memory:
         load_dotenv()

     # Asynchronous factory function for creating LongTermMemory
-    async def async_create_long_term_memory(self,user_id, memory_id, index_name, namespace, db_type):
+    async def async_create_long_term_memory(self, user_id, memory_id, index_name, namespace, db_type):
         # Perform asynchronous initialization steps if needed
         return LongTermMemory(
             user_id=user_id, memory_id=memory_id, index_name=index_name, namespace=namespace, db_type=db_type
         )

+
     async def async_init(self):
         # Asynchronous initialization of LongTermMemory and ShortTermMemory
         self.long_term_memory = await self.async_create_long_term_memory(
@@ -785,9 +925,9 @@ class Memory:
     #     user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type
     # )

-    async def _add_semantic_memory(self, semantic_memory:str):
+    async def _add_semantic_memory(self, semantic_memory: str, params: dict = None):
         return await self.long_term_memory.semantic_memory._add_memories(
-            semantic_memory=semantic_memory
+            semantic_memory=semantic_memory, params=params
         )
@@ -795,67 +935,79 @@ class Memory:
         return await self.long_term_memory.semantic_memory._fetch_memories(
             observation=observation, params=params
-
-
         )

-    async def _delete_semantic_memory(self, params:str=None):
+
+    async def _delete_semantic_memory(self, params: str = None):
         return await self.long_term_memory.semantic_memory._delete_memories(
             params=params
         )

-    async def _add_episodic_memory(self, observation:str):
+    async def _add_episodic_memory(self, observation: str, params: dict = None):
         return await self.long_term_memory.episodic_memory._add_memories(
-            observation=observation
+            observation=observation, params=params
         )

-    async def _fetch_episodic_memory(self, observation, params:str=None):
+    async def _fetch_episodic_memory(self, observation, params: str = None):
         return await self.long_term_memory.episodic_memory._fetch_memories(
             observation=observation, params=params
         )

-    async def _delete_episodic_memory(self, params:str=None):
+    async def _delete_episodic_memory(self, params: str = None):
         return await self.long_term_memory.episodic_memory._delete_memories(
-            params=params
+            params=params
         )

-    async def _run_buffer(self, user_input:str, content:str=None):
+    async def _run_buffer(self, user_input: str, content: str = None):
         return await self.short_term_memory.episodic_buffer.main_buffer(user_input=user_input, content=content)

+    async def _add_buffer_memory(self, user_input: str, namespace: str = None, params: dict = None):
+        return await self.short_term_memory.episodic_buffer._add_memories(observation=user_input, namespace=namespace,
+                                                                          params=params)

-
-    async def _add_buffer_memory(self, user_input: str, namespace: str = None ):
-        return await self.short_term_memory.episodic_buffer._add_memories(observation=user_input, namespace=namespace)
-
-    async def _fetch_buffer_memory(self, user_input: str, namespace: str = None ):
+    async def _fetch_buffer_memory(self, user_input: str, namespace: str = None):
         return await self.short_term_memory.episodic_buffer._fetch_memories(observation=user_input, namespace=namespace)

-    async def _delete_buffer_memory(self, params:str=None):
+    async def _delete_buffer_memory(self, params: str = None):
         return await self.long_term_memory.episodic_buffer._delete_memories(
-            params=params
+            params=params
        )

-
 async def main():
     memory = Memory(user_id="123")
     await memory.async_init()
+    params = {
+        "version": "1.0",
+        "agreement_id": "AG123456",
+        "privacy_policy": "https://example.com/privacy",
+        "terms_of_service": "https://example.com/terms",
+        "format": "json",
+        "schema_version": "1.1",
+        "checksum": "a1b2c3d4e5f6",
+        "owner": "John Doe",
+        "license": "MIT",
+        "validity_start": "2023-08-01",
+        "validity_end": "2024-07-31"
+    }

-    # gg = await memory._run_buffer(user_input= "bla", content = "blablabla ")
-    # print(gg)
-
-    gg = await memory._delete_episodic_memory()
+    gg = await memory._run_buffer(user_input="i NEED TRANSLATION TO GERMAN ", content="i NEED TRANSLATION TO GERMAN ")
     print(gg)

-    # ggur = await memory._add_episodic_memory(observation = "bla bla bla")
+    # gg = await memory._delete_episodic_memory()
+    # print(gg)
+
+    # ggur = await memory._add_episodic_memory(observation = "bla bla bla", params=params)
    # print(ggur)

     # ggur = await memory._fetch_episodic_memory(observation = "bla bla bla")
     # print(ggur)

     # fff = await memory._fetch_memories_buffer(user_input = "bla bla bla", namespace="Test")
     # print(fff)

+
 if __name__ == "__main__":
     import asyncio
+
     asyncio.run(main())

 # bb = agent._update_semantic_memory(semantic_memory="Users core summary")
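
A minimal usage sketch for the new per-memory-type route in level_2/api.py. Assumptions: the API runs locally on port 8000, `memory_type` is one of `semantic`, `episodic`, or `buffer` so the handler's `getattr(Memory_, f"_add_{memory_type}_memory", None)` resolves to a real method, and the PDF URL is a placeholder, not a real document.

    #curl -X POST -H "Content-Type: application/json" -d '{"payload": {"prompt": "I want to get a schema for my data", "pdf_url": "https://example.com/sample.pdf"}}' http://127.0.0.1:8000/episodic/add-memory

    import requests  # same HTTP client the API module already uses for downloads

    # Hypothetical request body: "prompt" and "pdf_url" are the keys the handler reads;
    # "pdf_url" triggers the PyPDFLoader branch, and the URL here is illustrative only.
    body = {
        "payload": {
            "prompt": "I want to get a schema for my data",
            "pdf_url": "https://example.com/sample.pdf",
        }
    }

    # "episodic" resolves to Memory._add_episodic_memory inside memory_factory's handler.
    response = requests.post("http://127.0.0.1:8000/episodic/add-memory", json=body)
    print(response.json())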
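A sketch of querying by the new contract-metadata properties, assuming Weaviate as the backing store: `fetch_memories` passes `params` straight into `.with_where(...)`, so a standard Weaviate where-filter over one of the added fields (here `version`; the filter values are illustrative) should narrow results the same way the `params_user_id` filter built inside `fetch_memories` does.

    import asyncio

    from level_2_pdf_vectorstore__dlt_contracts import Memory

    async def fetch_by_version():
        memory = Memory(user_id="123")
        await memory.async_init()
        # Weaviate where-filter; same shape as params_user_id in VectorDB.fetch_memories.
        where = {"path": ["version"], "operator": "Equal", "valueText": "1.0"}
        return await memory._fetch_episodic_memory(observation="schema", params=where)

    print(asyncio.run(fetch_by_version()))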