feat: Add initial custom pipeline (#1716)

<!-- .github/pull_request_template.md -->

## Description
Add run_custom_pipeline to have a way to execute a custom collection of tasks in Cognee

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Igor Ilic 2025-11-04 17:58:34 +01:00 committed by GitHub
parent d54cd85575
commit bee2fe3ba7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 181 additions and 1 deletions

View file

@ -210,6 +210,31 @@ jobs:
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
run: uv run python ./examples/python/memify_coding_agent_example.py
test-custom-pipeline:
name: Run Custom Pipeline Example
runs-on: ubuntu-22.04
steps:
- name: Check out repository
uses: actions/checkout@v4
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: '3.11.x'
- name: Run Custom Pipeline Example
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
run: uv run python ./examples/python/run_custom_pipeline_example.py
test-permissions-example:
name: Run Permissions Example
runs-on: ubuntu-22.04

View file

@ -19,6 +19,7 @@ from .api.v1.add import add
from .api.v1.delete import delete
from .api.v1.cognify import cognify
from .modules.memify import memify
from .modules.run_custom_pipeline import run_custom_pipeline
from .api.v1.update import update
from .api.v1.config.config import config
from .api.v1.datasets.datasets import datasets

View file

@ -0,0 +1 @@
from .run_custom_pipeline import run_custom_pipeline

View file

@ -0,0 +1,69 @@
from typing import Union, Optional, List, Type, Any
from uuid import UUID
from cognee.shared.logging_utils import get_logger
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.models import User
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
logger = get_logger()
async def run_custom_pipeline(
tasks: Union[List[Task], List[str]] = None,
data: Any = None,
dataset: Union[str, UUID] = "main_dataset",
user: User = None,
vector_db_config: Optional[dict] = None,
graph_db_config: Optional[dict] = None,
data_per_batch: int = 20,
run_in_background: bool = False,
pipeline_name: str = "custom_pipeline",
):
"""
Custom pipeline in Cognee, can work with already built graphs. Data needs to be provided which can be processed
with provided tasks.
Provided tasks and data will be arranged to run the Cognee pipeline and execute graph enrichment/creation.
This is the core processing step in Cognee that converts raw text and documents
into an intelligent knowledge graph. It analyzes content, extracts entities and
relationships, and creates semantic connections for enhanced search and reasoning.
Args:
tasks: List of Cognee Tasks to execute.
data: The data to ingest. Can be anything when custom extraction and enrichment tasks are used.
Data provided here will be forwarded to the first extraction task in the pipeline as input.
dataset: Dataset name or dataset uuid to process.
user: User context for authentication and data access. Uses default if None.
vector_db_config: Custom vector database configuration for embeddings storage.
graph_db_config: Custom graph database configuration for relationship storage.
data_per_batch: Number of data items to be processed in parallel.
run_in_background: If True, starts processing asynchronously and returns immediately.
If False, waits for completion before returning.
Background mode recommended for large datasets (>100MB).
Use pipeline_run_id from return value to monitor progress.
"""
custom_tasks = [
*tasks,
]
# By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
# Run the run_pipeline in the background or blocking based on executor
return await pipeline_executor_func(
pipeline=run_pipeline,
tasks=custom_tasks,
user=user,
data=data,
datasets=dataset,
vector_db_config=vector_db_config,
graph_db_config=graph_db_config,
incremental_loading=False,
data_per_batch=data_per_batch,
pipeline_name=pipeline_name,
)

View file

@ -10,7 +10,7 @@ from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.methods.create_default_user import create_default_user
async def get_default_user() -> SimpleNamespace:
async def get_default_user() -> User:
db_engine = get_relational_engine()
base_config = get_base_config()
default_email = base_config.default_user_email or "default_user@example.com"

View file

@ -0,0 +1,84 @@
import asyncio
import cognee
from cognee.modules.engine.operations.setup import setup
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import setup_logging, INFO
from cognee.modules.pipelines import Task
from cognee.api.v1.search import SearchType
# Prerequisites:
# 1. Copy `.env.template` and rename it to `.env`.
# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
# LLM_API_KEY = "your_key_here"
async def main():
# Create a clean slate for cognee -- reset data and system state
print("Resetting cognee data...")
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
print("Data reset complete.\n")
# Create relational database and tables
await setup()
# cognee knowledge graph will be created based on this text
text = """
Natural language processing (NLP) is an interdisciplinary
subfield of computer science and information retrieval.
"""
print("Adding text to cognee:")
print(text.strip())
# Let's recreate the cognee add pipeline through the custom pipeline framework
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
user = await get_default_user()
# Values for tasks need to be filled before calling the pipeline
add_tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(
ingest_data,
"main_dataset",
user,
),
]
# Forward tasks to custom pipeline along with data and user information
await cognee.run_custom_pipeline(
tasks=add_tasks, data=text, user=user, dataset="main_dataset", pipeline_name="add_pipeline"
)
print("Text added successfully.\n")
# Use LLMs and cognee to create knowledge graph
from cognee.api.v1.cognify.cognify import get_default_tasks
cognify_tasks = await get_default_tasks(user=user)
print("Recreating existing cognify pipeline in custom pipeline to create knowledge graph...\n")
await cognee.run_custom_pipeline(
tasks=cognify_tasks, user=user, dataset="main_dataset", pipeline_name="cognify_pipeline"
)
print("Cognify process complete.\n")
query_text = "Tell me about NLP"
print(f"Searching cognee for insights with query: '{query_text}'")
# Query cognee for insights on the added text
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text=query_text
)
print("Search results:")
# Display results
for result_text in search_results:
print(result_text)
if __name__ == "__main__":
logger = setup_logging(log_level=INFO)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())