From 6e5acec292a6a0e4b81e69a11674a8c7af72fdd2 Mon Sep 17 00:00:00 2001
From: Boris
Date: Wed, 27 Aug 2025 09:49:20 +0200
Subject: [PATCH] refactor: make run_pipeline a high-level api for running
 pipelines (#1294)

## Description
Renames `cognee_pipeline` to `run_pipeline` and makes it the high-level entry point exported from `cognee.modules.pipelines`. The previous per-dataset `run_pipeline` helper becomes `run_pipeline_per_dataset`, task validation moves to the top of `run_pipeline` so it runs once per call instead of once per dataset, and the `process_pipeline_check` and `environment_setup_and_checks` layers are renamed to `check_pipeline_run_qualification` and `setup_and_check_environment` to better describe what they do.

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
---
 cognee/api/v1/add/add.py                      |  4 ++--
 cognee/api/v1/cognify/cognify.py              |  8 +++----
 .../corpus_builder/corpus_builder_executor.py |  4 ++--
 cognee/modules/pipelines/__init__.py          |  2 +-
 ...py => check_pipeline_run_qualification.py} |  2 +-
 ...ecks.py => setup_and_check_environment.py} |  2 +-
 .../modules/pipelines/operations/__init__.py  |  1 -
 .../modules/pipelines/operations/pipeline.py  | 21 ++++++++++---------
 8 files changed, 22 insertions(+), 22 deletions(-)
 rename cognee/modules/pipelines/layers/{process_pipeline_check.py => check_pipeline_run_qualification.py} (98%)
 rename cognee/modules/pipelines/layers/{environment_setup_and_checks.py => setup_and_check_environment.py} (96%)
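Usage stays generator-based after the rename. A minimal sketch of the new high-level entry point, mirroring the call sites touched by this diff (the task body and dataset name below are illustrative, not part of the patch):

```python
import asyncio

from cognee.modules.pipelines import Task, run_pipeline


async def extract_numbers(text: str):
    # Illustrative task body; any callable wrapped in Task works here.
    return [token for token in text.split() if token.isdigit()]


async def main():
    # run_pipeline is an async generator: it yields a run info object
    # (started/completed/etc.) for each authorized dataset it processes.
    async for run_info in run_pipeline(
        tasks=[Task(extract_numbers)],
        datasets=["my_dataset"],  # per the signature: str, list[str], or list[UUID]
    ):
        print(run_info)


asyncio.run(main())
```
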
diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index fbf4e0a54..98771947c 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -2,7 +2,7 @@
 from uuid import UUID
 from typing import Union, BinaryIO, List, Optional
 from cognee.modules.users.models import User
-from cognee.modules.pipelines import Task, cognee_pipeline
+from cognee.modules.pipelines import Task, run_pipeline
 from cognee.modules.pipelines.layers.resolve_authorized_user_dataset import (
     resolve_authorized_user_dataset,
 )
@@ -154,7 +154,7 @@ async def add(
 
     pipeline_run_info = None
 
-    async for run_info in cognee_pipeline(
+    async for run_info in run_pipeline(
         tasks=tasks,
         datasets=[authorized_dataset.id],
         data=data,
diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 59ee075ed..21d750875 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -7,7 +7,7 @@
 from cognee.shared.logging_utils import get_logger
 from cognee.shared.data_models import KnowledgeGraph
 from cognee.infrastructure.llm import get_max_chunk_tokens
-from cognee.modules.pipelines import cognee_pipeline
+from cognee.modules.pipelines import run_pipeline
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.chunking.TextChunker import TextChunker
 from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
@@ -179,12 +179,12 @@ async def cognify(
     """
     tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)
 
-    # By calling get pipeline executor we get a function that will have the cognee_pipeline run in the background or a function that we will need to wait for
+    # get_pipeline_executor returns a function that either runs run_pipeline in the background or blocks until it completes
    pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
 
-    # Run the cognee_pipeline in the background or blocking based on executor
+    # Run run_pipeline in the background or as a blocking call, depending on the executor
     return await pipeline_executor_func(
-        pipeline=cognee_pipeline,
+        pipeline=run_pipeline,
         tasks=tasks,
         user=user,
         datasets=datasets,
diff --git a/cognee/eval_framework/corpus_builder/corpus_builder_executor.py b/cognee/eval_framework/corpus_builder/corpus_builder_executor.py
index ec5552020..3f7d296a5 100644
--- a/cognee/eval_framework/corpus_builder/corpus_builder_executor.py
+++ b/cognee/eval_framework/corpus_builder/corpus_builder_executor.py
@@ -5,7 +5,7 @@
 from typing import Optional, Tuple, List, Dict, Union, Any, Callable, Awaitable
 from cognee.eval_framework.benchmark_adapters.benchmark_adapters import BenchmarkAdapter
 from cognee.modules.chunking.TextChunker import TextChunker
 from cognee.modules.pipelines.tasks.task import Task
-from cognee.modules.pipelines import cognee_pipeline
+from cognee.modules.pipelines import run_pipeline
 
 logger = get_logger(level=ERROR)
@@ -61,7 +61,7 @@ class CorpusBuilderExecutor:
         await cognee.add(self.raw_corpus)
 
         tasks = await self.task_getter(chunk_size=chunk_size, chunker=chunker)
-        pipeline_run = cognee_pipeline(tasks=tasks)
+        pipeline_run = run_pipeline(tasks=tasks)
 
         async for run_info in pipeline_run:
             print(run_info)
diff --git a/cognee/modules/pipelines/__init__.py b/cognee/modules/pipelines/__init__.py
index 036b08f1e..6fca237ca 100644
--- a/cognee/modules/pipelines/__init__.py
+++ b/cognee/modules/pipelines/__init__.py
@@ -1,4 +1,4 @@
 from .tasks.task import Task
 from .operations.run_tasks import run_tasks
 from .operations.run_parallel import run_tasks_parallel
-from .operations.pipeline import cognee_pipeline
+from .operations.pipeline import run_pipeline
diff --git a/cognee/modules/pipelines/layers/process_pipeline_check.py b/cognee/modules/pipelines/layers/check_pipeline_run_qualification.py
similarity index 98%
rename from cognee/modules/pipelines/layers/process_pipeline_check.py
rename to cognee/modules/pipelines/layers/check_pipeline_run_qualification.py
index 8318bb50e..f1edcec91 100644
--- a/cognee/modules/pipelines/layers/process_pipeline_check.py
+++ b/cognee/modules/pipelines/layers/check_pipeline_run_qualification.py
@@ -14,7 +14,7 @@ from cognee.modules.pipelines.models.PipelineRunInfo import (
 
 logger = get_logger(__name__)
 
-async def process_pipeline_check(
+async def check_pipeline_run_qualification(
     dataset: Dataset, data: list[Data], pipeline_name: str
 ) -> Optional[Union[PipelineRunStarted, PipelineRunCompleted]]:
     """
diff --git a/cognee/modules/pipelines/layers/environment_setup_and_checks.py b/cognee/modules/pipelines/layers/setup_and_check_environment.py
similarity index 96%
rename from cognee/modules/pipelines/layers/environment_setup_and_checks.py
rename to cognee/modules/pipelines/layers/setup_and_check_environment.py
index bca8bc4c9..55e58ed8a 100644
--- a/cognee/modules/pipelines/layers/environment_setup_and_checks.py
+++ b/cognee/modules/pipelines/layers/setup_and_check_environment.py
@@ -15,7 +15,7 @@
 _first_run_done = False
 _first_run_lock = asyncio.Lock()
 
-async def environment_setup_and_checks(
+async def setup_and_check_environment(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
 ):
diff --git a/cognee/modules/pipelines/operations/__init__.py b/cognee/modules/pipelines/operations/__init__.py
index 21ee552f0..68d7582c3 100644
--- a/cognee/modules/pipelines/operations/__init__.py
+++ b/cognee/modules/pipelines/operations/__init__.py
@@ -2,4 +2,3 @@ from .log_pipeline_run_initiated import log_pipeline_run_initiated
 from .log_pipeline_run_start import log_pipeline_run_start
 from .log_pipeline_run_complete import log_pipeline_run_complete
 from .log_pipeline_run_error import log_pipeline_run_error
-from .pipeline import cognee_pipeline
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index ad24edac8..cbe6dee5c 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -2,8 +2,8 @@ import asyncio
 from uuid import UUID
 from typing import Union
 
-from cognee.modules.pipelines.layers.environment_setup_and_checks import (
-    environment_setup_and_checks,
+from cognee.modules.pipelines.layers.setup_and_check_environment import (
+    setup_and_check_environment,
 )
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.data.methods.get_dataset_data import get_dataset_data
@@ -16,14 +16,16 @@ from cognee.context_global_variables import set_database_global_context_variable
 from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
     resolve_authorized_user_datasets,
 )
-from cognee.modules.pipelines.layers.process_pipeline_check import process_pipeline_check
+from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
+    check_pipeline_run_qualification,
+)
 
 logger = get_logger("cognee.pipeline")
 
 update_status_lock = asyncio.Lock()
 
 
-async def cognee_pipeline(
+async def run_pipeline(
     tasks: list[Task],
     data=None,
     datasets: Union[str, list[str], list[UUID]] = None,
@@ -33,12 +35,13 @@
     graph_db_config: dict = None,
     incremental_loading: bool = False,
 ):
-    await environment_setup_and_checks(vector_db_config, graph_db_config)
+    validate_pipeline_tasks(tasks)
+    await setup_and_check_environment(vector_db_config, graph_db_config)
 
     user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user)
 
     for dataset in authorized_datasets:
-        async for run_info in run_pipeline(
+        async for run_info in run_pipeline_per_dataset(
             dataset=dataset,
             user=user,
             tasks=tasks,
@@ -50,7 +53,7 @@
             yield run_info
 
 
-async def run_pipeline(
+async def run_pipeline_per_dataset(
     dataset: Dataset,
     user: User,
     tasks: list[Task],
@@ -59,15 +62,13 @@
     context: dict = None,
     incremental_loading=False,
 ):
-    validate_pipeline_tasks(tasks)
-
     # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
     await set_database_global_context_variables(dataset.id, dataset.owner_id)
 
     if not data:
         data: list[Data] = await get_dataset_data(dataset_id=dataset.id)
 
-    process_pipeline_status = await process_pipeline_check(dataset, data, pipeline_name)
+    process_pipeline_status = await check_pipeline_run_qualification(dataset, data, pipeline_name)
     if process_pipeline_status:
         # If pipeline was already processed or is currently being processed
         # return status information to async generator and finish execution
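
Note for reviewers: the cognify.py hunk above drives run_pipeline through a pipeline-executor function so the same async generator can either be awaited to completion or drained in the background. get_pipeline_executor's internals are not part of this patch, so the following is only a sketch of the assumed shape of that pattern, written in plain asyncio:

```python
import asyncio


def get_pipeline_executor_sketch(run_in_background: bool):
    """Hypothetical stand-in for get_pipeline_executor (not this patch's code):
    returns a coroutine function that drains a pipeline async generator either
    blocking until completion or as a scheduled background task."""

    async def drain(pipeline, **kwargs):
        last_info = None
        async for run_info in pipeline(**kwargs):  # e.g. pipeline=run_pipeline
            last_info = run_info
        return last_info

    async def blocking_executor(pipeline, **kwargs):
        # Wait for the whole pipeline run and return the final run info.
        return await drain(pipeline, **kwargs)

    async def background_executor(pipeline, **kwargs):
        # Schedule draining on the running loop and hand the task back
        # so the caller can await or inspect it later.
        return asyncio.create_task(drain(pipeline, **kwargs))

    return background_executor if run_in_background else blocking_executor
```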