feat: distributed cognee
This commit is contained in:
parent
90ffd513f9
commit
9197cf9d61
18 changed files with 437 additions and 305 deletions
|
|
@ -1,6 +1,5 @@
|
|||
import os
|
||||
from os import path
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from uuid import UUID
|
||||
from typing import Optional
|
||||
from typing import AsyncGenerator, List
|
||||
|
|
@ -10,6 +9,7 @@ from sqlalchemy.orm import joinedload
|
|||
from sqlalchemy.exc import NoResultFound
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
|
||||
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.infrastructure.databases.exceptions import EntityNotFoundError
|
||||
from cognee.modules.data.models.Data import Data
|
||||
|
||||
|
|
|
|||
|
|
@ -18,15 +18,16 @@ RUN apt-get update && apt-get install -y \
|
|||
|
||||
WORKDIR /app
|
||||
|
||||
|
||||
ENV PYTHONPATH=/app
|
||||
WORKDIR /app
|
||||
COPY pyproject.toml poetry.lock /app/
|
||||
|
||||
COPY pyproject.toml poetry.lock README.md /app/
|
||||
|
||||
RUN pip install poetry
|
||||
|
||||
RUN poetry install --all-extras --no-root --without dev
|
||||
RUN poetry config virtualenvs.create false
|
||||
|
||||
RUN poetry install --extras neo4j --extras qdrant --no-root --without dev
|
||||
|
||||
COPY cognee/ /app/cognee
|
||||
COPY README.md /app/README.md
|
||||
COPY distributed/ /app/distributed
|
||||
RUN chmod +x /app/distributed/entrypoint.sh
|
||||
|
||||
ENTRYPOINT ["/app/distributed/entrypoint.sh"]
|
||||
0
distributed/__init__.py
Normal file
0
distributed/__init__.py
Normal file
4
distributed/app.py
Normal file
4
distributed/app.py
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
from modal import App
|
||||
|
||||
|
||||
app = App("cognee_modal_distributed")
|
||||
113
distributed/entrypoint.py
Normal file
113
distributed/entrypoint.py
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
import pathlib
|
||||
from os import path
|
||||
|
||||
from cognee.api.v1.add import add
|
||||
from cognee.api.v1.prune import prune
|
||||
from cognee.infrastructure.llm.utils import get_max_chunk_tokens
|
||||
from cognee.modules.chunking.TextChunker import TextChunker
|
||||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
from cognee.modules.data.processing.document_types import Document
|
||||
from cognee.modules.pipelines.operations.run_tasks import run_tasks
|
||||
from cognee.modules.pipelines.tasks.task import Task
|
||||
from cognee.modules.users.methods.get_default_user import get_default_user
|
||||
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
|
||||
from cognee.modules.data.methods.get_datasets_by_name import get_datasets_by_name
|
||||
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.tasks.documents.classify_documents import classify_documents
|
||||
from cognee.tasks.documents.extract_chunks_from_documents import extract_chunks_from_documents
|
||||
|
||||
from distributed.app import app
|
||||
from distributed.queues import finished_jobs_queue, save_data_points_queue
|
||||
from distributed.workers.data_point_saver_worker import data_point_saver_worker
|
||||
from distributed.workers.graph_extraction_worker import graph_extraction_worker
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
@app.local_entrypoint()
|
||||
async def main():
|
||||
# Clear queues
|
||||
finished_jobs_queue.clear()
|
||||
save_data_points_queue.clear()
|
||||
|
||||
dataset_name = "main"
|
||||
data_directory_name = ".data"
|
||||
data_directory_path = path.join(pathlib.Path(__file__).parent, data_directory_name)
|
||||
|
||||
number_of_data_saving_workers = 1 # Total number of graph_extraction_worker functions to spawn
|
||||
document_batch_size = 50 # Batch size for producers
|
||||
|
||||
results = []
|
||||
consumer_futures = []
|
||||
|
||||
# Delete DBs and saved files from metastore
|
||||
await prune.prune_data()
|
||||
await prune.prune_system(metadata=True)
|
||||
|
||||
# Add files to the metastore
|
||||
await add(data=data_directory_path, dataset_name=dataset_name)
|
||||
|
||||
user = await get_default_user()
|
||||
datasets = await get_datasets_by_name(dataset_name, user.id)
|
||||
documents = await get_dataset_data(dataset_id=datasets[0].id)
|
||||
|
||||
print(f"We have {len(documents)} documents in the dataset.")
|
||||
|
||||
# Start data_point_saver_worker functions
|
||||
for _ in range(number_of_data_saving_workers):
|
||||
worker_future = data_point_saver_worker.spawn(total_number_of_workers=len(documents))
|
||||
consumer_futures.append(worker_future)
|
||||
|
||||
def process_chunks_remotely(document_chunks: list[DocumentChunk], document: Document):
|
||||
return graph_extraction_worker.spawn(
|
||||
user=user, document_name=document.name, document_chunks=document_chunks
|
||||
)
|
||||
|
||||
# Produce chunks and spawn a graph_extraction_worker job for each batch of chunks
|
||||
for i in range(0, len(documents), document_batch_size):
|
||||
batch = documents[i : i + document_batch_size]
|
||||
|
||||
producer_futures = []
|
||||
|
||||
for item in batch:
|
||||
async for run_info in run_tasks(
|
||||
[
|
||||
Task(classify_documents),
|
||||
Task(
|
||||
extract_chunks_from_documents,
|
||||
max_chunk_size=get_max_chunk_tokens(),
|
||||
chunker=TextChunker,
|
||||
),
|
||||
Task(
|
||||
process_chunks_remotely,
|
||||
document=item,
|
||||
task_config={"batch_size": 50},
|
||||
),
|
||||
],
|
||||
data=[item],
|
||||
user=user,
|
||||
pipeline_name="chunk_processing",
|
||||
):
|
||||
producer_futures.append(run_info)
|
||||
|
||||
batch_results = []
|
||||
for producer_future in producer_futures:
|
||||
try:
|
||||
result = producer_future.get()
|
||||
except Exception as e:
|
||||
result = e
|
||||
batch_results.append(result)
|
||||
|
||||
results.extend(batch_results)
|
||||
finished_jobs_queue.put(len(results))
|
||||
|
||||
for consumer_future in consumer_futures:
|
||||
try:
|
||||
print("Finished but waiting")
|
||||
consumer_final = consumer_future.get()
|
||||
print(f"We got all futures {consumer_final}")
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
||||
print(results)
|
||||
5
distributed/entrypoint.sh
Normal file
5
distributed/entrypoint.sh
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
echo "$@"
|
||||
|
||||
exec "$@" # Runs the command passed to the entrypoint script.
|
||||
|
|
@ -1,282 +0,0 @@
|
|||
import os
|
||||
import json
|
||||
import pathlib
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
from dotenv import dotenv_values
|
||||
from cognee.modules.chunking.models import DocumentChunk
|
||||
from modal import App, Queue, Image
|
||||
|
||||
import cognee
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.shared.data_models import KnowledgeGraph
|
||||
from cognee.modules.pipelines.operations import run_tasks
|
||||
from cognee.modules.users.methods import get_default_user
|
||||
from cognee.infrastructure.llm import get_max_chunk_tokens
|
||||
from cognee.modules.chunking.TextChunker import TextChunker
|
||||
from cognee.modules.data.methods import get_datasets_by_name
|
||||
from cognee.modules.cognify.config import get_cognify_config
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
|
||||
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
|
||||
from cognee.modules.graph.utils import deduplicate_nodes_and_edges, get_graph_from_model
|
||||
from cognee.modules.pipelines.tasks import Task
|
||||
|
||||
|
||||
# Global tasks
|
||||
from cognee.tasks.documents import (
|
||||
classify_documents,
|
||||
extract_chunks_from_documents,
|
||||
)
|
||||
from cognee.tasks.storage.index_data_points import index_data_points
|
||||
|
||||
# Local tasks
|
||||
from .tasks.extract_graph_from_data import extract_graph_from_data
|
||||
from .tasks.summarize_text import summarize_text
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# App and Queue Initialization
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
# Initialize the Modal application
|
||||
app = App("cognee_modal_distributed")
|
||||
logger = get_logger("cognee_modal_distributed")
|
||||
|
||||
local_env_vars = dict(dotenv_values(".env"))
|
||||
logger.info("Modal deployment started with the following environmental variables:")
|
||||
logger.info(json.dumps(local_env_vars, indent=4))
|
||||
|
||||
image = (
|
||||
Image.from_dockerfile(path="Dockerfile_modal", force_build=False)
|
||||
.add_local_file("pyproject.toml", remote_path="/root/pyproject.toml", copy=True)
|
||||
.add_local_file("poetry.lock", remote_path="/root/poetry.lock", copy=True)
|
||||
.env(local_env_vars)
|
||||
.poetry_install_from_file(poetry_pyproject_toml="pyproject.toml")
|
||||
# .pip_install("protobuf", "h2", "neo4j", "asyncpg", "pgvector")
|
||||
.add_local_python_source("../cognee")
|
||||
)
|
||||
|
||||
|
||||
# Create (or get) two queues:
|
||||
# - graph_nodes_and_edges: Stores messages produced by the producer functions.
|
||||
# - finished_producers: Keeps track of the number of finished producer jobs.
|
||||
graph_nodes_and_edges = Queue.from_name("graph_nodes_and_edges", create_if_missing=True)
|
||||
|
||||
finished_producers = Queue.from_name("finished_producers", create_if_missing=True)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Cognee pipeline steps
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
||||
def add_data_to_save_queue(document_chunks: list[DocumentChunk]):
|
||||
future = producer.spawn(file_name=document_name, chunk_list=event.result)
|
||||
futures.append(future)
|
||||
|
||||
# Preprocessing steps. This gets called in the entrypoint
|
||||
async def get_preprocessing_steps(chunker=TextChunker) -> list[Task]:
|
||||
preprocessing_tasks = [
|
||||
Task(classify_documents),
|
||||
Task( # Extract text chunks based on the document type.
|
||||
extract_chunks_from_documents,
|
||||
max_chunk_size=None or get_max_chunk_tokens(),
|
||||
chunker=chunker,
|
||||
),
|
||||
Task(
|
||||
add_data_to_save_queue,
|
||||
task_config={"batch_size": 50},
|
||||
),
|
||||
]
|
||||
|
||||
return preprocessing_tasks
|
||||
|
||||
|
||||
# This is the last step of the pipeline that gets executed on modal executors (functions)
|
||||
async def save_data_points(data_points: list = None, data_point_connections: list = None):
|
||||
data_point_connections = data_point_connections or []
|
||||
|
||||
nodes = []
|
||||
edges = []
|
||||
|
||||
added_nodes = {}
|
||||
added_edges = {}
|
||||
visited_properties = {}
|
||||
|
||||
results = await asyncio.gather(
|
||||
*[
|
||||
get_graph_from_model(
|
||||
data_point,
|
||||
added_nodes=added_nodes,
|
||||
added_edges=added_edges,
|
||||
visited_properties=visited_properties,
|
||||
)
|
||||
for data_point in data_points
|
||||
]
|
||||
)
|
||||
|
||||
for result_nodes, result_edges in results:
|
||||
nodes.extend(result_nodes)
|
||||
edges.extend(result_edges)
|
||||
|
||||
nodes, edges = deduplicate_nodes_and_edges(nodes, edges)
|
||||
|
||||
await index_data_points(nodes)
|
||||
|
||||
graph_nodes_and_edges.put((nodes, edges + data_point_connections))
|
||||
|
||||
|
||||
# This is the pipeline for the modal executors
|
||||
async def get_graph_tasks(
|
||||
graph_model: BaseModel = KnowledgeGraph,
|
||||
ontology_file_path: Optional[str] = None,
|
||||
) -> list[Task]:
|
||||
cognee_config = get_cognify_config()
|
||||
|
||||
ontology_adapter = OntologyResolver(ontology_file=ontology_file_path)
|
||||
|
||||
step_two_tasks = [
|
||||
Task(
|
||||
extract_graph_from_data,
|
||||
graph_model=graph_model,
|
||||
ontology_adapter=ontology_adapter,
|
||||
),
|
||||
Task(
|
||||
summarize_text,
|
||||
summarization_model=cognee_config.summarization_model,
|
||||
),
|
||||
Task(save_data_points),
|
||||
]
|
||||
|
||||
return step_two_tasks
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Producer Function
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
||||
@app.function(image=image, timeout=86400, max_containers=100)
|
||||
async def producer(file_name: str, chunk_list: list):
|
||||
modal_tasks = await get_graph_tasks()
|
||||
async for _ in run_tasks(
|
||||
modal_tasks, data=chunk_list, pipeline_name=f"modal_execution_file_{file_name}"
|
||||
):
|
||||
pass
|
||||
|
||||
print(f"File execution finished: {file_name}")
|
||||
|
||||
return file_name
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Consumer Function
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
||||
@app.function(image=image, timeout=86400, max_containers=100)
|
||||
async def consumer(number_of_files: int):
|
||||
graph_engine = await get_graph_engine()
|
||||
|
||||
while True:
|
||||
if graph_nodes_and_edges.len() != 0:
|
||||
nodes_and_edges = graph_nodes_and_edges.get(block=False)
|
||||
if nodes_and_edges is not None:
|
||||
if nodes_and_edges[0] is not None:
|
||||
await graph_engine.add_nodes(nodes_and_edges[0])
|
||||
if nodes_and_edges[1] is not None:
|
||||
await graph_engine.add_edges(nodes_and_edges[1])
|
||||
else:
|
||||
print(f"Nodes and edges are: {nodes_and_edges}")
|
||||
else:
|
||||
await asyncio.sleep(5)
|
||||
|
||||
number_of_finished_jobs = finished_producers.get(block=False)
|
||||
|
||||
if number_of_finished_jobs == number_of_files:
|
||||
# We put it back for the other consumers to see that we finished
|
||||
finished_producers.put(number_of_finished_jobs)
|
||||
|
||||
print("Finished processing all nodes and edges; stopping graph engine queue.")
|
||||
return True
|
||||
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Entrypoint
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
||||
@app.local_entrypoint()
|
||||
async def main():
|
||||
# Clear queues
|
||||
graph_nodes_and_edges.clear()
|
||||
finished_producers.clear()
|
||||
|
||||
dataset_name = "main"
|
||||
data_directory_name = ".data"
|
||||
data_directory_path = os.path.join(pathlib.Path(__file__).parent, data_directory_name)
|
||||
|
||||
number_of_consumers = 1 # Total number of consumer functions to spawn
|
||||
batch_size = 50 # Batch size for producers
|
||||
|
||||
results = []
|
||||
consumer_futures = []
|
||||
|
||||
# Delete DBs and saved files from metastore
|
||||
await cognee.prune.prune_data()
|
||||
await cognee.prune.prune_system(metadata=True)
|
||||
|
||||
# Add files to the metastore
|
||||
await cognee.add(data=data_directory_path, dataset_name=dataset_name)
|
||||
|
||||
user = await get_default_user()
|
||||
datasets = await get_datasets_by_name(dataset_name, user.id)
|
||||
documents = await get_dataset_data(dataset_id=datasets[0].id)
|
||||
|
||||
print(f"We have {len(documents)} documents in the dataset.")
|
||||
|
||||
preprocessing_tasks = await get_preprocessing_steps(user)
|
||||
|
||||
# Start consumer functions
|
||||
for _ in range(number_of_consumers):
|
||||
consumer_future = consumer.spawn(number_of_files=len(documents))
|
||||
consumer_futures.append(consumer_future)
|
||||
|
||||
# Process producer jobs in batches
|
||||
for i in range(0, len(documents), batch_size):
|
||||
batch = documents[i : i + batch_size]
|
||||
futures = []
|
||||
for item in batch:
|
||||
document_name = item.name
|
||||
async for event in run_tasks(
|
||||
preprocessing_tasks, data=[item], pipeline_name="preprocessing_steps"
|
||||
):
|
||||
if (
|
||||
isinstance(event, TaskExecutionCompleted)
|
||||
and event.task is extract_chunks_from_documents
|
||||
):
|
||||
future = producer.spawn(file_name=document_name, chunk_list=event.result)
|
||||
futures.append(future)
|
||||
|
||||
batch_results = []
|
||||
for future in futures:
|
||||
try:
|
||||
result = future.get()
|
||||
except Exception as e:
|
||||
result = e
|
||||
batch_results.append(result)
|
||||
|
||||
results.extend(batch_results)
|
||||
finished_producers.put(len(results))
|
||||
|
||||
for consumer_future in consumer_futures:
|
||||
try:
|
||||
print("Finished but waiting")
|
||||
consumer_final = consumer_future.get()
|
||||
print(f"We got all futures{consumer_final}")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
pass
|
||||
|
||||
print(results)
|
||||
24
distributed/modal_image.py
Normal file
24
distributed/modal_image.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
import json
|
||||
import pathlib
|
||||
from os import path
|
||||
from modal import Image
|
||||
from logging import getLogger
|
||||
from dotenv import dotenv_values
|
||||
|
||||
logger = getLogger("modal_image_creation")
|
||||
|
||||
local_env_vars = dict(dotenv_values(".env"))
|
||||
|
||||
logger.debug("Modal deployment started with the following environmental variables:")
|
||||
logger.debug(json.dumps(local_env_vars, indent=4))
|
||||
|
||||
image = (
|
||||
Image.from_dockerfile(
|
||||
path=pathlib.Path(path.join(path.dirname(__file__), "Dockerfile")).resolve(),
|
||||
force_build=False,
|
||||
).env(local_env_vars)
|
||||
# .pip_install_from_pyproject(pyproject_toml=pathlib.Path(path.join(path.dirname(__file__), "../pyproject.toml")).resolve())
|
||||
# .poetry_install_from_file(poetry_pyproject_toml=pathlib.Path(path.join(path.dirname(__file__), "../pyproject.toml")).resolve())
|
||||
# .add_local_dir(pathlib.Path("./venv/bin").resolve(), remote_path="/app/.venv")
|
||||
# .add_local_python_source(pathlib.Path("./cognee").resolve())
|
||||
)
|
||||
10
distributed/queues.py
Normal file
10
distributed/queues.py
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
from modal import Queue
|
||||
|
||||
|
||||
# Create (or get) two queues:
|
||||
# - save_data_points_queue: Stores messages produced by the producer functions.
|
||||
# - finished_jobs_queue: Keeps track of the number of finished producer jobs.
|
||||
|
||||
save_data_points_queue = Queue.from_name("save_data_points_queue", create_if_missing=True)
|
||||
|
||||
finished_jobs_queue = Queue.from_name("finished_jobs_queue", create_if_missing=True)
|
||||
0
distributed/tasks/__init__.py
Normal file
0
distributed/tasks/__init__.py
Normal file
|
|
@ -8,7 +8,6 @@ from cognee.modules.graph.utils import (
|
|||
retrieve_existing_edges,
|
||||
)
|
||||
from cognee.shared.data_models import KnowledgeGraph
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
|
||||
from cognee.modules.data.extraction.knowledge_graph import extract_content_graph
|
||||
|
|
@ -23,7 +22,6 @@ async def extract_graph_from_data(
|
|||
chunk_graphs = await asyncio.gather(
|
||||
*[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
|
||||
)
|
||||
graph_engine = await get_graph_engine()
|
||||
|
||||
if graph_model is not KnowledgeGraph:
|
||||
for chunk_index, chunk_graph in enumerate(chunk_graphs):
|
||||
|
|
@ -34,7 +32,6 @@ async def extract_graph_from_data(
|
|||
existing_edges_map = await retrieve_existing_edges(
|
||||
data_chunks,
|
||||
chunk_graphs,
|
||||
graph_engine,
|
||||
)
|
||||
|
||||
graph_nodes, graph_edges = expand_with_nodes_and_edges(
|
||||
|
|
|
|||
40
distributed/tasks/save_data_points.py
Normal file
40
distributed/tasks/save_data_points.py
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
import asyncio
|
||||
|
||||
from cognee.modules.graph.utils import deduplicate_nodes_and_edges, get_graph_from_model
|
||||
from distributed.queues import save_data_points_queue
|
||||
|
||||
|
||||
async def save_data_points(
|
||||
data_points_and_relationships: tuple[list, list]
|
||||
):
|
||||
data_points = data_points_and_relationships[0]
|
||||
data_point_connections = data_points_and_relationships[1]
|
||||
|
||||
nodes = []
|
||||
edges = []
|
||||
|
||||
added_nodes = {}
|
||||
added_edges = {}
|
||||
visited_properties = {}
|
||||
|
||||
results = await asyncio.gather(
|
||||
*[
|
||||
get_graph_from_model(
|
||||
data_point,
|
||||
added_nodes=added_nodes,
|
||||
added_edges=added_edges,
|
||||
visited_properties=visited_properties,
|
||||
)
|
||||
for data_point in data_points
|
||||
]
|
||||
)
|
||||
|
||||
for result_nodes, result_edges in results:
|
||||
nodes.extend(result_nodes)
|
||||
edges.extend(result_edges)
|
||||
|
||||
nodes, edges = deduplicate_nodes_and_edges(nodes, edges + data_point_connections)
|
||||
|
||||
# await index_data_points(nodes)
|
||||
|
||||
save_data_points_queue.put((nodes, edges))
|
||||
|
|
@ -7,7 +7,12 @@ from cognee.modules.data.extraction.extract_summary import extract_summary
|
|||
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
|
||||
|
||||
|
||||
async def summarize_text(data_chunks: list[DocumentChunk], edges: list, summarization_model: Type[BaseModel]):
|
||||
async def summarize_text(
|
||||
data_points_and_relationships: tuple[list[DocumentChunk], list], summarization_model: Type[BaseModel]
|
||||
):
|
||||
data_chunks = data_points_and_relationships[0]
|
||||
edges = data_points_and_relationships[1]
|
||||
|
||||
if len(data_chunks) == 0:
|
||||
return data_chunks
|
||||
|
||||
|
|
|
|||
28
distributed/test.py
Normal file
28
distributed/test.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
from modal import App
|
||||
|
||||
app = App("cognee_distributed_test")
|
||||
|
||||
|
||||
@app.function()
|
||||
def sum_distributed(numbers: list):
|
||||
result = sum(numbers)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@app.local_entrypoint()
|
||||
def main():
|
||||
sum = 0
|
||||
numbers = range(100)
|
||||
batch_size = 10
|
||||
|
||||
local_sum = sum_distributed.local(numbers=numbers)
|
||||
|
||||
print(f"Local sum: {local_sum}")
|
||||
|
||||
batches = [list(numbers[i : i + batch_size]) for i in range(0, len(numbers), batch_size)]
|
||||
|
||||
for result in sum_distributed.map(batches):
|
||||
sum += result
|
||||
|
||||
print(f"Distributed sum: {sum}")
|
||||
32
distributed/workers/data_point_saver_worker.py
Normal file
32
distributed/workers/data_point_saver_worker.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
import asyncio
|
||||
|
||||
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
|
||||
|
||||
from distributed.app import app
|
||||
from distributed.modal_image import image
|
||||
from distributed.queues import finished_jobs_queue, save_data_points_queue
|
||||
|
||||
|
||||
@app.function(image=image, timeout=86400, max_containers=100)
|
||||
async def data_point_saver_worker(total_number_of_workers: int):
|
||||
graph_engine = await get_graph_engine()
|
||||
|
||||
while True:
|
||||
if save_data_points_queue.len() != 0:
|
||||
nodes_and_edges = save_data_points_queue.get(block=False)
|
||||
if nodes_and_edges and len(nodes_and_edges) == 2:
|
||||
await graph_engine.add_nodes(nodes_and_edges[0])
|
||||
await graph_engine.add_edges(nodes_and_edges[1])
|
||||
else:
|
||||
print(f"Nodes and edges are: {nodes_and_edges}")
|
||||
else:
|
||||
await asyncio.sleep(5)
|
||||
|
||||
number_of_finished_jobs = finished_jobs_queue.get(block=False)
|
||||
|
||||
if number_of_finished_jobs == total_number_of_workers:
|
||||
# We put it back for the other consumers to see that we finished
|
||||
finished_jobs_queue.put(number_of_finished_jobs)
|
||||
|
||||
print("Finished processing all nodes and edges; stopping graph engine queue.")
|
||||
return True
|
||||
42
distributed/workers/graph_extraction_worker.py
Normal file
42
distributed/workers/graph_extraction_worker.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
# ------------------------------------------------------------------------------
|
||||
# Producer function that produces data points from documents and pushes them into the queue.
|
||||
# ------------------------------------------------------------------------------
|
||||
from cognee.modules.cognify.config import get_cognify_config
|
||||
from cognee.modules.pipelines.operations.run_tasks import run_tasks
|
||||
from cognee.modules.pipelines.tasks.task import Task
|
||||
from cognee.shared.data_models import KnowledgeGraph
|
||||
|
||||
from distributed.app import app
|
||||
from distributed.modal_image import image
|
||||
from distributed.tasks.summarize_text import summarize_text
|
||||
from distributed.tasks.extract_graph_from_data import extract_graph_from_data
|
||||
from distributed.tasks.save_data_points import save_data_points
|
||||
|
||||
|
||||
@app.function(image=image, timeout=86400, max_containers=100)
|
||||
async def graph_extraction_worker(user, document_name: str, document_chunks: list):
|
||||
cognee_config = get_cognify_config()
|
||||
|
||||
tasks = [
|
||||
Task(
|
||||
extract_graph_from_data,
|
||||
graph_model=KnowledgeGraph,
|
||||
),
|
||||
Task(
|
||||
summarize_text,
|
||||
summarization_model=cognee_config.summarization_model,
|
||||
),
|
||||
Task(save_data_points),
|
||||
]
|
||||
|
||||
async for _ in run_tasks(
|
||||
tasks,
|
||||
data=document_chunks,
|
||||
pipeline_name=f"modal_execution_file_{document_name}",
|
||||
user=user,
|
||||
):
|
||||
pass
|
||||
|
||||
print(f"File execution finished: {document_name}")
|
||||
|
||||
return document_name
|
||||
134
poetry.lock
generated
134
poetry.lock
generated
|
|
@ -3075,6 +3075,24 @@ grpcio = ">=1.67.1"
|
|||
protobuf = ">=5.26.1,<6.0dev"
|
||||
setuptools = "*"
|
||||
|
||||
[[package]]
|
||||
name = "grpclib"
|
||||
version = "0.4.7"
|
||||
description = "Pure-Python gRPC implementation for asyncio"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "grpclib-0.4.7.tar.gz", hash = "sha256:2988ef57c02b22b7a2e8e961792c41ccf97efc2ace91ae7a5b0de03c363823c3"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
h2 = ">=3.1.0,<5"
|
||||
multidict = "*"
|
||||
|
||||
[package.extras]
|
||||
protobuf = ["protobuf (>=3.20.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "gunicorn"
|
||||
version = "20.1.0"
|
||||
|
|
@ -3113,10 +3131,9 @@ files = [
|
|||
name = "h2"
|
||||
version = "4.2.0"
|
||||
description = "Pure-Python HTTP/2 protocol implementation"
|
||||
optional = true
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"qdrant\""
|
||||
files = [
|
||||
{file = "h2-4.2.0-py3-none-any.whl", hash = "sha256:479a53ad425bb29af087f3458a61d30780bc818e4ebcf01f0b536ba916462ed0"},
|
||||
{file = "h2-4.2.0.tar.gz", hash = "sha256:c8a52129695e88b1a0578d8d2cc6842bbd79128ac685463b887ee278126ad01f"},
|
||||
|
|
@ -3147,10 +3164,9 @@ test = ["eth_utils (>=2.0.0)", "hypothesis (>=3.44.24,<=6.31.6)", "pytest (>=7.0
|
|||
name = "hpack"
|
||||
version = "4.1.0"
|
||||
description = "Pure-Python HPACK header encoding"
|
||||
optional = true
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"qdrant\""
|
||||
files = [
|
||||
{file = "hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496"},
|
||||
{file = "hpack-4.1.0.tar.gz", hash = "sha256:ec5eca154f7056aa06f196a557655c5b009b382873ac8d1e66e79e87535f1dca"},
|
||||
|
|
@ -3395,10 +3411,9 @@ tests = ["freezegun", "pytest", "pytest-cov"]
|
|||
name = "hyperframe"
|
||||
version = "6.1.0"
|
||||
description = "Pure-Python HTTP/2 framing"
|
||||
optional = true
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"qdrant\""
|
||||
files = [
|
||||
{file = "hyperframe-6.1.0-py3-none-any.whl", hash = "sha256:b03380493a519fce58ea5af42e4a42317bf9bd425596f7a0835ffce80f1a42e5"},
|
||||
{file = "hyperframe-6.1.0.tar.gz", hash = "sha256:f630908a00854a7adeabd6382b43923a4c4cd4b821fcb527e6ab9e15382a3b08"},
|
||||
|
|
@ -5811,6 +5826,34 @@ plot = ["matplotlib (==3.10.0)", "pandas (==2.2.3)"]
|
|||
test = ["pytest (==8.3.4)", "pytest-sugar (==1.0.0)"]
|
||||
type = ["mypy (==1.14.1)"]
|
||||
|
||||
[[package]]
|
||||
name = "modal"
|
||||
version = "0.74.15"
|
||||
description = "Python client library for Modal"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "modal-0.74.15-py3-none-any.whl", hash = "sha256:084e898ab202ccd698fd277d9dc9e9cec8d4b0954a1c09d4ba529f0446ab3526"},
|
||||
{file = "modal-0.74.15.tar.gz", hash = "sha256:95512811ebd42a52fa03724f60d0d1c32259788351e798d0d695974d94b2e49c"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
aiohttp = "*"
|
||||
certifi = "*"
|
||||
click = ">=8.1.0"
|
||||
fastapi = "*"
|
||||
grpclib = "0.4.7"
|
||||
protobuf = ">=3.19,<4.24.0 || >4.24.0,<6.0"
|
||||
rich = ">=12.0.0"
|
||||
synchronicity = ">=0.9.10,<0.10.0"
|
||||
toml = "*"
|
||||
typer = ">=0.9"
|
||||
types-certifi = "*"
|
||||
types-toml = "*"
|
||||
typing_extensions = ">=4.6,<5.0"
|
||||
watchfiles = "*"
|
||||
|
||||
[[package]]
|
||||
name = "monotonic"
|
||||
version = "1.6"
|
||||
|
|
@ -7524,10 +7567,9 @@ testing = ["google-api-core (>=1.31.5)"]
|
|||
name = "protobuf"
|
||||
version = "5.29.4"
|
||||
description = ""
|
||||
optional = true
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
markers = "python_version == \"3.10\" and extra == \"codegraph\" or (extra == \"chromadb\" or extra == \"qdrant\" or extra == \"weaviate\" or extra == \"deepeval\" or extra == \"gemini\" or extra == \"milvus\") and python_version < \"3.11\" or (python_version == \"3.12\" or extra == \"gemini\" or extra == \"qdrant\" or extra == \"weaviate\" or extra == \"chromadb\" or extra == \"deepeval\" or extra == \"milvus\") and (extra == \"codegraph\" or extra == \"gemini\" or extra == \"chromadb\" or extra == \"qdrant\" or extra == \"weaviate\" or extra == \"deepeval\" or extra == \"milvus\") and python_version >= \"3.12\" or python_version == \"3.11\" and (extra == \"codegraph\" or extra == \"gemini\" or extra == \"chromadb\" or extra == \"qdrant\" or extra == \"weaviate\" or extra == \"deepeval\" or extra == \"milvus\")"
|
||||
files = [
|
||||
{file = "protobuf-5.29.4-cp310-abi3-win32.whl", hash = "sha256:13eb236f8eb9ec34e63fc8b1d6efd2777d062fa6aaa68268fb67cf77f6839ad7"},
|
||||
{file = "protobuf-5.29.4-cp310-abi3-win_amd64.whl", hash = "sha256:bcefcdf3976233f8a502d265eb65ea740c989bacc6c30a58290ed0e519eb4b8d"},
|
||||
|
|
@ -9832,6 +9874,25 @@ files = [
|
|||
{file = "shiboken6-6.9.0-cp39-abi3-win_arm64.whl", hash = "sha256:24f53857458881b54798d7e35704611d07f6b6885bcdf80f13a4c8bb485b8df2"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sigtools"
|
||||
version = "4.0.1"
|
||||
description = "Utilities for working with inspect.Signature objects."
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "sigtools-4.0.1-py2.py3-none-any.whl", hash = "sha256:d216b4cf920bbab0fce636ddc429ed8463a5b533d9e1492acb45a2a1bc36ac6c"},
|
||||
{file = "sigtools-4.0.1.tar.gz", hash = "sha256:4b8e135a9cd4d2ea00da670c093372d74e672ba3abb87f4c98d8e73dea54445c"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
attrs = "*"
|
||||
|
||||
[package.extras]
|
||||
test = ["coverage", "mock", "repeated-test (>=2.2.1)", "sphinx"]
|
||||
tests = ["coverage", "mock", "repeated-test (>=2.2.1)", "sphinx"]
|
||||
|
||||
[[package]]
|
||||
name = "simplejson"
|
||||
version = "3.20.1"
|
||||
|
|
@ -10215,6 +10276,22 @@ mpmath = ">=1.1.0,<1.4"
|
|||
[package.extras]
|
||||
dev = ["hypothesis (>=6.70.0)", "pytest (>=7.1.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "synchronicity"
|
||||
version = "0.9.11"
|
||||
description = "Export blocking and async library versions from a single async implementation"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "synchronicity-0.9.11-py3-none-any.whl", hash = "sha256:231129654d2f56b1aa148e85ebd8545231be135771f6d2196d414175b1594ef6"},
|
||||
{file = "synchronicity-0.9.11.tar.gz", hash = "sha256:cb5dbbcb43d637e516ae50db05a776da51a705d1e1a9c0e301f6049afc3c2cae"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
sigtools = ">=4.0.1"
|
||||
typing-extensions = ">=4.12.2"
|
||||
|
||||
[[package]]
|
||||
name = "tabulate"
|
||||
version = "0.9.0"
|
||||
|
|
@ -10381,6 +10458,18 @@ dev = ["tokenizers[testing]"]
|
|||
docs = ["setuptools-rust", "sphinx", "sphinx-rtd-theme"]
|
||||
testing = ["black (==22.3)", "datasets", "numpy", "pytest", "requests", "ruff"]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.10.2"
|
||||
description = "Python Library for Tom's Obvious, Minimal Language"
|
||||
optional = false
|
||||
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
|
||||
{file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tomli"
|
||||
version = "2.2.1"
|
||||
|
|
@ -10701,6 +10790,18 @@ rich = ">=10.11.0"
|
|||
shellingham = ">=1.3.0"
|
||||
typing-extensions = ">=3.7.4.3"
|
||||
|
||||
[[package]]
|
||||
name = "types-certifi"
|
||||
version = "2021.10.8.3"
|
||||
description = "Typing stubs for certifi"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "types-certifi-2021.10.8.3.tar.gz", hash = "sha256:72cf7798d165bc0b76e1c10dd1ea3097c7063c42c21d664523b928e88b554a4f"},
|
||||
{file = "types_certifi-2021.10.8.3-py3-none-any.whl", hash = "sha256:b2d1e325e69f71f7c78e5943d410e650b4707bb0ef32e4ddf3da37f54176e88a"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "types-python-dateutil"
|
||||
version = "2.9.0.20241206"
|
||||
|
|
@ -10728,6 +10829,18 @@ files = [
|
|||
[package.dependencies]
|
||||
setuptools = "*"
|
||||
|
||||
[[package]]
|
||||
name = "types-toml"
|
||||
version = "0.10.8.20240310"
|
||||
description = "Typing stubs for toml"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "types-toml-0.10.8.20240310.tar.gz", hash = "sha256:3d41501302972436a6b8b239c850b26689657e25281b48ff0ec06345b8830331"},
|
||||
{file = "types_toml-0.10.8.20240310-py3-none-any.whl", hash = "sha256:627b47775d25fa29977d9c70dc0cbab3f314f32c8d8d0c012f2ef5de7aaec05d"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "typing-extensions"
|
||||
version = "4.12.2"
|
||||
|
|
@ -11172,10 +11285,9 @@ watchmedo = ["PyYAML (>=3.10)"]
|
|||
name = "watchfiles"
|
||||
version = "1.0.5"
|
||||
description = "Simple, modern and high performance file watching and code reload in python."
|
||||
optional = true
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "extra == \"chromadb\""
|
||||
files = [
|
||||
{file = "watchfiles-1.0.5-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:5c40fe7dd9e5f81e0847b1ea64e1f5dd79dd61afbedb57759df06767ac719b40"},
|
||||
{file = "watchfiles-1.0.5-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8c0db396e6003d99bb2d7232c957b5f0b5634bbd1b24e381a5afcc880f7373fb"},
|
||||
|
|
@ -11737,4 +11849,4 @@ weaviate = ["weaviate-client"]
|
|||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.10,<=3.13"
|
||||
content-hash = "15fe7b2b02efa88fb8070dcde58f32cdc1577df7966e4cd438cbb4f197935e1f"
|
||||
content-hash = "795d545668c7ef4e056cc33ca5ee5b39ca6f61de4a42eb6f514270358e85e507"
|
||||
|
|
|
|||
|
|
@ -88,6 +88,7 @@ pyside6 = {version = "^6.8.3", optional = true}
|
|||
google-generativeai = {version = "^0.8.4", optional = true}
|
||||
notebook = {version = "^7.1.0", optional = true}
|
||||
s3fs = "^2025.3.2"
|
||||
modal = "^0.74.15"
|
||||
|
||||
|
||||
[tool.poetry.extras]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue