diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index 4badb0a97..40aac7fce 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -1,12 +1,13 @@ import asyncio -from typing import List, Optional, get_type_hints from uuid import UUID, uuid4 +from typing import List, Optional, get_type_hints from sqlalchemy.orm import Mapped, mapped_column -from sqlalchemy import JSON, Column, Table, select, delete, MetaData +from sqlalchemy import JSON, Column, Table, select, delete, MetaData, text from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from cognee.exceptions import InvalidValueError +from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.exceptions import EntityNotFoundError from cognee.infrastructure.engine import DataPoint from cognee.infrastructure.engine.utils import parse_id @@ -20,6 +21,8 @@ from ..vector_db_interface import VectorDBInterface from .serialize_data import serialize_data from ..utils import normalize_distances +logger = get_logger("PGVectorAdapter") + class IndexSchema(DataPoint): text: str @@ -71,30 +74,23 @@ class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface): return False async def create_collection(self, collection_name: str, payload_schema=None): - data_point_types = get_type_hints(DataPoint) - vector_size = self.embedding_engine.get_vector_size() + try: + vector_size = self.embedding_engine.get_vector_size() - if not await self.has_collection(collection_name): - - class PGVectorDataPoint(Base): - __tablename__ = collection_name - __table_args__ = {"extend_existing": True} - # PGVector requires one column to be the primary key - primary_key: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4) - id: Mapped[data_point_types["id"]] - payload = Column(JSON) - vector = Column(self.Vector(vector_size)) - - def __init__(self, id, payload, vector): - self.id = id - self.payload = payload - self.vector = vector - - async with self.engine.begin() as connection: - if len(Base.metadata.tables.keys()) > 0: - await connection.run_sync( - Base.metadata.create_all, tables=[PGVectorDataPoint.__table__] - ) + if not await self.has_collection(collection_name): + async with self.engine.begin() as connection: + create_table_sql = text(f""" + CREATE TABLE IF NOT EXISTS "{collection_name}" ( + primary_key UUID NOT NULL PRIMARY KEY, + id UUID NOT NULL, + payload JSON, + vector VECTOR({vector_size}) + ); + """) + await connection.execute(create_table_sql) + except Exception as error: + logger.error(f"An error occurred during vector collection creation: {error}") + raise error async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): data_point_types = get_type_hints(DataPoint) diff --git a/cognee/modules/graph/utils/retrieve_existing_edges.py b/cognee/modules/graph/utils/retrieve_existing_edges.py index 637317a8f..13c3490a8 100644 --- a/cognee/modules/graph/utils/retrieve_existing_edges.py +++ b/cognee/modules/graph/utils/retrieve_existing_edges.py @@ -1,4 +1,4 @@ -from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface +from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.engine import DataPoint from cognee.modules.engine.utils import generate_node_id from cognee.shared.data_models import KnowledgeGraph @@ -7,12 +7,12 @@ 
from cognee.shared.data_models import KnowledgeGraph async def retrieve_existing_edges( data_chunks: list[DataPoint], chunk_graphs: list[KnowledgeGraph], - graph_engine: GraphDBInterface, ) -> dict[str, bool]: processed_nodes = {} type_node_edges = [] entity_node_edges = [] type_entity_edges = [] + graph_engine = await get_graph_engine() for index, data_chunk in enumerate(data_chunks): graph = chunk_graphs[index] diff --git a/cognee/tasks/storage/index_graph_edges.py b/cognee/tasks/storage/index_graph_edges.py index 36f15344a..852e384fa 100644 --- a/cognee/tasks/storage/index_graph_edges.py +++ b/cognee/tasks/storage/index_graph_edges.py @@ -68,6 +68,11 @@ async def index_graph_edges(): for index_name, indexable_points in index_points.items(): index_name, field_name = index_name.split(".") - await vector_engine.index_data_points(index_name, field_name, indexable_points) + + # We save the data in batches of 1000 to not put a lot of pressure on the database + for start in range(0, len(indexable_points), 1000): + batch = indexable_points[start : start + 1000] + + await vector_engine.index_data_points(index_name, field_name, batch) return None diff --git a/distributed/modal_distributed.py b/distributed/modal_distributed.py new file mode 100644 index 000000000..214d255b9 --- /dev/null +++ b/distributed/modal_distributed.py @@ -0,0 +1,282 @@ +import os +import json +import pathlib +import asyncio +from typing import Optional +from pydantic import BaseModel +from dotenv import dotenv_values +from cognee.modules.chunking.models import DocumentChunk +from modal import App, Queue, Image + +import cognee +from cognee.shared.logging_utils import get_logger +from cognee.shared.data_models import KnowledgeGraph +from cognee.modules.pipelines.operations import run_tasks +from cognee.modules.users.methods import get_default_user +from cognee.infrastructure.llm import get_max_chunk_tokens +from cognee.modules.chunking.TextChunker import TextChunker +from cognee.modules.data.methods import get_datasets_by_name +from cognee.modules.cognify.config import get_cognify_config +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.modules.data.methods.get_dataset_data import get_dataset_data +from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver +from cognee.modules.graph.utils import deduplicate_nodes_and_edges, get_graph_from_model +from cognee.modules.pipelines.tasks import Task + + +# Global tasks +from cognee.tasks.documents import ( + classify_documents, + extract_chunks_from_documents, +) +from cognee.tasks.storage.index_data_points import index_data_points + +# Local tasks +from .tasks.extract_graph_from_data import extract_graph_from_data +from .tasks.summarize_text import summarize_text + + +# ------------------------------------------------------------------------------ +# App and Queue Initialization +# ------------------------------------------------------------------------------ + +# Initialize the Modal application +app = App("cognee_modal_distributed") +logger = get_logger("cognee_modal_distributed") + +local_env_vars = dict(dotenv_values(".env")) +logger.info("Modal deployment started with the following environmental variables:") +logger.info(json.dumps(local_env_vars, indent=4)) + +image = ( + Image.from_dockerfile(path="Dockerfile_modal", force_build=False) + .add_local_file("pyproject.toml", remote_path="/root/pyproject.toml", copy=True) + .add_local_file("poetry.lock", remote_path="/root/poetry.lock", copy=True) + .env(local_env_vars) + 
.poetry_install_from_file(poetry_pyproject_toml="pyproject.toml") + # .pip_install("protobuf", "h2", "neo4j", "asyncpg", "pgvector") + .add_local_python_source("../cognee") +) + + +# Create (or get) two queues: +# - graph_nodes_and_edges: Stores messages produced by the producer functions. +# - finished_producers: Keeps track of the number of finished producer jobs. +graph_nodes_and_edges = Queue.from_name("graph_nodes_and_edges", create_if_missing=True) + +finished_producers = Queue.from_name("finished_producers", create_if_missing=True) + +# ------------------------------------------------------------------------------ +# Cognee pipeline steps +# ------------------------------------------------------------------------------ + + +def add_data_to_save_queue(document_chunks: list[DocumentChunk]): + future = producer.spawn(file_name=document_name, chunk_list=event.result) + futures.append(future) + +# Preprocessing steps. This gets called in the entrypoint +async def get_preprocessing_steps(chunker=TextChunker) -> list[Task]: + preprocessing_tasks = [ + Task(classify_documents), + Task( # Extract text chunks based on the document type. + extract_chunks_from_documents, + max_chunk_size=None or get_max_chunk_tokens(), + chunker=chunker, + ), + Task( + add_data_to_save_queue, + task_config={"batch_size": 50}, + ), + ] + + return preprocessing_tasks + + +# This is the last step of the pipeline that gets executed on modal executors (functions) +async def save_data_points(data_points: list = None, data_point_connections: list = None): + data_point_connections = data_point_connections or [] + + nodes = [] + edges = [] + + added_nodes = {} + added_edges = {} + visited_properties = {} + + results = await asyncio.gather( + *[ + get_graph_from_model( + data_point, + added_nodes=added_nodes, + added_edges=added_edges, + visited_properties=visited_properties, + ) + for data_point in data_points + ] + ) + + for result_nodes, result_edges in results: + nodes.extend(result_nodes) + edges.extend(result_edges) + + nodes, edges = deduplicate_nodes_and_edges(nodes, edges) + + await index_data_points(nodes) + + graph_nodes_and_edges.put((nodes, edges + data_point_connections)) + + +# This is the pipeline for the modal executors +async def get_graph_tasks( + graph_model: BaseModel = KnowledgeGraph, + ontology_file_path: Optional[str] = None, +) -> list[Task]: + cognee_config = get_cognify_config() + + ontology_adapter = OntologyResolver(ontology_file=ontology_file_path) + + step_two_tasks = [ + Task( + extract_graph_from_data, + graph_model=graph_model, + ontology_adapter=ontology_adapter, + ), + Task( + summarize_text, + summarization_model=cognee_config.summarization_model, + ), + Task(save_data_points), + ] + + return step_two_tasks + + +# ------------------------------------------------------------------------------ +# Producer Function +# ------------------------------------------------------------------------------ + + +@app.function(image=image, timeout=86400, max_containers=100) +async def producer(file_name: str, chunk_list: list): + modal_tasks = await get_graph_tasks() + async for _ in run_tasks( + modal_tasks, data=chunk_list, pipeline_name=f"modal_execution_file_{file_name}" + ): + pass + + print(f"File execution finished: {file_name}") + + return file_name + + +# ------------------------------------------------------------------------------ +# Consumer Function +# ------------------------------------------------------------------------------ + + +@app.function(image=image, timeout=86400, 
max_containers=100) +async def consumer(number_of_files: int): + graph_engine = await get_graph_engine() + + while True: + if graph_nodes_and_edges.len() != 0: + nodes_and_edges = graph_nodes_and_edges.get(block=False) + if nodes_and_edges is not None: + if nodes_and_edges[0] is not None: + await graph_engine.add_nodes(nodes_and_edges[0]) + if nodes_and_edges[1] is not None: + await graph_engine.add_edges(nodes_and_edges[1]) + else: + print(f"Nodes and edges are: {nodes_and_edges}") + else: + await asyncio.sleep(5) + + number_of_finished_jobs = finished_producers.get(block=False) + + if number_of_finished_jobs == number_of_files: + # We put it back for the other consumers to see that we finished + finished_producers.put(number_of_finished_jobs) + + print("Finished processing all nodes and edges; stopping graph engine queue.") + return True + + +# ------------------------------------------------------------------------------ +# Entrypoint +# ------------------------------------------------------------------------------ + + +@app.local_entrypoint() +async def main(): + # Clear queues + graph_nodes_and_edges.clear() + finished_producers.clear() + + dataset_name = "main" + data_directory_name = ".data" + data_directory_path = os.path.join(pathlib.Path(__file__).parent, data_directory_name) + + number_of_consumers = 1 # Total number of consumer functions to spawn + batch_size = 50 # Batch size for producers + + results = [] + consumer_futures = [] + + # Delete DBs and saved files from metastore + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + # Add files to the metastore + await cognee.add(data=data_directory_path, dataset_name=dataset_name) + + user = await get_default_user() + datasets = await get_datasets_by_name(dataset_name, user.id) + documents = await get_dataset_data(dataset_id=datasets[0].id) + + print(f"We have {len(documents)} documents in the dataset.") + + preprocessing_tasks = await get_preprocessing_steps(user) + + # Start consumer functions + for _ in range(number_of_consumers): + consumer_future = consumer.spawn(number_of_files=len(documents)) + consumer_futures.append(consumer_future) + + # Process producer jobs in batches + for i in range(0, len(documents), batch_size): + batch = documents[i : i + batch_size] + futures = [] + for item in batch: + document_name = item.name + async for event in run_tasks( + preprocessing_tasks, data=[item], pipeline_name="preprocessing_steps" + ): + if ( + isinstance(event, TaskExecutionCompleted) + and event.task is extract_chunks_from_documents + ): + future = producer.spawn(file_name=document_name, chunk_list=event.result) + futures.append(future) + + batch_results = [] + for future in futures: + try: + result = future.get() + except Exception as e: + result = e + batch_results.append(result) + + results.extend(batch_results) + finished_producers.put(len(results)) + + for consumer_future in consumer_futures: + try: + print("Finished but waiting") + consumer_final = consumer_future.get() + print(f"We got all futures{consumer_final}") + except Exception as e: + print(e) + pass + + print(results) diff --git a/distributed/tasks/extract_graph_from_data.py b/distributed/tasks/extract_graph_from_data.py new file mode 100644 index 000000000..eab51a726 --- /dev/null +++ b/distributed/tasks/extract_graph_from_data.py @@ -0,0 +1,44 @@ +import asyncio +from typing import Type, List + +from pydantic import BaseModel + +from cognee.modules.graph.utils import ( + expand_with_nodes_and_edges, + retrieve_existing_edges, 
+)
+from cognee.shared.data_models import KnowledgeGraph
+from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
+from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
+from cognee.modules.data.extraction.knowledge_graph import extract_content_graph
+
+
+async def extract_graph_from_data(
+    data_chunks: list[DocumentChunk],
+    graph_model: Type[BaseModel],
+    ontology_adapter: OntologyResolver = OntologyResolver(),
+) -> List[DocumentChunk]:
+    """Extracts and integrates a knowledge graph from the text content of document chunks using a specified graph model."""
+    chunk_graphs = await asyncio.gather(
+        *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
+    )
+
+    if graph_model is not KnowledgeGraph:
+        for chunk_index, chunk_graph in enumerate(chunk_graphs):
+            data_chunks[chunk_index].contains = chunk_graph
+
+        return data_chunks
+
+    # retrieve_existing_edges now resolves the graph engine internally
+    # (see cognee/modules/graph/utils/retrieve_existing_edges.py in this change),
+    # so no graph_engine argument is passed here.
+    existing_edges_map = await retrieve_existing_edges(
+        data_chunks,
+        chunk_graphs,
+    )
+
+    graph_nodes, graph_edges = expand_with_nodes_and_edges(
+        data_chunks, chunk_graphs, ontology_adapter, existing_edges_map
+    )
+
+    return graph_nodes, graph_edges
diff --git a/distributed/tasks/summarize_text.py b/distributed/tasks/summarize_text.py
new file mode 100644
index 000000000..d48fc484a
--- /dev/null
+++ b/distributed/tasks/summarize_text.py
@@ -0,0 +1,27 @@
+import asyncio
+from typing import Type
+from uuid import uuid5
+from pydantic import BaseModel
+from cognee.tasks.summarization.models import TextSummary
+from cognee.modules.data.extraction.extract_summary import extract_summary
+from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
+
+
+async def summarize_text(data_chunks: list[DocumentChunk], edges: list, summarization_model: Type[BaseModel]):
+    if len(data_chunks) == 0:
+        return data_chunks
+
+    chunk_summaries = await asyncio.gather(
+        *[extract_summary(chunk.text, summarization_model) for chunk in data_chunks]
+    )
+
+    summaries = [
+        TextSummary(
+            id=uuid5(chunk.id, "TextSummary"),
+            made_from=chunk,
+            text=chunk_summaries[chunk_index].summary,
+        )
+        for (chunk_index, chunk) in enumerate(data_chunks)
+    ]
+
+    return summaries, edges
diff --git a/modal_deployment.py b/modal_deployment.py
index 4c2ff7d5d..3c9d0e05a 100644
--- a/modal_deployment.py
+++ b/modal_deployment.py
@@ -12,15 +12,15 @@ app = modal.App("cognee-runner")
 
 image = (
     modal.Image.from_dockerfile(path="Dockerfile_modal", force_build=False)
-    .copy_local_file("pyproject.toml", "pyproject.toml")
-    .copy_local_file("poetry.lock", "poetry.lock")
+    .add_local_file("pyproject.toml", remote_path="/root/pyproject.toml", copy=True)
+    .add_local_file("poetry.lock", remote_path="/root/poetry.lock", copy=True)
     .env({"ENV": os.getenv("ENV"), "LLM_API_KEY": os.getenv("LLM_API_KEY")})
     .poetry_install_from_file(poetry_pyproject_toml="pyproject.toml")
     .pip_install("protobuf", "h2")
 )
 
 
-@app.function(image=image, concurrency_limit=10)
+@app.function(image=image, max_containers=4)
 async def entry(text: str, query: str):
     logger = get_logger()
     logger.info("Initializing Cognee")
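
The producer/consumer handshake in distributed/modal_distributed.py works as follows: producers push (nodes, edges) tuples onto the graph_nodes_and_edges queue, and consumers drain that queue until the finished_producers counter shows that every producer has completed. The sketch below illustrates the same termination pattern with plain asyncio primitives instead of Modal queues; all names (producer, consumer, results, finished) are hypothetical stand-ins chosen for illustration, and the snippet is not part of this change.

import asyncio


async def producer(work_id: int, results: asyncio.Queue):
    # Stand-in for graph extraction: produce one (nodes, edges) tuple.
    await asyncio.sleep(0.1)
    await results.put(([f"node-{work_id}"], [f"edge-{work_id}"]))


async def consumer(results: asyncio.Queue, finished: asyncio.Event):
    nodes, edges = [], []
    while True:
        try:
            batch_nodes, batch_edges = results.get_nowait()
            nodes.extend(batch_nodes)
            edges.extend(batch_edges)
        except asyncio.QueueEmpty:
            if finished.is_set():
                # All producers are done and the queue is drained; stop consuming.
                return nodes, edges
            await asyncio.sleep(0.1)


async def main():
    results: asyncio.Queue = asyncio.Queue()
    finished = asyncio.Event()

    consumer_task = asyncio.create_task(consumer(results, finished))
    await asyncio.gather(*[producer(i, results) for i in range(10)])
    finished.set()  # signal completion, mirroring the finished_producers queue

    nodes, edges = await consumer_task
    print(f"Collected {len(nodes)} node batches and {len(edges)} edge batches")


asyncio.run(main())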