added level 2 initial code

This commit is contained in:
Vasilije 2023-08-17 17:30:40 +02:00
parent e8fc6c8952
commit 81b6cc1920
15 changed files with 5063 additions and 0 deletions

11
level_2/.env Normal file

@ -0,0 +1,11 @@
OPENAI_API_KEY = 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
PINECONE_API_KEY = 4e0982ad-57d5-41ac-bce5-d1fd2c2da273
PINECONE_API_ENV = us-west1-gcp
REPLICATE_API_TOKEN = 4e0982ad-57d5-41ac-bce5-d1fd2c2da273
GPLACES_API_KEY = AIzaSyAfuT9tBy6wC3phZR1Tl5acknNA_TU2mKE
REDIS_HOST=redis
SERPAPI_API_KEY=17bb94b76b0d7cf3fb1c36d8376e0fc4c3ed761e862b05ef154e116d73c39da5
ZAPIER_NLA_API_KEY=sk-ak-GtXls7Y5JcPOSbWw7SZDzSvtAF
LOCAL_DEV = True
WEAVIATE_API_KEY =shCCL5EVpOKxIdZhMRH090lFqDb5aE1XgUTP
WEAVIATE_URL = https://new-test-cluster-i49dzudl.weaviate.network

3
level_2/.env.template Normal file

@ -0,0 +1,3 @@
OPENAI_API_KEY=sk
WEAVIATE_URL =
WEAVIATE_API_KEY =

36
level_2/Dockerfile Normal file

@ -0,0 +1,36 @@
FROM python:3.11-slim
# Set build argument
ARG API_ENABLED
# Set environment variable based on the build argument
ENV API_ENABLED=${API_ENABLED} \
PIP_NO_CACHE_DIR=true
ENV PATH="${PATH}:/root/.poetry/bin"
RUN pip install poetry
WORKDIR /app
COPY pyproject.toml poetry.lock /app/
# Install the dependencies
RUN poetry config virtualenvs.create false && \
poetry install --no-root --no-dev
RUN apt-get update -q && \
apt-get install curl zip jq netcat-traditional -y -q
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip -qq awscliv2.zip && ./aws/install && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
#RUN playwright install
#RUN playwright install-deps
WORKDIR /app
COPY . /app
COPY entrypoint.sh /app/entrypoint.sh
RUN chmod +x /app/entrypoint.sh
ENTRYPOINT ["/app/entrypoint.sh"]

44
level_2/Readme.md Normal file

@ -0,0 +1,44 @@
## PromethAI Memory Manager
### Description
Initial code lets you do three operations (sketched below):
1. Add to memory
2. Retrieve from memory
3. Structure the data to a schema and load it into DuckDB
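
A minimal sketch of these operations using the `Memory` class from `level_2_pdf_vectorstore__dlt_contracts.py` (the namespace and inputs below are illustrative):
```
import asyncio
from level_2_pdf_vectorstore__dlt_contracts import Memory

async def demo():
    memory = Memory(user_id="123", index_name="my-agent")
    await memory.async_init()
    # 1. Add to memory
    await memory._add_memories_buffer(user_input="User prefers vegetarian food", namespace="test")
    # 2. Retrieve from memory
    print(await memory._fetch_memories_buffer(user_input="food preferences", namespace="test"))
    # 3. Structure the data to a schema (and, eventually, load it into DuckDB)
    print(await memory._run_buffer(user_input="I want to get a schema for my data"))

asyncio.run(demo())
```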
## How to use
### Installation
```docker compose build promethai_mem ```
### Run
```docker compose up promethai_mem ```
### Usage
The FastAPI endpoint accepts prompts and PDF files and returns a JSON object with the generated text.
```
curl -X POST \
  -F "prompt=The quick brown fox" \
  -F "file=@/path/to/file.pdf" \
  http://localhost:8000/upload/
```
Example payload:
```
{
    "payload": {
        "user_id": "681",
        "session_id": "471",
        "model_speed": "slow",
        "prompt": "Temperature=Cold;Food Type=Ice Cream",
        "pdf_url": "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf"
    }
}
```
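
The same request can also be sent from Python with `requests` (a sketch mirroring the example payload above):
```
import requests

payload = {
    "payload": {
        "user_id": "681",
        "session_id": "471",
        "model_speed": "slow",
        "prompt": "Temperature=Cold;Food Type=Ice Cream",
        "pdf_url": "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf"
    }
}

response = requests.post("http://localhost:8000/upload/", json=payload)
print(response.json())
```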

230
level_2/api.py Normal file

@ -0,0 +1,230 @@
from langchain.document_loaders import PyPDFLoader
from level_2_pdf_vectorstore__dlt_contracts import Memory
from fastapi import FastAPI, Request, HTTPException, UploadFile, File
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, Any, List
import re
import json
import logging
import os
import uvicorn
import yaml
import requests
# Set up logging
logging.basicConfig(
    level=logging.INFO,  # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format="%(asctime)s [%(levelname)s] %(message)s",  # Set the log message format
)
logger = logging.getLogger(__name__)
from dotenv import load_dotenv
load_dotenv()
app = FastAPI(debug=True)
from fastapi import Depends
class ImageResponse(BaseModel):
    success: bool
    message: str
@app.get("/", )
async def root():
"""
Root endpoint that returns a welcome message.
"""
return {"message": "Hello, World, I am alive!"}
@app.get("/health")
def health_check():
"""
Health check endpoint that returns the server status.
"""
return {"status": "OK"}
# Example request:
# curl -X POST -H "Content-Type: application/json" -d '{"payload": {"prompt": "...", "pdf_url": "..."}}' http://127.0.0.1:8000/upload/
from io import BytesIO
class Payload(BaseModel):
    payload: Dict[str, Any]
@app.post("/upload/", response_model=dict)
async def upload_pdf_and_payload(
payload: Payload,
# files: List[UploadFile] = File(...),
):
try:
# Process the payload
decoded_payload = payload.payload
# except:
# pass
#
# return JSONResponse(content={"response": decoded_payload}, status_code=200)
# Download the remote PDF if URL is provided
if 'pdf_url' in decoded_payload:
pdf_response = requests.get(decoded_payload['pdf_url'])
pdf_content = pdf_response.content
logging.info("Downloaded PDF from URL")
# Create an in-memory file-like object for the PDF content
pdf_stream = BytesIO(pdf_content)
contents = pdf_stream.read()
tmp_location = os.path.join('/tmp', "tmp.pdf")
with open(tmp_location, 'wb') as tmp_file:
tmp_file.write(contents)
logging.info("Wrote PDF from URL")
# Process the PDF using PyPDFLoader
loader = PyPDFLoader(tmp_location)
pages = loader.load_and_split()
logging.info(" PDF split into pages")
Memory_ = Memory(index_name="my-agent", user_id='555' )
await Memory_.async_init()
Memory_._run_buffer(user_input="I want to get a schema for my data", content =pages)
# Run the buffer
response = Memory_._run_buffer(user_input="I want to get a schema for my data")
return JSONResponse(content={"response": response}, status_code=200)
#to do: add the user id to the payload
#to do add the raw pdf to payload
# bb = await Memory_._run_buffer(user_input=decoded_payload['prompt'])
# print(bb)
except Exception as e:
return {"error": str(e)}
# Here you can perform your processing on the PDF contents
# results.append({"filename": file.filename, "size": len(contents)})
# Append the in-memory file to the files list
# files.append(UploadFile(pdf_stream, filename="downloaded.pdf"))
#
# # Process each uploaded PDF file
# results = []
# for file in files:
# contents = await file.read()
# tmp_location = os.path.join('/tmp', "tmp.pdf")
# with open(tmp_location, 'wb') as tmp_file:
# tmp_file.write(contents)
# loader = PyPDFLoader(tmp_location)
# pages = loader.load_and_split()
#
# stm = ShortTermMemory(user_id=decoded_payload['user_id'])
# stm.episodic_buffer.main_buffer(prompt=decoded_payload['prompt'], pages=pages)
# # Here you can perform your processing on the PDF contents
# results.append({"filename": file.filename, "size": len(contents)})
#
# return {"message": "Upload successful", "results": results}
#
# except Exception as e:
# return {"error": str(e)}
# @app.post("/clear-cache", response_model=dict)
# async def clear_cache(request_data: Payload) -> dict:
# """
# Endpoint to clear the cache.
#
# Parameters:
# request_data (Payload): The request data containing the user and session IDs.
#
# Returns:
# dict: A dictionary with a message indicating the cache was cleared.
# """
# json_payload = request_data.payload
# agent = Agent()
# agent.set_user_session(json_payload["user_id"], json_payload["session_id"])
# try:
# agent.clear_cache()
# return JSONResponse(content={"response": "Cache cleared"}, status_code=200)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
#
# @app.post("/correct-prompt-grammar", response_model=dict)
# async def prompt_to_correct_grammar(request_data: Payload) -> dict:
# json_payload = request_data.payload
# agent = Agent()
# agent.set_user_session(json_payload["user_id"], json_payload["session_id"])
# logging.info("Correcting grammar %s", json_payload["prompt_source"])
#
# output = agent.prompt_correction(json_payload["prompt_source"], model_speed= json_payload["model_speed"])
# return JSONResponse(content={"response": {"result": json.loads(output)}})
# @app.post("/action-add-zapier-calendar-action", response_model=dict,dependencies=[Depends(auth)])
# async def action_add_zapier_calendar_action(
# request: Request, request_data: Payload
# ) -> dict:
# json_payload = request_data.payload
# agent = Agent()
# agent.set_user_session(json_payload["user_id"], json_payload["session_id"])
# # Extract the bearer token from the header
# auth_header = request.headers.get("Authorization")
# if auth_header:
# bearer_token = auth_header.replace("Bearer ", "")
# else:
# bearer_token = None
# outcome = agent.add_zapier_calendar_action(
# prompt_base=json_payload["prompt_base"],
# token=bearer_token,
# model_speed=json_payload["model_speed"],
# )
# return JSONResponse(content={"response": outcome})
def start_api_server(host: str = "0.0.0.0", port: int = 8000):
    """
    Start the API server using uvicorn.
    Parameters:
        host (str): The host for the server.
        port (int): The port for the server.
    """
    try:
        logger.info(f"Starting server at {host}:{port}")
        uvicorn.run(app, host=host, port=port)
    except Exception as e:
        logger.exception(f"Failed to start server: {e}")
        # Here you could add any cleanup code or error recovery code.
if __name__ == "__main__":
    start_api_server()

22
level_2/docker-compose.yml Normal file

@ -0,0 +1,22 @@
version: "3.9"
services:
promethai_mem:
networks:
- promethai_mem_backend
build:
context: ./
volumes:
- "./:/app"
environment:
- HOST=0.0.0.0
profiles: ["exclude-from-up"] # Use `docker-compose run teenage-agi` to get an attached container
ports:
- 8000:8000
- 443:443
networks:
promethai_mem_backend:
name: promethai_mem_backend

6
level_2/entrypoint.sh Executable file

@ -0,0 +1,6 @@
#!/bin/bash
export ENVIRONMENT
#python fetch_secret.py
# Start Gunicorn
gunicorn -w 2 -k uvicorn.workers.UvicornWorker -t 120 --bind=0.0.0.0:8000 --bind=0.0.0.0:443 --log-level debug api:app

711
level_2/level_2_pdf_vectorstore__dlt_contracts.py Normal file

@ -0,0 +1,711 @@
# Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client
import asyncio
import json
import os
import uuid
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List
import dlt
import marvin
import pinecone
import weaviate
from deep_translator import GoogleTranslator
from dotenv import load_dotenv
from jinja2 import Template
from langchain import LLMChain, LLMMathChain, OpenAI, PromptTemplate
from langchain.agents import initialize_agent, AgentType
from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain.chains.openai_functions import create_structured_output_chain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import PyPDFLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.memory import VectorStoreRetrieverMemory
from langchain.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain.retrievers import WeaviateHybridSearchRetriever
from langchain.schema import Document, HumanMessage, LLMResult, SystemMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import tool
from langchain.vectorstores import Pinecone, Weaviate
from marvin import ai_classifier
from pydantic import BaseModel, Field
load_dotenv()
class MyCustomSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")
class MyCustomAsyncHandler(AsyncCallbackHandler):
    """Async callback handler that can be used to handle callbacks from langchain."""
    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when the LLM starts running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        class_name = serialized["name"]
        print("Hi! I just woke up. Your llm is starting")
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when the LLM ends running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        print("Hi! I just woke up. Your llm is ending")
class VectorDB:
    OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
    def __init__(self, user_id: str, index_name: str, memory_id: str, ltm_memory_id: str = '00000',
                 st_memory_id: str = '0000', buffer_id: str = '0000', db_type: str = "pinecone",
                 namespace: str = None):
        self.user_id = user_id
        self.index_name = index_name
        self.db_type = db_type
        self.namespace = namespace
        self.memory_id = memory_id
        self.ltm_memory_id = ltm_memory_id
        self.st_memory_id = st_memory_id
        self.buffer_id = buffer_id
        # if self.db_type == "pinecone":
        #     self.vectorstore = self.init_pinecone(self.index_name)
        if self.db_type == "weaviate":
            self.init_weaviate(namespace=self.namespace)
            self.init_weaviate_client(namespace=self.namespace)
        else:
            raise ValueError(f"Unsupported database type: {db_type}")
        load_dotenv()
    def init_pinecone(self, index_name):
        load_dotenv()
        PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "")
        PINECONE_API_ENV = os.getenv("PINECONE_API_ENV", "")
        pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_API_ENV)
        pinecone.Index(index_name)
        vectorstore: Pinecone = Pinecone.from_existing_index(
            index_name=self.index_name,
            embedding=OpenAIEmbeddings(),
            namespace='RESULT'
        )
        return vectorstore
    def init_weaviate_client(self, namespace: str):
        embeddings = OpenAIEmbeddings()
        auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY'))
        client = weaviate.Client(
            url=os.environ.get('WEAVIATE_URL'),
            auth_client_secret=auth_config,
            additional_headers={
                "X-OpenAI-Api-Key": os.environ.get('OPENAI_API_KEY')
            }
        )
        return client
    def init_weaviate(self, namespace: str):
        embeddings = OpenAIEmbeddings()
        auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY'))
        client = weaviate.Client(
            url=os.environ.get('WEAVIATE_URL'),
            auth_client_secret=auth_config,
            additional_headers={
                "X-OpenAI-Api-Key": os.environ.get('OPENAI_API_KEY')
            }
        )
        retriever = WeaviateHybridSearchRetriever(
            client=client,
            index_name=namespace,
            text_key="text",
            attributes=[],
            embedding=embeddings,
            create_schema_if_missing=True,
        )
        return retriever
    async def add_memories(self, observation: str, page: str = "", source: str = ""):
        if self.db_type == "pinecone":
            # Update Pinecone memories here
            vectorstore: Pinecone = Pinecone.from_existing_index(
                index_name=self.index_name, embedding=OpenAIEmbeddings(), namespace=self.namespace
            )
            retriever = vectorstore.as_retriever()
            retriever.add_documents(
                [
                    Document(
                        page_content=observation,
                        metadata={
                            "inserted_at": datetime.now(),
                            "text": observation,
                            "user_id": self.user_id,
                            "page": page,
                            "source": source
                        },
                        namespace=self.namespace,
                    )
                ]
            )
        elif self.db_type == "weaviate":
            # Update Weaviate memories here
            print(self.namespace)
            retriever = self.init_weaviate(self.namespace)
            return retriever.add_documents([
                Document(
                    metadata={
                        "inserted_at": str(datetime.now()),
                        "text": observation,
                        "user_id": str(self.user_id),
                        "memory_id": str(self.memory_id),
                        "ltm_memory_id": str(self.ltm_memory_id),
                        "st_memory_id": str(self.st_memory_id),
                        "buffer_id": str(self.buffer_id),
                        "last_accessed_at": str(datetime.now()),
                        # **source_metadata,
                    },
                    page_content=observation,
                )]
            )
    # def get_pinecone_vectorstore(self, namespace: str) -> pinecone.VectorStore:
    #     return Pinecone.from_existing_index(
    #         index_name=self.index, embedding=OpenAIEmbeddings(), namespace=namespace
    #     )
    async def fetch_memories(self, observation: str, params=None):
        if self.db_type == "pinecone":
            # Fetch Pinecone memories here
            pass
        elif self.db_type == "weaviate":
            # Fetch Weaviate memories here
            """
            Get documents from weaviate.
            Args, a json containing:
                query (str): The query string.
                path (list): The path for filtering, e.g., ['year'].
                operator (str): The operator for filtering, e.g., 'Equal'.
                valueText (str): The value for filtering, e.g., '2017*'.
            Example:
                get_from_weaviate(query="some query", path=['year'], operator='Equal', valueText='2017*')
            """
            retriever = self.init_weaviate(self.namespace)
            print(self.namespace)
            print(str(datetime.now()))
            print(observation)
            # Retrieve documents with filters applied
            output = retriever.get_relevant_documents(
                observation
                # ,
                # score=True
                # ,
                # where_filter=params
            )
            print(output)
            return output
    def delete_memories(self, params=None):
        auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY'))
        client = weaviate.Client(
            url=os.environ.get('WEAVIATE_URL'),
            auth_client_secret=auth_config,
            additional_headers={
                "X-OpenAI-Api-Key": os.environ.get('OPENAI_API_KEY')
            }
        )
        client.batch.delete_objects(
            class_name=self.namespace,
            # Same `where` filter as in the GraphQL API
            where=params,
        )
    def update_memories(self):
        pass
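# Sketch of direct VectorDB usage (assumes a reachable Weaviate instance and the
# environment variables above; the ids, namespace and texts are illustrative).
async def _example_vectordb_usage():
    vector_db = VectorDB(user_id="123", index_name="my-agent", memory_id="mem-1",
                         db_type="weaviate", namespace="SEMANTICMEMORY")
    await vector_db.add_memories(observation="User prefers vegetarian food")
    return await vector_db.fetch_memories(observation="food preferences")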
class SemanticMemory:
    def __init__(self, user_id: str, memory_id: str, ltm_memory_id: str, index_name: str,
                 db_type: str = "weaviate", namespace: str = "SEMANTICMEMORY"):
        # Add any semantic memory-related attributes or setup here
        self.user_id = user_id
        self.index_name = index_name
        self.namespace = namespace
        self.semantic_memory_id = str(uuid.uuid4())
        self.memory_id = memory_id
        self.ltm_memory_id = ltm_memory_id
        self.vector_db = VectorDB(user_id=user_id, memory_id=self.memory_id, ltm_memory_id=self.ltm_memory_id,
                                  index_name=index_name, db_type=db_type, namespace=self.namespace)
        self.db_type = db_type
    async def _update_memories(self, memory_id: str = "None", semantic_memory: str = "None") -> None:
        """Update semantic memory for the user."""
        if self.db_type == "weaviate":
            await self.vector_db.add_memories(observation=semantic_memory)
        elif self.db_type == "pinecone":
            pass
    async def _fetch_memories(self, observation: str, params) -> dict[str, str] | str:
        """Fetch related characteristics, preferences or dislikes for a user."""
        # self.init_pinecone(index_name=self.index)
        if self.db_type == "weaviate":
            return await self.vector_db.fetch_memories(observation, params)
        elif self.db_type == "pinecone":
            pass
class LongTermMemory:
    def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None,
                 namespace: str = None, db_type: str = "weaviate"):
        self.user_id = user_id
        self.memory_id = memory_id
        self.ltm_memory_id = str(uuid.uuid4())
        self.index_name = index_name
        self.namespace = namespace
        self.db_type = db_type
        # self.episodic_memory = EpisodicMemory()
        self.semantic_memory = SemanticMemory(user_id=self.user_id, memory_id=self.memory_id,
                                              ltm_memory_id=self.ltm_memory_id, index_name=self.index_name,
                                              db_type=self.db_type)
class ShortTermMemory:
    def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None,
                 namespace: str = None, db_type: str = "weaviate"):
        # Add any short-term memory-related attributes or setup here
        self.user_id = user_id
        self.memory_id = memory_id
        self.namespace = namespace
        self.db_type = db_type
        self.stm_memory_id = str(uuid.uuid4())
        self.index_name = index_name
        self.episodic_buffer = EpisodicBuffer(user_id=self.user_id, memory_id=self.memory_id,
                                              index_name=self.index_name, db_type=self.db_type)
class EpisodicBuffer:
    def __init__(self, user_id: str = "676", memory_id: str = None, index_name: str = None,
                 namespace: str = 'EPISODICBUFFER', db_type: str = "weaviate"):
        # Add any short-term memory-related attributes or setup here
        self.user_id = user_id
        self.memory_id = memory_id
        self.namespace = namespace
        self.db_type = db_type
        self.st_memory_id = "blah"
        self.index_name = index_name
        self.llm = ChatOpenAI(
            temperature=0.0,
            max_tokens=1200,
            openai_api_key=os.environ.get('OPENAI_API_KEY'),
            model_name="gpt-4-0613",
            callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
        )
        # self.vector_db = VectorDB(user_id=user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
        #                           index_name=index_name, db_type=db_type, namespace=self.namespace)
    def _compute_weights(self, context: str):
        """Computes the weights for the buffer"""
        pass
    def _temporal_weighting(self, context: str):
        """Computes the temporal weighting for the buffer"""
        pass
# async def infer_schema_from_text(self, text: str):
# """Infer schema from text"""
#
# prompt_ = """ You are a json schema master. Create a JSON schema based on the following data and don't write anything else: {prompt} """
#
# complete_query = PromptTemplate(
# input_variables=["prompt"],
# template=prompt_,
# )
#
# chain = LLMChain(
# llm=self.llm, prompt=complete_query, verbose=True
# )
# chain_result = chain.run(prompt=text).strip()
#
# json_data = json.dumps(chain_result)
# return json_data
    async def _fetch_memories(self, observation: str, namespace: str) -> str:
        vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
                             index_name=self.index_name, db_type=self.db_type, namespace=namespace)
        query = await vector_db.fetch_memories(observation=observation)
        return query
    async def _add_memories(self, observation: str, namespace: str):
        vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
                             index_name=self.index_name, db_type=self.db_type, namespace=namespace)
        query = await vector_db.add_memories(observation)
        return query
    async def encoding(self, document: str, namespace: str = "EPISODICBUFFER") -> None:
        """Encoding for the buffer; stores raw data in the buffer.
        Note: this is not comp-sci encoding, but rather encoding in the sense of storing the content in the buffer."""
        vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
                             index_name=self.index_name, db_type=self.db_type, namespace=namespace)
        query = await vector_db.add_memories(document)
        return query
    async def main_buffer(self, user_input=None, content=None):
        """AI buffer to convert unstructured data to structured data"""
        # Here we define the user prompt and the structure of the output we desire
        # prompt = output[0].page_content
        if content is not None:
            # We need to encode the content. Note: this is not comp-sci encoding, but rather
            # encoding in the sense of storing the content in the buffer.
            await self.encoding(content)
            prompt_filter = ChatPromptTemplate.from_template(
                "Filter and remove unnecessary information that is not relevant in the user query {query}")
            chain_filter = prompt_filter | self.llm
            output = await chain_filter.ainvoke({"query": user_input})
            print(output)
        if content is None:
            # Sensory and Linguistic Processing
            prompt_filter = ChatPromptTemplate.from_template(
                "Filter and remove unnecessary information that is not relevant in the user query {query}")
            chain_filter = prompt_filter | self.llm
            output = await chain_filter.ainvoke({"query": user_input})
            translation = GoogleTranslator(source='auto', target='en').translate(text=output.content)
        def top_down_processing():
            """Top-down processing"""
            pass
        def bottom_up_processing():
            """Bottom-up processing"""
            pass
        def interactive_processing():
            """Interactive processing"""
            pass
        working_memory_activation = "bla"
        prompt_chunk = ChatPromptTemplate.from_template(
            "Can you break down the instruction 'Structure a PDF and load it into duckdb' into smaller tasks or actions? Return only tasks or actions. Be brief")
        chain_chunk = prompt_chunk | self.llm
        output_chunks = await chain_chunk.ainvoke({"query": output.content})
        print(output_chunks.content)
# vectorstore = Weaviate.from_documents(documents, embeddings, client=client, by_text=False)
# retriever = WeaviateHybridSearchRetriever(
# client=client,
# index_name="EVENTBUFFER",
# text_key="text",
# attributes=[],
# embedding=embeddings,
# create_schema_if_missing=True,
# )
# vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
# index_name=self.index_name, db_type=self.db_type, namespace="EVENTBUFFER")
# query = vector_db.
# retriever = vectorstore.as_retriever(search_kwargs=dict(k=1))
# memory = VectorStoreRetrieverMemory(retriever=retriever)
# class PromptWrapper(BaseModel):
# observation: str = Field(
# description="observation we want to fetch from vectordb"
# )
# # ,
# # json_schema: str = Field(description="json schema we want to infer")
# @tool("convert_to_structured", args_schema=PromptWrapper, return_direct=True)
# def convert_to_structured( observation=None, json_schema=None):
# """Convert unstructured data to structured data"""
# BASE_DIR = os.getcwd()
# json_path = os.path.join(BASE_DIR, "schema_registry", "ticket_schema.json")
#
# def load_json_or_infer_schema(file_path, document_path):
# """Load JSON schema from file or infer schema from text"""
#
# # Attempt to load the JSON file
# with open(file_path, 'r') as file:
# json_schema = json.load(file)
# return json_schema
#
# json_schema =load_json_or_infer_schema(json_path, None)
# def run_open_ai_mapper(observation=None, json_schema=None):
# """Convert unstructured data to structured data"""
#
# prompt_msgs = [
# SystemMessage(
# content="You are a world class algorithm converting unstructured data into structured data."
# ),
# HumanMessage(content="Convert unstructured data to structured data:"),
# HumanMessagePromptTemplate.from_template("{input}"),
# HumanMessage(content="Tips: Make sure to answer in the correct format"),
# ]
# prompt_ = ChatPromptTemplate(messages=prompt_msgs)
# chain_funct = create_structured_output_chain(json_schema, prompt=prompt_, llm=self.llm, verbose=True)
# output = chain_funct.run(input=observation, llm=self.llm)
# yield output
# pipeline = dlt.pipeline(pipeline_name="train_ticket", destination='duckdb', dataset_name='train_ticket_data')
# info = pipeline.run(data=run_open_ai_mapper(prompt, json_schema))
# return print(info)
#
#
# class GoalWrapper(BaseModel):
# observation: str = Field(
# description="observation we want to fetch from vectordb"
# )
#
# @tool("fetch_memory_wrapper", args_schema=GoalWrapper, return_direct=True)
# def fetch_memory_wrapper(observation, args_schema=GoalWrapper):
# """Fetches data from the VectorDB and returns it as a python dictionary."""
# print("HELLO, HERE IS THE OBSERVATION: ", observation)
#
# marvin.settings.openai.api_key = os.environ.get('OPENAI_API_KEY')
# @ai_classifier
# class MemoryRoute(Enum):
# """Represents distinct routes for different memory types."""
#
# storage_of_documents_and_knowledge_to_memory = "SEMANTICMEMORY"
# raw_information_currently_processed_in_short_term_memory = "EPISODICBUFFER"
# raw_information_kept_in_short_term_memory = "SHORTTERMMEMORY"
# long_term_recollections_of_past_events_and_emotions = "EPISODICMEMORY"
# raw_information_to_store_as_events = "EVENTBUFFER"
#
# namespace= MemoryRoute(observation)
# vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
# index_name=self.index_name, db_type=self.db_type, namespace=namespace.value)
#
#
# query = vector_db.fetch_memories(observation)
#
# return query
#
# class UpdatePreferences(BaseModel):
# observation: str = Field(
# description="observation we want to fetch from vectordb"
# )
#
# @tool("add_memories_wrapper", args_schema=UpdatePreferences, return_direct=True)
# def add_memories_wrapper(observation, args_schema=UpdatePreferences):
# """Updates user preferences in the VectorDB."""
# @ai_classifier
# class MemoryRoute(Enum):
# """Represents distinct routes for different memory types."""
#
# storage_of_documents_and_knowledge_to_memory = "SEMANTICMEMORY"
# raw_information_currently_processed_in_short_term_memory = "EPISODICBUFFER"
# raw_information_kept_in_short_term_memory = "SHORTTERMMEMORY"
# long_term_recollections_of_past_events_and_emotions = "EPISODICMEMORY"
# raw_information_to_store_as_events = "EVENTBUFFER"
#
# namespace= MemoryRoute(observation)
# print("HELLO, HERE IS THE OBSERVATION 2: ")
# vector_db = VectorDB(user_id=self.user_id, memory_id=self.memory_id, st_memory_id=self.st_memory_id,
# index_name=self.index_name, db_type=self.db_type, namespace=namespace.value)
# return vector_db.add_memories(observation)
#
# agent = initialize_agent(
# llm=self.llm,
# tools=[convert_to_structured,fetch_memory_wrapper, add_memories_wrapper],
# agent=AgentType.OPENAI_FUNCTIONS,
#
# verbose=True,
# )
#
# prompt = """
# Based on all the history and information of this user, decide based on user query query: {query} which of the following tasks needs to be done:
# 1. Memory retrieval , 2. Memory update, 3. Convert data to structured If the query is not any of these, then classify it as 'Other'
# Return the result in format: 'Result_type': 'Goal', "Original_query": "Original query"
# """
#
# # template = Template(prompt)
# # output = template.render(query=user_input)
# # complete_query = output
# complete_query = PromptTemplate(
# input_variables=[ "query"], template=prompt
# )
# summary_chain = LLMChain(
# llm=self.llm, prompt=complete_query, verbose=True
# )
# from langchain.chains import SimpleSequentialChain
#
# overall_chain = SimpleSequentialChain(
# chains=[summary_chain, agent], verbose=True
# )
# output = overall_chain.run(user_input)
# return output
#DEFINE STM
#DEFINE LTM
class Memory:
    load_dotenv()
    OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0))
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
    def __init__(self, user_id: str = "676", index_name: str = None, knowledge_source: str = None,
                 knowledge_type: str = None, db_type: str = "weaviate", namespace: str = None) -> None:
        self.user_id = user_id
        self.index_name = index_name
        self.db_type = db_type
        self.knowledge_source = knowledge_source
        self.knowledge_type = knowledge_type
        self.memory_id = str(uuid.uuid4())
        self.long_term_memory = None
        self.short_term_memory = None
        self.namespace = namespace
        load_dotenv()
    # Asynchronous factory function for creating LongTermMemory
    async def async_create_long_term_memory(self, user_id, memory_id, index_name, namespace, db_type):
        # Perform asynchronous initialization steps if needed
        return LongTermMemory(
            user_id=user_id, memory_id=memory_id, index_name=index_name,
            namespace=namespace, db_type=db_type
        )
    # Asynchronous factory function for creating ShortTermMemory
    async def async_create_short_term_memory(self, user_id, memory_id, index_name, db_type):
        # Perform asynchronous initialization steps if needed
        return ShortTermMemory(
            user_id=user_id, memory_id=memory_id, index_name=index_name, db_type=db_type
        )
    async def async_init(self):
        # Asynchronous initialization of LongTermMemory and ShortTermMemory
        self.long_term_memory = await self.async_create_long_term_memory(
            user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name,
            namespace=self.namespace, db_type=self.db_type
        )
        self.short_term_memory = await self.async_create_short_term_memory(
            user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name,
            db_type=self.db_type
        )
        # self.short_term_memory = await ShortTermMemory.async_init(
        #     user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type
        # )
    async def _update_semantic_memory(self, semantic_memory: str):
        return await self.long_term_memory.semantic_memory._update_memories(
            memory_id=self.memory_id,
            semantic_memory=semantic_memory
        )
    async def _fetch_semantic_memory(self, observation, params):
        return await self.long_term_memory.semantic_memory._fetch_memories(
            observation=observation, params=params
        )
    async def _run_buffer(self, user_input: str, content: str = None):
        return await self.short_term_memory.episodic_buffer.main_buffer(user_input=user_input, content=content)
    async def _add_memories_buffer(self, user_input: str, namespace: str = None):
        return await self.short_term_memory.episodic_buffer._add_memories(observation=user_input, namespace=namespace)
    async def _fetch_memories_buffer(self, user_input: str, namespace: str = None):
        return await self.short_term_memory.episodic_buffer._fetch_memories(observation=user_input, namespace=namespace)
async def main():
    memory = Memory(user_id="123")
    await memory.async_init()
    # gg = await memory._run_buffer(user_input="I want to get a my past data from 2017")
    ggur = await memory._add_memories_buffer(user_input="bla bla bla", namespace="test")
    print(ggur)
    fff = await memory._fetch_memories_buffer(user_input="bla bla bla", namespace="Test")
    print(fff)
if __name__ == "__main__":
    asyncio.run(main())
# bb = agent._update_semantic_memory(semantic_memory="Users core summary")
# bb = agent._fetch_semantic_memory(observation= "Users core summary", params = {
# "path": ["inserted_at"],
# "operator": "Equal",
# "valueText": "*2023*"
# })
# buffer = agent._run_buffer(user_input="I want to get a schema for my data")
# print(bb)
# rrr = {
# "path": ["year"],
# "operator": "Equal",
# "valueText": "2017*"
# }
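# Sketch (not wired in): how the commented-out "convert_to_structured" tool above
# could map text onto the ticket JSON schema and load the result into DuckDB via
# dlt. The function name and the "tickets" table name are illustrative assumptions.
def load_structured_ticket(llm, json_schema: dict, observation: str):
    """Map unstructured text onto json_schema with the LLM and load it into DuckDB."""
    prompt_msgs = [
        SystemMessage(content="You are a world class algorithm converting unstructured data into structured data."),
        HumanMessage(content="Convert unstructured data to structured data:"),
        HumanMessagePromptTemplate.from_template("{input}"),
        HumanMessage(content="Tips: Make sure to answer in the correct format"),
    ]
    prompt_ = ChatPromptTemplate(messages=prompt_msgs)
    chain = create_structured_output_chain(json_schema, prompt=prompt_, llm=llm, verbose=True)
    structured = chain.run(input=observation)
    pipeline = dlt.pipeline(pipeline_name="train_ticket", destination="duckdb", dataset_name="train_ticket_data")
    info = pipeline.run(data=[structured], table_name="tickets")
    return info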

3772
level_2/poetry.lock generated Normal file

File diff suppressed because it is too large

48
level_2/pyproject.toml Normal file

@ -0,0 +1,48 @@
[tool.poetry]
name = "PromethAI_memory"
version = "0.1.0"
description = "PromethAI memory manager"
authors = ["Vasilije Markovic"]
readme = "Readme.md"
[tool.poetry.dependencies]
python = "^3.10"
#langchain = {git = "https://github.com/topoteretes/langchain.git" , tag = "v0.0.209"}
langchain = "v0.0.250"
nltk = "3.8.1"
openai = "0.27.8"
pinecone-client = "2.2.2"
python-dotenv = "1.0.0"
pyyaml = "6.0"
fastapi = "0.98.0"
uvicorn = "0.22.0"
googlemaps = "4.10.0"
jinja2 = "3.1.2"
replicate = "^0.8.4"
pexpect = "^4.8.0"
selenium = "^4.9.0"
playwright = "^1.32.1"
pytest-playwright = "^0.3.3"
boto3 = "^1.26.125"
gptcache = "^0.1.22"
redis = "^4.5.5"
gunicorn = "^20.1.0"
tiktoken = "^0.4.0"
google-search-results = "^2.4.2"
spacy = "^3.5.3"
python-jose = "^3.3.0"
pypdf = "^3.12.0"
fastjsonschema = "^2.18.0"
marvin = "^1.3.0"
dlt = { version ="^0.3.8" , extras = ["duckdb"]}
weaviate-client = "^3.22.1"
python-multipart = "^0.0.6"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

180
level_2/schema_registry/ticket_schema.json Normal file

@ -0,0 +1,180 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"ticketType": {
"type": "string",
"enum": ["online ticket", "ICE ticket"]
},
"departureDate": {
"type": "string",
"format": "date"
},
"priceType": {
"type": "string",
"enum": ["Flex price (single journey)"]
},
"class": {
"type": "integer",
"enum": [1]
},
"adult": {
"type": "object",
"properties": {
"quantity": {
"type": "integer"
},
"BC50": {
"type": "integer"
}
},
"required": ["quantity", "BC50"]
},
"journey": {
"type": "object",
"properties": {
"from": {
"type": "string"
},
"to": {
"type": "string"
},
"via": {
"type": "string"
},
"train": {
"type": "string",
"enum": ["ICE"]
}
},
"required": ["from", "to", "via", "train"]
},
"refundPolicy": {
"type": "string"
},
"payment": {
"type": "object",
"properties": {
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"quantity": {
"type": "integer"
},
"price": {
"type": "number"
},
"vat19": {
"type": "number"
},
"vat7": {
"type": "number"
}
},
"required": ["name", "quantity", "price", "vat19", "vat7"]
}
},
"total": {
"type": "number"
},
"method": {
"type": "string",
"enum": ["credit card"]
},
"transactionDetails": {
"type": "object",
"properties": {
"amount": {
"type": "number"
},
"VUNumber": {
"type": "integer"
},
"transactionNumber": {
"type": "integer"
},
"date": {
"type": "string",
"format": "date"
},
"genNumber": {
"type": "string"
}
},
"required": ["amount", "VUNumber", "transactionNumber", "date", "genNumber"]
}
},
"required": ["items", "total", "method", "transactionDetails"]
},
"bookingDetails": {
"type": "object",
"properties": {
"bookingDate": {
"type": "string",
"format": "date-time"
},
"bookingAddress": {
"type": "string"
},
"taxNumber": {
"type": "string"
}
},
"required": ["bookingDate", "bookingAddress", "taxNumber"]
},
"journeyDetails": {
"type": "object",
"properties": {
"validFrom": {
"type": "string",
"format": "date"
},
"passengerName": {
"type": "string"
},
"orderNumber": {
"type": "string"
},
"stops": {
"type": "array",
"items": {
"type": "object",
"properties": {
"stop": {
"type": "string"
},
"date": {
"type": "string",
"format": "date"
},
"time": {
"type": "string",
"format": "time"
},
"track": {
"type": "integer"
},
"product": {
"type": "string"
},
"reservation": {
"type": "string"
}
},
"required": ["stop", "date", "time", "track", "product", "reservation"]
}
}
},
"required": ["validFrom", "passengerName", "orderNumber", "stops"]
},
"usageNotes": {
"type": "string"
}
},
"required": ["ticketType", "departureDate", "priceType", "class", "adult", "journey", "refundPolicy", "payment", "bookingDetails", "journeyDetails", "usageNotes"]
}
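
For reference, a minimal sketch of validating a payload against this schema with `fastjsonschema` (a declared project dependency); the file path and sample values below are illustrative assumptions:
```
import json
import fastjsonschema

# Load and compile the ticket schema (path assumed relative to the repo root)
with open("level_2/schema_registry/ticket_schema.json") as f:
    validate = fastjsonschema.compile(json.load(f))

# A deliberately incomplete ticket: required fields such as "journey" and
# "payment" are missing, so validation is expected to fail.
sample = {
    "ticketType": "online ticket",
    "departureDate": "2023-08-17",
    "priceType": "Flex price (single journey)",
    "class": 1,
    "adult": {"quantity": 1, "BC50": 0},
}

try:
    validate(sample)
    print("ticket payload matches the schema")
except fastjsonschema.JsonSchemaException as e:
    print(f"schema validation failed: {e}")
```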