From af276b899947af2189fbb13ca56c61f50f82c6aa Mon Sep 17 00:00:00 2001
From: Igor Ilic <30923996+dexters1@users.noreply.github.com>
Date: Thu, 17 Apr 2025 14:02:12 +0200
Subject: [PATCH] feat: Add initial cognee pipeline simplification [COG-1705] (#670)

## Description
Simplify Cognee pipeline usage for users

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin
---
 cognee/api/v1/add/add.py                      |  38 +----
 cognee/api/v1/cognify/code_graph_pipeline.py  |   6 +-
 cognee/api/v1/cognify/cognify.py              |  88 +-----------
 .../corpus_builder/corpus_builder_executor.py |   3 +-
 cognee/modules/pipelines/__init__.py          |   1 +
 .../modules/pipelines/operations/__init__.py  |   1 +
 .../modules/pipelines/operations/pipeline.py  | 134 ++++++++++++++++++
 .../modules/pipelines/operations/run_tasks.py |   9 +-
 .../methods/check_permission_on_documents.py  |   4 +
 cognee/tasks/ingestion/ingest_data.py         |  13 +-
 cognee/tasks/summarization/summarize_text.py  |   9 +-
 11 files changed, 179 insertions(+), 127 deletions(-)
 create mode 100644 cognee/modules/pipelines/operations/pipeline.py

diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index b1c850965..409d42548 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -1,15 +1,8 @@
 from typing import Union, BinaryIO
 from cognee.modules.users.models import User
-from cognee.modules.users.methods import get_default_user
-from cognee.modules.pipelines import run_tasks, Task
+from cognee.modules.pipelines import Task
 from cognee.tasks.ingestion import ingest_data, resolve_data_directories
-from cognee.infrastructure.databases.relational import (
-    create_db_and_tables as create_relational_db_and_tables,
-)
-from cognee.infrastructure.databases.vector.pgvector import (
-    create_db_and_tables as create_pgvector_db_and_tables,
-)
-from uuid import uuid5, NAMESPACE_OID
+from cognee.modules.pipelines import cognee_pipeline


 async def add(
@@ -17,31 +10,8 @@ async def add(
     data: Union[BinaryIO, list[BinaryIO], str, list[str]],
     dataset_name: str = "main_dataset",
     user: User = None,
 ):
-    # Create tables for databases
-    await create_relational_db_and_tables()
-    await create_pgvector_db_and_tables()
-
-    # Initialize first_run attribute if it doesn't exist
-    if not hasattr(add, "first_run"):
-        add.first_run = True
-
-    if add.first_run:
-        from cognee.infrastructure.llm.utils import test_llm_connection, test_embedding_connection
-
-        # Test LLM and Embedding configuration once before running Cognee
-        await test_llm_connection()
-        await test_embedding_connection()
-        add.first_run = False  # Update flag after first run
-
-    if user is None:
-        user = await get_default_user()
-
     tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]
-    dataset_id = uuid5(NAMESPACE_OID, dataset_name)
-    pipeline = run_tasks(
-        tasks=tasks, dataset_id=dataset_id, data=data, pipeline_name="add_pipeline"
+    await cognee_pipeline(
+        tasks=tasks, datasets=dataset_name, data=data, user=user, pipeline_name="add_pipeline"
     )
-
-    async for pipeline_status in pipeline:
-        print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}")
diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py
index f64cf292c..4315dba20 100644
--- a/cognee/api/v1/cognify/code_graph_pipeline.py
+++ b/cognee/api/v1/cognify/code_graph_pipeline.py
@@ -70,11 +70,13 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
     dataset_id = uuid5(NAMESPACE_OID, "codebase")

     if include_docs:
-        non_code_pipeline_run = run_tasks(non_code_tasks, dataset_id, repo_path, "cognify_pipeline")
+        non_code_pipeline_run = run_tasks(
+            non_code_tasks, dataset_id, repo_path, user, "cognify_pipeline"
+        )

         async for run_status in non_code_pipeline_run:
             yield run_status

-    async for run_status in run_tasks(tasks, dataset_id, repo_path, "cognify_code_pipeline"):
+    async for run_status in run_tasks(tasks, dataset_id, repo_path, user, "cognify_code_pipeline"):
         yield run_status
diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 8b38e5304..81a2062f6 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -1,20 +1,11 @@
 import asyncio
 from cognee.shared.logging_utils import get_logger
 from typing import Union, Optional
-
 from pydantic import BaseModel

 from cognee.infrastructure.llm import get_max_chunk_tokens
 from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
-from cognee.modules.cognify.config import get_cognify_config
-from cognee.modules.data.methods import get_datasets, get_datasets_by_name
-from cognee.modules.data.methods.get_dataset_data import get_dataset_data
-from cognee.modules.data.models import Data, Dataset
-from cognee.modules.pipelines import run_tasks
-from cognee.modules.pipelines.models import PipelineRunStatus
-from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
 from cognee.modules.pipelines.tasks.task import Task
-from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.models import User
 from cognee.shared.data_models import KnowledgeGraph
 from cognee.tasks.documents import (
@@ -26,6 +17,7 @@ from cognee.tasks.graph import extract_graph_from_data
 from cognee.tasks.storage import add_data_points
 from cognee.tasks.summarization import summarize_text
 from cognee.modules.chunking.TextChunker import TextChunker
+from cognee.modules.pipelines import cognee_pipeline

 logger = get_logger("cognify")

@@ -36,73 +28,13 @@ async def cognify(
     datasets: Union[str, list[str]] = None,
     user: User = None,
     graph_model: BaseModel = KnowledgeGraph,
-    tasks: list[Task] = None,
+    chunker=TextChunker,
+    chunk_size: int = None,
     ontology_file_path: Optional[str] = None,
 ):
-    if user is None:
-        user = await get_default_user()
+    tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)

-    existing_datasets = await get_datasets(user.id)
-
-    if datasets is None or len(datasets) == 0:
-        # If no datasets are provided, cognify all existing datasets.
-        datasets = existing_datasets
-
-    if isinstance(datasets[0], str):
-        datasets = await get_datasets_by_name(datasets, user.id)
-
-    existing_datasets_map = {
-        generate_dataset_name(dataset.name): True for dataset in existing_datasets
-    }
-
-    awaitables = []
-
-    if tasks is None:
-        tasks = await get_default_tasks(user, graph_model, ontology_file_path=ontology_file_path)
-
-    for dataset in datasets:
-        dataset_name = generate_dataset_name(dataset.name)
-
-        if dataset_name in existing_datasets_map:
-            awaitables.append(run_cognify_pipeline(dataset, user, tasks))
-
-    return await asyncio.gather(*awaitables)
-
-
-async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
-    data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)
-
-    dataset_id = dataset.id
-    dataset_name = generate_dataset_name(dataset.name)
-
-    # async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests
-    task_status = await get_pipeline_status([dataset_id])
-
-    if (
-        str(dataset_id) in task_status
-        and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
-    ):
-        logger.info("Dataset %s is already being processed.", dataset_name)
-        return
-
-    if not isinstance(tasks, list):
-        raise ValueError("Tasks must be a list")
-
-    for task in tasks:
-        if not isinstance(task, Task):
-            raise ValueError(f"Task {task} is not an instance of Task")
-
-    pipeline_run = run_tasks(tasks, dataset.id, data_documents, "cognify_pipeline")
-    pipeline_run_status = None
-
-    async for run_status in pipeline_run:
-        pipeline_run_status = run_status
-
-    return pipeline_run_status
-
-
-def generate_dataset_name(dataset_name: str) -> str:
-    return dataset_name.replace(".", "_").replace(" ", "_")
+    return await cognee_pipeline(tasks=tasks, datasets=datasets, user=user)


 async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's comment)
@@ -112,13 +44,6 @@ async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's
     chunk_size: int = None,
     ontology_file_path: Optional[str] = None,
 ) -> list[Task]:
-    if user is None:
-        user = await get_default_user()
-
-    cognee_config = get_cognify_config()
-
-    ontology_adapter = OntologyResolver(ontology_file=ontology_file_path)
-
     default_tasks = [
         Task(classify_documents),
         Task(check_permissions_on_documents, user=user, permissions=["write"]),
@@ -130,12 +55,11 @@ async def get_default_tasks(  # TODO: Find out a better way to do this (Boris's
         Task(
             extract_graph_from_data,
             graph_model=graph_model,
-            ontology_adapter=ontology_adapter,
+            ontology_adapter=OntologyResolver(ontology_file=ontology_file_path),
             task_config={"batch_size": 10},
         ),  # Generate knowledge graphs from the document chunks.
         Task(
             summarize_text,
-            summarization_model=cognee_config.summarization_model,
             task_config={"batch_size": 10},
         ),
         Task(add_data_points, task_config={"batch_size": 10}),
diff --git a/cognee/eval_framework/corpus_builder/corpus_builder_executor.py b/cognee/eval_framework/corpus_builder/corpus_builder_executor.py
index a5a7d4164..e88db8ac4 100644
--- a/cognee/eval_framework/corpus_builder/corpus_builder_executor.py
+++ b/cognee/eval_framework/corpus_builder/corpus_builder_executor.py
@@ -5,6 +5,7 @@ from typing import Optional, Tuple, List, Dict, Union, Any, Callable, Awaitable
 from cognee.eval_framework.benchmark_adapters.benchmark_adapters import BenchmarkAdapter
 from cognee.modules.chunking.TextChunker import TextChunker
 from cognee.modules.pipelines.tasks.task import Task
+from cognee.modules.pipelines import cognee_pipeline

 logger = get_logger(level=ERROR)

@@ -60,4 +61,4 @@ class CorpusBuilderExecutor:
         await cognee.add(self.raw_corpus)

         tasks = await self.task_getter(chunk_size=chunk_size, chunker=chunker)
-        await cognee.cognify(tasks=tasks)
+        await cognee_pipeline(tasks=tasks)
diff --git a/cognee/modules/pipelines/__init__.py b/cognee/modules/pipelines/__init__.py
index 52a0942a6..036b08f1e 100644
--- a/cognee/modules/pipelines/__init__.py
+++ b/cognee/modules/pipelines/__init__.py
@@ -1,3 +1,4 @@
 from .tasks.task import Task
 from .operations.run_tasks import run_tasks
 from .operations.run_parallel import run_tasks_parallel
+from .operations.pipeline import cognee_pipeline
diff --git a/cognee/modules/pipelines/operations/__init__.py b/cognee/modules/pipelines/operations/__init__.py
index 5522976ba..5a9d8c447 100644
--- a/cognee/modules/pipelines/operations/__init__.py
+++ b/cognee/modules/pipelines/operations/__init__.py
@@ -1,3 +1,4 @@
 from .log_pipeline_run_start import log_pipeline_run_start
 from .log_pipeline_run_complete import log_pipeline_run_complete
 from .log_pipeline_run_error import log_pipeline_run_error
+from .pipeline import cognee_pipeline
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
new file mode 100644
index 000000000..77e8c2c7e
--- /dev/null
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -0,0 +1,134 @@
+import asyncio
+from cognee.shared.logging_utils import get_logger
+from typing import Union
+from uuid import uuid5, NAMESPACE_OID
+
+from cognee.modules.data.methods import get_datasets, get_datasets_by_name
+from cognee.modules.data.methods.get_dataset_data import get_dataset_data
+from cognee.modules.data.models import Data, Dataset
+from cognee.modules.pipelines.operations.run_tasks import run_tasks
+from cognee.modules.pipelines.models import PipelineRunStatus
+from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
+from cognee.modules.pipelines.tasks.task import Task
+from cognee.modules.users.methods import get_default_user
+from cognee.modules.users.models import User
+
+from cognee.infrastructure.databases.relational import (
+    create_db_and_tables as create_relational_db_and_tables,
+)
+from cognee.infrastructure.databases.vector.pgvector import (
+    create_db_and_tables as create_pgvector_db_and_tables,
+)
+
+logger = get_logger("cognee.pipeline")
+
+update_status_lock = asyncio.Lock()
+
+
+async def cognee_pipeline(
+    tasks: list[Task],
+    data=None,
+    datasets: Union[str, list[str]] = None,
+    user: User = None,
+    pipeline_name: str = "custom_pipeline",
+):
+    # Create tables for databases
+    await create_relational_db_and_tables()
+    await create_pgvector_db_and_tables()
+
+    # Initialize first_run attribute if it doesn't exist
+    if not hasattr(cognee_pipeline, "first_run"):
+        cognee_pipeline.first_run = True
+
+    if cognee_pipeline.first_run:
+        from cognee.infrastructure.llm.utils import test_llm_connection, test_embedding_connection
+
+        # Test LLM and Embedding configuration once before running Cognee
+        await test_llm_connection()
+        await test_embedding_connection()
+        cognee_pipeline.first_run = False  # Update flag after first run
+
+    # If no user is provided use default user
+    if user is None:
+        user = await get_default_user()
+
+    # Convert datasets to list in case it's a string
+    if isinstance(datasets, str):
+        datasets = [datasets]
+
+    # If no datasets are provided, work with all existing datasets.
+    existing_datasets = await get_datasets(user.id)
+    if datasets is None or len(datasets) == 0:
+        datasets = existing_datasets
+        if isinstance(datasets[0], str):
+            datasets = await get_datasets_by_name(datasets, user.id)
+    else:
+        # Try to get datasets objects from database, if they don't exist use dataset name
+        datasets_names = await get_datasets_by_name(datasets, user.id)
+        if datasets_names:
+            datasets = datasets_names
+
+    awaitables = []
+
+    for dataset in datasets:
+        awaitables.append(
+            run_pipeline(
+                dataset=dataset, user=user, tasks=tasks, data=data, pipeline_name=pipeline_name
+            )
+        )
+
+    return await asyncio.gather(*awaitables)
+
+
+async def run_pipeline(
+    dataset: Dataset,
+    user: User,
+    tasks: list[Task],
+    data=None,
+    pipeline_name: str = "custom_pipeline",
+):
+    if isinstance(dataset, Dataset):
+        check_dataset_name(dataset.name)
+        dataset_id = dataset.id
+    elif isinstance(dataset, str):
+        check_dataset_name(dataset)
+        # Generate id based on unique dataset_id formula
+        dataset_id = uuid5(NAMESPACE_OID, f"{dataset}{str(user.id)}")
+
+    if not data:
+        data: list[Data] = await get_dataset_data(dataset_id=dataset_id)
+
+    # async with update_status_lock: TODO: Add UI lock to prevent multiple backend requests
+    if isinstance(dataset, Dataset):
+        task_status = await get_pipeline_status([dataset_id])
+    else:
+        task_status = [
+            PipelineRunStatus.DATASET_PROCESSING_COMPLETED
+        ]  # TODO: this is a random assignment, find permanent solution
+
+    if (
+        str(dataset_id) in task_status
+        and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
+    ):
+        logger.info("Dataset %s is already being processed.", dataset_id)
+        return
+
+    if not isinstance(tasks, list):
+        raise ValueError("Tasks must be a list")
+
+    for task in tasks:
+        if not isinstance(task, Task):
+            raise ValueError(f"Task {task} is not an instance of Task")
+
+    pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name)
+    pipeline_run_status = None
+
+    async for run_status in pipeline_run:
+        pipeline_run_status = run_status
+
+    return pipeline_run_status
+
+
+def check_dataset_name(dataset_name: str) -> str:
+    if "." in dataset_name or " " in dataset_name:
+        raise ValueError("Dataset name cannot contain spaces or underscores")
diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index e87259753..71f22ac48 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -20,13 +20,11 @@ from ..tasks.task import Task
 logger = get_logger("run_tasks(tasks: [Task], data)")


-async def run_tasks_with_telemetry(tasks: list[Task], data, pipeline_name: str):
+async def run_tasks_with_telemetry(tasks: list[Task], data, user: User, pipeline_name: str):
     config = get_current_settings()

     logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent=1))

-    user = await get_default_user()
-
     try:
         logger.info("Pipeline run started: `%s`", pipeline_name)
         send_telemetry(
@@ -72,6 +70,7 @@ async def run_tasks(
     tasks: list[Task],
     dataset_id: UUID = uuid4(),
     data: Any = None,
+    user: User = None,
     pipeline_name: str = "unknown_pipeline",
 ):
     pipeline_id = uuid5(NAMESPACE_OID, pipeline_name)
@@ -82,7 +81,9 @@ async def run_tasks(
     pipeline_run_id = pipeline_run.pipeline_run_id

     try:
-        async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
+        async for _ in run_tasks_with_telemetry(
+            tasks=tasks, data=data, user=user, pipeline_name=pipeline_id
+        ):
             pass

         yield await log_pipeline_run_complete(
diff --git a/cognee/modules/users/permissions/methods/check_permission_on_documents.py b/cognee/modules/users/permissions/methods/check_permission_on_documents.py
index 34cb96e57..b2386b06a 100644
--- a/cognee/modules/users/permissions/methods/check_permission_on_documents.py
+++ b/cognee/modules/users/permissions/methods/check_permission_on_documents.py
@@ -1,4 +1,5 @@
 from cognee.shared.logging_utils import get_logger
+from cognee.modules.users.methods import get_default_user
 from uuid import UUID
 from sqlalchemy import select
 from sqlalchemy.orm import joinedload
@@ -13,6 +14,9 @@ logger = get_logger()


 async def check_permission_on_documents(user: User, permission_type: str, document_ids: list[UUID]):
+    if user is None:
+        user = await get_default_user()
+
     # TODO: Enable user role permissions again. Temporarily disabled during rework.
     # user_roles_ids = [role.id for role in user.roles]
     user_roles_ids = []
diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py
index fe8a81be0..3608ed4e6 100644
--- a/cognee/tasks/ingestion/ingest_data.py
+++ b/cognee/tasks/ingestion/ingest_data.py
@@ -5,6 +5,7 @@ import s3fs
 import cognee.modules.ingestion as ingestion
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.methods import create_dataset, get_dataset_data, get_datasets_by_name
+from cognee.modules.users.methods import get_default_user
 from cognee.modules.data.models.DatasetData import DatasetData
 from cognee.modules.users.models import User
 from cognee.modules.users.permissions.methods import give_permission_on_document
@@ -20,6 +21,9 @@ from cognee.api.v1.add.config import get_s3_config
 async def ingest_data(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()

+    if not user:
+        user = await get_default_user()
+
     pipeline = dlt.pipeline(
         pipeline_name="metadata_extraction_pipeline",
         destination=destination,
@@ -169,7 +173,10 @@ async def ingest_data(data: Any, dataset_name: str, user: User):
     )

     datasets = await get_datasets_by_name(dataset_name, user.id)

-    dataset = datasets[0]
-    data_documents = await get_dataset_data(dataset_id=dataset.id)
-    return data_documents
+    # In case no files were processed no dataset will be created
+    if datasets:
+        dataset = datasets[0]
+        data_documents = await get_dataset_data(dataset_id=dataset.id)
+        return data_documents
+    return []
diff --git a/cognee/tasks/summarization/summarize_text.py b/cognee/tasks/summarization/summarize_text.py
index df7ac6740..cca41ae88 100644
--- a/cognee/tasks/summarization/summarize_text.py
+++ b/cognee/tasks/summarization/summarize_text.py
@@ -4,13 +4,20 @@ from uuid import uuid5
 from pydantic import BaseModel
 from cognee.modules.data.extraction.extract_summary import extract_summary
 from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
+from cognee.modules.cognify.config import get_cognify_config
 from .models import TextSummary


-async def summarize_text(data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel]):
+async def summarize_text(
+    data_chunks: list[DocumentChunk], summarization_model: Type[BaseModel] = None
+):
     if len(data_chunks) == 0:
         return data_chunks

+    if summarization_model is None:
+        cognee_config = get_cognify_config()
+        summarization_model = cognee_config.summarization_model
+
     chunk_summaries = await asyncio.gather(
         *[extract_summary(chunk.text, summarization_model) for chunk in data_chunks]
     )
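
Taken together, the new `cognee_pipeline` entry point absorbs the setup that `add()` and `cognify()` used to do themselves: table creation, the one-time LLM/embedding connectivity check, default-user resolution, and the per-dataset status check now live in one place, and callers only declare a task list and a dataset. Below is a minimal usage sketch against the API surface introduced in this patch; the sample text, dataset name, and `chunk_size` value are illustrative only, and a working LLM/embedding configuration is assumed:

```python
import asyncio

import cognee
from cognee.api.v1.cognify.cognify import get_default_tasks
from cognee.modules.pipelines import cognee_pipeline


async def main():
    # Ingestion: add() now only declares its two tasks and delegates the rest
    # (DB tables, connection checks, default user) to cognee_pipeline.
    await cognee.add("Cognee builds knowledge graphs from documents.", dataset_name="demo_dataset")

    # Default processing: cognify() builds the default task list and delegates the same way.
    await cognee.cognify(datasets=["demo_dataset"])

    # Custom run: any Task list can go through the same entry point,
    # which is what CorpusBuilderExecutor does after this change.
    tasks = await get_default_tasks(chunk_size=1024)
    await cognee_pipeline(tasks=tasks, datasets="demo_dataset", pipeline_name="custom_pipeline")


if __name__ == "__main__":
    asyncio.run(main())
```

The last call mirrors the corpus builder change above: build a task list with `get_default_tasks` and hand it straight to `cognee_pipeline`, rather than going through `cognify(tasks=...)`, whose `tasks` parameter this patch removes.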