diff --git a/.github/actions/image_builder/action.yaml b/.github/actions/image_builder/action.yaml new file mode 100644 index 000000000..607b850af --- /dev/null +++ b/.github/actions/image_builder/action.yaml @@ -0,0 +1,36 @@ +name: 'Build Docker images for PromethAI' +description: 'Build PromethAI-related Docker images and push to the Docker registry (AWS ECR)' +inputs: + stage: + description: 'The stage of the pipeline, such as "dev" or "prd", for the PromethAI app' + required: true + aws_account_id: + description: 'The AWS account ID for the PromethAI app' + required: true + should_publish: + description: 'Whether to publish the PromethAI Docker image to AWS ECR; should be either "true" or "false"' + required: true + ecr_image_repo_name: + description: 'The Docker image ECR repository name for the PromethAI app, such as "workflows"' + required: true + dockerfile_location: + description: 'The directory location of the Dockerfile for the PromethAI app' + required: true + +runs: + using: "composite" + steps: + - name: Build PromethAI App Docker image + shell: bash + env: + STAGE: ${{ inputs.stage }} + run: | + export SHA_SHORT="$(git rev-parse --short HEAD)" + export CUR_DATE="$(date +%Y%m%d%H%M%S)" + export VERSION="${{ inputs.stage }}-$CUR_DATE-$SHA_SHORT" + export STAGE="${{ inputs.stage }}" + export APP_DIR="$PWD/${{ inputs.dockerfile_location }}" + image_name="${{ inputs.ecr_image_repo_name }}" docker_login="true" version="$VERSION" account="${{ inputs.aws_account_id }}" app_dir="$APP_DIR" publish="${{ inputs.should_publish }}" ./bin/dockerize + echo "Docker tag is: $VERSION" + echo $VERSION > /tmp/.DOCKER_IMAGE_VERSION + diff --git a/.github/workflows/cd.yaml b/.github/workflows/cd.yaml new file mode 100644 index 000000000..e03a3bd45 --- /dev/null +++ b/.github/workflows/cd.yaml @@ -0,0 +1,76 @@ +name: Publishing promethai-backend Docker image + +on: + push: + branches: + - dev + - feature/* + paths-ignore: + - '**.md' + +env: + AWS_ROLE_DEV_CICD: "arn:aws:iam::463722570299:role/promethai-dev-base-role-github-ci-cd" + AWS_ACCOUNT_ID_DEV: "463722570299" + +jobs: + + publish_docker_to_ecr: + name: Publish Docker PromethAI image + runs-on: ubuntu-latest + permissions: + id-token: write + contents: read + steps: + - name: Take code from repo + uses: actions/checkout@v3 + - name: Set environment variable for stage + id: set-env + run: | + if [[ "${{ github.ref }}" == "refs/heads/main" ]]; then + echo "STAGE=prd" >> $GITHUB_ENV + echo "::set-output name=stage::prd" + else + echo "STAGE=dev" >> $GITHUB_ENV + echo "::set-output name=stage::dev" + fi + - name: Use output + run: echo "The stage is ${{ steps.set-env.outputs.stage }}" + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + role-to-assume: ${{ env.AWS_ROLE_DEV_CICD }} + aws-region: eu-west-1 + - name: Create Docker image and push to ECR + uses: ./.github/actions/image_builder + id: generate-promethai-docker + with: + stage: dev + aws_account_id: ${{ env.AWS_ACCOUNT_ID_DEV }} + should_publish: true + ecr_image_repo_name: promethai-dev-backend-promethai-backend-memory + dockerfile_location: ./level_2 + - name: Export Docker image tag + id: export-promethai-docker-tag + run: | + export DOCKER_TAG=$(cat /tmp/.DOCKER_IMAGE_VERSION) + echo "Docker tag is: $DOCKER_TAG" + echo "promethai_docker_tag_backend=$DOCKER_TAG" >> $GITHUB_OUTPUT + outputs: + promethai_docker_tag_backend: ${{ steps.export-promethai-docker-tag.outputs.promethai_docker_tag_backend }} + +# apply_tf: +# name: Trigger terraform 
apply workflow +# runs-on: ubuntu-latest +# needs: publish_docker_to_ecr +# steps: +# - name: TF apply workflow triggers step +# uses: actions/github-script@v6 +# with: +# github-token: ${{ secrets.PAT_FOR_CROSS_REPOS_CICD_TRIGGERING }} +# script: | +# await github.rest.actions.createWorkflowDispatch({ +# owner: 'topoteretes', +# repo: 'PromethAI-Infra', +# workflow_id: 'terraform.apply.yml', +# ref: 'main' +# }) diff --git a/.github/workflows/cd_prd.yaml b/.github/workflows/cd_prd.yaml new file mode 100644 index 000000000..97b943ad2 --- /dev/null +++ b/.github/workflows/cd_prd.yaml @@ -0,0 +1,99 @@ +on: + push: + branches: + - main + paths-ignore: + - '**.md' + - 'examples/**' +name: Publishing promethai-backend Docker image to prd ECR + +env: + AWS_ROLE_DEV_CICD: "arn:aws:iam::463722570299:role/promethai-dev-base-role-github-ci-cd" + AWS_ACCOUNT_ID_DEV: "463722570299" + ENVIRONMENT: prd + +jobs: + + publish_docker_to_ecr: + name: Publish Docker PromethAI image + runs-on: ubuntu-latest + permissions: + id-token: write + contents: read + steps: + - name: Take code from repo + uses: actions/checkout@v3 + - name: Set environment variable for stage + id: set-env + run: | + if [[ "${{ github.ref }}" == "refs/heads/main" ]]; then + echo "STAGE=prd" >> $GITHUB_ENV + echo "::set-output name=stage::prd" + else + echo "STAGE=dev" >> $GITHUB_ENV + echo "::set-output name=stage::dev" + fi + - name: Use output + run: echo "The stage is ${{ steps.set-env.outputs.stage }}" + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + role-to-assume: ${{ env.AWS_ROLE_DEV_CICD }} + aws-region: eu-west-1 + - name: Create Docker image and push to ECR + uses: ./.github/actions/image_builder + id: generate-promethai-docker + with: + stage: prd + aws_account_id: ${{ env.AWS_ACCOUNT_ID_DEV }} + should_publish: true + ecr_image_repo_name: promethai-prd-backend-promethai-backend-memory + dockerfile_location: ./level_2 + - name: Export Docker image tag + id: export-promethai-docker-tag + run: | + export DOCKER_TAG=$(cat /tmp/.DOCKER_IMAGE_VERSION) + echo "Docker tag is: $DOCKER_TAG" + echo "promethai_docker_tag_backend=$DOCKER_TAG" >> $GITHUB_OUTPUT + +# - name: Create Tag and Release +# runs-on: ubuntu-latest +# uses: actions/checkout@v3 +# needs: publish_docker_to_ecr # ensure this job runs after Docker image is pushed +# steps: +# - name: Check out code +# uses: actions/checkout@v3 +# - name: Bump version and push tag +# id: bump_version_and_push_tag +# uses: anothrNick/github-tag-action@1.34.0 +# env: +# GITHUB_TOKEN: ${{ secrets.PAT_FOR_CROSS_REPOS_CICD_TRIGGERING }} +# WITH_V: true +# DEFAULT_BUMP: 'minor' # or 'minor' or 'major' +# - name: Create Release +# id: create_release +# uses: actions/create-release@v1 +# env: +# GITHUB_TOKEN: ${{ secrets.PAT_FOR_CROSS_REPOS_CICD_TRIGGERING }} +# with: +# tag_name: ${{ steps.bump_version_and_push_tag.outputs.tag }} +# release_name: Release ${{ steps.bump_version_and_push_tag.outputs.tag }} + outputs: + promethai_docker_tag_backend: ${{ steps.export-promethai-docker-tag.outputs.promethai_docker_tag_backend }} + +# apply_tf: +# name: Trigger terraform apply workflow +# runs-on: ubuntu-latest +# needs: publish_docker_to_ecr +# steps: +# - name: TF apply workflow triggers step +# uses: actions/github-script@v6 +# with: +# github-token: ${{ secrets.PAT_FOR_CROSS_REPOS_CICD_TRIGGERING }} +# script: | +# await github.rest.actions.createWorkflowDispatch({ +# owner: 'topoteretes', +# repo: 'PromethAI-Infra', +# workflow_id: 
'terraform.apply.yml', +# ref: 'main' +# }) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 000000000..bc6e7f7ce --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,25 @@ +name: Test build docker image for PromethAI backend app + +on: pull_request + +env: + AWS_ACCOUNT_ID_DEV: "463722570299" + +jobs: + + build_docker: + name: Build PromethAI Backend Docker App Image + runs-on: ubuntu-latest + steps: + - name: Check out PromethAI code + uses: actions/checkout@v3 + + - name: Build PromethAI backend Docker image tag + id: backend-docker-tag + run: | + export SHA_SHORT="$(git rev-parse --short HEAD)" + export CUR_DATE="$(date +%Y%m%d%H%M%S)" + export VERSION="dev-$CUR_DATE-$SHA_SHORT" + image_name="backend-memory" docker_login="false" version="$VERSION" account="${{ env.AWS_ACCOUNT_ID_DEV }}" app_dir="level_2" publish="false" ./bin/dockerize + export DOCKER_TAG=$(cat level_2/tmp/.DOCKER_IMAGE_VERSION) + echo "Successfully built PromethAI backend Docker tag is: $DOCKER_TAG" diff --git a/bin/dockerize b/bin/dockerize new file mode 100755 index 000000000..8949bd773 --- /dev/null +++ b/bin/dockerize @@ -0,0 +1,30 @@ +STAGE=${stage:-"dev"} +SHA_SHORT="$(git rev-parse --short HEAD)" +CUR_DATE="$(date +%Y%m%d%H%M%S)" +VERSION="$STAGE-$CUR_DATE-$SHA_SHORT" +IMAGE_NAME=${image_name:-promethai-${STAGE}-promethai-backend-memory} + +REPO_NAME="${AWS_REPOSITORY}/${IMAGE_NAME}" +FULL_IMAGE_NAME="${REPO_NAME}:${VERSION}" +APP_DIR=${app_dir:-"./level_2"} # Updated this line + +PUBLISH=${publish:-false} + +echo "Building docker image ${FULL_IMAGE_NAME} located in dir ${app_dir}" + +pushd "${APP_DIR}" && + docker buildx build --platform linux/amd64 \ + --build-arg STAGE=${STAGE} \ + -t "${FULL_IMAGE_NAME}" . && + echo "${VERSION}" >/tmp/.DOCKER_IMAGE_VERSION && + echo "Successfully built docker image ${FULL_IMAGE_NAME}" + +if [ "${PUBLISH}" = true ]; then + echo "Pushing docker image ${FULL_IMAGE_NAME} to ECR repository to AWS account ${AWS_DEPLOYMENT_ACCOUNT}" + if [ "${PUBLISH}" = true ]; then + echo "logging in" + aws ecr get-login-password --region "${AWS_REGION}" | docker login --username AWS --password-stdin "${AWS_REPOSITORY}" + fi + docker push "${FULL_IMAGE_NAME}" && + echo "Successfully pushed docker image ${FULL_IMAGE_NAME} to ECR repository" +fi diff --git a/level_2/Readme.md b/level_2/Readme.md index 2b1d36675..3b99f4f4a 100644 --- a/level_2/Readme.md +++ b/level_2/Readme.md @@ -9,7 +9,8 @@ Initial code lets you do three operations: 1. Add to memory 2. Retrieve from memory -3. Structure the data to schema and load to duckdb +3. Structure the data to schema +4. Load to a database #How to use @@ -38,16 +39,17 @@ The Memory API provides the following endpoints: - /run-buffer (POST) - /buffer/create-context (POST) -Here is a payload example: + +## How To Get Started + +1. 
We do a post request to add-memory endpoint with the following payload: +It will upload Jack London "Call of the Wild" to SEMANTIC memory ``` -{ +curl -X POST http://localhost:8000/semantic/add-memory -H "Content-Type: application/json" -d '{ "payload": { "user_id": "681", - "session_id": "471", - "model_speed": "slow", - "prompt": "I want ", - "pdf_url": "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf", + "prompt": "I am adding docs", "params": { "version": "1.0", "agreement_id": "AG123456", @@ -60,7 +62,75 @@ Here is a payload example: "license": "MIT", "validity_start": "2023-08-01", "validity_end": "2024-07-31" + }, + "loader_settings": { + "format": "PDF", + "source": "url", + "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" } } -} -``` \ No newline at end of file +}' +``` + +2. We run the buffer with the prompt "I want to know how does Buck adapt to life in the wild and then have that info translated to German " + +``` +curl -X POST http://localhost:8000/run-buffer -H "Content-Type: application/json" -d '{ + "payload": { + "user_id": "681", + "prompt": "I want to know how does Buck adapt to life in the wild and then have that info translated to German ", + "params": { + "version": "1.0", + "agreement_id": "AG123456", + "privacy_policy": "https://example.com/privacy", + "terms_of_service": "https://example.com/terms", + "format": "json", + "schema_version": "1.1", + "checksum": "a1b2c3d4e5f6", + "owner": "John Doe", + "license": "MIT", + "validity_start": "2023-08-01", + "validity_end": "2024-07-31" + }, + "attention_modulators": { + "relevance": 0.0, + "saliency": 0.1 + } + } +}' +``` + + +Other attention modulators that could be implemented: + + "frequency": 0.5, + "repetition": 0.5, + "length": 0.5, + "position": 0.5, + "context": 0.5, + "emotion": 0.5, + "sentiment": 0.5, + "perspective": 0.5, + "style": 0.5, + "grammar": 0.5, + "spelling": 0.5, + "logic": 0.5, + "coherence": 0.5, + "cohesion": 0.5, + "plausibility": 0.5, + "consistency": 0.5, + "informativeness": 0.5, + "specificity": 0.5, + "detail": 0.5, + "accuracy": 0.5, + "topicality": 0.5, + "focus": 0.5, + "clarity": 0.5, + "simplicity": 0.5, + "naturalness": 0.5, + "fluency": 0.5, + "variety": 0.5, + "vividness": 0.5, + "originality": 0.5, + "creativity": 0.5, + "humor": 0.5, \ No newline at end of file diff --git a/level_2/api.py b/level_2/api.py index 2e5a8e9fe..083e87e9d 100644 --- a/level_2/api.py +++ b/level_2/api.py @@ -1,23 +1,14 @@ -from io import BytesIO +import logging +import os +from typing import Dict, Any -from langchain.document_loaders import PyPDFLoader - -from level_2_pdf_vectorstore__dlt_contracts import Memory +import uvicorn from fastapi import FastAPI from fastapi.responses import JSONResponse from pydantic import BaseModel -from typing import Dict, Any -import re -import json -import logging -import os -import uvicorn -from fastapi import Request -import yaml -from fastapi import HTTPException -from fastapi import FastAPI, UploadFile, File -from typing import List -import requests + +from level_2_pdf_vectorstore__dlt_contracts import Memory +from dotenv import load_dotenv # Set up logging logging.basicConfig( level=logging.INFO, # Set the logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL) @@ -25,26 +16,21 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -from dotenv import load_dotenv + load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") - app = FastAPI(debug=True) -from fastapi import Depends - - class 
ImageResponse(BaseModel): success: bool message: str - - - -@app.get("/", ) +@app.get( + "/", +) async def root(): """ Root endpoint that returns a welcome message. @@ -57,187 +43,99 @@ def health_check(): Health check endpoint that returns the server status. """ return {"status": "OK"} - - - - - - -#curl -X POST -H "Content-Type: application/json" -d '{"data": "YourPayload"}' -F "files=@/path/to/your/pdf/file.pdf" http://127.0.0.1:8000/upload/ - - class Payload(BaseModel): payload: Dict[str, Any] -# @app.post("/upload/", response_model=dict) -# async def upload_pdf_and_payload( -# payload: Payload, -# # files: List[UploadFile] = File(...), -# ): -# try: -# # Process the payload -# decoded_payload = payload.payload -# # except: -# # pass -# # -# # return JSONResponse(content={"response": decoded_payload}, status_code=200) -# -# # Download the remote PDF if URL is provided -# if 'pdf_url' in decoded_payload: -# pdf_response = requests.get(decoded_payload['pdf_url']) -# pdf_content = pdf_response.content -# -# logging.info("Downloaded PDF from URL") -# -# # Create an in-memory file-like object for the PDF content -# pdf_stream = BytesIO(pdf_content) -# -# contents = pdf_stream.read() -# -# tmp_location = os.path.join('/tmp', "tmp.pdf") -# with open(tmp_location, 'wb') as tmp_file: -# tmp_file.write(contents) -# -# logging.info("Wrote PDF from URL") -# -# # Process the PDF using PyPDFLoader -# loader = PyPDFLoader(tmp_location) -# pages = loader.load_and_split() -# logging.info(" PDF split into pages") -# Memory_ = Memory(index_name="my-agent", user_id='555' ) -# await Memory_.async_init() -# Memory_._add_episodic_memory(user_input="I want to get a schema for my data", content =pages) -# -# -# # Run the buffer -# response = Memory_._run_buffer(user_input="I want to get a schema for my data") -# return JSONResponse(content={"response": response}, status_code=200) -# -# #to do: add the user id to the payload -# #to do add the raw pdf to payload -# # bb = await Memory_._run_buffer(user_input=decoded_payload['prompt']) -# # print(bb) -# -# -# except Exception as e: -# -# return {"error": str(e)} -# # Here you can perform your processing on the PDF contents -# # results.append({"filename": file.filename, "size": len(contents)}) -# -# # Append the in-memory file to the files list -# # files.append(UploadFile(pdf_stream, filename="downloaded.pdf")) -# - - def memory_factory(memory_type): load_dotenv() + class Payload(BaseModel): payload: Dict[str, Any] + @app.post("/{memory_type}/add-memory", response_model=dict) async def add_memory( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - logging.info(" Init PDF processing") - decoded_payload = payload.payload - if 'pdf_url' in decoded_payload: - pdf_response = requests.get(decoded_payload['pdf_url']) - pdf_content = pdf_response.content + Memory_ = Memory(user_id=decoded_payload["user_id"]) - logging.info("Downloaded PDF from URL") + await Memory_.async_init() - # Create an in-memory file-like object for the PDF content - pdf_stream = BytesIO(pdf_content) - - contents = pdf_stream.read() - - tmp_location = os.path.join('/tmp', "tmp.pdf") - with open(tmp_location, 'wb') as tmp_file: - tmp_file.write(contents) - - logging.info("Wrote PDF from URL") - - # Process the PDF using PyPDFLoader - loader = PyPDFLoader(tmp_location) - # pages = loader.load_and_split() - logging.info(" PDF split into pages") - - Memory_ = Memory(user_id=decoded_payload['user_id']) - - await 
Memory_.async_init() - - memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None) - output= await memory_class(observation=str(loader), params =decoded_payload['params']) - return JSONResponse(content={"response": output}, status_code=200) + memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None) + output = await memory_class( + observation=decoded_payload["prompt"], + loader_settings=decoded_payload["loader_settings"], + params=decoded_payload["params"], + ) + return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + return JSONResponse( + content={"response": {"error": str(e)}}, status_code=503 + ) @app.post("/{memory_type}/fetch-memory", response_model=dict) async def fetch_memory( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() memory_class = getattr(Memory_, f"_fetch_{memory_type}_memory", None) - output = memory_class(observation=decoded_payload['prompt']) + output = memory_class(observation=decoded_payload["prompt"]) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + return JSONResponse( + content={"response": {"error": str(e)}}, status_code=503 + ) @app.post("/{memory_type}/delete-memory", response_model=dict) async def delete_memory( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) - output = memory_class(observation=decoded_payload['prompt']) + output = memory_class(observation=decoded_payload["prompt"]) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: + return JSONResponse( + content={"response": {"error": str(e)}}, status_code=503 + ) - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) memory_list = ["episodic", "buffer", "semantic"] for memory_type in memory_list: memory_factory(memory_type) - @app.get("/available-buffer-actions", response_model=dict) async def available_buffer_actions( - payload: Payload, - # files: List[UploadFile] = File(...), + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() @@ -246,126 +144,73 @@ async def available_buffer_actions( return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + @app.post("/run-buffer", response_model=dict) -async def available_buffer_actions( - payload: Payload, - # files: List[UploadFile] = File(...), +async def run_buffer( + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = 
Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) - output = await Memory_._run_buffer(user_input=decoded_payload['prompt'], params=decoded_payload['params']) + output = await Memory_._run_main_buffer( + user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] + ) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) + @app.post("/buffer/create-context", response_model=dict) -async def available_buffer_actions( - payload: Payload, - # files: List[UploadFile] = File(...), +async def create_context( + payload: Payload, + # files: List[UploadFile] = File(...), ): try: - decoded_payload = payload.payload - Memory_ = Memory(user_id=decoded_payload['user_id']) + Memory_ = Memory(user_id=decoded_payload["user_id"]) await Memory_.async_init() # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) - output = await Memory_._create_buffer_context(user_input=decoded_payload['prompt'], params=decoded_payload['params']) + output = await Memory_._create_buffer_context( + user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] + ) return JSONResponse(content={"response": output}, status_code=200) except Exception as e: - return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) -# - # # Process each uploaded PDF file - # results = [] - # for file in files: - # contents = await file.read() - # tmp_location = os.path.join('/tmp', "tmp.pdf") - # with open(tmp_location, 'wb') as tmp_file: - # tmp_file.write(contents) - # loader = PyPDFLoader(tmp_location) - # pages = loader.load_and_split() - # - # stm = ShortTermMemory(user_id=decoded_payload['user_id']) - # stm.episodic_buffer.main_buffer(prompt=decoded_payload['prompt'], pages=pages) - # # Here you can perform your processing on the PDF contents - # results.append({"filename": file.filename, "size": len(contents)}) - # - # return {"message": "Upload successful", "results": results} - # - # except Exception as e: - # return {"error": str(e)} +@app.post("/buffer/get-tasks", response_model=dict) +async def create_context( + payload: Payload, + # files: List[UploadFile] = File(...), +): + try: + decoded_payload = payload.payload + Memory_ = Memory(user_id=decoded_payload["user_id"]) -# @app.post("/clear-cache", response_model=dict) -# async def clear_cache(request_data: Payload) -> dict: -# """ -# Endpoint to clear the cache. -# -# Parameters: -# request_data (Payload): The request data containing the user and session IDs. -# -# Returns: -# dict: A dictionary with a message indicating the cache was cleared. 
-# """ -# json_payload = request_data.payload -# agent = Agent() -# agent.set_user_session(json_payload["user_id"], json_payload["session_id"]) -# try: -# agent.clear_cache() -# return JSONResponse(content={"response": "Cache cleared"}, status_code=200) -# except Exception as e: -# raise HTTPException(status_code=500, detail=str(e)) -# -# @app.post("/correct-prompt-grammar", response_model=dict) -# async def prompt_to_correct_grammar(request_data: Payload) -> dict: -# json_payload = request_data.payload -# agent = Agent() -# agent.set_user_session(json_payload["user_id"], json_payload["session_id"]) -# logging.info("Correcting grammar %s", json_payload["prompt_source"]) -# -# output = agent.prompt_correction(json_payload["prompt_source"], model_speed= json_payload["model_speed"]) -# return JSONResponse(content={"response": {"result": json.loads(output)}}) - - -# @app.post("/action-add-zapier-calendar-action", response_model=dict,dependencies=[Depends(auth)]) -# async def action_add_zapier_calendar_action( -# request: Request, request_data: Payload -# ) -> dict: -# json_payload = request_data.payload -# agent = Agent() -# agent.set_user_session(json_payload["user_id"], json_payload["session_id"]) -# # Extract the bearer token from the header -# auth_header = request.headers.get("Authorization") -# if auth_header: -# bearer_token = auth_header.replace("Bearer ", "") -# else: -# bearer_token = None -# outcome = agent.add_zapier_calendar_action( -# prompt_base=json_payload["prompt_base"], -# token=bearer_token, -# model_speed=json_payload["model_speed"], -# ) -# return JSONResponse(content={"response": outcome}) + await Memory_.async_init() + # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) + output = await Memory_._get_task_list( + user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] + ) + return JSONResponse(content={"response": output}, status_code=200) + except Exception as e: + return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) def start_api_server(host: str = "0.0.0.0", port: int = 8000): """ diff --git a/level_2/level_2_pdf_vectorstore__dlt_contracts.py b/level_2/level_2_pdf_vectorstore__dlt_contracts.py index 85598728c..2bda155bc 100644 --- a/level_2/level_2_pdf_vectorstore__dlt_contracts.py +++ b/level_2/level_2_pdf_vectorstore__dlt_contracts.py @@ -1,129 +1,123 @@ # Make sure to install the following packages: dlt, langchain, duckdb, python-dotenv, openai, weaviate-client - -import dlt -from langchain import PromptTemplate, LLMChain -from langchain.agents import initialize_agent, AgentType -from langchain.chains.openai_functions import create_structured_output_chain -from langchain.chat_models import ChatOpenAI -from langchain.document_loaders import PyPDFLoader -import weaviate -import os import json -import asyncio -from typing import Any, Dict, List, Coroutine -from deep_translator import (GoogleTranslator) -from langchain.chat_models import ChatOpenAI -from langchain.output_parsers import PydanticOutputParser -from langchain.schema import LLMResult, HumanMessage -from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler -from pydantic import BaseModel, Field, parse_obj_as -from langchain.memory import VectorStoreRetrieverMemory -from marvin import ai_classifier from enum import Enum +from io import BytesIO +from typing import Dict, List, Union + import marvin -import asyncio -from langchain.embeddings import OpenAIEmbeddings -from 
langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate -from langchain.retrievers import WeaviateHybridSearchRetriever -from langchain.schema import Document, SystemMessage, HumanMessage, LLMResult -from langchain.tools import tool -from langchain.vectorstores import Weaviate -import uuid +import requests +from deep_translator import GoogleTranslator from dotenv import load_dotenv +from langchain.agents import initialize_agent, AgentType +from langchain.document_loaders import PyPDFLoader +from langchain.output_parsers import PydanticOutputParser +from langchain.retrievers import WeaviateHybridSearchRetriever +from langchain.tools import tool +from marvin import ai_classifier +from pydantic import parse_obj_as load_dotenv() -from pathlib import Path -from langchain import OpenAI, LLMMathChain +from langchain import OpenAI from langchain.chat_models import ChatOpenAI -from langchain.prompts import ChatPromptTemplate -import uuid from typing import Optional +import tracemalloc +tracemalloc.start() -from datetime import datetime import os from datetime import datetime -from jinja2 import Template -from langchain import PromptTemplate, LLMChain +from langchain import PromptTemplate from langchain.chains.openai_functions import create_structured_output_chain from langchain.prompts import HumanMessagePromptTemplate, ChatPromptTemplate -from langchain.text_splitter import RecursiveCharacterTextSplitter -import pinecone -from langchain.vectorstores import Pinecone from langchain.embeddings.openai import OpenAIEmbeddings from pydantic import BaseModel, Field from dotenv import load_dotenv from langchain.schema import Document, SystemMessage, HumanMessage -from langchain.vectorstores import Weaviate -import weaviate import uuid import humanize -import pinecone import weaviate + load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY") - -class MyCustomSyncHandler(BaseCallbackHandler): - def on_llm_new_token(self, token: str, **kwargs) -> None: - print(f"Sync handler being called in a `thread_pool_executor`: token: {token}") - - -class MyCustomAsyncHandler(AsyncCallbackHandler): - """Async callback handler that can be used to handle callbacks from langchain.""" - - async def on_llm_start( - self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any - ) -> None: - """Run when chain starts running.""" - print("zzzz....") - await asyncio.sleep(0.3) - class_name = serialized["name"] - print("Hi! I just woke up. Your llm is starting") - - async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: - """Run when chain ends running.""" - print("zzzz....") - await asyncio.sleep(0.3) - print("Hi! I just woke up. Your llm is ending") - - +# class MyCustomSyncHandler(BaseCallbackHandler): +# def on_llm_new_token(self, token: str, **kwargs) -> None: +# print(f"Sync handler being called in a `thread_pool_executor`: token: {token}") +# +# +# class MyCustomAsyncHandler(AsyncCallbackHandler): +# """Async callback handler that can be used to handle callbacks from langchain.""" +# +# async def on_llm_start( +# self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any +# ) -> None: +# """Run when chain starts running.""" +# print("zzzz....") +# await asyncio.sleep(0.3) +# class_name = serialized["name"] +# print("Hi! I just woke up. 
Your llm is starting") +# +# async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: +# """Run when chain ends running.""" +# print("zzzz....") +# await asyncio.sleep(0.3) +# print("Hi! I just woke up. Your llm is ending") +# +# # Assuming OpenAIEmbeddings and other necessary imports are available # Default Values -LTM_MEMORY_ID_DEFAULT = '00000' -ST_MEMORY_ID_DEFAULT = '0000' -BUFFER_ID_DEFAULT = '0000' +LTM_MEMORY_ID_DEFAULT = "00000" +ST_MEMORY_ID_DEFAULT = "0000" +BUFFER_ID_DEFAULT = "0000" class VectorDBFactory: - def create_vector_db(self, user_id: str, index_name: str, memory_id: str, - ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, - st_memory_id: str = ST_MEMORY_ID_DEFAULT, - buffer_id: str = BUFFER_ID_DEFAULT, - db_type: str = "pinecone", - namespace: str = None): - db_map = { - "pinecone": PineconeVectorDB, - "weaviate": WeaviateVectorDB - } + def create_vector_db( + self, + user_id: str, + index_name: str, + memory_id: str, + ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, + st_memory_id: str = ST_MEMORY_ID_DEFAULT, + buffer_id: str = BUFFER_ID_DEFAULT, + db_type: str = "pinecone", + namespace: str = None, + ): + db_map = {"pinecone": PineconeVectorDB, "weaviate": WeaviateVectorDB} if db_type in db_map: - return db_map[db_type](user_id, index_name, memory_id, ltm_memory_id, st_memory_id, buffer_id, namespace) + return db_map[db_type]( + user_id, + index_name, + memory_id, + ltm_memory_id, + st_memory_id, + buffer_id, + namespace, + ) raise ValueError(f"Unsupported database type: {db_type}") class VectorDB: OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") - def __init__(self, user_id: str, index_name: str, memory_id: str, - ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, - st_memory_id: str = ST_MEMORY_ID_DEFAULT, - buffer_id: str = BUFFER_ID_DEFAULT, namespace: str = None): + + def __init__( + self, + user_id: str, + index_name: str, + memory_id: str, + ltm_memory_id: str = LTM_MEMORY_ID_DEFAULT, + st_memory_id: str = ST_MEMORY_ID_DEFAULT, + buffer_id: str = BUFFER_ID_DEFAULT, + namespace: str = None, + ): self.user_id = user_id self.index_name = index_name self.namespace = namespace @@ -151,14 +145,13 @@ class WeaviateVectorDB(VectorDB): def init_weaviate(self, namespace: str): # Weaviate initialization logic embeddings = OpenAIEmbeddings() - auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY')) + auth_config = weaviate.auth.AuthApiKey( + api_key=os.environ.get("WEAVIATE_API_KEY") + ) client = weaviate.Client( - url=os.environ.get('WEAVIATE_URL'), + url=os.environ.get("WEAVIATE_URL"), auth_client_secret=auth_config, - - additional_headers={ - "X-OpenAI-Api-Key": os.environ.get('OPENAI_API_KEY') - } + additional_headers={"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")}, ) retriever = WeaviateHybridSearchRetriever( client=client, @@ -168,61 +161,107 @@ class WeaviateVectorDB(VectorDB): embedding=embeddings, create_schema_if_missing=True, ) - return retriever # If this is part of the initialization, call it here. + return retriever # If this is part of the initialization, call it here. 
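(Illustrative aside, not part of the diff: a minimal sketch of how the Weaviate-backed store defined above might be wired up end to end. It assumes `WEAVIATE_URL`, `WEAVIATE_API_KEY` and `OPENAI_API_KEY` are exported in the environment, that the module is importable under the file name used in this PR, and that the ids shown are placeholders.)

```python
import asyncio

from level_2_pdf_vectorstore__dlt_contracts import VectorDBFactory


async def main():
    # Build the Weaviate-backed store through the factory introduced in this diff.
    vector_db = VectorDBFactory().create_vector_db(
        user_id="681",          # placeholder user id
        index_name="my-agent",  # placeholder index name
        memory_id="0001",       # placeholder memory id
        db_type="weaviate",
        namespace="SEMANTICMEMORY",
    )

    # add_memories() stores the observation via the hybrid-search retriever
    # created in init_weaviate() above.
    await vector_db.add_memories(
        observation="I am adding docs",
        params={"version": "1.0"},
    )

    # fetch_memories() runs a hybrid query against the same namespace.
    result = await vector_db.fetch_memories(
        observation="What did I add?", namespace="SEMANTICMEMORY"
    )
    print(result)


asyncio.run(main())
```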
def init_weaviate_client(self, namespace: str): # Weaviate client initialization logic - auth_config = weaviate.auth.AuthApiKey(api_key=os.environ.get('WEAVIATE_API_KEY')) + auth_config = weaviate.auth.AuthApiKey( + api_key=os.environ.get("WEAVIATE_API_KEY") + ) client = weaviate.Client( - url=os.environ.get('WEAVIATE_URL'), + url=os.environ.get("WEAVIATE_URL"), auth_client_secret=auth_config, - - additional_headers={ - "X-OpenAI-Api-Key": os.environ.get('OPENAI_API_KEY') - } + additional_headers={"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")}, ) return client - async def add_memories(self, observation: str, params: dict = None): + def _document_loader(self, observation: str, loader_settings: dict): + # Create an in-memory file-like object for the PDF content + if loader_settings.get("format") == "PDF": + + if loader_settings.get("source") == "url": + pdf_response = requests.get(loader_settings["path"]) + pdf_stream = BytesIO(pdf_response.content) + contents = pdf_stream.read() + tmp_location = os.path.join("/tmp", "tmp.pdf") + with open(tmp_location, "wb") as tmp_file: + tmp_file.write(contents) + + # Process the PDF using PyPDFLoader + loader = PyPDFLoader(tmp_location) + # adapt this for different chunking strategies + pages = loader.load_and_split() + return pages + + if loader_settings.get("source") == "file": + # Process the PDF using PyPDFLoader + # might need adapting for different loaders + OCR + # need to test the path + loader = PyPDFLoader(loader_settings["path"]) + pages = loader.load_and_split() + + return pages + else: + # Process the text by just loading the base text + return observation + + + async def add_memories( + self, observation: str, loader_settings: dict = None, params: dict = None ,namespace:str=None + ): # Update Weaviate memories here print(self.namespace) - retriever = self.init_weaviate(self.namespace) + if namespace is None: + namespace = self.namespace + retriever = self.init_weaviate(namespace) - return retriever.add_documents([ + def _stuct(observation, params): + """Utility function to not repeat metadata structure""" + # needs smarter solution, like dynamic generation of metadata + return [ Document( metadata={ - "text": observation, + # "text": observation, "user_id": str(self.user_id), "memory_id": str(self.memory_id), "ltm_memory_id": str(self.ltm_memory_id), "st_memory_id": str(self.st_memory_id), "buffer_id": str(self.buffer_id), - "version": params.get('version', None) or "", - "agreement_id": params.get('agreement_id', None) or "", - "privacy_policy": params.get('privacy_policy', None) or "", - "terms_of_service": params.get('terms_of_service', None) or "", - "format": params.get('format', None) or "", - "schema_version": params.get('schema_version', None) or "", - "checksum": params.get('checksum', None) or "", - "owner": params.get('owner', None) or "", - "license": params.get('license', None) or "", - "validity_start": params.get('validity_start', None) or "", - "validity_end": params.get('validity_end', None) or "" - + "version": params.get("version", None) or "", + "agreement_id": params.get("agreement_id", None) or "", + "privacy_policy": params.get("privacy_policy", None) or "", + "terms_of_service": params.get("terms_of_service", None) or "", + "format": params.get("format", None) or "", + "schema_version": params.get("schema_version", None) or "", + "checksum": params.get("checksum", None) or "", + "owner": params.get("owner", None) or "", + "license": params.get("license", None) or "", + "validity_start": 
params.get("validity_start", None) or "", + "validity_end": params.get("validity_end", None) or "" # **source_metadata, }, page_content=observation, - )] + ) + ] + + if loader_settings: + # Load the document + document = self._document_loader(observation, loader_settings) + print("DOC LENGTH", len(document)) + for doc in document: + document_to_load = _stuct(doc.page_content, params) + retriever.add_documents( + document_to_load + ) + + return retriever.add_documents( + _stuct(observation, params) ) - # def get_pinecone_vectorstore(self, namespace: str) -> pinecone.VectorStore: - # return Pinecone.from_existing_index( - # index_name=self.index, embedding=OpenAIEmbeddings(), namespace=namespace - # ) - - async def fetch_memories(self, observation: str, namespace: str, params: dict = None): - + async def fetch_memories( + self, observation: str, namespace: str, params: dict = None + ): # Fetch Weaviate memories here """ Get documents from weaviate. @@ -247,49 +286,77 @@ class WeaviateVectorDB(VectorDB): params_user_id = { "path": ["user_id"], "operator": "Like", - "valueText": self.user_id + "valueText": self.user_id, } if params: - query_output = client.query.get(namespace, ["text" - , "user_id" - , "memory_id" - , "ltm_memory_id" - , "st_memory_id" - , "buffer_id" - , "version", - "agreement_id", - "privacy_policy", - "terms_of_service", - "format", - "schema_version", - "checksum", - "owner", - "license", - "validity_start", - "validity_end"]).with_where(params).with_additional( - ['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do() + query_output = ( + client.query.get( + namespace, + [ + # "text", + "user_id", + "memory_id", + "ltm_memory_id", + "st_memory_id", + "buffer_id", + "version", + "agreement_id", + "privacy_policy", + "terms_of_service", + "format", + "schema_version", + "checksum", + "owner", + "license", + "validity_start", + "validity_end", + ], + ) + .with_where(params) + .with_near_text({"concepts": [observation]}) + .with_additional( + ["id", "creationTimeUnix", "lastUpdateTimeUnix", "score",'distance'] + ) + .with_where(params_user_id) + .do() + ) return query_output else: - query_output = client.query.get(namespace, ["text", - "user_id", - "memory_id", - "ltm_memory_id", - "st_memory_id", - "buffer_id", - "version", - "agreement_id", - "privacy_policy", - "terms_of_service", - "format", - "schema_version", - "checksum", - "owner", - "license", - "validity_start", - "validity_end" - ]).with_additional( - ['id', 'creationTimeUnix', 'lastUpdateTimeUnix', "score"]).with_where(params_user_id).do() + query_output = ( + client.query.get( + namespace, + + [ + "text", + "user_id", + "memory_id", + "ltm_memory_id", + "st_memory_id", + "buffer_id", + "version", + "agreement_id", + "privacy_policy", + "terms_of_service", + "format", + "schema_version", + "checksum", + "owner", + "license", + "validity_start", + "validity_end", + ], + ) + .with_additional( + ["id", "creationTimeUnix", "lastUpdateTimeUnix", "score", 'distance'] + ) + .with_hybrid( + query=observation, + ) + .with_autocut(1) + .with_where(params_user_id) + .do() + ) return query_output async def delete_memories(self, params: dict = None): @@ -298,7 +365,7 @@ class WeaviateVectorDB(VectorDB): where_filter = { "path": ["id"], "operator": "Equal", - "valueText": params.get('id', None) + "valueText": params.get("id", None), } return client.batch.delete_objects( class_name=self.namespace, @@ -311,10 +378,10 @@ class WeaviateVectorDB(VectorDB): return client.batch.delete_objects( 
class_name=self.namespace, where={ - 'path': ['user_id'], - 'operator': 'Equal', - 'valueText': self.user_id - } + "path": ["user_id"], + "operator": "Equal", + "valueText": self.user_id, + }, ) def update_memories(self, observation, namespace: str, params: dict = None): @@ -322,35 +389,41 @@ class WeaviateVectorDB(VectorDB): client.data_object.update( data_object={ - "text": observation, + # "text": observation, "user_id": str(self.user_id), "memory_id": str(self.memory_id), "ltm_memory_id": str(self.ltm_memory_id), "st_memory_id": str(self.st_memory_id), "buffer_id": str(self.buffer_id), - "version": params.get('version', None) or "", - "agreement_id": params.get('agreement_id', None) or "", - "privacy_policy": params.get('privacy_policy', None) or "", - "terms_of_service": params.get('terms_of_service', None) or "", - "format": params.get('format', None) or "", - "schema_version": params.get('schema_version', None) or "", - "checksum": params.get('checksum', None) or "", - "owner": params.get('owner', None) or "", - "license": params.get('license', None) or "", - "validity_start": params.get('validity_start', None) or "", - "validity_end": params.get('validity_end', None) or "" - + "version": params.get("version", None) or "", + "agreement_id": params.get("agreement_id", None) or "", + "privacy_policy": params.get("privacy_policy", None) or "", + "terms_of_service": params.get("terms_of_service", None) or "", + "format": params.get("format", None) or "", + "schema_version": params.get("schema_version", None) or "", + "checksum": params.get("checksum", None) or "", + "owner": params.get("owner", None) or "", + "license": params.get("license", None) or "", + "validity_start": params.get("validity_start", None) or "", + "validity_end": params.get("validity_end", None) or "" # **source_metadata, - }, class_name="Test", - uuid=params.get('id', None), + uuid=params.get("id", None), consistency_level=weaviate.data.replication.ConsistencyLevel.ALL, # default QUORUM ) return + class BaseMemory: - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str, namespace: str): + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str, + namespace: str, + ): self.user_id = user_id self.memory_id = memory_id self.index_name = index_name @@ -358,17 +431,41 @@ class BaseMemory: self.memory_type_id = str(uuid.uuid4()) self.db_type = db_type factory = VectorDBFactory() - self.vector_db = factory.create_vector_db(self.user_id, self.index_name, self.memory_id, db_type=self.db_type, - namespace=self.namespace) + self.vector_db = factory.create_vector_db( + self.user_id, + self.index_name, + self.memory_id, + db_type=self.db_type, + namespace=self.namespace, + ) - async def add_memories(self, observation: Optional[str] = None, params: Optional[dict] = None): + def init_client(self, namespace: str): if self.db_type == "weaviate": - return await self.vector_db.add_memories(observation=observation, params=params) + return self.vector_db.init_weaviate_client(namespace) + + async def add_memories( + self, + observation: Optional[str] = None, + loader_settings: dict = None, + params: Optional[dict] = None, + namespace: Optional[str] = None, + ): + if self.db_type == "weaviate": + return await self.vector_db.add_memories( + observation=observation, loader_settings=loader_settings, params=params, namespace=namespace + ) # Add other db_type conditions if necessary - async def fetch_memories(self, observation: str, params: 
Optional[str] = None): + async def fetch_memories( + self, + observation: str, + params: Optional[str] = None, + namespace: Optional[str] = None, + ): if self.db_type == "weaviate": - return await self.vector_db.fetch_memories(observation, params) + return await self.vector_db.fetch_memories( + observation=observation, params=params, namespace=namespace + ) async def delete_memories(self, params: Optional[str] = None): if self.db_type == "weaviate": @@ -378,180 +475,286 @@ class BaseMemory: class SemanticMemory(BaseMemory): - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str = "weaviate"): - super().__init__(user_id, memory_id, index_name, db_type, namespace="SEMANTICMEMORY") + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str = "weaviate", + ): + super().__init__( + user_id, memory_id, index_name, db_type, namespace="SEMANTICMEMORY" + ) class EpisodicMemory(BaseMemory): - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str = "weaviate"): - super().__init__(user_id, memory_id, index_name, db_type, namespace="EPISODICMEMORY") + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str = "weaviate", + ): + super().__init__( + user_id, memory_id, index_name, db_type, namespace="EPISODICMEMORY" + ) class EpisodicBuffer(BaseMemory): - def __init__(self, user_id: str, memory_id: Optional[str], index_name: Optional[str], db_type: str = "weaviate"): - super().__init__(user_id, memory_id, index_name, db_type, namespace="BUFFERMEMORY") + def __init__( + self, + user_id: str, + memory_id: Optional[str], + index_name: Optional[str], + db_type: str = "weaviate", + ): + super().__init__( + user_id, memory_id, index_name, db_type, namespace="BUFFERMEMORY" + ) self.st_memory_id = "blah" self.llm = ChatOpenAI( temperature=0.0, max_tokens=1200, - openai_api_key=os.environ.get('OPENAI_API_KEY'), + openai_api_key=os.environ.get("OPENAI_API_KEY"), model_name="gpt-4-0613", - callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()], + # callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()], ) self.llm_base = OpenAI( temperature=0.0, max_tokens=1200, - openai_api_key=os.environ.get('OPENAI_API_KEY'), - model_name="gpt-4-0613" + openai_api_key=os.environ.get("OPENAI_API_KEY"), + model_name="gpt-4-0613", ) + async def memory_route(self, text_time_diff: str): + @ai_classifier + class MemoryRoute(Enum): + """Represents classifer for freshness of memories""" + + data_uploaded_now = "1" + data_uploaded_very_recently = "0.9" + data_uploaded_recently = "0.7" + data_uploaded_more_than_a_month_ago = "0.5" + data_uploaded_more_than_three_months_ago = "0.3" + data_uploaded_more_than_six_months_ago = "0.1" + + namespace = MemoryRoute(str(text_time_diff)) + + return namespace async def freshness(self, observation: str, namespace: str = None) -> list[str]: - """Freshness - Score between 1 and 5 on how often was the information updated in episodic or semantic memory in the past""" + """Freshness - Score between 0 and 1 on how often was the information updated in episodic or semantic memory in the past""" - memory = Memory(user_id=self.user_id) - await memory.async_init() - - lookup_value = await memory._fetch_episodic_memory(observation=observation) - unix_t = lookup_value["data"]["Get"]["EPISODICMEMORY"][0]["_additional"]["lastUpdateTimeUnix"] + lookup_value = await self.fetch_memories( + observation=observation, 
namespace=namespace + ) + unix_t = lookup_value["data"]["Get"]["EPISODICMEMORY"][0]["_additional"][ + "lastUpdateTimeUnix" + ] # Convert Unix timestamp to datetime last_update_datetime = datetime.fromtimestamp(int(unix_t) / 1000) time_difference = datetime.now() - last_update_datetime time_difference_text = humanize.naturaltime(time_difference) - marvin.settings.openai.api_key = os.environ.get('OPENAI_API_KEY') - - @ai_classifier - class MemoryRoute(Enum): - """Represents classifer for freshness of memories""" - - data_uploaded_now = "0" - data_uploaded_very_recently = "1" - data_uploaded_recently = "2" - data_uploaded_more_than_a_month_ago = "3" - data_uploaded_more_than_three_months_ago = "4" - data_uploaded_more_than_six_months_ago = "5" - - namespace = MemoryRoute(str(time_difference_text)) + namespace = await self.memory_route(str(time_difference_text)) return [namespace.value, lookup_value] async def frequency(self, observation: str, namespace: str) -> list[str]: - """Frequency - Score between 1 and 5 on how often was the information processed in episodic memory in the past - Counts the number of times a memory was accessed in the past and divides it by the total number of memories in the episodic memory """ - client = self.init_weaviate_client(self.namespace) + """Frequency - Score between 0 and 1 on how often was the information processed in episodic memory in the past + Counts the number of times a memory was accessed in the past and divides it by the total number of memories in the episodic memory + """ + weaviate_client = self.init_client(namespace=namespace) - memory = Memory(user_id=self.user_id) - await memory.async_init() - - result_output = await memory._fetch_episodic_memory(observation=observation) + result_output = await self.fetch_memories( + observation=observation, params=None, namespace=namespace + ) number_of_relevant_events = len(result_output["data"]["Get"]["EPISODICMEMORY"]) - number_of_total_events = client.query.aggregate(self.namespace).with_meta_count().do() - frequency = float(number_of_relevant_events) / float(number_of_total_events) + number_of_total_events = ( + weaviate_client.query.aggregate(namespace).with_meta_count().do() + ) + frequency = float(number_of_relevant_events) / float( + number_of_total_events["data"]["Aggregate"]["EPISODICMEMORY"][0]["meta"][ + "count" + ] + ) return [str(frequency), result_output["data"]["Get"]["EPISODICMEMORY"][0]] - async def relevance(self, observation: str) -> list[str]: - """Relevance - Score between 1 and 5 on how often was the final information relevant to the user in the past. - Stored in the episodic memory, mainly to show how well a buffer did the job - Starts at 1, gets updated based on the user feedback """ + async def relevance(self, observation: str, namespace: str) -> list[str]: + """Relevance - Score between 0 and 1 on how often was the final information relevant to the user in the past. + Stored in the episodic memory, mainly to show how well a buffer did the job + Starts at 0, gets updated based on the user feedback""" - return ["5", "memory"] + return ["0", "memory"] - async def saliency(self, observation: str) -> list[str]: - """Determines saliency by finding relevance between user input and document schema values. 
- After finding document schena value relevant for the user, it forms a new query based on the schema value and the user input """ + async def saliency(self, observation: str, namespace=None) -> list[str]: + """Determines saliency by scoring the set of retrieved documents against each other and trying to determine saliency + """ + class SaliencyRawList(BaseModel): + """Schema for documentGroups""" + original_document: str = Field( + ..., + description="The original document retrieved from the database") + saliency_score: str = Field( + None, description="The score between 0 and 1") + class SailencyContextList(BaseModel): + """Buffer raw context processed by the buffer""" - return ["5", "memory"] + docs: List[SaliencyRawList] = Field(..., description="List of docs") + observation: str = Field(..., description="The original user query") - # @ai_classifier - # class MemoryRoute(Enum): - # """Represents classifer for freshness of memories""" - # - # data_uploaded_now = "0" - # data_uploaded_very_recently = "1" - # data_uploaded_recently = "2" - # data_uploaded_more_than_a_month_ago = "3" - # data_uploaded_more_than_three_months_ago = "4" - # data_uploaded_more_than_six_months_ago = "5" - # - # namespace= MemoryRoute(observation) + parser = PydanticOutputParser(pydantic_object=SailencyContextList) + prompt = PromptTemplate( + template="Determine saliency of documents compared to the other documents retrieved \n{format_instructions}\nOriginal observation is: {query}\n", + input_variables=["query"], + partial_variables={"format_instructions": parser.get_format_instructions()}, + ) - # return ggur + _input = prompt.format_prompt(query=observation) + document_context_result = self.llm_base(_input.to_string()) + document_context_result_parsed = parser.parse(document_context_result) + return document_context_result_parsed.json() - async def encoding(self, document: str, namespace: str = "EPISODICBUFFER", params: dict = None) -> list[str]: - """Encoding for the buffer, stores raw data in the buffer - Note, this is not comp-sci encoding, but rather encoding in the sense of storing the content in the buffer""" - query = await self.add_memories(document, params=params) - return query + + + async def handle_modulator( + self, + modulator_name: str, + attention_modulators: Dict[str, float], + observation: str, + namespace: Optional[str] = None, + ) -> Optional[List[Union[str, float]]]: + """ + Handle the given modulator based on the observation and namespace. + + Parameters: + - modulator_name: Name of the modulator to handle. + - attention_modulators: Dictionary of modulator values. + - observation: The current observation. + - namespace: An optional namespace. + + Returns: + - Result of the modulator if criteria met, else None. 
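+
+        Example (illustrative sketch only; the modulator name, threshold and
+        observation below are placeholders rather than values used elsewhere in this PR):
+            result = await self.handle_modulator(
+                modulator_name="freshness",
+                attention_modulators={"freshness": 0.5},
+                observation="How does Buck adapt to life in the wild?",
+                namespace="EPISODICMEMORY",
+            )
+            # `result` is the modulator's output when the criteria are met, otherwise None.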
+ """ + modulator_value = attention_modulators.get(modulator_name, 0.0) + modulator_functions = { + "freshness": lambda obs, ns: self.freshness(observation=obs, namespace=ns), + "frequency": lambda obs, ns: self.frequency(observation=obs, namespace=ns), + "relevance": lambda obs, ns: self.relevance(observation=obs, namespace=ns), + "saliency": lambda obs, ns: self.saliency(observation=obs, namespace=ns), + } + + result_func = modulator_functions.get(modulator_name) + if not result_func: + return None + + result = await result_func(observation, namespace) + if not result: + return None + + try: + if float(modulator_value) >= float(result[0]): + return result + except ValueError: + pass + + return None async def available_operations(self) -> list[str]: """Determines what operations are available for the user to process PDFs""" - return ["translate", "structure", "load to database", "load to semantic memory", "load to episodic memory", - "load to buffer"] + return [ + "translate", + "structure", + "fetch from vector store" + # "load to semantic memory", + # "load to episodic memory", + # "load to buffer", + ] - async def buffer_context(self, user_input=None, content=None, params=None): + async def buffer_context( + self, + user_input=None, + params=None, + attention_modulators: dict = None, + ): """Generates the context to be used for the buffer and passed to the agent""" - # we get a list of available operations for our buffer to consider - # these operations are what we can do with the data, in the context of PDFs (load, translate, structure, etc) - list_of_operations = await self.available_operations() - try: # we delete all memories in the episodic buffer, so we can start fresh await self.delete_memories() except: # in case there are no memories, we pass pass - # we just filter the data here to make sure input is clean prompt_filter = ChatPromptTemplate.from_template( - "Filter and remove uneccessary information that is not relevant in the user query, keep it as original as possbile: {query}") + "Filter and remove uneccessary information that is not relevant in the query to the vector store to get more information, keep it as original as possbile: {query}" + ) chain_filter = prompt_filter | self.llm output = await chain_filter.ainvoke({"query": user_input}) - # this part is mostly unfinished but the idea is to apply different algorithms to the data to fetch the most relevant information from the vector stores + # this part is unfinished but the idea is to apply different attention modulators to the data to fetch the most relevant information from the vector stores context = [] - if params: + if attention_modulators: + print("HERE ARE THE ATTENTION MODULATORS: ", attention_modulators) + from typing import Optional, Dict, List, Union - if "freshness" in params: - params.get('freshness', None) # get the value of freshness - freshness = await self.freshness(observation=str(output)) - context.append(freshness) - - elif "frequency" in params: - params.get('freshness', None) - frequency = await self.freshness(observation=str(output)) - print("freshness", frequency) - context.append(frequency) - - # fix this so it actually filters + lookup_value_semantic = await self.fetch_memories( + observation=str(output), namespace="SEMANTICMEMORY" + ) + context = [] + for memory in lookup_value_semantic["data"]["Get"]["SEMANTICMEMORY"]: + # extract memory id, and pass it to fetch function as a parameter + modulators = list(attention_modulators.keys()) + for modulator in modulators: + result = await 
self.handle_modulator( + modulator, + attention_modulators, + str(output), + namespace="EPISODICMEMORY", + ) + if result: + context.append(result) + context.append(memory) else: # defaults to semantic search if we don't want to apply algorithms on the vectordb data - memory = Memory(user_id=self.user_id) - await memory.async_init() - - lookup_value_episodic = await memory._fetch_episodic_memory(observation=str(output)) - lookup_value_semantic = await memory._fetch_semantic_memory(observation=str(output)) + lookup_value_episodic = await self.fetch_memories( + observation=str(output), namespace="EPISODICMEMORY" + ) + lookup_value_semantic = await self.fetch_memories( + observation=str(output), namespace="SEMANTICMEMORY" + ) lookup_value_buffer = await self.fetch_memories(observation=str(output)) context.append(lookup_value_buffer) context.append(lookup_value_semantic) context.append(lookup_value_episodic) - # copy the context over into the buffer - # do i need to do it for the episodic + raw data, might make sense - print("HERE IS THE CONTEXT", context) + class BufferModulators(BaseModel): + frequency: str = Field(..., description="Frequency score of the document") + saliency: str = Field(..., description="Saliency score of the document") + relevance: str = Field(..., description="Relevance score of the document") class BufferRawContextTerms(BaseModel): """Schema for documentGroups""" - semantic_search_term: str = Field(..., - description="The search term to use to get relevant input based on user query") - document_description: str = Field(None, description="The short summary of what the document is about") - document_relevance: str = Field(None, - description="The relevance of the document for the task on the scale from 1 to 5") + + semantic_search_term: str = Field( + ..., + description="The search term to use to get relevant input based on user query", + ) + document_content: str = Field( + None, description="Shortened original content of the document" + ) + document_relevance: str = Field( + None, + description="The relevance of the document for the task on the scale from 0 to 1", + ) + attention_modulators_list: List[BufferModulators] = Field( + ..., description="List of modulators" + ) class BufferRawContextList(BaseModel): """Buffer raw context processed by the buffer""" + docs: List[BufferRawContextTerms] = Field(..., description="List of docs") user_query: str = Field(..., description="The original user query") @@ -564,79 +767,111 @@ class EpisodicBuffer(BaseMemory): ) _input = prompt.format_prompt(query=user_input, context=context) - document_context_result = self.llm_base(_input.to_string()) - document_context_result_parsed = parser.parse(document_context_result) + print("HERE ARE THE DOCS PARSED AND STRUCTURED", document_context_result_parsed.json()) - print("HERE ARE THE DOCS PARSED AND STRUCTURED", document_context_result_parsed) + return document_context_result_parsed + + async def get_task_list( + self, user_input=None, params=None, attention_modulators=None, + ): + """Gets the task list from the document context result to enhance it and pass it to the agent""" + list_of_operations = await self.available_operations() class Task(BaseModel): """Schema for an individual task.""" - task_order: str = Field(..., description="The order at which the task needs to be performed") - task_name: str = Field(None, description="The task that needs to be performed") + + task_order: str = Field( + ..., description="The order at which the task needs to be performed" + ) + task_name: 
str = Field( + None, description="The task that needs to be performed" + ) operation: str = Field(None, description="The operation to be performed") - original_query: str = Field(None, description="Original user query provided") + original_query: str = Field( + None, description="Original user query provided" + ) class TaskList(BaseModel): """Schema for the record containing a list of tasks.""" + tasks: List[Task] = Field(..., description="List of tasks") - prompt_filter_chunk = f"The raw context data is {str(document_context_result_parsed)} Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based {output}" - # chain_filter_chunk = prompt_filter_chunk | self.llm.bind(function_call={"TaskList": "tasks"}, functions=TaskList) - # output_chunk = await chain_filter_chunk.ainvoke({"query": output, "list_of_operations": list_of_operations}) + prompt_filter_chunk = f" Based on available operations {list_of_operations} determine only the relevant list of steps and operations sequentially based on {user_input}" prompt_msgs = [ SystemMessage( content="You are a world class algorithm for decomposing prompts into steps and operations and choosing relevant ones" ), - HumanMessage(content="Decompose based on the following prompt:"), + HumanMessage(content="Decompose based on the following prompt and provide relevant document context response:"), HumanMessagePromptTemplate.from_template("{input}"), HumanMessage(content="Tips: Make sure to answer in the correct format"), - HumanMessage(content="Tips: Only choose actions that are relevant to the user query and ignore others") - + HumanMessage( + content="Tips: Only choose actions that are relevant to the user query and ignore others" + ) ] prompt_ = ChatPromptTemplate(messages=prompt_msgs) - chain = create_structured_output_chain(TaskList, self.llm, prompt_, verbose=True) + chain = create_structured_output_chain( + TaskList, self.llm, prompt_, verbose=True + ) from langchain.callbacks import get_openai_callback + with get_openai_callback() as cb: output = await chain.arun(input=prompt_filter_chunk, verbose=True) print(cb) # output = json.dumps(output) my_object = parse_obj_as(TaskList, output) print("HERE IS THE OUTPUT", my_object.json()) - data = json.loads(my_object.json()) - # Extract the list of tasks tasks_list = data["tasks"] return tasks_list - async def main_buffer(self, user_input=None, content=None, params=None): + async def main_buffer( + self, user_input=None, params=None, attention_modulators=None + ): + """AI buffer to run the AI agent to execute the set of tasks""" - """AI buffer to understand user PDF query, prioritize memory info and process it based on available operations""" - - memory = Memory(user_id=self.user_id) - await memory.async_init() - tasks_list = await self.buffer_context(user_input=user_input, content=content, params=params) + document_context_result_parsed = await self.buffer_context( + user_input=user_input, + params=params, + attention_modulators=attention_modulators, + ) + tasks_list = await self.get_task_list( + user_input=user_input, + params=params, + attention_modulators=attention_modulators + ) result_tasks = [] + document_context_result_parsed = document_context_result_parsed.dict() + document_from_vectorstore = [doc["document_content"] for doc in document_context_result_parsed["docs"]] for task in tasks_list: + print("HERE IS THE TASK", task) + + complete_agent_prompt = f" Document context is: {document_from_vectorstore} \n Task is : 
{task['task_order']} {task['task_name']} {task['operation']} " + + # task['vector_store_context_results']=document_context_result_parsed.dict() class PromptWrapper(BaseModel): observation: str = Field( description="observation we want to fetch from vectordb" ) - @tool("convert_to_structured", args_schema=PromptWrapper, return_direct=True) + @tool( + "convert_to_structured", args_schema=PromptWrapper, return_direct=True + ) def convert_to_structured(observation=None, json_schema=None): """Convert unstructured data to structured data""" BASE_DIR = os.getcwd() - json_path = os.path.join(BASE_DIR, "schema_registry", "ticket_schema.json") + json_path = os.path.join( + BASE_DIR, "schema_registry", "ticket_schema.json" + ) def load_json_or_infer_schema(file_path, document_path): """Load JSON schema from file or infer schema from text""" # Attempt to load the JSON file - with open(file_path, 'r') as file: + with open(file_path, "r") as file: json_schema = json.load(file) return json_schema @@ -649,64 +884,91 @@ class EpisodicBuffer(BaseMemory): SystemMessage( content="You are a world class algorithm converting unstructured data into structured data." ), - HumanMessage(content="Convert unstructured data to structured data:"), + HumanMessage( + content="Convert unstructured data to structured data:" + ), HumanMessagePromptTemplate.from_template("{input}"), - HumanMessage(content="Tips: Make sure to answer in the correct format"), + HumanMessage( + content="Tips: Make sure to answer in the correct format" + ), ] prompt_ = ChatPromptTemplate(messages=prompt_msgs) - chain_funct = create_structured_output_chain(json_schema, prompt=prompt_, llm=self.llm, - verbose=True) + chain_funct = create_structured_output_chain( + json_schema, prompt=prompt_, llm=self.llm, verbose=True + ) output = chain_funct.run(input=observation, llm=self.llm) return output result = run_open_ai_mapper(observation, json_schema) return result + class FetchText(BaseModel): + observation: str = Field(description="observation we want to fetch from the vector store") + @tool("fetch_from_vector_store", args_schema=FetchText, return_direct=True) + def fetch_from_vector_store(observation, args_schema=FetchText): + """Fetch from vectorstore if data doesn't exist in the context""" + if document_context_result_parsed: + return document_context_result_parsed + else: + out = self.fetch_memories(observation['original_query'], namespace="SEMANTICMEMORY") + return out + class TranslateText(BaseModel): - observation: str = Field( - description="observation we want to translate" - ) + observation: str = Field(description="observation we want to translate") @tool("translate_to_de", args_schema=TranslateText, return_direct=True) def translate_to_de(observation, args_schema=TranslateText): """Translate to English""" - out = GoogleTranslator(source='auto', target='de').translate(text=observation) + out = GoogleTranslator(source="auto", target="de").translate( + text=observation + ) return out agent = initialize_agent( llm=self.llm, - tools=[translate_to_de, convert_to_structured], + tools=[fetch_from_vector_store, translate_to_de, convert_to_structured], agent=AgentType.OPENAI_FUNCTIONS, - verbose=True, ) - print("HERE IS THE TASK", task) - output = agent.run(input=task) - print(output) + + output = agent.run(input=complete_agent_prompt) + result_tasks.append(task) result_tasks.append(output) - print("HERE IS THE RESULT TASKS", str(result_tasks)) - - await self.encoding(str(result_tasks), self.namespace, params=params) - - buffer_result = await 
self.fetch_memories(observation=str(output)) - - print("HERE IS THE RESULT TASKS", str(buffer_result)) + # print("HERE IS THE RESULT TASKS", str(result_tasks)) + # + # buffer_result = await self.fetch_memories(observation=str(user_input)) + # + # print("HERE IS THE RESULT TASKS", str(buffer_result)) class EpisodicTask(BaseModel): """Schema for an individual task.""" - task_order: str = Field(..., description="The order at which the task needs to be performed") - task_name: str = Field(None, description="The task that needs to be performed") + + task_order: str = Field( + ..., description="The order at which the task needs to be performed" + ) + task_name: str = Field( + None, description="The task that needs to be performed" + ) operation: str = Field(None, description="The operation to be performed") - operation_result: str = Field(None, description="The result of the operation") + operation_result: str = Field( + None, description="The result of the operation" + ) class EpisodicList(BaseModel): """Schema for the record containing a list of tasks.""" + tasks: List[EpisodicTask] = Field(..., description="List of tasks") - start_date: str = Field(..., description="The order at which the task needs to be performed") - end_date: str = Field(..., description="The order at which the task needs to be performed") - user_query: str = Field(..., description="The order at which the task needs to be performed") + start_date: str = Field( + ..., description="The start date of the task list" + ) + end_date: str = Field( + ..., description="The end date of the task list" + ) + user_query: str = Field( + ..., description="The original user query" + ) parser = PydanticOutputParser(pydantic_object=EpisodicList) @@ -716,21 +978,30 @@ class EpisodicBuffer(BaseMemory): partial_variables={"format_instructions": parser.get_format_instructions()}, ) - _input = prompt.format_prompt(query=user_input, steps=str(tasks_list), buffer=buffer_result) + _input = prompt.format_prompt( + query=user_input, steps=str(tasks_list), + buffer=str(result_tasks) + ) # return "a few things to do like load episodic memory in a structured format" - output = self.llm_base(_input.to_string()) result_parsing = parser.parse(output) - print("here is the parsing result", result_parsing) - lookup_value = await memory._add_episodic_memory(observation=str(output), params=params) + lookup_value = await self.add_memories( + observation=str(result_parsing.json()), params=params, namespace="EPISODICMEMORY" + ) + # print("THE RESULT OF THIS QUERY IS ", result_parsing.json()) await self.delete_memories() - return lookup_value + return result_parsing.json() class LongTermMemory: - def __init__(self, user_id: str = "676", memory_id: Optional[str] = None, index_name: Optional[str] = None, - db_type: str = "weaviate"): + def __init__( + self, + user_id: str = "676", + memory_id: Optional[str] = None, + index_name: Optional[str] = None, + db_type: str = "weaviate", + ): self.user_id = user_id self.memory_id = memory_id self.ltm_memory_id = str(uuid.uuid4()) @@ -741,8 +1012,13 @@ class LongTermMemory: class ShortTermMemory: - def __init__(self, user_id: str = "676", memory_id: Optional[str] = None, index_name: Optional[str] = None, - db_type: str = "weaviate"): + def __init__( + self, + user_id: str = "676", + memory_id: Optional[str] = None, + index_name: Optional[str] = None, + db_type: str = "weaviate", + ): self.user_id = user_id self.memory_id = memory_id self.stm_memory_id = 
str(uuid.uuid4()) @@ -751,18 +1027,20 @@ class ShortTermMemory: self.episodic_buffer = EpisodicBuffer(user_id, memory_id, index_name, db_type) - - - - - class Memory: load_dotenv() OPENAI_TEMPERATURE = float(os.getenv("OPENAI_TEMPERATURE", 0.0)) OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") - def __init__(self, user_id: str = "676", index_name: str = None, knowledge_source: str = None, - knowledge_type: str = None, db_type: str = "weaviate", namespace: str = None) -> None: + def __init__( + self, + user_id: str = "676", + index_name: str = None, + knowledge_source: str = None, + knowledge_type: str = None, + db_type: str = "weaviate", + namespace: str = None, + ) -> None: self.user_id = user_id self.index_name = index_name self.db_type = db_type @@ -775,45 +1053,55 @@ class Memory: load_dotenv() # Asynchronous factory function for creating LongTermMemory - async def async_create_long_term_memory(self, user_id, memory_id, index_name, db_type): + async def async_create_long_term_memory( + self, user_id, memory_id, index_name, db_type + ): # Perform asynchronous initialization steps if needed return LongTermMemory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, - db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) async def async_init(self): # Asynchronous initialization of LongTermMemory and ShortTermMemory self.long_term_memory = await self.async_create_long_term_memory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, - db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) self.short_term_memory = await self.async_create_short_term_memory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, - db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) - async def async_create_short_term_memory(self, user_id, memory_id, index_name, db_type): + async def async_create_short_term_memory( + self, user_id, memory_id, index_name, db_type + ): # Perform asynchronous initialization steps if needed return ShortTermMemory( - user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type + user_id=self.user_id, + memory_id=self.memory_id, + index_name=self.index_name, + db_type=self.db_type, ) - - # self.short_term_memory = await ShortTermMemory.async_init( - # user_id=self.user_id, memory_id=self.memory_id, index_name=self.index_name, db_type=self.db_type - # ) - - async def _add_semantic_memory(self, semantic_memory: str, params: dict = None): + async def _add_semantic_memory( + self, observation: str, loader_settings: dict = None, params: dict = None + ): return await self.long_term_memory.semantic_memory.add_memories( - semantic_memory=semantic_memory, params=params - + observation=observation, + loader_settings=loader_settings, + params=params, ) async def _fetch_semantic_memory(self, observation, params): return await self.long_term_memory.semantic_memory.fetch_memories( observation=observation, params=params - ) async def _delete_semantic_memory(self, params: str = None): @@ -821,10 +1109,11 @@ class Memory: params=params ) - async def _add_episodic_memory(self, observation: str, params: dict = None): + async def _add_episodic_memory( + self, observation: str, loader_settings: dict = None, params: dict = None + ): return await 
self.long_term_memory.episodic_memory.add_memories( - observation=observation, params=params - + observation=observation, loader_settings=loader_settings, params=params ) async def _fetch_episodic_memory(self, observation, params: str = None): @@ -837,30 +1126,66 @@ class Memory: params=params ) - async def _run_buffer(self, user_input: str, content: str = None, params:str=None): - return await self.short_term_memory.episodic_buffer.main_buffer(user_input=user_input, content=content, params=params) - async def _add_buffer_memory(self, user_input: str, namespace: str = None, params: dict = None): - return await self.short_term_memory.episodic_buffer.add_memories(observation=user_input, - params=params) + async def _add_buffer_memory( + self, + user_input: str, + namespace: str = None, + loader_settings: dict = None, + params: dict = None, + ): + return await self.short_term_memory.episodic_buffer.add_memories( + observation=user_input, loader_settings=loader_settings, params=params + ) - async def _fetch_buffer_memory(self, user_input: str, namespace: str = None): - return await self.short_term_memory.episodic_buffer.fetch_memories(observation=user_input) + async def _fetch_buffer_memory(self, user_input: str): + return await self.short_term_memory.episodic_buffer.fetch_memories( + observation=user_input + ) async def _delete_buffer_memory(self, params: str = None): return await self.short_term_memory.episodic_buffer.delete_memories( params=params ) - async def _create_buffer_context(self, user_input: str, content: str = None, params:str=None): + + async def _create_buffer_context( + self, + user_input: str, + params: dict = None, + attention_modulators: dict = None, + ): return await self.short_term_memory.episodic_buffer.buffer_context( - user_input=user_input, content=content, params=params + user_input=user_input, + params=params, + attention_modulators=attention_modulators, ) + async def _get_task_list( + self, + user_input: str, + params: str = None, + attention_modulators: dict = None, + ): + return await self.short_term_memory.episodic_buffer.get_task_list( + user_input=user_input, + params=params, + attention_modulators=attention_modulators, + ) + async def _run_main_buffer( + self, + user_input: str, + params: dict = None, + attention_modulators: dict = None, + ): + return await self.short_term_memory.episodic_buffer.main_buffer( + user_input=user_input, + params=params, + attention_modulators=attention_modulators, + ) + async def _available_operations(self): return await self.long_term_memory.episodic_buffer.available_operations() - - async def main(): memory = Memory(user_id="123") await memory.async_init() @@ -875,41 +1200,37 @@ async def main(): "owner": "John Doe", "license": "MIT", "validity_start": "2023-08-01", - "validity_end": "2024-07-31" + "validity_end": "2024-07-31", } - gg = await memory._run_buffer(user_input="i NEED TRANSLATION TO GERMAN ", content="i NEED TRANSLATION TO GERMAN ", params=params) - print(gg) + # gg = await memory._run_buffer(user_input="i NEED TRANSLATION TO GERMAN ", content="i NEED TRANSLATION TO GERMAN ", params=params) + # print(gg) # gg = await memory._fetch_buffer_memory(user_input="i TO GERMAN ") # print(gg) - episodic = """{ - "start_date": "2023-08-23", - "end_date": "2023-08-30", - "user_query": "How can I plan a healthy diet?", - "action_steps": [ - { - "step_number": 1, - "description": "Research and gather information about basic principles of a healthy diet." 
- }, - { - "step_number": 2, - "description": "Create a weekly meal plan that includes a variety of nutritious foods." - }, - { - "step_number": 3, - "description": "Prepare and cook meals according to your meal plan. Include fruits, vegetables, lean proteins, and whole grains." - } - ] - }""" - # - # ggur = await memory._delete_buffer_memory() - # print(ggur) - # ggur = await memory._add_buffer_memory(user_input = episodic, params=params) - # print(ggur) - # fff = await memory._fetch_episodic_memory(observation = "healthy diet") + modulator = {"relevance": 0.0, "saliency": 0.0, "frequency": 0.0} + # # + ggur = await memory._run_main_buffer( + user_input="I want to know how does Buck adapt to life in the wild and then have that info translated to german ", + params=params, + attention_modulators=modulator, + ) + print(ggur) + + ll = { + "format": "PDF", + "source": "url", + "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" + } + # ggur = await memory._add_semantic_memory(observation = "bla", loader_settings=ll, params=params) + # print(ggur) + # fff = await memory._delete_semantic_memory() + # print(fff) + + # fff = await memory._fetch_semantic_memory(observation = "dog pulling sleds ", params=None) + # print(fff) # print(len(fff["data"]["Get"]["EPISODICMEMORY"])) @@ -918,17 +1239,3 @@ if __name__ == "__main__": asyncio.run(main()) - # bb = agent._update_semantic_memory(semantic_memory="Users core summary") - # bb = agent._fetch_semantic_memory(observation= "Users core summary", params = { - # "path": ["inserted_at"], - # "operator": "Equal", - # "valueText": "*2023*" - # }) - # buffer = agent._run_buffer(user_input="I want to get a schema for my data") - # print(bb) - # rrr = { - # "path": ["year"], - # "operator": "Equal", - # "valueText": "2017*" - # } -
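For orientation, here is a minimal usage sketch of the memory API introduced by this patch, mirroring the main() example above. The import path is a placeholder (the diff does not show this module's file name), and the user_id, loader settings, and modulator values simply reuse the illustrative values from main(); this is a sketch under those assumptions, not part of the patch itself.

import asyncio

# Placeholder import: adjust to the module under ./level_2 that this diff modifies.
from memory import Memory


async def demo():
    # Create the top-level Memory facade and initialize long/short term memory.
    memory = Memory(user_id="123")
    await memory.async_init()

    # Ingest a PDF into semantic memory (loader settings copied from main() above).
    loader_settings = {
        "format": "PDF",
        "source": "url",
        "path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf",
    }
    await memory._add_semantic_memory(
        observation="bla", loader_settings=loader_settings, params=None
    )

    # Modulator values are compared against the scores computed by handle_modulator;
    # the 0.0 defaults mirror the example dictionary used in main().
    modulators = {"relevance": 0.0, "saliency": 0.0, "frequency": 0.0}
    result = await memory._run_main_buffer(
        user_input="How does Buck adapt to life in the wild?",
        params=None,
        attention_modulators=modulators,
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(demo())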