<!-- .github/pull_request_template.md -->
## Description
Make the pipeline cache mechanism optional: it is now turned off by
default for custom pipelines, while `add` and `cognify` continue to use
it as they have until now (see the usage sketch below).
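
A minimal, hedged usage sketch of the `run_pipeline` entry point from the implementation attached below. The import path, task function, and dataset name are assumptions for illustration only, not part of this PR:

```python
import asyncio

# NOTE: the import path and the names below are placeholders; adjust the
# import to wherever run_pipeline lives in the repository.
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.tasks.task import Task


async def my_task(data):  # hypothetical no-op task
    return data


async def main():
    # Default for custom pipelines after this PR: caching is off, so the
    # pipeline re-runs even if the dataset was already processed.
    async for run_info in run_pipeline(tasks=[Task(my_task)], datasets="my_dataset"):
        print(run_info)

    # Opt in to the previous behavior (still used by add and cognify):
    # a completed run short-circuits and yields its cached status instead.
    async for run_info in run_pipeline(
        tasks=[Task(my_task)],
        datasets="my_dataset",
        use_pipeline_cache=True,
    ):
        print(run_info)


asyncio.run(main())
```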
## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):
## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added necessary documentation (if applicable)
- [x] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been
submitted already
- [x] I have linked any relevant issues in the description
- [x] My commits have clear and descriptive messages
## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
* **New Features**
  * Introduced pipeline caching across ingestion, processing, and custom
pipeline flows, with per-run controls to enable or disable caching.
  * Added an option for incremental loading in custom pipeline runs.
* **Behavior Changes**
  * One pipeline path now explicitly bypasses caching by default, so it
always re-runs when invoked.
  * Disabling the cache forces re-processing instead of an early exit; a
cache reset still enables re-execution.
* **Tests**
  * Added tests validating caching, non-caching, and cache-reset
re-execution behavior.
* **Chores**
  * Added a CI job to run the pipeline caching tests.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
The pipeline implementation under review (111 lines, 3.7 KiB, Python):
```python
import asyncio
from uuid import UUID
from typing import Any, Union

from cognee.modules.pipelines.layers.setup_and_check_environment import (
    setup_and_check_environment,
)
from cognee.shared.logging_utils import get_logger
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.layers import validate_pipeline_tasks
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.models import User
from cognee.context_global_variables import set_database_global_context_variables
from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
    resolve_authorized_user_datasets,
)
from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
    check_pipeline_run_qualification,
)
from cognee.modules.pipelines.models.PipelineRunInfo import (
    PipelineRunStarted,
)

logger = get_logger("cognee.pipeline")

update_status_lock = asyncio.Lock()


async def run_pipeline(
    tasks: list[Task],
    data=None,
    datasets: Union[str, list[str], list[UUID]] = None,
    user: User = None,
    pipeline_name: str = "custom_pipeline",
    vector_db_config: dict = None,
    graph_db_config: dict = None,
    use_pipeline_cache: bool = False,
    incremental_loading: bool = False,
    data_per_batch: int = 20,
):
    validate_pipeline_tasks(tasks)
    await setup_and_check_environment(vector_db_config, graph_db_config)

    user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user)

    for dataset in authorized_datasets:
        async for run_info in run_pipeline_per_dataset(
            dataset=dataset,
            user=user,
            tasks=tasks,
            data=data,
            pipeline_name=pipeline_name,
            context={"dataset": dataset},
            use_pipeline_cache=use_pipeline_cache,
            incremental_loading=incremental_loading,
            data_per_batch=data_per_batch,
        ):
            yield run_info


async def run_pipeline_per_dataset(
    dataset: Dataset,
    user: User,
    tasks: list[Task],
    data=None,
    pipeline_name: str = "custom_pipeline",
    context: dict = None,
    use_pipeline_cache=False,
    incremental_loading=False,
    data_per_batch: int = 20,
):
    # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
    await set_database_global_context_variables(dataset.id, dataset.owner_id)

    if not data:
        data: list[Data] = await get_dataset_data(dataset_id=dataset.id)

    process_pipeline_status = await check_pipeline_run_qualification(dataset, data, pipeline_name)

    if process_pipeline_status:
        # If the pipeline was already processed or is currently being processed,
        # return status information to the async generator and finish execution
        if use_pipeline_cache:
            # If pipeline caching is enabled we do not proceed with re-processing
            yield process_pipeline_status
            return
        else:
            # If pipeline caching is disabled we always return pipeline started
            # information and proceed with re-processing
            yield PipelineRunStarted(
                pipeline_run_id=process_pipeline_status.pipeline_run_id,
                dataset_id=dataset.id,
                dataset_name=dataset.name,
                payload=data,
            )

    pipeline_run = run_tasks(
        tasks,
        dataset.id,
        data,
        user,
        pipeline_name,
        context,
        incremental_loading,
        data_per_batch,
    )

    async for pipeline_run_info in pipeline_run:
        yield pipeline_run_info
```
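
For reviewers, a hedged consumption sketch of the behavior change: with the default `use_pipeline_cache=False`, an already-processed dataset no longer exits early; the generator first yields a `PipelineRunStarted` carrying the prior `pipeline_run_id` and then streams the re-run's task info, whereas with caching enabled it yields the cached status from `check_pipeline_run_qualification` and returns. The import path for `run_pipeline` and the arguments are assumptions, reused from the earlier sketch:

```python
# Assumed import path, as in the sketch above; adjust to the real module.
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunStarted


async def consume(tasks, dataset_name):
    # Default use_pipeline_cache=False: a dataset that already qualifies as
    # processed still re-runs, announced by a PipelineRunStarted before the
    # per-task run info is streamed.
    async for run_info in run_pipeline(tasks=tasks, datasets=dataset_name):
        if isinstance(run_info, PipelineRunStarted):
            print(f"(re)started run for dataset {run_info.dataset_name}")
        else:
            print(run_info)
```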