feat: make pipeline processing cache optional (#1876)

<!-- .github/pull_request_template.md -->

## Description
Make the pipeline cache mechanism optional: it is now turned off by default, but `add` and `cognify` keep using it as they have until now.
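
A rough usage sketch for reviewers (hedged: the call shapes below are assembled from the diffs and the new test in this PR, not a verified end-to-end example; `echo_task`, `my_dataset`, and `my_custom_pipeline` are placeholder names):

```python
import asyncio

import cognee
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user


async def echo_task(data):
    # Trivial task, only to illustrate the call shape.
    return data


async def main():
    # add() and cognify() keep the cache enabled internally (use_pipeline_cache=True),
    # so repeated runs on an unchanged dataset are skipped, same as before this PR.
    await cognee.add("Some text", dataset_name="my_dataset")
    await cognee.cognify(["my_dataset"])

    # Custom pipelines now default to use_pipeline_cache=False and re-run every time;
    # opting in skips re-processing of a dataset whose pipeline already completed.
    user = await get_default_user()
    async for run_info in run_pipeline(
        tasks=[Task(echo_task)],
        datasets="my_dataset",
        data=["sample data"],
        user=user,
        pipeline_name="my_custom_pipeline",
        use_pipeline_cache=True,
    ):
        print(run_info)


asyncio.run(main())
```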

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added necessary documentation (if applicable)
- [x] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been submitted already
- [x] I have linked any relevant issues in the description
- [x] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
  * Introduced pipeline caching across ingestion, processing, and custom pipeline flows with per-run controls to enable or disable caching.
  * Added an option for incremental loading in custom pipeline runs.

* **Behavior Changes**
  * One pipeline path now explicitly bypasses caching by default to always re-run when invoked.
  * Disabling cache forces re-processing instead of early exit; cache reset still enables re-execution.

* **Tests**
  * Added tests validating caching, non-caching, and cache-reset re-execution behavior.

* **Chores**
  * Added CI job to run pipeline caching tests.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
7 changed files with 220 additions and 10 deletions

View file

@@ -582,3 +582,30 @@ jobs:
          DB_USERNAME: cognee
          DB_PASSWORD: cognee
        run: uv run python ./cognee/tests/test_conversation_history.py

  run-pipeline-cache-test:
    name: Test Pipeline Caching
    runs-on: ubuntu-22.04
    steps:
      - name: Check out
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Cognee Setup
        uses: ./.github/actions/cognee_setup
        with:
          python-version: '3.11.x'

      - name: Run Pipeline Cache Test
        env:
          ENV: 'dev'
          LLM_MODEL: ${{ secrets.LLM_MODEL }}
          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        run: uv run python ./cognee/tests/test_pipeline_cache.py

View file

@@ -205,6 +205,7 @@ async def add(
        pipeline_name="add_pipeline",
        vector_db_config=vector_db_config,
        graph_db_config=graph_db_config,
        use_pipeline_cache=True,
        incremental_loading=incremental_loading,
        data_per_batch=data_per_batch,
    ):

View file

@@ -237,6 +237,7 @@ async def cognify(
        vector_db_config=vector_db_config,
        graph_db_config=graph_db_config,
        incremental_loading=incremental_loading,
        use_pipeline_cache=True,
        pipeline_name="cognify_pipeline",
        data_per_batch=data_per_batch,
    )

View file

@@ -12,9 +12,6 @@ from cognee.modules.users.models import User
from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
    resolve_authorized_user_datasets,
)
from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)
from cognee.modules.engine.operations.setup import setup
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
@@ -97,10 +94,6 @@ async def memify(
        *enrichment_tasks,
    ]

    await reset_dataset_pipeline_run_status(
        authorized_dataset.id, user, pipeline_names=["memify_pipeline"]
    )

    # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
    pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
@@ -113,6 +106,7 @@ async def memify(
        datasets=authorized_dataset.id,
        vector_db_config=vector_db_config,
        graph_db_config=graph_db_config,
        use_pipeline_cache=False,
        incremental_loading=False,
        pipeline_name="memify_pipeline",
    )

View file

@@ -20,6 +20,9 @@ from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
    check_pipeline_run_qualification,
)
from cognee.modules.pipelines.models.PipelineRunInfo import (
    PipelineRunStarted,
)
from typing import Any

logger = get_logger("cognee.pipeline")
@@ -35,6 +38,7 @@ async def run_pipeline(
    pipeline_name: str = "custom_pipeline",
    vector_db_config: dict = None,
    graph_db_config: dict = None,
    use_pipeline_cache: bool = False,
    incremental_loading: bool = False,
    data_per_batch: int = 20,
):
@@ -51,6 +55,7 @@ async def run_pipeline(
        data=data,
        pipeline_name=pipeline_name,
        context={"dataset": dataset},
        use_pipeline_cache=use_pipeline_cache,
        incremental_loading=incremental_loading,
        data_per_batch=data_per_batch,
    ):
@@ -64,6 +69,7 @@ async def run_pipeline_per_dataset(
    data=None,
    pipeline_name: str = "custom_pipeline",
    context: dict = None,
    use_pipeline_cache=False,
    incremental_loading=False,
    data_per_batch: int = 20,
):
@@ -77,8 +83,18 @@ async def run_pipeline_per_dataset(
    if process_pipeline_status:
        # If pipeline was already processed or is currently being processed
        # return status information to async generator and finish execution
        yield process_pipeline_status
        return
        if use_pipeline_cache:
            # If pipeline caching is enabled we do not proceed with re-processing
            yield process_pipeline_status
            return
        else:
            # If pipeline caching is disabled we always return pipeline started information and proceed with re-processing
            yield PipelineRunStarted(
                pipeline_run_id=process_pipeline_status.pipeline_run_id,
                dataset_id=dataset.id,
                dataset_name=dataset.name,
                payload=data,
            )

    pipeline_run = run_tasks(
        tasks,
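
Put differently: with caching disabled, a repeat run on the same dataset no longer short-circuits; the caller receives a fresh `PipelineRunStarted` and the tasks execute again. A simplified sketch of that decision, paraphrasing the hunk above (`handle_existing_run` is an illustrative name, not the project's actual function; status checking and task execution are elided):

```python
from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunStarted


async def handle_existing_run(process_pipeline_status, use_pipeline_cache, dataset, data):
    """Either yield the cached status and stop, or announce a new run and continue."""
    if process_pipeline_status:
        if use_pipeline_cache:
            # Cache on: the pipeline already ran (or is running), so skip re-processing.
            yield process_pipeline_status
            return
        # Cache off: report a fresh start and fall through so the tasks run again.
        yield PipelineRunStarted(
            pipeline_run_id=process_pipeline_status.pipeline_run_id,
            dataset_id=dataset.id,
            dataset_name=dataset.name,
            payload=data,
        )
    # ...run_tasks(...) continues here in the real function...
```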

View file

@@ -18,6 +18,8 @@ async def run_custom_pipeline(
    user: User = None,
    vector_db_config: Optional[dict] = None,
    graph_db_config: Optional[dict] = None,
    use_pipeline_cache: bool = False,
    incremental_loading: bool = False,
    data_per_batch: int = 20,
    run_in_background: bool = False,
    pipeline_name: str = "custom_pipeline",
@@ -40,6 +42,10 @@ async def run_custom_pipeline(
        user: User context for authentication and data access. Uses default if None.
        vector_db_config: Custom vector database configuration for embeddings storage.
        graph_db_config: Custom graph database configuration for relationship storage.
        use_pipeline_cache: If True, pipelines with the same ID that are currently executing or that have already completed won't process data again.
            The pipeline ID is created by the generate_pipeline_id function. Pipeline status can be manually reset with the reset_dataset_pipeline_run_status function.
        incremental_loading: If True, only new or modified data will be processed to avoid duplication (only works if data is used with the Cognee Python Data model).
            The incremental system stores and compares hashes of processed data in the Data model and skips data with the same content hash.
        data_per_batch: Number of data items to be processed in parallel.
        run_in_background: If True, starts processing asynchronously and returns immediately.
            If False, waits for completion before returning.
@@ -63,7 +69,8 @@ async def run_custom_pipeline(
        datasets=dataset,
        vector_db_config=vector_db_config,
        graph_db_config=graph_db_config,
        incremental_loading=False,
        use_pipeline_cache=use_pipeline_cache,
        incremental_loading=incremental_loading,
        data_per_batch=data_per_batch,
        pipeline_name=pipeline_name,
    )
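
The docstring above points to resetting pipeline status as the way to force a cached pipeline to run again. A minimal sketch of that flow (hedged: it mirrors the call used in the new test below; `rerun_with_cache`, `my_dataset`, and `my_custom_pipeline` are illustrative names):

```python
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)


async def rerun_with_cache(tasks, user):
    # First run with caching on: identical follow-up runs would normally be skipped.
    results = []
    async for run_info in run_pipeline(
        tasks=tasks,
        datasets="my_dataset",
        data=["sample data"],
        user=user,
        pipeline_name="my_custom_pipeline",
        use_pipeline_cache=True,
    ):
        results.append(run_info)

    # Manually clear the recorded run status so the cached pipeline executes again.
    await reset_dataset_pipeline_run_status(
        results[0].dataset_id, user, pipeline_names=["my_custom_pipeline"]
    )
```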

View file

@@ -0,0 +1,164 @@
"""
Test suite for the pipeline_cache feature in Cognee pipelines.

This module tests the behavior of the `pipeline_cache` parameter which controls
whether a pipeline should skip re-execution when it has already been completed
for the same dataset.

Architecture Overview:
---------------------
The pipeline_cache mechanism works at the dataset level:

1. When a pipeline runs, it logs its status (INITIATED -> STARTED -> COMPLETED)
2. Before each run, `check_pipeline_run_qualification()` checks the pipeline status
3. If `use_pipeline_cache=True` and status is COMPLETED/STARTED, the pipeline skips
4. If `use_pipeline_cache=False`, the pipeline always re-executes regardless of status
"""

import pytest

import cognee
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.pipelines import run_pipeline
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)
from cognee.infrastructure.databases.relational import create_db_and_tables


class ExecutionCounter:
    """Helper class to track task execution counts."""

    def __init__(self):
        self.count = 0


async def create_counting_task(data, counter: ExecutionCounter):
    """Create a task that increments a counter from the ExecutionCounter instance when executed."""
    counter.count += 1
    return counter


class TestPipelineCache:
    """Tests for basic pipeline_cache on/off behavior."""

    @pytest.mark.asyncio
    async def test_pipeline_cache_off_allows_reexecution(self):
        """
        Test that with use_pipeline_cache=False, the pipeline re-executes
        even when it has already completed for the dataset.

        Expected behavior:
        - First run: Pipeline executes fully, task runs once
        - Second run: Pipeline executes again, task runs again (total: 2 times)
        """
        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)
        await create_db_and_tables()

        counter = ExecutionCounter()
        user = await get_default_user()

        tasks = [Task(create_counting_task, counter=counter)]

        # First run
        pipeline_results_1 = []
        async for result in run_pipeline(
            tasks=tasks,
            datasets="test_dataset_cache_off",
            data=["sample data"],  # Data is necessary to trigger processing
            user=user,
            pipeline_name="test_cache_off_pipeline",
            use_pipeline_cache=False,
        ):
            pipeline_results_1.append(result)

        first_run_count = counter.count
        assert first_run_count >= 1, "Task should have executed at least once on first run"

        # Second run with pipeline_cache=False
        pipeline_results_2 = []
        async for result in run_pipeline(
            tasks=tasks,
            datasets="test_dataset_cache_off",
            data=["sample data"],  # Data is necessary to trigger processing
            user=user,
            pipeline_name="test_cache_off_pipeline",
            use_pipeline_cache=False,
        ):
            pipeline_results_2.append(result)

        second_run_count = counter.count
        assert second_run_count > first_run_count, (
            f"With pipeline_cache=False, task should re-execute. "
            f"First run: {first_run_count}, After second run: {second_run_count}"
        )

    @pytest.mark.asyncio
    async def test_reset_pipeline_status_allows_reexecution_with_cache(self):
        """
        Test that resetting pipeline status allows re-execution even with
        pipeline_cache=True.
        """
        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)
        await create_db_and_tables()

        counter = ExecutionCounter()
        user = await get_default_user()

        dataset_name = "reset_status_test"
        pipeline_name = "test_reset_pipeline"

        tasks = [Task(create_counting_task, counter=counter)]

        # First run
        pipeline_result = []
        async for result in run_pipeline(
            tasks=tasks,
            datasets=dataset_name,
            user=user,
            data=["sample data"],  # Data is necessary to trigger processing
            pipeline_name=pipeline_name,
            use_pipeline_cache=True,
        ):
            pipeline_result.append(result)

        first_run_count = counter.count
        assert first_run_count >= 1

        # Second run without reset - should skip
        async for _ in run_pipeline(
            tasks=tasks,
            datasets=dataset_name,
            user=user,
            data=["sample data"],  # Data is necessary to trigger processing
            pipeline_name=pipeline_name,
            use_pipeline_cache=True,
        ):
            pass

        after_second_run = counter.count
        assert after_second_run == first_run_count, "Should have skipped due to cache"

        # Reset the pipeline status
        await reset_dataset_pipeline_run_status(
            pipeline_result[0].dataset_id, user, pipeline_names=[pipeline_name]
        )

        # Third run after reset - should execute
        async for _ in run_pipeline(
            tasks=tasks,
            datasets=dataset_name,
            user=user,
            data=["sample data"],  # Data is necessary to trigger processing
            pipeline_name=pipeline_name,
            use_pipeline_cache=True,
        ):
            pass

        after_reset_run = counter.count
        assert after_reset_run > after_second_run, (
            f"After reset, pipeline should re-execute. "
            f"Before reset: {after_second_run}, After reset run: {after_reset_run}"
        )