From 5d5c395f470a4543db00983484addd0696ed769f Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Wed, 19 Mar 2025 16:02:56 +0100 Subject: [PATCH] feat: adds batch example for cognee modal --- cognee_parallel_deployment/cognee_modal.py | 69 ++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 cognee_parallel_deployment/cognee_modal.py diff --git a/cognee_parallel_deployment/cognee_modal.py b/cognee_parallel_deployment/cognee_modal.py new file mode 100644 index 000000000..e9c0adb1d --- /dev/null +++ b/cognee_parallel_deployment/cognee_modal.py @@ -0,0 +1,69 @@ +import modal +import os +import logging +import asyncio +import cognee +import signal +import json +from dotenv import dotenv_values + +from cognee.shared.utils import setup_logging +from cognee.modules.search.types import SearchType + +logger = logging.getLogger("MODAL_DEPLOYED_INSTANCE") + +app = modal.App("cognee-runner") + +local_env_vars = dict(dotenv_values(".env")) +print("Modal deployment started with the following environmental variables:") +print(json.dumps(local_env_vars, indent=4)) + +image = ( + modal.Image.from_dockerfile(path="Dockerfile_modal", force_build=False) + .add_local_file("pyproject.toml", remote_path="/root/pyproject.toml", copy=True) + .add_local_file("poetry.lock", remote_path="/root/poetry.lock", copy=True) + .env(local_env_vars) + .poetry_install_from_file(poetry_pyproject_toml="pyproject.toml") + .pip_install("protobuf", "h2") + .add_local_python_source("cognee") +) + + +@app.function(image=image, max_containers=5) +async def entry(name: str, text: str): + setup_logging(logging.INFO) + logger.info(f"file_name: {name}") + await cognee.add(text) + + +def batch_files(file_list, batch_size): + for i in range(0, len(file_list), batch_size): + yield file_list[i : i + batch_size] + + +@app.local_entrypoint() +async def main(): + directory_name = "cognee_parallel_deployment/modal_input/" + batch_size = 10 + + files = [ + os.path.join(directory_name, f) + for f in os.listdir(directory_name) + if os.path.isfile(os.path.join(directory_name, f)) + ] + + for batch in batch_files(files, batch_size): + print("Processing batch:") + files = [] + for file_path in batch: + with open(file_path, "r") as file: + content = file.read() + # Process the content as needed. + print(f"Read data from {file_path}") + + files.append({"name": file_path, "text": content}) + print("Batch reading finished...") + tasks = [entry.remote.aio(item["name"], item["text"]) for item in files] + await asyncio.gather(*tasks) + + os.kill(os.getpid(), signal.SIGTERM)