Merge pull request #26 from topoteretes/add_async_elements

Add async elements
This commit is contained in:
Vasilije 2023-10-15 18:49:28 +02:00 committed by GitHub
commit 57706d468c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 592 additions and 310 deletions

View file

@ -44,6 +44,7 @@ RUN apt-get update -q && \
WORKDIR /app WORKDIR /app
COPY . /app COPY . /app
COPY entrypoint.sh /app/entrypoint.sh COPY entrypoint.sh /app/entrypoint.sh
COPY scripts/create_database.py /app/create_database.py
RUN chmod +x /app/entrypoint.sh RUN chmod +x /app/entrypoint.sh
ENTRYPOINT ["/app/entrypoint.sh"] ENTRYPOINT ["/app/entrypoint.sh"]

View file

@ -7,6 +7,8 @@ from fastapi import FastAPI
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from pydantic import BaseModel from pydantic import BaseModel
from level_3.database.database import AsyncSessionLocal
from level_3.database.database_crud import session_scope
from vectorstore_manager import Memory from vectorstore_manager import Memory
from dotenv import load_dotenv from dotenv import load_dotenv
# Set up logging # Set up logging
@ -62,21 +64,28 @@ def memory_factory(memory_type):
# files: List[UploadFile] = File(...), # files: List[UploadFile] = File(...),
): ):
try: try:
logging.info(" Init PDF processing") logging.info(" Adding to Memory ")
decoded_payload = payload.payload decoded_payload = payload.payload
async with session_scope(session=AsyncSessionLocal()) as session:
Memory_ = Memory(user_id=decoded_payload["user_id"]) memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY')
await Memory_.async_init() # Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
memory_class = getattr(Memory_, f"_add_{memory_type}_memory", None) # Managing memory attributes
output = await memory_class( existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session)
observation=decoded_payload["prompt"], await memory.manage_memory_attributes(existing_user)
loader_settings=decoded_payload["loader_settings"], await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper())
params=decoded_payload["params"], memory_class = decoded_payload["memory_object"] + "_class"
) dynamic_memory_class = getattr(memory, memory_class.lower(), None)
return JSONResponse(content={"response": output}, status_code=200)
await memory.add_method_to_class(dynamic_memory_class, 'add_memories')
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(dynamic_memory_class, 'add_memories',
observation='some_observation', params=decoded_payload["params"],
loader_settings=decoded_payload["loader_settings"])
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
@ -89,15 +98,27 @@ def memory_factory(memory_type):
# files: List[UploadFile] = File(...), # files: List[UploadFile] = File(...),
): ):
try: try:
logging.info(" Adding to Memory ")
decoded_payload = payload.payload decoded_payload = payload.payload
async with session_scope(session=AsyncSessionLocal()) as session:
Memory_ = Memory(user_id=decoded_payload["user_id"]) memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY')
await Memory_.async_init() # Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
memory_class = getattr(Memory_, f"_fetch_{memory_type}_memory", None) # Managing memory attributes
output = memory_class(observation=decoded_payload["prompt"]) existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session)
return JSONResponse(content={"response": output}, status_code=200) await memory.manage_memory_attributes(existing_user)
await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper())
memory_class = decoded_payload["memory_object"] + "_class"
dynamic_memory_class = getattr(memory, memory_class.lower(), None)
await memory.add_method_to_class(dynamic_memory_class, 'add_memories')
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(dynamic_memory_class, 'fetch_memories',
observation=decoded_payload['observation'])
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
@ -110,141 +131,152 @@ def memory_factory(memory_type):
# files: List[UploadFile] = File(...), # files: List[UploadFile] = File(...),
): ):
try: try:
logging.info(" Adding to Memory ")
decoded_payload = payload.payload decoded_payload = payload.payload
async with session_scope(session=AsyncSessionLocal()) as session:
Memory_ = Memory(user_id=decoded_payload["user_id"]) memory = await Memory.create_memory(decoded_payload["user_id"], session, namespace='SEMANTICMEMORY')
await Memory_.async_init() # Adding a memory instance
await memory.add_memory_instance(decoded_payload["memory_object"])
memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) # Managing memory attributes
output = memory_class(observation=decoded_payload["prompt"]) existing_user = await Memory.check_existing_user(decoded_payload["user_id"], session)
return JSONResponse(content={"response": output}, status_code=200) await memory.manage_memory_attributes(existing_user)
await memory.add_dynamic_memory_class(decoded_payload["memory_object"], decoded_payload["memory_object"].upper())
memory_class = decoded_payload["memory_object"] + "_class"
dynamic_memory_class = getattr(memory, memory_class.lower(), None)
await memory.add_method_to_class(dynamic_memory_class, 'delete_memories')
# await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
output = await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories',
namespace=decoded_payload["memory_object"].upper())
return JSONResponse(content={"response": output}, status_code=200)
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
content={"response": {"error": str(e)}}, status_code=503 content={"response": {"error": str(e)}}, status_code=503
) )
memory_list = ["episodic", "buffer", "semantic"] memory_list = ["episodic", "buffer", "semantic"]
for memory_type in memory_list: for memory_type in memory_list:
memory_factory(memory_type) memory_factory(memory_type)
@app.get("/available-buffer-actions", response_model=dict) # @app.get("/available-buffer-actions", response_model=dict)
async def available_buffer_actions( # async def available_buffer_actions(
payload: Payload, # payload: Payload,
# files: List[UploadFile] = File(...), # # files: List[UploadFile] = File(...),
): # ):
try: # try:
decoded_payload = payload.payload # decoded_payload = payload.payload
#
Memory_ = Memory(user_id=decoded_payload["user_id"]) # Memory_ = Memory(user_id=decoded_payload["user_id"])
#
await Memory_.async_init() # await Memory_.async_init()
#
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) # # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
output = await Memory_._available_operations() # output = await Memory_._available_operations()
return JSONResponse(content={"response": output}, status_code=200) # return JSONResponse(content={"response": output}, status_code=200)
#
except Exception as e: # except Exception as e:
return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) # return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
@app.post("/run-buffer", response_model=dict) # @app.post("/run-buffer", response_model=dict)
async def run_buffer( # async def run_buffer(
payload: Payload, # payload: Payload,
# files: List[UploadFile] = File(...), # # files: List[UploadFile] = File(...),
): # ):
try: # try:
decoded_payload = payload.payload # decoded_payload = payload.payload
#
Memory_ = Memory(user_id=decoded_payload["user_id"]) # Memory_ = Memory(user_id=decoded_payload["user_id"])
#
await Memory_.async_init() # await Memory_.async_init()
#
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) # # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
output = await Memory_._run_main_buffer( # output = await Memory_._run_main_buffer(
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] # user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
) # )
return JSONResponse(content={"response": output}, status_code=200) # return JSONResponse(content={"response": output}, status_code=200)
#
except Exception as e: # except Exception as e:
return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) # return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
#
#
@app.post("/buffer/create-context", response_model=dict) # @app.post("/buffer/create-context", response_model=dict)
async def create_context( # async def create_context(
payload: Payload, # payload: Payload,
# files: List[UploadFile] = File(...), # # files: List[UploadFile] = File(...),
): # ):
try: # try:
decoded_payload = payload.payload # decoded_payload = payload.payload
#
Memory_ = Memory(user_id=decoded_payload["user_id"]) # Memory_ = Memory(user_id=decoded_payload["user_id"])
#
await Memory_.async_init() # await Memory_.async_init()
#
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) # # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
output = await Memory_._create_buffer_context( # output = await Memory_._create_buffer_context(
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] # user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
) # )
return JSONResponse(content={"response": output}, status_code=200) # return JSONResponse(content={"response": output}, status_code=200)
#
except Exception as e: # except Exception as e:
return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) # return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
#
#
@app.post("/buffer/get-tasks", response_model=dict) # @app.post("/buffer/get-tasks", response_model=dict)
async def create_context( # async def create_context(
payload: Payload, # payload: Payload,
# files: List[UploadFile] = File(...), # # files: List[UploadFile] = File(...),
): # ):
try: # try:
decoded_payload = payload.payload # decoded_payload = payload.payload
#
Memory_ = Memory(user_id=decoded_payload["user_id"]) # Memory_ = Memory(user_id=decoded_payload["user_id"])
#
await Memory_.async_init() # await Memory_.async_init()
#
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) # # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
output = await Memory_._get_task_list( # output = await Memory_._get_task_list(
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"] # user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"]
) # )
return JSONResponse(content={"response": output}, status_code=200) # return JSONResponse(content={"response": output}, status_code=200)
#
except Exception as e: # except Exception as e:
return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) # return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
#
#
@app.post("/buffer/provide-feedback", response_model=dict) # @app.post("/buffer/provide-feedback", response_model=dict)
async def provide_feedback( # async def provide_feedback(
payload: Payload, # payload: Payload,
# files: List[UploadFile] = File(...), # # files: List[UploadFile] = File(...),
): # ):
try: # try:
decoded_payload = payload.payload # decoded_payload = payload.payload
#
Memory_ = Memory(user_id=decoded_payload["user_id"]) # Memory_ = Memory(user_id=decoded_payload["user_id"])
#
await Memory_.async_init() # await Memory_.async_init()
#
# memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None) # # memory_class = getattr(Memory_, f"_delete_{memory_type}_memory", None)
if decoded_payload["total_score"] is None: # if decoded_payload["total_score"] is None:
#
output = await Memory_._provide_feedback( # output = await Memory_._provide_feedback(
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=None, total_score=decoded_payload["total_score"] # user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=None, total_score=decoded_payload["total_score"]
) # )
return JSONResponse(content={"response": output}, status_code=200) # return JSONResponse(content={"response": output}, status_code=200)
else: # else:
output = await Memory_._provide_feedback( # output = await Memory_._provide_feedback(
user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"], total_score=None # user_input=decoded_payload["prompt"], params=decoded_payload["params"], attention_modulators=decoded_payload["attention_modulators"], total_score=None
) # )
return JSONResponse(content={"response": output}, status_code=200) # return JSONResponse(content={"response": output}, status_code=200)
#
#
except Exception as e: # except Exception as e:
return JSONResponse(content={"response": {"error": str(e)}}, status_code=503) # return JSONResponse(content={"response": {"error": str(e)}}, status_code=503)
def start_api_server(host: str = "0.0.0.0", port: int = 8000): def start_api_server(host: str = "0.0.0.0", port: int = 8000):
""" """
Start the API server using uvicorn. Start the API server using uvicorn.

View file

@ -23,5 +23,4 @@ async def session_scope(session):
async def add_entity(session, entity): async def add_entity(session, entity):
async with session_scope(session) as s: # Use your async session_scope async with session_scope(session) as s: # Use your async session_scope
s.add(entity) # No need to commit; session_scope takes care of it s.add(entity) # No need to commit; session_scope takes care of it
s.commit()
return "Successfully added entity" return "Successfully added entity"

View file

@ -1,6 +1,18 @@
version: "3.9" version: "3.9"
services: services:
neo4j:
image: neo4j:latest
container_name: neo4j
ports:
- "7474:7474"
- "7687:7687"
environment:
- NEO4J_AUTH=neo4j/pleaseletmein
- NEO4J_PLUGINS=["apoc"]
networks:
- promethai_mem_backend
promethai_mem: promethai_mem:
networks: networks:
- promethai_mem_backend - promethai_mem_backend
@ -10,15 +22,16 @@ services:
- "./:/app" - "./:/app"
environment: environment:
- HOST=0.0.0.0 - HOST=0.0.0.0
profiles: ["exclude-from-up"] # Use `docker-compose run teenage-agi` to get an attached container profiles: ["exclude-from-up"]
ports: ports:
- 8000:8000 - 8000:8000
- 443:443 - 443:443
postgres: postgres:
image: postgres image: postgres
container_name: postgres container_name: postgres
environment: environment:
- POSTGRES_HOST_AUTH_METHOD= trust - POSTGRES_HOST_AUTH_METHOD=trust
- POSTGRES_USER=bla - POSTGRES_USER=bla
- POSTGRES_PASSWORD=bla - POSTGRES_PASSWORD=bla
- POSTGRES_DB=bubu - POSTGRES_DB=bubu
@ -26,6 +39,7 @@ services:
- promethai_mem_backend - promethai_mem_backend
ports: ports:
- "5432:5432" - "5432:5432"
superset: superset:
platform: linux/amd64 platform: linux/amd64
build: build:
@ -43,6 +57,8 @@ services:
- promethai_mem_backend - promethai_mem_backend
ports: ports:
- '8088:8088' - '8088:8088'
networks: networks:
promethai_mem_backend: promethai_mem_backend:
name: promethai_mem_backend name: promethai_mem_backend

View file

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
export ENVIRONMENT export ENVIRONMENT
python fetch_secret.py python fetch_secret.py
python scripts/create_database.py python create_database.py
# Start Gunicorn # Start Gunicorn
gunicorn -w 2 -k uvicorn.workers.UvicornWorker -t 120 --bind=0.0.0.0:8000 --bind=0.0.0.0:443 --log-level debug api:app gunicorn -w 2 -k uvicorn.workers.UvicornWorker -t 120 --bind=0.0.0.0:8000 --bind=0.0.0.0:443 --log-level debug api:app

182
level_3/poetry.lock generated
View file

@ -122,6 +122,21 @@ files = [
[package.dependencies] [package.dependencies]
frozenlist = ">=1.1.0" frozenlist = ">=1.1.0"
[[package]]
name = "ansi2html"
version = "1.8.0"
description = ""
optional = false
python-versions = ">=3.6"
files = [
{file = "ansi2html-1.8.0-py3-none-any.whl", hash = "sha256:ef9cc9682539dbe524fbf8edad9c9462a308e04bce1170c32daa8fdfd0001785"},
{file = "ansi2html-1.8.0.tar.gz", hash = "sha256:38b82a298482a1fa2613f0f9c9beb3db72a8f832eeac58eb2e47bf32cd37f6d5"},
]
[package.extras]
docs = ["Sphinx", "setuptools-scm", "sphinx-rtd-theme"]
test = ["pytest", "pytest-cov"]
[[package]] [[package]]
name = "anyio" name = "anyio"
version = "3.7.1" version = "3.7.1"
@ -686,6 +701,73 @@ files = [
{file = "cymem-2.0.7.tar.gz", hash = "sha256:e6034badb5dd4e10344211c81f16505a55553a7164adc314c75bd80cf07e57a8"}, {file = "cymem-2.0.7.tar.gz", hash = "sha256:e6034badb5dd4e10344211c81f16505a55553a7164adc314c75bd80cf07e57a8"},
] ]
[[package]]
name = "dash"
version = "2.14.0"
description = "A Python framework for building reactive web-apps. Developed by Plotly."
optional = false
python-versions = ">=3.6"
files = [
{file = "dash-2.14.0-py3-none-any.whl", hash = "sha256:b88ffb53cb1bf54c12780ecf89943bb901c8914b3c075002e46519c9b17d7a72"},
{file = "dash-2.14.0.tar.gz", hash = "sha256:bd28be70be24ae1d1f764b8217a03da35e9ed895406686d24dfb6ed4e331e5a9"},
]
[package.dependencies]
ansi2html = "*"
dash-core-components = "2.0.0"
dash-html-components = "2.0.0"
dash-table = "5.0.0"
Flask = ">=1.0.4,<2.3.0"
importlib-metadata = {version = "*", markers = "python_version >= \"3.7\""}
nest-asyncio = "*"
plotly = ">=5.0.0"
requests = "*"
retrying = "*"
setuptools = "*"
typing-extensions = ">=4.1.1"
Werkzeug = "<2.3.0"
[package.extras]
celery = ["celery[redis] (>=5.1.2)", "importlib-metadata (<5)", "redis (>=3.5.3)"]
ci = ["black (==21.6b0)", "black (==22.3.0)", "dash-dangerously-set-inner-html", "dash-flow-example (==0.0.5)", "flake8 (==3.9.2)", "flaky (==3.7.0)", "flask-talisman (==1.0.0)", "isort (==4.3.21)", "jupyterlab (<4.0.0)", "mimesis", "mock (==4.0.3)", "numpy (<=1.25.2)", "openpyxl", "orjson (==3.5.4)", "orjson (==3.6.7)", "pandas (==1.1.5)", "pandas (>=1.4.0)", "preconditions", "pyarrow", "pyarrow (<3)", "pylint (==2.13.5)", "pytest-mock", "pytest-rerunfailures", "pytest-sugar (==0.9.6)", "xlrd (<2)", "xlrd (>=2.0.1)"]
compress = ["flask-compress"]
dev = ["PyYAML (>=5.4.1)", "coloredlogs (>=15.0.1)", "fire (>=0.4.0)"]
diskcache = ["diskcache (>=5.2.1)", "multiprocess (>=0.70.12)", "psutil (>=5.8.0)"]
testing = ["beautifulsoup4 (>=4.8.2)", "cryptography (<3.4)", "dash-testing-stub (>=0.0.2)", "lxml (>=4.6.2)", "multiprocess (>=0.70.12)", "percy (>=2.0.2)", "psutil (>=5.8.0)", "pytest (>=6.0.2)", "requests[security] (>=2.21.0)", "selenium (>=3.141.0,<=4.2.0)", "waitress (>=1.4.4)"]
[[package]]
name = "dash-core-components"
version = "2.0.0"
description = "Core component suite for Dash"
optional = false
python-versions = "*"
files = [
{file = "dash_core_components-2.0.0-py3-none-any.whl", hash = "sha256:52b8e8cce13b18d0802ee3acbc5e888cb1248a04968f962d63d070400af2e346"},
{file = "dash_core_components-2.0.0.tar.gz", hash = "sha256:c6733874af975e552f95a1398a16c2ee7df14ce43fa60bb3718a3c6e0b63ffee"},
]
[[package]]
name = "dash-html-components"
version = "2.0.0"
description = "Vanilla HTML components for Dash"
optional = false
python-versions = "*"
files = [
{file = "dash_html_components-2.0.0-py3-none-any.whl", hash = "sha256:b42cc903713c9706af03b3f2548bda4be7307a7cf89b7d6eae3da872717d1b63"},
{file = "dash_html_components-2.0.0.tar.gz", hash = "sha256:8703a601080f02619a6390998e0b3da4a5daabe97a1fd7a9cebc09d015f26e50"},
]
[[package]]
name = "dash-table"
version = "5.0.0"
description = "Dash table"
optional = false
python-versions = "*"
files = [
{file = "dash_table-5.0.0-py3-none-any.whl", hash = "sha256:19036fa352bb1c11baf38068ec62d172f0515f73ca3276c79dee49b95ddc16c9"},
{file = "dash_table-5.0.0.tar.gz", hash = "sha256:18624d693d4c8ef2ddec99a6f167593437a7ea0bf153aa20f318c170c5bc7308"},
]
[[package]] [[package]]
name = "dataclasses-json" name = "dataclasses-json"
version = "0.5.9" version = "0.5.9"
@ -1005,6 +1087,27 @@ typing-extensions = {version = ">=4.7.1", markers = "python_version < \"3.11\""}
docs = ["furo (>=2023.7.26)", "sphinx (>=7.1.2)", "sphinx-autodoc-typehints (>=1.24)"] docs = ["furo (>=2023.7.26)", "sphinx (>=7.1.2)", "sphinx-autodoc-typehints (>=1.24)"]
testing = ["covdefaults (>=2.3)", "coverage (>=7.3)", "diff-cover (>=7.7)", "pytest (>=7.4)", "pytest-cov (>=4.1)", "pytest-mock (>=3.11.1)", "pytest-timeout (>=2.1)"] testing = ["covdefaults (>=2.3)", "coverage (>=7.3)", "diff-cover (>=7.7)", "pytest (>=7.4)", "pytest-cov (>=4.1)", "pytest-mock (>=3.11.1)", "pytest-timeout (>=2.1)"]
[[package]]
name = "flask"
version = "2.2.5"
description = "A simple framework for building complex web applications."
optional = false
python-versions = ">=3.7"
files = [
{file = "Flask-2.2.5-py3-none-any.whl", hash = "sha256:58107ed83443e86067e41eff4631b058178191a355886f8e479e347fa1285fdf"},
{file = "Flask-2.2.5.tar.gz", hash = "sha256:edee9b0a7ff26621bd5a8c10ff484ae28737a2410d99b0bb9a6850c7fb977aa0"},
]
[package.dependencies]
click = ">=8.0"
itsdangerous = ">=2.0"
Jinja2 = ">=3.0"
Werkzeug = ">=2.2.2"
[package.extras]
async = ["asgiref (>=3.2)"]
dotenv = ["python-dotenv"]
[[package]] [[package]]
name = "frozenlist" name = "frozenlist"
version = "1.4.0" version = "1.4.0"
@ -1424,6 +1527,25 @@ files = [
{file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"}, {file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"},
] ]
[[package]]
name = "importlib-metadata"
version = "6.8.0"
description = "Read metadata from Python packages"
optional = false
python-versions = ">=3.8"
files = [
{file = "importlib_metadata-6.8.0-py3-none-any.whl", hash = "sha256:3ebb78df84a805d7698245025b975d9d67053cd94c79245ba4b3eb694abe68bb"},
{file = "importlib_metadata-6.8.0.tar.gz", hash = "sha256:dbace7892d8c0c4ac1ad096662232f831d4e64f4c4545bd53016a3e9d4654743"},
]
[package.dependencies]
zipp = ">=0.5"
[package.extras]
docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
perf = ["ipython"]
testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"]
[[package]] [[package]]
name = "iniconfig" name = "iniconfig"
version = "2.0.0" version = "2.0.0"
@ -1435,6 +1557,17 @@ files = [
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
] ]
[[package]]
name = "itsdangerous"
version = "2.1.2"
description = "Safely pass data to untrusted environments and back."
optional = false
python-versions = ">=3.7"
files = [
{file = "itsdangerous-2.1.2-py3-none-any.whl", hash = "sha256:2c2349112351b88699d8d4b6b075022c0808887cb7ad10069318a8b0bc88db44"},
{file = "itsdangerous-2.1.2.tar.gz", hash = "sha256:5dbbc68b317e5e42f327f9021763545dc3fc3bfe22e6deb96aaf1fc38874156a"},
]
[[package]] [[package]]
name = "jinja2" name = "jinja2"
version = "3.1.2" version = "3.1.2"
@ -2515,6 +2648,21 @@ files = [
greenlet = "2.0.2" greenlet = "2.0.2"
pyee = "9.0.4" pyee = "9.0.4"
[[package]]
name = "plotly"
version = "5.17.0"
description = "An open-source, interactive data visualization library for Python"
optional = false
python-versions = ">=3.6"
files = [
{file = "plotly-5.17.0-py2.py3-none-any.whl", hash = "sha256:7c84cdf11da162423da957bb093287134f2d6f170eb9a74f1459f825892247c3"},
{file = "plotly-5.17.0.tar.gz", hash = "sha256:290d796bf7bab87aad184fe24b86096234c4c95dcca6ecbca02d02bdf17d3d97"},
]
[package.dependencies]
packaging = "*"
tenacity = ">=6.2.0"
[[package]] [[package]]
name = "pluggy" name = "pluggy"
version = "1.3.0" version = "1.3.0"
@ -4661,6 +4809,23 @@ validators = ">=0.18.2,<=0.21.0"
[package.extras] [package.extras]
grpc = ["grpcio", "grpcio-tools"] grpc = ["grpcio", "grpcio-tools"]
[[package]]
name = "werkzeug"
version = "2.2.3"
description = "The comprehensive WSGI web application library."
optional = false
python-versions = ">=3.7"
files = [
{file = "Werkzeug-2.2.3-py3-none-any.whl", hash = "sha256:56433961bc1f12533306c624f3be5e744389ac61d722175d543e1751285da612"},
{file = "Werkzeug-2.2.3.tar.gz", hash = "sha256:2e1ccc9417d4da358b9de6f174e3ac094391ea1d4fbef2d667865d819dfd0afe"},
]
[package.dependencies]
MarkupSafe = ">=2.1.1"
[package.extras]
watchdog = ["watchdog"]
[[package]] [[package]]
name = "wheel" name = "wheel"
version = "0.41.2" version = "0.41.2"
@ -4874,7 +5039,22 @@ files = [
idna = ">=2.0" idna = ">=2.0"
multidict = ">=4.0" multidict = ">=4.0"
[[package]]
name = "zipp"
version = "3.17.0"
description = "Backport of pathlib-compatible object wrapper for zip files"
optional = false
python-versions = ">=3.8"
files = [
{file = "zipp-3.17.0-py3-none-any.whl", hash = "sha256:0e923e726174922dce09c53c59ad483ff7bbb8e572e00c7f7c46b88556409f31"},
{file = "zipp-3.17.0.tar.gz", hash = "sha256:84e64a1c28cf7e91ed2078bb8cc8c259cb19b76942096c8d7b84947690cabaf0"},
]
[package.extras]
docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-lint"]
testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy (>=0.9.1)", "pytest-ruff"]
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.10" python-versions = "^3.10"
content-hash = "b3f795baef70806c8dab1058b644d95f3c36b1c2fae53a7e5784a633731bc268" content-hash = "d022983c3e6224970ff0e1d90bffb2e303c6a474ff1fc87345f0227eae2c7b29"

View file

@ -40,13 +40,14 @@ weaviate-client = "^3.22.1"
python-multipart = "^0.0.6" python-multipart = "^0.0.6"
deep-translator = "^1.11.4" deep-translator = "^1.11.4"
humanize = "^4.8.0" humanize = "^4.8.0"
deepeval = "^0.20.6" deepeval = "^0.20.4"
pymupdf = "^1.23.3" pymupdf = "^1.23.3"
psycopg2 = "^2.9.8" psycopg2 = "^2.9.8"
llama-index = "^0.8.39.post2" llama-index = "^0.8.39.post2"
llama-hub = "^0.0.34" llama-hub = "^0.0.34"
sqlalchemy = "^2.0.21" sqlalchemy = "^2.0.21"
asyncpg = "^0.28.0" asyncpg = "^0.28.0"
dash = "^2.14.0"

View file

@ -1,38 +1,56 @@
import argparse
import json
from enum import Enum
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from deepeval.metrics.overall_score import OverallScoreMetric
from deepeval.test_case import LLMTestCase
from deepeval.run_test import assert_test, run_test
from gptcache.embedding import openai
from marvin import ai_classifier
import random
import itertools
import logging
import string
from enum import Enum
import openai
from deepeval.metrics.overall_score import OverallScoreMetric
from deepeval.run_test import run_test
from deepeval.test_case import LLMTestCase
from marvin import ai_classifier
from sqlalchemy.future import select
logging.basicConfig(level=logging.INFO)
import marvin
from dotenv import load_dotenv
from sqlalchemy.orm import sessionmaker
from database.database import engine # Ensure you have database engine defined somewhere
from models.user import User
from models.memory import MemoryModel
from models.sessions import Session from models.sessions import Session
from models.testset import TestSet from models.testset import TestSet
from models.testoutput import TestOutput from models.testoutput import TestOutput
from models.metadatas import MetaDatas from models.metadatas import MetaDatas
from models.operation import Operation from models.operation import Operation
from sqlalchemy.orm import sessionmaker load_dotenv()
from database.database import engine import ast
from vectorstore_manager import Memory import tracemalloc
import uuid from database.database_crud import session_scope, add_entity
from contextlib import contextmanager
from database.database import AsyncSessionLocal
from database.database_crud import session_scope
import random tracemalloc.start()
import string
import itertools import os
import logging from dotenv import load_dotenv
import dotenv import uuid
dotenv.load_dotenv()
logger = logging.getLogger(__name__) load_dotenv()
import openai
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
marvin.settings.openai.api_key = os.environ.get("OPENAI_API_KEY")
from vectordb.basevectordb import BaseMemory
from vectorstore_manager import Memory
import asyncio
from database.database_crud import session_scope
from database.database import AsyncSessionLocal
openai.api_key = os.getenv("OPENAI_API_KEY", "") openai.api_key = os.getenv("OPENAI_API_KEY", "")
async def retrieve_latest_test_case(session, user_id, memory_id): async def retrieve_latest_test_case(session, user_id, memory_id):
try: try:
# Use await with session.execute() and row.fetchone() or row.all() for async query execution # Use await with session.execute() and row.fetchone() or row.all() for async query execution
@ -43,13 +61,14 @@ async def retrieve_latest_test_case(session, user_id, memory_id):
) )
return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call
except Exception as e: except Exception as e:
logger.error(f"An error occurred while retrieving the latest test case: {str(e)}") logging.error(f"An error occurred while retrieving the latest test case: {str(e)}")
return None return None
async def add_entity(session, entity): async def add_entity(session, entity):
async with session_scope(session) as s: # Use your async session_scope async with session_scope(session) as s: # Use your async session_scope
s.add(entity) # No need to commit; session_scope takes care of it s.add(entity) # No need to commit; session_scope takes care of it
s.commit() s.commit()
return "Successfully added entity" return "Successfully added entity"
async def retrieve_job_by_id(session, user_id, job_id): async def retrieve_job_by_id(session, user_id, job_id):
@ -61,7 +80,7 @@ async def retrieve_job_by_id(session, user_id, job_id):
) )
return result.scalar_one_or_none() return result.scalar_one_or_none()
except Exception as e: except Exception as e:
logger.error(f"An error occurred while retrieving the job: {str(e)}") logging.error(f"An error occurred while retrieving the job: {str(e)}")
return None return None
async def fetch_job_id(session, user_id=None, memory_id=None, job_id=None): async def fetch_job_id(session, user_id=None, memory_id=None, job_id=None):
@ -73,7 +92,7 @@ async def fetch_job_id(session, user_id=None, memory_id=None, job_id=None):
) )
return result.scalar_one_or_none() return result.scalar_one_or_none()
except Exception as e: except Exception as e:
logger.error(f"An error occurred: {str(e)}") logging.error(f"An error occurred: {str(e)}")
return None return None
@ -87,7 +106,7 @@ async def fetch_test_set_id(session, user_id, id):
) )
return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call return result.scalar_one_or_none() # scalar_one_or_none() is a non-blocking call
except Exception as e: except Exception as e:
logger.error(f"An error occurred while retrieving the test set: {str(e)}") logging.error(f"An error occurred while retrieving the test set: {str(e)}")
return None return None
# Adding "embeddings" to the parameter variants function # Adding "embeddings" to the parameter variants function
@ -178,20 +197,20 @@ async def generate_chatgpt_output(query:str, context:str=None):
# print(llm_output) # print(llm_output)
return llm_output return llm_output
async def eval_test(query=None, output=None, expected_output=None, context=None): async def eval_test(query=None, output=None, expected_output=None, context=None, synthetic_test_set=False):
# query = "How does photosynthesis work?"
# output = "Photosynthesis is the process by which green plants and some other organisms use sunlight to synthesize foods with the help of chlorophyll pigment."
# expected_output = "Photosynthesis is the process by which green plants and some other organisms use sunlight to synthesize food with the help of chlorophyll pigment."
# context = "Biology"
result_output = await generate_chatgpt_output(query, context) result_output = await generate_chatgpt_output(query, context)
test_case = LLMTestCase( if synthetic_test_set:
query=query, test_case = synthetic_test_set
output=result_output, else:
expected_output=expected_output,
context=context,
) test_case = LLMTestCase(
query=query,
output=result_output,
expected_output=expected_output,
context=context,
)
metric = OverallScoreMetric() metric = OverallScoreMetric()
# If you want to run the test # If you want to run the test
@ -230,12 +249,11 @@ def data_location_route(data_string: str):
return LocationRoute(data_string).name return LocationRoute(data_string).name
def dynamic_test_manager(context=None, n=10):
    """Build a synthetic query/answer evaluation set from ``context``.

    Args:
        context: Source material forwarded to deepeval as ``context``.
        n: Forwarded to deepeval as ``n``. Defaults to 10, the value that
            used to be hard-coded here.

    Returns:
        Whatever ``create_evaluation_query_answer_pairs`` returns.
    """
    # Function-scope import: deepeval is loaded on the first call rather than
    # when this module is imported.
    from deepeval.dataset import create_evaluation_query_answer_pairs

    return create_evaluation_query_answer_pairs(
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        context=context,
        n=n,
    )
@ -246,11 +264,12 @@ def generate_letter_uuid(length=8):
letters = string.ascii_uppercase # A-Z letters = string.ascii_uppercase # A-Z
return ''.join(random.choice(letters) for _ in range(length)) return ''.join(random.choice(letters) for _ in range(length))
async def start_test(data, test_set=None, user_id=None, params=None, job_id=None, metadata=None): async def start_test(data, test_set=None, user_id=None, params=None, job_id=None, metadata=None, generate_test_set=False, only_llm_context=False):
async with session_scope(session=AsyncSessionLocal()) as session: async with session_scope(session=AsyncSessionLocal()) as session:
memory = await Memory.create_memory(user_id, session, namespace="SEMANTICMEMORY")
job_id = await fetch_job_id(session, user_id=user_id, job_id=job_id) job_id = await fetch_job_id(session, user_id=user_id, job_id=job_id)
test_set_id = await fetch_test_set_id(session, user_id=user_id, id=job_id) test_set_id = await fetch_test_set_id(session, user_id=user_id, id=job_id)
@ -266,104 +285,113 @@ async def start_test(data, test_set=None, user_id=None, params=None, job_id=None
data_format = data_format_route(data) # Assume data_format_route is predefined data_format = data_format_route(data) # Assume data_format_route is predefined
data_location = data_location_route(data) # Assume data_location_route is predefined data_location = data_location_route(data) # Assume data_location_route is predefined
test_params = generate_param_variants( test_params = generate_param_variants(
included_params=['chunk_size', 'chunk_overlap', 'search_type']) included_params=['chunk_size'])
print("Here are the test params", str(test_params)) print("Here are the test params", str(test_params))
loader_settings = { loader_settings = {
"format": f"{data_format}", "format": f"{data_format}",
"source": f"{data_location}", "source": f"{data_location}",
"path": "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf" "path": data
} }
async def run_test(test, loader_settings, metadata):
test_id = str(generate_letter_uuid()) + "_" + "SEMANTICEMEMORY" async def run_test(test, loader_settings, metadata, test_id=None,only_llm_context=False):
if test_id is None:
test_id = str(generate_letter_uuid()) + "_" +"SEMANTICMEMORY"
memory = await Memory.create_memory(user_id, session, namespace="SEMANTICMEMORY")
await memory.add_memory_instance("ExampleMemory") await memory.add_memory_instance("ExampleMemory")
existing_user = await Memory.check_existing_user(user_id, session) existing_user = await Memory.check_existing_user(user_id, session)
await memory.manage_memory_attributes(existing_user) await memory.manage_memory_attributes(existing_user)
test_class = test_id + "_class" test_class = test_id + "_class"
await memory.add_dynamic_memory_class(test_id.lower(), test_id) await memory.add_dynamic_memory_class(test_id.lower(), test_id)
dynamic_memory_class = getattr(memory, test_class.lower(), None) dynamic_memory_class = getattr(memory, test_class.lower(), None)
methods_to_add = ['add_memories', 'fetch_memories', 'delete_memories']
# Assuming add_method_to_class and dynamic_method_call are predefined methods in Memory class
if dynamic_memory_class is not None: if dynamic_memory_class is not None:
await memory.add_method_to_class(dynamic_memory_class, 'add_memories') for method_name in methods_to_add:
else: await memory.add_method_to_class(dynamic_memory_class, method_name)
print(f"No attribute named {test_class.lower()} in memory.") print(f"Memory method {method_name} has been added")
if dynamic_memory_class is not None:
await memory.add_method_to_class(dynamic_memory_class, 'fetch_memories')
else: else:
print(f"No attribute named {test_class.lower()} in memory.") print(f"No attribute named {test_class.lower()} in memory.")
print(f"Trying to access: {test_class.lower()}") print(f"Trying to access: {test_class.lower()}")
print("Available memory classes:", await memory.list_memory_classes()) print("Available memory classes:", await memory.list_memory_classes())
if test:
print(f"Trying to check: ", test) loader_settings.update(test)
loader_settings.update(test) test_class = test_id + "_class"
dynamic_memory_class = getattr(memory, test_class.lower(), None)
async def run_load_test_element(loader_settings=loader_settings, metadata=metadata, test_id=test_id, test_set=test_set):
    """Call ``add_memories`` on the current dynamic memory class.

    The defaults capture the enclosing scope's values at definition time.
    ``test_id`` and ``test_set`` are accepted but not used in the body.

    Returns:
        The literal string ``"Loaded test element"``.
    """
    print(f"Trying to access: {test_class.lower()}")
    add_kwargs = {
        "observation": 'Observation loaded',
        "params": metadata,
        "loader_settings": loader_settings,
    }
    await memory.dynamic_method_call(dynamic_memory_class, 'add_memories', **add_kwargs)
    return "Loaded test element"
async def run_search_element(test_item, test_id):
    """Fetch memories for one test question and return the first hit's text.

    Args:
        test_item: Mapping with a ``"question"`` entry; it is stringified and
            used as the observation for ``fetch_memories``.
        test_id: Key under ``["data"]["Get"]`` in the response to read from.

    Returns:
        The ``"text"`` field of the first entry under
        ``response["data"]["Get"][test_id]``.
    """
    question = str(test_item["question"])
    response = await memory.dynamic_method_call(
        dynamic_memory_class, 'fetch_memories', observation=question
    )
    top_text = response["data"]['Get'][test_id][0]["text"]
    print("Here is the test result", str(top_text))
    return top_text
async def run_eval(test_item, search_result):
    """Evaluate one question/answer pair against a retrieved context.

    Args:
        test_item: Mapping with ``"question"`` and ``"answer"`` entries.
        search_result: Retrieved context; passed to ``eval_test`` as a string.

    Returns:
        The result of ``eval_test``.
    """
    return await eval_test(
        query=test_item["question"],
        expected_output=test_item["answer"],
        context=str(search_result),
    )
async def run_generate_test_set(test_id):
    """Ask the memory store for a generated summary and build a test set from it.

    Args:
        test_id: Identifier of the test; ``<test_id>_class`` (lower-cased) is the
            attribute on ``memory`` holding the dynamic memory class.

    Returns:
        The dataset produced by ``dynamic_test_manager`` from the fetch result.
    """
    # The dynamic class is looked up, not created, here; run_test registers it
    # via add_dynamic_memory_class before this helper is defined.
    test_class = test_id + "_class"
    dynamic_memory_class = getattr(memory, test_class.lower(), None)
    print(dynamic_memory_class)
    # The Weaviate fetch_memories dispatcher only knows 'text', 'hybrid', 'bm25',
    # 'generate' and 'generate_grouped'. The former value "generative" matched
    # none of them, was logged as an invalid search_type and yielded [].
    retrieve_action = await memory.dynamic_method_call(
        dynamic_memory_class,
        'fetch_memories',
        observation="Generate a short summary of this document",
        search_type="generate",
    )
    return dynamic_test_manager(retrieve_action)
context=str(retrieve_action)) test_eval_pipeline =[]
print(test_result)
delete_mems = await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories',
namespace=test_id)
return test_result
test_load_pipeline = await asyncio.gather(
*(run_load_test_element(test_item,loader_settings, metadata, test_id) for test_item in test_set)
)
test_eval_pipeline = await asyncio.gather(
*(run_search_eval_element(test_item, test_id) for test_item in test_set)
)
logging.info("Results of the eval pipeline %s", str(test_eval_pipeline))
await add_entity(session, TestOutput(id=test_id, user_id=user_id, test_results=str(test_eval_pipeline)))
# # Gather and run all tests in parallel if only_llm_context:
results = await asyncio.gather( for test_qa in test_set:
*(run_test(test, loader_settings, metadata) for test in test_params) context=""
) test_result = await run_eval(test_qa, context)
test_eval_pipeline.append(test_result)
if generate_test_set is True:
synthetic_test_set = run_generate_test_set(test_id)
else:
pass
if test_set:
logging.info("Loading and evaluating test set")
await run_load_test_element(loader_settings, metadata, test_id, test_set)
for test_qa in test_set:
result = await run_search_element(test_qa, test_id)
test_result = await run_eval(test_qa, result)
test_eval_pipeline.append( test_result)
else:
pass
await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories',
namespace=test_id)
return test_eval_pipeline
results = []
if only_llm_context:
result = await run_test(test= None, loader_settings=loader_settings, metadata=metadata, only_llm_context=only_llm_context)
results.append(result)
for param in test_params:
result = await run_test(param, loader_settings, metadata,only_llm_context=only_llm_context)
results.append(result)
print(results)
return results return results
async def main(): async def main():
params = { metadata = {
"version": "1.0", "version": "1.0",
"agreement_id": "AG123456", "agreement_id": "AG123456",
"privacy_policy": "https://example.com/privacy", "privacy_policy": "https://example.com/privacy",
@ -399,7 +427,9 @@ async def main():
"answer": "Buck becomes the leader after defeating the original leader, Spitz, in a fight." "answer": "Buck becomes the leader after defeating the original leader, Spitz, in a fight."
} }
] ]
result = await start_test("https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf", test_set=test_set, user_id="666", params=None, metadata=params) # "https://www.ibiblio.org/ebooks/London/Call%20of%20Wild.pdf"
#http://public-library.uk/ebooks/59/83.pdf
result = await start_test("http://public-library.uk/ebooks/59/83.pdf", test_set=test_set, user_id="676", params=None, metadata=metadata)
# #
# parser = argparse.ArgumentParser(description="Run tests against a document.") # parser = argparse.ArgumentParser(description="Run tests against a document.")
# parser.add_argument("--url", required=True, help="URL of the document to test.") # parser.add_argument("--url", required=True, help="URL of the document to test.")
@ -407,9 +437,10 @@ async def main():
# parser.add_argument("--user_id", required=True, help="User ID.") # parser.add_argument("--user_id", required=True, help="User ID.")
# parser.add_argument("--params", help="Additional parameters in JSON format.") # parser.add_argument("--params", help="Additional parameters in JSON format.")
# parser.add_argument("--metadata", required=True, help="Path to JSON file containing metadata.") # parser.add_argument("--metadata", required=True, help="Path to JSON file containing metadata.")
# # parser.add_argument("--generate_test_set", required=True, help="Make a test set.")
# parser.add_argument("--only_llm_context", required=True, help="Do a test only within the existing LLM context")
# args = parser.parse_args() # args = parser.parse_args()
#
# try: # try:
# with open(args.test_set, "r") as file: # with open(args.test_set, "r") as file:
# test_set = json.load(file) # test_set = json.load(file)
@ -441,10 +472,24 @@ async def main():
# #clean up params here # #clean up params here
# await start_test(args.url, test_set, args.user_id, params=None, metadata=metadata) # await start_test(args.url, test_set, args.user_id, params=None, metadata=metadata)
if __name__ == "__main__":
    # Script entry point: run the async main() to completion.
    import asyncio

    asyncio.run(main())
# delete_mems = await memory.dynamic_method_call(dynamic_memory_class, 'delete_memories',
# namespace=test_id)
# test_load_pipeline = await asyncio.gather(
# *(run_load_test_element(test_item,loader_settings, metadata, test_id) for test_item in test_set)
# )
#
# test_eval_pipeline = await asyncio.gather(
# *(run_search_eval_element(test_item, test_id) for test_item in test_set)
# )
# logging.info("Results of the eval pipeline %s", str(test_eval_pipeline))
# await add_entity(session, TestOutput(id=test_id, user_id=user_id, test_results=str(test_eval_pipeline)))
# return test_eval_pipeline
# # Gather and run all tests in parallel
# results = await asyncio.gather(
# *(run_testo(test, loader_settings, metadata) for test in test_params)
# )
# return results

View file

@ -202,6 +202,7 @@ class BaseMemory:
return field_mapping[field_type](**kwargs) return field_mapping[field_type](**kwargs)
# Dynamic Schema Creation # Dynamic Schema Creation
params['user_id'] = self.user_id
schema_instance = self.create_dynamic_schema(params) # Always creating Str field, adjust as needed schema_instance = self.create_dynamic_schema(params) # Always creating Str field, adjust as needed
@ -227,6 +228,10 @@ class BaseMemory:
namespace: Optional[str] = None, namespace: Optional[str] = None,
n_of_observations: Optional[int] = 2, n_of_observations: Optional[int] = 2,
): ):
logging.info(namespace)
logging.info(search_type)
logging.info(params)
logging.info(observation)
return await self.vector_db.fetch_memories( return await self.vector_db.fetch_memories(
observation=observation, search_type= search_type, params=params, observation=observation, search_type= search_type, params=params,

View file

@ -8,7 +8,7 @@ from vectordb.chunkers.chunkers import chunk_data
from llama_hub.file.base import SimpleDirectoryReader from llama_hub.file.base import SimpleDirectoryReader
import requests import requests
def _document_loader( observation: str, loader_settings: dict): async def _document_loader( observation: str, loader_settings: dict):
# Check the format of the document # Check the format of the document
document_format = loader_settings.get("format", "text") document_format = loader_settings.get("format", "text")
loader_strategy = loader_settings.get("strategy", "VANILLA") loader_strategy = loader_settings.get("strategy", "VANILLA")

View file

@ -121,17 +121,18 @@ class WeaviateVectorDB(VectorDB):
retriever = self.init_weaviate(embeddings=embeddings,namespace = namespace) retriever = self.init_weaviate(embeddings=embeddings,namespace = namespace)
if loader_settings: if loader_settings:
# Assuming _document_loader returns a list of documents # Assuming _document_loader returns a list of documents
documents = _document_loader(observation, loader_settings) documents = await _document_loader(observation, loader_settings)
logging.info("here are the docs %s", str(documents)) logging.info("here are the docs %s", str(documents))
for doc in documents: for doc in documents:
document_to_load = self._stuct(doc.page_content, params, metadata_schema_class) document_to_load = self._stuct(doc.page_content, params, metadata_schema_class)
print("here is the doc to load1", document_to_load)
logging.info("Loading document with provided loader settings %s", str(document_to_load))
retriever.add_documents([ retriever.add_documents([
Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])]) Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])])
else: else:
document_to_load = self._stuct(observation, params, metadata_schema_class) document_to_load = self._stuct(observation, params, metadata_schema_class)
print("here is the doc to load2", document_to_load) logging.info("Loading document with defautl loader settings %s", str(document_to_load))
retriever.add_documents([ retriever.add_documents([
Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])]) Document(metadata=document_to_load[0]['metadata'], page_content=document_to_load[0]['page_content'])])
@ -159,6 +160,8 @@ class WeaviateVectorDB(VectorDB):
if not namespace: if not namespace:
namespace = self.namespace namespace = self.namespace
logging.info("Query on namespace %s", namespace)
params_user_id = { params_user_id = {
"path": ["user_id"], "path": ["user_id"],
"operator": "Like", "operator": "Like",
@ -181,52 +184,52 @@ class WeaviateVectorDB(VectorDB):
n_of_observations = kwargs.get('n_of_observations', 2) n_of_observations = kwargs.get('n_of_observations', 2)
try: # try:
if search_type == 'text': if search_type == 'text':
query_output = ( query_output = (
base_query base_query
.with_near_text({"concepts": [observation]}) .with_near_text({"concepts": [observation]})
.with_autocut(n_of_observations) .with_autocut(n_of_observations)
.do() .do()
) )
elif search_type == 'hybrid': elif search_type == 'hybrid':
query_output = ( query_output = (
base_query base_query
.with_hybrid(query=observation, fusion_type=HybridFusion.RELATIVE_SCORE) .with_hybrid(query=observation, fusion_type=HybridFusion.RELATIVE_SCORE)
.with_autocut(n_of_observations) .with_autocut(n_of_observations)
.do() .do()
) )
elif search_type == 'bm25': elif search_type == 'bm25':
query_output = ( query_output = (
base_query base_query
.with_bm25(query=observation) .with_bm25(query=observation)
.with_autocut(n_of_observations) .with_autocut(n_of_observations)
.do() .do()
) )
elif search_type == 'generate': elif search_type == 'generate':
generate_prompt = kwargs.get('generate_prompt', "") generate_prompt = kwargs.get('generate_prompt', "")
query_output = ( query_output = (
base_query base_query
.with_generate(single_prompt=generate_prompt) .with_generate(single_prompt=observation)
.with_near_text({"concepts": [observation]}) .with_near_text({"concepts": [observation]})
.with_autocut(n_of_observations) .with_autocut(n_of_observations)
.do() .do()
) )
elif search_type == 'generate_grouped': elif search_type == 'generate_grouped':
generate_prompt = kwargs.get('generate_prompt', "") generate_prompt = kwargs.get('generate_prompt', "")
query_output = ( query_output = (
base_query base_query
.with_generate(grouped_task=generate_prompt) .with_generate(grouped_task=observation)
.with_near_text({"concepts": [observation]}) .with_near_text({"concepts": [observation]})
.with_autocut(n_of_observations) .with_autocut(n_of_observations)
.do() .do()
) )
else: else:
logging.error(f"Invalid search_type: {search_type}") logging.error(f"Invalid search_type: {search_type}")
return []
except Exception as e:
logging.error(f"Error executing query: {str(e)}")
return [] return []
# except Exception as e:
# logging.error(f"Error executing query: {str(e)}")
# return []
return query_output return query_output

View file

@ -342,11 +342,11 @@ async def main():
await memory.manage_memory_attributes(existing_user) await memory.manage_memory_attributes(existing_user)
# aeehuvyq_semanticememory_class # aeehuvyq_semanticememory_class
await memory.add_dynamic_memory_class('SemanticMemory', 'SEMANTICMEMORY') await memory.add_dynamic_memory_class('semanticmemory', 'SEMANTICMEMORY')
await memory.add_method_to_class(memory.semanticmemory_class, 'add_memories') await memory.add_method_to_class(memory.semanticmemory_class, 'add_memories')
await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories') await memory.add_method_to_class(memory.semanticmemory_class, 'fetch_memories')
sss = await memory.dynamic_method_call(memory.semanticmemory_class, 'add_memories', # sss = await memory.dynamic_method_call(memory.semanticmemory_class, 'add_memories',
observation='some_observation', params=params, loader_settings=loader_settings) # observation='some_observation', params=params, loader_settings=loader_settings)
susu = await memory.dynamic_method_call(memory.semanticmemory_class, 'fetch_memories', susu = await memory.dynamic_method_call(memory.semanticmemory_class, 'fetch_memories',
observation='some_observation') observation='some_observation')
@ -362,8 +362,8 @@ async def main():
# print(sss) # print(sss)
load_jack_london = await memory._add_semantic_memory(observation = "bla", loader_settings=loader_settings, params=params) # load_jack_london = await memory._add_semantic_memory(observation = "bla", loader_settings=loader_settings, params=params)
print(load_jack_london) # print(load_jack_london)
modulator = {"relevance": 0.1, "frequency": 0.1} modulator = {"relevance": 0.1, "frequency": 0.1}