From bee2fe3ba70307c4a5ea03a5cdc1e07e927708b4 Mon Sep 17 00:00:00 2001 From: Igor Ilic <30923996+dexters1@users.noreply.github.com> Date: Tue, 4 Nov 2025 17:58:34 +0100 Subject: [PATCH] feat: Add initial custom pipeline (#1716) ## Description Add run_custom_pipeline to have a way to execute a custom collection of tasks in Cognee ## Type of Change - [ ] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [ ] Performance improvement - [ ] Other (please specify): ## Screenshots/Videos (if applicable) ## Pre-submission Checklist - [ ] **I have tested my changes thoroughly before submitting this PR** - [ ] **This PR contains minimal changes necessary to address the issue/feature** - [ ] My code follows the project's coding standards and style guidelines - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have added necessary documentation (if applicable) - [ ] All new and existing tests pass - [ ] I have searched existing PRs to ensure this change hasn't been submitted already - [ ] I have linked any relevant issues in the description - [ ] My commits have clear and descriptive messages ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin. --- .github/workflows/examples_tests.yml | 25 ++++++ cognee/__init__.py | 1 + .../modules/run_custom_pipeline/__init__.py | 1 + .../run_custom_pipeline.py | 69 +++++++++++++++ .../modules/users/methods/get_default_user.py | 2 +- .../python/run_custom_pipeline_example.py | 84 +++++++++++++++++++ 6 files changed, 181 insertions(+), 1 deletion(-) create mode 100644 cognee/modules/run_custom_pipeline/__init__.py create mode 100644 cognee/modules/run_custom_pipeline/run_custom_pipeline.py create mode 100644 examples/python/run_custom_pipeline_example.py diff --git a/.github/workflows/examples_tests.yml b/.github/workflows/examples_tests.yml index 57bc88157..36953e259 100644 --- a/.github/workflows/examples_tests.yml +++ b/.github/workflows/examples_tests.yml @@ -210,6 +210,31 @@ jobs: EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} run: uv run python ./examples/python/memify_coding_agent_example.py + test-custom-pipeline: + name: Run Custom Pipeline Example + runs-on: ubuntu-22.04 + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Cognee Setup + uses: ./.github/actions/cognee_setup + with: + python-version: '3.11.x' + + - name: Run Custom Pipeline Example + env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + LLM_MODEL: ${{ secrets.LLM_MODEL }} + LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} + EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} + EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} + EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} + EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} + run: uv run python ./examples/python/run_custom_pipeline_example.py + test-permissions-example: name: Run Permissions Example runs-on: ubuntu-22.04 diff --git a/cognee/__init__.py b/cognee/__init__.py index 6e4d2a903..4d150ce4e 100644 --- a/cognee/__init__.py +++ b/cognee/__init__.py @@ -19,6 +19,7 @@ from .api.v1.add import add from .api.v1.delete import delete from .api.v1.cognify import cognify from .modules.memify import memify +from .modules.run_custom_pipeline import run_custom_pipeline from .api.v1.update import update from .api.v1.config.config import config from .api.v1.datasets.datasets import datasets diff --git a/cognee/modules/run_custom_pipeline/__init__.py b/cognee/modules/run_custom_pipeline/__init__.py new file mode 100644 index 000000000..2d30e2e0c --- /dev/null +++ b/cognee/modules/run_custom_pipeline/__init__.py @@ -0,0 +1 @@ +from .run_custom_pipeline import run_custom_pipeline diff --git a/cognee/modules/run_custom_pipeline/run_custom_pipeline.py b/cognee/modules/run_custom_pipeline/run_custom_pipeline.py new file mode 100644 index 000000000..d3df1c060 --- /dev/null +++ b/cognee/modules/run_custom_pipeline/run_custom_pipeline.py @@ -0,0 +1,69 @@ +from typing import Union, Optional, List, Type, Any +from uuid import UUID + +from cognee.shared.logging_utils import get_logger + +from cognee.modules.pipelines import run_pipeline +from cognee.modules.pipelines.tasks.task import Task +from cognee.modules.users.models import User +from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor + +logger = get_logger() + + +async def run_custom_pipeline( + tasks: Union[List[Task], List[str]] = None, + data: Any = None, + dataset: Union[str, UUID] = "main_dataset", + user: User = None, + vector_db_config: Optional[dict] = None, + graph_db_config: Optional[dict] = None, + data_per_batch: int = 20, + run_in_background: bool = False, + pipeline_name: str = "custom_pipeline", +): + """ + Custom pipeline in Cognee, can work with already built graphs. Data needs to be provided which can be processed + with provided tasks. + + Provided tasks and data will be arranged to run the Cognee pipeline and execute graph enrichment/creation. + + This is the core processing step in Cognee that converts raw text and documents + into an intelligent knowledge graph. It analyzes content, extracts entities and + relationships, and creates semantic connections for enhanced search and reasoning. + + Args: + tasks: List of Cognee Tasks to execute. + data: The data to ingest. Can be anything when custom extraction and enrichment tasks are used. + Data provided here will be forwarded to the first extraction task in the pipeline as input. + dataset: Dataset name or dataset uuid to process. + user: User context for authentication and data access. Uses default if None. + vector_db_config: Custom vector database configuration for embeddings storage. + graph_db_config: Custom graph database configuration for relationship storage. + data_per_batch: Number of data items to be processed in parallel. + run_in_background: If True, starts processing asynchronously and returns immediately. + If False, waits for completion before returning. + Background mode recommended for large datasets (>100MB). + Use pipeline_run_id from return value to monitor progress. + """ + + custom_tasks = [ + *tasks, + ] + + # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for + pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background) + + # Run the run_pipeline in the background or blocking based on executor + return await pipeline_executor_func( + pipeline=run_pipeline, + tasks=custom_tasks, + user=user, + data=data, + datasets=dataset, + vector_db_config=vector_db_config, + graph_db_config=graph_db_config, + incremental_loading=False, + data_per_batch=data_per_batch, + pipeline_name=pipeline_name, + ) diff --git a/cognee/modules/users/methods/get_default_user.py b/cognee/modules/users/methods/get_default_user.py index 773545f8e..9e3940617 100644 --- a/cognee/modules/users/methods/get_default_user.py +++ b/cognee/modules/users/methods/get_default_user.py @@ -10,7 +10,7 @@ from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.users.methods.create_default_user import create_default_user -async def get_default_user() -> SimpleNamespace: +async def get_default_user() -> User: db_engine = get_relational_engine() base_config = get_base_config() default_email = base_config.default_user_email or "default_user@example.com" diff --git a/examples/python/run_custom_pipeline_example.py b/examples/python/run_custom_pipeline_example.py new file mode 100644 index 000000000..1ca1b4402 --- /dev/null +++ b/examples/python/run_custom_pipeline_example.py @@ -0,0 +1,84 @@ +import asyncio +import cognee +from cognee.modules.engine.operations.setup import setup +from cognee.modules.users.methods import get_default_user +from cognee.shared.logging_utils import setup_logging, INFO +from cognee.modules.pipelines import Task +from cognee.api.v1.search import SearchType + +# Prerequisites: +# 1. Copy `.env.template` and rename it to `.env`. +# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field: +# LLM_API_KEY = "your_key_here" + + +async def main(): + # Create a clean slate for cognee -- reset data and system state + print("Resetting cognee data...") + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + print("Data reset complete.\n") + + # Create relational database and tables + await setup() + + # cognee knowledge graph will be created based on this text + text = """ + Natural language processing (NLP) is an interdisciplinary + subfield of computer science and information retrieval. + """ + + print("Adding text to cognee:") + print(text.strip()) + + # Let's recreate the cognee add pipeline through the custom pipeline framework + from cognee.tasks.ingestion import ingest_data, resolve_data_directories + + user = await get_default_user() + + # Values for tasks need to be filled before calling the pipeline + add_tasks = [ + Task(resolve_data_directories, include_subdirectories=True), + Task( + ingest_data, + "main_dataset", + user, + ), + ] + # Forward tasks to custom pipeline along with data and user information + await cognee.run_custom_pipeline( + tasks=add_tasks, data=text, user=user, dataset="main_dataset", pipeline_name="add_pipeline" + ) + print("Text added successfully.\n") + + # Use LLMs and cognee to create knowledge graph + from cognee.api.v1.cognify.cognify import get_default_tasks + + cognify_tasks = await get_default_tasks(user=user) + print("Recreating existing cognify pipeline in custom pipeline to create knowledge graph...\n") + await cognee.run_custom_pipeline( + tasks=cognify_tasks, user=user, dataset="main_dataset", pipeline_name="cognify_pipeline" + ) + print("Cognify process complete.\n") + + query_text = "Tell me about NLP" + print(f"Searching cognee for insights with query: '{query_text}'") + # Query cognee for insights on the added text + search_results = await cognee.search( + query_type=SearchType.GRAPH_COMPLETION, query_text=query_text + ) + + print("Search results:") + # Display results + for result_text in search_results: + print(result_text) + + +if __name__ == "__main__": + logger = setup_logging(log_level=INFO) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(main()) + finally: + loop.run_until_complete(loop.shutdown_asyncgens())