diff --git a/level_2/Readme.md b/level_2/Readme.md
index 2b1d36675..40756f48f 100644
--- a/level_2/Readme.md
+++ b/level_2/Readme.md
@@ -44,10 +44,7 @@ Here is a payload example:
 {
   "payload": {
     "user_id": "681",
-    "session_id": "471",
-    "model_speed": "slow",
     "prompt": "I want ",
-    "pdf_url": "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf",
     "params": {
       "version": "1.0",
       "agreement_id": "AG123456",
@@ -60,7 +57,47 @@ Here is a payload example:
       "license": "MIT",
       "validity_start": "2023-08-01",
       "validity_end": "2024-07-31"
-    }
+    },
+    "loader_settings": {
+      "format": "PDF", // or "HTML", "DOCX", "TXT"
+      "source": "url", // or "file"
+      "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
+    },
+    "attention_modulators": {
+      "relevance": 0.5,
+      "saliency": 0.5,
+      "frequency": 0.5,
+      "repetition": 0.5,
+      "length": 0.5,
+      "position": 0.5,
+      "context": 0.5,
+      "emotion": 0.5,
+      "sentiment": 0.5,
+      "perspective": 0.5,
+      "style": 0.5,
+      "grammar": 0.5,
+      "spelling": 0.5,
+      "logic": 0.5,
+      "coherence": 0.5,
+      "cohesion": 0.5,
+      "plausibility": 0.5,
+      "consistency": 0.5,
+      "informativeness": 0.5,
+      "specificity": 0.5,
+      "detail": 0.5,
+      "accuracy": 0.5,
+      "topicality": 0.5,
+      "focus": 0.5,
+      "clarity": 0.5,
+      "simplicity": 0.5,
+      "naturalness": 0.5,
+      "fluency": 0.5,
+      "variety": 0.5,
+      "vividness": 0.5,
+      "originality": 0.5,
+      "creativity": 0.5,
+      "humor": 0.5
+    }
   }
 }
 ```
\ No newline at end of file
diff --git a/level_2/api.py b/level_2/api.py
index 2e5a8e9fe..de8d1fc1a 100644
--- a/level_2/api.py
+++ b/level_2/api.py
@@ -18,6 +18,7 @@ from fastapi import HTTPException
 from fastapi import FastAPI, UploadFile, File
 from typing import List
 import requests
+
 # Set up logging
 logging.basicConfig(
     level=logging.INFO,  # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
@@ -42,15 +43,16 @@ class ImageResponse(BaseModel):
     message: str
 
 
-
-
-@app.get("/", )
+@app.get(
+    "/",
+)
 async def root():
     """
     Root endpoint that returns a welcome message.
""" return {"message": "Hello, World, I am alive!"} + @app.get("/health") def health_check(): """ @@ -59,16 +61,13 @@ def health_check(): return {"status": "OK"} - - - - -#curl -X POST -H "Content-Type: application/json" -d '{"data": "YourPayload"}' -F "files=@/path/to/your/pdf/file.pdf" http://127.0.0.1:8000/upload/ +# curl -X POST -H "Content-Type: application/json" -d '{"data": "YourPayload"}' -F "files=@/path/to/your/pdf/file.pdf" http://127.0.0.1:8000/upload/ class Payload(BaseModel): payload: Dict[str, Any] + # @app.post("/upload/", response_model=dict) # async def upload_pdf_and_payload( # payload: Payload, @@ -132,112 +131,94 @@ class Payload(BaseModel): def memory_factory(memory_type): load_dotenv() + class Payload(BaseModel): payload: Dict[str, Any] + @app.post("/{memory_type}/add-memory", response_model=dict) async def add_memory( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - logging.info(" Init PDF processing") - decoded_payload = payload.payload - if 'pdf_url' in decoded_payload: - pdf_response = requests.get(decoded_payload['pdf_url']) - pdf_content = pdf_response.content + Memory_ = Memory(user_id=decoded_payload["user_id"]) - logging.info("Downloaded PDF from URL") + await Memory_.async_init() - # Create an in-memory file-like object for the PDF content - pdf_stream = BytesIO(pdf_content) - - contents = pdf_stream.read() - - tmp_location = os.path.join('/tmp', "tmp.pdf") - with open(tmp_location, 'wb') as tmp_file: - tmp_file.write(contents) - - logging.info("Wrote PDF from URL") - - # Process the PDF using PyPDFLoader - loader = PyPDFLoader(tmp_location) - # pages = loader.load_and_split() - logging.info(" PDF split into pages") - - Memory_ = Memory(user_id=decoded_payload['user_id']) - - await Memory_.async_init() - - memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None) - output= await memory_class(observation=str(loader), params =decoded_payload['params']) - return JSONResponse(content={"response": output}, status_code=200) + memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None) + output = await memory_class( + observation=decoded_payload["prompt"], + loader_settings=decoded_payload["loader_settings"], + params=decoded_payload["params"], + ) + return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + return JSONResponse( + content={"response": {"error": str(e)}}, status_code=503 + ) @app.post("/{memory_type}/fetch-memory", response_model=dict) async def fetch_memory( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() memory_class = getattr(Memory_, f"_fetch_{memory_type}_memory", None) - output = memory_class(observation=decoded_payload['prompt']) + output = memory_class(observation=decoded_payload["prompt"]) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + return JSONResponse( + content={"response": {"error": str(e)}}, status_code=503 + ) @app.post("/{memory_type}/delete-memory", response_model=dict) async def delete_memory( - payload: Payload, - # files: List[UploadFile] = 
File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) - output = memory_class(observation=decoded_payload['prompt']) + output = memory_class(observation=decoded_payload["prompt"]) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: + return JSONResponse( + content={"response": {"error": str(e)}}, status_code=503 + ) - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) memory_list = ["episodic", "buffer", "semantic"] for memory_type in memory_list: memory_factory(memory_type) - @app.get("/available-buffer-actions", response_model=dict) async def available_buffer_actions( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() @@ -246,126 +227,76 @@ async def available_buffer_actions( return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + @app.post("/run-buffer", response_model=dict) async def available_buffer_actions( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) - output = await Memory_._run_buffer(user_input=decoded_payload['prompt'], params=decoded_payload['params']) + output = await Memory_._run_buffer( + user_input=decoded_payload["prompt"], params=decoded_payload["params"] + ) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + @app.post("/buffer/create-context", response_model=dict) async def available_buffer_actions( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) - output = await Memory_._create_buffer_context(user_input=decoded_payload['prompt'], params=decoded_payload['params']) + output = await Memory_._create_buffer_context( + user_input=decoded_payload["prompt"], params=decoded_payload["params"] + ) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) # - # # Process each uploaded PDF file - # results = [] - # for file in files: - # contents = await file.read() - # tmp_location = os.path.join('/tmp', "tmp.pdf") - # with open(tmp_location, 'wb') as tmp_file: - # tmp_file.write(contents) - # loader = PyPDFLoader(tmp_location) - # pages = loader.load_and_split() - # - # stm = 
ShortTermMemory(user_id=decoded_payload['user_id']) - # stm.episodic_buffer.main_buffer(prompt=decoded_payload['prompt'], pages=pages) - # # Here you can perform your processing on the PDF contents - # results.append({"filename": file.filename, "size": len(contents)}) - # - # return {"message": "Upload successful", "results": results} - # - # except Exception as e: - # return {"error": str(e)} - - -# @app.post("/clear-cache", response_model=dict) -# async def clear_cache(request_data: Payload) -> dict: -# """ -# Endpoint to clear the cache. +# # Process each uploaded PDF file +# results = [] +# for file in files: +# contents = await file.read() +# tmp_location = os.path.join('/tmp', "tmp.pdf") +# with open(tmp_location, 'wb') as tmp_file: +# tmp_file.write(contents) +# loader = PyPDFLoader(tmp_location) +# pages = loader.load_and_split() # -# Parameters: -# request_data (Payload): The request data containing the user and session IDs. +# stm = ShortTermMemory(user_id=decoded_payload['user_id']) +# stm.episodic_buffer.main_buffer(prompt=decoded_payload['prompt'], pages=pages) +# # Here you can perform your processing on the PDF contents +# results.append({"filename": file.filename, "size": len(contents)}) # -# Returns: -# dict: A dictionary with a message indicating the cache was cleared. -# """ -# json_payload = request_data.payload -# agent = Agent() -# agent.set_user_session(json_payload["user_id"], json_payload["session_id"]) -# try: -# agent.clear_cache() -# return JSONResponse(content={"response": "Cache cleared"}, status_code=200) -# except Exception as e: -# raise HTTPException(status_code=500, detail=str(e)) +# return {"message": "Upload successful", "results": results} # -# @app.post("/correct-prompt-grammar", response_model=dict) -# async def prompt_to_correct_grammar(request_data: Payload) -> dict: -# json_payload = request_data.payload -# agent = Agent() -# agent.set_user_session(json_payload["user_id"], json_payload["session_id"]) -# logging.info("Correcting grammar %s", json_payload["prompt_source"]) +# except Exception as e: +# return {"error": str(e)} + + # -# output = agent.prompt_correction(json_payload["prompt_source"], model_speed= json_payload["model_speed"]) -# return JSONResponse(content={"response": {"result": json.loads(output)}}) - - -# @app.post("/action-add-zapier-calendar-action", response_model=dict,dependencies=[Depends(auth)]) -# async def action_add_zapier_calendar_action( -# request: Request, request_data: Payload -# ) -> dict: -# json_payload = request_data.payload -# agent = Agent() -# agent.set_user_session(json_payload["user_id"], json_payload["session_id"]) -# # Extract the bearer token from the header -# auth_header = request.headers.get("Authorization") -# if auth_header: -# bearer_token = auth_header.replace("Bearer ", "") -# else: -# bearer_token = None -# outcome = agent.add_zapier_calendar_action( -# prompt_base=json_payload["prompt_base"], -# token=bearer_token, -# model_speed=json_payload["model_speed"], -# ) -# return JSONResponse(content={"response": outcome}) - - def start_api_server(host: str = "0.0.0.0", port: int = 8000): """ diff --git a/level_2/level_2_pdf_vectorstore__dlt_contracts.py b/level_2/level_2_pdf_vectorstore__dlt_contracts.py index 85598728c..73d9e00fd 100644 --- a/level_2/level_2_pdf_vectorstore__dlt_contracts.py +++ b/level_2/level_2_pdf_vectorstore__dlt_contracts.py @@ -1,129 +1,123 @@ # Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client - -import dlt -from langchain 
import PromptTemplate, LLMChain -from langchain.agents import initialize_agent, AgentType -from langchain.chains.openai_functions import create_structured_output_chain -from langchain.chat_models import ChatOpenAI -from langchain.document_loaders import PyPDFLoader -import weaviate -import os import json -import asyncio -from typing import Any, Dict, List, Coroutine -from deep_translator import (GoogleTranslator) -from langchain.chat_models import ChatOpenAI -from langchain.output_parsers import PydanticOutputParser -from langchain.schema import LLMResult, HumanMessage -from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler -from pydantic import BaseModel, Field, parse_obj_as -from langchain.memory import VectorStoreRetrieverMemory -from marvin import ai_classifier from enum import Enum +from io import BytesIO +from typing import Dict, List, Union + import marvin -import asyncio -from langchain.embeddings import OpenAIEmbeddings -from langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate -from langchain.retrievers import WeaviateHybridSearchRetriever -from langchain.schema import Document, SystemMessage, HumanMessage, LLMResult -from langchain.tools import tool -from langchain.vectorstores import Weaviate -import uuid +import requests +from deep_translator import GoogleTranslator from dotenv import load_dotenv +from langchain.agents import initialize_agent, AgentType +from langchain.document_loaders import PyPDFLoader +from langchain.output_parsers import PydanticOutputParser +from langchain.retrievers import WeaviateHybridSearchRetriever +from langchain.tools import tool +from marvin import ai_classifier +from pydantic import parse_obj_as load_dotenv() -from pathlib import Path -from langchain import OpenAI, LLMMathChain +from langchain import OpenAI from langchain.chat_models import ChatOpenAI -from langchain.prompts import ChatPromptTemplate -import uuid from typing import Optional +import tracemalloc +tracemalloc.start() -from datetime import datetime import os from datetime import datetime -from jinja2 import Template -from langchain import PromptTemplate, LLMChain +from langchain import PromptTemplate from langchain.chains.openai_functions import create_structured_output_chain from langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate -from langchain.text_splitter import RecursiveCharacterTextSplitter -import pinecone -from langchain.vectorstores import Pinecone from langchain.embeddings.openai import OpenAIEmbeddings from pydantic import BaseModel, Field from dotenv import load_dotenv from langchain.schema import Document, SystemMessage, HumanMessage -from langchain.vectorstores import Weaviate -import weaviate import uuid import humanize -import pinecone import weaviate + load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY") - -class MyCustomSyncHandler(BaseCallbackHandler): - def on_llm_new_token(self, token: str, **kwargs) -> None: - print(f"Sync handler being called in a `thread_pool_executor`: token: {token}") - - -class MyCustomAsyncHandler(AsyncCallbackHandler): - """Async callback handler that can be used to handle callbacks from langchain.""" - - async def on_llm_start( - self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any - ) -> None: - """Run when chain starts running.""" - print("zzzz....") - await asyncio.sleep(0.3) - class_name = serialized["name"] - print("Hi! I just woke up. 
Your llm is starting") - - async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: - """Run when chain ends running.""" - print("zzzz....") - await asyncio.sleep(0.3) - print("Hi! I just woke up. Your llm is ending") - - +# class MyCustomSyncHandler(BaseCallbackHandler): +# def on_llm_new_token(self, token: str, **kwargs) -> None: +# print(f"Sync handler being called in a `thread_pool_executor`: token: {token}") +# +# +# class MyCustomAsyncHandler(AsyncCallbackHandler): +# """Async callback handler that can be used to handle callbacks from langchain.""" +# +# async def on_llm_start( +# self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any +# ) -> None: +# """Run when chain starts running.""" +# print("zzzz....") +# await asyncio.sleep(0.3) +# class_name = serialized["name"] +# print("Hi! I just woke up. Your llm is starting") +# +# async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: +# """Run when chain ends running.""" +# print("zzzz....") +# await asyncio.sleep(0.3) +# print("Hi! I just woke up. Your llm is ending") +# +# # Assuming OpenAIEmbeddings and other necessary imports are available # Default Values -LTM_MEMORY_ID_DEFAULT = '00000' -ST_MEMORY_ID_DEFAULT = '0000' -BUFFER_ID_DEFAULT = '0000' +LTM_MEMORY_ID_DEFAULT = "00000" +ST_MEMORY_ID_DEFAULT = "0000" +BUFFER_ID_DEFAULT = "0000" class VectorDBFactory: - def create_vector_db(self, user_id: str, index_name: str, memory_id: str, - ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, - st_memory_id: str = ST_MEMORY_ID_DEFAULT, - buffer_id: str = BUFFER_ID_DEFAULT, - db_type: str = "pinecone", - namespace: str = None): - db_map = { - "pinecone": PineconeVectorDB, - "weaviate": WeaviateVectorDB - } + def create_vector_db( + self, + user_id: str, + index_name: str, + memory_id: str, + ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, + st_memory_id: str = ST_MEMORY_ID_DEFAULT, + buffer_id: str = BUFFER_ID_DEFAULT, + db_type: str = "pinecone", + namespace: str = None, + ): + db_map = {"pinecone": PineconeVectorDB, "weaviate": WeaviateVectorDB} if db_type in db_map: - return db_map[db_type](user_id, index_name, memory_id, ltm_memory_id, st_memory_id, buffer_id, namespace) + return db_map[db_type]( + user_id, + index_name, + memory_id, + ltm_memory_id, + st_memory_id, + buffer_id, + namespace, + ) raise ValueError(f"Unsupported database type: {db_type}") class VectorDB: OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") - def __init__(self, user_id: str, index_name: str, memory_id: str, - ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, - st_memory_id: str = ST_MEMORY_ID_DEFAULT, - buffer_id: str = BUFFER_ID_DEFAULT, namespace: str = None): + + def __init__( + self, + user_id: str, + index_name: str, + memory_id: str, + ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, + st_memory_id: str = ST_MEMORY_ID_DEFAULT, + buffer_id: str = BUFFER_ID_DEFAULT, + namespace: str = None, + ): self.user_id = user_id self.index_name = index_name self.namespace = namespace @@ -151,14 +145,13 @@ class WeaviateVectorDB(VectorDB): def init_weaviate(self, namespace: str): # Weaviate initialization logic embeddings = OpenAIEmbeddings() - auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY')) + auth_config = weaviate.auth.AuthApiKey( + api_key=os.environ.get("WEAVIATE_API_KEY") + ) client = weaviate.Client( - url=os.environ.get('WEAVIATE_URL'), + url=os.environ.get("WEAVIATE_URL"), auth_client_secret=auth_config, - - additional_headers={ - "X-OpenAI-Api-Key": os.environ.get('OPENAI_API_KEY') - } + 
additional_headers={"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")}, ) retriever = WeaviateHybridSearchRetriever( client=client, @@ -168,61 +161,105 @@ class WeaviateVectorDB(VectorDB): embedding=embeddings, create_schema_if_missing=True, ) - return retriever # If this is part of the initialization, call it here. + return retriever # If this is part of the initialization, call it here. def init_weaviate_client(self, namespace: str): # Weaviate client initialization logic - auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY')) + auth_config = weaviate.auth.AuthApiKey( + api_key=os.environ.get("WEAVIATE_API_KEY") + ) client = weaviate.Client( - url=os.environ.get('WEAVIATE_URL'), + url=os.environ.get("WEAVIATE_URL"), auth_client_secret=auth_config, - - additional_headers={ - "X-OpenAI-Api-Key": os.environ.get('OPENAI_API_KEY') - } + additional_headers={"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")}, ) return client - async def add_memories(self, observation: str, params: dict = None): + def _document_loader(self, observation: str, loader_settings: dict): + # Create an in-memory file-like object for the PDF content + if loader_settings.get("format") == "PDF": + + if loader_settings.get("source") == "url": + pdf_response = requests.get(loader_settings["path"]) + pdf_stream = BytesIO(pdf_response.content) + contents = pdf_stream.read() + tmp_location = os.path.join("/tmp", "tmp.pdf") + with open(tmp_location, "wb") as tmp_file: + tmp_file.write(contents) + + # Process the PDF using PyPDFLoader + loader = PyPDFLoader(tmp_location) + # adapt this for different chunking strategies + pages = loader.load_and_split() + return pages + + if loader_settings.get("source") == "file": + # Process the PDF using PyPDFLoader + # might need adapting for different loaders + OCR + # need to test the path + loader = PyPDFLoader(loader_settings["path"]) + pages = loader.load_and_split() + + return pages + else: + # Process the text by just loading the base text + return observation + + + async def add_memories( + self, observation: str, loader_settings: dict = None, params: dict = None + ): # Update Weaviate memories here print(self.namespace) retriever = self.init_weaviate(self.namespace) - return retriever.add_documents([ + def _stuct(observation, params): + """Utility function to not repeat metadata structure""" + # needs smarter solution, like dynamic generation of metadata + return [ Document( metadata={ - "text": observation, + # "text": observation, "user_id": str(self.user_id), "memory_id": str(self.memory_id), "ltm_memory_id": str(self.ltm_memory_id), "st_memory_id": str(self.st_memory_id), "buffer_id": str(self.buffer_id), - "version": params.get('version', None) or "", - "agreement_id": params.get('agreement_id', None) or "", - "privacy_policy": params.get('privacy_policy', None) or "", - "terms_of_service": params.get('terms_of_service', None) or "", - "format": params.get('format', None) or "", - "schema_version": params.get('schema_version', None) or "", - "checksum": params.get('checksum', None) or "", - "owner": params.get('owner', None) or "", - "license": params.get('license', None) or "", - "validity_start": params.get('validity_start', None) or "", - "validity_end": params.get('validity_end', None) or "" - + "version": params.get("version", None) or "", + "agreement_id": params.get("agreement_id", None) or "", + "privacy_policy": params.get("privacy_policy", None) or "", + "terms_of_service": params.get("terms_of_service", None) or "", + "format": 
params.get("format", None) or "", + "schema_version": params.get("schema_version", None) or "", + "checksum": params.get("checksum", None) or "", + "owner": params.get("owner", None) or "", + "license": params.get("license", None) or "", + "validity_start": params.get("validity_start", None) or "", + "validity_end": params.get("validity_end", None) or "" # **source_metadata, }, page_content=observation, - )] + ) + ] + + if loader_settings: + # Load the document + document = self._document_loader(observation, loader_settings) + print("DOC LENGTH", len(document)) + for doc in document: + document_to_load = _stuct(doc.page_content, params) + retriever.add_documents( + document_to_load + ) + + return retriever.add_documents( + _stuct(observation, params) ) - # def get_pinecone_vectorstore(self, namespace: str) -> pinecone.VectorStore: - # return Pinecone.from_existing_index( - # index_name=self.index, embedding=OpenAIEmbeddings(), namespace=namespace - # ) - - async def fetch_memories(self, observation: str, namespace: str, params: dict = None): - + async def fetch_memories( + self, observation: str, namespace: str, params: dict = None + ): # Fetch Weaviate memories here """ Get documents from weaviate. @@ -247,49 +284,75 @@ class WeaviateVectorDB(VectorDB): params_user_id = { "path": ["user_id"], "operator": "Like", - "valueText": self.user_id + "valueText": self.user_id, } if params: - query_output = client.query.get(namespace, ["text" - , "user_id" - , "memory_id" - , "ltm_memory_id" - , "st_memory_id" - , "buffer_id" - , "version", - "agreement_id", - "privacy_policy", - "terms_of_service", - "format", - "schema_version", - "checksum", - "owner", - "license", - "validity_start", - "validity_end"]).with_where(params).with_additional( - ['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do() + query_output = ( + client.query.get( + namespace, + [ + # "text", + "user_id", + "memory_id", + "ltm_memory_id", + "st_memory_id", + "buffer_id", + "version", + "agreement_id", + "privacy_policy", + "terms_of_service", + "format", + "schema_version", + "checksum", + "owner", + "license", + "validity_start", + "validity_end", + ], + ) + .with_where(params) + .with_near_text({"concepts": [observation]}) + .with_additional( + ["id", "creationTimeUnix", "lastUpdateTimeUnix", "score",'distance'] + ) + .with_where(params_user_id) + .do() + ) return query_output else: - query_output = client.query.get(namespace, ["text", - "user_id", - "memory_id", - "ltm_memory_id", - "st_memory_id", - "buffer_id", - "version", - "agreement_id", - "privacy_policy", - "terms_of_service", - "format", - "schema_version", - "checksum", - "owner", - "license", - "validity_start", - "validity_end" - ]).with_additional( - ['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do() + query_output = ( + client.query.get( + namespace, + + [ + "text", + "user_id", + "memory_id", + "ltm_memory_id", + "st_memory_id", + "buffer_id", + "version", + "agreement_id", + "privacy_policy", + "terms_of_service", + "format", + "schema_version", + "checksum", + "owner", + "license", + "validity_start", + "validity_end", + ], + ) + .with_additional( + ["id", "creationTimeUnix", "lastUpdateTimeUnix", "score", 'distance'] + ) + .with_near_text({"concepts": [observation]}) + .with_autocut(1) + .with_where(params_user_id) + .do() + ) return query_output async def delete_memories(self, params: dict = None): @@ -298,7 +361,7 @@ class WeaviateVectorDB(VectorDB): where_filter = { "path": 
["id"], "operator": "Equal", - "valueText": params.get('id', None) + "valueText": params.get("id", None), } return client.batch.delete_objects( class_name=self.namespace, @@ -311,10 +374,10 @@ class WeaviateVectorDB(VectorDB): return client.batch.delete_objects( class_name=self.namespace, where={ - 'path': ['user_id'], - 'operator': 'Equal', - 'valueText': self.user_id - } + "path": ["user_id"], + "operator": "Equal", + "valueText": self.user_id, + }, ) def update_memories(self, observation, namespace: str, params: dict = None): @@ -322,35 +385,41 @@ class WeaviateVectorDB(VectorDB): client.data_object.update( data_object={ - "text": observation, + # "text": observation, "user_id": str(self.user_id), "memory_id": str(self.memory_id), "ltm_memory_id": str(self.ltm_memory_id), "st_memory_id": str(self.st_memory_id), "buffer_id": str(self.buffer_id), - "version": params.get('version', None) or "", - "agreement_id": params.get('agreement_id', None) or "", - "privacy_policy": params.get('privacy_policy', None) or "", - "terms_of_service": params.get('terms_of_service', None) or "", - "format": params.get('format', None) or "", - "schema_version": params.get('schema_version', None) or "", - "checksum": params.get('checksum', None) or "", - "owner": params.get('owner', None) or "", - "license": params.get('license', None) or "", - "validity_start": params.get('validity_start', None) or "", - "validity_end": params.get('validity_end', None) or "" - + "version": params.get("version", None) or "", + "agreement_id": params.get("agreement_id", None) or "", + "privacy_policy": params.get("privacy_policy", None) or "", + "terms_of_service": params.get("terms_of_service", None) or "", + "format": params.get("format", None) or "", + "schema_version": params.get("schema_version", None) or "", + "checksum": params.get("checksum", None) or "", + "owner": params.get("owner", None) or "", + "license": params.get("license", None) or "", + "validity_start": params.get("validity_start", None) or "", + "validity_end": params.get("validity_end", None) or "" # **source_metadata, - }, class_name="Test", - uuid=params.get('id', None), + uuid=params.get("id", None), consistency_level=weaviate.data.replication.ConsistencyLevel.ALL, # default QUORUM ) return + class BaseMemory: - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str, namespace: str): + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str, + namespace: str, + ): self.user_id = user_id self.memory_id = memory_id self.index_name = index_name @@ -358,17 +427,40 @@ class BaseMemory: self.memory_type_id = str(uuid.uuid4()) self.db_type = db_type factory = VectorDBFactory() - self.vector_db = factory.create_vector_db(self.user_id, self.index_name, self.memory_id, db_type=self.db_type, - namespace=self.namespace) + self.vector_db = factory.create_vector_db( + self.user_id, + self.index_name, + self.memory_id, + db_type=self.db_type, + namespace=self.namespace, + ) - async def add_memories(self, observation: Optional[str] = None, params: Optional[dict] = None): + def init_client(self, namespace: str): if self.db_type == "weaviate": - return await self.vector_db.add_memories(observation=observation, params=params) + return self.vector_db.init_weaviate_client(namespace) + + async def add_memories( + self, + observation: Optional[str] = None, + loader_settings: dict = None, + params: Optional[dict] = None, + ): + if self.db_type == "weaviate": + return await 
self.vector_db.add_memories( + observation=observation, loader_settings=loader_settings, params=params + ) # Add other db_type conditions if necessary - async def fetch_memories(self, observation: str, params: Optional[str] = None): + async def fetch_memories( + self, + observation: str, + params: Optional[str] = None, + namespace: Optional[str] = None, + ): if self.db_type == "weaviate": - return await self.vector_db.fetch_memories(observation, params) + return await self.vector_db.fetch_memories( + observation=observation, params=params, namespace=namespace + ) async def delete_memories(self, params: Optional[str] = None): if self.db_type == "weaviate": @@ -378,180 +470,289 @@ class BaseMemory: class SemanticMemory(BaseMemory): - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str = "weaviate"): - super().__init__(user_id, memory_id, index_name, db_type, namespace="SEMANTICMEMORY") + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str = "weaviate", + ): + super().__init__( + user_id, memory_id, index_name, db_type, namespace="SEMANTICMEMORY" + ) class EpisodicMemory(BaseMemory): - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str = "weaviate"): - super().__init__(user_id, memory_id, index_name, db_type, namespace="EPISODICMEMORY") + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str = "weaviate", + ): + super().__init__( + user_id, memory_id, index_name, db_type, namespace="EPISODICMEMORY" + ) class EpisodicBuffer(BaseMemory): - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str = "weaviate"): - super().__init__(user_id, memory_id, index_name, db_type, namespace="BUFFERMEMORY") + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str = "weaviate", + ): + super().__init__( + user_id, memory_id, index_name, db_type, namespace="BUFFERMEMORY" + ) self.st_memory_id = "blah" self.llm = ChatOpenAI( temperature=0.0, max_tokens=1200, - openai_api_key=os.environ.get('OPENAI_API_KEY'), + openai_api_key=os.environ.get("OPENAI_API_KEY"), model_name="gpt-4-0613", - callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()], + # callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()], ) self.llm_base = OpenAI( temperature=0.0, max_tokens=1200, - openai_api_key=os.environ.get('OPENAI_API_KEY'), - model_name="gpt-4-0613" + openai_api_key=os.environ.get("OPENAI_API_KEY"), + model_name="gpt-4-0613", ) + async def memory_route(self, text_time_diff: str): + @ai_classifier + class MemoryRoute(Enum): + """Represents classifer for freshness of memories""" + + data_uploaded_now = "1" + data_uploaded_very_recently = "0.9" + data_uploaded_recently = "0.7" + data_uploaded_more_than_a_month_ago = "0.5" + data_uploaded_more_than_three_months_ago = "0.3" + data_uploaded_more_than_six_months_ago = "0.1" + + namespace = MemoryRoute(str(text_time_diff)) + + return namespace async def freshness(self, observation: str, namespace: str = None) -> list[str]: - """Freshness - Score between 1 and 5 on how often was the information updated in episodic or semantic memory in the past""" + """Freshness - Score between 0 and 1 on how often was the information updated in episodic or semantic memory in the past""" - memory = Memory(user_id=self.user_id) - await memory.async_init() - - lookup_value = await 
memory._fetch_episodic_memory(observation=observation) - unix_t = lookup_value["data"]["Get"]["EPISODICMEMORY"][0]["_additional"]["lastUpdateTimeUnix"] + lookup_value = await self.fetch_memories( + observation=observation, namespace=namespace + ) + unix_t = lookup_value["data"]["Get"]["EPISODICMEMORY"][0]["_additional"][ + "lastUpdateTimeUnix" + ] # Convert Unix timestamp to datetime last_update_datetime = datetime.fromtimestamp(int(unix_t) / 1000) time_difference = datetime.now() - last_update_datetime time_difference_text = humanize.naturaltime(time_difference) - marvin.settings.openai.api_key = os.environ.get('OPENAI_API_KEY') - - @ai_classifier - class MemoryRoute(Enum): - """Represents classifer for freshness of memories""" - - data_uploaded_now = "0" - data_uploaded_very_recently = "1" - data_uploaded_recently = "2" - data_uploaded_more_than_a_month_ago = "3" - data_uploaded_more_than_three_months_ago = "4" - data_uploaded_more_than_six_months_ago = "5" - - namespace = MemoryRoute(str(time_difference_text)) + print("WE GOT THIS FAR") + namespace = await self.memory_route(str(time_difference_text)) return [namespace.value, lookup_value] async def frequency(self, observation: str, namespace: str) -> list[str]: - """Frequency - Score between 1 and 5 on how often was the information processed in episodic memory in the past - Counts the number of times a memory was accessed in the past and divides it by the total number of memories in the episodic memory """ - client = self.init_weaviate_client(self.namespace) + """Frequency - Score between 0 and 1 on how often was the information processed in episodic memory in the past + Counts the number of times a memory was accessed in the past and divides it by the total number of memories in the episodic memory + """ + weaviate_client = self.init_client(namespace=namespace) - memory = Memory(user_id=self.user_id) - await memory.async_init() - - result_output = await memory._fetch_episodic_memory(observation=observation) + result_output = await self.fetch_memories( + observation=observation, params=None, namespace=namespace + ) number_of_relevant_events = len(result_output["data"]["Get"]["EPISODICMEMORY"]) - number_of_total_events = client.query.aggregate(self.namespace).with_meta_count().do() - frequency = float(number_of_relevant_events) / float(number_of_total_events) + number_of_total_events = ( + weaviate_client.query.aggregate(namespace).with_meta_count().do() + ) + print(number_of_total_events) + print( + number_of_total_events["data"]["Aggregate"]["EPISODICMEMORY"][0]["meta"][ + "count" + ] + ) + frequency = float(number_of_relevant_events) / float( + number_of_total_events["data"]["Aggregate"]["EPISODICMEMORY"][0]["meta"][ + "count" + ] + ) return [str(frequency), result_output["data"]["Get"]["EPISODICMEMORY"][0]] - async def relevance(self, observation: str) -> list[str]: - """Relevance - Score between 1 and 5 on how often was the final information relevant to the user in the past. - Stored in the episodic memory, mainly to show how well a buffer did the job - Starts at 1, gets updated based on the user feedback """ + async def relevance(self, observation: str, namespace: str) -> list[str]: + """Relevance - Score between 0 and 1 on how often was the final information relevant to the user in the past. 
+ Stored in the episodic memory, mainly to show how well a buffer did the job + Starts at 0, gets updated based on the user feedback""" - return ["5", "memory"] + print("WE START WITH RELEVANCE") + + weaviate_client = self.init_client(namespace=namespace) + + result_output = await self.fetch_memories( + observation=observation, params=None, namespace=namespace + ) + + print("HERE IS THE RESULT OUTPUT TO DETERMINE RELEVANCE: ", result_output) + + return ["0", "memory"] async def saliency(self, observation: str) -> list[str]: """Determines saliency by finding relevance between user input and document schema values. - After finding document schena value relevant for the user, it forms a new query based on the schema value and the user input """ + After finding document schena value relevant for the user, it forms a new query based on the schema value and the user input + """ - return ["5", "memory"] + return ["0", "memory"] - # @ai_classifier - # class MemoryRoute(Enum): - # """Represents classifer for freshness of memories""" - # - # data_uploaded_now = "0" - # data_uploaded_very_recently = "1" - # data_uploaded_recently = "2" - # data_uploaded_more_than_a_month_ago = "3" - # data_uploaded_more_than_three_months_ago = "4" - # data_uploaded_more_than_six_months_ago = "5" - # - # namespace= MemoryRoute(observation) - - # return ggur - - async def encoding(self, document: str, namespace: str = "EPISODICBUFFER", params: dict = None) -> list[str]: + async def encoding( + self, document: str, namespace: str = "EPISODICBUFFER", params: dict = None + ) -> list[str]: """Encoding for the buffer, stores raw data in the buffer - Note, this is not comp-sci encoding, but rather encoding in the sense of storing the content in the buffer""" + Note, this is not comp-sci encoding, but rather encoding in the sense of storing the content in the buffer + """ query = await self.add_memories(document, params=params) return query + async def handle_modulator( + self, + modulator_name: str, + attention_modulators: Dict[str, float], + observation: str, + namespace: Optional[str] = None, + ) -> Optional[List[Union[str, float]]]: + """ + Handle the given modulator based on the observation and namespace. + + Parameters: + - modulator_name: Name of the modulator to handle. + - attention_modulators: Dictionary of modulator values. + - observation: The current observation. + - namespace: An optional namespace. + + Returns: + - Result of the modulator if criteria met, else None. 
+ """ + modulator_value = attention_modulators.get(modulator_name, 0.0) + modulator_functions = { + "freshness": lambda obs, ns: self.freshness(observation=obs, namespace=ns), + "frequency": lambda obs, ns: self.frequency(observation=obs, namespace=ns), + "relevance": lambda obs, ns: self.relevance(observation=obs, namespace=ns), + } + + result_func = modulator_functions.get(modulator_name) + if not result_func: + return None + + result = await result_func(observation, namespace) + if not result: + return None + + try: + if float(modulator_value) >= float(result[0]): + return result + except ValueError: + pass + + return None + async def available_operations(self) -> list[str]: """Determines what operations are available for the user to process PDFs""" - return ["translate", "structure", "load to database", "load to semantic memory", "load to episodic memory", - "load to buffer"] + return [ + "translate", + "structure", + "load to database", + "load to semantic memory", + "load to episodic memory", + "load to buffer", + ] - async def buffer_context(self, user_input=None, content=None, params=None): + async def buffer_context( + self, + user_input=None, + content=None, + params=None, + attention_modulators: dict = None, + ): """Generates the context to be used for the buffer and passed to the agent""" - # we get a list of available operations for our buffer to consider - # these operations are what we can do with the data, in the context of PDFs (load, translate, structure, etc) - list_of_operations = await self.available_operations() - try: # we delete all memories in the episodic buffer, so we can start fresh await self.delete_memories() except: # in case there are no memories, we pass pass - # we just filter the data here to make sure input is clean prompt_filter = ChatPromptTemplate.from_template( - "Filter and remove uneccessary information that is not relevant in the user query, keep it as original as possbile: {query}") + "Filter and remove uneccessary information that is not relevant in the user query, keep it as original as possbile: {query}" + ) chain_filter = prompt_filter | self.llm output = await chain_filter.ainvoke({"query": user_input}) - # this part is mostly unfinished but the idea is to apply different algorithms to the data to fetch the most relevant information from the vector stores + # this part is unfinished but the idea is to apply different attention modulators to the data to fetch the most relevant information from the vector stores context = [] - if params: + if attention_modulators: + print("HERE ARE THE ATTENTION MODULATORS: ", attention_modulators) + from typing import Optional, Dict, List, Union - if "freshness" in params: - params.get('freshness', None) # get the value of freshness - freshness = await self.freshness(observation=str(output)) - context.append(freshness) - - elif "frequency" in params: - params.get('freshness', None) - frequency = await self.freshness(observation=str(output)) - print("freshness", frequency) - context.append(frequency) - - # fix this so it actually filters + lookup_value_semantic = await self.fetch_memories( + observation=str(output), namespace="SEMANTICMEMORY" + ) + context = [] + for memory in lookup_value_semantic["data"]["Get"]["SEMANTICMEMORY"]: + # extract memory id, and pass it to fetch function as a parameter + modulators = list(attention_modulators.keys()) + for modulator in modulators: + result = await self.handle_modulator( + modulator, + attention_modulators, + str(output), + namespace="EPISODICMEMORY", + ) + if 
result: + context.append(result) + context.append(memory) else: # defaults to semantic search if we don't want to apply algorithms on the vectordb data - memory = Memory(user_id=self.user_id) - await memory.async_init() - - lookup_value_episodic = await memory._fetch_episodic_memory(observation=str(output)) - lookup_value_semantic = await memory._fetch_semantic_memory(observation=str(output)) + lookup_value_episodic = await self.fetch_memories( + observation=str(output), namespace="EPISODICMEMORY" + ) + lookup_value_semantic = await self.fetch_memories( + observation=str(output), namespace="SEMANTICMEMORY" + ) lookup_value_buffer = await self.fetch_memories(observation=str(output)) context.append(lookup_value_buffer) context.append(lookup_value_semantic) context.append(lookup_value_episodic) - # copy the context over into the buffer - # do i need to do it for the episodic + raw data, might make sense - print("HERE IS THE CONTEXT", context) + class BufferModulators(BaseModel): + frequency: str = Field(..., description="Frequency score of the document") + saliency: str = Field(..., description="Saliency score of the document") + relevance: str = Field(..., description="Relevance score of the document") class BufferRawContextTerms(BaseModel): """Schema for documentGroups""" - semantic_search_term: str = Field(..., - description="The search term to use to get relevant input based on user query") - document_description: str = Field(None, description="The short summary of what the document is about") - document_relevance: str = Field(None, - description="The relevance of the document for the task on the scale from 1 to 5") + + semantic_search_term: str = Field( + ..., + description="The search term to use to get relevant input based on user query", + ) + document_description: str = Field( + None, description="The short summary of what the document is about" + ) + document_relevance: str = Field( + None, + description="The relevance of the document for the task on the scale from 0 to 1", + ) + attention_modulators_list: List[BufferModulators] = Field( + ..., description="List of modulators" + ) class BufferRawContextList(BaseModel): """Buffer raw context processed by the buffer""" + docs: List[BufferRawContextTerms] = Field(..., description="List of docs") user_query: str = Field(..., description="The original user query") @@ -564,25 +765,44 @@ class EpisodicBuffer(BaseMemory): ) _input = prompt.format_prompt(query=user_input, context=context) - document_context_result = self.llm_base(_input.to_string()) - document_context_result_parsed = parser.parse(document_context_result) - print("HERE ARE THE DOCS PARSED AND STRUCTURED", document_context_result_parsed) + return document_context_result_parsed + + async def get_task_list( + self, user_input=None, content=None, params=None, attention_modulators=None + ): + """Gets the task list from the document context result to enchance it and be able to pass to the agent""" + list_of_operations = await self.available_operations() + document_context_result_parsed = await self.buffer_context( + user_input=user_input, + content=content, + params=params, + attention_modulators=attention_modulators, + ) + class Task(BaseModel): """Schema for an individual task.""" - task_order: str = Field(..., description="The order at which the task needs to be performed") - task_name: str = Field(None, description="The task that needs to be performed") + + task_order: str = Field( + ..., description="The order at which the task needs to be performed" + ) + task_name: str = 
Field( + None, description="The task that needs to be performed" + ) operation: str = Field(None, description="The operation to be performed") - original_query: str = Field(None, description="Original user query provided") + original_query: str = Field( + None, description="Original user query provided" + ) class TaskList(BaseModel): """Schema for the record containing a list of tasks.""" + tasks: List[Task] = Field(..., description="List of tasks") - prompt_filter_chunk = f"The raw context data is {str(document_context_result_parsed)} Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {output}" + prompt_filter_chunk = f"The raw context data is {str(document_context_result_parsed)} Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {user_input}" # chain_filter_chunk = prompt_filter_chunk | self.llm.bind(function_call={"TaskList": "tasks"}, functions=TaskList) # output_chunk = await chain_filter_chunk.ainvoke({"query": output, "list_of_operations": list_of_operations}) prompt_msgs = [ @@ -592,12 +812,16 @@ class EpisodicBuffer(BaseMemory): HumanMessage(content="Decompose based on the following prompt:"), HumanMessagePromptTemplate.from_template("{input}"), HumanMessage(content="Tips: Make sure to answer in the correct format"), - HumanMessage(content="Tips: Only choose actions that are relevant to the user query and ignore others") - + HumanMessage( + content="Tips: Only choose actions that are relevant to the user query and ignore others" + ), ] prompt_ = ChatPromptTemplate(messages=prompt_msgs) - chain = create_structured_output_chain(TaskList, self.llm, prompt_, verbose=True) + chain = create_structured_output_chain( + TaskList, self.llm, prompt_, verbose=True + ) from langchain.callbacks import get_openai_callback + with get_openai_callback() as cb: output = await chain.arun(input=prompt_filter_chunk, verbose=True) print(cb) @@ -606,37 +830,47 @@ class EpisodicBuffer(BaseMemory): print("HERE IS THE OUTPUT", my_object.json()) data = json.loads(my_object.json()) - # Extract the list of tasks tasks_list = data["tasks"] return tasks_list - async def main_buffer(self, user_input=None, content=None, params=None): - + async def main_buffer( + self, user_input=None, content=None, params=None, attention_modulators=None + ): """AI buffer to understand user PDF query, prioritize memory info and process it based on available operations""" memory = Memory(user_id=self.user_id) await memory.async_init() - tasks_list = await self.buffer_context(user_input=user_input, content=content, params=params) + tasks_list = await self.get_task_list( + user_input=user_input, + content=content, + params=params, + attention_modulators=attention_modulators, + ) result_tasks = [] for task in tasks_list: + class PromptWrapper(BaseModel): observation: str = Field( description="observation we want to fetch from vectordb" ) - @tool("convert_to_structured", args_schema=PromptWrapper, return_direct=True) + @tool( + "convert_to_structured", args_schema=PromptWrapper, return_direct=True + ) def convert_to_structured(observation=None, json_schema=None): """Convert unstructured data to structured data""" BASE_DIR = os.getcwd() - json_path = os.path.join(BASE_DIR, "schema_registry", "ticket_schema.json") + json_path = os.path.join( + BASE_DIR, "schema_registry", "ticket_schema.json" + ) def load_json_or_infer_schema(file_path, document_path): """Load JSON schema from file or 
infer schema from text""" # Attempt to load the JSON file - with open(file_path, 'r') as file: + with open(file_path, "r") as file: json_schema = json.load(file) return json_schema @@ -649,13 +883,18 @@ class EpisodicBuffer(BaseMemory): SystemMessage( content="You are a world class algorithm converting unstructured data into structured data." ), - HumanMessage(content="Convert unstructured data to structured data:"), + HumanMessage( + content="Convert unstructured data to structured data:" + ), HumanMessagePromptTemplate.from_template("{input}"), - HumanMessage(content="Tips: Make sure to answer in the correct format"), + HumanMessage( + content="Tips: Make sure to answer in the correct format" + ), ] prompt_ = ChatPromptTemplate(messages=prompt_msgs) - chain_funct = create_structured_output_chain(json_schema, prompt=prompt_, llm=self.llm, - verbose=True) + chain_funct = create_structured_output_chain( + json_schema, prompt=prompt_, llm=self.llm, verbose=True + ) output = chain_funct.run(input=observation, llm=self.llm) return output @@ -663,21 +902,20 @@ class EpisodicBuffer(BaseMemory): return result class TranslateText(BaseModel): - observation: str = Field( - description="observation we want to translate" - ) + observation: str = Field(description="observation we want to translate") @tool("translate_to_de", args_schema=TranslateText, return_direct=True) def translate_to_de(observation, args_schema=TranslateText): """Translate to English""" - out = GoogleTranslator(source='auto', target='de').translate(text=observation) + out = GoogleTranslator(source="auto", target="de").translate( + text=observation + ) return out agent = initialize_agent( llm=self.llm, tools=[translate_to_de, convert_to_structured], agent=AgentType.OPENAI_FUNCTIONS, - verbose=True, ) print("HERE IS THE TASK", task) @@ -690,23 +928,37 @@ class EpisodicBuffer(BaseMemory): await self.encoding(str(result_tasks), self.namespace, params=params) - buffer_result = await self.fetch_memories(observation=str(output)) + buffer_result = await self.fetch_memories(observation=str(user_input)) print("HERE IS THE RESULT TASKS", str(buffer_result)) class EpisodicTask(BaseModel): """Schema for an individual task.""" - task_order: str = Field(..., description="The order at which the task needs to be performed") - task_name: str = Field(None, description="The task that needs to be performed") + + task_order: str = Field( + ..., description="The order at which the task needs to be performed" + ) + task_name: str = Field( + None, description="The task that needs to be performed" + ) operation: str = Field(None, description="The operation to be performed") - operation_result: str = Field(None, description="The result of the operation") + operation_result: str = Field( + None, description="The result of the operation" + ) class EpisodicList(BaseModel): """Schema for the record containing a list of tasks.""" + tasks: List[EpisodicTask] = Field(..., description="List of tasks") - start_date: str = Field(..., description="The order at which the task needs to be performed") - end_date: str = Field(..., description="The order at which the task needs to be performed") - user_query: str = Field(..., description="The order at which the task needs to be performed") + start_date: str = Field( + ..., description="The order at which the task needs to be performed" + ) + end_date: str = Field( + ..., description="The order at which the task needs to be performed" + ) + user_query: str = Field( + ..., description="The order at which the task needs to be 
performed" + ) parser = PydanticOutputParser(pydantic_object=EpisodicList) @@ -716,21 +968,30 @@ class EpisodicBuffer(BaseMemory): partial_variables={"format_instructions": parser.get_format_instructions()}, ) - _input = prompt.format_prompt(query=user_input, steps=str(tasks_list), buffer=buffer_result) + _input = prompt.format_prompt( + query=user_input, steps=str(tasks_list), buffer=buffer_result + ) # return "a few things to do like load episodic memory in a structured format" output = self.llm_base(_input.to_string()) result_parsing = parser.parse(output) print("here is the parsing result", result_parsing) - lookup_value = await memory._add_episodic_memory(observation=str(output), params=params) + lookup_value = await memory._add_episodic_memory( + observation=str(output), params=params + ) await self.delete_memories() return lookup_value class LongTermMemory: - def __init__(self, user_id: str = "676", memory_id: Optional[str] = None, index_name: Optional[str] = None, - db_type: str = "weaviate"): + def __init__( + self, + user_id: str = "676", + memory_id: Optional[str] = None, + index_name: Optional[str] = None, + db_type: str = "weaviate", + ): self.user_id = user_id self.memory_id = memory_id self.ltm_memory_id = str(uuid.uuid4()) @@ -741,8 +1002,13 @@ class LongTermMemory: class ShortTermMemory: - def __init__(self, user_id: str = "676", memory_id: Optional[str] = None, index_name: Optional[str] = None, - db_type: str = "weaviate"): + def __init__( + self, + user_id: str = "676", + memory_id: Optional[str] = None, + index_name: Optional[str] = None, + db_type: str = "weaviate", + ): self.user_id = user_id self.memory_id = memory_id self.stm_memory_id = str(uuid.uuid4()) @@ -751,18 +1017,20 @@ class ShortTermMemory: self.episodic_buffer = EpisodicBuffer(user_id, memory_id, index_name, db_type) - - - - - class Memory: load_dotenv() OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0)) OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") - def __init__(self, user_id: str = "676", index_name: str = None, knowledge_source: str = None, - knowledge_type: str = None, db_type: str = "weaviate", namespace: str = None) -> None: + def __init__( + self, + user_id: str = "676", + index_name: str = None, + knowledge_source: str = None, + knowledge_type: str = None, + db_type: str = "weaviate", + namespace: str = None, + ) -> None: self.user_id = user_id self.index_name = index_name self.db_type = db_type @@ -775,45 +1043,55 @@ class Memory: load_dotenv() # Asynchronous factory function for creating LongTermMemory - async def async_create_long_term_memory(self, user_id, memory_id, index_name, db_type): + async def async_create_long_term_memory( + self, user_id, memory_id, index_name, db_type + ): # Perform asynchronous initialization steps if needed return LongTermMemory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, - db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) async def async_init(self): # Asynchronous initialization of LongTermMemory and ShortTermMemory self.long_term_memory = await self.async_create_long_term_memory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, - db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) self.short_term_memory = await self.async_create_short_term_memory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, - 
db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) - async def async_create_short_term_memory(self, user_id, memory_id, index_name, db_type): + async def async_create_short_term_memory( + self, user_id, memory_id, index_name, db_type + ): # Perform asynchronous initialization steps if needed return ShortTermMemory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) - - # self.short_term_memory = await ShortTermMemory.async_init( - # user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type - # ) - - async def _add_semantic_memory(self, semantic_memory: str, params: dict = None): + async def _add_semantic_memory( + self, observation: str, loader_settings: dict = None, params: dict = None + ): return await self.long_term_memory.semantic_memory.add_memories( - semantic_memory=semantic_memory, params=params - + observation=observation, + loader_settings=loader_settings, + params=params, ) async def _fetch_semantic_memory(self, observation, params): return await self.long_term_memory.semantic_memory.fetch_memories( observation=observation, params=params - ) async def _delete_semantic_memory(self, params: str = None): @@ -821,10 +1099,11 @@ class Memory: params=params ) - async def _add_episodic_memory(self, observation: str, params: dict = None): + async def _add_episodic_memory( + self, observation: str, loader_settings: dict = None, params: dict = None + ): return await self.long_term_memory.episodic_memory.add_memories( - observation=observation, params=params - + observation=observation, loader_settings=loader_settings, params=params ) async def _fetch_episodic_memory(self, observation, params: str = None): @@ -837,30 +1116,66 @@ class Memory: params=params ) - async def _run_buffer(self, user_input: str, content: str = None, params:str=None): - return await self.short_term_memory.episodic_buffer.main_buffer(user_input=user_input, content=content, params=params) + async def _run_buffer( + self, user_input: str, content: str = None, params: str = None + ): + return await self.short_term_memory.episodic_buffer.main_buffer( + user_input=user_input, content=content, params=params + ) - async def _add_buffer_memory(self, user_input: str, namespace: str = None, params: dict = None): - return await self.short_term_memory.episodic_buffer.add_memories(observation=user_input, - params=params) + async def _add_buffer_memory( + self, + user_input: str, + namespace: str = None, + loader_settings: dict = None, + params: dict = None, + ): + return await self.short_term_memory.episodic_buffer.add_memories( + observation=user_input, loader_settings=loader_settings, params=params + ) - async def _fetch_buffer_memory(self, user_input: str, namespace: str = None): - return await self.short_term_memory.episodic_buffer.fetch_memories(observation=user_input) + async def _fetch_buffer_memory(self, user_input: str): + return await self.short_term_memory.episodic_buffer.fetch_memories( + observation=user_input + ) async def _delete_buffer_memory(self, params: str = None): return await self.short_term_memory.episodic_buffer.delete_memories( params=params ) - async def _create_buffer_context(self, user_input: str, content: str = None, params:str=None): + + async def _create_buffer_context( + self, + user_input: str, + content: str = None, + params: 
str = None, + attention_modulators: dict = None, + ): return await self.short_term_memory.episodic_buffer.buffer_context( - user_input=user_input, content=content, params=params + user_input=user_input, + content=content, + params=params, + attention_modulators=attention_modulators, ) + + async def _run_main_buffer( + self, + user_input: str, + content: str = None, + params: str = None, + attention_modulators: str = None, + ): + return await self.short_term_memory.episodic_buffer.main_buffer( + user_input=user_input, + content=content, + params=params, + attention_modulators=attention_modulators, + ) + async def _available_operations(self): return await self.long_term_memory.episodic_buffer.available_operations() - - async def main(): memory = Memory(user_id="123") await memory.async_init() @@ -875,11 +1190,11 @@ async def main(): "owner": "John Doe", "license": "MIT", "validity_start": "2023-08-01", - "validity_end": "2024-07-31" + "validity_end": "2024-07-31", } - gg = await memory._run_buffer(user_input="i NEED TRANSLATION TO GERMAN ", content="i NEED TRANSLATION TO GERMAN ", params=params) - print(gg) + # gg = await memory._run_buffer(user_input="i NEED TRANSLATION TO GERMAN ", content="i NEED TRANSLATION TO GERMAN ", params=params) + # print(gg) # gg = await memory._fetch_buffer_memory(user_input="i TO GERMAN ") # print(gg) @@ -903,13 +1218,29 @@ async def main(): } ] }""" - # - # ggur = await memory._delete_buffer_memory() - # print(ggur) - # ggur = await memory._add_buffer_memory(user_input = episodic, params=params) + + # modulator = {"relevance": 0.2, "saliency": 0.0, "frequency": 0.7} + # # + # ggur = await memory._create_buffer_context( + # user_input="i NEED TRANSLATION TO GERMAN ", + # content="i NEED TRANSLATION TO GERMAN ", + # params=str(params), + # attention_modulators=modulator, + # ) # print(ggur) - # fff = await memory._fetch_episodic_memory(observation = "healthy diet") + ll = { + "format": "PDF", + "source": "url", + "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" + } + # ggur = await memory._add_semantic_memory(observation = "bla", loader_settings=ll, params=params) + # print(ggur) + # fff = await memory._delete_semantic_memory() + # print(fff) + + fff = await memory._fetch_semantic_memory(observation = "dog pulling sleds ", params=None) + print(fff) # print(len(fff["data"]["Get"]["EPISODICMEMORY"])) @@ -931,4 +1262,3 @@ if __name__ == "__main__": # "operator": "Equal", # "valueText": "2017*" # } -
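
For reference, a minimal client sketch for the new `/{memory_type}/add-memory` route follows. This is an illustrative assumption, not part of the patch itself: it presumes the server from `api.py` is running locally on the default port 8000 (see `start_api_server`), reuses the payload shape documented in `level_2/Readme.md`, and picks `semantic` from the registered memory types (`episodic`, `buffer`, `semantic`).

```python
# Hypothetical usage sketch; assumes api.py is serving on http://127.0.0.1:8000.
import requests

payload = {
    "payload": {
        "user_id": "681",
        "prompt": "I want ",
        "loader_settings": {
            "format": "PDF",
            "source": "url",
            "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf",
        },
        # params carries the document metadata fields shown in the Readme example
        "params": {"version": "1.0", "agreement_id": "AG123456"},
    }
}

# memory_type is one of: "episodic", "buffer", "semantic"
response = requests.post("http://127.0.0.1:8000/semantic/add-memory", json=payload)
print(response.json())
```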