diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml
index 520d93689..cb69e9ef6 100644
--- a/.github/workflows/e2e_tests.yml
+++ b/.github/workflows/e2e_tests.yml
@@ -582,3 +582,30 @@ jobs:
           DB_USERNAME: cognee
           DB_PASSWORD: cognee
         run: uv run python ./cognee/tests/test_conversation_history.py
+
+  run-pipeline-cache-test:
+    name: Test Pipeline Caching
+    runs-on: ubuntu-22.04
+    steps:
+      - name: Check out
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      - name: Cognee Setup
+        uses: ./.github/actions/cognee_setup
+        with:
+          python-version: '3.11.x'
+
+      - name: Run Pipeline Cache Test
+        env:
+          ENV: 'dev'
+          LLM_MODEL: ${{ secrets.LLM_MODEL }}
+          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
+          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
+          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
+          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
+          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
+          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
+          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
+        run: uv run python ./cognee/tests/test_pipeline_cache.py
diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index a521b316b..1ea4caca4 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -205,6 +205,7 @@ async def add(
         pipeline_name="add_pipeline",
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
+        use_pipeline_cache=True,
         incremental_loading=incremental_loading,
         data_per_batch=data_per_batch,
     ):
diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py
index 9d9f7d154..9862edd49 100644
--- a/cognee/api/v1/cognify/cognify.py
+++ b/cognee/api/v1/cognify/cognify.py
@@ -237,6 +237,7 @@ async def cognify(
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
         incremental_loading=incremental_loading,
+        use_pipeline_cache=True,
         pipeline_name="cognify_pipeline",
         data_per_batch=data_per_batch,
     )
diff --git a/cognee/modules/memify/memify.py b/cognee/modules/memify/memify.py
index 2d9b32a1b..e60eb5a4e 100644
--- a/cognee/modules/memify/memify.py
+++ b/cognee/modules/memify/memify.py
@@ -12,9 +12,6 @@ from cognee.modules.users.models import User
 from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
     resolve_authorized_user_datasets,
 )
-from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
-    reset_dataset_pipeline_run_status,
-)
 from cognee.modules.engine.operations.setup import setup
 from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
 from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
@@ -97,10 +94,6 @@ async def memify(
         *enrichment_tasks,
     ]
 
-    await reset_dataset_pipeline_run_status(
-        authorized_dataset.id, user, pipeline_names=["memify_pipeline"]
-    )
-
     # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
     pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
 
@@ -113,6 +106,7 @@ async def memify(
         datasets=authorized_dataset.id,
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
+        use_pipeline_cache=False,
        incremental_loading=False,
         pipeline_name="memify_pipeline",
     )
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index eb0ebe8bd..6641d3a4c 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -20,6 +20,9 @@ from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
 from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
     check_pipeline_run_qualification,
 )
+from cognee.modules.pipelines.models.PipelineRunInfo import (
+    PipelineRunStarted,
+)
 from typing import Any
 
 logger = get_logger("cognee.pipeline")
@@ -35,6 +38,7 @@ async def run_pipeline(
     pipeline_name: str = "custom_pipeline",
     vector_db_config: dict = None,
     graph_db_config: dict = None,
+    use_pipeline_cache: bool = False,
     incremental_loading: bool = False,
     data_per_batch: int = 20,
 ):
@@ -51,6 +55,7 @@ async def run_pipeline(
             data=data,
             pipeline_name=pipeline_name,
             context={"dataset": dataset},
+            use_pipeline_cache=use_pipeline_cache,
             incremental_loading=incremental_loading,
             data_per_batch=data_per_batch,
         ):
@@ -64,6 +69,7 @@ async def run_pipeline_per_dataset(
     data=None,
     pipeline_name: str = "custom_pipeline",
     context: dict = None,
+    use_pipeline_cache=False,
     incremental_loading=False,
     data_per_batch: int = 20,
 ):
@@ -77,8 +83,18 @@ async def run_pipeline_per_dataset(
     if process_pipeline_status:
         # If pipeline was already processed or is currently being processed
         # return status information to async generator and finish execution
-        yield process_pipeline_status
-        return
+        if use_pipeline_cache:
+            # If pipeline caching is enabled we do not proceed with re-processing
+            yield process_pipeline_status
+            return
+        else:
+            # If pipeline caching is disabled we always yield pipeline started information and proceed with re-processing
+            yield PipelineRunStarted(
+                pipeline_run_id=process_pipeline_status.pipeline_run_id,
+                dataset_id=dataset.id,
+                dataset_name=dataset.name,
+                payload=data,
+            )
 
     pipeline_run = run_tasks(
         tasks,
diff --git a/cognee/modules/run_custom_pipeline/run_custom_pipeline.py b/cognee/modules/run_custom_pipeline/run_custom_pipeline.py
index d3df1c060..269238503 100644
--- a/cognee/modules/run_custom_pipeline/run_custom_pipeline.py
+++ b/cognee/modules/run_custom_pipeline/run_custom_pipeline.py
@@ -18,6 +18,8 @@ async def run_custom_pipeline(
     user: User = None,
     vector_db_config: Optional[dict] = None,
     graph_db_config: Optional[dict] = None,
+    use_pipeline_cache: bool = False,
+    incremental_loading: bool = False,
     data_per_batch: int = 20,
     run_in_background: bool = False,
     pipeline_name: str = "custom_pipeline",
@@ -40,6 +42,10 @@ async def run_custom_pipeline(
         user: User context for authentication and data access. Uses default if None.
         vector_db_config: Custom vector database configuration for embeddings storage.
         graph_db_config: Custom graph database configuration for relationship storage.
+        use_pipeline_cache: If True, pipeline runs with the same ID that are currently executing or were already completed won't process the data again.
+            The pipeline ID is created by the generate_pipeline_id function. Pipeline status can be manually reset with the reset_dataset_pipeline_run_status function.
+        incremental_loading: If True, only new or modified data will be processed to avoid duplication (only works when data is used with the Cognee Python Data model).
+            The incremental system stores and compares hashes of processed data in the Data model and skips data with the same content hash.
         data_per_batch: Number of data items to be processed in parallel.
         run_in_background: If True, starts processing asynchronously and returns immediately.
             If False, waits for completion before returning.
@@ -63,7 +69,8 @@ async def run_custom_pipeline(
         datasets=dataset,
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
-        incremental_loading=False,
+        use_pipeline_cache=use_pipeline_cache,
+        incremental_loading=incremental_loading,
         data_per_batch=data_per_batch,
         pipeline_name=pipeline_name,
     )
diff --git a/cognee/tests/test_pipeline_cache.py b/cognee/tests/test_pipeline_cache.py
new file mode 100644
index 000000000..8cdd6aa3c
--- /dev/null
+++ b/cognee/tests/test_pipeline_cache.py
@@ -0,0 +1,164 @@
+"""
+Test suite for the pipeline_cache feature in Cognee pipelines.
+
+This module tests the behavior of the `use_pipeline_cache` parameter, which controls
+whether a pipeline should skip re-execution when it has already been completed
+for the same dataset.
+
+Architecture Overview:
+---------------------
+The pipeline_cache mechanism works at the dataset level:
+1. When a pipeline runs, it logs its status (INITIATED -> STARTED -> COMPLETED)
+2. Before each run, `check_pipeline_run_qualification()` checks the pipeline status
+3. If `use_pipeline_cache=True` and status is COMPLETED/STARTED, the pipeline skips
+4. If `use_pipeline_cache=False`, the pipeline always re-executes regardless of status
+"""
+
+import pytest
+
+import cognee
+from cognee.modules.pipelines.tasks.task import Task
+from cognee.modules.pipelines import run_pipeline
+from cognee.modules.users.methods import get_default_user
+
+from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
+    reset_dataset_pipeline_run_status,
+)
+from cognee.infrastructure.databases.relational import create_db_and_tables
+
+
+class ExecutionCounter:
+    """Helper class to track task execution counts."""
+
+    def __init__(self):
+        self.count = 0
+
+
+async def create_counting_task(data, counter: ExecutionCounter):
+    """Task body that increments the counter on the given ExecutionCounter instance when executed."""
+    counter.count += 1
+    return counter
+
+
+class TestPipelineCache:
+    """Tests for basic pipeline_cache on/off behavior."""
+
+    @pytest.mark.asyncio
+    async def test_pipeline_cache_off_allows_reexecution(self):
+        """
+        Test that with use_pipeline_cache=False, the pipeline re-executes
+        even when it has already completed for the dataset.
+
+        Expected behavior:
+        - First run: Pipeline executes fully, task runs once
+        - Second run: Pipeline executes again, task runs again (total: 2 times)
+        """
+        await cognee.prune.prune_data()
+        await cognee.prune.prune_system(metadata=True)
+        await create_db_and_tables()
+
+        counter = ExecutionCounter()
+        user = await get_default_user()
+
+        tasks = [Task(create_counting_task, counter=counter)]
+
+        # First run
+        pipeline_results_1 = []
+        async for result in run_pipeline(
+            tasks=tasks,
+            datasets="test_dataset_cache_off",
+            data=["sample data"],  # Data is necessary to trigger processing
+            user=user,
+            pipeline_name="test_cache_off_pipeline",
+            use_pipeline_cache=False,
+        ):
+            pipeline_results_1.append(result)
+
+        first_run_count = counter.count
+        assert first_run_count >= 1, "Task should have executed at least once on first run"
+
+        # Second run with pipeline_cache=False
+        pipeline_results_2 = []
+        async for result in run_pipeline(
+            tasks=tasks,
+            datasets="test_dataset_cache_off",
+            data=["sample data"],  # Data is necessary to trigger processing
+            user=user,
+            pipeline_name="test_cache_off_pipeline",
+            use_pipeline_cache=False,
+        ):
+            pipeline_results_2.append(result)
+
+        second_run_count = counter.count
+        assert second_run_count > first_run_count, (
+            f"With pipeline_cache=False, task should re-execute. "
+            f"First run: {first_run_count}, After second run: {second_run_count}"
+        )
+
+    @pytest.mark.asyncio
+    async def test_reset_pipeline_status_allows_reexecution_with_cache(self):
+        """
+        Test that resetting pipeline status allows re-execution even with
+        pipeline_cache=True.
+        """
+        await cognee.prune.prune_data()
+        await cognee.prune.prune_system(metadata=True)
+        await create_db_and_tables()
+
+        counter = ExecutionCounter()
+        user = await get_default_user()
+        dataset_name = "reset_status_test"
+        pipeline_name = "test_reset_pipeline"
+
+        tasks = [Task(create_counting_task, counter=counter)]
+
+        # First run
+        pipeline_result = []
+        async for result in run_pipeline(
+            tasks=tasks,
+            datasets=dataset_name,
+            user=user,
+            data=["sample data"],  # Data is necessary to trigger processing
+            pipeline_name=pipeline_name,
+            use_pipeline_cache=True,
+        ):
+            pipeline_result.append(result)
+
+        first_run_count = counter.count
+        assert first_run_count >= 1
+
+        # Second run without reset - should skip
+        async for _ in run_pipeline(
+            tasks=tasks,
+            datasets=dataset_name,
+            user=user,
+            data=["sample data"],  # Data is necessary to trigger processing
+            pipeline_name=pipeline_name,
+            use_pipeline_cache=True,
+        ):
+            pass
+
+        after_second_run = counter.count
+        assert after_second_run == first_run_count, "Should have skipped due to cache"
+
+        # Reset the pipeline status
+        await reset_dataset_pipeline_run_status(
+            pipeline_result[0].dataset_id, user, pipeline_names=[pipeline_name]
+        )
+
+        # Third run after reset - should execute
+        async for _ in run_pipeline(
+            tasks=tasks,
+            datasets=dataset_name,
+            user=user,
+            data=["sample data"],  # Data is necessary to trigger processing
+            pipeline_name=pipeline_name,
+            use_pipeline_cache=True,
+        ):
+            pass
+
+        after_reset_run = counter.count
+        assert after_reset_run > after_second_run, (
+            f"After reset, pipeline should re-execute. "
+            f"Before reset: {after_second_run}, After reset run: {after_reset_run}"
+        )
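
Reviewer note: the sketch below is not part of the diff; it is a minimal usage example of the new use_pipeline_cache flag, assuming the import paths and call signatures shown in cognee/tests/test_pipeline_cache.py above. The dataset name, pipeline name, and task body are illustrative placeholders.

# Usage sketch (assumptions noted above): first run executes, second cached
# run is expected to short-circuit, and resetting the stored status re-enables
# execution on the next call.
import asyncio

import cognee
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)
from cognee.modules.users.methods import get_default_user
from cognee.infrastructure.databases.relational import create_db_and_tables


async def echo_task(data):
    # Placeholder task body; any coroutine wrapped in Task works here.
    return data


async def main():
    await create_db_and_tables()
    user = await get_default_user()
    tasks = [Task(echo_task)]

    # First run: no prior status exists, so the tasks execute and the run is
    # recorded (INITIATED -> STARTED -> COMPLETED).
    run_infos = []
    async for run_info in run_pipeline(
        tasks=tasks,
        datasets="cache_demo_dataset",  # hypothetical dataset name
        data=["sample data"],
        user=user,
        pipeline_name="cache_demo_pipeline",  # hypothetical pipeline name
        use_pipeline_cache=True,
    ):
        run_infos.append(run_info)

    # Second run with use_pipeline_cache=True: check_pipeline_run_qualification()
    # finds the completed status, so the pipeline yields that status and skips
    # re-processing. With use_pipeline_cache=False it would yield
    # PipelineRunStarted and re-execute instead.
    async for run_info in run_pipeline(
        tasks=tasks,
        datasets="cache_demo_dataset",
        data=["sample data"],
        user=user,
        pipeline_name="cache_demo_pipeline",
        use_pipeline_cache=True,
    ):
        print(type(run_info).__name__)

    # Clearing the stored status re-enables execution on the next cached run.
    await reset_dataset_pipeline_run_status(
        run_infos[0].dataset_id, user, pipeline_names=["cache_demo_pipeline"]
    )


if __name__ == "__main__":
    asyncio.run(main())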